read.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package jsonfilelog
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "time"
  9. "github.com/fsnotify/fsnotify"
  10. "golang.org/x/net/context"
  11. "github.com/docker/docker/api/types/backend"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
  14. "github.com/docker/docker/pkg/filenotify"
  15. "github.com/docker/docker/pkg/jsonlog"
  16. "github.com/docker/docker/pkg/tailfile"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. const maxJSONDecodeRetry = 20000
  21. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  22. l.Reset()
  23. if err := dec.Decode(l); err != nil {
  24. return nil, err
  25. }
  26. var attrs []backend.LogAttr
  27. if len(l.Attrs) != 0 {
  28. attrs = make([]backend.LogAttr, 0, len(l.Attrs))
  29. for k, v := range l.Attrs {
  30. attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
  31. }
  32. }
  33. msg := &logger.Message{
  34. Source: l.Stream,
  35. Timestamp: l.Created,
  36. Line: []byte(l.Log),
  37. Attrs: attrs,
  38. }
  39. return msg, nil
  40. }
  41. // ReadLogs implements the logger's LogReader interface for the logs
  42. // created by this driver.
  43. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  44. logWatcher := logger.NewLogWatcher()
  45. go l.readLogs(logWatcher, config)
  46. return logWatcher
  47. }
  48. func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
  49. defer close(logWatcher.Msg)
  50. // lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
  51. // This will block writes!!!
  52. l.mu.RLock()
  53. // TODO it would be nice to move a lot of this reader implementation to the rotate logger object
  54. pth := l.writer.LogPath()
  55. var files []io.ReadSeeker
  56. for i := l.writer.MaxFiles(); i > 1; i-- {
  57. f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
  58. if err != nil {
  59. if !os.IsNotExist(err) {
  60. logWatcher.Err <- err
  61. l.mu.RUnlock()
  62. return
  63. }
  64. continue
  65. }
  66. defer f.Close()
  67. files = append(files, f)
  68. }
  69. latestFile, err := os.Open(pth)
  70. if err != nil {
  71. logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
  72. l.mu.RUnlock()
  73. return
  74. }
  75. defer latestFile.Close()
  76. latestChunk, err := newSectionReader(latestFile)
  77. // Now we have the reader sectioned, all fd's opened, we can unlock.
  78. // New writes/rotates will not affect seeking through these files
  79. l.mu.RUnlock()
  80. if err != nil {
  81. logWatcher.Err <- err
  82. return
  83. }
  84. if config.Tail != 0 {
  85. tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
  86. tailFile(tailer, logWatcher, config.Tail, config.Since)
  87. }
  88. // close all the rotated files
  89. for _, f := range files {
  90. if err := f.(io.Closer).Close(); err != nil {
  91. logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
  92. }
  93. }
  94. if !config.Follow || l.closed {
  95. return
  96. }
  97. notifyRotate := l.writer.NotifyRotate()
  98. defer l.writer.NotifyRotateEvict(notifyRotate)
  99. l.mu.Lock()
  100. l.readers[logWatcher] = struct{}{}
  101. l.mu.Unlock()
  102. followLogs(latestFile, logWatcher, notifyRotate, config.Since)
  103. l.mu.Lock()
  104. delete(l.readers, logWatcher)
  105. l.mu.Unlock()
  106. }
  107. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  108. // seek to the end to get the size
  109. // we'll leave this at the end of the file since section reader does not advance the reader
  110. size, err := f.Seek(0, os.SEEK_END)
  111. if err != nil {
  112. return nil, errors.Wrap(err, "error getting current file size")
  113. }
  114. return io.NewSectionReader(f, 0, size), nil
  115. }
  116. func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
  117. rdr := io.Reader(f)
  118. if tail > 0 {
  119. ls, err := tailfile.TailFile(f, tail)
  120. if err != nil {
  121. logWatcher.Err <- err
  122. return
  123. }
  124. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  125. }
  126. dec := json.NewDecoder(rdr)
  127. l := &jsonlog.JSONLog{}
  128. for {
  129. msg, err := decodeLogLine(dec, l)
  130. if err != nil {
  131. if err != io.EOF {
  132. logWatcher.Err <- err
  133. }
  134. return
  135. }
  136. if !since.IsZero() && msg.Timestamp.Before(since) {
  137. continue
  138. }
  139. select {
  140. case <-logWatcher.WatchClose():
  141. return
  142. case logWatcher.Msg <- msg:
  143. }
  144. }
  145. }
  146. func watchFile(name string) (filenotify.FileWatcher, error) {
  147. fileWatcher, err := filenotify.New()
  148. if err != nil {
  149. return nil, err
  150. }
  151. if err := fileWatcher.Add(name); err != nil {
  152. logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
  153. fileWatcher.Close()
  154. fileWatcher = filenotify.NewPollingWatcher()
  155. if err := fileWatcher.Add(name); err != nil {
  156. fileWatcher.Close()
  157. logrus.Debugf("error watching log file for modifications: %v", err)
  158. return nil, err
  159. }
  160. }
  161. return fileWatcher, nil
  162. }
  163. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
  164. dec := json.NewDecoder(f)
  165. l := &jsonlog.JSONLog{}
  166. name := f.Name()
  167. fileWatcher, err := watchFile(name)
  168. if err != nil {
  169. logWatcher.Err <- err
  170. return
  171. }
  172. defer func() {
  173. f.Close()
  174. fileWatcher.Remove(name)
  175. fileWatcher.Close()
  176. }()
  177. ctx, cancel := context.WithCancel(context.Background())
  178. defer cancel()
  179. go func() {
  180. select {
  181. case <-logWatcher.WatchClose():
  182. fileWatcher.Remove(name)
  183. cancel()
  184. case <-ctx.Done():
  185. return
  186. }
  187. }()
  188. var retries int
  189. handleRotate := func() error {
  190. f.Close()
  191. fileWatcher.Remove(name)
  192. // retry when the file doesn't exist
  193. for retries := 0; retries <= 5; retries++ {
  194. f, err = os.Open(name)
  195. if err == nil || !os.IsNotExist(err) {
  196. break
  197. }
  198. }
  199. if err != nil {
  200. return err
  201. }
  202. if err := fileWatcher.Add(name); err != nil {
  203. return err
  204. }
  205. dec = json.NewDecoder(f)
  206. return nil
  207. }
  208. errRetry := errors.New("retry")
  209. errDone := errors.New("done")
  210. waitRead := func() error {
  211. select {
  212. case e := <-fileWatcher.Events():
  213. switch e.Op {
  214. case fsnotify.Write:
  215. dec = json.NewDecoder(f)
  216. return nil
  217. case fsnotify.Rename, fsnotify.Remove:
  218. select {
  219. case <-notifyRotate:
  220. case <-ctx.Done():
  221. return errDone
  222. }
  223. if err := handleRotate(); err != nil {
  224. return err
  225. }
  226. return nil
  227. }
  228. return errRetry
  229. case err := <-fileWatcher.Errors():
  230. logrus.Debug("logger got error watching file: %v", err)
  231. // Something happened, let's try and stay alive and create a new watcher
  232. if retries <= 5 {
  233. fileWatcher.Close()
  234. fileWatcher, err = watchFile(name)
  235. if err != nil {
  236. return err
  237. }
  238. retries++
  239. return errRetry
  240. }
  241. return err
  242. case <-ctx.Done():
  243. return errDone
  244. }
  245. }
  246. handleDecodeErr := func(err error) error {
  247. if err == io.EOF {
  248. for {
  249. err := waitRead()
  250. if err == nil {
  251. break
  252. }
  253. if err == errRetry {
  254. continue
  255. }
  256. return err
  257. }
  258. return nil
  259. }
  260. // try again because this shouldn't happen
  261. if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
  262. dec = json.NewDecoder(f)
  263. retries++
  264. return nil
  265. }
  266. // io.ErrUnexpectedEOF is returned from json.Decoder when there is
  267. // remaining data in the parser's buffer while an io.EOF occurs.
  268. // If the json logger writes a partial json log entry to the disk
  269. // while at the same time the decoder tries to decode it, the race condition happens.
  270. if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
  271. reader := io.MultiReader(dec.Buffered(), f)
  272. dec = json.NewDecoder(reader)
  273. retries++
  274. return nil
  275. }
  276. return err
  277. }
  278. // main loop
  279. for {
  280. msg, err := decodeLogLine(dec, l)
  281. if err != nil {
  282. if err := handleDecodeErr(err); err != nil {
  283. if err == errDone {
  284. return
  285. }
  286. // we got an unrecoverable error, so return
  287. logWatcher.Err <- err
  288. return
  289. }
  290. // ready to try again
  291. continue
  292. }
  293. retries = 0 // reset retries since we've succeeded
  294. if !since.IsZero() && msg.Timestamp.Before(since) {
  295. continue
  296. }
  297. select {
  298. case logWatcher.Msg <- msg:
  299. case <-ctx.Done():
  300. logWatcher.Msg <- msg
  301. for {
  302. msg, err := decodeLogLine(dec, l)
  303. if err != nil {
  304. return
  305. }
  306. if !since.IsZero() && msg.Timestamp.Before(since) {
  307. continue
  308. }
  309. logWatcher.Msg <- msg
  310. }
  311. }
  312. }
  313. }