Fix various race conditions in loggerutils

Found by running with `go test -race`

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2020-07-14 17:13:26 -07:00
parent 06172ee8ab
commit 3148a46657

View file

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"
@ -51,6 +52,7 @@ func TestTailFiles(t *testing.T) {
files := []SizeReaderAt{s1, s2, s3}
watcher := logger.NewLogWatcher()
defer watcher.ConsumerGone()
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, lines)
@ -157,19 +159,20 @@ func (d *dummyWrapper) Decode() (*logger.Message, error) {
func TestFollowLogsProducerGone(t *testing.T) {
lw := logger.NewLogWatcher()
defer lw.ConsumerGone()
f, err := ioutil.TempFile("", t.Name())
assert.NilError(t, err)
defer os.Remove(f.Name())
var sent, received, closed int
var sent, received, closed int32
dec := &dummyWrapper{fn: func() error {
switch closed {
switch atomic.LoadInt32(&closed) {
case 0:
sent++
atomic.AddInt32(&sent, 1)
return nil
case 1:
closed++
atomic.AddInt32(&closed, 1)
t.Logf("logDecode() closed after sending %d messages\n", sent)
return io.EOF
default:
@ -198,7 +201,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
}
// "stop" the "container"
closed = 1
atomic.StoreInt32(&closed, 1)
lw.ProducerGone()
// should receive all the messages sent
@ -209,7 +212,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
select {
case <-lw.Msg:
received++
if received == sent {
if received == atomic.LoadInt32(&sent) {
return
}
case err := <-lw.Err:
@ -223,7 +226,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
}
t.Logf("messages sent: %d, received: %d", sent, received)
t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
// followLogs() should be done by now
select {
@ -305,7 +308,7 @@ func checkFileExists(name string) poll.Check {
case os.IsNotExist(err):
return poll.Continue("waiting for %s to exist", name)
default:
t.Logf("%s", dirStringer{filepath.Dir(name)})
t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
return poll.Error(err)
}
}