瀏覽代碼

Merge pull request #45350 from PettitWesley/awslogs-non-blocking-bug-23.0

[23.0 backport] awslogs: fix non-blocking log drop bug
Sebastiaan van Stijn 2 年之前
父節點
當前提交
8fdca288c5
共有 2 個文件被更改,包括 6 次插入42 次删除
  1. 3 15
      daemon/logger/awslogs/cloudwatchlogs.go
  2. 3 27
      daemon/logger/awslogs/cloudwatchlogs_test.go

+ 3 - 15
daemon/logger/awslogs/cloudwatchlogs.go

@@ -78,7 +78,6 @@ type logStream struct {
 	logGroupName       string
 	logGroupName       string
 	logCreateGroup     bool
 	logCreateGroup     bool
 	logCreateStream    bool
 	logCreateStream    bool
-	logNonBlocking     bool
 	forceFlushInterval time.Duration
 	forceFlushInterval time.Duration
 	multilinePattern   *regexp.Regexp
 	multilinePattern   *regexp.Regexp
 	client             api
 	client             api
@@ -93,7 +92,6 @@ type logStreamConfig struct {
 	logGroupName       string
 	logGroupName       string
 	logCreateGroup     bool
 	logCreateGroup     bool
 	logCreateStream    bool
 	logCreateStream    bool
-	logNonBlocking     bool
 	forceFlushInterval time.Duration
 	forceFlushInterval time.Duration
 	maxBufferedEvents  int
 	maxBufferedEvents  int
 	multilinePattern   *regexp.Regexp
 	multilinePattern   *regexp.Regexp
@@ -155,12 +153,13 @@ func New(info logger.Info) (logger.Logger, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	logNonBlocking := info.Config["mode"] == "non-blocking"
+
 	containerStream := &logStream{
 	containerStream := &logStream{
 		logStreamName:      containerStreamConfig.logStreamName,
 		logStreamName:      containerStreamConfig.logStreamName,
 		logGroupName:       containerStreamConfig.logGroupName,
 		logGroupName:       containerStreamConfig.logGroupName,
 		logCreateGroup:     containerStreamConfig.logCreateGroup,
 		logCreateGroup:     containerStreamConfig.logCreateGroup,
 		logCreateStream:    containerStreamConfig.logCreateStream,
 		logCreateStream:    containerStreamConfig.logCreateStream,
-		logNonBlocking:     containerStreamConfig.logNonBlocking,
 		forceFlushInterval: containerStreamConfig.forceFlushInterval,
 		forceFlushInterval: containerStreamConfig.forceFlushInterval,
 		multilinePattern:   containerStreamConfig.multilinePattern,
 		multilinePattern:   containerStreamConfig.multilinePattern,
 		client:             client,
 		client:             client,
@@ -168,7 +167,7 @@ func New(info logger.Info) (logger.Logger, error) {
 	}
 	}
 
 
 	creationDone := make(chan bool)
 	creationDone := make(chan bool)
-	if containerStream.logNonBlocking {
+	if logNonBlocking {
 		go func() {
 		go func() {
 			backoff := 1
 			backoff := 1
 			maxBackoff := 32
 			maxBackoff := 32
@@ -224,8 +223,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
 		}
 		}
 	}
 	}
 
 
-	logNonBlocking := info.Config["mode"] == "non-blocking"
-
 	forceFlushInterval := defaultForceFlushInterval
 	forceFlushInterval := defaultForceFlushInterval
 	if info.Config[forceFlushIntervalKey] != "" {
 	if info.Config[forceFlushIntervalKey] != "" {
 		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
 		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
@@ -264,7 +261,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
 		logGroupName:       logGroupName,
 		logGroupName:       logGroupName,
 		logCreateGroup:     logCreateGroup,
 		logCreateGroup:     logCreateGroup,
 		logCreateStream:    logCreateStream,
 		logCreateStream:    logCreateStream,
-		logNonBlocking:     logNonBlocking,
 		forceFlushInterval: forceFlushInterval,
 		forceFlushInterval: forceFlushInterval,
 		maxBufferedEvents:  maxBufferedEvents,
 		maxBufferedEvents:  maxBufferedEvents,
 		multilinePattern:   multilinePattern,
 		multilinePattern:   multilinePattern,
@@ -439,14 +435,6 @@ func (l *logStream) Log(msg *logger.Message) error {
 	if l.closed {
 	if l.closed {
 		return errors.New("awslogs is closed")
 		return errors.New("awslogs is closed")
 	}
 	}
-	if l.logNonBlocking {
-		select {
-		case l.messages <- msg:
-			return nil
-		default:
-			return errors.New("awslogs buffer is full")
-		}
-	}
 	l.messages <- msg
 	l.messages <- msg
 	return nil
 	return nil
 }
 }

+ 3 - 27
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -390,40 +390,16 @@ func TestLogBlocking(t *testing.T) {
 	}
 	}
 }
 }
 
 
-func TestLogNonBlockingBufferEmpty(t *testing.T) {
+func TestLogBufferEmpty(t *testing.T) {
 	mockClient := &mockClient{}
 	mockClient := &mockClient{}
 	stream := &logStream{
 	stream := &logStream{
-		client:         mockClient,
-		messages:       make(chan *logger.Message, 1),
-		logNonBlocking: true,
+		client:   mockClient,
+		messages: make(chan *logger.Message, 1),
 	}
 	}
 	err := stream.Log(&logger.Message{})
 	err := stream.Log(&logger.Message{})
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 }
 }
 
 
-func TestLogNonBlockingBufferFull(t *testing.T) {
-	mockClient := &mockClient{}
-	stream := &logStream{
-		client:         mockClient,
-		messages:       make(chan *logger.Message, 1),
-		logNonBlocking: true,
-	}
-	stream.messages <- &logger.Message{}
-	errorCh := make(chan error, 1)
-	started := make(chan bool)
-	go func() {
-		started <- true
-		err := stream.Log(&logger.Message{})
-		errorCh <- err
-	}()
-	<-started
-	select {
-	case err := <-errorCh:
-		assert.Check(t, err != nil)
-	case <-time.After(30 * time.Second):
-		t.Fatal("Expected Log call to not block")
-	}
-}
 func TestPublishBatchSuccess(t *testing.T) {
 func TestPublishBatchSuccess(t *testing.T) {
 	mockClient := &mockClient{}
 	mockClient := &mockClient{}
 	stream := &logStream{
 	stream := &logStream{