adapter.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "github.com/containerd/log"
  10. "github.com/docker/docker/api/types/plugins/logdriver"
  11. "github.com/docker/docker/pkg/plugingetter"
  12. "github.com/pkg/errors"
  13. )
  14. // pluginAdapter takes a plugin and implements the Logger interface for logger
  15. // instances
  16. type pluginAdapter struct {
  17. driverName string
  18. id string
  19. plugin logPlugin
  20. fifoPath string
  21. capabilities Capability
  22. logInfo Info
  23. // synchronize access to the log stream and shared buffer
  24. mu sync.Mutex
  25. enc logdriver.LogEntryEncoder
  26. stream io.WriteCloser
  27. // buf is shared for each `Log()` call to reduce allocations.
  28. // buf must be protected by mutex
  29. buf logdriver.LogEntry
  30. }
  31. func (a *pluginAdapter) Log(msg *Message) error {
  32. a.mu.Lock()
  33. a.buf.Line = msg.Line
  34. a.buf.TimeNano = msg.Timestamp.UnixNano()
  35. a.buf.Partial = msg.PLogMetaData != nil
  36. a.buf.Source = msg.Source
  37. if msg.PLogMetaData != nil {
  38. a.buf.PartialLogMetadata = &logdriver.PartialLogEntryMetadata{
  39. Id: msg.PLogMetaData.ID,
  40. Last: msg.PLogMetaData.Last,
  41. Ordinal: int32(msg.PLogMetaData.Ordinal),
  42. }
  43. }
  44. err := a.enc.Encode(&a.buf)
  45. a.buf.Reset()
  46. a.mu.Unlock()
  47. PutMessage(msg)
  48. return err
  49. }
  50. func (a *pluginAdapter) Name() string {
  51. return a.driverName
  52. }
  53. func (a *pluginAdapter) Close() error {
  54. a.mu.Lock()
  55. defer a.mu.Unlock()
  56. if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
  57. return err
  58. }
  59. if err := a.stream.Close(); err != nil {
  60. log.G(context.TODO()).WithError(err).Error("error closing plugin fifo")
  61. }
  62. if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
  63. log.G(context.TODO()).WithError(err).Error("error cleaning up plugin fifo")
  64. }
  65. // may be nil, especially for unit tests
  66. if pluginGetter != nil {
  67. pluginGetter.Get(a.Name(), extName, plugingetter.Release)
  68. }
  69. return nil
  70. }
  71. type pluginAdapterWithRead struct {
  72. *pluginAdapter
  73. }
  74. func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
  75. watcher := NewLogWatcher()
  76. go func() {
  77. defer close(watcher.Msg)
  78. stream, err := a.plugin.ReadLogs(a.logInfo, config)
  79. if err != nil {
  80. watcher.Err <- errors.Wrap(err, "error getting log reader")
  81. return
  82. }
  83. defer stream.Close()
  84. dec := logdriver.NewLogEntryDecoder(stream)
  85. for {
  86. var buf logdriver.LogEntry
  87. if err := dec.Decode(&buf); err != nil {
  88. if err == io.EOF {
  89. return
  90. }
  91. watcher.Err <- errors.Wrap(err, "error decoding log message")
  92. return
  93. }
  94. msg := &Message{
  95. Timestamp: time.Unix(0, buf.TimeNano),
  96. Line: buf.Line,
  97. Source: buf.Source,
  98. }
  99. // plugin should handle this, but check just in case
  100. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  101. continue
  102. }
  103. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  104. return
  105. }
  106. // send the message unless the consumer is gone
  107. select {
  108. case watcher.Msg <- msg:
  109. case <-watcher.WatchConsumerGone():
  110. return
  111. }
  112. }
  113. }()
  114. return watcher
  115. }