@@ -2,32 +2,42 @@ package jsonfilelog
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"strconv"
 	"sync"
+	"time"
+
+	"gopkg.in/fsnotify.v1"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/pubsub"
+	"github.com/docker/docker/pkg/tailfile"
 	"github.com/docker/docker/pkg/timeutils"
 	"github.com/docker/docker/pkg/units"
 )
 
 const (
-	Name = "json-file"
+	Name               = "json-file"
+	maxJSONDecodeRetry = 10
 )
 
 // JSONFileLogger is Logger implementation for default docker logging:
 // JSON objects to file
 type JSONFileLogger struct {
-	buf      *bytes.Buffer
-	f        *os.File   // store for closing
-	mu       sync.Mutex // protects buffer
-	capacity int64      //maximum size of each file
-	n        int        //maximum number of files
-	ctx      logger.Context
+	buf          *bytes.Buffer
+	f            *os.File   // store for closing
+	mu           sync.Mutex // protects buffer
+	capacity     int64      //maximum size of each file
+	n            int        //maximum number of files
+	ctx          logger.Context
+	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
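+	// notifyRotate is published to on every rotation so active followers can reopen the file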
+	notifyRotate *pubsub.Publisher
 }
 
 func init() {
@@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		}
 	}
 	return &JSONFileLogger{
-		f:        log,
-		buf:      bytes.NewBuffer(nil),
-		ctx:      ctx,
-		capacity: capval,
-		n:        maxFiles,
+		f:            log,
+		buf:          bytes.NewBuffer(nil),
+		ctx:          ctx,
+		capacity:     capval,
+		n:            maxFiles,
+		readers:      make(map[*logger.LogWatcher]struct{}),
+		notifyRotate: pubsub.NewPublisher(0, 1),
 	}, nil
 }
 
@@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
 			return -1, err
 		}
 		l.f = file
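+		// notify any followers that the log file was just rotated so they can reopen it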
+		l.notifyRotate.Publish(struct{}{})
 	}
 	return writeToBuf(l)
 }
@@ -148,11 +161,11 @@ func backup(old, curr string) error {
 		}
 	}
 	if _, err := os.Stat(curr); os.IsNotExist(err) {
-		if f, err := os.Create(curr); err != nil {
+		f, err := os.Create(curr)
+		if err != nil {
 			return err
-		} else {
-			f.Close()
 		}
+		f.Close()
 	}
 	return os.Rename(curr, old)
 }
@@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
 	return nil
 }
 
-func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
-	pth := l.ctx.LogPath
-	if len(args) > 0 {
-		//check if args[0] is an integer index
-		index, err := strconv.ParseInt(args[0], 0, 0)
-		if err != nil {
-			return nil, err
-		}
-		if index > 0 {
-			pth = pth + "." + args[0]
-		}
-	}
-	return os.Open(pth)
-}
-
 func (l *JSONFileLogger) LogPath() string {
 	return l.ctx.LogPath
 }
 
-// Close closes underlying file
+// Close closes underlying file and signals all readers to stop
 func (l *JSONFileLogger) Close() error {
-	return l.f.Close()
+	l.mu.Lock()
+	err := l.f.Close()
+	for r := range l.readers {
+		r.Close()
+		delete(l.readers, r)
+	}
+	l.mu.Unlock()
+	return err
 }
 
 // Name returns name of this logger
 func (l *JSONFileLogger) Name() string {
 	return Name
 }
+
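+// decodeLogLine decodes a single JSON log entry from dec into a logger.Message.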
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
+	l.Reset()
+	if err := dec.Decode(l); err != nil {
+		return nil, err
+	}
+	msg := &logger.Message{
+		Source:    l.Stream,
+		Timestamp: l.Created,
+		Line:      []byte(l.Log),
+	}
+	return msg, nil
+}
+
+// ReadLogs returns a LogWatcher used to stream log messages from the log file according to config.
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+	logWatcher := logger.NewLogWatcher()
+
+	go l.readLogs(logWatcher, config)
+	return logWatcher
+}
+
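+// readLogs streams entries from the rotated files and the current log file,
+// honoring the Tail, Since and Follow options in config.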
+func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
+	defer close(logWatcher.Msg)
+
+	pth := l.ctx.LogPath
+	var files []io.ReadSeeker
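+	// read the rotated files highest-index (oldest) first so the combined reader yields entries in chronological order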
+	for i := l.n; i > 1; i-- {
+		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
+		if err != nil {
+			if !os.IsNotExist(err) {
+				logWatcher.Err <- err
+				break
+			}
+			continue
+		}
+		defer f.Close()
+		files = append(files, f)
+	}
+
+	latestFile, err := os.Open(pth)
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer latestFile.Close()
+
+	files = append(files, latestFile)
+	tailer := ioutils.MultiReadSeeker(files...)
+
+	if config.Tail != 0 {
+		tailFile(tailer, logWatcher, config.Tail, config.Since)
+	}
+
+	if !config.Follow {
+		return
+	}
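+	// nothing was tailed, so skip what is already in the file and follow only new output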
+	if config.Tail == 0 {
+		latestFile.Seek(0, os.SEEK_END)
+	}
+
+	l.mu.Lock()
+	l.readers[logWatcher] = struct{}{}
+	l.mu.Unlock()
+
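+	// subscribe to rotation events so the follower can reopen the file after writeLog rotates it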
+	notifyRotate := l.notifyRotate.Subscribe()
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+
+	l.mu.Lock()
+	delete(l.readers, logWatcher)
+	l.mu.Unlock()
+
+	l.notifyRotate.Evict(notifyRotate)
+}
+
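+// tailFile decodes the last tail lines of f (all of it when tail is not
+// positive) and sends entries logged at or after since to the log watcher.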
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
+	var rdr io.Reader = f
+	if tail > 0 {
+		ls, err := tailfile.TailFile(f, tail)
+		if err != nil {
+			logWatcher.Err <- err
+			return
+		}
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+	}
+	dec := json.NewDecoder(rdr)
+	l := &jsonlog.JSONLog{}
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				logWatcher.Err <- err
+			}
+			return
+		}
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		logWatcher.Msg <- msg
+	}
+}
+
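+// followLogs streams new entries from f to the log watcher until the watcher
+// is closed, reopening f whenever a rotation is signalled on notifyRotate.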
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	dec := json.NewDecoder(f)
+	l := &jsonlog.JSONLog{}
+	fileWatcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer fileWatcher.Close()
+	if err := fileWatcher.Add(f.Name()); err != nil {
+		logWatcher.Err <- err
+		return
+	}
+
+	var retries int
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				// a partially written entry can show up as a syntax error; retry with a fresh decoder a bounded number of times
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+					dec = json.NewDecoder(f)
+					retries += 1
+					continue
+				}
+				logWatcher.Err <- err
+				return
+			}
+
+			select {
+			case <-fileWatcher.Events:
+				dec = json.NewDecoder(f)
+				continue
+			case err := <-fileWatcher.Errors:
+				logWatcher.Err <- err
+				return
+			case <-logWatcher.WatchClose():
+				return
+			case <-notifyRotate:
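+				// the writer rotated the file out from under us: reopen the new file and reset the decoder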
+				fileWatcher.Remove(f.Name())
+
+				f, err = os.Open(f.Name())
+				if err != nil {
+					logWatcher.Err <- err
+					return
+				}
+				if err := fileWatcher.Add(f.Name()); err != nil {
+					logWatcher.Err <- err
+				}
+				dec = json.NewDecoder(f)
+				continue
+			}
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		select {
+		case logWatcher.Msg <- msg:
+		case <-logWatcher.WatchClose():
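+			// the reader is shutting down: deliver this message, then drain whatever the decoder can still read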
+			logWatcher.Msg <- msg
+			for {
+				msg, err := decodeLogLine(dec, l)
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}