awslogs: fix non-blocking log drop bug

Previously, the AWSLogs driver attempted to implement
non-blocking itself. Non-blocking is supposed to
implemented solely by the Docker RingBuffer that
wraps the log driver.

Please see issue and explanation here:
https://github.com/moby/moby/issues/45217

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
(cherry picked from commit c8f8d11ac4)
This commit is contained in:
Wesley Pettit 2023-03-29 16:09:07 -07:00
parent cbce331930
commit 28b694d32d
No known key found for this signature in database
GPG key ID: 7D0AF707490C3B65
2 changed files with 6 additions and 42 deletions

View file

@ -78,7 +78,6 @@ type logStream struct {
logGroupName string
logCreateGroup bool
logCreateStream bool
logNonBlocking bool
forceFlushInterval time.Duration
multilinePattern *regexp.Regexp
client api
@ -93,7 +92,6 @@ type logStreamConfig struct {
logGroupName string
logCreateGroup bool
logCreateStream bool
logNonBlocking bool
forceFlushInterval time.Duration
maxBufferedEvents int
multilinePattern *regexp.Regexp
@ -155,12 +153,13 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err
}
logNonBlocking := info.Config["mode"] == "non-blocking"
containerStream := &logStream{
logStreamName: containerStreamConfig.logStreamName,
logGroupName: containerStreamConfig.logGroupName,
logCreateGroup: containerStreamConfig.logCreateGroup,
logCreateStream: containerStreamConfig.logCreateStream,
logNonBlocking: containerStreamConfig.logNonBlocking,
forceFlushInterval: containerStreamConfig.forceFlushInterval,
multilinePattern: containerStreamConfig.multilinePattern,
client: client,
@ -168,7 +167,7 @@ func New(info logger.Info) (logger.Logger, error) {
}
creationDone := make(chan bool)
if containerStream.logNonBlocking {
if logNonBlocking {
go func() {
backoff := 1
maxBackoff := 32
@ -224,8 +223,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
}
}
logNonBlocking := info.Config["mode"] == "non-blocking"
forceFlushInterval := defaultForceFlushInterval
if info.Config[forceFlushIntervalKey] != "" {
forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
@ -264,7 +261,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logCreateStream: logCreateStream,
logNonBlocking: logNonBlocking,
forceFlushInterval: forceFlushInterval,
maxBufferedEvents: maxBufferedEvents,
multilinePattern: multilinePattern,
@ -439,14 +435,6 @@ func (l *logStream) Log(msg *logger.Message) error {
if l.closed {
return errors.New("awslogs is closed")
}
if l.logNonBlocking {
select {
case l.messages <- msg:
return nil
default:
return errors.New("awslogs buffer is full")
}
}
l.messages <- msg
return nil
}

View file

@ -390,40 +390,16 @@ func TestLogBlocking(t *testing.T) {
}
}
func TestLogNonBlockingBufferEmpty(t *testing.T) {
func TestLogBufferEmpty(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
client: mockClient,
messages: make(chan *logger.Message, 1),
}
err := stream.Log(&logger.Message{})
assert.NilError(t, err)
}
func TestLogNonBlockingBufferFull(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
}
stream.messages <- &logger.Message{}
errorCh := make(chan error, 1)
started := make(chan bool)
go func() {
started <- true
err := stream.Log(&logger.Message{})
errorCh <- err
}()
<-started
select {
case err := <-errorCh:
assert.Check(t, err != nil)
case <-time.After(30 * time.Second):
t.Fatal("Expected Log call to not block")
}
}
func TestPublishBatchSuccess(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{