adapter.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "io"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. "github.com/docker/docker/pkg/plugingetter"
  10. "github.com/pkg/errors"
  11. "github.com/sirupsen/logrus"
  12. )
  13. // pluginAdapter takes a plugin and implements the Logger interface for logger
  14. // instances
  15. type pluginAdapter struct {
  16. driverName string
  17. id string
  18. plugin logPlugin
  19. fifoPath string
  20. capabilities Capability
  21. logInfo Info
  22. // synchronize access to the log stream and shared buffer
  23. mu sync.Mutex
  24. enc logdriver.LogEntryEncoder
  25. stream io.WriteCloser
  26. // buf is shared for each `Log()` call to reduce allocations.
  27. // buf must be protected by mutex
  28. buf logdriver.LogEntry
  29. }
  30. func (a *pluginAdapter) Log(msg *Message) error {
  31. a.mu.Lock()
  32. a.buf.Line = msg.Line
  33. a.buf.TimeNano = msg.Timestamp.UnixNano()
  34. a.buf.Partial = msg.PLogMetaData != nil
  35. a.buf.Source = msg.Source
  36. err := a.enc.Encode(&a.buf)
  37. a.buf.Reset()
  38. a.mu.Unlock()
  39. PutMessage(msg)
  40. return err
  41. }
  42. func (a *pluginAdapter) Name() string {
  43. return a.driverName
  44. }
  45. func (a *pluginAdapter) Close() error {
  46. a.mu.Lock()
  47. defer a.mu.Unlock()
  48. if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
  49. return err
  50. }
  51. if err := a.stream.Close(); err != nil {
  52. logrus.WithError(err).Error("error closing plugin fifo")
  53. }
  54. if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
  55. logrus.WithError(err).Error("error cleaning up plugin fifo")
  56. }
  57. // may be nil, especially for unit tests
  58. if pluginGetter != nil {
  59. pluginGetter.Get(a.Name(), extName, plugingetter.Release)
  60. }
  61. return nil
  62. }
  63. type pluginAdapterWithRead struct {
  64. *pluginAdapter
  65. }
  66. func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
  67. watcher := NewLogWatcher()
  68. go func() {
  69. defer close(watcher.Msg)
  70. stream, err := a.plugin.ReadLogs(a.logInfo, config)
  71. if err != nil {
  72. watcher.Err <- errors.Wrap(err, "error getting log reader")
  73. return
  74. }
  75. defer stream.Close()
  76. dec := logdriver.NewLogEntryDecoder(stream)
  77. for {
  78. select {
  79. case <-watcher.WatchClose():
  80. return
  81. default:
  82. }
  83. var buf logdriver.LogEntry
  84. if err := dec.Decode(&buf); err != nil {
  85. if err == io.EOF {
  86. return
  87. }
  88. select {
  89. case watcher.Err <- errors.Wrap(err, "error decoding log message"):
  90. case <-watcher.WatchClose():
  91. }
  92. return
  93. }
  94. msg := &Message{
  95. Timestamp: time.Unix(0, buf.TimeNano),
  96. Line: buf.Line,
  97. Source: buf.Source,
  98. }
  99. // plugin should handle this, but check just in case
  100. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  101. continue
  102. }
  103. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  104. return
  105. }
  106. select {
  107. case watcher.Msg <- msg:
  108. case <-watcher.WatchClose():
  109. // make sure the message we consumed is sent
  110. watcher.Msg <- msg
  111. return
  112. }
  113. }
  114. }()
  115. return watcher
  116. }