Merge pull request #42174 from thaJeztah/20.10_backport_41820_fix_json_unexpected_eof

[20.10 backport] Fix handling for json-file io.UnexpectedEOF
This commit is contained in:
Sebastiaan van Stijn 2021-03-20 10:10:24 +01:00 committed by GitHub
commit 5a697ae130
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 5 deletions

View file

@ -64,6 +64,7 @@ type decoder struct {
rdr io.Reader rdr io.Reader
dec *json.Decoder dec *json.Decoder
jl *jsonlog.JSONLog jl *jsonlog.JSONLog
maxRetry int
} }
func (d *decoder) Reset(rdr io.Reader) { func (d *decoder) Reset(rdr io.Reader) {
@ -87,7 +88,11 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
if d.jl == nil { if d.jl == nil {
d.jl = &jsonlog.JSONLog{} d.jl = &jsonlog.JSONLog{}
} }
for retries := 0; retries < maxJSONDecodeRetry; retries++ { if d.maxRetry == 0 {
// We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing.
d.maxRetry = maxJSONDecodeRetry
}
for retries := 0; retries < d.maxRetry; retries++ {
msg, err = decodeLogLine(d.dec, d.jl) msg, err = decodeLogLine(d.dec, d.jl)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
break break
@ -105,7 +110,7 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
// If the json logger writes a partial json log entry to the disk // If the json logger writes a partial json log entry to the disk
// while at the same time the decoder tries to decode it, the race condition happens. // while at the same time the decoder tries to decode it, the race condition happens.
if err == io.ErrUnexpectedEOF { if err == io.ErrUnexpectedEOF {
d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr) d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
d.dec = json.NewDecoder(d.rdr) d.dec = json.NewDecoder(d.rdr)
continue continue
} }
@ -113,6 +118,46 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
return msg, err return msg, err
} }
func combineReaders(pre, rdr io.Reader) io.Reader {
return &combinedReader{pre: pre, rdr: rdr}
}
// combinedReader is a reader which is like `io.MultiReader` where except it does not cache a full EOF.
// Once `io.MultiReader` returns EOF, it is always EOF.
//
// For this usecase we have an underlying reader which is a file which may reach EOF but have more data written to it later.
// As such, io.MultiReader does not work for us.
type combinedReader struct {
pre io.Reader
rdr io.Reader
}
func (r *combinedReader) Read(p []byte) (int, error) {
var read int
if r.pre != nil {
n, err := r.pre.Read(p)
if err != nil {
if err != io.EOF {
return n, err
}
r.pre = nil
}
read = n
}
if read < len(p) {
n, err := r.rdr.Read(p[read:])
if n > 0 {
read += n
}
if err != nil {
return read, err
}
}
return read, nil
}
// decodeFunc is used to create a decoder for the log file reader // decodeFunc is used to create a decoder for the log file reader
func decodeFunc(rdr io.Reader) loggerutils.Decoder { func decodeFunc(rdr io.Reader) loggerutils.Decoder {
return &decoder{ return &decoder{

View file

@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import ( import (
"bytes" "bytes"
"encoding/json"
"io" "io"
"testing" "testing"
"time" "time"
@ -93,3 +94,61 @@ func TestEncodeDecode(t *testing.T) {
_, err = dec.Decode() _, err = dec.Decode()
assert.Assert(t, err == io.EOF) assert.Assert(t, err == io.EOF)
} }
func TestUnexpectedEOF(t *testing.T) {
buf := bytes.NewBuffer(nil)
msg1 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello1")}
msg2 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello2")}
err := marshalMessage(msg1, json.RawMessage{}, buf)
assert.NilError(t, err)
err = marshalMessage(msg2, json.RawMessage{}, buf)
assert.NilError(t, err)
r := &readerWithErr{
err: io.EOF,
after: buf.Len() / 4,
r: buf,
}
dec := &decoder{rdr: r, maxRetry: 1}
_, err = dec.Decode()
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
// again just to check
_, err = dec.Decode()
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
// reset the error
// from here all reads should succeed until we get EOF on the underlying reader
r.err = nil
msg, err := dec.Decode()
assert.NilError(t, err)
assert.Equal(t, string(msg1.Line)+"\n", string(msg.Line))
msg, err = dec.Decode()
assert.NilError(t, err)
assert.Equal(t, string(msg2.Line)+"\n", string(msg.Line))
_, err = dec.Decode()
assert.Error(t, err, io.EOF.Error())
}
type readerWithErr struct {
err error
after int
r io.Reader
read int
}
func (r *readerWithErr) Read(p []byte) (int, error) {
if r.err != nil && r.read > r.after {
return 0, r.err
}
n, err := r.r.Read(p[:1])
if n > 0 {
r.read += n
}
return n, err
}