logfile.go

package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"math"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/pools"
	"github.com/pkg/errors"
)

// rotateFileMetadata is metadata stored in the gzip header of a rotated,
// compressed log file.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}

// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu       sync.Mutex // protects the logfile access
	closed   chan struct{}
	rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
	// Lock out readers while performing a non-atomic sequence of filesystem
	// operations (RLock: open, Lock: rename, delete).
	//
	// fsopMu should be locked for writing only while holding rotateMu.
	fsopMu sync.RWMutex

	// Logger configuration
	capacity int64 // maximum size of each file
	maxFiles int   // maximum number of files
	compress bool  // whether old versions of log files are compressed
	perms    os.FileMode

	// Log file codec
	createDecoder MakeDecoderFn
	getTailReader GetTailReaderFunc

	// Log reader state in a 1-buffered channel.
	//
	// Share memory by communicating: receive to acquire, send to release.
	// The state struct is passed around by value so that use-after-send
	// bugs cannot escalate to data races.
	//
	// A method which receives the state value takes ownership of it. The
	// owner is responsible for either passing ownership along or sending
	// the state back to the channel. By convention, the semantics of
	// passing along ownership is expressed with function argument types.
	// Methods which take a pointer *logReadState argument borrow the state,
	// analogous to functions which require a lock to be held when calling.
	// The caller retains ownership. Calling a method which takes a
	// value logReadState argument gives ownership to the callee.
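	//
	// For example, the acquire/release pattern used by WriteLogEntry below:
	//
	//	st := <-w.read                  // receive to acquire the state
	//	defer func() { w.read <- st }() // send to release it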
	read chan logReadState

	decompress *sharedTempFileConverter

	pos           logPos    // Current log file write position.
	f             *os.File  // Current log file for writing.
	lastTimestamp time.Time // timestamp of the last log
}

type logPos struct {
	// Size of the current file.
	size int64
	// File rotation sequence number (modulo 2**16).
	rotation uint16
}

type logReadState struct {
	// Current log file position.
	pos logPos
	// Wait list to be notified of the value of pos next time it changes.
	wait []chan<- logPos
}

// MakeDecoderFn creates a decoder.
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs.
// It is created by the log reader by calling the `MakeDecoderFn`.
type Decoder interface {
	// Reset resets the decoder
	// Reset is called for certain events, such as log rotations
	Reset(io.Reader)
	// Decode decodes the next log message from the stream
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}

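// For illustration, a hypothetical Decoder over newline-delimited plain text
// could be sketched as follows (no real log driver uses this; lineDecoder and
// its use of bufio are assumptions made up for the example):
//
//	type lineDecoder struct{ s *bufio.Scanner }
//
//	func (d *lineDecoder) Reset(rdr io.Reader) { d.s = bufio.NewScanner(rdr) }
//	func (d *lineDecoder) Close()              {}
//
//	func (d *lineDecoder) Decode() (*logger.Message, error) {
//		if !d.s.Scan() {
//			if err := d.s.Err(); err != nil {
//				return nil, err
//			}
//			return nil, io.EOF
//		}
//		return &logger.Message{Line: []byte(d.s.Text()), Timestamp: time.Now()}, nil
//	}
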
// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.Reader
	io.ReaderAt
	Size() int64
}

type readAtCloser interface {
	io.ReaderAt
	io.Closer
}

// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)

// NewLogFile creates a new LogFile.
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	pos := logPos{
		size: size,
		// Force a wraparound on first rotation to shake out any
		// modular-arithmetic bugs.
		rotation: math.MaxUint16,
	}
	st := make(chan logReadState, 1)
	st <- logReadState{pos: pos}

	return &LogFile{
		f:             log,
		read:          st,
		pos:           pos,
		closed:        make(chan struct{}),
		capacity:      capacity,
		maxFiles:      maxFiles,
		compress:      compress,
		decompress:    newSharedTempFileConverter(decompress),
		createDecoder: decodeFunc,
		perms:         perms,
		getTailReader: getTailReader,
	}, nil
}

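// A minimal usage sketch (illustrative only; makeDecoder, getTailReader, and
// payload are hypothetical stand-ins for what a real log driver supplies):
//
//	lf, err := NewLogFile("/var/log/container.log", 10<<20, 5, true, makeDecoder, 0o640, getTailReader)
//	if err != nil {
//		return err
//	}
//	defer lf.Close()
//	return lf.WriteLogEntry(time.Now(), payload)
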
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error {
	select {
	case <-w.closed:
		return errors.New("cannot write because the output file was closed")
	default:
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// Are we due for a rotation?
	if w.capacity != -1 && w.pos.size >= w.capacity {
		if err := w.rotate(); err != nil {
			return errors.Wrap(err, "error rotating log file")
		}
	}

	n, err := w.f.Write(marshalled)
	if err != nil {
		return errors.Wrap(err, "error writing log entry")
	}
	w.pos.size += int64(n)
	w.lastTimestamp = timestamp

	// Notify any waiting readers that there is a new log entry to read.
	st := <-w.read
	defer func() { w.read <- st }()
	st.pos = w.pos

	for _, c := range st.wait {
		c <- st.pos
	}
	// Optimization: retain the backing array to save a heap allocation next
	// time a reader appends to the list.
	if st.wait != nil {
		st.wait = st.wait[:0]
	}
	return nil
}

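// rotate moves the current log file out of the way (renaming it to ".1", or
// unlinking it when at most one file is kept), opens a fresh file under the
// same name, and, when compression is enabled, compresses the rotated file in
// a background goroutine.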
func (w *LogFile) rotate() (retErr error) {
	w.rotateMu.Lock()
	noCompress := w.maxFiles <= 1 || !w.compress
	defer func() {
		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
		// Otherwise the lock will be released in the goroutine that handles compression.
		if retErr != nil || noCompress {
			w.rotateMu.Unlock()
		}
	}()

	fname := w.f.Name()
	if err := w.f.Close(); err != nil {
		// if there was an error during a prior rotate, the file could already be closed
		if !errors.Is(err, fs.ErrClosed) {
			return errors.Wrap(err, "error closing file")
		}
	}

	file, err := func() (*os.File, error) {
		w.fsopMu.Lock()
		defer w.fsopMu.Unlock()

		if err := rotate(fname, w.maxFiles, w.compress); err != nil {
			log.G(context.TODO()).WithError(err).Warn("Error rotating log file, log data may have been lost")
		} else {
			// We may have readers working their way through the
			// current log file so we can't truncate it. We need to
			// start writing new logs to an empty file with the same
			// name as the current one so we need to rotate the
			// current file out of the way.
			if w.maxFiles < 2 {
				if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error unlinking current log file")
				}
			} else {
				if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
					log.G(context.TODO()).WithError(err).Error("Error renaming current log file")
				}
			}
		}

		// Notwithstanding the above, open with the truncate flag anyway
		// in case rotation didn't work out as planned.
		return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
	}()
	if err != nil {
		return err
	}
	w.f = file
	w.pos = logPos{rotation: w.pos.rotation + 1}

	if noCompress {
		return nil
	}

	ts := w.lastTimestamp
	go func() {
		defer w.rotateMu.Unlock()
		// No need to hold fsopMu as at no point will the filesystem be
		// in a state which would cause problems for readers. Opening
		// the uncompressed file is tried first, falling back to the
		// compressed one. compressFile only deletes the uncompressed
		// file once the compressed one is fully written out, so at no
		// point during the compression process will a reader fail to
		// open a complete copy of the file.
		if err := compressFile(fname+".1", ts); err != nil {
			log.G(context.TODO()).WithError(err).Error("Error compressing log file after rotation")
		}
	}()
	return nil
}

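// rotate shifts the numbered rotated files up by one (name.1 -> name.2, and so
// on, with a ".gz" suffix when compress is true), discarding the oldest file,
// so that the ".1" slot is free for the file about to be rotated out.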
func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := unlink(lastFile)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		err := os.Rename(fromPath, toPath)
		log.G(context.TODO()).WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}

	return nil
}

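// compressFile gzips fileName into fileName+".gz", storing lastTimestamp in
// the gzip header so that readers can skip the file without decompressing it.
// The uncompressed source file is removed only after the compressed copy has
// been fully written out.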
func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
	file, err := open(fileName)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			log.G(context.TODO()).WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
			return nil
		}
		return errors.Wrap(err, "failed to open log file")
	}
	defer func() {
		file.Close()
		if retErr == nil {
			err := unlink(fileName)
			if err != nil && !errors.Is(err, fs.ErrNotExist) {
				retErr = errors.Wrap(err, "failed to remove source log file")
			}
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o640)
	if err != nil {
		return errors.Wrap(err, "failed to open or create gzip log file")
	}
	defer func() {
		outFile.Close()
		if retErr != nil {
			if err := unlink(fileName + ".gz"); err != nil && !errors.Is(err, fs.ErrNotExist) {
				log.G(context.TODO()).WithError(err).Error("Error cleaning up after failed log compression")
			}
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Here log the error only and don't return since this is just an optimization.
		log.G(context.TODO()).Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		return errors.Wrapf(err, "error compressing log file %s", fileName)
	}

	return nil
}

// MaxFiles returns the maximum number of files.
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}

// Close closes underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	select {
	case <-w.closed:
		return nil
	default:
	}
	if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
		return err
	}
	close(w.closed)
	// Wait until any in-progress rotation is complete.
	w.rotateMu.Lock()
	w.rotateMu.Unlock() //nolint:staticcheck
	return nil
}

// ReadLogs decodes entries from log files.
//
// It is the caller's responsibility to call ConsumerGone on the LogWatcher.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()

	// Lock out filesystem operations so that we can capture the read
	// position and atomically open the corresponding log file, without the
	// file getting rotated out from under us.
	w.fsopMu.RLock()
	// Capture the read position synchronously to ensure that we start
	// following from the last entry logged before ReadLogs was called,
	// which is required for flake-free unit testing.
	st := <-w.read
	pos := st.pos
	w.read <- st
	go w.readLogsLocked(pos, config, watcher)
	return watcher
}

// readLogsLocked is the bulk of the implementation of ReadLogs.
//
// w.fsopMu must be locked for reading when calling this method.
// w.fsopMu.RUnlock() is called before returning.
func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
	defer close(watcher.Msg)

	currentFile, err := open(w.f.Name())
	if err != nil {
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
	fwd := newForwarder(config)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			switch ff := f.(type) {
			case SizeReaderAt:
				readers = append(readers, ff)
			case interface{ Stat() (fs.FileInfo, error) }:
				stat, err := ff.Stat()
				if err != nil {
					watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
					closeFiles()
					return
				}
				readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
			default:
				panic(fmt.Errorf("rotated file value %#v (%[1]T) has neither Size() nor Stat() methods", f))
			}
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd)
		closeFiles()
		if !ok {
			return
		}
	} else {
		w.fsopMu.RUnlock()
	}

	if !config.Follow {
		return
	}

	(&follow{
		LogFile:   w,
		Watcher:   watcher,
		Decoder:   dec,
		Forwarder: fwd,
	}).Do(currentFile, currentPos)
}

// openRotatedFiles returns a slice of files open for reading, in order from
// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
//
// This method must only be called with w.fsopMu locked for reading.
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
	type rotatedFile struct {
		f          *os.File
		compressed bool
	}

	var q []rotatedFile
	defer func() {
		if err != nil {
			for _, qq := range q {
				qq.f.Close()
			}
			for _, f := range files {
				f.Close()
			}
		}
	}()

	q, err = func() (q []rotatedFile, err error) {
		defer w.fsopMu.RUnlock()

		q = make([]rotatedFile, 0, w.maxFiles)
		for i := w.maxFiles; i > 1; i-- {
			var f rotatedFile
			f.f, err = open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
			if err != nil {
				if !errors.Is(err, fs.ErrNotExist) {
					return nil, errors.Wrap(err, "error opening rotated log file")
				}
				f.compressed = true
				f.f, err = open(fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1))
				if err != nil {
					if !errors.Is(err, fs.ErrNotExist) {
						return nil, errors.Wrap(err, "error opening file for decompression")
					}
					continue
				}
			}
			q = append(q, f)
		}
		return q, nil
	}()
	if err != nil {
		return nil, err
	}

	for len(q) > 0 {
		qq := q[0]
		q = q[1:]
		if qq.compressed {
			defer qq.f.Close()
			f, err := w.maybeDecompressFile(qq.f, config)
			if err != nil {
				return nil, err
			}
			if f != nil {
				// A nil f means all logs in the file predate
				// config.Since and do not need to be read.
				files = append(files, f)
			}
		} else {
			files = append(files, qq.f)
		}
	}

	return files, nil
}

func (w *LogFile) maybeDecompressFile(cf *os.File, config logger.ReadConfig) (readAtCloser, error) {
	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && !extra.LastTime.IsZero() && extra.LastTime.Before(config.Since) {
		return nil, nil
	}
	tmpf, err := w.decompress.Do(cf)
	return tmpf, errors.Wrap(err, "error decompressing log file")
}

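// decompress writes the gunzipped contents of src to dst. It is the
// conversion function handed to newSharedTempFileConverter in NewLogFile.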
func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
	if _, err := src.Seek(0, io.SeekStart); err != nil {
		return err
	}
	rc, err := gzip.NewReader(src)
	if err != nil {
		return err
	}
	_, err = pools.Copy(dst, rc)
	if err != nil {
		return err
	}
	return rc.Close()
}

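// tailFiles tails nLines lines from the given files (consuming them newest
// first until enough lines are found) and forwards them in oldest-to-newest
// order to the watcher via fwd. When nLines <= 0, all files are forwarded in
// full. The returned cont is false if forwarding stopped early.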
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if nLines > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return false
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, r)
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)
	return fwd.Do(watcher, dec)
}

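// forwarder sends decoded log messages to a watcher, filtering them to the
// Since/Until window from the read configuration.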
type forwarder struct {
	since, until time.Time
}

func newForwarder(config logger.ReadConfig) *forwarder {
	return &forwarder{since: config.Since, until: config.Until}
}

// Do reads log messages from dec and sends the messages matching the filter
// conditions to watcher. Do returns cont=true iff it has read all messages from
// dec without encountering a message with a timestamp which is after the
// configured until time.
func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) {
	for {
		msg, err := dec.Decode()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return true
			}
			watcher.Err <- err
			return false
		}
		if !fwd.since.IsZero() {
			if msg.Timestamp.Before(fwd.since) {
				continue
			}
			// We've found our first message with a timestamp >= since. As message
			// timestamps might not be monotonic, we need to skip the since check for all
			// subsequent messages so we do not filter out later messages which happen to
			// have timestamps before since.
			fwd.since = time.Time{}
		}
		if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) {
			return false
		}
		select {
		case <-watcher.WatchConsumerGone():
			return false
		case watcher.Msg <- msg:
		}
	}
}