awslogs: fix non-blocking log drop bug

Previously, the AWSLogs driver attempted to implement
non-blocking itself. Non-blocking is supposed to
implemented solely by the Docker RingBuffer that
wraps the log driver.

Please see issue and explanation here:
https://github.com/moby/moby/issues/45217

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
(cherry picked from commit c8f8d11ac4)
This commit is contained in:
Wesley Pettit 2023-03-29 16:09:07 -07:00
parent de57aecf4a
commit ad45ece6fe
No known key found for this signature in database
GPG key ID: 7D0AF707490C3B65
2 changed files with 6 additions and 44 deletions

View file

@ -71,7 +71,6 @@ type logStream struct {
logStreamName string
logGroupName string
logCreateGroup bool
logNonBlocking bool
forceFlushInterval time.Duration
multilinePattern *regexp.Regexp
client api
@ -85,7 +84,6 @@ type logStreamConfig struct {
logStreamName string
logGroupName string
logCreateGroup bool
logNonBlocking bool
forceFlushInterval time.Duration
maxBufferedEvents int
multilinePattern *regexp.Regexp
@ -147,11 +145,12 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err
}
logNonBlocking := info.Config["mode"] == "non-blocking"
containerStream := &logStream{
logStreamName: containerStreamConfig.logStreamName,
logGroupName: containerStreamConfig.logGroupName,
logCreateGroup: containerStreamConfig.logCreateGroup,
logNonBlocking: containerStreamConfig.logNonBlocking,
forceFlushInterval: containerStreamConfig.forceFlushInterval,
multilinePattern: containerStreamConfig.multilinePattern,
client: client,
@ -159,7 +158,7 @@ func New(info logger.Info) (logger.Logger, error) {
}
creationDone := make(chan bool)
if containerStream.logNonBlocking {
if logNonBlocking {
go func() {
backoff := 1
maxBackoff := 32
@ -215,8 +214,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
}
}
logNonBlocking := info.Config["mode"] == "non-blocking"
forceFlushInterval := defaultForceFlushInterval
if info.Config[forceFlushIntervalKey] != "" {
forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
@ -247,7 +244,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
logStreamName: logStreamName,
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logNonBlocking: logNonBlocking,
forceFlushInterval: forceFlushInterval,
maxBufferedEvents: maxBufferedEvents,
multilinePattern: multilinePattern,
@ -412,14 +408,6 @@ func (l *logStream) Log(msg *logger.Message) error {
if l.closed {
return errors.New("awslogs is closed")
}
if l.logNonBlocking {
select {
case l.messages <- msg:
return nil
default:
return errors.New("awslogs buffer is full")
}
}
l.messages <- msg
return nil
}

View file

@ -325,42 +325,16 @@ func TestLogBlocking(t *testing.T) {
}
}
func TestLogNonBlockingBufferEmpty(t *testing.T) {
func TestLogBufferEmpty(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
client: mockClient,
messages: make(chan *logger.Message, 1),
}
err := stream.Log(&logger.Message{})
assert.NilError(t, err)
}
func TestLogNonBlockingBufferFull(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
}
stream.messages <- &logger.Message{}
errorCh := make(chan error, 1)
started := make(chan bool)
go func() {
started <- true
err := stream.Log(&logger.Message{})
errorCh <- err
}()
<-started
select {
case err := <-errorCh:
if err == nil {
t.Fatal("Expected non-nil error")
}
case <-time.After(30 * time.Second):
t.Fatal("Expected Log call to not block")
}
}
func TestPublishBatchSuccess(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{