follow.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/containerd/log"
  8. "github.com/docker/docker/daemon/logger"
  9. "github.com/pkg/errors"
  10. )
  11. type follow struct {
  12. LogFile *LogFile
  13. Watcher *logger.LogWatcher
  14. Decoder Decoder
  15. Forwarder *forwarder
  16. log *log.Entry
  17. c chan logPos
  18. }
  19. // Do follows the log file as it is written, starting from f at read.
  20. func (fl *follow) Do(f *os.File, read logPos) {
  21. fl.log = log.G(context.TODO()).WithFields(log.Fields{
  22. "module": "logger",
  23. "file": f.Name(),
  24. })
  25. // Optimization: allocate the write-notifications channel only once and
  26. // reuse it for multiple invocations of nextPos().
  27. fl.c = make(chan logPos, 1)
  28. defer func() {
  29. if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
  30. fl.log.WithError(err).Warn("error closing current log file")
  31. }
  32. }()
  33. for {
  34. wrote, ok := fl.nextPos(read)
  35. if !ok {
  36. return
  37. }
  38. if wrote.rotation != read.rotation {
  39. // Flush the current file before moving on to the next.
  40. if _, err := f.Seek(read.size, io.SeekStart); err != nil {
  41. fl.Watcher.Err <- err
  42. return
  43. }
  44. if !fl.forward(f) {
  45. return
  46. }
  47. // Open the new file, which has the same name as the old
  48. // file thanks to file rotation. Make no mistake: they
  49. // are different files, with distinct identities.
  50. // Atomically capture the wrote position to make
  51. // absolutely sure that the position corresponds to the
  52. // file we have opened; more rotations could have
  53. // occurred since we previously received it.
  54. if err := f.Close(); err != nil {
  55. fl.log.WithError(err).Warn("error closing rotated log file")
  56. }
  57. var err error
  58. func() {
  59. fl.LogFile.fsopMu.RLock()
  60. st := <-fl.LogFile.read
  61. defer func() {
  62. fl.LogFile.read <- st
  63. fl.LogFile.fsopMu.RUnlock()
  64. }()
  65. f, err = open(f.Name())
  66. wrote = st.pos
  67. }()
  68. // We tried to open the file inside a critical section
  69. // so we shouldn't have been racing the rotation of the
  70. // file. Any error, even fs.ErrNotFound, is exceptional.
  71. if err != nil {
  72. fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err)
  73. return
  74. }
  75. if nrot := wrote.rotation - read.rotation; nrot > 1 {
  76. fl.log.WithField("missed-rotations", nrot).
  77. Warn("file rotations were missed while following logs; some log messages have been skipped over")
  78. }
  79. // Set up our read position to start from the top of the file.
  80. read.size = 0
  81. }
  82. if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
  83. return
  84. }
  85. read = wrote
  86. }
  87. }
  88. // nextPos waits until the write position of the LogFile being followed has
  89. // advanced from current and returns the new position.
  90. func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
  91. var st logReadState
  92. select {
  93. case <-fl.Watcher.WatchConsumerGone():
  94. return current, false
  95. case st = <-fl.LogFile.read:
  96. }
  97. // Have any any logs been written since we last checked?
  98. if st.pos == current { // Nope.
  99. // Add ourself to the notify list.
  100. st.wait = append(st.wait, fl.c)
  101. } else { // Yes.
  102. // "Notify" ourself immediately.
  103. fl.c <- st.pos
  104. }
  105. fl.LogFile.read <- st
  106. select {
  107. case <-fl.LogFile.closed: // No more logs will be written.
  108. select { // Have we followed to the end?
  109. case next = <-fl.c: // No: received a new position.
  110. default: // Yes.
  111. return current, false
  112. }
  113. case <-fl.Watcher.WatchConsumerGone():
  114. return current, false
  115. case next = <-fl.c:
  116. }
  117. return next, true
  118. }
  119. // forward decodes log messages from r and forwards them to the log watcher.
  120. //
  121. // The return value, cont, signals whether following should continue.
  122. func (fl *follow) forward(r io.Reader) (cont bool) {
  123. fl.Decoder.Reset(r)
  124. return fl.Forwarder.Do(fl.Watcher, fl.Decoder)
  125. }