logfile.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/pkg/filenotify"
  16. "github.com/docker/docker/pkg/pools"
  17. "github.com/docker/docker/pkg/pubsub"
  18. "github.com/pkg/errors"
  19. "github.com/sirupsen/logrus"
  20. )
  21. const tmpLogfileSuffix = ".tmp"
  22. // rotateFileMetadata is a metadata of the gzip header of the compressed log file
  23. type rotateFileMetadata struct {
  24. LastTime time.Time `json:"lastTime,omitempty"`
  25. }
  26. // refCounter is a counter of logfile being referenced
  27. type refCounter struct {
  28. mu sync.Mutex
  29. counter map[string]int
  30. }
  31. // Reference increase the reference counter for specified logfile
  32. func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
  33. rc.mu.Lock()
  34. defer rc.mu.Unlock()
  35. var (
  36. file *os.File
  37. err error
  38. )
  39. _, ok := rc.counter[fileName]
  40. file, err = openRefFile(fileName, ok)
  41. if err != nil {
  42. return nil, err
  43. }
  44. if ok {
  45. rc.counter[fileName]++
  46. } else if file != nil {
  47. rc.counter[file.Name()] = 1
  48. }
  49. return file, nil
  50. }
  51. // Dereference reduce the reference counter for specified logfile
  52. func (rc *refCounter) Dereference(fileName string) error {
  53. rc.mu.Lock()
  54. defer rc.mu.Unlock()
  55. rc.counter[fileName]--
  56. if rc.counter[fileName] <= 0 {
  57. delete(rc.counter, fileName)
  58. err := os.Remove(fileName)
  59. if err != nil && !os.IsNotExist(err) {
  60. return err
  61. }
  62. }
  63. return nil
  64. }
  65. // LogFile is Logger implementation for default Docker logging.
  66. type LogFile struct {
  67. mu sync.RWMutex // protects the logfile access
  68. f *os.File // store for closing
  69. closed bool
  70. rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
  71. capacity int64 // maximum size of each file
  72. currentSize int64 // current size of the latest file
  73. maxFiles int // maximum number of files
  74. compress bool // whether old versions of log files are compressed
  75. lastTimestamp time.Time // timestamp of the last log
  76. filesRefCounter refCounter // keep reference-counted of decompressed files
  77. notifyReaders *pubsub.Publisher
  78. marshal logger.MarshalFunc
  79. createDecoder MakeDecoderFn
  80. getTailReader GetTailReaderFunc
  81. perms os.FileMode
  82. }
  83. // MakeDecoderFn creates a decoder
  84. type MakeDecoderFn func(rdr io.Reader) Decoder
  85. // Decoder is for reading logs
  86. // It is created by the log reader by calling the `MakeDecoderFunc`
  87. type Decoder interface {
  88. // Reset resets the decoder
  89. // Reset is called for certain events, such as log rotations
  90. Reset(io.Reader)
  91. // Decode decodes the next log messeage from the stream
  92. Decode() (*logger.Message, error)
  93. // Close signals to the decoder that it can release whatever resources it was using.
  94. Close()
  95. }
  96. // SizeReaderAt defines a ReaderAt that also reports its size.
  97. // This is used for tailing log files.
  98. type SizeReaderAt interface {
  99. io.ReaderAt
  100. Size() int64
  101. }
  102. // GetTailReaderFunc is used to truncate a reader to only read as much as is required
  103. // in order to get the passed in number of log lines.
  104. // It returns the sectioned reader, the number of lines that the section reader
  105. // contains, and any error that occurs.
  106. type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
  107. // NewLogFile creates new LogFile
  108. func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
  109. log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
  110. if err != nil {
  111. return nil, err
  112. }
  113. size, err := log.Seek(0, io.SeekEnd)
  114. if err != nil {
  115. return nil, err
  116. }
  117. return &LogFile{
  118. f: log,
  119. capacity: capacity,
  120. currentSize: size,
  121. maxFiles: maxFiles,
  122. compress: compress,
  123. filesRefCounter: refCounter{counter: make(map[string]int)},
  124. notifyReaders: pubsub.NewPublisher(0, 1),
  125. marshal: marshaller,
  126. createDecoder: decodeFunc,
  127. perms: perms,
  128. getTailReader: getTailReader,
  129. }, nil
  130. }
  131. // WriteLogEntry writes the provided log message to the current log file.
  132. // This may trigger a rotation event if the max file/capacity limits are hit.
  133. func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
  134. b, err := w.marshal(msg)
  135. if err != nil {
  136. return errors.Wrap(err, "error marshalling log message")
  137. }
  138. ts := msg.Timestamp
  139. logger.PutMessage(msg)
  140. msg = nil // Turn use-after-put bugs into panics.
  141. w.mu.Lock()
  142. if w.closed {
  143. w.mu.Unlock()
  144. return errors.New("cannot write because the output file was closed")
  145. }
  146. if err := w.checkCapacityAndRotate(); err != nil {
  147. w.mu.Unlock()
  148. return errors.Wrap(err, "error rotating log file")
  149. }
  150. n, err := w.f.Write(b)
  151. if err == nil {
  152. w.currentSize += int64(n)
  153. w.lastTimestamp = ts
  154. }
  155. w.mu.Unlock()
  156. return errors.Wrap(err, "error writing log entry")
  157. }
  158. func (w *LogFile) checkCapacityAndRotate() (retErr error) {
  159. if w.capacity == -1 {
  160. return nil
  161. }
  162. if w.currentSize < w.capacity {
  163. return nil
  164. }
  165. w.rotateMu.Lock()
  166. noCompress := w.maxFiles <= 1 || !w.compress
  167. defer func() {
  168. // If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
  169. // Otherwise the lock will be released in the goroutine that handles compression.
  170. if retErr != nil || noCompress {
  171. w.rotateMu.Unlock()
  172. }
  173. }()
  174. fname := w.f.Name()
  175. if err := w.f.Close(); err != nil {
  176. // if there was an error during a prior rotate, the file could already be closed
  177. if !errors.Is(err, os.ErrClosed) {
  178. return errors.Wrap(err, "error closing file")
  179. }
  180. }
  181. if err := rotate(fname, w.maxFiles, w.compress); err != nil {
  182. logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
  183. } else {
  184. var renameErr error
  185. for i := 0; i < 10; i++ {
  186. if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
  187. logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
  188. w.notifyReaders.Publish(renameErr)
  189. time.Sleep(100 * time.Millisecond)
  190. continue
  191. }
  192. break
  193. }
  194. if renameErr != nil {
  195. logrus.WithError(renameErr).Error("Error renaming current log file")
  196. }
  197. }
  198. file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
  199. if err != nil {
  200. return err
  201. }
  202. w.f = file
  203. w.currentSize = 0
  204. w.notifyReaders.Publish(struct{}{})
  205. if noCompress {
  206. return nil
  207. }
  208. ts := w.lastTimestamp
  209. go func() {
  210. if err := compressFile(fname+".1", ts); err != nil {
  211. logrus.WithError(err).Error("Error compressing log file after rotation")
  212. }
  213. w.rotateMu.Unlock()
  214. }()
  215. return nil
  216. }
  217. func rotate(name string, maxFiles int, compress bool) error {
  218. if maxFiles < 2 {
  219. return nil
  220. }
  221. var extension string
  222. if compress {
  223. extension = ".gz"
  224. }
  225. lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
  226. err := os.Remove(lastFile)
  227. if err != nil && !os.IsNotExist(err) {
  228. return errors.Wrap(err, "error removing oldest log file")
  229. }
  230. for i := maxFiles - 1; i > 1; i-- {
  231. toPath := name + "." + strconv.Itoa(i) + extension
  232. fromPath := name + "." + strconv.Itoa(i-1) + extension
  233. logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
  234. if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
  235. return err
  236. }
  237. }
  238. return nil
  239. }
  240. func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
  241. file, err := open(fileName)
  242. if err != nil {
  243. if os.IsNotExist(err) {
  244. logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
  245. return nil
  246. }
  247. return errors.Wrap(err, "failed to open log file")
  248. }
  249. defer func() {
  250. file.Close()
  251. if retErr == nil {
  252. err := os.Remove(fileName)
  253. if err != nil && !os.IsNotExist(err) {
  254. retErr = errors.Wrap(err, "failed to remove source log file")
  255. }
  256. }
  257. }()
  258. outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
  259. if err != nil {
  260. return errors.Wrap(err, "failed to open or create gzip log file")
  261. }
  262. defer func() {
  263. outFile.Close()
  264. if retErr != nil {
  265. if err := os.Remove(fileName + ".gz"); err != nil && !os.IsExist(err) {
  266. logrus.WithError(err).Error("Error cleaning up after failed log compression")
  267. }
  268. }
  269. }()
  270. compressWriter := gzip.NewWriter(outFile)
  271. defer compressWriter.Close()
  272. // Add the last log entry timestamp to the gzip header
  273. extra := rotateFileMetadata{}
  274. extra.LastTime = lastTimestamp
  275. compressWriter.Header.Extra, err = json.Marshal(&extra)
  276. if err != nil {
  277. // Here log the error only and don't return since this is just an optimization.
  278. logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
  279. }
  280. _, err = pools.Copy(compressWriter, file)
  281. if err != nil {
  282. return errors.Wrapf(err, "error compressing log file %s", fileName)
  283. }
  284. return nil
  285. }
  286. // MaxFiles return maximum number of files
  287. func (w *LogFile) MaxFiles() int {
  288. return w.maxFiles
  289. }
  290. // Close closes underlying file and signals all readers to stop.
  291. func (w *LogFile) Close() error {
  292. w.mu.Lock()
  293. defer w.mu.Unlock()
  294. if w.closed {
  295. return nil
  296. }
  297. if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
  298. return err
  299. }
  300. w.closed = true
  301. return nil
  302. }
// ReadLogs decodes entries from log files and sends them to the passed-in watcher.
//
// If config.Tail is non-zero, the rotated files are tailed first, followed by
// the current file up to its size at the time of the call. Rotated files are
// decompressed into ref-counted temporary files where needed. Then, if
// config.Follow is set and the LogFile has not been closed, the current file
// is handed to followLogs.
//
// Errors are reported on watcher.Err.
//
// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
	// Rotation happens under w.mu (write-locked), so holding the read lock
	// keeps the current file from being swapped while it is opened and sized.
	w.mu.RLock()
	currentFile, err := open(w.f.Name())
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	// Snapshot the current file's size. newSectionReader leaves currentFile's
	// offset at the end, presumably so followLogs continues after the tailed data.
	currentChunk, err := newSectionReader(currentFile)
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}

	// Rotation publishes an error value when it wants readers to let go of
	// the file (see checkCapacityAndRotate). Subscribe to those as "evicted".
	notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
		_, ok := i.(error)
		return ok
	}, 1)
	defer w.notifyReaders.Evict(notifyEvict)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		w.mu.RUnlock()
		if err != nil {
			watcher.Err <- err
			return
		}

		// closeFiles closes the rotated files and drops this reader's reference
		// on temporary decompressed copies (removed once no reader holds them).
		closeFiles := func() {
			for _, f := range files {
				f.Close()
				fileName := f.Name()
				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
					err := w.filesRefCounter.Dereference(fileName)
					if err != nil {
						logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
					}
				}
			}
		}

		// Oldest rotated file first, the current file's snapshot last.
		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			stat, err := f.Stat()
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
				closeFiles()
				return
			}
			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
		closeFiles()
		if !ok {
			// Evicted or the consumer went away.
			return
		}
		// Re-acquire so w.closed is read under the lock below.
		w.mu.RLock()
	}

	if !config.Follow || w.closed {
		w.mu.RUnlock()
		return
	}
	w.mu.RUnlock()

	// NOTE(review): the rotation subscription is created only after the read
	// lock is released. A rotation in that window (or during tailing) would not
	// be seen on notifyRotate. Confirm whether followLogs tolerates that.
	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
		_, ok := i.(struct{})
		return ok
	})
	defer w.notifyReaders.Evict(notifyRotate)

	followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
}
  383. func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
  384. w.rotateMu.Lock()
  385. defer w.rotateMu.Unlock()
  386. defer func() {
  387. if err == nil {
  388. return
  389. }
  390. for _, f := range files {
  391. f.Close()
  392. if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
  393. err := os.Remove(f.Name())
  394. if err != nil && !os.IsNotExist(err) {
  395. logrus.Warnf("Failed to remove logfile: %v", err)
  396. }
  397. }
  398. }
  399. }()
  400. for i := w.maxFiles; i > 1; i-- {
  401. f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
  402. if err != nil {
  403. if !os.IsNotExist(err) {
  404. return nil, errors.Wrap(err, "error opening rotated log file")
  405. }
  406. fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
  407. decompressedFileName := fileName + tmpLogfileSuffix
  408. tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
  409. if exists {
  410. return open(refFileName)
  411. }
  412. return decompressfile(fileName, refFileName, config.Since)
  413. })
  414. if err != nil {
  415. if !errors.Is(err, os.ErrNotExist) {
  416. return nil, errors.Wrap(err, "error getting reference to decompressed log file")
  417. }
  418. continue
  419. }
  420. if tmpFile == nil {
  421. // The log before `config.Since` does not need to read
  422. break
  423. }
  424. files = append(files, tmpFile)
  425. continue
  426. }
  427. files = append(files, f)
  428. }
  429. return files, nil
  430. }
  431. func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
  432. cf, err := open(fileName)
  433. if err != nil {
  434. return nil, errors.Wrap(err, "error opening file for decompression")
  435. }
  436. defer cf.Close()
  437. rc, err := gzip.NewReader(cf)
  438. if err != nil {
  439. return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
  440. }
  441. defer rc.Close()
  442. // Extract the last log entry timestramp from the gzip header
  443. extra := &rotateFileMetadata{}
  444. err = json.Unmarshal(rc.Header.Extra, extra)
  445. if err == nil && extra.LastTime.Before(since) {
  446. return nil, nil
  447. }
  448. rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
  449. if err != nil {
  450. return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
  451. }
  452. _, err = pools.Copy(rs, rc)
  453. if err != nil {
  454. rs.Close()
  455. rErr := os.Remove(rs.Name())
  456. if rErr != nil && !os.IsNotExist(rErr) {
  457. logrus.Errorf("Failed to remove logfile: %v", rErr)
  458. }
  459. return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
  460. }
  461. return rs, nil
  462. }
  463. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  464. // seek to the end to get the size
  465. // we'll leave this at the end of the file since section reader does not advance the reader
  466. size, err := f.Seek(0, io.SeekEnd)
  467. if err != nil {
  468. return nil, errors.Wrap(err, "error getting current file size")
  469. }
  470. return io.NewSectionReader(f, 0, size), nil
  471. }
// tailFiles decodes log messages from files, which are ordered oldest first,
// and sends them to watcher.Msg.
//
// When config.Tail > 0, getTailReader is applied to the files newest first
// until config.Tail lines have been gathered. Otherwise every file is read in
// full. Messages before config.Since are skipped, and decoding stops at the
// first message after config.Until.
//
// It returns false when the caller should stop rather than go on to follow
// the logs. That happens when an eviction error arrives on notifyEvict (the
// error is forwarded to watcher.Err) or when the consumer goes away. Decode
// errors other than io.EOF, and getTailReader errors, are sent to
// watcher.Err, but the function still returns true.
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
	nLines := config.Tail

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	// The goroutine below cancels ctx on eviction or when the consumer is gone.
	// It exits via ctx.Done() once this function returns (deferred cancel).
	//
	// NOTE(review): the goroutine writes cont without synchronization. The
	// write is ordered before cancel(), so it is visible on the ctx.Done() exit
	// path. It races with the other returns (EOF, Until, errors), though,
	// making the returned value a data race in that window.
	go func() {
		select {
		case err := <-notifyEvict:
			if err != nil {
				watcher.Err <- err.(error)
				cont = false
				cancel()
			}
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cont = false
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if config.Tail > 0 {
		// Walk backwards from the newest file, prepending each tail section so
		// readers stay in chronological order.
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)

	for {
		msg, err := dec.Decode()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				watcher.Err <- err
			}
			return
		}
		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
			continue
		}
		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
			return
		}
		select {
		case <-ctx.Done():
			return
		case watcher.Msg <- msg:
		}
	}
}
  531. func watchFile(name string) (filenotify.FileWatcher, error) {
  532. var fileWatcher filenotify.FileWatcher
  533. if runtime.GOOS == "windows" {
  534. // FileWatcher on Windows files is based on the syscall notifications which has an issue because of file caching.
  535. // It is based on ReadDirectoryChangesW() which doesn't detect writes to the cache. It detects writes to disk only.
  536. // Because of the OS lazy writing, we don't get notifications for file writes and thereby the watcher
  537. // doesn't work. Hence for Windows we will use poll based notifier.
  538. fileWatcher = filenotify.NewPollingWatcher()
  539. } else {
  540. var err error
  541. fileWatcher, err = filenotify.New()
  542. if err != nil {
  543. return nil, err
  544. }
  545. }
  546. logger := logrus.WithFields(logrus.Fields{
  547. "module": "logger",
  548. "file": name,
  549. })
  550. if err := fileWatcher.Add(name); err != nil {
  551. // we will retry using file poller.
  552. logger.WithError(err).Warnf("falling back to file poller")
  553. fileWatcher.Close()
  554. fileWatcher = filenotify.NewPollingWatcher()
  555. if err := fileWatcher.Add(name); err != nil {
  556. fileWatcher.Close()
  557. logger.WithError(err).Debugf("error watching log file for modifications")
  558. return nil, err
  559. }
  560. }
  561. return fileWatcher, nil
  562. }
  563. type wrappedReaderAt struct {
  564. io.ReaderAt
  565. pos int64
  566. }
  567. func (r *wrappedReaderAt) Read(p []byte) (int, error) {
  568. n, err := r.ReaderAt.ReadAt(p, r.pos)
  569. r.pos += int64(n)
  570. return n, err
  571. }