adapter.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package logger
  2. import (
  3. "io"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. "github.com/docker/docker/pkg/plugingetter"
  10. "github.com/pkg/errors"
  11. )
// pluginAdapter takes a plugin and implements the Logger interface for logger
// instances
type pluginAdapter struct {
	// driverName is the value reported by Name().
	driverName string
	// id is not read by the methods in this part of the file.
	// NOTE(review): presumably identifies the logger instance (e.g. a
	// container ID) — confirm against the constructor.
	id string
	// plugin is the log plugin client; it is asked to stop logging on Close
	// and to provide a log stream in ReadLogs.
	plugin logPlugin
	// fifoPath is the filesystem path passed to the plugin's StopLogging and
	// removed from disk on Close (log messages call it the "plugin fifo").
	fifoPath string
	// capabilities is not read by the methods in this part of the file.
	// NOTE(review): presumably the capabilities reported by the plugin — confirm.
	capabilities Capability
	// logInfo is handed to the plugin when reading logs back.
	logInfo Info

	// synchronize access to the log stream and shared buffer
	mu sync.Mutex
	// enc encodes buf on every Log() call.
	// NOTE(review): presumably writes to stream — confirm where enc is built.
	enc logdriver.LogEntryEncoder
	// stream is closed by Close().
	stream io.WriteCloser
	// buf is shared for each `Log()` call to reduce allocations.
	// buf must be protected by mutex
	buf logdriver.LogEntry
}
  29. func (a *pluginAdapter) Log(msg *Message) error {
  30. a.mu.Lock()
  31. a.buf.Line = msg.Line
  32. a.buf.TimeNano = msg.Timestamp.UnixNano()
  33. a.buf.Partial = msg.Partial
  34. a.buf.Source = msg.Source
  35. err := a.enc.Encode(&a.buf)
  36. a.buf.Reset()
  37. a.mu.Unlock()
  38. PutMessage(msg)
  39. return err
  40. }
// Name returns the adapter's driver name (the driverName field).
func (a *pluginAdapter) Name() string {
	return a.driverName
}
  44. func (a *pluginAdapter) Close() error {
  45. a.mu.Lock()
  46. defer a.mu.Unlock()
  47. if err := a.plugin.StopLogging(a.fifoPath); err != nil {
  48. return err
  49. }
  50. if err := a.stream.Close(); err != nil {
  51. logrus.WithError(err).Error("error closing plugin fifo")
  52. }
  53. if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
  54. logrus.WithError(err).Error("error cleaning up plugin fifo")
  55. }
  56. // may be nil, especially for unit tests
  57. if pluginGetter != nil {
  58. pluginGetter.Get(a.Name(), extName, plugingetter.Release)
  59. }
  60. return nil
  61. }
// pluginAdapterWithRead embeds a *pluginAdapter and additionally provides
// ReadLogs, which reads log entries back from the plugin.
type pluginAdapterWithRead struct {
	*pluginAdapter
}
  65. func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
  66. watcher := NewLogWatcher()
  67. go func() {
  68. defer close(watcher.Msg)
  69. stream, err := a.plugin.ReadLogs(a.logInfo, config)
  70. if err != nil {
  71. watcher.Err <- errors.Wrap(err, "error getting log reader")
  72. return
  73. }
  74. defer stream.Close()
  75. dec := logdriver.NewLogEntryDecoder(stream)
  76. for {
  77. select {
  78. case <-watcher.WatchClose():
  79. return
  80. default:
  81. }
  82. var buf logdriver.LogEntry
  83. if err := dec.Decode(&buf); err != nil {
  84. if err == io.EOF {
  85. return
  86. }
  87. select {
  88. case watcher.Err <- errors.Wrap(err, "error decoding log message"):
  89. case <-watcher.WatchClose():
  90. }
  91. return
  92. }
  93. msg := &Message{
  94. Timestamp: time.Unix(0, buf.TimeNano),
  95. Line: buf.Line,
  96. Source: buf.Source,
  97. }
  98. // plugin should handle this, but check just in case
  99. if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
  100. continue
  101. }
  102. select {
  103. case watcher.Msg <- msg:
  104. case <-watcher.WatchClose():
  105. // make sure the message we consumed is sent
  106. watcher.Msg <- msg
  107. return
  108. }
  109. }
  110. }()
  111. return watcher
  112. }