@@ -61,9 +61,10 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
 }
 
 type decoder struct {
-	rdr io.Reader
-	dec *json.Decoder
-	jl  *jsonlog.JSONLog
+	rdr      io.Reader
+	dec      *json.Decoder
+	jl       *jsonlog.JSONLog
+	maxRetry int
 }
 
 func (d *decoder) Reset(rdr io.Reader) {
@@ -87,7 +88,11 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
 	if d.jl == nil {
 		d.jl = &jsonlog.JSONLog{}
 	}
-	for retries := 0; retries < maxJSONDecodeRetry; retries++ {
+	if d.maxRetry == 0 {
+		// We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing.
+		d.maxRetry = maxJSONDecodeRetry
+	}
+	for retries := 0; retries < d.maxRetry; retries++ {
 		msg, err = decodeLogLine(d.dec, d.jl)
 		if err == nil || err == io.EOF {
 			break
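Note on the new maxRetry field: the comment in the hunk above says maxJSONDecodeRetry is not read directly so that a custom value can be set for testing. A rough sketch of the kind of in-package test this enables follows; the test name, the bytes.Buffer standing in for the log file, and the use of Reset are illustrative assumptions, not taken from the patch.

package jsonfilelog // assumption: same package as the decoder, so unexported fields are reachable

import (
	"bytes"
	"testing"
)

func TestDecodeAfterPartialWrite(t *testing.T) {
	buf := &bytes.Buffer{}                              // stands in for the log file being tailed
	buf.WriteString(`{"log":"hello","stream":"stdout"`) // entry cut off mid-write

	d := &decoder{maxRetry: 1} // small limit, so the first Decode gives up quickly instead of retrying maxJSONDecodeRetry times
	d.Reset(buf)               // assumption: Reset installs the reader, as its signature above suggests

	// While the entry is incomplete, Decode should run out of retries and return an error.
	if _, err := d.Decode(); err == nil {
		t.Fatal("expected an error while the entry is still incomplete")
	}

	// Once the writer finishes the entry, the next Decode should succeed.
	buf.WriteString("}\n")
	msg, err := d.Decode()
	if err != nil {
		t.Fatalf("expected the completed entry to decode, got: %v", err)
	}
	if msg == nil {
		t.Fatal("expected a decoded message")
	}
}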
@@ -105,7 +110,7 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
 		// If the json logger writes a partial json log entry to the disk
 		// while at the same time the decoder tries to decode it, the race condition happens.
 		if err == io.ErrUnexpectedEOF {
-			d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr)
+			d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
 			d.dec = json.NewDecoder(d.rdr)
 			continue
 		}
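For context on the one-line change above: io.MultiReader permanently drops a sub-reader once it returns EOF, so data appended to a still-growing log file after the first EOF is never seen again. A minimal standalone sketch of that behavior, separate from the patch, with a bytes.Buffer standing in for the log file.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	pre := strings.NewReader(`{"log":"hel`) // what json.Decoder had already buffered
	file := &bytes.Buffer{}                 // stands in for the log file still being written

	mr := io.MultiReader(pre, file)
	buf := make([]byte, 64)

	n, err := mr.Read(buf)
	fmt.Println(n, err) // 11 <nil>: the buffered prefix is replayed

	n, err = mr.Read(buf)
	fmt.Println(n, err) // 0 EOF: the empty file reader is discarded here

	file.WriteString(`lo"}`) // the writer finishes the line later...
	n, err = mr.Read(buf)
	fmt.Println(n, err) // 0 EOF: ...but MultiReader never delivers it
}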
@@ -113,6 +118,46 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
 	return msg, err
 }
 
+func combineReaders(pre, rdr io.Reader) io.Reader {
+	return &combinedReader{pre: pre, rdr: rdr}
+}
+
+// combinedReader is a reader which is like `io.MultiReader`, except it does not cache a full EOF.
+// Once `io.MultiReader` returns EOF, it is always EOF.
+//
+// For this use case the underlying reader is a file which may reach EOF but have more data written to it later.
+// As such, io.MultiReader does not work for us.
+type combinedReader struct {
+	pre io.Reader
+	rdr io.Reader
+}
+
+func (r *combinedReader) Read(p []byte) (int, error) {
+	var read int
+	if r.pre != nil {
+		n, err := r.pre.Read(p)
+		if err != nil {
+			if err != io.EOF {
+				return n, err
+			}
+			r.pre = nil
+		}
+		read = n
+	}
+
+	if read < len(p) {
+		n, err := r.rdr.Read(p[read:])
+		if n > 0 {
+			read += n
+		}
+		if err != nil {
+			return read, err
+		}
+	}
+
+	return read, nil
+}
+
 // decodeFunc is used to create a decoder for the log file reader
 func decodeFunc(rdr io.Reader) loggerutils.Decoder {
 	return &decoder{
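A companion sketch of the behavior the new combinedReader is after: it keeps following the underlying reader even after that reader has reported EOF, so bytes appended later still come through. Written as an in-package test; the test name and assertions are illustrative, not part of the patch.

package jsonfilelog // assumption: same package as combineReaders

import (
	"bytes"
	"io"
	"strings"
	"testing"
)

func TestCombinedReaderSeesLateWrites(t *testing.T) {
	pre := strings.NewReader(`{"log":"hel`) // what json.Decoder had already buffered
	file := &bytes.Buffer{}                 // stands in for the log file still being written

	r := combineReaders(pre, file)
	buf := make([]byte, 64)

	// The first read drains the buffered prefix; the still-empty file reports EOF alongside it.
	n, err := r.Read(buf)
	if string(buf[:n]) != `{"log":"hel` || err != io.EOF {
		t.Fatalf("got %q, %v; want the buffered prefix and io.EOF", buf[:n], err)
	}

	// The writer finishes the line later. Unlike io.MultiReader, the late bytes are still delivered.
	file.WriteString(`lo"}`)
	n, err = r.Read(buf)
	if string(buf[:n]) != `lo"}` || err != nil {
		t.Fatalf("got %q, %v; want the late bytes and no error", buf[:n], err)
	}
}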