12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
- import (
- "context"
- "encoding/json"
- "io"
- "github.com/docker/docker/api/types/backend"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
- "github.com/docker/docker/daemon/logger/loggerutils"
- "github.com/docker/docker/pkg/tailfile"
- "github.com/sirupsen/logrus"
- )
- const maxJSONDecodeRetry = 20000
- // ReadLogs implements the logger's LogReader interface for the logs
- // created by this driver.
- func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- logWatcher := logger.NewLogWatcher()
- go l.readLogs(logWatcher, config)
- return logWatcher
- }
- func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
- defer close(watcher.Msg)
- l.mu.Lock()
- l.readers[watcher] = struct{}{}
- l.mu.Unlock()
- l.writer.ReadLogs(config, watcher)
- l.mu.Lock()
- delete(l.readers, watcher)
- l.mu.Unlock()
- }
- func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
- l.Reset()
- if err := dec.Decode(l); err != nil {
- return nil, err
- }
- var attrs []backend.LogAttr
- if len(l.Attrs) != 0 {
- attrs = make([]backend.LogAttr, 0, len(l.Attrs))
- for k, v := range l.Attrs {
- attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
- }
- }
- msg := &logger.Message{
- Source: l.Stream,
- Timestamp: l.Created,
- Line: []byte(l.Log),
- Attrs: attrs,
- }
- return msg, nil
- }
- // decodeFunc is used to create a decoder for the log file reader
- func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
- l := &jsonlog.JSONLog{}
- dec := json.NewDecoder(rdr)
- return func() (msg *logger.Message, err error) {
- for retries := 0; retries < maxJSONDecodeRetry; retries++ {
- msg, err = decodeLogLine(dec, l)
- if err == nil || err == io.EOF {
- break
- }
- logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
- // try again, could be due to a an incomplete json object as we read
- if _, ok := err.(*json.SyntaxError); ok {
- dec = json.NewDecoder(rdr)
- continue
- }
- // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- // remaining data in the parser's buffer while an io.EOF occurs.
- // If the json logger writes a partial json log entry to the disk
- // while at the same time the decoder tries to decode it, the race condition happens.
- if err == io.ErrUnexpectedEOF {
- reader := io.MultiReader(dec.Buffered(), rdr)
- dec = json.NewDecoder(reader)
- continue
- }
- }
- return msg, err
- }
- }
- func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, req)
- }
|