Merge pull request #45227 from PettitWesley/fix-awslogs-non-blocking

awslogs: fix non-blocking log drop bug
This commit is contained in:
Sebastiaan van Stijn 2023-04-15 00:56:57 +02:00 committed by GitHub
commit 5bf405b2af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 42 deletions

View file

@ -73,7 +73,6 @@ type logStream struct {
logGroupName string
logCreateGroup bool
logCreateStream bool
logNonBlocking bool
forceFlushInterval time.Duration
multilinePattern *regexp.Regexp
client api
@ -88,7 +87,6 @@ type logStreamConfig struct {
logGroupName string
logCreateGroup bool
logCreateStream bool
logNonBlocking bool
forceFlushInterval time.Duration
maxBufferedEvents int
multilinePattern *regexp.Regexp
@ -150,12 +148,13 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err
}
logNonBlocking := info.Config["mode"] == "non-blocking"
containerStream := &logStream{
logStreamName: containerStreamConfig.logStreamName,
logGroupName: containerStreamConfig.logGroupName,
logCreateGroup: containerStreamConfig.logCreateGroup,
logCreateStream: containerStreamConfig.logCreateStream,
logNonBlocking: containerStreamConfig.logNonBlocking,
forceFlushInterval: containerStreamConfig.forceFlushInterval,
multilinePattern: containerStreamConfig.multilinePattern,
client: client,
@ -163,7 +162,7 @@ func New(info logger.Info) (logger.Logger, error) {
}
creationDone := make(chan bool)
if containerStream.logNonBlocking {
if logNonBlocking {
go func() {
backoff := 1
maxBackoff := 32
@ -219,8 +218,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
}
}
logNonBlocking := info.Config["mode"] == "non-blocking"
forceFlushInterval := defaultForceFlushInterval
if info.Config[forceFlushIntervalKey] != "" {
forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
@ -259,7 +256,6 @@ func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logCreateStream: logCreateStream,
logNonBlocking: logNonBlocking,
forceFlushInterval: forceFlushInterval,
maxBufferedEvents: maxBufferedEvents,
multilinePattern: multilinePattern,
@ -437,14 +433,6 @@ func (l *logStream) Log(msg *logger.Message) error {
if l.closed {
return errors.New("awslogs is closed")
}
if l.logNonBlocking {
select {
case l.messages <- msg:
return nil
default:
return errors.New("awslogs buffer is full")
}
}
l.messages <- msg
return nil
}

View file

@ -404,40 +404,16 @@ func TestLogBlocking(t *testing.T) {
}
}
func TestLogNonBlockingBufferEmpty(t *testing.T) {
func TestLogBufferEmpty(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
client: mockClient,
messages: make(chan *logger.Message, 1),
}
err := stream.Log(&logger.Message{})
assert.NilError(t, err)
}
func TestLogNonBlockingBufferFull(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
}
stream.messages <- &logger.Message{}
errorCh := make(chan error, 1)
started := make(chan bool)
go func() {
started <- true
err := stream.Log(&logger.Message{})
errorCh <- err
}()
<-started
select {
case err := <-errorCh:
assert.Check(t, err != nil)
case <-time.After(30 * time.Second):
t.Fatal("Expected Log call to not block")
}
}
func TestPublishBatchSuccess(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{