
Merge pull request #40796 from cpuguy83/log_reads_allocs

Reduce allocations for logfile reader
Brian Goff 5 years ago
parent
commit
2200d938a2
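
This change swaps the per-call decode closures for a reusable loggerutils.Decoder: ReadLogs builds one decoder (with its scratch buffers) per read session, tailFiles and followLogs merely Reset it when the underlying reader changes, and Close releases it at the end, so the tail and follow paths no longer allocate fresh decoder state on every pass. A rough sketch of that lifecycle, assuming the io, logger and loggerutils packages are imported; readAll and deliver are made-up names for illustration, the real flow lives in ReadLogs, tailFiles and followLogs below:

    // Sketch only: one Decoder per read session.
    func readAll(createDecoder loggerutils.MakeDecoderFn, rdr io.Reader, deliver func(*logger.Message)) error {
        dec := createDecoder(nil) // ReadLogs also passes nil here and Resets later
        defer dec.Close()         // release buffers when the session ends

        dec.Reset(rdr) // point the decoder at the current file or chunk
        for {
            msg, err := dec.Decode()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
            deliver(msg)
        }
    }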

+ 3 - 1
daemon/logger/jsonfilelog/jsonlog/jsonlog.go

@@ -21,5 +21,7 @@ func (jl *JSONLog) Reset() {
 	jl.Log = ""
 	jl.Stream = ""
 	jl.Created = time.Time{}
-	jl.Attrs = make(map[string]string)
+	for k := range jl.Attrs {
+		delete(jl.Attrs, k)
+	}
 }

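A note on the Reset change above: replacing make(map[string]string) with a delete loop keeps the already-allocated map. The Go compiler recognizes the `for k := range m { delete(m, k) }` pattern and lowers it to a single runtime map-clear, so emptying Attrs between entries no longer allocates. A standalone illustration of the idiom (clearAttrs and attrs are names made up for this example):

    package main

    import "fmt"

    // clearAttrs empties m in place; the compiler turns this loop into one
    // map-clear call, so the map's existing storage is kept and reused.
    func clearAttrs(m map[string]string) {
        for k := range m {
            delete(m, k)
        }
    }

    func main() {
        attrs := map[string]string{"env": "prod", "tag": "web"}
        clearAttrs(attrs)
        fmt.Println(len(attrs)) // 0, without allocating a new map
    }
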
+ 58 - 28
daemon/logger/jsonfilelog/read.go

@@ -60,35 +60,65 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
 	return msg, nil
 }
 
-// decodeFunc is used to create a decoder for the log file reader
-func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
-	l := &jsonlog.JSONLog{}
-	dec := json.NewDecoder(rdr)
-	return func() (msg *logger.Message, err error) {
-		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
-			msg, err = decodeLogLine(dec, l)
-			if err == nil || err == io.EOF {
-				break
-			}
-
-			logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
-			// try again, could be due to a an incomplete json object as we read
-			if _, ok := err.(*json.SyntaxError); ok {
-				dec = json.NewDecoder(rdr)
-				continue
-			}
-
-			// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-			// remaining data in the parser's buffer while an io.EOF occurs.
-			// If the json logger writes a partial json log entry to the disk
-			// while at the same time the decoder tries to decode it, the race condition happens.
-			if err == io.ErrUnexpectedEOF {
-				reader := io.MultiReader(dec.Buffered(), rdr)
-				dec = json.NewDecoder(reader)
-				continue
-			}
+type decoder struct {
+	rdr io.Reader
+	dec *json.Decoder
+	jl  *jsonlog.JSONLog
+}
+
+func (d *decoder) Reset(rdr io.Reader) {
+	d.rdr = rdr
+	d.dec = nil
+	if d.jl != nil {
+		d.jl.Reset()
+	}
+}
+
+func (d *decoder) Close() {
+	d.dec = nil
+	d.rdr = nil
+	d.jl = nil
+}
+
+func (d *decoder) Decode() (msg *logger.Message, err error) {
+	if d.dec == nil {
+		d.dec = json.NewDecoder(d.rdr)
+	}
+	if d.jl == nil {
+		d.jl = &jsonlog.JSONLog{}
+	}
+	for retries := 0; retries < maxJSONDecodeRetry; retries++ {
+		msg, err = decodeLogLine(d.dec, d.jl)
+		if err == nil || err == io.EOF {
+			break
+		}
+
+		logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
+		// try again, could be due to an incomplete json object as we read
+		if _, ok := err.(*json.SyntaxError); ok {
+			d.dec = json.NewDecoder(d.rdr)
+			continue
+		}
+
+		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+		// remaining data in the parser's buffer while an io.EOF occurs.
+		// If the json logger writes a partial json log entry to the disk
+		// while at the same time the decoder tries to decode it, the race condition happens.
+		if err == io.ErrUnexpectedEOF {
+			d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr)
+			d.dec = json.NewDecoder(d.rdr)
+			continue
 		}
-		return msg, err
+	}
+	return msg, err
+}
+
+// decodeFunc is used to create a decoder for the log file reader
+func decodeFunc(rdr io.Reader) loggerutils.Decoder {
+	return &decoder{
+		rdr: rdr,
+		dec: nil,
+		jl:  nil,
 	}
 }
 

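With the jsonfilelog change above, the decoder keeps a single jsonlog.JSONLog scratch value for its lifetime, resetting it between entries instead of allocating a new struct and Attrs map per line, and it only rebuilds the inner json.Decoder when it trips over a partial or corrupt entry. Driven from inside the package it looks roughly like this (a sketch: the JSON lines are merely illustrative of the json-file on-disk format, and the bytes, fmt and io imports are assumed):

    buf := bytes.NewBufferString(
        `{"log":"hello 1\n","stream":"stdout","time":"2020-01-01T00:00:00Z"}` + "\n" +
            `{"log":"hello 2\n","stream":"stdout","time":"2020-01-01T00:00:01Z"}` + "\n")

    dec := decodeFunc(buf) // now returns a loggerutils.Decoder
    defer dec.Close()

    for {
        msg, err := dec.Decode()
        if err == io.EOF {
            break
        }
        if err != nil {
            break // a real caller surfaces this on the watcher's error channel
        }
        fmt.Printf("%s", msg.Line)
    }
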
+ 7 - 5
daemon/logger/jsonfilelog/read_test.go

@@ -75,19 +75,21 @@ func TestEncodeDecode(t *testing.T) {
 	assert.Assert(t, marshalMessage(m2, nil, buf))
 	assert.Assert(t, marshalMessage(m3, nil, buf))
 
-	decode := decodeFunc(buf)
-	msg, err := decode()
+	dec := decodeFunc(buf)
+	defer dec.Close()
+
+	msg, err := dec.Decode()
 	assert.NilError(t, err)
 	assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line))
 
-	msg, err = decode()
+	msg, err = dec.Decode()
 	assert.NilError(t, err)
 	assert.Assert(t, string(msg.Line) == "hello 2\n")
 
-	msg, err = decode()
+	msg, err = dec.Decode()
 	assert.NilError(t, err)
 	assert.Assert(t, string(msg.Line) == "hello 3\n")
 
-	_, err = decode()
+	_, err = dec.Decode()
 	assert.Assert(t, err == io.EOF)
 }

+ 4 - 7
daemon/logger/local/local_test.go

@@ -1,21 +1,18 @@
 package local
 
 import (
+	"bytes"
 	"context"
 	"encoding/binary"
+	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
-	"bytes"
-	"fmt"
-
-	"strings"
-
-	"io"
-
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"

+ 64 - 34
daemon/logger/local/read.go

@@ -96,51 +96,81 @@ func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io
 	return io.NewSectionReader(r, offset, size), found, nil
 }
 
-func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
-	proto := &logdriver.LogEntry{}
-	buf := make([]byte, initialBufSize)
-
-	return func() (*logger.Message, error) {
-		var (
-			read int
-			err  error
-		)
-
-		resetProto(proto)
-
-		for i := 0; i < maxDecodeRetry; i++ {
-			var n int
-			n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen])
-			if err != nil {
-				if err != io.ErrUnexpectedEOF {
-					return nil, errors.Wrap(err, "error reading log message length")
-				}
-				read += n
-				continue
+type decoder struct {
+	rdr   io.Reader
+	proto *logdriver.LogEntry
+	buf   []byte
+}
+
+func (d *decoder) Decode() (*logger.Message, error) {
+	if d.proto == nil {
+		d.proto = &logdriver.LogEntry{}
+	} else {
+		resetProto(d.proto)
+	}
+	if d.buf == nil {
+		d.buf = make([]byte, initialBufSize)
+	}
+	var (
+		read int
+		err  error
+	)
+
+	for i := 0; i < maxDecodeRetry; i++ {
+		var n int
+		n, err = io.ReadFull(d.rdr, d.buf[read:encodeBinaryLen])
+		if err != nil {
+			if err != io.ErrUnexpectedEOF {
+				return nil, errors.Wrap(err, "error reading log message length")
 			}
 			read += n
-			break
-		}
-		if err != nil {
-			return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
+			continue
 		}
+		read += n
+		break
+	}
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
+	}
 
-		msgLen := int(binary.BigEndian.Uint32(buf[:read]))
+	msgLen := int(binary.BigEndian.Uint32(d.buf[:read]))
 
-		if len(buf) < msgLen+encodeBinaryLen {
-			buf = make([]byte, msgLen+encodeBinaryLen)
+	if len(d.buf) < msgLen+encodeBinaryLen {
+		d.buf = make([]byte, msgLen+encodeBinaryLen)
+	} else {
+		if msgLen <= initialBufSize {
+			d.buf = d.buf[:initialBufSize]
 		} else {
-			if msgLen <= initialBufSize {
-				buf = buf[:initialBufSize]
-			} else {
-				buf = buf[:msgLen+encodeBinaryLen]
-			}
+			d.buf = d.buf[:msgLen+encodeBinaryLen]
 		}
+	}
 
-		return decodeLogEntry(rdr, proto, buf, msgLen)
+	return decodeLogEntry(d.rdr, d.proto, d.buf, msgLen)
+}
+
+func (d *decoder) Reset(rdr io.Reader) {
+	d.rdr = rdr
+	if d.proto != nil {
+		resetProto(d.proto)
+	}
+	if d.buf != nil {
+		d.buf = d.buf[:initialBufSize]
 	}
 }
 
+func (d *decoder) Close() {
+	d.buf = d.buf[:0]
+	d.buf = nil
+	if d.proto != nil {
+		resetProto(d.proto)
+	}
+	d.rdr = nil
+}
+
+func decodeFunc(rdr io.Reader) loggerutils.Decoder {
+	return &decoder{rdr: rdr}
+}
+
 func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) {
 	var (
 		read int

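In the local driver the decoder now owns both its logdriver.LogEntry and its byte buffer and resizes them in place: each entry on disk is a big-endian length of encodeBinaryLen bytes followed by the protobuf payload, and the shared buffer is grown only when a frame does not fit (it is sized to msgLen+encodeBinaryLen, as in the diff, which appears to cover a trailing copy of the length; it is trimmed back toward initialBufSize afterwards). A minimal sketch of reading that framing with a reusable scratch slice, assuming encodeBinaryLen is 4; readFrame is a made-up helper, not the driver's API:

    // Assumes "encoding/binary" and "io". The returned slice is the same
    // scratch storage, possibly grown; the caller keeps it for the next call,
    // which is exactly the reuse the decoder above relies on.
    func readFrame(r io.Reader, scratch []byte) ([]byte, error) {
        var lenBuf [4]byte // assumption: encodeBinaryLen == 4
        if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
            return scratch, err
        }
        msgLen := int(binary.BigEndian.Uint32(lenBuf[:]))

        if cap(scratch) < msgLen+4 {
            scratch = make([]byte, msgLen+4) // mirrors the msgLen+encodeBinaryLen sizing above
        }
        scratch = scratch[:msgLen+4]
        if _, err := io.ReadFull(r, scratch); err != nil {
            return scratch, err
        }
        // proto.Unmarshal(scratch[:msgLen], entry) would decode the LogEntry here
        return scratch, nil
    }
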
+ 30 - 13
daemon/logger/loggerutils/logfile.go

@@ -89,12 +89,25 @@ type LogFile struct {
 	filesRefCounter refCounter // keep reference-counted of decompressed files
 	notifyRotate    *pubsub.Publisher
 	marshal         logger.MarshalFunc
-	createDecoder   makeDecoderFunc
+	createDecoder   MakeDecoderFn
 	getTailReader   GetTailReaderFunc
 	perms           os.FileMode
 }
 
-type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
+// MakeDecoderFn creates a decoder
+type MakeDecoderFn func(rdr io.Reader) Decoder
+
+// Decoder is for reading logs
+// It is created by the log reader by calling the `MakeDecoderFn`
+type Decoder interface {
+	// Reset resets the decoder
+	// Reset is called for certain events, such as log rotations
+	Reset(io.Reader)
+	// Decode decodes the next log message from the stream
+	Decode() (*logger.Message, error)
+	// Close signals to the decoder that it can release whatever resources it was using.
+	Close()
+}
 
 // SizeReaderAt defines a ReaderAt that also reports its size.
 // This is used for tailing log files.
@@ -110,7 +123,7 @@ type SizeReaderAt interface {
 type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
 
 // NewLogFile creates new LogFile
-func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
+func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
 	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
 	if err != nil {
 		return nil, err
@@ -314,6 +327,9 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 	}
 	defer currentFile.Close()
 
+	dec := w.createDecoder(nil)
+	defer dec.Close()
+
 	currentChunk, err := newSectionReader(currentFile)
 	if err != nil {
 		w.mu.RUnlock()
@@ -359,7 +375,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 			readers = append(readers, currentChunk)
 		}
 
-		tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
+		tailFiles(readers, watcher, dec, w.getTailReader, config)
 		closeFiles()
 
 		w.mu.RLock()
@@ -373,7 +389,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 
 	notifyRotate := w.notifyRotate.Subscribe()
 	defer w.notifyRotate.Evict(notifyRotate)
-	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
+	followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
 }
 
 func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
@@ -479,7 +495,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
 	return io.NewSectionReader(f, 0, size), nil
 }
 
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
 	nLines := config.Tail
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -512,9 +528,10 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
 	}
 
 	rdr := io.MultiReader(readers...)
-	decodeLogLine := createDecoder(rdr)
+	dec.Reset(rdr)
+
 	for {
-		msg, err := decodeLogLine()
+		msg, err := dec.Decode()
 		if err != nil {
 			if errors.Cause(err) != io.EOF {
 				watcher.Err <- err
@@ -535,8 +552,8 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
 	}
 }
 
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
-	decodeLogLine := createDecoder(f)
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
+	dec.Reset(f)
 
 	name := f.Name()
 	fileWatcher, err := watchFile(name)
@@ -567,7 +584,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 		if err := fileWatcher.Add(name); err != nil {
 			return err
 		}
-		decodeLogLine = createDecoder(f)
+		dec.Reset(f)
 		return nil
 	}
 
@@ -578,7 +595,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 		case e := <-fileWatcher.Events():
 			switch e.Op {
 			case fsnotify.Write:
-				decodeLogLine = createDecoder(f)
+				dec.Reset(f)
 				return nil
 			case fsnotify.Rename, fsnotify.Remove:
 				select {
@@ -648,7 +665,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 
 	// main loop
 	for {
-		msg, err := decodeLogLine()
+		msg, err := dec.Decode()
 		if err != nil {
 			if err := handleDecodeErr(err); err != nil {
 				if err == errDone {

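The loggerutils changes above are the heart of the refactor: ReadLogs creates exactly one Decoder per read session, tailFiles and followLogs only Reset it when the reader changes (the multi-file tail reader, an fsnotify write, a rotation), and the deferred Close runs once the session ends, so concurrent readers never share decoder state. A driver opts in by returning a Decoder from its factory and handing that factory to NewLogFile; a wiring sketch where newDriverLogFile and the literal values are placeholders, and marshalMessage, decodeFunc and getTailReader stand for the driver's own implementations:

    // Sketch of the NewLogFile call with the new MakeDecoderFn parameter.
    func newDriverLogFile(logPath string) (*loggerutils.LogFile, error) {
        return loggerutils.NewLogFile(
            logPath,        // container log file path
            10*1024*1024,   // capacity before rotation (placeholder)
            5,              // maxFiles (placeholder)
            true,           // compress rotated files (placeholder)
            marshalMessage, // logger.MarshalFunc used on the write path
            decodeFunc,     // loggerutils.MakeDecoderFn: now returns a Decoder
            0640,           // perms (placeholder)
            getTailReader,  // loggerutils.GetTailReaderFunc
        )
    }
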
+ 65 - 31
daemon/logger/loggerutils/logfile_test.go

@@ -15,6 +15,32 @@ import (
 	"gotest.tools/v3/assert"
 )
 
+type testDecoder struct {
+	rdr     io.Reader
+	scanner *bufio.Scanner
+}
+
+func (d *testDecoder) Decode() (*logger.Message, error) {
+	if d.scanner == nil {
+		d.scanner = bufio.NewScanner(d.rdr)
+	}
+	if !d.scanner.Scan() {
+		return nil, d.scanner.Err()
+	}
+	// some comment
+	return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
+}
+
+func (d *testDecoder) Reset(rdr io.Reader) {
+	d.rdr = rdr
+	d.scanner = bufio.NewScanner(rdr)
+}
+
+func (d *testDecoder) Close() {
+	d.rdr = nil
+	d.scanner = nil
+}
+
 func TestTailFiles(t *testing.T) {
 	s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
 	s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
@@ -22,27 +48,18 @@ func TestTailFiles(t *testing.T) {
 
 	files := []SizeReaderAt{s1, s2, s3}
 	watcher := logger.NewLogWatcher()
-	createDecoder := func(r io.Reader) func() (*logger.Message, error) {
-		scanner := bufio.NewScanner(r)
-		return func() (*logger.Message, error) {
-			if !scanner.Scan() {
-				return nil, scanner.Err()
-			}
-			// some comment
-			return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
-		}
-	}
 
 	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
 		return tailfile.NewTailReader(ctx, r, lines)
 	}
+	dec := &testDecoder{}
 
 	for desc, config := range map[string]logger.ReadConfig{} {
 		t.Run(desc, func(t *testing.T) {
 			started := make(chan struct{})
 			go func() {
 				close(started)
-				tailFiles(files, watcher, createDecoder, tailReader, config)
+				tailFiles(files, watcher, dec, tailReader, config)
 			}()
 			<-started
 		})
@@ -52,7 +69,7 @@ func TestTailFiles(t *testing.T) {
 	started := make(chan struct{})
 	go func() {
 		close(started)
-		tailFiles(files, watcher, createDecoder, tailReader, config)
+		tailFiles(files, watcher, dec, tailReader, config)
 	}()
 	<-started
 
@@ -77,6 +94,15 @@ func TestTailFiles(t *testing.T) {
 	}
 }
 
+type dummyDecoder struct{}
+
+func (dummyDecoder) Decode() (*logger.Message, error) {
+	return &logger.Message{}, nil
+}
+
+func (dummyDecoder) Close()          {}
+func (dummyDecoder) Reset(io.Reader) {}
+
 func TestFollowLogsConsumerGone(t *testing.T) {
 	lw := logger.NewLogWatcher()
 
@@ -87,16 +113,12 @@ func TestFollowLogsConsumerGone(t *testing.T) {
 		os.Remove(f.Name())
 	}()
 
-	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
-		return func() (*logger.Message, error) {
-			return &logger.Message{}, nil
-		}
-	}
+	dec := dummyDecoder{}
 
 	followLogsDone := make(chan struct{})
 	var since, until time.Time
 	go func() {
-		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
+		followLogs(f, lw, make(chan interface{}), dec, since, until)
 		close(followLogsDone)
 	}()
 
@@ -118,6 +140,18 @@ func TestFollowLogsConsumerGone(t *testing.T) {
 	}
 }
 
+type dummyWrapper struct {
+	dummyDecoder
+	fn func() error
+}
+
+func (d *dummyWrapper) Decode() (*logger.Message, error) {
+	if err := d.fn(); err != nil {
+		return nil, err
+	}
+	return d.dummyDecoder.Decode()
+}
+
 func TestFollowLogsProducerGone(t *testing.T) {
 	lw := logger.NewLogWatcher()
 
@@ -126,25 +160,25 @@ func TestFollowLogsProducerGone(t *testing.T) {
 	defer os.Remove(f.Name())
 
 	var sent, received, closed int
-	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
-		return func() (*logger.Message, error) {
-			if closed == 1 {
-				closed++
-				t.Logf("logDecode() closed after sending %d messages\n", sent)
-				return nil, io.EOF
-			} else if closed > 1 {
-				t.Fatal("logDecode() called after closing!")
-				return nil, io.EOF
-			}
+	dec := &dummyWrapper{fn: func() error {
+		switch closed {
+		case 0:
 			sent++
-			return &logger.Message{}, nil
+			return nil
+		case 1:
+			closed++
+			t.Logf("logDecode() closed after sending %d messages\n", sent)
+			return io.EOF
+		default:
+			t.Fatal("logDecode() called after closing!")
+			return io.EOF
 		}
-	}
+	}}
 	var since, until time.Time
 
 	followLogsDone := make(chan struct{})
 	go func() {
-		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
+		followLogs(f, lw, make(chan interface{}), dec, since, until)
 		close(followLogsDone)
 	}()