logfile.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/pkg/filenotify"
  16. "github.com/docker/docker/pkg/pools"
  17. "github.com/docker/docker/pkg/pubsub"
  18. "github.com/fsnotify/fsnotify"
  19. "github.com/pkg/errors"
  20. "github.com/sirupsen/logrus"
  21. )
  22. const tmpLogfileSuffix = ".tmp"
  23. // rotateFileMetadata is a metadata of the gzip header of the compressed log file
  24. type rotateFileMetadata struct {
  25. LastTime time.Time `json:"lastTime,omitempty"`
  26. }
  27. // refCounter is a counter of logfile being referenced
  28. type refCounter struct {
  29. mu sync.Mutex
  30. counter map[string]int
  31. }
  32. // Reference increase the reference counter for specified logfile
  33. func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
  34. rc.mu.Lock()
  35. defer rc.mu.Unlock()
  36. var (
  37. file *os.File
  38. err error
  39. )
  40. _, ok := rc.counter[fileName]
  41. file, err = openRefFile(fileName, ok)
  42. if err != nil {
  43. return nil, err
  44. }
  45. if ok {
  46. rc.counter[fileName]++
  47. } else if file != nil {
  48. rc.counter[file.Name()] = 1
  49. }
  50. return file, nil
  51. }
  52. // Dereference reduce the reference counter for specified logfile
  53. func (rc *refCounter) Dereference(fileName string) error {
  54. rc.mu.Lock()
  55. defer rc.mu.Unlock()
  56. rc.counter[fileName]--
  57. if rc.counter[fileName] <= 0 {
  58. delete(rc.counter, fileName)
  59. err := os.Remove(fileName)
  60. if err != nil && !os.IsNotExist(err) {
  61. return err
  62. }
  63. }
  64. return nil
  65. }
  66. // LogFile is Logger implementation for default Docker logging.
  67. type LogFile struct {
  68. mu sync.RWMutex // protects the logfile access
  69. f *os.File // store for closing
  70. closed bool
  71. rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
  72. capacity int64 // maximum size of each file
  73. currentSize int64 // current size of the latest file
  74. maxFiles int // maximum number of files
  75. compress bool // whether old versions of log files are compressed
  76. lastTimestamp time.Time // timestamp of the last log
  77. filesRefCounter refCounter // keep reference-counted of decompressed files
  78. notifyReaders *pubsub.Publisher
  79. marshal logger.MarshalFunc
  80. createDecoder MakeDecoderFn
  81. getTailReader GetTailReaderFunc
  82. perms os.FileMode
  83. }
  84. // MakeDecoderFn creates a decoder
  85. type MakeDecoderFn func(rdr io.Reader) Decoder
  86. // Decoder is for reading logs
  87. // It is created by the log reader by calling the `MakeDecoderFunc`
  88. type Decoder interface {
  89. // Reset resets the decoder
  90. // Reset is called for certain events, such as log rotations
  91. Reset(io.Reader)
  92. // Decode decodes the next log messeage from the stream
  93. Decode() (*logger.Message, error)
  94. // Close signals to the decoder that it can release whatever resources it was using.
  95. Close()
  96. }
  97. // SizeReaderAt defines a ReaderAt that also reports its size.
  98. // This is used for tailing log files.
  99. type SizeReaderAt interface {
  100. io.ReaderAt
  101. Size() int64
  102. }
  103. // GetTailReaderFunc is used to truncate a reader to only read as much as is required
  104. // in order to get the passed in number of log lines.
  105. // It returns the sectioned reader, the number of lines that the section reader
  106. // contains, and any error that occurs.
  107. type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
  108. // NewLogFile creates new LogFile
  109. func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
  110. log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
  111. if err != nil {
  112. return nil, err
  113. }
  114. size, err := log.Seek(0, io.SeekEnd)
  115. if err != nil {
  116. return nil, err
  117. }
  118. return &LogFile{
  119. f: log,
  120. capacity: capacity,
  121. currentSize: size,
  122. maxFiles: maxFiles,
  123. compress: compress,
  124. filesRefCounter: refCounter{counter: make(map[string]int)},
  125. notifyReaders: pubsub.NewPublisher(0, 1),
  126. marshal: marshaller,
  127. createDecoder: decodeFunc,
  128. perms: perms,
  129. getTailReader: getTailReader,
  130. }, nil
  131. }
  132. // WriteLogEntry writes the provided log message to the current log file.
  133. // This may trigger a rotation event if the max file/capacity limits are hit.
  134. func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
  135. b, err := w.marshal(msg)
  136. if err != nil {
  137. return errors.Wrap(err, "error marshalling log message")
  138. }
  139. logger.PutMessage(msg)
  140. w.mu.Lock()
  141. if w.closed {
  142. w.mu.Unlock()
  143. return errors.New("cannot write because the output file was closed")
  144. }
  145. if err := w.checkCapacityAndRotate(); err != nil {
  146. w.mu.Unlock()
  147. return errors.Wrap(err, "error rotating log file")
  148. }
  149. n, err := w.f.Write(b)
  150. if err == nil {
  151. w.currentSize += int64(n)
  152. w.lastTimestamp = msg.Timestamp
  153. }
  154. w.mu.Unlock()
  155. return errors.Wrap(err, "error writing log entry")
  156. }
  157. func (w *LogFile) checkCapacityAndRotate() (retErr error) {
  158. if w.capacity == -1 {
  159. return nil
  160. }
  161. if w.currentSize < w.capacity {
  162. return nil
  163. }
  164. w.rotateMu.Lock()
  165. noCompress := w.maxFiles <= 1 || !w.compress
  166. defer func() {
  167. // If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
  168. // Otherwise the lock will be released in the goroutine that handles compression.
  169. if retErr != nil || noCompress {
  170. w.rotateMu.Unlock()
  171. }
  172. }()
  173. fname := w.f.Name()
  174. if err := w.f.Close(); err != nil {
  175. // if there was an error during a prior rotate, the file could already be closed
  176. if !errors.Is(err, os.ErrClosed) {
  177. return errors.Wrap(err, "error closing file")
  178. }
  179. }
  180. if err := rotate(fname, w.maxFiles, w.compress); err != nil {
  181. logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
  182. } else {
  183. var renameErr error
  184. for i := 0; i < 10; i++ {
  185. if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
  186. logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
  187. w.notifyReaders.Publish(renameErr)
  188. time.Sleep(100 * time.Millisecond)
  189. continue
  190. }
  191. break
  192. }
  193. if renameErr != nil {
  194. logrus.WithError(renameErr).Error("Error renaming current log file")
  195. }
  196. }
  197. file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
  198. if err != nil {
  199. return err
  200. }
  201. w.f = file
  202. w.currentSize = 0
  203. w.notifyReaders.Publish(struct{}{})
  204. if noCompress {
  205. return nil
  206. }
  207. ts := w.lastTimestamp
  208. go func() {
  209. if err := compressFile(fname+".1", ts); err != nil {
  210. logrus.WithError(err).Error("Error compressing log file after rotation")
  211. }
  212. w.rotateMu.Unlock()
  213. }()
  214. return nil
  215. }
  216. func rotate(name string, maxFiles int, compress bool) error {
  217. if maxFiles < 2 {
  218. return nil
  219. }
  220. var extension string
  221. if compress {
  222. extension = ".gz"
  223. }
  224. lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
  225. err := os.Remove(lastFile)
  226. if err != nil && !os.IsNotExist(err) {
  227. return errors.Wrap(err, "error removing oldest log file")
  228. }
  229. for i := maxFiles - 1; i > 1; i-- {
  230. toPath := name + "." + strconv.Itoa(i) + extension
  231. fromPath := name + "." + strconv.Itoa(i-1) + extension
  232. logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
  233. if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
  234. return err
  235. }
  236. }
  237. return nil
  238. }
  239. func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
  240. file, err := open(fileName)
  241. if err != nil {
  242. if os.IsNotExist(err) {
  243. logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
  244. return nil
  245. }
  246. return errors.Wrap(err, "failed to open log file")
  247. }
  248. defer func() {
  249. file.Close()
  250. if retErr == nil {
  251. err := os.Remove(fileName)
  252. if err != nil && !os.IsNotExist(err) {
  253. retErr = errors.Wrap(err, "failed to remove source log file")
  254. }
  255. }
  256. }()
  257. outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
  258. if err != nil {
  259. return errors.Wrap(err, "failed to open or create gzip log file")
  260. }
  261. defer func() {
  262. outFile.Close()
  263. if retErr != nil {
  264. if err := os.Remove(fileName + ".gz"); err != nil && !os.IsExist(err) {
  265. logrus.WithError(err).Error("Error cleaning up after failed log compression")
  266. }
  267. }
  268. }()
  269. compressWriter := gzip.NewWriter(outFile)
  270. defer compressWriter.Close()
  271. // Add the last log entry timestamp to the gzip header
  272. extra := rotateFileMetadata{}
  273. extra.LastTime = lastTimestamp
  274. compressWriter.Header.Extra, err = json.Marshal(&extra)
  275. if err != nil {
  276. // Here log the error only and don't return since this is just an optimization.
  277. logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
  278. }
  279. _, err = pools.Copy(compressWriter, file)
  280. if err != nil {
  281. return errors.Wrapf(err, "error compressing log file %s", fileName)
  282. }
  283. return nil
  284. }
  285. // MaxFiles return maximum number of files
  286. func (w *LogFile) MaxFiles() int {
  287. return w.maxFiles
  288. }
  289. // Close closes underlying file and signals all readers to stop.
  290. func (w *LogFile) Close() error {
  291. w.mu.Lock()
  292. defer w.mu.Unlock()
  293. if w.closed {
  294. return nil
  295. }
  296. if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
  297. return err
  298. }
  299. w.closed = true
  300. return nil
  301. }
  302. // ReadLogs decodes entries from log files and sends them the passed in watcher
  303. //
  304. // Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
  305. // TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
  306. func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
  307. w.mu.RLock()
  308. currentFile, err := open(w.f.Name())
  309. if err != nil {
  310. w.mu.RUnlock()
  311. watcher.Err <- err
  312. return
  313. }
  314. defer currentFile.Close()
  315. dec := w.createDecoder(nil)
  316. defer dec.Close()
  317. currentChunk, err := newSectionReader(currentFile)
  318. if err != nil {
  319. w.mu.RUnlock()
  320. watcher.Err <- err
  321. return
  322. }
  323. notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
  324. _, ok := i.(error)
  325. return ok
  326. }, 1)
  327. defer w.notifyReaders.Evict(notifyEvict)
  328. if config.Tail != 0 {
  329. // TODO(@cpuguy83): Instead of opening every file, only get the files which
  330. // are needed to tail.
  331. // This is especially costly when compression is enabled.
  332. files, err := w.openRotatedFiles(config)
  333. w.mu.RUnlock()
  334. if err != nil {
  335. watcher.Err <- err
  336. return
  337. }
  338. closeFiles := func() {
  339. for _, f := range files {
  340. f.Close()
  341. fileName := f.Name()
  342. if strings.HasSuffix(fileName, tmpLogfileSuffix) {
  343. err := w.filesRefCounter.Dereference(fileName)
  344. if err != nil {
  345. logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
  346. }
  347. }
  348. }
  349. }
  350. readers := make([]SizeReaderAt, 0, len(files)+1)
  351. for _, f := range files {
  352. stat, err := f.Stat()
  353. if err != nil {
  354. watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
  355. closeFiles()
  356. return
  357. }
  358. readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
  359. }
  360. if currentChunk.Size() > 0 {
  361. readers = append(readers, currentChunk)
  362. }
  363. ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
  364. closeFiles()
  365. if !ok {
  366. return
  367. }
  368. w.mu.RLock()
  369. }
  370. if !config.Follow || w.closed {
  371. w.mu.RUnlock()
  372. return
  373. }
  374. w.mu.RUnlock()
  375. notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
  376. _, ok := i.(struct{})
  377. return ok
  378. })
  379. defer w.notifyReaders.Evict(notifyRotate)
  380. followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
  381. }
  382. func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
  383. w.rotateMu.Lock()
  384. defer w.rotateMu.Unlock()
  385. defer func() {
  386. if err == nil {
  387. return
  388. }
  389. for _, f := range files {
  390. f.Close()
  391. if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
  392. err := os.Remove(f.Name())
  393. if err != nil && !os.IsNotExist(err) {
  394. logrus.Warnf("Failed to remove logfile: %v", err)
  395. }
  396. }
  397. }
  398. }()
  399. for i := w.maxFiles; i > 1; i-- {
  400. f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
  401. if err != nil {
  402. if !os.IsNotExist(err) {
  403. return nil, errors.Wrap(err, "error opening rotated log file")
  404. }
  405. fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
  406. decompressedFileName := fileName + tmpLogfileSuffix
  407. tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
  408. if exists {
  409. return open(refFileName)
  410. }
  411. return decompressfile(fileName, refFileName, config.Since)
  412. })
  413. if err != nil {
  414. if !errors.Is(err, os.ErrNotExist) {
  415. return nil, errors.Wrap(err, "error getting reference to decompressed log file")
  416. }
  417. continue
  418. }
  419. if tmpFile == nil {
  420. // The log before `config.Since` does not need to read
  421. break
  422. }
  423. files = append(files, tmpFile)
  424. continue
  425. }
  426. files = append(files, f)
  427. }
  428. return files, nil
  429. }
  430. func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
  431. cf, err := open(fileName)
  432. if err != nil {
  433. return nil, errors.Wrap(err, "error opening file for decompression")
  434. }
  435. defer cf.Close()
  436. rc, err := gzip.NewReader(cf)
  437. if err != nil {
  438. return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
  439. }
  440. defer rc.Close()
  441. // Extract the last log entry timestramp from the gzip header
  442. extra := &rotateFileMetadata{}
  443. err = json.Unmarshal(rc.Header.Extra, extra)
  444. if err == nil && extra.LastTime.Before(since) {
  445. return nil, nil
  446. }
  447. rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
  448. if err != nil {
  449. return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
  450. }
  451. _, err = pools.Copy(rs, rc)
  452. if err != nil {
  453. rs.Close()
  454. rErr := os.Remove(rs.Name())
  455. if rErr != nil && !os.IsNotExist(rErr) {
  456. logrus.Errorf("Failed to remove logfile: %v", rErr)
  457. }
  458. return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
  459. }
  460. return rs, nil
  461. }
  462. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  463. // seek to the end to get the size
  464. // we'll leave this at the end of the file since section reader does not advance the reader
  465. size, err := f.Seek(0, io.SeekEnd)
  466. if err != nil {
  467. return nil, errors.Wrap(err, "error getting current file size")
  468. }
  469. return io.NewSectionReader(f, 0, size), nil
  470. }
  471. func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
  472. nLines := config.Tail
  473. ctx, cancel := context.WithCancel(context.Background())
  474. defer cancel()
  475. cont = true
  476. // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
  477. go func() {
  478. select {
  479. case err := <-notifyEvict:
  480. if err != nil {
  481. watcher.Err <- err.(error)
  482. cont = false
  483. cancel()
  484. }
  485. case <-ctx.Done():
  486. case <-watcher.WatchConsumerGone():
  487. cont = false
  488. cancel()
  489. }
  490. }()
  491. readers := make([]io.Reader, 0, len(files))
  492. if config.Tail > 0 {
  493. for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
  494. tail, n, err := getTailReader(ctx, files[i], nLines)
  495. if err != nil {
  496. watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
  497. return
  498. }
  499. nLines -= n
  500. readers = append([]io.Reader{tail}, readers...)
  501. }
  502. } else {
  503. for _, r := range files {
  504. readers = append(readers, &wrappedReaderAt{ReaderAt: r})
  505. }
  506. }
  507. rdr := io.MultiReader(readers...)
  508. dec.Reset(rdr)
  509. for {
  510. msg, err := dec.Decode()
  511. if err != nil {
  512. if !errors.Is(err, io.EOF) {
  513. watcher.Err <- err
  514. }
  515. return
  516. }
  517. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  518. continue
  519. }
  520. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  521. return
  522. }
  523. select {
  524. case <-ctx.Done():
  525. return
  526. case watcher.Msg <- msg:
  527. }
  528. }
  529. }
  530. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
  531. dec.Reset(f)
  532. name := f.Name()
  533. fileWatcher, err := watchFile(name)
  534. if err != nil {
  535. logWatcher.Err <- err
  536. return
  537. }
  538. defer func() {
  539. f.Close()
  540. dec.Close()
  541. fileWatcher.Close()
  542. }()
  543. var retries int
  544. handleRotate := func() error {
  545. f.Close()
  546. fileWatcher.Remove(name)
  547. // retry when the file doesn't exist
  548. for retries := 0; retries <= 5; retries++ {
  549. f, err = open(name)
  550. if err == nil || !os.IsNotExist(err) {
  551. break
  552. }
  553. }
  554. if err != nil {
  555. return err
  556. }
  557. if err := fileWatcher.Add(name); err != nil {
  558. return err
  559. }
  560. dec.Reset(f)
  561. return nil
  562. }
  563. errRetry := errors.New("retry")
  564. errDone := errors.New("done")
  565. handleMustClose := func(evictErr error) {
  566. f.Close()
  567. dec.Close()
  568. logWatcher.Err <- errors.Wrap(err, "log reader evicted due to errors")
  569. logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
  570. }
  571. waitRead := func() error {
  572. select {
  573. case e := <-notifyEvict:
  574. if e != nil {
  575. err := e.(error)
  576. handleMustClose(err)
  577. }
  578. return errDone
  579. case e := <-fileWatcher.Events():
  580. switch e.Op {
  581. case fsnotify.Write:
  582. dec.Reset(f)
  583. return nil
  584. case fsnotify.Rename, fsnotify.Remove:
  585. select {
  586. case <-notifyRotate:
  587. case <-logWatcher.WatchProducerGone():
  588. return errDone
  589. case <-logWatcher.WatchConsumerGone():
  590. return errDone
  591. }
  592. if err := handleRotate(); err != nil {
  593. return err
  594. }
  595. return nil
  596. }
  597. return errRetry
  598. case err := <-fileWatcher.Errors():
  599. logrus.Debugf("logger got error watching file: %v", err)
  600. // Something happened, let's try and stay alive and create a new watcher
  601. if retries <= 5 {
  602. fileWatcher.Close()
  603. fileWatcher, err = watchFile(name)
  604. if err != nil {
  605. return err
  606. }
  607. retries++
  608. return errRetry
  609. }
  610. return err
  611. case <-logWatcher.WatchProducerGone():
  612. return errDone
  613. case <-logWatcher.WatchConsumerGone():
  614. return errDone
  615. }
  616. }
  617. oldSize := int64(-1)
  618. handleDecodeErr := func(err error) error {
  619. if !errors.Is(err, io.EOF) {
  620. return err
  621. }
  622. // Handle special case (#39235): max-file=1 and file was truncated
  623. st, stErr := f.Stat()
  624. if stErr == nil {
  625. size := st.Size()
  626. defer func() { oldSize = size }()
  627. if size < oldSize { // truncated
  628. f.Seek(0, 0)
  629. dec.Reset(f)
  630. return nil
  631. }
  632. } else {
  633. logrus.WithError(stErr).Warn("logger: stat error")
  634. }
  635. for {
  636. err := waitRead()
  637. if err == nil {
  638. break
  639. }
  640. if err == errRetry {
  641. continue
  642. }
  643. return err
  644. }
  645. return nil
  646. }
  647. // main loop
  648. for {
  649. select {
  650. case err := <-notifyEvict:
  651. if err != nil {
  652. handleMustClose(err.(error))
  653. }
  654. return
  655. default:
  656. }
  657. msg, err := dec.Decode()
  658. if err != nil {
  659. if err := handleDecodeErr(err); err != nil {
  660. if err == errDone {
  661. return
  662. }
  663. // we got an unrecoverable error, so return
  664. logWatcher.Err <- err
  665. return
  666. }
  667. // ready to try again
  668. continue
  669. }
  670. retries = 0 // reset retries since we've succeeded
  671. if !since.IsZero() && msg.Timestamp.Before(since) {
  672. continue
  673. }
  674. if !until.IsZero() && msg.Timestamp.After(until) {
  675. return
  676. }
  677. // send the message, unless the consumer is gone
  678. select {
  679. case e := <-notifyEvict:
  680. if e != nil {
  681. err := e.(error)
  682. logrus.WithError(err).Debug("Reader evicted while sending log message")
  683. logWatcher.Err <- err
  684. }
  685. return
  686. case logWatcher.Msg <- msg:
  687. case <-logWatcher.WatchConsumerGone():
  688. return
  689. }
  690. }
  691. }
  692. func watchFile(name string) (filenotify.FileWatcher, error) {
  693. var fileWatcher filenotify.FileWatcher
  694. if runtime.GOOS == "windows" {
  695. // FileWatcher on Windows files is based on the syscall notifications which has an issue because of file caching.
  696. // It is based on ReadDirectoryChangesW() which doesn't detect writes to the cache. It detects writes to disk only.
  697. // Because of the OS lazy writing, we don't get notifications for file writes and thereby the watcher
  698. // doesn't work. Hence for Windows we will use poll based notifier.
  699. fileWatcher = filenotify.NewPollingWatcher()
  700. } else {
  701. var err error
  702. fileWatcher, err = filenotify.New()
  703. if err != nil {
  704. return nil, err
  705. }
  706. }
  707. logger := logrus.WithFields(logrus.Fields{
  708. "module": "logger",
  709. "file": name,
  710. })
  711. if err := fileWatcher.Add(name); err != nil {
  712. // we will retry using file poller.
  713. logger.WithError(err).Warnf("falling back to file poller")
  714. fileWatcher.Close()
  715. fileWatcher = filenotify.NewPollingWatcher()
  716. if err := fileWatcher.Add(name); err != nil {
  717. fileWatcher.Close()
  718. logger.WithError(err).Debugf("error watching log file for modifications")
  719. return nil, err
  720. }
  721. }
  722. return fileWatcher, nil
  723. }
  724. type wrappedReaderAt struct {
  725. io.ReaderAt
  726. pos int64
  727. }
  728. func (r *wrappedReaderAt) Read(p []byte) (int, error) {
  729. n, err := r.ReaderAt.ReadAt(p, r.pos)
  730. r.pos += int64(n)
  731. return n, err
  732. }