d/logger/journald: fix data race in test harness

The Go race detector was detecting a data race when running the
TestLogRead/Follow/Concurrent test against the journald logging driver.
The race was in the test harness, specifically syncLogger. The waitOn
field would be reassigned each time a log entry is sent to the journal,
which is not concurrency-safe. Make it concurrency-safe using the same
patterns that are used in the log follower implementation to synchronize
with the logger.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit 982e777d49)
Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2024-01-30 14:34:15 -05:00
parent f2d0d87c46
commit f8a8cdaf9e

View file

@ -3,6 +3,7 @@
package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"sync"
"testing"
"time"
@ -46,32 +47,37 @@ func TestLogRead(t *testing.T) {
assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil))
return func(t *testing.T) logger.Logger {
l, err := new(info)
assert.NilError(t, err)
l.journalReadDir = journalDir
sl := &syncLogger{journald: l, waiters: map[uint64]chan<- struct{}{}}
s := make(chan sendit, 100)
t.Cleanup(func() { close(s) })
go func() {
for m := range s {
<-m.after
activeJournal.Send(m.message, m.priority, m.vars)
if m.sent != nil {
close(m.sent)
sl.mu.Lock()
sl.sent++
if notify, ok := sl.waiters[sl.sent]; ok {
delete(sl.waiters, sl.sent)
close(notify)
}
sl.mu.Unlock()
}
}()
l, err := new(info)
assert.NilError(t, err)
l.journalReadDir = journalDir
sl := &syncLogger{journald: l}
l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error {
sent := make(chan struct{})
sl.mu.Lock()
sl.queued++
sl.mu.Unlock()
s <- sendit{
message: message,
priority: priority,
vars: vars,
after: time.After(150 * time.Millisecond),
sent: sent,
}
sl.waitOn = sent
return nil
}
l.readSyncTimeout = 3 * time.Second
@ -88,17 +94,26 @@ type sendit struct {
priority journal.Priority
vars map[string]string
after <-chan time.Time
sent chan<- struct{}
}
type syncLogger struct {
*journald
waitOn <-chan struct{}
mu sync.Mutex
queued, sent uint64
waiters map[uint64]chan<- struct{}
}
func (l *syncLogger) Sync() error {
if l.waitOn != nil {
<-l.waitOn
l.mu.Lock()
waitFor := l.queued
if l.sent >= l.queued {
l.mu.Unlock()
return nil
}
notify := make(chan struct{})
l.waiters[waitFor] = notify
l.mu.Unlock()
<-notify
return nil
}