package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"math"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/pools"
	"github.com/pkg/errors"
)
// rotateFileMetadata is metadata stored in the gzip header of a compressed
// log file.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}
// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu       sync.Mutex // protects the logfile access
	closed   chan struct{}
	rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
	// Lock out readers while performing a non-atomic sequence of filesystem
	// operations (RLock: open, Lock: rename, delete).
	//
	// fsopMu should be locked for writing only while holding rotateMu.
	fsopMu sync.RWMutex

	// Logger configuration

	capacity int64 // maximum size of each file
	maxFiles int   // maximum number of files
	compress bool  // whether old versions of log files are compressed
	perms    os.FileMode

	// Log file codec

	createDecoder MakeDecoderFn
	getTailReader GetTailReaderFunc

	// Log reader state in a 1-buffered channel.
	//
	// Share memory by communicating: receive to acquire, send to release.
	// The state struct is passed around by value so that use-after-send
	// bugs cannot escalate to data races.
	//
	// A method which receives the state value takes ownership of it. The
	// owner is responsible for either passing ownership along or sending
	// the state back to the channel. By convention, the semantics of
	// passing along ownership are expressed with function argument types.
	// Methods which take a pointer *logReadState argument borrow the state,
	// analogous to functions which require a lock to be held when calling.
	// The caller retains ownership. Calling a method which takes a
	// value logReadState argument gives ownership to the callee. An
	// illustrative sketch of this protocol follows the logReadState
	// definition below.
	read chan logReadState

	decompress *sharedTempFileConverter

	pos           logPos    // Current log file write position.
	f             *os.File  // Current log file for writing.
	lastTimestamp time.Time // timestamp of the last log
}
type logPos struct {
	// Size of the current file.
	size int64
	// File rotation sequence number (modulo 2**16).
	rotation uint16
}

type logReadState struct {
	// Current log file position.
	pos logPos
	// Wait list to be notified of the value of pos next time it changes.
	wait []chan<- logPos
}
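
// Illustrative sketch of the acquire/release protocol described on the
// LogFile.read field above. The surrounding function is hypothetical;
// only the channel discipline is taken from this file:
//
//	// Acquire: receive the state value; the receiver now owns it.
//	st := <-w.read
//	// Register interest in the next position change. A borrowing
//	// method would take &st instead of the value.
//	ch := make(chan logPos, 1)
//	st.wait = append(st.wait, ch)
//	// Release: send the value back. Using st after this send would be
//	// a use-after-send bug, though never a data race.
//	w.read <- st
//	// Block until the writer publishes a new position to our channel.
//	pos := <-ch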
// MakeDecoderFn creates a decoder.
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs.
// It is created by the log reader by calling the `MakeDecoderFn`.
type Decoder interface {
	// Reset resets the decoder.
	// Reset is called for certain events, such as log rotations.
	Reset(io.Reader)
	// Decode decodes the next log message from the stream.
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}
// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.Reader
	io.ReaderAt
	Size() int64
}

type readAtCloser interface {
	io.ReaderAt
	io.Closer
}

// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
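
// A minimal GetTailReaderFunc sketch for a newline-delimited format,
// assuming exactly one log message per line. Concrete log drivers supply
// their own, format-aware implementations; tailLines is hypothetical:
//
//	func tailLines(ctx context.Context, f SizeReaderAt, n int) (io.Reader, int, error) {
//		size := f.Size()
//		var found int
//		off := size
//		b := make([]byte, 1)
//		// Scan backwards one byte at a time (simple, not efficient)
//		// until n line boundaries have been seen.
//		for off > 0 && found < n {
//			if err := ctx.Err(); err != nil {
//				return nil, 0, err
//			}
//			if _, err := f.ReadAt(b, off-1); err != nil {
//				return nil, 0, err
//			}
//			// A '\n' that isn't the file's final terminator marks
//			// the start of the line which follows it.
//			if b[0] == '\n' && off != size {
//				found++
//				if found == n {
//					break
//				}
//			}
//			off--
//		}
//		if off == 0 && size > 0 {
//			found++ // include the first line of the file
//		}
//		return io.NewSectionReader(f, off, size-off), found, nil
//	}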
// NewLogFile creates a new LogFile.
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	pos := logPos{
		size: size,
		// Force a wraparound on first rotation to shake out any
		// modular-arithmetic bugs.
		rotation: math.MaxUint16,
	}
	st := make(chan logReadState, 1)
	st <- logReadState{pos: pos}

	return &LogFile{
		f:             log,
		read:          st,
		pos:           pos,
		closed:        make(chan struct{}),
		capacity:      capacity,
		maxFiles:      maxFiles,
		compress:      compress,
		decompress:    newSharedTempFileConverter(decompress),
		createDecoder: decodeFunc,
		perms:         perms,
		getTailReader: getTailReader,
	}, nil
}
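
// Hedged usage sketch. The path, limits, and the decoder/tail-reader
// constructors (makeMyDecoder, myGetTailReader) are hypothetical; real
// callers such as the json-file log driver pass their own:
//
//	lf, err := NewLogFile(
//		"/var/lib/docker/containers/abc123/container.log",
//		10*1024*1024,   // rotate after 10 MiB
//		5,              // keep at most 5 files
//		true,           // gzip rotated files
//		makeMyDecoder,  // MakeDecoderFn for the on-disk format
//		0o640,          // permissions for newly created log files
//		myGetTailReader,
//	)
//	if err != nil {
//		return err
//	}
//	defer lf.Close()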
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error {
	select {
	case <-w.closed:
		return errors.New("cannot write because the output file was closed")
	default:
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// Are we due for a rotation?
	if w.capacity != -1 && w.pos.size >= w.capacity {
		if err := w.rotate(); err != nil {
			return errors.Wrap(err, "error rotating log file")
		}
	}

	n, err := w.f.Write(marshalled)
	if err != nil {
		return errors.Wrap(err, "error writing log entry")
	}
	w.pos.size += int64(n)
	w.lastTimestamp = timestamp

	// Notify any waiting readers that there is a new log entry to read.
	st := <-w.read
	defer func() { w.read <- st }()
	st.pos = w.pos

	for _, c := range st.wait {
		c <- st.pos
	}
	// Optimization: retain the backing array to save a heap allocation next
	// time a reader appends to the list.
	if st.wait != nil {
		st.wait = st.wait[:0]
	}
	return nil
}
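
// Hedged sketch of the expected caller pattern: a log driver's Log method
// marshals the message into its on-disk format and hands the bytes to
// WriteLogEntry (the driver type and marshalMyFormat are hypothetical):
//
//	func (d *driver) Log(msg *logger.Message) error {
//		b, err := marshalMyFormat(msg)
//		if err != nil {
//			return err
//		}
//		return d.logfile.WriteLogEntry(msg.Timestamp, b)
//	}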
func (w *LogFile) rotate() (retErr error) {
	w.rotateMu.Lock()
	noCompress := w.maxFiles <= 1 || !w.compress
	defer func() {
		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
		// Otherwise the lock will be released in the goroutine that handles compression.
		if retErr != nil || noCompress {
			w.rotateMu.Unlock()
		}
	}()

	fname := w.f.Name()
	if err := w.f.Close(); err != nil {
		// if there was an error during a prior rotate, the file could already be closed
		if !errors.Is(err, fs.ErrClosed) {
			return errors.Wrap(err, "error closing file")
		}
	}

	file, err := func() (*os.File, error) {
		w.fsopMu.Lock()
		defer w.fsopMu.Unlock()

		if err := rotate(fname, w.maxFiles, w.compress); err != nil {
			log.G(context.TODO()).WithError(err).Warn("Error rotating log file, log data may have been lost")
		} else {
			// We may have readers working their way through the current
			// log file, so we can't truncate it. We need to start writing
			// new logs to an empty file with the same name as the current
			// one, so the current file has to be rotated out of the way.
			if w.maxFiles < 2 {
				if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error unlinking current log file")
				}
			} else {
				if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error renaming current log file")
				}
			}
		}

		// Notwithstanding the above, open with the truncate flag anyway
		// in case rotation didn't work out as planned.
		return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
	}()
	if err != nil {
		return err
	}
	w.f = file
	w.pos = logPos{rotation: w.pos.rotation + 1}

	if noCompress {
		return nil
	}

	ts := w.lastTimestamp
	go func() {
		defer w.rotateMu.Unlock()
		// No need to hold fsopMu as at no point will the filesystem be
		// in a state which would cause problems for readers. Opening
		// the uncompressed file is tried first, falling back to the
		// compressed one. compressFile only deletes the uncompressed
		// file once the compressed one is fully written out, so at no
		// point during the compression process will a reader fail to
		// open a complete copy of the file.
		if err := compressFile(fname+".1", ts); err != nil {
			log.G(context.TODO()).WithError(err).Error("Error compressing log file after rotation")
		}
	}()
	return nil
}
func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := unlink(lastFile)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		err := os.Rename(fromPath, toPath)
		log.G(context.TODO()).WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}

	return nil
}
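
// For example, with name="container.log", maxFiles=4 and compress=true,
// rotate performs the following cascade (renames whose source file is
// missing are skipped):
//
//	unlink container.log.3.gz
//	rename container.log.2.gz -> container.log.3.gz
//	rename container.log.1.gz -> container.log.2.gz
//
// The caller then renames the live container.log to container.log.1 and
// compresses it to container.log.1.gz in the background.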
func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
	file, err := open(fileName)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			log.G(context.TODO()).WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
			return nil
		}
		return errors.Wrap(err, "failed to open log file")
	}
	defer func() {
		file.Close()
		if retErr == nil {
			err := unlink(fileName)
			if err != nil && !errors.Is(err, fs.ErrNotExist) {
				retErr = errors.Wrap(err, "failed to remove source log file")
			}
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o640)
	if err != nil {
		return errors.Wrap(err, "failed to open or create gzip log file")
	}
	defer func() {
		outFile.Close()
		if retErr != nil {
			if err := unlink(fileName + ".gz"); err != nil && !errors.Is(err, fs.ErrNotExist) {
				log.G(context.TODO()).WithError(err).Error("Error cleaning up after failed log compression")
			}
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Only log the error here and don't return: the header metadata
		// is just an optimization for the read path.
		log.G(context.TODO()).Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		return errors.Wrapf(err, "error compressing log file %s", fileName)
	}

	return nil
}
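
// Hedged sketch of reading that header metadata back from a rotated file;
// this mirrors what maybeDecompressFile does below, and the file name is
// hypothetical:
//
//	f, err := os.Open("container.log.1.gz")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	gz, err := gzip.NewReader(f)
//	if err != nil {
//		return err
//	}
//	defer gz.Close()
//	var meta rotateFileMetadata
//	if err := json.Unmarshal(gz.Header.Extra, &meta); err == nil {
//		fmt.Println("last entry written at:", meta.LastTime)
//	}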
// MaxFiles returns the maximum number of files.
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}
// Close closes underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	select {
	case <-w.closed:
		return nil
	default:
	}
	if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
		return err
	}
	close(w.closed)
	// Wait until any in-progress rotation is complete.
	w.rotateMu.Lock()
	w.rotateMu.Unlock() //nolint:staticcheck
	return nil
}
// ReadLogs decodes entries from log files.
//
// It is the caller's responsibility to call ConsumerGone on the LogWatcher.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()
	// Lock out filesystem operations so that we can capture the read
	// position and atomically open the corresponding log file, without the
	// file getting rotated out from under us.
	w.fsopMu.RLock()
	// Capture the read position synchronously to ensure that we start
	// following from the last entry logged before ReadLogs was called,
	// which is required for flake-free unit testing.
	st := <-w.read
	pos := st.pos
	w.read <- st
	go w.readLogsLocked(pos, config, watcher)
	return watcher
}
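
// Hedged usage sketch: tail the last 100 entries, then follow until the
// consumer is done or an error occurs:
//
//	watcher := lf.ReadLogs(logger.ReadConfig{Tail: 100, Follow: true})
//	defer watcher.ConsumerGone()
//	for {
//		select {
//		case msg, ok := <-watcher.Msg:
//			if !ok {
//				return nil // channel closed: no more messages
//			}
//			fmt.Println(string(msg.Line))
//		case err := <-watcher.Err:
//			return err
//		}
//	}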
// readLogsLocked is the bulk of the implementation of ReadLogs.
//
// w.fsopMu must be locked for reading when calling this method.
// w.fsopMu.RUnlock() is called before returning.
func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
	defer close(watcher.Msg)

	currentFile, err := open(w.f.Name())
	if err != nil {
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
	fwd := newForwarder(config)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			switch ff := f.(type) {
			case SizeReaderAt:
				readers = append(readers, ff)
			case interface{ Stat() (fs.FileInfo, error) }:
				stat, err := ff.Stat()
				if err != nil {
					watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
					closeFiles()
					return
				}
				readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
			default:
				panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f))
			}
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd)
		closeFiles()
		if !ok {
			return
		}
	} else {
		w.fsopMu.RUnlock()
	}

	if !config.Follow {
		return
	}

	(&follow{
		LogFile:   w,
		Watcher:   watcher,
		Decoder:   dec,
		Forwarder: fwd,
	}).Do(currentFile, currentPos)
}
// openRotatedFiles returns a slice of files open for reading, in order from
// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
//
// This method must only be called with w.fsopMu locked for reading.
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
	type rotatedFile struct {
		f          *os.File
		compressed bool
	}

	var q []rotatedFile
	defer func() {
		if err != nil {
			for _, qq := range q {
				qq.f.Close()
			}
			for _, f := range files {
				f.Close()
			}
		}
	}()

	q, err = func() (q []rotatedFile, err error) {
		defer w.fsopMu.RUnlock()

		q = make([]rotatedFile, 0, w.maxFiles)
		for i := w.maxFiles; i > 1; i-- {
			var f rotatedFile
			f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
			if err != nil {
				if !errors.Is(err, fs.ErrNotExist) {
					return nil, errors.Wrap(err, "error opening rotated log file")
				}
				f.compressed = true
				f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1))
				if err != nil {
					if !errors.Is(err, fs.ErrNotExist) {
						return nil, errors.Wrap(err, "error opening file for decompression")
					}
					continue
				}
			}
			q = append(q, f)
		}
		return q, nil
	}()
	if err != nil {
		return nil, err
	}

	for len(q) > 0 {
		qq := q[0]
		q = q[1:]
		if qq.compressed {
			defer qq.f.Close()
			f, err := w.maybeDecompressFile(qq.f, config)
			if err != nil {
				return nil, err
			}
			if f != nil {
				// maybeDecompressFile returns a nil file when all of
				// the file's entries predate config.Since and can be
				// skipped.
				files = append(files, f)
			}
		} else {
			files = append(files, qq.f)
		}
	}
	return files, nil
}
func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) {
	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) {
		return nil, nil
	}
	tmpf, err := w.decompress.Do(cf)
	return tmpf, errors.Wrap(err, "error decompressing log file")
}
func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
	if _, err := src.Seek(0, io.SeekStart); err != nil {
		return err
	}
	rc, err := gzip.NewReader(src)
	if err != nil {
		return err
	}
	_, err = pools.Copy(dst, rc)
	if err != nil {
		return err
	}
	return rc.Close()
}
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if nLines > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return false
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, r)
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)
	return fwd.Do(watcher, dec)
}
type forwarder struct {
	since, until time.Time
}

func newForwarder(config logger.ReadConfig) *forwarder {
	return &forwarder{since: config.Since, until: config.Until}
}

// Do reads log messages from dec and sends the messages matching the filter
// conditions to watcher. Do returns cont=true iff it has read all messages from
// dec without encountering a message with a timestamp which is after the
// configured until time.
func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) {
	for {
		msg, err := dec.Decode()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return true
			}
			watcher.Err <- err
			return false
		}
		if !fwd.since.IsZero() {
			if msg.Timestamp.Before(fwd.since) {
				continue
			}
			// We've found our first message with a timestamp >= since. As message
			// timestamps might not be monotonic, we need to skip the since check for all
			// subsequent messages so we do not filter out later messages which happen to
			// have timestamps before since.
			fwd.since = time.Time{}
		}
		if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) {
			return false
		}
		select {
		case <-watcher.WatchConsumerGone():
			return false
		case watcher.Msg <- msg:
		}
	}
}