|
@@ -6,91 +6,83 @@ import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "io/fs"
|
|
|
+ "math"
|
|
|
"os"
|
|
|
- "runtime"
|
|
|
"strconv"
|
|
|
- "strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
|
- "github.com/docker/docker/pkg/filenotify"
|
|
|
"github.com/docker/docker/pkg/pools"
|
|
|
- "github.com/docker/docker/pkg/pubsub"
|
|
|
"github.com/pkg/errors"
|
|
|
"github.com/sirupsen/logrus"
|
|
|
)
|
|
|
|
|
|
-const tmpLogfileSuffix = ".tmp"
|
|
|
-
|
|
|
// rotateFileMetadata is a metadata of the gzip header of the compressed log file
|
|
|
type rotateFileMetadata struct {
|
|
|
LastTime time.Time `json:"lastTime,omitempty"`
|
|
|
}
|
|
|
|
|
|
-// refCounter is a counter of logfile being referenced
|
|
|
-type refCounter struct {
|
|
|
- mu sync.Mutex
|
|
|
- counter map[string]int
|
|
|
-}
|
|
|
-
|
|
|
-// Reference increase the reference counter for specified logfile
|
|
|
-func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
|
|
|
- rc.mu.Lock()
|
|
|
- defer rc.mu.Unlock()
|
|
|
-
|
|
|
- var (
|
|
|
- file *os.File
|
|
|
- err error
|
|
|
- )
|
|
|
- _, ok := rc.counter[fileName]
|
|
|
- file, err = openRefFile(fileName, ok)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- if ok {
|
|
|
- rc.counter[fileName]++
|
|
|
- } else if file != nil {
|
|
|
- rc.counter[file.Name()] = 1
|
|
|
- }
|
|
|
-
|
|
|
- return file, nil
|
|
|
+// LogFile is Logger implementation for default Docker logging.
|
|
|
+type LogFile struct {
|
|
|
+ mu sync.Mutex // protects the logfile access
|
|
|
+ closed chan struct{}
|
|
|
+ rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
|
|
|
+ // Lock out readers while performing a non-atomic sequence of filesystem
|
|
|
+ // operations (RLock: open, Lock: rename, delete).
|
|
|
+ //
|
|
|
+ // fsopMu should be locked for writing only while holding rotateMu.
|
|
|
+ fsopMu sync.RWMutex
|
|
|
+
|
|
|
+ // Logger configuration
|
|
|
+
|
|
|
+ capacity int64 // maximum size of each file
|
|
|
+ maxFiles int // maximum number of files
|
|
|
+ compress bool // whether old versions of log files are compressed
|
|
|
+ perms os.FileMode
|
|
|
+
|
|
|
+ // Log file codec
|
|
|
+
|
|
|
+ marshal logger.MarshalFunc
|
|
|
+ createDecoder MakeDecoderFn
|
|
|
+ getTailReader GetTailReaderFunc
|
|
|
+
|
|
|
+ // Log reader state in a 1-buffered channel.
|
|
|
+ //
|
|
|
+ // Share memory by communicating: receive to acquire, send to release.
|
|
|
+ // The state struct is passed around by value so that use-after-send
|
|
|
+ // bugs cannot escalate to data races.
|
|
|
+ //
|
|
|
+ // A method which receives the state value takes ownership of it. The
|
|
|
+ // owner is responsible for either passing ownership along or sending
|
|
|
+ // the state back to the channel. By convention, the semantics of
|
|
|
+ // passing along ownership is expressed with function argument types.
|
|
|
+ // Methods which take a pointer *logReadState argument borrow the state,
|
|
|
+ // analogous to functions which require a lock to be held when calling.
|
|
|
+	// The caller retains ownership. Calling a method which takes a
|
|
|
+	// value logReadState argument gives ownership to the callee.
|
|
|
+ read chan logReadState
|
|
|
+
|
|
|
+ decompress *sharedTempFileConverter
|
|
|
+
|
|
|
+ pos logPos // Current log file write position.
|
|
|
+ f *os.File // Current log file for writing.
|
|
|
+ lastTimestamp time.Time // timestamp of the last log
|
|
|
}
|
|
|
|
|
|
-// Dereference reduce the reference counter for specified logfile
|
|
|
-func (rc *refCounter) Dereference(fileName string) error {
|
|
|
- rc.mu.Lock()
|
|
|
- defer rc.mu.Unlock()
|
|
|
-
|
|
|
- rc.counter[fileName]--
|
|
|
- if rc.counter[fileName] <= 0 {
|
|
|
- delete(rc.counter, fileName)
|
|
|
- err := os.Remove(fileName)
|
|
|
- if err != nil && !os.IsNotExist(err) {
|
|
|
- return err
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
+type logPos struct {
|
|
|
+ // Size of the current file.
|
|
|
+ size int64
|
|
|
+ // File rotation sequence number (modulo 2**16).
|
|
|
+ rotation uint16
|
|
|
}
|
|
|
|
|
|
-// LogFile is Logger implementation for default Docker logging.
|
|
|
-type LogFile struct {
|
|
|
- mu sync.RWMutex // protects the logfile access
|
|
|
- f *os.File // store for closing
|
|
|
- closed bool
|
|
|
- rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
|
|
|
- capacity int64 // maximum size of each file
|
|
|
- currentSize int64 // current size of the latest file
|
|
|
- maxFiles int // maximum number of files
|
|
|
- compress bool // whether old versions of log files are compressed
|
|
|
- lastTimestamp time.Time // timestamp of the last log
|
|
|
- filesRefCounter refCounter // keep reference-counted of decompressed files
|
|
|
- notifyReaders *pubsub.Publisher
|
|
|
- marshal logger.MarshalFunc
|
|
|
- createDecoder MakeDecoderFn
|
|
|
- getTailReader GetTailReaderFunc
|
|
|
- perms os.FileMode
|
|
|
+type logReadState struct {
|
|
|
+ // Current log file position.
|
|
|
+ pos logPos
|
|
|
+ // Wait list to be notified of the value of pos next time it changes.
|
|
|
+ wait []chan<- logPos
|
|
|
}
|
|
|
|
|
|
// MakeDecoderFn creates a decoder
|
|
@@ -111,10 +103,16 @@ type Decoder interface {
|
|
|
// SizeReaderAt defines a ReaderAt that also reports its size.
|
|
|
// This is used for tailing log files.
|
|
|
type SizeReaderAt interface {
|
|
|
+ io.Reader
|
|
|
io.ReaderAt
|
|
|
Size() int64
|
|
|
}
|
|
|
|
|
|
+type readAtCloser interface {
|
|
|
+ io.ReaderAt
|
|
|
+ io.Closer
|
|
|
+}
|
|
|
+
|
|
|
// GetTailReaderFunc is used to truncate a reader to only read as much as is required
|
|
|
// in order to get the passed in number of log lines.
|
|
|
// It returns the sectioned reader, the number of lines that the section reader
|
|
@@ -133,18 +131,28 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ pos := logPos{
|
|
|
+ size: size,
|
|
|
+ // Force a wraparound on first rotation to shake out any
|
|
|
+ // modular-arithmetic bugs.
|
|
|
+ rotation: math.MaxUint16,
|
|
|
+ }
|
|
|
+ st := make(chan logReadState, 1)
|
|
|
+ st <- logReadState{pos: pos}
|
|
|
+
|
|
|
return &LogFile{
|
|
|
- f: log,
|
|
|
- capacity: capacity,
|
|
|
- currentSize: size,
|
|
|
- maxFiles: maxFiles,
|
|
|
- compress: compress,
|
|
|
- filesRefCounter: refCounter{counter: make(map[string]int)},
|
|
|
- notifyReaders: pubsub.NewPublisher(0, 1),
|
|
|
- marshal: marshaller,
|
|
|
- createDecoder: decodeFunc,
|
|
|
- perms: perms,
|
|
|
- getTailReader: getTailReader,
|
|
|
+ f: log,
|
|
|
+ read: st,
|
|
|
+ pos: pos,
|
|
|
+ closed: make(chan struct{}),
|
|
|
+ capacity: capacity,
|
|
|
+ maxFiles: maxFiles,
|
|
|
+ compress: compress,
|
|
|
+ decompress: newSharedTempFileConverter(decompress),
|
|
|
+ marshal: marshaller,
|
|
|
+ createDecoder: decodeFunc,
|
|
|
+ perms: perms,
|
|
|
+ getTailReader: getTailReader,
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
@@ -160,35 +168,45 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
|
|
logger.PutMessage(msg)
|
|
|
msg = nil // Turn use-after-put bugs into panics.
|
|
|
|
|
|
- w.mu.Lock()
|
|
|
- if w.closed {
|
|
|
- w.mu.Unlock()
|
|
|
+ select {
|
|
|
+ case <-w.closed:
|
|
|
return errors.New("cannot write because the output file was closed")
|
|
|
+ default:
|
|
|
}
|
|
|
+ w.mu.Lock()
|
|
|
+ defer w.mu.Unlock()
|
|
|
|
|
|
- if err := w.checkCapacityAndRotate(); err != nil {
|
|
|
- w.mu.Unlock()
|
|
|
- return errors.Wrap(err, "error rotating log file")
|
|
|
+ // Are we due for a rotation?
|
|
|
+ if w.capacity != -1 && w.pos.size >= w.capacity {
|
|
|
+ if err := w.rotate(); err != nil {
|
|
|
+ return errors.Wrap(err, "error rotating log file")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
n, err := w.f.Write(b)
|
|
|
- if err == nil {
|
|
|
- w.currentSize += int64(n)
|
|
|
- w.lastTimestamp = ts
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "error writing log entry")
|
|
|
}
|
|
|
+ w.pos.size += int64(n)
|
|
|
+ w.lastTimestamp = ts
|
|
|
|
|
|
- w.mu.Unlock()
|
|
|
- return errors.Wrap(err, "error writing log entry")
|
|
|
-}
|
|
|
+ // Notify any waiting readers that there is a new log entry to read.
|
|
|
+ st := <-w.read
|
|
|
+ defer func() { w.read <- st }()
|
|
|
+ st.pos = w.pos
|
|
|
|
|
|
-func (w *LogFile) checkCapacityAndRotate() (retErr error) {
|
|
|
- if w.capacity == -1 {
|
|
|
- return nil
|
|
|
+ for _, c := range st.wait {
|
|
|
+ c <- st.pos
|
|
|
}
|
|
|
- if w.currentSize < w.capacity {
|
|
|
- return nil
|
|
|
+ // Optimization: retain the backing array to save a heap allocation next
|
|
|
+ // time a reader appends to the list.
|
|
|
+ if st.wait != nil {
|
|
|
+ st.wait = st.wait[:0]
|
|
|
}
|
|
|
+ return nil
|
|
|
+}
|
|
|
|
|
|
+func (w *LogFile) rotate() (retErr error) {
|
|
|
w.rotateMu.Lock()
|
|
|
noCompress := w.maxFiles <= 1 || !w.compress
|
|
|
defer func() {
|
|
@@ -202,49 +220,61 @@ func (w *LogFile) checkCapacityAndRotate() (retErr error) {
|
|
|
fname := w.f.Name()
|
|
|
if err := w.f.Close(); err != nil {
|
|
|
// if there was an error during a prior rotate, the file could already be closed
|
|
|
- if !errors.Is(err, os.ErrClosed) {
|
|
|
+ if !errors.Is(err, fs.ErrClosed) {
|
|
|
return errors.Wrap(err, "error closing file")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if err := rotate(fname, w.maxFiles, w.compress); err != nil {
|
|
|
- logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
|
|
|
- } else {
|
|
|
- var renameErr error
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
|
|
|
- logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
|
|
|
- w.notifyReaders.Publish(renameErr)
|
|
|
- time.Sleep(100 * time.Millisecond)
|
|
|
- continue
|
|
|
+ file, err := func() (*os.File, error) {
|
|
|
+ w.fsopMu.Lock()
|
|
|
+ defer w.fsopMu.Unlock()
|
|
|
+
|
|
|
+ if err := rotate(fname, w.maxFiles, w.compress); err != nil {
|
|
|
+ logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
|
|
|
+ } else {
|
|
|
+ // We may have readers working their way through the
|
|
|
+ // current log file so we can't truncate it. We need to
|
|
|
+ // start writing new logs to an empty file with the same
|
|
|
+ // name as the current one so we need to rotate the
|
|
|
+ // current file out of the way.
|
|
|
+ if w.maxFiles < 2 {
|
|
|
+ if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
+ logrus.WithError(err).Error("Error unlinking current log file")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
+ logrus.WithError(err).Error("Error renaming current log file")
|
|
|
+ }
|
|
|
}
|
|
|
- break
|
|
|
}
|
|
|
- if renameErr != nil {
|
|
|
- logrus.WithError(renameErr).Error("Error renaming current log file")
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
|
|
|
+ // Notwithstanding the above, open with the truncate flag anyway
|
|
|
+ // in case rotation didn't work out as planned.
|
|
|
+ return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
|
|
|
+ }()
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
w.f = file
|
|
|
- w.currentSize = 0
|
|
|
-
|
|
|
- w.notifyReaders.Publish(struct{}{})
|
|
|
+ w.pos = logPos{rotation: w.pos.rotation + 1}
|
|
|
|
|
|
if noCompress {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
ts := w.lastTimestamp
|
|
|
-
|
|
|
go func() {
|
|
|
+ defer w.rotateMu.Unlock()
|
|
|
+ // No need to hold fsopMu as at no point will the filesystem be
|
|
|
+ // in a state which would cause problems for readers. Opening
|
|
|
+ // the uncompressed file is tried first, falling back to the
|
|
|
+ // compressed one. compressFile only deletes the uncompressed
|
|
|
+ // file once the compressed one is fully written out, so at no
|
|
|
+ // point during the compression process will a reader fail to
|
|
|
+ // open a complete copy of the file.
|
|
|
if err := compressFile(fname+".1", ts); err != nil {
|
|
|
logrus.WithError(err).Error("Error compressing log file after rotation")
|
|
|
}
|
|
|
- w.rotateMu.Unlock()
|
|
|
}()
|
|
|
|
|
|
return nil
|
|
@@ -261,16 +291,17 @@ func rotate(name string, maxFiles int, compress bool) error {
|
|
|
}
|
|
|
|
|
|
lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
|
|
|
- err := os.Remove(lastFile)
|
|
|
- if err != nil && !os.IsNotExist(err) {
|
|
|
+ err := unlink(lastFile)
|
|
|
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
return errors.Wrap(err, "error removing oldest log file")
|
|
|
}
|
|
|
|
|
|
for i := maxFiles - 1; i > 1; i-- {
|
|
|
toPath := name + "." + strconv.Itoa(i) + extension
|
|
|
fromPath := name + "." + strconv.Itoa(i-1) + extension
|
|
|
- logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
|
|
|
- if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
|
|
+ err := os.Rename(fromPath, toPath)
|
|
|
+ logrus.WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
|
|
|
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
@@ -281,7 +312,7 @@ func rotate(name string, maxFiles int, compress bool) error {
|
|
|
func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
|
|
|
file, err := open(fileName)
|
|
|
if err != nil {
|
|
|
- if os.IsNotExist(err) {
|
|
|
+ if errors.Is(err, fs.ErrNotExist) {
|
|
|
logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
|
|
|
return nil
|
|
|
}
|
|
@@ -290,8 +321,8 @@ func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
|
|
|
defer func() {
|
|
|
file.Close()
|
|
|
if retErr == nil {
|
|
|
- err := os.Remove(fileName)
|
|
|
- if err != nil && !os.IsNotExist(err) {
|
|
|
+ err := unlink(fileName)
|
|
|
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
retErr = errors.Wrap(err, "failed to remove source log file")
|
|
|
}
|
|
|
}
|
|
@@ -304,7 +335,7 @@ func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
|
|
|
defer func() {
|
|
|
outFile.Close()
|
|
|
if retErr != nil {
|
|
|
- if err := os.Remove(fileName + ".gz"); err != nil && !os.IsExist(err) {
|
|
|
+ if err := unlink(fileName + ".gz"); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
|
logrus.WithError(err).Error("Error cleaning up after failed log compression")
|
|
|
}
|
|
|
}
|
|
@@ -339,25 +370,49 @@ func (w *LogFile) MaxFiles() int {
|
|
|
func (w *LogFile) Close() error {
|
|
|
w.mu.Lock()
|
|
|
defer w.mu.Unlock()
|
|
|
- if w.closed {
|
|
|
+ select {
|
|
|
+ case <-w.closed:
|
|
|
return nil
|
|
|
+ default:
|
|
|
}
|
|
|
- if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
|
|
|
+ if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
|
|
|
return err
|
|
|
}
|
|
|
- w.closed = true
|
|
|
+ close(w.closed)
|
|
|
+ // Wait until any in-progress rotation is complete.
|
|
|
+ w.rotateMu.Lock()
|
|
|
+ w.rotateMu.Unlock() //nolint:staticcheck
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// ReadLogs decodes entries from log files and sends them the passed in watcher
|
|
|
+// ReadLogs decodes entries from log files.
|
|
|
+//
|
|
|
+// It is the caller's responsibility to call ConsumerGone on the LogWatcher.
|
|
|
+func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
|
+ watcher := logger.NewLogWatcher()
|
|
|
+ // Lock out filesystem operations so that we can capture the read
|
|
|
+ // position and atomically open the corresponding log file, without the
|
|
|
+ // file getting rotated out from under us.
|
|
|
+ w.fsopMu.RLock()
|
|
|
+ // Capture the read position synchronously to ensure that we start
|
|
|
+ // following from the last entry logged before ReadLogs was called,
|
|
|
+ // which is required for flake-free unit testing.
|
|
|
+ st := <-w.read
|
|
|
+ pos := st.pos
|
|
|
+ w.read <- st
|
|
|
+ go w.readLogsLocked(pos, config, watcher)
|
|
|
+ return watcher
|
|
|
+}
|
|
|
+
|
|
|
+// readLogsLocked is the bulk of the implementation of ReadLogs.
|
|
|
//
|
|
|
-// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
|
|
|
-// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
|
|
|
-func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
|
|
|
- w.mu.RLock()
|
|
|
+// w.fsopMu must be locked for reading when calling this method.
|
|
|
+// w.fsopMu.RUnlock() is called before returning.
|
|
|
+func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
|
|
|
+ defer close(watcher.Msg)
|
|
|
+
|
|
|
currentFile, err := open(w.f.Name())
|
|
|
if err != nil {
|
|
|
- w.mu.RUnlock()
|
|
|
watcher.Err <- err
|
|
|
return
|
|
|
}
|
|
@@ -366,25 +421,13 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
|
|
dec := w.createDecoder(nil)
|
|
|
defer dec.Close()
|
|
|
|
|
|
- currentChunk, err := newSectionReader(currentFile)
|
|
|
- if err != nil {
|
|
|
- w.mu.RUnlock()
|
|
|
- watcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
|
|
|
- _, ok := i.(error)
|
|
|
- return ok
|
|
|
- }, 1)
|
|
|
- defer w.notifyReaders.Evict(notifyEvict)
|
|
|
+ currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
|
|
|
|
|
|
if config.Tail != 0 {
|
|
|
// TODO(@cpuguy83): Instead of opening every file, only get the files which
|
|
|
// are needed to tail.
|
|
|
// This is especially costly when compression is enabled.
|
|
|
files, err := w.openRotatedFiles(config)
|
|
|
- w.mu.RUnlock()
|
|
|
if err != nil {
|
|
|
watcher.Err <- err
|
|
|
return
|
|
@@ -393,115 +436,123 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
|
|
closeFiles := func() {
|
|
|
for _, f := range files {
|
|
|
f.Close()
|
|
|
- fileName := f.Name()
|
|
|
- if strings.HasSuffix(fileName, tmpLogfileSuffix) {
|
|
|
- err := w.filesRefCounter.Dereference(fileName)
|
|
|
- if err != nil {
|
|
|
- logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
readers := make([]SizeReaderAt, 0, len(files)+1)
|
|
|
for _, f := range files {
|
|
|
- stat, err := f.Stat()
|
|
|
- if err != nil {
|
|
|
- watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
|
|
|
- closeFiles()
|
|
|
- return
|
|
|
+ switch ff := f.(type) {
|
|
|
+ case SizeReaderAt:
|
|
|
+ readers = append(readers, ff)
|
|
|
+ case interface{ Stat() (fs.FileInfo, error) }:
|
|
|
+ stat, err := ff.Stat()
|
|
|
+ if err != nil {
|
|
|
+ watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
|
|
|
+ closeFiles()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
|
|
|
+ default:
|
|
|
+ panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f))
|
|
|
}
|
|
|
- readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
|
|
|
}
|
|
|
if currentChunk.Size() > 0 {
|
|
|
readers = append(readers, currentChunk)
|
|
|
}
|
|
|
|
|
|
- ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
|
|
|
+ ok := tailFiles(readers, watcher, dec, w.getTailReader, config)
|
|
|
closeFiles()
|
|
|
if !ok {
|
|
|
return
|
|
|
}
|
|
|
- w.mu.RLock()
|
|
|
+ } else {
|
|
|
+ w.fsopMu.RUnlock()
|
|
|
}
|
|
|
|
|
|
- if !config.Follow || w.closed {
|
|
|
- w.mu.RUnlock()
|
|
|
+ if !config.Follow {
|
|
|
return
|
|
|
}
|
|
|
- w.mu.RUnlock()
|
|
|
-
|
|
|
- notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
|
|
|
- _, ok := i.(struct{})
|
|
|
- return ok
|
|
|
- })
|
|
|
- defer w.notifyReaders.Evict(notifyRotate)
|
|
|
|
|
|
- followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
|
|
|
+ (&follow{
|
|
|
+ LogFile: w,
|
|
|
+ Watcher: watcher,
|
|
|
+ Decoder: dec,
|
|
|
+ Since: config.Since,
|
|
|
+ Until: config.Until,
|
|
|
+ }).Do(currentFile, currentPos)
|
|
|
}
|
|
|
|
|
|
-func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
|
|
|
- w.rotateMu.Lock()
|
|
|
- defer w.rotateMu.Unlock()
|
|
|
+// openRotatedFiles returns a slice of files open for reading, in order from
|
|
|
+// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
|
|
|
+//
|
|
|
+// This method must only be called with w.fsopMu locked for reading.
|
|
|
+func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
|
|
|
+ type rotatedFile struct {
|
|
|
+ f *os.File
|
|
|
+ compressed bool
|
|
|
+ }
|
|
|
|
|
|
+ var q []rotatedFile
|
|
|
defer func() {
|
|
|
- if err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
- for _, f := range files {
|
|
|
- f.Close()
|
|
|
- if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
|
|
|
- err := os.Remove(f.Name())
|
|
|
- if err != nil && !os.IsNotExist(err) {
|
|
|
- logrus.Warnf("Failed to remove logfile: %v", err)
|
|
|
- }
|
|
|
+ if err != nil {
|
|
|
+ for _, qq := range q {
|
|
|
+ qq.f.Close()
|
|
|
+ }
|
|
|
+ for _, f := range files {
|
|
|
+ f.Close()
|
|
|
}
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- for i := w.maxFiles; i > 1; i-- {
|
|
|
- f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
|
|
|
- if err != nil {
|
|
|
- if !os.IsNotExist(err) {
|
|
|
- return nil, errors.Wrap(err, "error opening rotated log file")
|
|
|
- }
|
|
|
+ q, err = func() (q []rotatedFile, err error) {
|
|
|
+ defer w.fsopMu.RUnlock()
|
|
|
|
|
|
- fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
|
|
|
- decompressedFileName := fileName + tmpLogfileSuffix
|
|
|
- tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
|
|
|
- if exists {
|
|
|
- return open(refFileName)
|
|
|
+ q = make([]rotatedFile, 0, w.maxFiles)
|
|
|
+ for i := w.maxFiles; i > 1; i-- {
|
|
|
+ var f rotatedFile
|
|
|
+ f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
|
|
|
+ if err != nil {
|
|
|
+ if !errors.Is(err, fs.ErrNotExist) {
|
|
|
+ return nil, errors.Wrap(err, "error opening rotated log file")
|
|
|
+ }
|
|
|
+ f.compressed = true
|
|
|
+ f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1))
|
|
|
+ if err != nil {
|
|
|
+ if !errors.Is(err, fs.ErrNotExist) {
|
|
|
+ return nil, errors.Wrap(err, "error opening file for decompression")
|
|
|
+ }
|
|
|
+ continue
|
|
|
}
|
|
|
- return decompressfile(fileName, refFileName, config.Since)
|
|
|
- })
|
|
|
+ }
|
|
|
+ q = append(q, f)
|
|
|
+ }
|
|
|
+ return q, nil
|
|
|
+ }()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
|
|
|
+ for len(q) > 0 {
|
|
|
+ qq := q[0]
|
|
|
+ q = q[1:]
|
|
|
+ if qq.compressed {
|
|
|
+ defer qq.f.Close()
|
|
|
+ f, err := w.maybeDecompressFile(qq.f, config)
|
|
|
if err != nil {
|
|
|
- if !errors.Is(err, os.ErrNotExist) {
|
|
|
- return nil, errors.Wrap(err, "error getting reference to decompressed log file")
|
|
|
- }
|
|
|
- continue
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- if tmpFile == nil {
|
|
|
+ if f != nil {
|
|
|
// The log before `config.Since` does not need to read
|
|
|
- break
|
|
|
+ files = append(files, f)
|
|
|
}
|
|
|
-
|
|
|
- files = append(files, tmpFile)
|
|
|
- continue
|
|
|
+ } else {
|
|
|
+ files = append(files, qq.f)
|
|
|
}
|
|
|
- files = append(files, f)
|
|
|
}
|
|
|
-
|
|
|
return files, nil
|
|
|
}
|
|
|
|
|
|
-func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
|
|
|
- cf, err := open(fileName)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrap(err, "error opening file for decompression")
|
|
|
- }
|
|
|
- defer cf.Close()
|
|
|
-
|
|
|
+func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) {
|
|
|
rc, err := gzip.NewReader(cf)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
|
|
@@ -511,41 +562,29 @@ func decompressfile(fileName, destFileName string, since time.Time) (*os.File, e
|
|
|
// Extract the last log entry timestramp from the gzip header
|
|
|
extra := &rotateFileMetadata{}
|
|
|
err = json.Unmarshal(rc.Header.Extra, extra)
|
|
|
- if err == nil && extra.LastTime.Before(since) {
|
|
|
+ if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) {
|
|
|
return nil, nil
|
|
|
}
|
|
|
+ tmpf, err := w.decompress.Do(cf)
|
|
|
+ return tmpf, errors.Wrap(err, "error decompressing log file")
|
|
|
+}
|
|
|
|
|
|
- rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
|
|
|
+func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
|
|
|
+ if _, err := src.Seek(0, io.SeekStart); err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
-
|
|
|
- _, err = pools.Copy(rs, rc)
|
|
|
+ rc, err := gzip.NewReader(src)
|
|
|
if err != nil {
|
|
|
- rs.Close()
|
|
|
- rErr := os.Remove(rs.Name())
|
|
|
- if rErr != nil && !os.IsNotExist(rErr) {
|
|
|
- logrus.Errorf("Failed to remove logfile: %v", rErr)
|
|
|
- }
|
|
|
- return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
|
|
|
+ return err
|
|
|
}
|
|
|
-
|
|
|
- return rs, nil
|
|
|
-}
|
|
|
-
|
|
|
-func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
|
|
- // seek to the end to get the size
|
|
|
- // we'll leave this at the end of the file since section reader does not advance the reader
|
|
|
- size, err := f.Seek(0, io.SeekEnd)
|
|
|
+ _, err = pools.Copy(dst, rc)
|
|
|
if err != nil {
|
|
|
- return nil, errors.Wrap(err, "error getting current file size")
|
|
|
+ return err
|
|
|
}
|
|
|
- return io.NewSectionReader(f, 0, size), nil
|
|
|
+ return rc.Close()
|
|
|
}
|
|
|
|
|
|
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
|
|
|
- nLines := config.Tail
|
|
|
-
|
|
|
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) (cont bool) {
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
defer cancel()
|
|
|
|
|
@@ -553,12 +592,6 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
|
|
|
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
|
|
|
go func() {
|
|
|
select {
|
|
|
- case err := <-notifyEvict:
|
|
|
- if err != nil {
|
|
|
- watcher.Err <- err.(error)
|
|
|
- cont = false
|
|
|
- cancel()
|
|
|
- }
|
|
|
case <-ctx.Done():
|
|
|
case <-watcher.WatchConsumerGone():
|
|
|
cont = false
|
|
@@ -569,6 +602,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
|
|
|
readers := make([]io.Reader, 0, len(files))
|
|
|
|
|
|
if config.Tail > 0 {
|
|
|
+ nLines := config.Tail
|
|
|
for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
|
|
|
tail, n, err := getTailReader(ctx, files[i], nLines)
|
|
|
if err != nil {
|
|
@@ -580,7 +614,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
|
|
|
}
|
|
|
} else {
|
|
|
for _, r := range files {
|
|
|
- readers = append(readers, &wrappedReaderAt{ReaderAt: r})
|
|
|
+ readers = append(readers, r)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -608,52 +642,3 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-func watchFile(name string) (filenotify.FileWatcher, error) {
|
|
|
- var fileWatcher filenotify.FileWatcher
|
|
|
-
|
|
|
- if runtime.GOOS == "windows" {
|
|
|
- // FileWatcher on Windows files is based on the syscall notifications which has an issue because of file caching.
|
|
|
- // It is based on ReadDirectoryChangesW() which doesn't detect writes to the cache. It detects writes to disk only.
|
|
|
- // Because of the OS lazy writing, we don't get notifications for file writes and thereby the watcher
|
|
|
- // doesn't work. Hence for Windows we will use poll based notifier.
|
|
|
- fileWatcher = filenotify.NewPollingWatcher()
|
|
|
- } else {
|
|
|
- var err error
|
|
|
- fileWatcher, err = filenotify.New()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- logger := logrus.WithFields(logrus.Fields{
|
|
|
- "module": "logger",
|
|
|
- "file": name,
|
|
|
- })
|
|
|
-
|
|
|
- if err := fileWatcher.Add(name); err != nil {
|
|
|
- // we will retry using file poller.
|
|
|
- logger.WithError(err).Warnf("falling back to file poller")
|
|
|
- fileWatcher.Close()
|
|
|
- fileWatcher = filenotify.NewPollingWatcher()
|
|
|
-
|
|
|
- if err := fileWatcher.Add(name); err != nil {
|
|
|
- fileWatcher.Close()
|
|
|
- logger.WithError(err).Debugf("error watching log file for modifications")
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return fileWatcher, nil
|
|
|
-}
|
|
|
-
|
|
|
-type wrappedReaderAt struct {
|
|
|
- io.ReaderAt
|
|
|
- pos int64
|
|
|
-}
|
|
|
-
|
|
|
-func (r *wrappedReaderAt) Read(p []byte) (int, error) {
|
|
|
- n, err := r.ReaderAt.ReadAt(p, r.pos)
|
|
|
- r.pos += int64(n)
|
|
|
- return n, err
|
|
|
-}
|