|
@@ -110,13 +110,54 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
|
|
|
// If the json logger writes a partial json log entry to the disk
|
|
|
// while at the same time the decoder tries to decode it, the race condition happens.
|
|
|
if err == io.ErrUnexpectedEOF {
|
|
|
- d.dec = json.NewDecoder(io.MultiReader(d.dec.Buffered(), d.rdr))
|
|
|
+ d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
|
|
|
+ d.dec = json.NewDecoder(d.rdr)
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
return msg, err
|
|
|
}
|
|
|
|
|
|
+func combineReaders(pre, rdr io.Reader) io.Reader {
|
|
|
+ return &combinedReader{pre: pre, rdr: rdr}
|
|
|
+}
|
|
|
+
|
|
|
+// combinedReader is a reader which is like `io.MultiReader` where except it does not cache a full EOF.
|
|
|
+// Once `io.MultiReader` returns EOF, it is always EOF.
|
|
|
+//
|
|
|
+// For this usecase we have an underlying reader which is a file which may reach EOF but have more data written to it later.
|
|
|
+// As such, io.MultiReader does not work for us.
|
|
|
+type combinedReader struct {
|
|
|
+ pre io.Reader
|
|
|
+ rdr io.Reader
|
|
|
+}
|
|
|
+
|
|
|
+func (r *combinedReader) Read(p []byte) (int, error) {
|
|
|
+ var read int
|
|
|
+ if r.pre != nil {
|
|
|
+ n, err := r.pre.Read(p)
|
|
|
+ if err != nil {
|
|
|
+ if err != io.EOF {
|
|
|
+ return n, err
|
|
|
+ }
|
|
|
+ r.pre = nil
|
|
|
+ }
|
|
|
+ read = n
|
|
|
+ }
|
|
|
+
|
|
|
+ if read < len(p) {
|
|
|
+ n, err := r.rdr.Read(p[read:])
|
|
|
+ if n > 0 {
|
|
|
+ read += n
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return read, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return read, nil
|
|
|
+}
|
|
|
+
|
|
|
// decodeFunc is used to create a decoder for the log file reader
|
|
|
func decodeFunc(rdr io.Reader) loggerutils.Decoder {
|
|
|
return &decoder{
|