read.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package jsonfilelog
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/docker/docker/daemon/logger"
  11. "github.com/docker/docker/pkg/filenotify"
  12. "github.com/docker/docker/pkg/ioutils"
  13. "github.com/docker/docker/pkg/jsonlog"
  14. "github.com/docker/docker/pkg/tailfile"
  15. )
// maxJSONDecodeRetry caps how many consecutive times followLogs rebuilds its
// JSON decoder and retries after a syntax error or unexpected EOF before it
// gives up; the counter is reset after every successfully decoded entry.
const maxJSONDecodeRetry = 20000
  17. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  18. l.Reset()
  19. if err := dec.Decode(l); err != nil {
  20. return nil, err
  21. }
  22. msg := &logger.Message{
  23. Source: l.Stream,
  24. Timestamp: l.Created,
  25. Line: []byte(l.Log),
  26. Attrs: l.Attrs,
  27. }
  28. return msg, nil
  29. }
  30. // ReadLogs implements the logger's LogReader interface for the logs
  31. // created by this driver.
  32. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  33. logWatcher := logger.NewLogWatcher()
  34. go l.readLogs(logWatcher, config)
  35. return logWatcher
  36. }
  37. func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
  38. defer close(logWatcher.Msg)
  39. pth := l.writer.LogPath()
  40. var files []io.ReadSeeker
  41. for i := l.writer.MaxFiles(); i > 1; i-- {
  42. f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
  43. if err != nil {
  44. if !os.IsNotExist(err) {
  45. logWatcher.Err <- err
  46. break
  47. }
  48. continue
  49. }
  50. files = append(files, f)
  51. }
  52. latestFile, err := os.Open(pth)
  53. if err != nil {
  54. logWatcher.Err <- err
  55. return
  56. }
  57. if config.Tail != 0 {
  58. tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
  59. tailFile(tailer, logWatcher, config.Tail, config.Since)
  60. }
  61. // close all the rotated files
  62. for _, f := range files {
  63. if err := f.(io.Closer).Close(); err != nil {
  64. logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
  65. }
  66. }
  67. if !config.Follow {
  68. if err := latestFile.Close(); err != nil {
  69. logrus.Errorf("Error closing file: %v", err)
  70. }
  71. return
  72. }
  73. if config.Tail >= 0 {
  74. latestFile.Seek(0, os.SEEK_END)
  75. }
  76. l.mu.Lock()
  77. l.readers[logWatcher] = struct{}{}
  78. l.mu.Unlock()
  79. notifyRotate := l.writer.NotifyRotate()
  80. followLogs(latestFile, logWatcher, notifyRotate, config.Since)
  81. l.mu.Lock()
  82. delete(l.readers, logWatcher)
  83. l.mu.Unlock()
  84. l.writer.NotifyRotateEvict(notifyRotate)
  85. }
  86. func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
  87. var rdr io.Reader = f
  88. if tail > 0 {
  89. ls, err := tailfile.TailFile(f, tail)
  90. if err != nil {
  91. logWatcher.Err <- err
  92. return
  93. }
  94. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  95. }
  96. dec := json.NewDecoder(rdr)
  97. l := &jsonlog.JSONLog{}
  98. for {
  99. msg, err := decodeLogLine(dec, l)
  100. if err != nil {
  101. if err != io.EOF {
  102. logWatcher.Err <- err
  103. }
  104. return
  105. }
  106. if !since.IsZero() && msg.Timestamp.Before(since) {
  107. continue
  108. }
  109. logWatcher.Msg <- msg
  110. }
  111. }
  112. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
  113. dec := json.NewDecoder(f)
  114. l := &jsonlog.JSONLog{}
  115. fileWatcher, err := filenotify.New()
  116. if err != nil {
  117. logWatcher.Err <- err
  118. }
  119. defer func() {
  120. f.Close()
  121. fileWatcher.Close()
  122. }()
  123. name := f.Name()
  124. if err := fileWatcher.Add(name); err != nil {
  125. logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
  126. fileWatcher.Close()
  127. fileWatcher = filenotify.NewPollingWatcher()
  128. if err := fileWatcher.Add(name); err != nil {
  129. logrus.Debugf("error watching log file for modifications: %v", err)
  130. logWatcher.Err <- err
  131. return
  132. }
  133. }
  134. var retries int
  135. for {
  136. msg, err := decodeLogLine(dec, l)
  137. if err != nil {
  138. if err != io.EOF {
  139. // try again because this shouldn't happen
  140. if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
  141. dec = json.NewDecoder(f)
  142. retries++
  143. continue
  144. }
  145. // io.ErrUnexpectedEOF is returned from json.Decoder when there is
  146. // remaining data in the parser's buffer while an io.EOF occurs.
  147. // If the json logger writes a partial json log entry to the disk
  148. // while at the same time the decoder tries to decode it, the race condition happens.
  149. if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
  150. reader := io.MultiReader(dec.Buffered(), f)
  151. dec = json.NewDecoder(reader)
  152. retries++
  153. continue
  154. }
  155. return
  156. }
  157. select {
  158. case <-fileWatcher.Events():
  159. dec = json.NewDecoder(f)
  160. continue
  161. case <-fileWatcher.Errors():
  162. logWatcher.Err <- err
  163. return
  164. case <-logWatcher.WatchClose():
  165. fileWatcher.Remove(name)
  166. return
  167. case <-notifyRotate:
  168. f.Close()
  169. fileWatcher.Remove(name)
  170. // retry when the file doesn't exist
  171. for retries := 0; retries <= 5; retries++ {
  172. f, err = os.Open(name)
  173. if err == nil || !os.IsNotExist(err) {
  174. break
  175. }
  176. }
  177. if err = fileWatcher.Add(name); err != nil {
  178. logWatcher.Err <- err
  179. return
  180. }
  181. if err != nil {
  182. logWatcher.Err <- err
  183. return
  184. }
  185. dec = json.NewDecoder(f)
  186. continue
  187. }
  188. }
  189. retries = 0 // reset retries since we've succeeded
  190. if !since.IsZero() && msg.Timestamp.Before(since) {
  191. continue
  192. }
  193. select {
  194. case logWatcher.Msg <- msg:
  195. case <-logWatcher.WatchClose():
  196. logWatcher.Msg <- msg
  197. for {
  198. msg, err := decodeLogLine(dec, l)
  199. if err != nil {
  200. return
  201. }
  202. if !since.IsZero() && msg.Timestamp.Before(since) {
  203. continue
  204. }
  205. logWatcher.Msg <- msg
  206. }
  207. }
  208. }
  209. }