adapter.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package logger
  2. import (
  3. "io"
  4. "os"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. "github.com/docker/docker/pkg/plugingetter"
  10. "github.com/pkg/errors"
  11. "github.com/sirupsen/logrus"
  12. )
  13. // pluginAdapter takes a plugin and implements the Logger interface for logger
  14. // instances
  15. type pluginAdapter struct {
  16. driverName string
  17. id string
  18. plugin logPlugin
  19. basePath string
  20. fifoPath string
  21. capabilities Capability
  22. logInfo Info
  23. // synchronize access to the log stream and shared buffer
  24. mu sync.Mutex
  25. enc logdriver.LogEntryEncoder
  26. stream io.WriteCloser
  27. // buf is shared for each `Log()` call to reduce allocations.
  28. // buf must be protected by mutex
  29. buf logdriver.LogEntry
  30. }
  31. func (a *pluginAdapter) Log(msg *Message) error {
  32. a.mu.Lock()
  33. a.buf.Line = msg.Line
  34. a.buf.TimeNano = msg.Timestamp.UnixNano()
  35. a.buf.Partial = msg.Partial
  36. a.buf.Source = msg.Source
  37. err := a.enc.Encode(&a.buf)
  38. a.buf.Reset()
  39. a.mu.Unlock()
  40. PutMessage(msg)
  41. return err
  42. }
  43. func (a *pluginAdapter) Name() string {
  44. return a.driverName
  45. }
  46. func (a *pluginAdapter) Close() error {
  47. a.mu.Lock()
  48. defer a.mu.Unlock()
  49. if err := a.plugin.StopLogging(strings.TrimPrefix(a.fifoPath, a.basePath)); err != nil {
  50. return err
  51. }
  52. if err := a.stream.Close(); err != nil {
  53. logrus.WithError(err).Error("error closing plugin fifo")
  54. }
  55. if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
  56. logrus.WithError(err).Error("error cleaning up plugin fifo")
  57. }
  58. // may be nil, especially for unit tests
  59. if pluginGetter != nil {
  60. pluginGetter.Get(a.Name(), extName, plugingetter.Release)
  61. }
  62. return nil
  63. }
  64. type pluginAdapterWithRead struct {
  65. *pluginAdapter
  66. }
  67. func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
  68. watcher := NewLogWatcher()
  69. go func() {
  70. defer close(watcher.Msg)
  71. stream, err := a.plugin.ReadLogs(a.logInfo, config)
  72. if err != nil {
  73. watcher.Err <- errors.Wrap(err, "error getting log reader")
  74. return
  75. }
  76. defer stream.Close()
  77. dec := logdriver.NewLogEntryDecoder(stream)
  78. for {
  79. select {
  80. case <-watcher.WatchClose():
  81. return
  82. default:
  83. }
  84. var buf logdriver.LogEntry
  85. if err := dec.Decode(&buf); err != nil {
  86. if err == io.EOF {
  87. return
  88. }
  89. select {
  90. case watcher.Err <- errors.Wrap(err, "error decoding log message"):
  91. case <-watcher.WatchClose():
  92. }
  93. return
  94. }
  95. msg := &Message{
  96. Timestamp: time.Unix(0, buf.TimeNano),
  97. Line: buf.Line,
  98. Source: buf.Source,
  99. }
  100. // plugin should handle this, but check just in case
  101. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  102. continue
  103. }
  104. select {
  105. case watcher.Msg <- msg:
  106. case <-watcher.WatchClose():
  107. // make sure the message we consumed is sent
  108. watcher.Msg <- msg
  109. return
  110. }
  111. }
  112. }()
  113. return watcher
  114. }