adapter.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "io"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. "github.com/docker/docker/pkg/plugingetter"
  10. "github.com/pkg/errors"
  11. "github.com/sirupsen/logrus"
  12. )
  13. // pluginAdapter takes a plugin and implements the Logger interface for logger
  14. // instances
  15. type pluginAdapter struct {
  16. driverName string
  17. id string
  18. plugin logPlugin
  19. fifoPath string
  20. capabilities Capability
  21. logInfo Info
  22. // synchronize access to the log stream and shared buffer
  23. mu sync.Mutex
  24. enc logdriver.LogEntryEncoder
  25. stream io.WriteCloser
  26. // buf is shared for each `Log()` call to reduce allocations.
  27. // buf must be protected by mutex
  28. buf logdriver.LogEntry
  29. }
  30. func (a *pluginAdapter) Log(msg *Message) error {
  31. a.mu.Lock()
  32. a.buf.Line = msg.Line
  33. a.buf.TimeNano = msg.Timestamp.UnixNano()
  34. a.buf.Partial = msg.PLogMetaData != nil
  35. a.buf.Source = msg.Source
  36. if msg.PLogMetaData != nil {
  37. a.buf.PartialLogMetadata = &logdriver.PartialLogEntryMetadata{
  38. Id: msg.PLogMetaData.ID,
  39. Last: msg.PLogMetaData.Last,
  40. Ordinal: int32(msg.PLogMetaData.Ordinal),
  41. }
  42. }
  43. err := a.enc.Encode(&a.buf)
  44. a.buf.Reset()
  45. a.mu.Unlock()
  46. PutMessage(msg)
  47. return err
  48. }
  49. func (a *pluginAdapter) Name() string {
  50. return a.driverName
  51. }
  52. func (a *pluginAdapter) Close() error {
  53. a.mu.Lock()
  54. defer a.mu.Unlock()
  55. if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
  56. return err
  57. }
  58. if err := a.stream.Close(); err != nil {
  59. logrus.WithError(err).Error("error closing plugin fifo")
  60. }
  61. if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
  62. logrus.WithError(err).Error("error cleaning up plugin fifo")
  63. }
  64. // may be nil, especially for unit tests
  65. if pluginGetter != nil {
  66. pluginGetter.Get(a.Name(), extName, plugingetter.Release)
  67. }
  68. return nil
  69. }
  70. type pluginAdapterWithRead struct {
  71. *pluginAdapter
  72. }
  73. func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
  74. watcher := NewLogWatcher()
  75. go func() {
  76. defer close(watcher.Msg)
  77. stream, err := a.plugin.ReadLogs(a.logInfo, config)
  78. if err != nil {
  79. watcher.Err <- errors.Wrap(err, "error getting log reader")
  80. return
  81. }
  82. defer stream.Close()
  83. dec := logdriver.NewLogEntryDecoder(stream)
  84. for {
  85. var buf logdriver.LogEntry
  86. if err := dec.Decode(&buf); err != nil {
  87. if err == io.EOF {
  88. return
  89. }
  90. watcher.Err <- errors.Wrap(err, "error decoding log message")
  91. return
  92. }
  93. msg := &Message{
  94. Timestamp: time.Unix(0, buf.TimeNano),
  95. Line: buf.Line,
  96. Source: buf.Source,
  97. }
  98. // plugin should handle this, but check just in case
  99. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  100. continue
  101. }
  102. if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
  103. return
  104. }
  105. // send the message unless the consumer is gone
  106. select {
  107. case watcher.Msg <- msg:
  108. case <-watcher.WatchConsumerGone():
  109. return
  110. }
  111. }
  112. }()
  113. return watcher
  114. }