Browse Source

Merge pull request #45349 from PettitWesley/awslogs-non-blocking-bug-20.10

[20.10 backport] awslogs: fix non-blocking log drop bug
Bjorn Neergaard 2 years ago
parent
commit
ac1cd0423f

+ 3 - 15
daemon/logger/awslogs/cloudwatchlogs.go

@@ -71,7 +71,6 @@ type logStream struct {
 	logStreamName      string
 	logGroupName       string
 	logCreateGroup     bool
-	logNonBlocking     bool
 	forceFlushInterval time.Duration
 	multilinePattern   *regexp.Regexp
 	client             api
@@ -85,7 +84,6 @@ type logStreamConfig struct {
 	logStreamName      string
 	logGroupName       string
 	logCreateGroup     bool
-	logNonBlocking     bool
 	forceFlushInterval time.Duration
 	maxBufferedEvents  int
 	multilinePattern   *regexp.Regexp
@@ -147,11 +145,12 @@ func New(info logger.Info) (logger.Logger, error) {
 		return nil, err
 	}
 
+	logNonBlocking := info.Config["mode"] == "non-blocking"
+
 	containerStream := &logStream{
 		logStreamName:      containerStreamConfig.logStreamName,
 		logGroupName:       containerStreamConfig.logGroupName,
 		logCreateGroup:     containerStreamConfig.logCreateGroup,
-		logNonBlocking:     containerStreamConfig.logNonBlocking,
 		forceFlushInterval: containerStreamConfig.forceFlushInterval,
 		multilinePattern:   containerStreamConfig.multilinePattern,
 		client:             client,
@@ -159,7 +158,7 @@ func New(info logger.Info) (logger.Logger, error) {
 	}
 
 	creationDone := make(chan bool)
-	if containerStream.logNonBlocking {
+	if logNonBlocking {
 		go func() {
 			backoff := 1
 			maxBackoff := 32
@@ -215,8 +214,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
 		}
 	}
 
-	logNonBlocking := info.Config["mode"] == "non-blocking"
-
 	forceFlushInterval := defaultForceFlushInterval
 	if info.Config[forceFlushIntervalKey] != "" {
 		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
@@ -247,7 +244,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
 		logStreamName:      logStreamName,
 		logGroupName:       logGroupName,
 		logCreateGroup:     logCreateGroup,
-		logNonBlocking:     logNonBlocking,
 		forceFlushInterval: forceFlushInterval,
 		maxBufferedEvents:  maxBufferedEvents,
 		multilinePattern:   multilinePattern,
@@ -412,14 +408,6 @@ func (l *logStream) Log(msg *logger.Message) error {
 	if l.closed {
 		return errors.New("awslogs is closed")
 	}
-	if l.logNonBlocking {
-		select {
-		case l.messages <- msg:
-			return nil
-		default:
-			return errors.New("awslogs buffer is full")
-		}
-	}
 	l.messages <- msg
 	return nil
 }

+ 3 - 29
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -325,42 +325,16 @@ func TestLogBlocking(t *testing.T) {
 	}
 }
 
-func TestLogNonBlockingBufferEmpty(t *testing.T) {
+func TestLogBufferEmpty(t *testing.T) {
 	mockClient := newMockClient()
 	stream := &logStream{
-		client:         mockClient,
-		messages:       make(chan *logger.Message, 1),
-		logNonBlocking: true,
+		client:   mockClient,
+		messages: make(chan *logger.Message, 1),
 	}
 	err := stream.Log(&logger.Message{})
 	assert.NilError(t, err)
 }
 
-func TestLogNonBlockingBufferFull(t *testing.T) {
-	mockClient := newMockClient()
-	stream := &logStream{
-		client:         mockClient,
-		messages:       make(chan *logger.Message, 1),
-		logNonBlocking: true,
-	}
-	stream.messages <- &logger.Message{}
-	errorCh := make(chan error, 1)
-	started := make(chan bool)
-	go func() {
-		started <- true
-		err := stream.Log(&logger.Message{})
-		errorCh <- err
-	}()
-	<-started
-	select {
-	case err := <-errorCh:
-		if err == nil {
-			t.Fatal("Expected non-nil error")
-		}
-	case <-time.After(30 * time.Second):
-		t.Fatal("Expected Log call to not block")
-	}
-}
 func TestPublishBatchSuccess(t *testing.T) {
 	mockClient := newMockClient()
 	stream := &logStream{