follow.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/containerd/containerd/log"
  8. "github.com/docker/docker/daemon/logger"
  9. "github.com/pkg/errors"
  10. "github.com/sirupsen/logrus"
  11. )
// follow holds the state needed to tail a LogFile: it waits for the write
// position to advance, decodes the newly written bytes and hands the decoded
// messages to a log watcher.
type follow struct {
	// LogFile is the log file being followed. Its read channel, closed
	// channel and fsopMu lock are used to observe writes and rotations.
	LogFile *LogFile
	// Watcher is the consumer: errors are sent on Watcher.Err and following
	// stops once Watcher.WatchConsumerGone() fires.
	Watcher *logger.LogWatcher
	// Decoder is reset onto each new chunk of file data before forwarding.
	Decoder Decoder
	// Forwarder pulls messages out of Decoder and delivers them to Watcher.
	Forwarder *forwarder

	// log is the logger used for warnings; it is (re)initialized by Do.
	log *logrus.Entry
	// c is the write-notification channel. Do allocates it once and nextPos
	// reuses it for every wait.
	c chan logPos
}
  20. // Do follows the log file as it is written, starting from f at read.
  21. func (fl *follow) Do(f *os.File, read logPos) {
  22. fl.log = log.G(context.TODO()).WithFields(logrus.Fields{
  23. "module": "logger",
  24. "file": f.Name(),
  25. })
  26. // Optimization: allocate the write-notifications channel only once and
  27. // reuse it for multiple invocations of nextPos().
  28. fl.c = make(chan logPos, 1)
  29. defer func() {
  30. if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
  31. fl.log.WithError(err).Warn("error closing current log file")
  32. }
  33. }()
  34. for {
  35. wrote, ok := fl.nextPos(read)
  36. if !ok {
  37. return
  38. }
  39. if wrote.rotation != read.rotation {
  40. // Flush the current file before moving on to the next.
  41. if _, err := f.Seek(read.size, io.SeekStart); err != nil {
  42. fl.Watcher.Err <- err
  43. return
  44. }
  45. if !fl.forward(f) {
  46. return
  47. }
  48. // Open the new file, which has the same name as the old
  49. // file thanks to file rotation. Make no mistake: they
  50. // are different files, with distinct identities.
  51. // Atomically capture the wrote position to make
  52. // absolutely sure that the position corresponds to the
  53. // file we have opened; more rotations could have
  54. // occurred since we previously received it.
  55. if err := f.Close(); err != nil {
  56. fl.log.WithError(err).Warn("error closing rotated log file")
  57. }
  58. var err error
  59. func() {
  60. fl.LogFile.fsopMu.RLock()
  61. st := <-fl.LogFile.read
  62. defer func() {
  63. fl.LogFile.read <- st
  64. fl.LogFile.fsopMu.RUnlock()
  65. }()
  66. f, err = open(f.Name())
  67. wrote = st.pos
  68. }()
  69. // We tried to open the file inside a critical section
  70. // so we shouldn't have been racing the rotation of the
  71. // file. Any error, even fs.ErrNotFound, is exceptional.
  72. if err != nil {
  73. fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err)
  74. return
  75. }
  76. if nrot := wrote.rotation - read.rotation; nrot > 1 {
  77. fl.log.WithField("missed-rotations", nrot).
  78. Warn("file rotations were missed while following logs; some log messages have been skipped over")
  79. }
  80. // Set up our read position to start from the top of the file.
  81. read.size = 0
  82. }
  83. if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
  84. return
  85. }
  86. read = wrote
  87. }
  88. }
  89. // nextPos waits until the write position of the LogFile being followed has
  90. // advanced from current and returns the new position.
  91. func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
  92. var st logReadState
  93. select {
  94. case <-fl.Watcher.WatchConsumerGone():
  95. return current, false
  96. case st = <-fl.LogFile.read:
  97. }
  98. // Have any any logs been written since we last checked?
  99. if st.pos == current { // Nope.
  100. // Add ourself to the notify list.
  101. st.wait = append(st.wait, fl.c)
  102. } else { // Yes.
  103. // "Notify" ourself immediately.
  104. fl.c <- st.pos
  105. }
  106. fl.LogFile.read <- st
  107. select {
  108. case <-fl.LogFile.closed: // No more logs will be written.
  109. select { // Have we followed to the end?
  110. case next = <-fl.c: // No: received a new position.
  111. default: // Yes.
  112. return current, false
  113. }
  114. case <-fl.Watcher.WatchConsumerGone():
  115. return current, false
  116. case next = <-fl.c:
  117. }
  118. return next, true
  119. }
  120. // forward decodes log messages from r and forwards them to the log watcher.
  121. //
  122. // The return value, cont, signals whether following should continue.
  123. func (fl *follow) forward(r io.Reader) (cont bool) {
  124. fl.Decoder.Reset(r)
  125. return fl.Forwarder.Do(fl.Watcher, fl.Decoder)
  126. }