logfile.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/daemon/logger/loggerutils/multireader"
  16. "github.com/docker/docker/pkg/filenotify"
  17. "github.com/docker/docker/pkg/pools"
  18. "github.com/docker/docker/pkg/pubsub"
  19. "github.com/docker/docker/pkg/tailfile"
  20. "github.com/fsnotify/fsnotify"
  21. "github.com/pkg/errors"
  22. "github.com/sirupsen/logrus"
  23. )
  24. const tmpLogfileSuffix = ".tmp"
  25. // rotateFileMetadata is a metadata of the gzip header of the compressed log file
  26. type rotateFileMetadata struct {
  27. LastTime time.Time `json:"lastTime,omitempty"`
  28. }
  29. // refCounter is a counter of logfile being referenced
  30. type refCounter struct {
  31. mu sync.Mutex
  32. counter map[string]int
  33. }
  34. // Reference increase the reference counter for specified logfile
  35. func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
  36. rc.mu.Lock()
  37. defer rc.mu.Unlock()
  38. var (
  39. file *os.File
  40. err error
  41. )
  42. _, ok := rc.counter[fileName]
  43. file, err = openRefFile(fileName, ok)
  44. if err != nil {
  45. return nil, err
  46. }
  47. if ok {
  48. rc.counter[fileName]++
  49. } else if file != nil {
  50. rc.counter[file.Name()] = 1
  51. }
  52. return file, nil
  53. }
  54. // Dereference reduce the reference counter for specified logfile
  55. func (rc *refCounter) Dereference(fileName string) error {
  56. rc.mu.Lock()
  57. defer rc.mu.Unlock()
  58. rc.counter[fileName]--
  59. if rc.counter[fileName] <= 0 {
  60. delete(rc.counter, fileName)
  61. err := os.Remove(fileName)
  62. if err != nil {
  63. return err
  64. }
  65. }
  66. return nil
  67. }
// LogFile is Logger implementation for default Docker logging.
//
// It appends marshalled messages to a single file and rotates that file once
// it reaches capacity, keeping up to maxFiles files and optionally
// gzip-compressing the rotated ones. It can also read the current and rotated
// files back for log readers.
type LogFile struct {
	mu              sync.RWMutex       // protects the logfile access
	f               *os.File           // store for closing
	closed          bool               // set by Close; later writes are rejected
	rotateMu        sync.Mutex         // blocks the next rotation until the current rotation is completed (also held by background compression and openRotatedFiles)
	capacity        int64              // maximum size of each file; -1 disables rotation
	currentSize     int64              // current size of the latest file
	maxFiles        int                // maximum number of files
	compress        bool               // whether old versions of log files are compressed
	lastTimestamp   time.Time          // timestamp of the last log
	filesRefCounter refCounter         // keep reference-counted of decompressed files
	notifyRotate    *pubsub.Publisher  // published to after every rotation; followers wait on it before reopening
	marshal         logger.MarshalFunc // encodes a message into the bytes written to f
	createDecoder   makeDecoderFunc    // builds decoders used when reading logs back
	perms           os.FileMode        // mode used when creating log files
}

// makeDecoderFunc creates a decoder over rdr. Each call of the returned
// function yields the next message; readers in this file treat io.EOF as the
// end of the currently available data.
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
  86. //NewLogFile creates new LogFile
  87. func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
  88. log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
  89. if err != nil {
  90. return nil, err
  91. }
  92. size, err := log.Seek(0, os.SEEK_END)
  93. if err != nil {
  94. return nil, err
  95. }
  96. return &LogFile{
  97. f: log,
  98. capacity: capacity,
  99. currentSize: size,
  100. maxFiles: maxFiles,
  101. compress: compress,
  102. filesRefCounter: refCounter{counter: make(map[string]int)},
  103. notifyRotate: pubsub.NewPublisher(0, 1),
  104. marshal: marshaller,
  105. createDecoder: decodeFunc,
  106. perms: perms,
  107. }, nil
  108. }
  109. // WriteLogEntry writes the provided log message to the current log file.
  110. // This may trigger a rotation event if the max file/capacity limits are hit.
  111. func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
  112. b, err := w.marshal(msg)
  113. if err != nil {
  114. return errors.Wrap(err, "error marshalling log message")
  115. }
  116. logger.PutMessage(msg)
  117. w.mu.Lock()
  118. if w.closed {
  119. w.mu.Unlock()
  120. return errors.New("cannot write because the output file was closed")
  121. }
  122. if err := w.checkCapacityAndRotate(); err != nil {
  123. w.mu.Unlock()
  124. return err
  125. }
  126. n, err := w.f.Write(b)
  127. if err == nil {
  128. w.currentSize += int64(n)
  129. w.lastTimestamp = msg.Timestamp
  130. }
  131. w.mu.Unlock()
  132. return err
  133. }
  134. func (w *LogFile) checkCapacityAndRotate() error {
  135. if w.capacity == -1 {
  136. return nil
  137. }
  138. if w.currentSize >= w.capacity {
  139. w.rotateMu.Lock()
  140. fname := w.f.Name()
  141. if err := w.f.Close(); err != nil {
  142. w.rotateMu.Unlock()
  143. return errors.Wrap(err, "error closing file")
  144. }
  145. if err := rotate(fname, w.maxFiles, w.compress); err != nil {
  146. w.rotateMu.Unlock()
  147. return err
  148. }
  149. file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
  150. if err != nil {
  151. w.rotateMu.Unlock()
  152. return err
  153. }
  154. w.f = file
  155. w.currentSize = 0
  156. w.notifyRotate.Publish(struct{}{})
  157. if w.maxFiles <= 1 || !w.compress {
  158. w.rotateMu.Unlock()
  159. return nil
  160. }
  161. go func() {
  162. compressFile(fname+".1", w.lastTimestamp)
  163. w.rotateMu.Unlock()
  164. }()
  165. }
  166. return nil
  167. }
  168. func rotate(name string, maxFiles int, compress bool) error {
  169. if maxFiles < 2 {
  170. return nil
  171. }
  172. var extension string
  173. if compress {
  174. extension = ".gz"
  175. }
  176. for i := maxFiles - 1; i > 1; i-- {
  177. toPath := name + "." + strconv.Itoa(i) + extension
  178. fromPath := name + "." + strconv.Itoa(i-1) + extension
  179. if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
  180. return err
  181. }
  182. }
  183. if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
  184. return err
  185. }
  186. return nil
  187. }
  188. func compressFile(fileName string, lastTimestamp time.Time) {
  189. file, err := os.Open(fileName)
  190. if err != nil {
  191. logrus.Errorf("Failed to open log file: %v", err)
  192. return
  193. }
  194. defer func() {
  195. file.Close()
  196. err := os.Remove(fileName)
  197. if err != nil {
  198. logrus.Errorf("Failed to remove source log file: %v", err)
  199. }
  200. }()
  201. outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640)
  202. if err != nil {
  203. logrus.Errorf("Failed to open or create gzip log file: %v", err)
  204. return
  205. }
  206. defer func() {
  207. outFile.Close()
  208. if err != nil {
  209. os.Remove(fileName + ".gz")
  210. }
  211. }()
  212. compressWriter := gzip.NewWriter(outFile)
  213. defer compressWriter.Close()
  214. // Add the last log entry timestramp to the gzip header
  215. extra := rotateFileMetadata{}
  216. extra.LastTime = lastTimestamp
  217. compressWriter.Header.Extra, err = json.Marshal(&extra)
  218. if err != nil {
  219. // Here log the error only and don't return since this is just an optimization.
  220. logrus.Warningf("Failed to marshal JSON: %v", err)
  221. }
  222. _, err = pools.Copy(compressWriter, file)
  223. if err != nil {
  224. logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
  225. return
  226. }
  227. }
// MaxFiles returns the maximum number of log files kept for this LogFile,
// counting the file currently being written and its rotated predecessors.
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}
  232. // Close closes underlying file and signals all readers to stop.
  233. func (w *LogFile) Close() error {
  234. w.mu.Lock()
  235. defer w.mu.Unlock()
  236. if w.closed {
  237. return nil
  238. }
  239. if err := w.f.Close(); err != nil {
  240. return err
  241. }
  242. w.closed = true
  243. return nil
  244. }
  245. // ReadLogs decodes entries from log files and sends them the passed in watcher
  246. func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
  247. w.mu.RLock()
  248. currentFile, err := os.Open(w.f.Name())
  249. if err != nil {
  250. w.mu.RUnlock()
  251. watcher.Err <- err
  252. return
  253. }
  254. defer currentFile.Close()
  255. currentChunk, err := newSectionReader(currentFile)
  256. if err != nil {
  257. w.mu.RUnlock()
  258. watcher.Err <- err
  259. return
  260. }
  261. if config.Tail != 0 {
  262. files, err := w.openRotatedFiles(config)
  263. if err != nil {
  264. w.mu.RUnlock()
  265. watcher.Err <- err
  266. return
  267. }
  268. w.mu.RUnlock()
  269. seekers := make([]io.ReadSeeker, 0, len(files)+1)
  270. for _, f := range files {
  271. seekers = append(seekers, f)
  272. }
  273. if currentChunk.Size() > 0 {
  274. seekers = append(seekers, currentChunk)
  275. }
  276. if len(seekers) > 0 {
  277. tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
  278. }
  279. for _, f := range files {
  280. f.Close()
  281. fileName := f.Name()
  282. if strings.HasSuffix(fileName, tmpLogfileSuffix) {
  283. err := w.filesRefCounter.Dereference(fileName)
  284. if err != nil {
  285. logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
  286. }
  287. }
  288. }
  289. w.mu.RLock()
  290. }
  291. if !config.Follow || w.closed {
  292. w.mu.RUnlock()
  293. return
  294. }
  295. w.mu.RUnlock()
  296. notifyRotate := w.notifyRotate.Subscribe()
  297. defer w.notifyRotate.Evict(notifyRotate)
  298. followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
  299. }
  300. func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
  301. w.rotateMu.Lock()
  302. defer w.rotateMu.Unlock()
  303. defer func() {
  304. if err == nil {
  305. return
  306. }
  307. for _, f := range files {
  308. f.Close()
  309. if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
  310. err := os.Remove(f.Name())
  311. if err != nil && !os.IsNotExist(err) {
  312. logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err)
  313. }
  314. }
  315. }
  316. }()
  317. for i := w.maxFiles; i > 1; i-- {
  318. f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
  319. if err != nil {
  320. if !os.IsNotExist(err) {
  321. return nil, err
  322. }
  323. fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
  324. decompressedFileName := fileName + tmpLogfileSuffix
  325. tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
  326. if exists {
  327. return os.Open(refFileName)
  328. }
  329. return decompressfile(fileName, refFileName, config.Since)
  330. })
  331. if err != nil {
  332. if !os.IsNotExist(err) {
  333. return nil, err
  334. }
  335. continue
  336. }
  337. if tmpFile == nil {
  338. // The log before `config.Since` does not need to read
  339. break
  340. }
  341. files = append(files, tmpFile)
  342. continue
  343. }
  344. files = append(files, f)
  345. }
  346. return files, nil
  347. }
  348. func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
  349. cf, err := os.Open(fileName)
  350. if err != nil {
  351. return nil, err
  352. }
  353. defer cf.Close()
  354. rc, err := gzip.NewReader(cf)
  355. if err != nil {
  356. return nil, err
  357. }
  358. defer rc.Close()
  359. // Extract the last log entry timestramp from the gzip header
  360. extra := &rotateFileMetadata{}
  361. err = json.Unmarshal(rc.Header.Extra, extra)
  362. if err == nil && extra.LastTime.Before(since) {
  363. return nil, nil
  364. }
  365. rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
  366. if err != nil {
  367. return nil, err
  368. }
  369. _, err = pools.Copy(rs, rc)
  370. if err != nil {
  371. rs.Close()
  372. rErr := os.Remove(rs.Name())
  373. if rErr != nil && os.IsNotExist(rErr) {
  374. logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
  375. }
  376. return nil, err
  377. }
  378. return rs, nil
  379. }
  380. func newSectionReader(f *os.File) (*io.SectionReader, error) {
  381. // seek to the end to get the size
  382. // we'll leave this at the end of the file since section reader does not advance the reader
  383. size, err := f.Seek(0, os.SEEK_END)
  384. if err != nil {
  385. return nil, errors.Wrap(err, "error getting current file size")
  386. }
  387. return io.NewSectionReader(f, 0, size), nil
  388. }
// decodeFunc has the signature of the decoder returned by a makeDecoderFunc:
// each call yields the next message or an error.
// NOTE(review): it is not referenced in this part of the file — confirm it is
// still used elsewhere before relying on it.
type decodeFunc func() (*logger.Message, error)
  390. func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
  391. var rdr io.Reader = f
  392. if config.Tail > 0 {
  393. ls, err := tailfile.TailFile(f, config.Tail)
  394. if err != nil {
  395. watcher.Err <- err
  396. return
  397. }
  398. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  399. }
  400. decodeLogLine := createDecoder(rdr)
  401. for {
  402. msg, err := decodeLogLine()
  403. if err != nil {
  404. if err != io.EOF {
  405. watcher.Err <- err
  406. }
  407. return
  408. }
  409. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  410. continue
  411. }
  412. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  413. return
  414. }
  415. select {
  416. case <-watcher.WatchClose():
  417. return
  418. case watcher.Msg <- msg:
  419. }
  420. }
  421. }
  422. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
  423. decodeLogLine := createDecoder(f)
  424. name := f.Name()
  425. fileWatcher, err := watchFile(name)
  426. if err != nil {
  427. logWatcher.Err <- err
  428. return
  429. }
  430. defer func() {
  431. f.Close()
  432. fileWatcher.Remove(name)
  433. fileWatcher.Close()
  434. }()
  435. ctx, cancel := context.WithCancel(context.Background())
  436. defer cancel()
  437. go func() {
  438. select {
  439. case <-logWatcher.WatchClose():
  440. fileWatcher.Remove(name)
  441. cancel()
  442. case <-ctx.Done():
  443. return
  444. }
  445. }()
  446. var retries int
  447. handleRotate := func() error {
  448. f.Close()
  449. fileWatcher.Remove(name)
  450. // retry when the file doesn't exist
  451. for retries := 0; retries <= 5; retries++ {
  452. f, err = os.Open(name)
  453. if err == nil || !os.IsNotExist(err) {
  454. break
  455. }
  456. }
  457. if err != nil {
  458. return err
  459. }
  460. if err := fileWatcher.Add(name); err != nil {
  461. return err
  462. }
  463. decodeLogLine = createDecoder(f)
  464. return nil
  465. }
  466. errRetry := errors.New("retry")
  467. errDone := errors.New("done")
  468. waitRead := func() error {
  469. select {
  470. case e := <-fileWatcher.Events():
  471. switch e.Op {
  472. case fsnotify.Write:
  473. decodeLogLine = createDecoder(f)
  474. return nil
  475. case fsnotify.Rename, fsnotify.Remove:
  476. select {
  477. case <-notifyRotate:
  478. case <-ctx.Done():
  479. return errDone
  480. }
  481. if err := handleRotate(); err != nil {
  482. return err
  483. }
  484. return nil
  485. }
  486. return errRetry
  487. case err := <-fileWatcher.Errors():
  488. logrus.Debug("logger got error watching file: %v", err)
  489. // Something happened, let's try and stay alive and create a new watcher
  490. if retries <= 5 {
  491. fileWatcher.Close()
  492. fileWatcher, err = watchFile(name)
  493. if err != nil {
  494. return err
  495. }
  496. retries++
  497. return errRetry
  498. }
  499. return err
  500. case <-ctx.Done():
  501. return errDone
  502. }
  503. }
  504. handleDecodeErr := func(err error) error {
  505. if err != io.EOF {
  506. return err
  507. }
  508. for {
  509. err := waitRead()
  510. if err == nil {
  511. break
  512. }
  513. if err == errRetry {
  514. continue
  515. }
  516. return err
  517. }
  518. return nil
  519. }
  520. // main loop
  521. for {
  522. msg, err := decodeLogLine()
  523. if err != nil {
  524. if err := handleDecodeErr(err); err != nil {
  525. if err == errDone {
  526. return
  527. }
  528. // we got an unrecoverable error, so return
  529. logWatcher.Err <- err
  530. return
  531. }
  532. // ready to try again
  533. continue
  534. }
  535. retries = 0 // reset retries since we've succeeded
  536. if !since.IsZero() && msg.Timestamp.Before(since) {
  537. continue
  538. }
  539. if !until.IsZero() && msg.Timestamp.After(until) {
  540. return
  541. }
  542. select {
  543. case logWatcher.Msg <- msg:
  544. case <-ctx.Done():
  545. logWatcher.Msg <- msg
  546. for {
  547. msg, err := decodeLogLine()
  548. if err != nil {
  549. return
  550. }
  551. if !since.IsZero() && msg.Timestamp.Before(since) {
  552. continue
  553. }
  554. if !until.IsZero() && msg.Timestamp.After(until) {
  555. return
  556. }
  557. logWatcher.Msg <- msg
  558. }
  559. }
  560. }
  561. }
  562. func watchFile(name string) (filenotify.FileWatcher, error) {
  563. fileWatcher, err := filenotify.New()
  564. if err != nil {
  565. return nil, err
  566. }
  567. logger := logrus.WithFields(logrus.Fields{
  568. "module": "logger",
  569. "fille": name,
  570. })
  571. if err := fileWatcher.Add(name); err != nil {
  572. logger.WithError(err).Warnf("falling back to file poller")
  573. fileWatcher.Close()
  574. fileWatcher = filenotify.NewPollingWatcher()
  575. if err := fileWatcher.Add(name); err != nil {
  576. fileWatcher.Close()
  577. logger.WithError(err).Debugf("error watching log file for modifications")
  578. return nil, err
  579. }
  580. }
  581. return fileWatcher, nil
  582. }