package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"math"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/pools"
	"github.com/pkg/errors"
)

// rotateFileMetadata is the metadata stored in the gzip header of a
// compressed log file.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}

// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu       sync.Mutex // protects the logfile access
	closed   chan struct{}
	rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
	// Lock out readers while performing a non-atomic sequence of filesystem
	// operations (RLock: open, Lock: rename, delete).
	//
	// fsopMu should be locked for writing only while holding rotateMu.
	fsopMu sync.RWMutex

	// Logger configuration

	capacity int64 // maximum size of each file
	maxFiles int   // maximum number of files
	compress bool  // whether old versions of log files are compressed
	perms    os.FileMode

	// Log file codec

	createDecoder MakeDecoderFn
	getTailReader GetTailReaderFunc

	// Log reader state in a 1-buffered channel.
	//
	// Share memory by communicating: receive to acquire, send to release.
	// The state struct is passed around by value so that use-after-send
	// bugs cannot escalate to data races.
	//
	// A method which receives the state value takes ownership of it. The
	// owner is responsible for either passing ownership along or sending
	// the state back to the channel. By convention, the semantics of
	// passing along ownership is expressed with function argument types.
	// Methods which take a pointer *logReadState argument borrow the state,
	// analogous to functions which require a lock to be held when calling.
	// The caller retains ownership. Calling a method which takes a
	// value logReadState argument gives ownership to the callee.
	read chan logReadState

	decompress *sharedTempFileConverter

	pos           logPos    // Current log file write position.
	f             *os.File  // Current log file for writing.
	lastTimestamp time.Time // timestamp of the last log
}

type logPos struct {
	// Size of the current file.
	size int64
	// File rotation sequence number (modulo 2**16).
	rotation uint16
}

type logReadState struct {
	// Current log file position.
	pos logPos
	// Wait list to be notified of the value of pos next time it changes.
	wait []chan<- logPos
}

// MakeDecoderFn creates a decoder.
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs.
// It is created by the log reader by calling the `MakeDecoderFn`.
type Decoder interface {
	// Reset resets the decoder.
	// Reset is called for certain events, such as log rotations.
	Reset(io.Reader)
	// Decode decodes the next log message from the stream.
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}

// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.Reader
	io.ReaderAt
	Size() int64
}

type readAtCloser interface {
	io.ReaderAt
	io.Closer
}
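// The ownership protocol on LogFile.read described above can be hard to
// visualize from prose alone. As a minimal sketch (this exact sequence
// appears in WriteLogEntry below; `w` is a *LogFile):
//
//	st := <-w.read // receive from the 1-buffered channel to acquire the state
//	st.pos = w.pos // the owner may freely read or mutate its copy
//	w.read <- st   // send the state back to release ownership
//
// Between the receive and the send, no other goroutine can observe or
// modify the reader state, so no additional locking is needed.
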
// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)

// NewLogFile creates a new LogFile.
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	pos := logPos{
		size: size,
		// Force a wraparound on first rotation to shake out any
		// modular-arithmetic bugs.
		rotation: math.MaxUint16,
	}
	st := make(chan logReadState, 1)
	st <- logReadState{pos: pos}

	return &LogFile{
		f:             log,
		read:          st,
		pos:           pos,
		closed:        make(chan struct{}),
		capacity:      capacity,
		maxFiles:      maxFiles,
		compress:      compress,
		decompress:    newSharedTempFileConverter(decompress),
		createDecoder: decodeFunc,
		perms:         perms,
		getTailReader: getTailReader,
	}, nil
}

// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error {
	select {
	case <-w.closed:
		return errors.New("cannot write because the output file was closed")
	default:
	}
	w.mu.Lock()
	defer w.mu.Unlock()

	// Are we due for a rotation?
	if w.capacity != -1 && w.pos.size >= w.capacity {
		if err := w.rotate(); err != nil {
			return errors.Wrap(err, "error rotating log file")
		}
	}

	n, err := w.f.Write(marshalled)
	if err != nil {
		return errors.Wrap(err, "error writing log entry")
	}
	w.pos.size += int64(n)
	w.lastTimestamp = timestamp

	// Notify any waiting readers that there is a new log entry to read.
	st := <-w.read
	defer func() { w.read <- st }()
	st.pos = w.pos

	for _, c := range st.wait {
		c <- st.pos
	}
	// Optimization: retain the backing array to save a heap allocation next
	// time a reader appends to the list.
	if st.wait != nil {
		st.wait = st.wait[:0]
	}
	return nil
}
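// writeExample is an illustrative sketch, not part of this package's API:
// it shows how NewLogFile and WriteLogEntry fit together. The path, size
// and file limits are placeholder values, and the decoder and tail-reader
// constructors are assumed to be supplied by the caller.
func writeExample(makeDecoder MakeDecoderFn, getTail GetTailReaderFunc) error {
	lf, err := NewLogFile(
		"/tmp/example.log", // placeholder log path
		10<<20,             // rotate once the file reaches ~10 MiB
		5,                  // keep at most 5 files
		true,               // gzip rotated files
		makeDecoder,
		0o640,
		getTail,
	)
	if err != nil {
		return err
	}
	defer lf.Close()
	// Entries are marshalled by the caller; rotation happens
	// transparently inside WriteLogEntry when capacity is exceeded.
	return lf.WriteLogEntry(time.Now(), []byte(`{"log":"hello\n"}`+"\n"))
}
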
func (w *LogFile) rotate() (retErr error) {
	w.rotateMu.Lock()
	noCompress := w.maxFiles <= 1 || !w.compress
	defer func() {
		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
		// Otherwise the lock will be released in the goroutine that handles compression.
		if retErr != nil || noCompress {
			w.rotateMu.Unlock()
		}
	}()

	fname := w.f.Name()
	if err := w.f.Close(); err != nil {
		// if there was an error during a prior rotate, the file could already be closed
		if !errors.Is(err, fs.ErrClosed) {
			return errors.Wrap(err, "error closing file")
		}
	}

	file, err := func() (*os.File, error) {
		w.fsopMu.Lock()
		defer w.fsopMu.Unlock()

		if err := rotate(fname, w.maxFiles, w.compress); err != nil {
			log.G(context.TODO()).WithError(err).Warn("Error rotating log file, log data may have been lost")
		} else {
			// We may have readers working their way through the
			// current log file so we can't truncate it. We need to
			// start writing new logs to an empty file with the same
			// name as the current one so we need to rotate the
			// current file out of the way.
			if w.maxFiles < 2 {
				if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error unlinking current log file")
				}
			} else {
				if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error renaming current log file")
				}
			}
		}

		// Notwithstanding the above, open with the truncate flag anyway
		// in case rotation didn't work out as planned.
		return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
	}()
	if err != nil {
		return err
	}
	w.f = file
	w.pos = logPos{rotation: w.pos.rotation + 1}

	if noCompress {
		return nil
	}

	ts := w.lastTimestamp
	go func() {
		defer w.rotateMu.Unlock()
		// No need to hold fsopMu as at no point will the filesystem be
		// in a state which would cause problems for readers. Opening
		// the uncompressed file is tried first, falling back to the
		// compressed one. compressFile only deletes the uncompressed
		// file once the compressed one is fully written out, so at no
		// point during the compression process will a reader fail to
		// open a complete copy of the file.
		if err := compressFile(fname+".1", ts); err != nil {
			log.G(context.TODO()).WithError(err).Error("Error compressing log file after rotation")
		}
	}()
	return nil
}

func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := unlink(lastFile)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		err := os.Rename(fromPath, toPath)
		log.G(context.TODO()).WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}

	return nil
}
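// A worked example of the rotation scheme above, assuming a log file named
// c.log with maxFiles=3 and compress=true: rotate() removes c.log.2.gz and
// renames c.log.1.gz to c.log.2.gz; w.rotate() then renames c.log to
// c.log.1 and reopens an empty c.log for writing, while the background
// goroutine compresses c.log.1 into c.log.1.gz and finally deletes c.log.1.
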
func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
	file, err := open(fileName)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			log.G(context.TODO()).WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
			return nil
		}
		return errors.Wrap(err, "failed to open log file")
	}
	defer func() {
		file.Close()
		if retErr == nil {
			err := unlink(fileName)
			if err != nil && !errors.Is(err, fs.ErrNotExist) {
				retErr = errors.Wrap(err, "failed to remove source log file")
			}
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o640)
	if err != nil {
		return errors.Wrap(err, "failed to open or create gzip log file")
	}
	defer func() {
		outFile.Close()
		if retErr != nil {
			if err := unlink(fileName + ".gz"); err != nil && !errors.Is(err, fs.ErrNotExist) {
				log.G(context.TODO()).WithError(err).Error("Error cleaning up after failed log compression")
			}
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header.
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Here log the error only and don't return since this is just an optimization.
		log.G(context.TODO()).Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		return errors.Wrapf(err, "error compressing log file %s", fileName)
	}

	return nil
}

// MaxFiles returns the maximum number of log files.
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}

// Close closes the underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	select {
	case <-w.closed:
		return nil
	default:
	}
	if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
		return err
	}
	close(w.closed)
	// Wait until any in-progress rotation is complete.
	w.rotateMu.Lock()
	w.rotateMu.Unlock() //nolint:staticcheck
	return nil
}

// ReadLogs decodes entries from log files.
//
// It is the caller's responsibility to call ConsumerGone on the LogWatcher.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()
	// Lock out filesystem operations so that we can capture the read
	// position and atomically open the corresponding log file, without the
	// file getting rotated out from under us.
	w.fsopMu.RLock()
	// Capture the read position synchronously to ensure that we start
	// following from the last entry logged before ReadLogs was called,
	// which is required for flake-free unit testing.
	st := <-w.read
	pos := st.pos
	w.read <- st
	go w.readLogsLocked(pos, config, watcher)
	return watcher
}

// readLogsLocked is the bulk of the implementation of ReadLogs.
//
// w.fsopMu must be locked for reading when calling this method.
// w.fsopMu.RUnlock() is called before returning.
func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
	defer close(watcher.Msg)

	currentFile, err := open(w.f.Name())
	if err != nil {
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
	fwd := newForwarder(config)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			switch ff := f.(type) {
			case SizeReaderAt:
				readers = append(readers, ff)
			case interface{ Stat() (fs.FileInfo, error) }:
				stat, err := ff.Stat()
				if err != nil {
					watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
					closeFiles()
					return
				}
				readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
			default:
				panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f))
			}
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd)
		closeFiles()
		if !ok {
			return
		}
	} else {
		w.fsopMu.RUnlock()
	}

	if !config.Follow {
		return
	}

	(&follow{
		LogFile:   w,
		Watcher:   watcher,
		Decoder:   dec,
		Forwarder: fwd,
	}).Do(currentFile, currentPos)
}
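// readExample is an illustrative sketch, not part of this package's API:
// it tails the last 100 entries of an existing LogFile and then follows
// new writes until the producer closes the message channel.
func readExample(lf *LogFile) {
	watcher := lf.ReadLogs(logger.ReadConfig{Tail: 100, Follow: true})
	// Per the ReadLogs contract, the consumer must call ConsumerGone
	// when it is done; real callers should also drain watcher.Err.
	defer watcher.ConsumerGone()
	for msg := range watcher.Msg {
		fmt.Println(msg.Timestamp, string(msg.Line))
	}
}
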
// openRotatedFiles returns a slice of files open for reading, in order from
// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
//
// This method must only be called with w.fsopMu locked for reading.
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
	type rotatedFile struct {
		f          *os.File
		compressed bool
	}

	var q []rotatedFile
	defer func() {
		if err != nil {
			for _, qq := range q {
				qq.f.Close()
			}
			for _, f := range files {
				f.Close()
			}
		}
	}()

	q, err = func() (q []rotatedFile, err error) {
		defer w.fsopMu.RUnlock()

		q = make([]rotatedFile, 0, w.maxFiles)
		for i := w.maxFiles; i > 1; i-- {
			var f rotatedFile
			f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
			if err != nil {
				if !errors.Is(err, fs.ErrNotExist) {
					return nil, errors.Wrap(err, "error opening rotated log file")
				}
				f.compressed = true
				f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1))
				if err != nil {
					if !errors.Is(err, fs.ErrNotExist) {
						return nil, errors.Wrap(err, "error opening file for decompression")
					}
					continue
				}
			}
			q = append(q, f)
		}
		return q, nil
	}()
	if err != nil {
		return nil, err
	}

	for len(q) > 0 {
		qq := q[0]
		q = q[1:]
		if qq.compressed {
			defer qq.f.Close()
			f, err := w.maybeDecompressFile(qq.f, config)
			if err != nil {
				return nil, err
			}
			if f != nil {
				// f is nil when the file contains only entries from
				// before config.Since, which do not need to be read.
				files = append(files, f)
			}
		} else {
			files = append(files, qq.f)
		}
	}
	return files, nil
}

func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) {
	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header.
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) {
		return nil, nil
	}
	tmpf, err := w.decompress.Do(cf)
	return tmpf, errors.Wrap(err, "error decompressing log file")
}

func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
	if _, err := src.Seek(0, io.SeekStart); err != nil {
		return err
	}
	rc, err := gzip.NewReader(src)
	if err != nil {
		return err
	}
	_, err = pools.Copy(dst, rc)
	if err != nil {
		return err
	}
	return rc.Close()
}

func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if nLines > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return false
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, r)
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)
	return fwd.Do(watcher, dec)
}

type forwarder struct {
	since, until time.Time
}

func newForwarder(config logger.ReadConfig) *forwarder {
	return &forwarder{since: config.Since, until: config.Until}
}
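// For reference, the gzip Extra header written by compressFile and parsed
// back by maybeDecompressFile is a small JSON document along these lines
// (timestamp value invented for illustration):
//
//	{"lastTime":"2024-01-02T15:04:05.999999999Z"}
//
// which lets the reader skip decompressing files whose newest entry
// predates ReadConfig.Since.
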
// Do reads log messages from dec and sends the messages matching the filter
// conditions to watcher. Do returns cont=true iff it has read all messages from
// dec without encountering a message with a timestamp which is after the
// configured until time.
func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) {
	for {
		msg, err := dec.Decode()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return true
			}
			watcher.Err <- err
			return false
		}
		if !fwd.since.IsZero() {
			if msg.Timestamp.Before(fwd.since) {
				continue
			}
			// We've found our first message with a timestamp >= since. As message
			// timestamps might not be monotonic, we need to skip the since check for all
			// subsequent messages so we do not filter out later messages which happen to
			// have timestamps before since.
			fwd.since = time.Time{}
		}
		if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) {
			return false
		}
		select {
		case <-watcher.WatchConsumerGone():
			return false
		case watcher.Msg <- msg:
		}
	}
}
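
// The sketch below is for illustration only and is not part of this
// package: a minimal Decoder for newline-delimited JSON logs, showing the
// shape of implementation that a MakeDecoderFn is expected to produce. The
// type name and the "log"/"time" field schema are assumptions made for the
// example.
type jsonLineDecoder struct {
	dec *json.Decoder
}

var _ Decoder = (*jsonLineDecoder)(nil)

func (d *jsonLineDecoder) Reset(r io.Reader) {
	// Called on events such as log rotation; start over on the new stream.
	d.dec = json.NewDecoder(r)
}

func (d *jsonLineDecoder) Decode() (*logger.Message, error) {
	var m struct {
		Log  string    `json:"log"`
		Time time.Time `json:"time"`
	}
	if err := d.dec.Decode(&m); err != nil {
		// io.EOF is how forwarder.Do above detects the end of the stream.
		return nil, err
	}
	return &logger.Message{Line: []byte(m.Log), Timestamp: m.Time}, nil
}

func (d *jsonLineDecoder) Close() {
	// This sketch holds no resources beyond the decoder itself.
	d.dec = nil
}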