Merge pull request #34162 from cpuguy83/move_logread_logic
Move jsonlog read logic
This commit is contained in:
commit
edaba571ba
10 changed files with 522 additions and 485 deletions
|
@ -7,7 +7,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
|
@ -24,12 +23,9 @@ const Name = "json-file"
|
|||
|
||||
// JSONFileLogger is Logger implementation for default Docker logging.
|
||||
type JSONFileLogger struct {
|
||||
extra []byte // json-encoded extra attributes
|
||||
|
||||
mu sync.RWMutex
|
||||
buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
writer *loggerutils.RotateFileWriter
|
||||
writer *loggerutils.LogFile
|
||||
readers map[*logger.LogWatcher]struct{} // stores the active log followers
|
||||
}
|
||||
|
||||
|
@ -65,11 +61,6 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
}
|
||||
}
|
||||
|
||||
writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var extra []byte
|
||||
attrs, err := info.ExtraAttributes(nil)
|
||||
if err != nil {
|
||||
|
@ -83,33 +74,35 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
}
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
marshalFunc := func(msg *logger.Message) ([]byte, error) {
|
||||
if err := marshalMessage(msg, extra, buf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := buf.Bytes()
|
||||
buf.Reset()
|
||||
return b, nil
|
||||
}
|
||||
|
||||
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &JSONFileLogger{
|
||||
buf: bytes.NewBuffer(nil),
|
||||
writer: writer,
|
||||
readers: make(map[*logger.LogWatcher]struct{}),
|
||||
extra: extra,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
|
||||
func (l *JSONFileLogger) Log(msg *logger.Message) error {
|
||||
l.mu.Lock()
|
||||
err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
|
||||
l.buf.Reset()
|
||||
err := l.writer.WriteLogEntry(msg)
|
||||
l.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
|
||||
if err := marshalMessage(m, extra, buf); err != nil {
|
||||
logger.PutMessage(m)
|
||||
return err
|
||||
}
|
||||
logger.PutMessage(m)
|
||||
_, err := w.Write(buf.Bytes())
|
||||
return errors.Wrap(err, "error writing log entry")
|
||||
}
|
||||
|
||||
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
|
||||
logLine := msg.Line
|
||||
if !msg.Partial {
|
||||
|
|
|
@ -82,7 +82,7 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
|
|||
}
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
|
||||
require.NoError(b, marshalMessage(msg, nil, buf))
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
|
||||
b.ResetTimer()
|
||||
|
|
|
@ -1,33 +1,45 @@
|
|||
package jsonfilelog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
|
||||
"github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
|
||||
"github.com/docker/docker/pkg/filenotify"
|
||||
"github.com/docker/docker/pkg/tailfile"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const maxJSONDecodeRetry = 20000
|
||||
|
||||
// ReadLogs implements the logger's LogReader interface for the logs
|
||||
// created by this driver.
|
||||
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||
logWatcher := logger.NewLogWatcher()
|
||||
|
||||
go l.readLogs(logWatcher, config)
|
||||
return logWatcher
|
||||
}
|
||||
|
||||
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||
defer close(watcher.Msg)
|
||||
|
||||
l.mu.Lock()
|
||||
l.readers[watcher] = struct{}{}
|
||||
l.mu.Unlock()
|
||||
|
||||
l.writer.ReadLogs(config, watcher)
|
||||
|
||||
l.mu.Lock()
|
||||
delete(l.readers, watcher)
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
||||
l.Reset()
|
||||
if err := dec.Decode(l); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var attrs []backend.LogAttr
|
||||
if len(l.Attrs) != 0 {
|
||||
attrs = make([]backend.LogAttr, 0, len(l.Attrs))
|
||||
|
@ -44,318 +56,34 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
|
|||
return msg, nil
|
||||
}
|
||||
|
||||
// ReadLogs implements the logger's LogReader interface for the logs
|
||||
// created by this driver.
|
||||
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||
logWatcher := logger.NewLogWatcher()
|
||||
|
||||
go l.readLogs(logWatcher, config)
|
||||
return logWatcher
|
||||
}
|
||||
|
||||
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||
defer close(logWatcher.Msg)
|
||||
|
||||
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
|
||||
// This will block writes!!!
|
||||
l.mu.RLock()
|
||||
|
||||
// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
|
||||
pth := l.writer.LogPath()
|
||||
var files []io.ReadSeeker
|
||||
for i := l.writer.MaxFiles(); i > 1; i-- {
|
||||
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
logWatcher.Err <- err
|
||||
l.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
defer f.Close()
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
latestFile, err := os.Open(pth)
|
||||
if err != nil {
|
||||
logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
|
||||
l.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
defer latestFile.Close()
|
||||
|
||||
latestChunk, err := newSectionReader(latestFile)
|
||||
|
||||
// Now we have the reader sectioned, all fd's opened, we can unlock.
|
||||
// New writes/rotates will not affect seeking through these files
|
||||
l.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
|
||||
if config.Tail != 0 {
|
||||
tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
|
||||
tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
|
||||
}
|
||||
|
||||
// close all the rotated files
|
||||
for _, f := range files {
|
||||
if err := f.(io.Closer).Close(); err != nil {
|
||||
logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !config.Follow || l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
notifyRotate := l.writer.NotifyRotate()
|
||||
defer l.writer.NotifyRotateEvict(notifyRotate)
|
||||
|
||||
l.mu.Lock()
|
||||
l.readers[logWatcher] = struct{}{}
|
||||
l.mu.Unlock()
|
||||
|
||||
followLogs(latestFile, logWatcher, notifyRotate, config)
|
||||
|
||||
l.mu.Lock()
|
||||
delete(l.readers, logWatcher)
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
||||
// seek to the end to get the size
|
||||
// we'll leave this at the end of the file since section reader does not advance the reader
|
||||
size, err := f.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error getting current file size")
|
||||
}
|
||||
return io.NewSectionReader(f, 0, size), nil
|
||||
}
|
||||
|
||||
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
|
||||
rdr := io.Reader(f)
|
||||
if tail > 0 {
|
||||
ls, err := tailfile.TailFile(f, tail)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
|
||||
}
|
||||
dec := json.NewDecoder(rdr)
|
||||
for {
|
||||
msg, err := decodeLogLine(dec, &jsonlog.JSONLog{})
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
logWatcher.Err <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-logWatcher.WatchClose():
|
||||
return
|
||||
case logWatcher.Msg <- msg:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchFile(name string) (filenotify.FileWatcher, error) {
|
||||
fileWatcher, err := filenotify.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
|
||||
fileWatcher.Close()
|
||||
fileWatcher = filenotify.NewPollingWatcher()
|
||||
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
fileWatcher.Close()
|
||||
logrus.Debugf("error watching log file for modifications: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return fileWatcher, nil
|
||||
}
|
||||
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
|
||||
dec := json.NewDecoder(f)
|
||||
// decodeFunc is used to create a decoder for the log file reader
|
||||
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
|
||||
l := &jsonlog.JSONLog{}
|
||||
|
||||
name := f.Name()
|
||||
fileWatcher, err := watchFile(name)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
f.Close()
|
||||
fileWatcher.Remove(name)
|
||||
fileWatcher.Close()
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
select {
|
||||
case <-logWatcher.WatchClose():
|
||||
fileWatcher.Remove(name)
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
var retries int
|
||||
handleRotate := func() error {
|
||||
f.Close()
|
||||
fileWatcher.Remove(name)
|
||||
|
||||
// retry when the file doesn't exist
|
||||
for retries := 0; retries <= 5; retries++ {
|
||||
f, err = os.Open(name)
|
||||
if err == nil || !os.IsNotExist(err) {
|
||||
dec := json.NewDecoder(rdr)
|
||||
return func() (msg *logger.Message, err error) {
|
||||
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
|
||||
msg, err = decodeLogLine(dec, l)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
return err
|
||||
}
|
||||
dec = json.NewDecoder(f)
|
||||
return nil
|
||||
}
|
||||
|
||||
errRetry := errors.New("retry")
|
||||
errDone := errors.New("done")
|
||||
waitRead := func() error {
|
||||
select {
|
||||
case e := <-fileWatcher.Events():
|
||||
switch e.Op {
|
||||
case fsnotify.Write:
|
||||
dec = json.NewDecoder(f)
|
||||
return nil
|
||||
case fsnotify.Rename, fsnotify.Remove:
|
||||
select {
|
||||
case <-notifyRotate:
|
||||
case <-ctx.Done():
|
||||
return errDone
|
||||
}
|
||||
if err := handleRotate(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return errRetry
|
||||
case err := <-fileWatcher.Errors():
|
||||
logrus.Debug("logger got error watching file: %v", err)
|
||||
// Something happened, let's try and stay alive and create a new watcher
|
||||
if retries <= 5 {
|
||||
fileWatcher.Close()
|
||||
fileWatcher, err = watchFile(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// try again, could be due to a an incomplete json object as we read
|
||||
if _, ok := err.(*json.SyntaxError); ok {
|
||||
dec = json.NewDecoder(rdr)
|
||||
retries++
|
||||
return errRetry
|
||||
continue
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return errDone
|
||||
}
|
||||
}
|
||||
|
||||
handleDecodeErr := func(err error) error {
|
||||
if err == io.EOF {
|
||||
for {
|
||||
err := waitRead()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err == errRetry {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// try again because this shouldn't happen
|
||||
if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
||||
dec = json.NewDecoder(f)
|
||||
retries++
|
||||
return nil
|
||||
}
|
||||
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
||||
// remaining data in the parser's buffer while an io.EOF occurs.
|
||||
// If the json logger writes a partial json log entry to the disk
|
||||
// while at the same time the decoder tries to decode it, the race condition happens.
|
||||
if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
|
||||
reader := io.MultiReader(dec.Buffered(), f)
|
||||
dec = json.NewDecoder(reader)
|
||||
retries++
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// main loop
|
||||
for {
|
||||
msg, err := decodeLogLine(dec, l)
|
||||
if err != nil {
|
||||
if err := handleDecodeErr(err); err != nil {
|
||||
if err == errDone {
|
||||
return
|
||||
}
|
||||
// we got an unrecoverable error, so return
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// ready to try again
|
||||
continue
|
||||
}
|
||||
|
||||
since := config.Since
|
||||
until := config.Until
|
||||
|
||||
retries = 0 // reset retries since we've succeeded
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case logWatcher.Msg <- msg:
|
||||
case <-ctx.Done():
|
||||
logWatcher.Msg <- msg
|
||||
// This for loop is used when the logger is closed (ie, container
|
||||
// stopped) but the consumer is still waiting for logs.
|
||||
for {
|
||||
msg, err := decodeLogLine(dec, l)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
logWatcher.Msg <- msg
|
||||
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
||||
// remaining data in the parser's buffer while an io.EOF occurs.
|
||||
// If the json logger writes a partial json log entry to the disk
|
||||
// while at the same time the decoder tries to decode it, the race condition happens.
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
reader := io.MultiReader(dec.Buffered(), rdr)
|
||||
dec = json.NewDecoder(reader)
|
||||
retries++
|
||||
}
|
||||
}
|
||||
return msg, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
|
|||
}
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
|
||||
require.NoError(b, marshalMessage(msg, nil, buf))
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
|
||||
b.ResetTimer()
|
||||
|
|
|
@ -140,3 +140,6 @@ type Capability struct {
|
|||
// Determines if a log driver can read back logs
|
||||
ReadLogs bool
|
||||
}
|
||||
|
||||
// MarshalFunc is a func that marshals a message into an arbitrary format
|
||||
type MarshalFunc func(*Message) ([]byte, error)
|
||||
|
|
454
daemon/logger/loggerutils/logfile.go
Normal file
454
daemon/logger/loggerutils/logfile.go
Normal file
|
@ -0,0 +1,454 @@
|
|||
package loggerutils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
|
||||
"github.com/docker/docker/pkg/filenotify"
|
||||
"github.com/docker/docker/pkg/pubsub"
|
||||
"github.com/docker/docker/pkg/tailfile"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// LogFile is Logger implementation for default Docker logging.
|
||||
type LogFile struct {
|
||||
f *os.File // store for closing
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
capacity int64 //maximum size of each file
|
||||
currentSize int64 // current size of the latest file
|
||||
maxFiles int //maximum number of files
|
||||
notifyRotate *pubsub.Publisher
|
||||
marshal logger.MarshalFunc
|
||||
createDecoder makeDecoderFunc
|
||||
}
|
||||
|
||||
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
|
||||
|
||||
//NewLogFile creates new LogFile
|
||||
func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
|
||||
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
size, err := log.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LogFile{
|
||||
f: log,
|
||||
capacity: capacity,
|
||||
currentSize: size,
|
||||
maxFiles: maxFiles,
|
||||
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||
marshal: marshaller,
|
||||
createDecoder: decodeFunc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// WriteLogEntry writes the provided log message to the current log file.
|
||||
// This may trigger a rotation event if the max file/capacity limits are hit.
|
||||
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
||||
b, err := w.marshal(msg)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error marshalling log message")
|
||||
}
|
||||
|
||||
logger.PutMessage(msg)
|
||||
|
||||
w.mu.Lock()
|
||||
if w.closed {
|
||||
w.mu.Unlock()
|
||||
return errors.New("cannot write because the output file was closed")
|
||||
}
|
||||
|
||||
if err := w.checkCapacityAndRotate(); err != nil {
|
||||
w.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
n, err := w.f.Write(b)
|
||||
if err == nil {
|
||||
w.currentSize += int64(n)
|
||||
}
|
||||
w.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *LogFile) checkCapacityAndRotate() error {
|
||||
if w.capacity == -1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if w.currentSize >= w.capacity {
|
||||
name := w.f.Name()
|
||||
if err := w.f.Close(); err != nil {
|
||||
return errors.Wrap(err, "error closing file")
|
||||
}
|
||||
if err := rotate(name, w.maxFiles); err != nil {
|
||||
return err
|
||||
}
|
||||
file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.f = file
|
||||
w.currentSize = 0
|
||||
w.notifyRotate.Publish(struct{}{})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func rotate(name string, maxFiles int) error {
|
||||
if maxFiles < 2 {
|
||||
return nil
|
||||
}
|
||||
for i := maxFiles - 1; i > 1; i-- {
|
||||
toPath := name + "." + strconv.Itoa(i)
|
||||
fromPath := name + "." + strconv.Itoa(i-1)
|
||||
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
||||
return errors.Wrap(err, "error rotating old log entries")
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
|
||||
return errors.Wrap(err, "error rotating current log")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LogPath returns the location the given writer logs to.
|
||||
func (w *LogFile) LogPath() string {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.f.Name()
|
||||
}
|
||||
|
||||
// MaxFiles return maximum number of files
|
||||
func (w *LogFile) MaxFiles() int {
|
||||
return w.maxFiles
|
||||
}
|
||||
|
||||
// Close closes underlying file and signals all readers to stop.
|
||||
func (w *LogFile) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadLogs decodes entries from log files and sends them the passed in watcher
|
||||
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
|
||||
w.mu.RLock()
|
||||
files, err := w.openRotatedFiles()
|
||||
if err != nil {
|
||||
w.mu.RUnlock()
|
||||
watcher.Err <- err
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
for _, f := range files {
|
||||
f.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
currentFile, err := os.Open(w.f.Name())
|
||||
if err != nil {
|
||||
w.mu.RUnlock()
|
||||
watcher.Err <- err
|
||||
return
|
||||
}
|
||||
defer currentFile.Close()
|
||||
|
||||
currentChunk, err := newSectionReader(currentFile)
|
||||
w.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
watcher.Err <- err
|
||||
return
|
||||
}
|
||||
|
||||
if config.Tail != 0 {
|
||||
seekers := make([]io.ReadSeeker, 0, len(files)+1)
|
||||
for _, f := range files {
|
||||
seekers = append(seekers, f)
|
||||
}
|
||||
seekers = append(seekers, currentChunk)
|
||||
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
|
||||
}
|
||||
|
||||
w.mu.RLock()
|
||||
if !config.Follow || w.closed {
|
||||
w.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
w.mu.RUnlock()
|
||||
|
||||
notifyRotate := w.notifyRotate.Subscribe()
|
||||
defer w.notifyRotate.Evict(notifyRotate)
|
||||
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
|
||||
}
|
||||
|
||||
func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
|
||||
defer func() {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
for _, f := range files {
|
||||
f.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
for i := w.maxFiles; i > 1; i-- {
|
||||
f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
||||
// seek to the end to get the size
|
||||
// we'll leave this at the end of the file since section reader does not advance the reader
|
||||
size, err := f.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error getting current file size")
|
||||
}
|
||||
return io.NewSectionReader(f, 0, size), nil
|
||||
}
|
||||
|
||||
type decodeFunc func() (*logger.Message, error)
|
||||
|
||||
func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
|
||||
var rdr io.Reader = f
|
||||
if config.Tail > 0 {
|
||||
ls, err := tailfile.TailFile(f, config.Tail)
|
||||
if err != nil {
|
||||
watcher.Err <- err
|
||||
return
|
||||
}
|
||||
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
|
||||
}
|
||||
|
||||
decodeLogLine := createDecoder(rdr)
|
||||
for {
|
||||
msg, err := decodeLogLine()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
watcher.Err <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
|
||||
continue
|
||||
}
|
||||
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-watcher.WatchClose():
|
||||
return
|
||||
case watcher.Msg <- msg:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
|
||||
decodeLogLine := createDecoder(f)
|
||||
|
||||
name := f.Name()
|
||||
fileWatcher, err := watchFile(name)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
f.Close()
|
||||
fileWatcher.Remove(name)
|
||||
fileWatcher.Close()
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
select {
|
||||
case <-logWatcher.WatchClose():
|
||||
fileWatcher.Remove(name)
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
var retries int
|
||||
handleRotate := func() error {
|
||||
f.Close()
|
||||
fileWatcher.Remove(name)
|
||||
|
||||
// retry when the file doesn't exist
|
||||
for retries := 0; retries <= 5; retries++ {
|
||||
f, err = os.Open(name)
|
||||
if err == nil || !os.IsNotExist(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
return err
|
||||
}
|
||||
decodeLogLine = createDecoder(f)
|
||||
return nil
|
||||
}
|
||||
|
||||
errRetry := errors.New("retry")
|
||||
errDone := errors.New("done")
|
||||
waitRead := func() error {
|
||||
select {
|
||||
case e := <-fileWatcher.Events():
|
||||
switch e.Op {
|
||||
case fsnotify.Write:
|
||||
decodeLogLine = createDecoder(f)
|
||||
return nil
|
||||
case fsnotify.Rename, fsnotify.Remove:
|
||||
select {
|
||||
case <-notifyRotate:
|
||||
case <-ctx.Done():
|
||||
return errDone
|
||||
}
|
||||
if err := handleRotate(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return errRetry
|
||||
case err := <-fileWatcher.Errors():
|
||||
logrus.Debug("logger got error watching file: %v", err)
|
||||
// Something happened, let's try and stay alive and create a new watcher
|
||||
if retries <= 5 {
|
||||
fileWatcher.Close()
|
||||
fileWatcher, err = watchFile(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retries++
|
||||
return errRetry
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return errDone
|
||||
}
|
||||
}
|
||||
|
||||
handleDecodeErr := func(err error) error {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
err := waitRead()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err == errRetry {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// main loop
|
||||
for {
|
||||
msg, err := decodeLogLine()
|
||||
if err != nil {
|
||||
if err := handleDecodeErr(err); err != nil {
|
||||
if err == errDone {
|
||||
return
|
||||
}
|
||||
// we got an unrecoverable error, so return
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// ready to try again
|
||||
continue
|
||||
}
|
||||
|
||||
retries = 0 // reset retries since we've succeeded
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case logWatcher.Msg <- msg:
|
||||
case <-ctx.Done():
|
||||
logWatcher.Msg <- msg
|
||||
for {
|
||||
msg, err := decodeLogLine()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
logWatcher.Msg <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchFile(name string) (filenotify.FileWatcher, error) {
|
||||
fileWatcher, err := filenotify.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger := logrus.WithFields(logrus.Fields{
|
||||
"module": "logger",
|
||||
"fille": name,
|
||||
})
|
||||
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
logger.WithError(err).Warnf("falling back to file poller")
|
||||
fileWatcher.Close()
|
||||
fileWatcher = filenotify.NewPollingWatcher()
|
||||
|
||||
if err := fileWatcher.Add(name); err != nil {
|
||||
fileWatcher.Close()
|
||||
logger.WithError(err).Debugf("error watching log file for modifications")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return fileWatcher, nil
|
||||
}
|
|
@ -1,141 +0,0 @@
|
|||
package loggerutils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/docker/pkg/pubsub"
|
||||
)
|
||||
|
||||
// RotateFileWriter is Logger implementation for default Docker logging.
|
||||
type RotateFileWriter struct {
|
||||
f *os.File // store for closing
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
capacity int64 //maximum size of each file
|
||||
currentSize int64 // current size of the latest file
|
||||
maxFiles int //maximum number of files
|
||||
notifyRotate *pubsub.Publisher
|
||||
}
|
||||
|
||||
//NewRotateFileWriter creates new RotateFileWriter
|
||||
func NewRotateFileWriter(logPath string, capacity int64, maxFiles int) (*RotateFileWriter, error) {
|
||||
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
size, err := log.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &RotateFileWriter{
|
||||
f: log,
|
||||
capacity: capacity,
|
||||
currentSize: size,
|
||||
maxFiles: maxFiles,
|
||||
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||
}, nil
|
||||
}
|
||||
|
||||
//WriteLog write log message to File
|
||||
func (w *RotateFileWriter) Write(message []byte) (int, error) {
|
||||
w.mu.Lock()
|
||||
if w.closed {
|
||||
w.mu.Unlock()
|
||||
return -1, errors.New("cannot write because the output file was closed")
|
||||
}
|
||||
if err := w.checkCapacityAndRotate(); err != nil {
|
||||
w.mu.Unlock()
|
||||
return -1, err
|
||||
}
|
||||
|
||||
n, err := w.f.Write(message)
|
||||
if err == nil {
|
||||
w.currentSize += int64(n)
|
||||
}
|
||||
w.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *RotateFileWriter) checkCapacityAndRotate() error {
|
||||
if w.capacity == -1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if w.currentSize >= w.capacity {
|
||||
name := w.f.Name()
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := rotate(name, w.maxFiles); err != nil {
|
||||
return err
|
||||
}
|
||||
file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.f = file
|
||||
w.currentSize = 0
|
||||
w.notifyRotate.Publish(struct{}{})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func rotate(name string, maxFiles int) error {
|
||||
if maxFiles < 2 {
|
||||
return nil
|
||||
}
|
||||
for i := maxFiles - 1; i > 1; i-- {
|
||||
toPath := name + "." + strconv.Itoa(i)
|
||||
fromPath := name + "." + strconv.Itoa(i-1)
|
||||
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LogPath returns the location the given writer logs to.
|
||||
func (w *RotateFileWriter) LogPath() string {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.f.Name()
|
||||
}
|
||||
|
||||
// MaxFiles return maximum number of files
|
||||
func (w *RotateFileWriter) MaxFiles() int {
|
||||
return w.maxFiles
|
||||
}
|
||||
|
||||
//NotifyRotate returns the new subscriber
|
||||
func (w *RotateFileWriter) NotifyRotate() chan interface{} {
|
||||
return w.notifyRotate.Subscribe()
|
||||
}
|
||||
|
||||
//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
|
||||
func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
|
||||
w.notifyRotate.Evict(sub)
|
||||
}
|
||||
|
||||
// Close closes underlying file and signals all readers to stop.
|
||||
func (w *RotateFileWriter) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.closed = true
|
||||
return nil
|
||||
}
|
|
@ -123,7 +123,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
|
|||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
|
||||
lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
|
||||
return
|
||||
case msg, ok := <-logs.Msg:
|
||||
// there is some kind of pool or ring buffer in the logger that
|
||||
|
|
Loading…
Add table
Reference in a new issue