logfile.go

package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/filenotify"
	"github.com/docker/docker/pkg/pools"
	"github.com/docker/docker/pkg/pubsub"
	"github.com/fsnotify/fsnotify"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const tmpLogfileSuffix = ".tmp"

// rotateFileMetadata is the metadata stored in the gzip header of a
// compressed (rotated) log file.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}

// refCounter reference-counts log files that are shared between readers.
type refCounter struct {
	mu      sync.Mutex
	counter map[string]int
}

// GetReference increases the reference counter for the specified logfile,
// opening it via openRefFile if it is not referenced yet.
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	var (
		file *os.File
		err  error
	)
	_, ok := rc.counter[fileName]
	file, err = openRefFile(fileName, ok)
	if err != nil {
		return nil, err
	}

	if ok {
		rc.counter[fileName]++
	} else if file != nil {
		rc.counter[file.Name()] = 1
	}

	return file, nil
}

// Dereference decreases the reference counter for the specified logfile.
// When the counter reaches zero, the file is removed.
func (rc *refCounter) Dereference(fileName string) error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	rc.counter[fileName]--
	if rc.counter[fileName] <= 0 {
		delete(rc.counter, fileName)
		err := os.Remove(fileName)
		if err != nil {
			return err
		}
	}
	return nil
}

// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu              sync.RWMutex // protects the logfile access
	f               *os.File     // store for closing
	closed          bool
	rotateMu        sync.Mutex // blocks the next rotation until the current rotation is completed
	capacity        int64      // maximum size of each file
	currentSize     int64      // current size of the latest file
	maxFiles        int        // maximum number of files
	compress        bool       // whether old versions of log files are compressed
	lastTimestamp   time.Time  // timestamp of the last log
	filesRefCounter refCounter // reference-counts decompressed files shared between readers
	notifyRotate    *pubsub.Publisher
	marshal         logger.MarshalFunc
	createDecoder   MakeDecoderFn
	getTailReader   GetTailReaderFunc
	perms           os.FileMode
}

// MakeDecoderFn creates a decoder
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs
// It is created by the log reader by calling the `MakeDecoderFn`
type Decoder interface {
	// Reset resets the decoder
	// Reset is called for certain events, such as log rotations
	Reset(io.Reader)
	// Decode decodes the next log message from the stream
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}

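// A minimal Decoder sketch (illustrative, not part of the original file): a
// decoder for a newline-delimited plain-text format could look roughly like
// the following. lineDecoder is a hypothetical name, and a real
// implementation would also fill in timestamps.
//
//	type lineDecoder struct{ s *bufio.Scanner }
//
//	func (d *lineDecoder) Reset(rdr io.Reader) { d.s = bufio.NewScanner(rdr) }
//	func (d *lineDecoder) Close()              { d.s = nil }
//	func (d *lineDecoder) Decode() (*logger.Message, error) {
//		if !d.s.Scan() {
//			if err := d.s.Err(); err != nil {
//				return nil, err
//			}
//			return nil, io.EOF
//		}
//		return &logger.Message{Line: append([]byte(nil), d.s.Bytes()...)}, nil
//	}
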
// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.ReaderAt
	Size() int64
}

// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)

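// An illustrative GetTailReaderFunc (a sketch, not part of the original
// file): a naive implementation could return the whole file and count its
// lines, at the cost of reading everything; countLines is a hypothetical
// helper. Efficient implementations instead scan backwards so that only the
// last nLogLines are read.
//
//	func naiveTailReader(ctx context.Context, f SizeReaderAt, nLogLines int) (io.Reader, int, error) {
//		n, err := countLines(io.NewSectionReader(f, 0, f.Size()))
//		if err != nil {
//			return nil, 0, err
//		}
//		return io.NewSectionReader(f, 0, f.Size()), n, nil
//	}
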
// NewLogFile creates a new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	return &LogFile{
		f:               log,
		capacity:        capacity,
		currentSize:     size,
		maxFiles:        maxFiles,
		compress:        compress,
		filesRefCounter: refCounter{counter: make(map[string]int)},
		notifyRotate:    pubsub.NewPublisher(0, 1),
		marshal:         marshaller,
		createDecoder:   decodeFunc,
		perms:           perms,
		getTailReader:   getTailReader,
	}, nil
}

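// Illustrative construction (a sketch; the path is made up, and marshal,
// makeDecoder, and tailReader stand in for driver-supplied functions):
//
//	lf, err := NewLogFile(
//		"/var/lib/docker/containers/<id>/container.log",
//		10*1024*1024, // capacity: rotate once the file reaches ~10 MiB
//		5,            // maxFiles: the current file plus up to 4 rotated ones
//		true,         // compress rotated files with gzip
//		marshal, makeDecoder, 0640, tailReader,
//	)
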
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
	b, err := w.marshal(msg)
	if err != nil {
		return errors.Wrap(err, "error marshalling log message")
	}

	logger.PutMessage(msg)

	w.mu.Lock()
	if w.closed {
		w.mu.Unlock()
		return errors.New("cannot write because the output file was closed")
	}

	if err := w.checkCapacityAndRotate(); err != nil {
		w.mu.Unlock()
		return err
	}

	n, err := w.f.Write(b)
	if err == nil {
		w.currentSize += int64(n)
		w.lastTimestamp = msg.Timestamp
	}
	w.mu.Unlock()
	return err
}

func (w *LogFile) checkCapacityAndRotate() error {
	if w.capacity == -1 {
		return nil
	}

	if w.currentSize >= w.capacity {
		w.rotateMu.Lock()
		fname := w.f.Name()
		if err := w.f.Close(); err != nil {
			w.rotateMu.Unlock()
			return errors.Wrap(err, "error closing file")
		}
		if err := rotate(fname, w.maxFiles, w.compress); err != nil {
			w.rotateMu.Unlock()
			return err
		}
		file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
		if err != nil {
			w.rotateMu.Unlock()
			return err
		}
		w.f = file
		w.currentSize = 0
		w.notifyRotate.Publish(struct{}{})

		if w.maxFiles <= 1 || !w.compress {
			w.rotateMu.Unlock()
			return nil
		}

		// Compress the just-rotated file asynchronously; rotateMu stays held
		// until compression finishes, so the next rotation cannot race with it.
		go func() {
			compressFile(fname+".1", w.lastTimestamp)
			w.rotateMu.Unlock()
		}()
	}

	return nil
}

func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := os.Remove(lastFile)
	if err != nil && !os.IsNotExist(err) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
			return err
		}
	}

	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
		return err
	}

	return nil
}

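// To illustrate the naming scheme (not part of the original file): with
// name="container.log", maxFiles=3, and compress=true, one rotation shifts
// the files as follows; container.log.1 is then compressed into
// container.log.1.gz asynchronously by checkCapacityAndRotate:
//
//	container.log.2.gz                       (oldest, removed first)
//	container.log.1.gz -> container.log.2.gz
//	container.log      -> container.log.1    (current file, recreated empty)
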
func compressFile(fileName string, lastTimestamp time.Time) {
	file, err := os.Open(fileName)
	if err != nil {
		logrus.Errorf("Failed to open log file: %v", err)
		return
	}
	defer func() {
		file.Close()
		err := os.Remove(fileName)
		if err != nil {
			logrus.Errorf("Failed to remove source log file: %v", err)
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
	if err != nil {
		logrus.Errorf("Failed to open or create gzip log file: %v", err)
		return
	}
	defer func() {
		outFile.Close()
		if err != nil {
			os.Remove(fileName + ".gz")
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Only log the error and don't return: the header metadata is just an
		// optimization used to skip files whose entries all predate `since`.
		logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
		return
	}
}

// MaxFiles returns the maximum number of files
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}

// Close closes underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return nil
	}
	if err := w.f.Close(); err != nil {
		return err
	}
	w.closed = true
	return nil
}

// ReadLogs decodes entries from log files and sends them to the passed-in watcher
//
// Note: using the follow option can become inconsistent under very frequent
// rotations when the maximum number of log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
	w.mu.RLock()
	currentFile, err := os.Open(w.f.Name())
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk, err := newSectionReader(currentFile)
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		w.mu.RUnlock()
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
				fileName := f.Name()
				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
					err := w.filesRefCounter.Dereference(fileName)
					if err != nil {
						logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
					}
				}
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			stat, err := f.Stat()
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
				closeFiles()
				return
			}
			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		tailFiles(readers, watcher, dec, w.getTailReader, config)
		closeFiles()

		w.mu.RLock()
	}

	if !config.Follow || w.closed {
		w.mu.RUnlock()
		return
	}
	w.mu.RUnlock()

	notifyRotate := w.notifyRotate.Subscribe()
	defer w.notifyRotate.Evict(notifyRotate)
	followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
}

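// Illustrative consumption (a sketch, not part of the original file): a
// caller drains the watcher's channels until the stream ends; the error
// handling shown is hypothetical.
//
//	watcher := logger.NewLogWatcher()
//	go lf.ReadLogs(logger.ReadConfig{Tail: 100, Follow: true}, watcher)
//	for {
//		select {
//		case msg := <-watcher.Msg:
//			fmt.Println(string(msg.Line))
//		case err := <-watcher.Err:
//			logrus.WithError(err).Error("reading logs")
//			return
//		}
//	}
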
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
	w.rotateMu.Lock()
	defer w.rotateMu.Unlock()

	defer func() {
		if err == nil {
			return
		}
		for _, f := range files {
			f.Close()
			if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
				err := os.Remove(f.Name())
				if err != nil && !os.IsNotExist(err) {
					logrus.Warnf("Failed to remove logfile: %v", err)
				}
			}
		}
	}()

	for i := w.maxFiles; i > 1; i-- {
		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, errors.Wrap(err, "error opening rotated log file")
			}

			fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
			decompressedFileName := fileName + tmpLogfileSuffix
			tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
				if exists {
					return os.Open(refFileName)
				}
				return decompressfile(fileName, refFileName, config.Since)
			})

			if err != nil {
				if !errors.Is(err, os.ErrNotExist) {
					return nil, errors.Wrap(err, "error getting reference to decompressed log file")
				}
				continue
			}
			if tmpFile == nil {
				// The log ends before `config.Since`, so it does not need to be read
				break
			}

			files = append(files, tmpFile)
			continue
		}
		files = append(files, f)
	}

	return files, nil
}

func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
	cf, err := os.Open(fileName)
	if err != nil {
		return nil, errors.Wrap(err, "error opening file for decompression")
	}
	defer cf.Close()

	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && extra.LastTime.Before(since) {
		return nil, nil
	}

	rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
	if err != nil {
		return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
	}

	_, err = pools.Copy(rs, rc)
	if err != nil {
		rs.Close()
		rErr := os.Remove(rs.Name())
		if rErr != nil && !os.IsNotExist(rErr) {
			logrus.Errorf("Failed to remove logfile: %v", rErr)
		}
		return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
	}

	return rs, nil
}

func newSectionReader(f *os.File) (*io.SectionReader, error) {
	// seek to the end to get the size
	// we'll leave this at the end of the file since section reader does not advance the reader
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, errors.Wrap(err, "error getting current file size")
	}
	return io.NewSectionReader(f, 0, size), nil
}

func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
	nLines := config.Tail

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if config.Tail > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)

	for {
		msg, err := dec.Decode()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				watcher.Err <- err
			}
			return
		}
		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
			continue
		}
		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
			return
		}
		select {
		case <-ctx.Done():
			return
		case watcher.Msg <- msg:
		}
	}
}

func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
	dec.Reset(f)

	name := f.Name()
	fileWatcher, err := watchFile(name)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer func() {
		f.Close()
		fileWatcher.Close()
	}()

	var retries int
	handleRotate := func() error {
		f.Close()
		fileWatcher.Remove(name)

		// retry when the file doesn't exist
		for retries := 0; retries <= 5; retries++ {
			f, err = os.Open(name)
			if err == nil || !os.IsNotExist(err) {
				break
			}
		}
		if err != nil {
			return err
		}
		if err := fileWatcher.Add(name); err != nil {
			return err
		}
		dec.Reset(f)
		return nil
	}

	errRetry := errors.New("retry")
	errDone := errors.New("done")
	waitRead := func() error {
		select {
		case e := <-fileWatcher.Events():
			switch e.Op {
			case fsnotify.Write:
				dec.Reset(f)
				return nil
			case fsnotify.Rename, fsnotify.Remove:
				select {
				case <-notifyRotate:
				case <-logWatcher.WatchProducerGone():
					return errDone
				case <-logWatcher.WatchConsumerGone():
					return errDone
				}
				if err := handleRotate(); err != nil {
					return err
				}
				return nil
			}
			return errRetry
		case err := <-fileWatcher.Errors():
			logrus.Debugf("logger got error watching file: %v", err)
			// Something happened, let's try and stay alive and create a new watcher
			if retries <= 5 {
				fileWatcher.Close()
				fileWatcher, err = watchFile(name)
				if err != nil {
					return err
				}
				retries++
				return errRetry
			}
			return err
		case <-logWatcher.WatchProducerGone():
			return errDone
		case <-logWatcher.WatchConsumerGone():
			return errDone
		}
	}

	oldSize := int64(-1)
	handleDecodeErr := func(err error) error {
		if !errors.Is(err, io.EOF) {
			return err
		}

		// Handle special case (#39235): max-file=1 and file was truncated
		st, stErr := f.Stat()
		if stErr == nil {
			size := st.Size()
			defer func() { oldSize = size }()
			if size < oldSize { // truncated
				f.Seek(0, io.SeekStart)
				return nil
			}
		} else {
			logrus.WithError(stErr).Warn("logger: stat error")
		}

		for {
			err := waitRead()
			if err == nil {
				break
			}
			if err == errRetry {
				continue
			}
			return err
		}
		return nil
	}

	// main loop
	for {
		msg, err := dec.Decode()
		if err != nil {
			if err := handleDecodeErr(err); err != nil {
				if err == errDone {
					return
				}
				// we got an unrecoverable error, so return
				logWatcher.Err <- err
				return
			}
			// ready to try again
			continue
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}
		if !until.IsZero() && msg.Timestamp.After(until) {
			return
		}
		// send the message, unless the consumer is gone
		select {
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchConsumerGone():
			return
		}
	}
}

func watchFile(name string) (filenotify.FileWatcher, error) {
	var fileWatcher filenotify.FileWatcher

	if runtime.GOOS == "windows" {
		// FileWatcher on Windows is based on syscall notifications, which has
		// an issue due to file caching: it relies on ReadDirectoryChangesW(),
		// which detects writes to disk but not writes to the cache. Because of
		// the OS's lazy writing, we don't get notifications for file writes,
		// so the watcher doesn't work. Hence, for Windows we use the
		// poll-based notifier.
		fileWatcher = filenotify.NewPollingWatcher()
	} else {
		var err error
		fileWatcher, err = filenotify.New()
		if err != nil {
			return nil, err
		}
	}

	logger := logrus.WithFields(logrus.Fields{
		"module": "logger",
		"file":   name,
	})

	if err := fileWatcher.Add(name); err != nil {
		// we will retry using file poller.
		logger.WithError(err).Warnf("falling back to file poller")
		fileWatcher.Close()
		fileWatcher = filenotify.NewPollingWatcher()

		if err := fileWatcher.Add(name); err != nil {
			fileWatcher.Close()
			logger.WithError(err).Debugf("error watching log file for modifications")
			return nil, err
		}
	}
	return fileWatcher, nil
}

// wrappedReaderAt adapts an io.ReaderAt into a sequential io.Reader by
// tracking the current read position.
type wrappedReaderAt struct {
	io.ReaderAt
	pos int64
}

func (r *wrappedReaderAt) Read(p []byte) (int, error) {
	n, err := r.ReaderAt.ReadAt(p, r.pos)
	r.pos += int64(n)
	return n, err
}
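
// Why the adapter (an aside, not part of the original file): tailFiles
// collects SizeReaderAt values, but io.MultiReader needs plain io.Readers
// when no tail count limits the read, so each ReaderAt is wrapped to be read
// sequentially from the start; someSizeReaderAt below is hypothetical.
//
//	var r io.Reader = &wrappedReaderAt{ReaderAt: someSizeReaderAt}
//	_, _ = io.Copy(os.Stdout, r) // streams the whole section in order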