logfile.go 9.7 KB


  1. package loggerutils
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/daemon/logger/loggerutils/multireader"
  13. "github.com/docker/docker/pkg/filenotify"
  14. "github.com/docker/docker/pkg/pubsub"
  15. "github.com/docker/docker/pkg/tailfile"
  16. "github.com/fsnotify/fsnotify"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. // LogFile is Logger implementation for default Docker logging.
  21. type LogFile struct {
  22. f *os.File // store for closing
  23. closed bool
  24. mu sync.RWMutex
  25. capacity int64 //maximum size of each file
  26. currentSize int64 // current size of the latest file
  27. maxFiles int //maximum number of files
  28. notifyRotate *pubsub.Publisher
  29. marshal logger.MarshalFunc
  30. createDecoder makeDecoderFunc
  31. }
  32. type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
  33. //NewLogFile creates new LogFile
  34. func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
  35. log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
  36. if err != nil {
  37. return nil, err
  38. }
  39. size, err := log.Seek(0, os.SEEK_END)
  40. if err != nil {
  41. return nil, err
  42. }
  43. return &LogFile{
  44. f: log,
  45. capacity: capacity,
  46. currentSize: size,
  47. maxFiles: maxFiles,
  48. notifyRotate: pubsub.NewPublisher(0, 1),
  49. marshal: marshaller,
  50. createDecoder: decodeFunc,
  51. }, nil
  52. }
  53. // WriteLogEntry writes the provided log message to the current log file.
  54. // This may trigger a rotation event if the max file/capacity limits are hit.
  55. func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
  56. b, err := w.marshal(msg)
  57. if err != nil {
  58. return errors.Wrap(err, "error marshalling log message")
  59. }
  60. logger.PutMessage(msg)
  61. w.mu.Lock()
  62. if w.closed {
  63. w.mu.Unlock()
  64. return errors.New("cannot write because the output file was closed")
  65. }
  66. if err := w.checkCapacityAndRotate(); err != nil {
  67. w.mu.Unlock()
  68. return err
  69. }
  70. n, err := w.f.Write(b)
  71. if err == nil {
  72. w.currentSize += int64(n)
  73. }
  74. w.mu.Unlock()
  75. return err
  76. }
  77. func (w *LogFile) checkCapacityAndRotate() error {
  78. if w.capacity == -1 {
  79. return nil
  80. }
  81. if w.currentSize >= w.capacity {
  82. name := w.f.Name()
  83. if err := w.f.Close(); err != nil {
  84. return errors.Wrap(err, "error closing file")
  85. }
  86. if err := rotate(name, w.maxFiles); err != nil {
  87. return err
  88. }
  89. file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
  90. if err != nil {
  91. return err
  92. }
  93. w.f = file
  94. w.currentSize = 0
  95. w.notifyRotate.Publish(struct{}{})
  96. }
  97. return nil
  98. }
  99. func rotate(name string, maxFiles int) error {
  100. if maxFiles < 2 {
  101. return nil
  102. }
  103. for i := maxFiles - 1; i > 1; i-- {
  104. toPath := name + "." + strconv.Itoa(i)
  105. fromPath := name + "." + strconv.Itoa(i-1)
  106. if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
  107. return errors.Wrap(err, "error rotating old log entries")
  108. }
  109. }
  110. if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
  111. return errors.Wrap(err, "error rotating current log")
  112. }
  113. return nil
  114. }
  115. // LogPath returns the location the given writer logs to.
  116. func (w *LogFile) LogPath() string {
  117. w.mu.Lock()
  118. defer w.mu.Unlock()
  119. return w.f.Name()
  120. }
  121. // MaxFiles return maximum number of files
  122. func (w *LogFile) MaxFiles() int {
  123. return w.maxFiles
  124. }
  125. // Close closes underlying file and signals all readers to stop.
  126. func (w *LogFile) Close() error {
  127. w.mu.Lock()
  128. defer w.mu.Unlock()
  129. if w.closed {
  130. return nil
  131. }
  132. if err := w.f.Close(); err != nil {
  133. return err
  134. }
  135. w.closed = true
  136. return nil
  137. }
  138. // ReadLogs decodes entries from log files and sends them the passed in watcher
  139. func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
  140. w.mu.RLock()
  141. files, err := w.openRotatedFiles()
  142. if err != nil {
  143. w.mu.RUnlock()
  144. watcher.Err <- err
  145. return
  146. }
  147. defer func() {
  148. for _, f := range files {
  149. f.Close()
  150. }
  151. }()
  152. currentFile, err := os.Open(w.f.Name())
  153. if err != nil {
  154. w.mu.RUnlock()
  155. watcher.Err <- err
  156. return
  157. }
  158. defer currentFile.Close()
  159. currentChunk, err := newSectionReader(currentFile)
  160. w.mu.RUnlock()
  161. if err != nil {
  162. watcher.Err <- err
  163. return
  164. }
  165. if config.Tail != 0 {
  166. seekers := make([]io.ReadSeeker, 0, len(files)+1)
  167. for _, f := range files {
  168. seekers = append(seekers, f)
  169. }
  170. seekers = append(seekers, currentChunk)
  171. tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
  172. }
  173. w.mu.RLock()
  174. if !config.Follow || w.closed {
  175. w.mu.RUnlock()
  176. return
  177. }
  178. w.mu.RUnlock()
  179. notifyRotate := w.notifyRotate.Subscribe()
  180. defer w.notifyRotate.Evict(notifyRotate)
  181. followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
  182. }
  183. func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
  184. defer func() {
  185. if err == nil {
  186. return
  187. }
  188. for _, f := range files {
  189. f.Close()
  190. }
  191. }()
  192. for i := w.maxFiles; i > 1; i-- {
  193. f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
  194. if err != nil {
  195. if !os.IsNotExist(err) {
  196. return nil, err
  197. }
  198. continue
  199. }
  200. files = append(files, f)
  201. }
  202. return files, nil
  203. }
  204. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  205. // seek to the end to get the size
  206. // we'll leave this at the end of the file since section reader does not advance the reader
  207. size, err := f.Seek(0, os.SEEK_END)
  208. if err != nil {
  209. return nil, errors.Wrap(err, "error getting current file size")
  210. }
  211. return io.NewSectionReader(f, 0, size), nil
  212. }
  213. type decodeFunc func() (*logger.Message, error)
  214. func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
  215. var rdr io.Reader = f
  216. if config.Tail > 0 {
  217. ls, err := tailfile.TailFile(f, config.Tail)
  218. if err != nil {
  219. watcher.Err <- err
  220. return
  221. }
  222. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  223. }
  224. decodeLogLine := createDecoder(rdr)
  225. for {
  226. msg, err := decodeLogLine()
  227. if err != nil {
  228. if err != io.EOF {
  229. watcher.Err <- err
  230. }
  231. return
  232. }
  233. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  234. continue
  235. }
  236. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  237. return
  238. }
  239. select {
  240. case <-watcher.WatchClose():
  241. return
  242. case watcher.Msg <- msg:
  243. }
  244. }
  245. }
  246. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
  247. decodeLogLine := createDecoder(f)
  248. name := f.Name()
  249. fileWatcher, err := watchFile(name)
  250. if err != nil {
  251. logWatcher.Err <- err
  252. return
  253. }
  254. defer func() {
  255. f.Close()
  256. fileWatcher.Remove(name)
  257. fileWatcher.Close()
  258. }()
  259. ctx, cancel := context.WithCancel(context.Background())
  260. defer cancel()
  261. go func() {
  262. select {
  263. case <-logWatcher.WatchClose():
  264. fileWatcher.Remove(name)
  265. cancel()
  266. case <-ctx.Done():
  267. return
  268. }
  269. }()
  270. var retries int
  271. handleRotate := func() error {
  272. f.Close()
  273. fileWatcher.Remove(name)
  274. // retry when the file doesn't exist
  275. for retries := 0; retries <= 5; retries++ {
  276. f, err = os.Open(name)
  277. if err == nil || !os.IsNotExist(err) {
  278. break
  279. }
  280. }
  281. if err != nil {
  282. return err
  283. }
  284. if err := fileWatcher.Add(name); err != nil {
  285. return err
  286. }
  287. decodeLogLine = createDecoder(f)
  288. return nil
  289. }
  290. errRetry := errors.New("retry")
  291. errDone := errors.New("done")
  292. waitRead := func() error {
  293. select {
  294. case e := <-fileWatcher.Events():
  295. switch e.Op {
  296. case fsnotify.Write:
  297. decodeLogLine = createDecoder(f)
  298. return nil
  299. case fsnotify.Rename, fsnotify.Remove:
  300. select {
  301. case <-notifyRotate:
  302. case <-ctx.Done():
  303. return errDone
  304. }
  305. if err := handleRotate(); err != nil {
  306. return err
  307. }
  308. return nil
  309. }
  310. return errRetry
  311. case err := <-fileWatcher.Errors():
  312. logrus.Debug("logger got error watching file: %v", err)
  313. // Something happened, let's try and stay alive and create a new watcher
  314. if retries <= 5 {
  315. fileWatcher.Close()
  316. fileWatcher, err = watchFile(name)
  317. if err != nil {
  318. return err
  319. }
  320. retries++
  321. return errRetry
  322. }
  323. return err
  324. case <-ctx.Done():
  325. return errDone
  326. }
  327. }
  328. handleDecodeErr := func(err error) error {
  329. if err != io.EOF {
  330. return err
  331. }
  332. for {
  333. err := waitRead()
  334. if err == nil {
  335. break
  336. }
  337. if err == errRetry {
  338. continue
  339. }
  340. return err
  341. }
  342. return nil
  343. }
  344. // main loop
  345. for {
  346. msg, err := decodeLogLine()
  347. if err != nil {
  348. if err := handleDecodeErr(err); err != nil {
  349. if err == errDone {
  350. return
  351. }
  352. // we got an unrecoverable error, so return
  353. logWatcher.Err <- err
  354. return
  355. }
  356. // ready to try again
  357. continue
  358. }
  359. retries = 0 // reset retries since we've succeeded
  360. if !since.IsZero() && msg.Timestamp.Before(since) {
  361. continue
  362. }
  363. if !until.IsZero() && msg.Timestamp.After(until) {
  364. return
  365. }
  366. select {
  367. case logWatcher.Msg <- msg:
  368. case <-ctx.Done():
  369. logWatcher.Msg <- msg
  370. for {
  371. msg, err := decodeLogLine()
  372. if err != nil {
  373. return
  374. }
  375. if !since.IsZero() && msg.Timestamp.Before(since) {
  376. continue
  377. }
  378. if !until.IsZero() && msg.Timestamp.After(until) {
  379. return
  380. }
  381. logWatcher.Msg <- msg
  382. }
  383. }
  384. }
  385. }
  386. func watchFile(name string) (filenotify.FileWatcher, error) {
  387. fileWatcher, err := filenotify.New()
  388. if err != nil {
  389. return nil, err
  390. }
  391. logger := logrus.WithFields(logrus.Fields{
  392. "module": "logger",
  393. "fille": name,
  394. })
  395. if err := fileWatcher.Add(name); err != nil {
  396. logger.WithError(err).Warnf("falling back to file poller")
  397. fileWatcher.Close()
  398. fileWatcher = filenotify.NewPollingWatcher()
  399. if err := fileWatcher.Add(name); err != nil {
  400. fileWatcher.Close()
  401. logger.WithError(err).Debugf("error watching log file for modifications")
  402. return nil, err
  403. }
  404. }
  405. return fileWatcher, nil
  406. }