ring.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. )
  // defaultRingMaxSize is the buffer limit, in bytes (1e6, i.e. 1MB), that
  // NewRingLogger falls back to when it is given a negative maxSize.
  const defaultRingMaxSize = 1e6
  10. // RingLogger is a ring buffer that implements the Logger interface.
  11. // This is used when lossy logging is OK.
  12. type RingLogger struct {
  13. buffer *messageRing
  14. l Logger
  15. logInfo Info
  16. closeFlag int32
  17. wg sync.WaitGroup
  18. }
  19. var _ SizedLogger = &RingLogger{}
  20. type ringWithReader struct {
  21. *RingLogger
  22. }
  23. func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
  24. reader, ok := r.l.(LogReader)
  25. if !ok {
  26. // something is wrong if we get here
  27. panic("expected log reader")
  28. }
  29. return reader.ReadLogs(cfg)
  30. }
  31. func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
  32. l := &RingLogger{
  33. buffer: newRing(maxSize),
  34. l: driver,
  35. logInfo: logInfo,
  36. }
  37. l.wg.Add(1)
  38. go l.run()
  39. return l
  40. }
  41. // NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
  42. // the passed in logger.
  43. func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
  44. if maxSize < 0 {
  45. maxSize = defaultRingMaxSize
  46. }
  47. l := newRingLogger(driver, logInfo, maxSize)
  48. if _, ok := driver.(LogReader); ok {
  49. return &ringWithReader{l}
  50. }
  51. return l
  52. }
  53. // BufSize returns the buffer size of the underlying logger.
  54. // Returns -1 if the logger doesn't match SizedLogger interface.
  55. func (r *RingLogger) BufSize() int {
  56. if sl, ok := r.l.(SizedLogger); ok {
  57. return sl.BufSize()
  58. }
  59. return -1
  60. }
  61. // Log queues messages into the ring buffer
  62. func (r *RingLogger) Log(msg *Message) error {
  63. if r.closed() {
  64. return errClosed
  65. }
  66. return r.buffer.Enqueue(msg)
  67. }
  68. // Name returns the name of the underlying logger
  69. func (r *RingLogger) Name() string {
  70. return r.l.Name()
  71. }
  72. func (r *RingLogger) closed() bool {
  73. return atomic.LoadInt32(&r.closeFlag) == 1
  74. }
  75. func (r *RingLogger) setClosed() {
  76. atomic.StoreInt32(&r.closeFlag, 1)
  77. }
  78. // Close closes the logger
  79. func (r *RingLogger) Close() error {
  80. r.setClosed()
  81. r.buffer.Close()
  82. r.wg.Wait()
  83. // empty out the queue
  84. var logErr bool
  85. for _, msg := range r.buffer.Drain() {
  86. if logErr {
  87. // some error logging a previous message, so re-insert to message pool
  88. // and assume log driver is hosed
  89. PutMessage(msg)
  90. continue
  91. }
  92. if err := r.l.Log(msg); err != nil {
  93. logDriverError(r.l.Name(), string(msg.Line), err)
  94. logErr = true
  95. }
  96. }
  97. return r.l.Close()
  98. }
  99. // run consumes messages from the ring buffer and forwards them to the underling
  100. // logger.
  101. // This is run in a goroutine when the RingLogger is created
  102. func (r *RingLogger) run() {
  103. defer r.wg.Done()
  104. for {
  105. if r.closed() {
  106. return
  107. }
  108. msg, err := r.buffer.Dequeue()
  109. if err != nil {
  110. // buffer is closed
  111. return
  112. }
  113. if err := r.l.Log(msg); err != nil {
  114. logDriverError(r.l.Name(), string(msg.Line), err)
  115. }
  116. }
  117. }
  118. type messageRing struct {
  119. mu sync.Mutex
  120. // signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
  121. wait *sync.Cond
  122. sizeBytes int64 // current buffer size
  123. maxBytes int64 // max buffer size size
  124. queue []*Message
  125. closed bool
  126. }
  127. func newRing(maxBytes int64) *messageRing {
  128. queueSize := 1000
  129. if maxBytes == 0 || maxBytes == 1 {
  130. // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
  131. // message long.
  132. queueSize = 1
  133. }
  134. r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
  135. r.wait = sync.NewCond(&r.mu)
  136. return r
  137. }
  138. // Enqueue adds a message to the buffer queue
  139. // If the message is too big for the buffer it drops the new message.
  140. // If there are no messages in the queue and the message is still too big, it adds the message anyway.
  141. func (r *messageRing) Enqueue(m *Message) error {
  142. mSize := int64(len(m.Line))
  143. r.mu.Lock()
  144. if r.closed {
  145. r.mu.Unlock()
  146. return errClosed
  147. }
  148. if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
  149. r.wait.Signal()
  150. r.mu.Unlock()
  151. return nil
  152. }
  153. r.queue = append(r.queue, m)
  154. r.sizeBytes += mSize
  155. r.wait.Signal()
  156. r.mu.Unlock()
  157. return nil
  158. }
  159. // Dequeue pulls a message off the queue
  160. // If there are no messages, it waits for one.
  161. // If the buffer is closed, it will return immediately.
  162. func (r *messageRing) Dequeue() (*Message, error) {
  163. r.mu.Lock()
  164. for len(r.queue) == 0 && !r.closed {
  165. r.wait.Wait()
  166. }
  167. if r.closed {
  168. r.mu.Unlock()
  169. return nil, errClosed
  170. }
  171. msg := r.queue[0]
  172. r.queue = r.queue[1:]
  173. r.sizeBytes -= int64(len(msg.Line))
  174. r.mu.Unlock()
  175. return msg, nil
  176. }
  // errClosed is returned when an operation is attempted on a logger or ring
  // that has already been closed.
  var errClosed = errors.New("closed")
  178. // Close closes the buffer ensuring no new messages can be added.
  179. // Any callers waiting to dequeue a message will be woken up.
  180. func (r *messageRing) Close() {
  181. r.mu.Lock()
  182. if r.closed {
  183. r.mu.Unlock()
  184. return
  185. }
  186. r.closed = true
  187. r.wait.Broadcast()
  188. r.mu.Unlock()
  189. }
  190. // Drain drains all messages from the queue.
  191. // This can be used after `Close()` to get any remaining messages that were in queue.
  192. func (r *messageRing) Drain() []*Message {
  193. r.mu.Lock()
  194. ls := make([]*Message, 0, len(r.queue))
  195. ls = append(ls, r.queue...)
  196. r.sizeBytes = 0
  197. r.queue = r.queue[:0]
  198. r.mu.Unlock()
  199. return ls
  200. }