logfile.go

package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/filenotify"
	"github.com/docker/docker/pkg/pools"
	"github.com/docker/docker/pkg/pubsub"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const tmpLogfileSuffix = ".tmp"
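
// Note: open, openFile, and unlink are not defined in this file; they are
// platform-specific helpers provided elsewhere in this package (on Unix they
// presumably wrap os.Open, os.OpenFile, and os.Remove -- an assumption based
// on how they are used below, not confirmed by this file).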

// rotateFileMetadata is the metadata stored in the gzip header of a
// compressed log file.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}

// refCounter is a reference counter for log files.
type refCounter struct {
	mu      sync.Mutex
	counter map[string]int
}

// GetReference increases the reference counter for the specified logfile,
// opening the file via openRefFile if it is not yet tracked.
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	var (
		file *os.File
		err  error
	)
	_, ok := rc.counter[fileName]
	file, err = openRefFile(fileName, ok)
	if err != nil {
		return nil, err
	}

	if ok {
		rc.counter[fileName]++
	} else if file != nil {
		rc.counter[file.Name()] = 1
	}
	return file, nil
}

// Dereference reduces the reference counter for the specified logfile and
// removes the file once no references remain.
func (rc *refCounter) Dereference(fileName string) error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	rc.counter[fileName]--
	if rc.counter[fileName] <= 0 {
		delete(rc.counter, fileName)
		err := unlink(fileName)
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}
	return nil
}
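
// GetReference and Dereference are paired by the reader: openRotatedFiles
// below takes a reference for every decompressed ".tmp" file it hands out,
// and readLogsLocked dereferences (and thereby eventually unlinks) each file
// once it has finished tailing it.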

// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu       sync.RWMutex // protects the logfile access
	f        *os.File     // store for closing
	closed   bool
	closedCh chan struct{}
	rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed

	capacity        int64      // maximum size of each file
	currentSize     int64      // current size of the latest file
	maxFiles        int        // maximum number of files
	compress        bool       // whether old versions of log files are compressed
	lastTimestamp   time.Time  // timestamp of the last log
	filesRefCounter refCounter // keeps reference-counted handles to decompressed files
	notifyReaders   *pubsub.Publisher
	marshal         logger.MarshalFunc
	createDecoder   MakeDecoderFn
	getTailReader   GetTailReaderFunc
	perms           os.FileMode
}

// MakeDecoderFn creates a decoder.
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs.
// It is created by the log reader by calling the `MakeDecoderFn`.
type Decoder interface {
	// Reset resets the decoder.
	// Reset is called for certain events, such as log rotations.
	Reset(io.Reader)
	// Decode decodes the next log message from the stream.
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}

// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.ReaderAt
	Size() int64
}

// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed-in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
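
// Implementations typically scan backwards from the end of the file for line
// boundaries rather than reading the whole file; Docker's log drivers commonly
// pass a function such as tailfile.NewTailReader from
// github.com/docker/docker/pkg/tailfile here (an observation about typical
// callers, not a requirement of this package).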

// NewLogFile creates a new LogFile.
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	return &LogFile{
		f:               log,
		closedCh:        make(chan struct{}),
		capacity:        capacity,
		currentSize:     size,
		maxFiles:        maxFiles,
		compress:        compress,
		filesRefCounter: refCounter{counter: make(map[string]int)},
		notifyReaders:   pubsub.NewPublisher(0, 1),
		marshal:         marshaller,
		createDecoder:   decodeFunc,
		perms:           perms,
		getTailReader:   getTailReader,
	}, nil
}
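
// A minimal usage sketch (marshalFunc, makeDecoder, and getTailReader are
// hypothetical placeholders that a log driver would supply, not part of this
// package):
//
//	lf, err := NewLogFile("/path/to/container.log",
//		10*1024*1024, // rotate once the file reaches 10 MiB
//		5,            // keep at most 5 files
//		true,         // gzip rotated files
//		marshalFunc, makeDecoder, 0640, getTailReader)
//	if err != nil {
//		return err
//	}
//	defer lf.Close()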

// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
	b, err := w.marshal(msg)
	if err != nil {
		return errors.Wrap(err, "error marshalling log message")
	}

	ts := msg.Timestamp
	logger.PutMessage(msg)
	msg = nil // Turn use-after-put bugs into panics.

	w.mu.Lock()
	if w.closed {
		w.mu.Unlock()
		return errors.New("cannot write because the output file was closed")
	}

	if err := w.checkCapacityAndRotate(); err != nil {
		w.mu.Unlock()
		return errors.Wrap(err, "error rotating log file")
	}

	n, err := w.f.Write(b)
	if err == nil {
		w.currentSize += int64(n)
		w.lastTimestamp = ts
	}

	w.mu.Unlock()
	return errors.Wrap(err, "error writing log entry")
}

func (w *LogFile) checkCapacityAndRotate() (retErr error) {
	if w.capacity == -1 {
		return nil
	}
	if w.currentSize < w.capacity {
		return nil
	}

	w.rotateMu.Lock()
	noCompress := w.maxFiles <= 1 || !w.compress
	defer func() {
		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
		// Otherwise the lock will be released in the goroutine that handles compression.
		if retErr != nil || noCompress {
			w.rotateMu.Unlock()
		}
	}()

	fname := w.f.Name()
	if err := w.f.Close(); err != nil {
		// if there was an error during a prior rotate, the file could already be closed
		if !errors.Is(err, fs.ErrClosed) {
			return errors.Wrap(err, "error closing file")
		}
	}

	if err := rotate(fname, w.maxFiles, w.compress); err != nil {
		logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
	} else {
		// We may have readers working their way through the current
		// log file so we can't truncate it. We need to start writing
		// new logs to an empty file with the same name as the current
		// one so we need to rotate the current file out of the way.
		if w.maxFiles < 2 {
			if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
				logrus.WithError(err).Error("Error unlinking current log file")
			}
		} else {
			if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
				logrus.WithError(err).Error("Error renaming current log file")
			}
		}
	}

	// Notwithstanding the above, open with the truncate flag anyway in case
	// rotation didn't work out as planned.
	file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
	if err != nil {
		return err
	}
	w.f = file
	w.currentSize = 0

	w.notifyReaders.Publish(struct{}{})

	if noCompress {
		return nil
	}

	ts := w.lastTimestamp
	go func() {
		if err := compressFile(fname+".1", ts); err != nil {
			logrus.WithError(err).Error("Error compressing log file after rotation")
		}
		w.rotateMu.Unlock()
	}()

	return nil
}

func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := unlink(lastFile)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
		if err := os.Rename(fromPath, toPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}

	return nil
}
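
// For example, with name "c.log", maxFiles=4, and compress=true, rotate first
// removes c.log.3.gz, then renames c.log.2.gz to c.log.3.gz and c.log.1.gz to
// c.log.2.gz. checkCapacityAndRotate then renames c.log to c.log.1 and has the
// compression goroutine turn it into c.log.1.gz.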

func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
	file, err := open(fileName)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
			return nil
		}
		return errors.Wrap(err, "failed to open log file")
	}
	defer func() {
		file.Close()
		if retErr == nil {
			err := unlink(fileName)
			if err != nil && !errors.Is(err, fs.ErrNotExist) {
				retErr = errors.Wrap(err, "failed to remove source log file")
			}
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
	if err != nil {
		return errors.Wrap(err, "failed to open or create gzip log file")
	}
	defer func() {
		outFile.Close()
		if retErr != nil {
			if err := unlink(fileName + ".gz"); err != nil && !errors.Is(err, fs.ErrNotExist) {
				logrus.WithError(err).Error("Error cleaning up after failed log compression")
			}
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header.
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Only log the error here and don't return, since the header metadata is just an optimization.
		logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		return errors.Wrapf(err, "error compressing log file %s", fileName)
	}

	return nil
}
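
// The timestamp stored in the gzip header above is read back when a rotated
// file is considered for tailing (see decompressfile below). A minimal sketch
// of recovering it from an opened *.gz file f, error handling elided:
//
//	rc, _ := gzip.NewReader(f)
//	meta := rotateFileMetadata{}
//	_ = json.Unmarshal(rc.Header.Extra, &meta)
//	// meta.LastTime holds the timestamp of the last entry in the file.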

// MaxFiles returns the maximum number of files.
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}

// Close closes the underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return nil
	}
	if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
		return err
	}
	w.closed = true
	close(w.closedCh)
	return nil
}

// ReadLogs decodes entries from log files and sends them to the passed-in watcher.
//
// Note: Using the follow option can become inconsistent in cases of very frequent rotations when max log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()
	// Lock before starting the reader goroutine to synchronize operations
	// for race-free unit testing. The writer is locked out until the reader
	// has opened the log file and set the read cursor to the current
	// position.
	w.mu.RLock()
	go w.readLogsLocked(config, watcher)
	return watcher
}
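
// A minimal consumer sketch, assuming lf is a *LogFile (ReadConfig and
// LogWatcher come from the logger package):
//
//	watcher := lf.ReadLogs(logger.ReadConfig{Tail: 100, Follow: true})
//	defer watcher.ConsumerGone()
//	for msg := range watcher.Msg {
//		// handle msg; read errors arrive on watcher.Err
//	}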

func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) {
	defer close(watcher.Msg)

	currentFile, err := open(w.f.Name())
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk, err := newSectionReader(currentFile)
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}

	notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
		_, ok := i.(error)
		return ok
	}, 1)
	defer w.notifyReaders.Evict(notifyEvict)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		w.mu.RUnlock()
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
				fileName := f.Name()
				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
					err := w.filesRefCounter.Dereference(fileName)
					if err != nil {
						logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
					}
				}
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			stat, err := f.Stat()
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
				closeFiles()
				return
			}
			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
		closeFiles()
		if !ok {
			return
		}
		w.mu.RLock()
	}

	w.mu.RUnlock()
	if !config.Follow {
		return
	}

	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
		_, ok := i.(struct{})
		return ok
	})
	defer w.notifyReaders.Evict(notifyRotate)
	followLogs(currentFile, watcher, w.closedCh, notifyRotate, notifyEvict, dec, config.Since, config.Until)
}

func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
	w.rotateMu.Lock()
	defer w.rotateMu.Unlock()

	defer func() {
		if err == nil {
			return
		}
		for _, f := range files {
			f.Close()
			if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
				err := unlink(f.Name())
				if err != nil && !errors.Is(err, fs.ErrNotExist) {
					logrus.Warnf("Failed to remove logfile: %v", err)
				}
			}
		}
	}()

	for i := w.maxFiles; i > 1; i-- {
		f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
		if err != nil {
			if !errors.Is(err, fs.ErrNotExist) {
				return nil, errors.Wrap(err, "error opening rotated log file")
			}

			fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
			decompressedFileName := fileName + tmpLogfileSuffix
			tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
				if exists {
					return open(refFileName)
				}
				return decompressfile(fileName, refFileName, config.Since)
			})
			if err != nil {
				if !errors.Is(err, fs.ErrNotExist) {
					return nil, errors.Wrap(err, "error getting reference to decompressed log file")
				}
				continue
			}
			if tmpFile == nil {
				// Logs from before config.Since do not need to be read.
				break
			}

			files = append(files, tmpFile)
			continue
		}
		files = append(files, f)
	}

	return files, nil
}

func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
	cf, err := open(fileName)
	if err != nil {
		return nil, errors.Wrap(err, "error opening file for decompression")
	}
	defer cf.Close()

	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header.
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && extra.LastTime.Before(since) {
		return nil, nil
	}

	rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
	if err != nil {
		return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
	}

	_, err = pools.Copy(rs, rc)
	if err != nil {
		rs.Close()
		rErr := unlink(rs.Name())
		if rErr != nil && !errors.Is(rErr, fs.ErrNotExist) {
			logrus.Errorf("Failed to remove logfile: %v", rErr)
		}
		return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
	}

	return rs, nil
}

func newSectionReader(f *os.File) (*io.SectionReader, error) {
	// Seek to the end to get the size. We'll leave the offset at the end of
	// the file, since the section reader does not advance the underlying reader.
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, errors.Wrap(err, "error getting current file size")
	}
	return io.NewSectionReader(f, 0, size), nil
}

func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
	nLines := config.Tail

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case err := <-notifyEvict:
			if err != nil {
				watcher.Err <- err.(error)
				cont = false
				cancel()
			}
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cont = false
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if config.Tail > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)

	for {
		msg, err := dec.Decode()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				watcher.Err <- err
			}
			return
		}
		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
			continue
		}
		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
			return
		}
		select {
		case <-ctx.Done():
			return
		case watcher.Msg <- msg:
		}
	}
}

func watchFile(name string) (filenotify.FileWatcher, error) {
	var fileWatcher filenotify.FileWatcher

	if runtime.GOOS == "windows" {
		// FileWatcher on Windows is based on syscall notifications, which are unreliable here because of file caching.
		// It relies on ReadDirectoryChangesW(), which doesn't detect writes to the cache; it detects writes to disk only.
		// Because of the OS's lazy writing, we don't get notifications for file writes, so the watcher
		// doesn't work. Hence, for Windows we use a poll-based notifier.
		fileWatcher = filenotify.NewPollingWatcher()
	} else {
		var err error
		fileWatcher, err = filenotify.New()
		if err != nil {
			return nil, err
		}
	}

	logger := logrus.WithFields(logrus.Fields{
		"module": "logger",
		"file":   name,
	})

	if err := fileWatcher.Add(name); err != nil {
		// we will retry using the file poller.
		logger.WithError(err).Warnf("falling back to file poller")
		fileWatcher.Close()
		fileWatcher = filenotify.NewPollingWatcher()

		if err := fileWatcher.Add(name); err != nil {
			fileWatcher.Close()
			logger.WithError(err).Debugf("error watching log file for modifications")
			return nil, err
		}
	}
	return fileWatcher, nil
}
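
// wrappedReaderAt adapts an io.ReaderAt into a sequential io.Reader by
// remembering the current read offset, allowing whole files to be
// concatenated with io.MultiReader in tailFiles above.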
type wrappedReaderAt struct {
	io.ReaderAt
	pos int64
}

func (r *wrappedReaderAt) Read(p []byte) (int, error) {
	n, err := r.ReaderAt.ReadAt(p, r.pos)
	r.pos += int64(n)
	return n, err
}