123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- package jsonfilelog
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "os"
- "strconv"
- "sync"
- "time"
- "gopkg.in/fsnotify.v1"
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/pkg/ioutils"
- "github.com/docker/docker/pkg/jsonlog"
- "github.com/docker/docker/pkg/pubsub"
- "github.com/docker/docker/pkg/tailfile"
- "github.com/docker/docker/pkg/timeutils"
- "github.com/docker/docker/pkg/units"
- )
const (
	// Name is the identifier under which this log driver and its option
	// validator are registered.
	Name = "json-file"

	// maxJSONDecodeRetry bounds how many times in a row a followed log is
	// re-decoded after a JSON syntax error before the error is reported.
	maxJSONDecodeRetry = 10
)
- // JSONFileLogger is Logger implementation for default docker logging:
- // JSON objects to file
- type JSONFileLogger struct {
- buf *bytes.Buffer
- f *os.File // store for closing
- mu sync.Mutex // protects buffer
- capacity int64 //maximum size of each file
- n int //maximum number of files
- ctx logger.Context
- readers map[*logger.LogWatcher]struct{} // stores the active log followers
- notifyRotate *pubsub.Publisher
- }
- func init() {
- if err := logger.RegisterLogDriver(Name, New); err != nil {
- logrus.Fatal(err)
- }
- if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
- logrus.Fatal(err)
- }
- }
- // New creates new JSONFileLogger which writes to filename
- func New(ctx logger.Context) (logger.Logger, error) {
- log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
- if err != nil {
- return nil, err
- }
- var capval int64 = -1
- if capacity, ok := ctx.Config["max-size"]; ok {
- var err error
- capval, err = units.FromHumanSize(capacity)
- if err != nil {
- return nil, err
- }
- }
- var maxFiles int = 1
- if maxFileString, ok := ctx.Config["max-file"]; ok {
- maxFiles, err = strconv.Atoi(maxFileString)
- if err != nil {
- return nil, err
- }
- if maxFiles < 1 {
- return nil, fmt.Errorf("max-files cannot be less than 1.")
- }
- }
- return &JSONFileLogger{
- f: log,
- buf: bytes.NewBuffer(nil),
- ctx: ctx,
- capacity: capval,
- n: maxFiles,
- readers: make(map[*logger.LogWatcher]struct{}),
- notifyRotate: pubsub.NewPublisher(0, 1),
- }, nil
- }
- // Log converts logger.Message to jsonlog.JSONLog and serializes it to file
- func (l *JSONFileLogger) Log(msg *logger.Message) error {
- l.mu.Lock()
- defer l.mu.Unlock()
- timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp)
- if err != nil {
- return err
- }
- err = (&jsonlog.JSONLogBytes{Log: append(msg.Line, '\n'), Stream: msg.Source, Created: timestamp}).MarshalJSONBuf(l.buf)
- if err != nil {
- return err
- }
- l.buf.WriteByte('\n')
- _, err = writeLog(l)
- return err
- }
- func writeLog(l *JSONFileLogger) (int64, error) {
- if l.capacity == -1 {
- return writeToBuf(l)
- }
- meta, err := l.f.Stat()
- if err != nil {
- return -1, err
- }
- if meta.Size() >= l.capacity {
- name := l.f.Name()
- if err := l.f.Close(); err != nil {
- return -1, err
- }
- if err := rotate(name, l.n); err != nil {
- return -1, err
- }
- file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
- if err != nil {
- return -1, err
- }
- l.f = file
- l.notifyRotate.Publish(struct{}{})
- }
- return writeToBuf(l)
- }
- func writeToBuf(l *JSONFileLogger) (int64, error) {
- i, err := l.buf.WriteTo(l.f)
- if err != nil {
- l.buf = bytes.NewBuffer(nil)
- }
- return i, err
- }
- func rotate(name string, n int) error {
- if n < 2 {
- return nil
- }
- for i := n - 1; i > 1; i-- {
- oldFile := name + "." + strconv.Itoa(i)
- replacingFile := name + "." + strconv.Itoa(i-1)
- if err := backup(oldFile, replacingFile); err != nil {
- return err
- }
- }
- if err := backup(name+".1", name); err != nil {
- return err
- }
- return nil
- }
// backup moves the file curr to the path old, replacing whatever was at old.
//
// Any existing file at old is removed first; a missing old is not an error.
// If curr does not exist, an empty file is created in its place so that the
// rename still produces a file at old. Errors from the removal, the
// placeholder creation/close, or the rename are returned.
func backup(old, curr string) error {
	// Remove directly instead of stat-then-remove: there is no window in
	// which the file can vanish between the check and the removal.
	if err := os.Remove(old); err != nil && !os.IsNotExist(err) {
		return err
	}

	if _, err := os.Stat(curr); os.IsNotExist(err) {
		f, err := os.Create(curr)
		if err != nil {
			return err
		}
		if err := f.Close(); err != nil {
			return err
		}
	}

	return os.Rename(curr, old)
}
// ValidateLogOpt checks that every key in cfg is an option the json-file log
// driver understands ("max-file" or "max-size"). It returns an error naming
// an unknown key, or nil when all keys are recognised. Option values are not
// inspected.
func ValidateLogOpt(cfg map[string]string) error {
	for opt := range cfg {
		if opt == "max-file" || opt == "max-size" {
			continue
		}
		return fmt.Errorf("unknown log opt '%s' for json-file log driver", opt)
	}
	return nil
}
- func (l *JSONFileLogger) LogPath() string {
- return l.ctx.LogPath
- }
- // Close closes underlying file and signals all readers to stop
- func (l *JSONFileLogger) Close() error {
- l.mu.Lock()
- err := l.f.Close()
- for r := range l.readers {
- r.Close()
- delete(l.readers, r)
- }
- l.mu.Unlock()
- return err
- }
- // Name returns name of this logger
- func (l *JSONFileLogger) Name() string {
- return Name
- }
- func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
- l.Reset()
- if err := dec.Decode(l); err != nil {
- return nil, err
- }
- msg := &logger.Message{
- Source: l.Stream,
- Timestamp: l.Created,
- Line: []byte(l.Log),
- }
- return msg, nil
- }
- // Reads from the log file
- func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- logWatcher := logger.NewLogWatcher()
- go l.readLogs(logWatcher, config)
- return logWatcher
- }
- func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
- defer close(logWatcher.Msg)
- pth := l.ctx.LogPath
- var files []io.ReadSeeker
- for i := l.n; i > 1; i-- {
- f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
- if err != nil {
- if !os.IsNotExist(err) {
- logWatcher.Err <- err
- break
- }
- continue
- }
- defer f.Close()
- files = append(files, f)
- }
- latestFile, err := os.Open(pth)
- if err != nil {
- logWatcher.Err <- err
- return
- }
- defer latestFile.Close()
- files = append(files, latestFile)
- tailer := ioutils.MultiReadSeeker(files...)
- if config.Tail != 0 {
- tailFile(tailer, logWatcher, config.Tail, config.Since)
- }
- if !config.Follow {
- return
- }
- if config.Tail == 0 {
- latestFile.Seek(0, os.SEEK_END)
- }
- l.mu.Lock()
- l.readers[logWatcher] = struct{}{}
- l.mu.Unlock()
- notifyRotate := l.notifyRotate.Subscribe()
- followLogs(latestFile, logWatcher, notifyRotate, config.Since)
- l.mu.Lock()
- delete(l.readers, logWatcher)
- l.mu.Unlock()
- l.notifyRotate.Evict(notifyRotate)
- }
- func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
- var rdr io.Reader = f
- if tail > 0 {
- ls, err := tailfile.TailFile(f, tail)
- if err != nil {
- logWatcher.Err <- err
- return
- }
- rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
- }
- dec := json.NewDecoder(rdr)
- l := &jsonlog.JSONLog{}
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- if err != io.EOF {
- logWatcher.Err <- err
- }
- return
- }
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- logWatcher.Msg <- msg
- }
- }
- func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
- dec := json.NewDecoder(f)
- l := &jsonlog.JSONLog{}
- fileWatcher, err := fsnotify.NewWatcher()
- if err != nil {
- logWatcher.Err <- err
- return
- }
- defer fileWatcher.Close()
- if err := fileWatcher.Add(f.Name()); err != nil {
- logWatcher.Err <- err
- return
- }
- var retries int
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- if err != io.EOF {
- // try again because this shouldn't happen
- if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- dec = json.NewDecoder(f)
- retries += 1
- continue
- }
- logWatcher.Err <- err
- return
- }
- select {
- case <-fileWatcher.Events:
- dec = json.NewDecoder(f)
- continue
- case <-fileWatcher.Errors:
- logWatcher.Err <- err
- return
- case <-logWatcher.WatchClose():
- return
- case <-notifyRotate:
- fileWatcher.Remove(f.Name())
- f, err = os.Open(f.Name())
- if err != nil {
- logWatcher.Err <- err
- return
- }
- if err := fileWatcher.Add(f.Name()); err != nil {
- logWatcher.Err <- err
- }
- dec = json.NewDecoder(f)
- continue
- }
- }
- retries = 0 // reset retries since we've succeeded
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- select {
- case logWatcher.Msg <- msg:
- case <-logWatcher.WatchClose():
- logWatcher.Msg <- msg
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- return
- }
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- logWatcher.Msg <- msg
- }
- }
- }
- }
|