ring.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/sirupsen/logrus"
  7. )
  8. const (
  9. defaultRingMaxSize = 1e6 // 1MB
  10. )
  11. // RingLogger is a ring buffer that implements the Logger interface.
  12. // This is used when lossy logging is OK.
  13. type RingLogger struct {
  14. buffer *messageRing
  15. l Logger
  16. logInfo Info
  17. closeFlag int32
  18. wg sync.WaitGroup
  19. }
  20. var _ SizedLogger = &RingLogger{}
  21. type ringWithReader struct {
  22. *RingLogger
  23. }
  24. func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
  25. reader, ok := r.l.(LogReader)
  26. if !ok {
  27. // something is wrong if we get here
  28. panic("expected log reader")
  29. }
  30. return reader.ReadLogs(cfg)
  31. }
  32. func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
  33. l := &RingLogger{
  34. buffer: newRing(maxSize),
  35. l: driver,
  36. logInfo: logInfo,
  37. }
  38. l.wg.Add(1)
  39. go l.run()
  40. return l
  41. }
  42. // NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
  43. // the passed in logger.
  44. func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
  45. if maxSize < 0 {
  46. maxSize = defaultRingMaxSize
  47. }
  48. l := newRingLogger(driver, logInfo, maxSize)
  49. if _, ok := driver.(LogReader); ok {
  50. return &ringWithReader{l}
  51. }
  52. return l
  53. }
  54. // BufSize returns the buffer size of the underlying logger.
  55. // Returns -1 if the logger doesn't match SizedLogger interface.
  56. func (r *RingLogger) BufSize() int {
  57. if sl, ok := r.l.(SizedLogger); ok {
  58. return sl.BufSize()
  59. }
  60. return -1
  61. }
  62. // Log queues messages into the ring buffer
  63. func (r *RingLogger) Log(msg *Message) error {
  64. if r.closed() {
  65. return errClosed
  66. }
  67. return r.buffer.Enqueue(msg)
  68. }
  69. // Name returns the name of the underlying logger
  70. func (r *RingLogger) Name() string {
  71. return r.l.Name()
  72. }
  73. func (r *RingLogger) closed() bool {
  74. return atomic.LoadInt32(&r.closeFlag) == 1
  75. }
  76. func (r *RingLogger) setClosed() {
  77. atomic.StoreInt32(&r.closeFlag, 1)
  78. }
  79. // Close closes the logger
  80. func (r *RingLogger) Close() error {
  81. r.setClosed()
  82. r.buffer.Close()
  83. r.wg.Wait()
  84. // empty out the queue
  85. var logErr bool
  86. for _, msg := range r.buffer.Drain() {
  87. if logErr {
  88. // some error logging a previous message, so re-insert to message pool
  89. // and assume log driver is hosed
  90. PutMessage(msg)
  91. continue
  92. }
  93. if err := r.l.Log(msg); err != nil {
  94. logrus.WithField("driver", r.l.Name()).
  95. WithField("container", r.logInfo.ContainerID).
  96. WithError(err).
  97. Errorf("Error writing log message")
  98. logErr = true
  99. }
  100. }
  101. return r.l.Close()
  102. }
  103. // run consumes messages from the ring buffer and forwards them to the underling
  104. // logger.
  105. // This is run in a goroutine when the RingLogger is created
  106. func (r *RingLogger) run() {
  107. defer r.wg.Done()
  108. for {
  109. if r.closed() {
  110. return
  111. }
  112. msg, err := r.buffer.Dequeue()
  113. if err != nil {
  114. // buffer is closed
  115. return
  116. }
  117. if err := r.l.Log(msg); err != nil {
  118. logrus.WithField("driver", r.l.Name()).
  119. WithField("container", r.logInfo.ContainerID).
  120. WithError(err).
  121. Errorf("Error writing log message")
  122. }
  123. }
  124. }
  125. type messageRing struct {
  126. mu sync.Mutex
  127. // signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
  128. wait *sync.Cond
  129. sizeBytes int64 // current buffer size
  130. maxBytes int64 // max buffer size size
  131. queue []*Message
  132. closed bool
  133. }
  134. func newRing(maxBytes int64) *messageRing {
  135. queueSize := 1000
  136. if maxBytes == 0 || maxBytes == 1 {
  137. // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
  138. // message long.
  139. queueSize = 1
  140. }
  141. r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
  142. r.wait = sync.NewCond(&r.mu)
  143. return r
  144. }
  145. // Enqueue adds a message to the buffer queue
  146. // If the message is too big for the buffer it drops the new message.
  147. // If there are no messages in the queue and the message is still too big, it adds the message anyway.
  148. func (r *messageRing) Enqueue(m *Message) error {
  149. mSize := int64(len(m.Line))
  150. r.mu.Lock()
  151. if r.closed {
  152. r.mu.Unlock()
  153. return errClosed
  154. }
  155. if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
  156. r.wait.Signal()
  157. r.mu.Unlock()
  158. return nil
  159. }
  160. r.queue = append(r.queue, m)
  161. r.sizeBytes += mSize
  162. r.wait.Signal()
  163. r.mu.Unlock()
  164. return nil
  165. }
  166. // Dequeue pulls a message off the queue
  167. // If there are no messages, it waits for one.
  168. // If the buffer is closed, it will return immediately.
  169. func (r *messageRing) Dequeue() (*Message, error) {
  170. r.mu.Lock()
  171. for len(r.queue) == 0 && !r.closed {
  172. r.wait.Wait()
  173. }
  174. if r.closed {
  175. r.mu.Unlock()
  176. return nil, errClosed
  177. }
  178. msg := r.queue[0]
  179. r.queue = r.queue[1:]
  180. r.sizeBytes -= int64(len(msg.Line))
  181. r.mu.Unlock()
  182. return msg, nil
  183. }
  184. var errClosed = errors.New("closed")
  185. // Close closes the buffer ensuring no new messages can be added.
  186. // Any callers waiting to dequeue a message will be woken up.
  187. func (r *messageRing) Close() {
  188. r.mu.Lock()
  189. if r.closed {
  190. r.mu.Unlock()
  191. return
  192. }
  193. r.closed = true
  194. r.wait.Broadcast()
  195. r.mu.Unlock()
  196. }
  197. // Drain drains all messages from the queue.
  198. // This can be used after `Close()` to get any remaining messages that were in queue.
  199. func (r *messageRing) Drain() []*Message {
  200. r.mu.Lock()
  201. ls := make([]*Message, 0, len(r.queue))
  202. ls = append(ls, r.queue...)
  203. r.sizeBytes = 0
  204. r.queue = r.queue[:0]
  205. r.mu.Unlock()
  206. return ls
  207. }