logfile.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/daemon/logger/loggerutils/multireader"
  16. "github.com/docker/docker/pkg/filenotify"
  17. "github.com/docker/docker/pkg/pools"
  18. "github.com/docker/docker/pkg/pubsub"
  19. "github.com/docker/docker/pkg/tailfile"
  20. "github.com/fsnotify/fsnotify"
  21. "github.com/pkg/errors"
  22. "github.com/sirupsen/logrus"
  23. )
  // tmpLogfileSuffix is appended to the name of a compressed rotation
  // ("<log>.N.gz") to form the name of its temporary decompressed copy.
  const tmpLogfileSuffix = ".tmp"

  // rotateFileMetadata is the metadata stored, JSON-encoded, in the Extra field
  // of the gzip header of a compressed log file.
  type rotateFileMetadata struct {
  	// LastTime is the timestamp of the last log entry written before the file
  	// was rotated; readers use it to skip whole files older than `since`.
  	LastTime time.Time `json:"lastTime,omitempty"`
  }
  // refCounter keeps, per file name, the number of readers currently holding a
  // decompressed log file, so that the file is deleted only after the last
  // reader has released it. The zero value is not usable: counter must be
  // initialized with make.
  type refCounter struct {
  	mu      sync.Mutex
  	counter map[string]int
  }

  // GetReference opens fileName through openRefFile and records one more
  // reference to it. openRefFile receives exists == true when fileName is
  // already tracked (so it can simply reopen it) and false otherwise (so it can
  // create it). An error from openRefFile is returned unchanged and nothing is
  // counted; a nil file with a nil error is returned as-is and is not tracked.
  func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
  	rc.mu.Lock()
  	defer rc.mu.Unlock()

  	_, tracked := rc.counter[fileName]
  	f, err := openRefFile(fileName, tracked)
  	switch {
  	case err != nil:
  		return nil, err
  	case tracked:
  		rc.counter[fileName]++
  	case f != nil:
  		rc.counter[f.Name()] = 1
  	}
  	return f, nil
  }

  // Dereference drops one reference to fileName. When no references remain,
  // the entry is forgotten and the file is removed from disk; the error from
  // that removal, if any, is returned.
  func (rc *refCounter) Dereference(fileName string) error {
  	rc.mu.Lock()
  	defer rc.mu.Unlock()

  	remaining := rc.counter[fileName] - 1
  	if remaining > 0 {
  		rc.counter[fileName] = remaining
  		return nil
  	}
  	delete(rc.counter, fileName)
  	return os.Remove(fileName)
  }
  // LogFile is Logger implementation for default Docker logging.
  //
  // It appends marshalled messages to a single file, rotating it to "<name>.1",
  // "<name>.2", ... (optionally gzip-compressed) once it reaches capacity, and
  // can read the current and rotated files back for `docker logs`.
  type LogFile struct {
  	mu              sync.RWMutex // protects the logfile access (f, closed, currentSize, lastTimestamp)
  	f               *os.File     // store for closing; the current file, opened write-only
  	closed          bool         // set by Close; WriteLogEntry refuses to write afterwards
  	rotateMu        sync.Mutex   // blocks the next rotation until the current rotation (including background compression) is completed
  	capacity        int64        // maximum size of each file in bytes; -1 disables rotation
  	currentSize     int64        // current size of the latest file
  	maxFiles        int          // maximum number of files (current file plus rotations)
  	compress        bool         // whether old versions of log files are compressed
  	lastTimestamp   time.Time    // timestamp of the last successfully written log
  	filesRefCounter refCounter   // keep reference-counted of decompressed files
  	notifyRotate    *pubsub.Publisher // receives an event after every rotation; followers subscribe to it
  	marshal         logger.MarshalFunc // encodes a message into the bytes appended to the file
  	createDecoder   makeDecoderFunc    // builds a message decoder over a reader when reading logs back
  	perms           os.FileMode        // permissions used when creating log files
  }

  // makeDecoderFunc builds a decoder reading from rdr. Each call of the returned
  // function yields the next message; callers in this file treat an error whose
  // cause is io.EOF as "no more data for now".
  type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
  86. // NewLogFile creates new LogFile
  87. func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
  88. log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
  89. if err != nil {
  90. return nil, err
  91. }
  92. size, err := log.Seek(0, os.SEEK_END)
  93. if err != nil {
  94. return nil, err
  95. }
  96. return &LogFile{
  97. f: log,
  98. capacity: capacity,
  99. currentSize: size,
  100. maxFiles: maxFiles,
  101. compress: compress,
  102. filesRefCounter: refCounter{counter: make(map[string]int)},
  103. notifyRotate: pubsub.NewPublisher(0, 1),
  104. marshal: marshaller,
  105. createDecoder: decodeFunc,
  106. perms: perms,
  107. }, nil
  108. }
  109. // WriteLogEntry writes the provided log message to the current log file.
  110. // This may trigger a rotation event if the max file/capacity limits are hit.
  111. func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
  112. b, err := w.marshal(msg)
  113. if err != nil {
  114. return errors.Wrap(err, "error marshalling log message")
  115. }
  116. logger.PutMessage(msg)
  117. w.mu.Lock()
  118. if w.closed {
  119. w.mu.Unlock()
  120. return errors.New("cannot write because the output file was closed")
  121. }
  122. if err := w.checkCapacityAndRotate(); err != nil {
  123. w.mu.Unlock()
  124. return err
  125. }
  126. n, err := w.f.Write(b)
  127. if err == nil {
  128. w.currentSize += int64(n)
  129. w.lastTimestamp = msg.Timestamp
  130. }
  131. w.mu.Unlock()
  132. return err
  133. }
  134. func (w *LogFile) checkCapacityAndRotate() error {
  135. if w.capacity == -1 {
  136. return nil
  137. }
  138. if w.currentSize >= w.capacity {
  139. w.rotateMu.Lock()
  140. fname := w.f.Name()
  141. if err := w.f.Close(); err != nil {
  142. w.rotateMu.Unlock()
  143. return errors.Wrap(err, "error closing file")
  144. }
  145. if err := rotate(fname, w.maxFiles, w.compress); err != nil {
  146. w.rotateMu.Unlock()
  147. return err
  148. }
  149. file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
  150. if err != nil {
  151. w.rotateMu.Unlock()
  152. return err
  153. }
  154. w.f = file
  155. w.currentSize = 0
  156. w.notifyRotate.Publish(struct{}{})
  157. if w.maxFiles <= 1 || !w.compress {
  158. w.rotateMu.Unlock()
  159. return nil
  160. }
  161. go func() {
  162. compressFile(fname+".1", w.lastTimestamp)
  163. w.rotateMu.Unlock()
  164. }()
  165. }
  166. return nil
  167. }
  168. func rotate(name string, maxFiles int, compress bool) error {
  169. if maxFiles < 2 {
  170. return nil
  171. }
  172. var extension string
  173. if compress {
  174. extension = ".gz"
  175. }
  176. lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
  177. err := os.Remove(lastFile)
  178. if err != nil && !os.IsNotExist(err) {
  179. return errors.Wrap(err, "error removing oldest log file")
  180. }
  181. for i := maxFiles - 1; i > 1; i-- {
  182. toPath := name + "." + strconv.Itoa(i) + extension
  183. fromPath := name + "." + strconv.Itoa(i-1) + extension
  184. if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
  185. return err
  186. }
  187. }
  188. if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
  189. return err
  190. }
  191. return nil
  192. }
  193. func compressFile(fileName string, lastTimestamp time.Time) {
  194. file, err := os.Open(fileName)
  195. if err != nil {
  196. logrus.Errorf("Failed to open log file: %v", err)
  197. return
  198. }
  199. defer func() {
  200. file.Close()
  201. err := os.Remove(fileName)
  202. if err != nil {
  203. logrus.Errorf("Failed to remove source log file: %v", err)
  204. }
  205. }()
  206. outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
  207. if err != nil {
  208. logrus.Errorf("Failed to open or create gzip log file: %v", err)
  209. return
  210. }
  211. defer func() {
  212. outFile.Close()
  213. if err != nil {
  214. os.Remove(fileName + ".gz")
  215. }
  216. }()
  217. compressWriter := gzip.NewWriter(outFile)
  218. defer compressWriter.Close()
  219. // Add the last log entry timestramp to the gzip header
  220. extra := rotateFileMetadata{}
  221. extra.LastTime = lastTimestamp
  222. compressWriter.Header.Extra, err = json.Marshal(&extra)
  223. if err != nil {
  224. // Here log the error only and don't return since this is just an optimization.
  225. logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
  226. }
  227. _, err = pools.Copy(compressWriter, file)
  228. if err != nil {
  229. logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
  230. return
  231. }
  232. }
  233. // MaxFiles return maximum number of files
  234. func (w *LogFile) MaxFiles() int {
  235. return w.maxFiles
  236. }
  237. // Close closes underlying file and signals all readers to stop.
  238. func (w *LogFile) Close() error {
  239. w.mu.Lock()
  240. defer w.mu.Unlock()
  241. if w.closed {
  242. return nil
  243. }
  244. if err := w.f.Close(); err != nil {
  245. return err
  246. }
  247. w.closed = true
  248. return nil
  249. }
  // ReadLogs decodes entries from log files and sends them to the passed in watcher.
  //
  // When config.Tail != 0, it first decodes the rotated files (oldest first)
  // followed by the current file's content as it was when ReadLogs started.
  // tailFile limits this to the last config.Tail lines when Tail > 0.
  // With Tail == 0, existing content is skipped entirely.
  //
  // When config.Follow is set and the LogFile has not been closed, it then keeps
  // following the current file across rotations. Errors are sent on watcher.Err.
  //
  // Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
  // TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
  func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
  	w.mu.RLock()
  	// w.f is opened write-only, so open a separate read handle on the same path.
  	currentFile, err := os.Open(w.f.Name())
  	if err != nil {
  		w.mu.RUnlock()
  		watcher.Err <- err
  		return
  	}
  	defer currentFile.Close()

  	// currentChunk covers only the bytes present right now. newSectionReader
  	// leaves currentFile's offset at the end, which is where following resumes.
  	currentChunk, err := newSectionReader(currentFile)
  	if err != nil {
  		w.mu.RUnlock()
  		watcher.Err <- err
  		return
  	}

  	if config.Tail != 0 {
  		files, err := w.openRotatedFiles(config)
  		if err != nil {
  			w.mu.RUnlock()
  			watcher.Err <- err
  			return
  		}
  		// Release the read lock while decoding so writers are not blocked;
  		// it is re-acquired below to check w.closed.
  		w.mu.RUnlock()
  		seekers := make([]io.ReadSeeker, 0, len(files)+1)
  		for _, f := range files {
  			seekers = append(seekers, f)
  		}
  		if currentChunk.Size() > 0 {
  			seekers = append(seekers, currentChunk)
  		}
  		if len(seekers) > 0 {
  			tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
  		}
  		for _, f := range files {
  			f.Close()
  			fileName := f.Name()
  			// Decompressed copies are shared between readers through the ref
  			// counter; release our reference (the last one deletes the file).
  			if strings.HasSuffix(fileName, tmpLogfileSuffix) {
  				err := w.filesRefCounter.Dereference(fileName)
  				if err != nil {
  					logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
  				}
  			}
  		}
  		w.mu.RLock()
  	}

  	if !config.Follow || w.closed {
  		w.mu.RUnlock()
  		return
  	}
  	w.mu.RUnlock()

  	// Rotation events tell followLogs when to reopen the file by name.
  	notifyRotate := w.notifyRotate.Subscribe()
  	defer w.notifyRotate.Evict(notifyRotate)
  	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
  }
  308. func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
  309. w.rotateMu.Lock()
  310. defer w.rotateMu.Unlock()
  311. defer func() {
  312. if err == nil {
  313. return
  314. }
  315. for _, f := range files {
  316. f.Close()
  317. if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
  318. err := os.Remove(f.Name())
  319. if err != nil && !os.IsNotExist(err) {
  320. logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err)
  321. }
  322. }
  323. }
  324. }()
  325. for i := w.maxFiles; i > 1; i-- {
  326. f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
  327. if err != nil {
  328. if !os.IsNotExist(err) {
  329. return nil, errors.Wrap(err, "error opening rotated log file")
  330. }
  331. fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
  332. decompressedFileName := fileName + tmpLogfileSuffix
  333. tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
  334. if exists {
  335. return os.Open(refFileName)
  336. }
  337. return decompressfile(fileName, refFileName, config.Since)
  338. })
  339. if err != nil {
  340. if !os.IsNotExist(errors.Cause(err)) {
  341. return nil, errors.Wrap(err, "error getting reference to decompressed log file")
  342. }
  343. continue
  344. }
  345. if tmpFile == nil {
  346. // The log before `config.Since` does not need to read
  347. break
  348. }
  349. files = append(files, tmpFile)
  350. continue
  351. }
  352. files = append(files, f)
  353. }
  354. return files, nil
  355. }
  356. func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
  357. cf, err := os.Open(fileName)
  358. if err != nil {
  359. return nil, errors.Wrap(err, "error opening file for decompression")
  360. }
  361. defer cf.Close()
  362. rc, err := gzip.NewReader(cf)
  363. if err != nil {
  364. return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
  365. }
  366. defer rc.Close()
  367. // Extract the last log entry timestramp from the gzip header
  368. extra := &rotateFileMetadata{}
  369. err = json.Unmarshal(rc.Header.Extra, extra)
  370. if err == nil && extra.LastTime.Before(since) {
  371. return nil, nil
  372. }
  373. rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
  374. if err != nil {
  375. return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
  376. }
  377. _, err = pools.Copy(rs, rc)
  378. if err != nil {
  379. rs.Close()
  380. rErr := os.Remove(rs.Name())
  381. if rErr != nil && !os.IsNotExist(rErr) {
  382. logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
  383. }
  384. return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
  385. }
  386. return rs, nil
  387. }
  388. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  389. // seek to the end to get the size
  390. // we'll leave this at the end of the file since section reader does not advance the reader
  391. size, err := f.Seek(0, os.SEEK_END)
  392. if err != nil {
  393. return nil, errors.Wrap(err, "error getting current file size")
  394. }
  395. return io.NewSectionReader(f, 0, size), nil
  396. }
  397. type decodeFunc func() (*logger.Message, error)
  398. func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
  399. var rdr io.Reader = f
  400. if config.Tail > 0 {
  401. ls, err := tailfile.TailFile(f, config.Tail)
  402. if err != nil {
  403. watcher.Err <- err
  404. return
  405. }
  406. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  407. }
  408. decodeLogLine := createDecoder(rdr)
  409. for {
  410. msg, err := decodeLogLine()
  411. if err != nil {
  412. if errors.Cause(err) != io.EOF {
  413. watcher.Err <- err
  414. }
  415. return
  416. }
  417. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  418. continue
  419. }
  420. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  421. return
  422. }
  423. select {
  424. case <-watcher.WatchClose():
  425. return
  426. case watcher.Msg <- msg:
  427. }
  428. }
  429. }
  430. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
  431. decodeLogLine := createDecoder(f)
  432. name := f.Name()
  433. fileWatcher, err := watchFile(name)
  434. if err != nil {
  435. logWatcher.Err <- err
  436. return
  437. }
  438. defer func() {
  439. f.Close()
  440. fileWatcher.Remove(name)
  441. fileWatcher.Close()
  442. }()
  443. ctx, cancel := context.WithCancel(context.Background())
  444. defer cancel()
  445. go func() {
  446. select {
  447. case <-logWatcher.WatchClose():
  448. fileWatcher.Remove(name)
  449. cancel()
  450. case <-ctx.Done():
  451. return
  452. }
  453. }()
  454. var retries int
  455. handleRotate := func() error {
  456. f.Close()
  457. fileWatcher.Remove(name)
  458. // retry when the file doesn't exist
  459. for retries := 0; retries <= 5; retries++ {
  460. f, err = os.Open(name)
  461. if err == nil || !os.IsNotExist(err) {
  462. break
  463. }
  464. }
  465. if err != nil {
  466. return err
  467. }
  468. if err := fileWatcher.Add(name); err != nil {
  469. return err
  470. }
  471. decodeLogLine = createDecoder(f)
  472. return nil
  473. }
  474. errRetry := errors.New("retry")
  475. errDone := errors.New("done")
  476. waitRead := func() error {
  477. select {
  478. case e := <-fileWatcher.Events():
  479. switch e.Op {
  480. case fsnotify.Write:
  481. decodeLogLine = createDecoder(f)
  482. return nil
  483. case fsnotify.Rename, fsnotify.Remove:
  484. select {
  485. case <-notifyRotate:
  486. case <-ctx.Done():
  487. return errDone
  488. }
  489. if err := handleRotate(); err != nil {
  490. return err
  491. }
  492. return nil
  493. }
  494. return errRetry
  495. case err := <-fileWatcher.Errors():
  496. logrus.Debug("logger got error watching file: %v", err)
  497. // Something happened, let's try and stay alive and create a new watcher
  498. if retries <= 5 {
  499. fileWatcher.Close()
  500. fileWatcher, err = watchFile(name)
  501. if err != nil {
  502. return err
  503. }
  504. retries++
  505. return errRetry
  506. }
  507. return err
  508. case <-ctx.Done():
  509. return errDone
  510. }
  511. }
  512. handleDecodeErr := func(err error) error {
  513. if errors.Cause(err) != io.EOF {
  514. return err
  515. }
  516. for {
  517. err := waitRead()
  518. if err == nil {
  519. break
  520. }
  521. if err == errRetry {
  522. continue
  523. }
  524. return err
  525. }
  526. return nil
  527. }
  528. // main loop
  529. for {
  530. msg, err := decodeLogLine()
  531. if err != nil {
  532. if err := handleDecodeErr(err); err != nil {
  533. if err == errDone {
  534. return
  535. }
  536. // we got an unrecoverable error, so return
  537. logWatcher.Err <- err
  538. return
  539. }
  540. // ready to try again
  541. continue
  542. }
  543. retries = 0 // reset retries since we've succeeded
  544. if !since.IsZero() && msg.Timestamp.Before(since) {
  545. continue
  546. }
  547. if !until.IsZero() && msg.Timestamp.After(until) {
  548. return
  549. }
  550. select {
  551. case logWatcher.Msg <- msg:
  552. case <-ctx.Done():
  553. logWatcher.Msg <- msg
  554. for {
  555. msg, err := decodeLogLine()
  556. if err != nil {
  557. return
  558. }
  559. if !since.IsZero() && msg.Timestamp.Before(since) {
  560. continue
  561. }
  562. if !until.IsZero() && msg.Timestamp.After(until) {
  563. return
  564. }
  565. logWatcher.Msg <- msg
  566. }
  567. }
  568. }
  569. }
  570. func watchFile(name string) (filenotify.FileWatcher, error) {
  571. fileWatcher, err := filenotify.New()
  572. if err != nil {
  573. return nil, err
  574. }
  575. logger := logrus.WithFields(logrus.Fields{
  576. "module": "logger",
  577. "fille": name,
  578. })
  579. if err := fileWatcher.Add(name); err != nil {
  580. logger.WithError(err).Warnf("falling back to file poller")
  581. fileWatcher.Close()
  582. fileWatcher = filenotify.NewPollingWatcher()
  583. if err := fileWatcher.Add(name); err != nil {
  584. fileWatcher.Close()
  585. logger.WithError(err).Debugf("error watching log file for modifications")
  586. return nil, err
  587. }
  588. }
  589. return fileWatcher, nil
  590. }