// copier.go
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "bytes"
  4. "context"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/containerd/log"
  9. types "github.com/docker/docker/api/types/backend"
  10. "github.com/docker/docker/pkg/stringid"
  11. )
const (
	// readSize is the maximum bytes read during a single read
	// operation.
	readSize = 2 * 1024

	// defaultBufSize provides a reasonable default for loggers that do
	// not have an external limit to impose on log line size.
	defaultBufSize = 16 * 1024
)
// Copier can copy logs from specified sources to Logger and attach Timestamp.
// Writes are concurrent, so you need implement some sync in your logger.
type Copier struct {
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
	srcs map[string]io.Reader
	// dst receives one Message per line read from the sources.
	dst Logger
	// copyJobs tracks the per-source copy goroutines started by Run;
	// Wait blocks on it.
	copyJobs sync.WaitGroup
	// closeOnce guards the close of closed so Close is idempotent.
	closeOnce sync.Once
	// closed is closed by Close to signal the copy goroutines to stop.
	closed chan struct{}
}
  30. // NewCopier creates a new Copier
  31. func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
  32. return &Copier{
  33. srcs: srcs,
  34. dst: dst,
  35. closed: make(chan struct{}),
  36. }
  37. }
  38. // Run starts logs copying
  39. func (c *Copier) Run() {
  40. for src, w := range c.srcs {
  41. c.copyJobs.Add(1)
  42. go c.copySrc(src, w)
  43. }
  44. }
// copySrc reads from src and forwards each newline-terminated line to
// c.dst as a Message attributed to the named stream (e.g. "stdout").
// Lines longer than the buffer are emitted as a sequence of partial
// messages sharing one PartialLogMetaData ID and timestamp, with an
// incrementing Ordinal and Last set on the terminating piece. The
// loop runs until EOF, a non-EOF read error, or the copier is closed.
func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()

	// Size the line buffer: honor the destination's reported limit when
	// it has one, otherwise fall back to defaultBufSize.
	bufSize := defaultBufSize
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
		size := sizedLogger.BufSize()
		// Loggers that wrap another loggers would have BufSize(), but cannot return the size
		// when the wrapped loggers doesn't have BufSize().
		if size > 0 {
			bufSize = size
		}
	}
	buf := make([]byte, bufSize)

	n := 0      // number of buffered bytes not yet logged
	eof := false // set once src.Read returns io.EOF
	// State shared across the pieces of a single over-long line:
	var partialid string    // ID common to all pieces of the current line
	var partialTS time.Time // timestamp of the first piece, reused on the rest
	var ordinal int         // 1-based position of the next piece
	firstPartial := true    // next partial emitted starts a new line
	hasMorePartial := false // earlier pieces of the current line were sent
	for {
		select {
		case <-c.closed:
			return
		default:
			// Work out how much more data we are okay with reading this time.
			upto := n + readSize
			if upto > cap(buf) {
				upto = cap(buf)
			}
			// Try to read that data.
			if upto > n {
				read, err := src.Read(buf[n:upto])
				if err != nil {
					if err != io.EOF {
						logReadsFailedCount.Inc(1)
						log.G(context.TODO()).Errorf("Error scanning log stream: %s", err)
						return
					}
					eof = true
				}
				n += read
			}
			// If we have no data to log, and there's no more coming, we're done.
			if n == 0 && eof {
				return
			}
			// Break up the data that we've buffered up into lines, and log each in turn.
			p := 0
			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
				select {
				case <-c.closed:
					return
				default:
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:p+q]...)

					if hasMorePartial {
						// This newline terminates a line whose earlier
						// pieces were already sent; mark this as the
						// final piece, then reset the partial state.
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}

						// reset
						partialid = ""
						ordinal = 0
						firstPartial = true
						hasMorePartial = false
					}
					// Complete (non-partial) lines get a fresh timestamp;
					// the final partial piece keeps the first piece's.
					if msg.PLogMetaData == nil {
						msg.Timestamp = time.Now().UTC()
					} else {
						msg.Timestamp = partialTS
					}

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
				}
				p += q + 1
			}
			// If there's no more coming, or the buffer is full but
			// has no newlines, log whatever we haven't logged yet,
			// noting that it's a partial log line.
			if eof || (p == 0 && n == len(buf)) {
				if p < n {
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:n]...)

					// Generate unique partialID for first partial. Use it across partials.
					// Record timestamp for first partial. Use it across partials.
					// Initialize Ordinal for first partial. Increment it across partials.
					if firstPartial {
						msg.Timestamp = time.Now().UTC()
						partialTS = msg.Timestamp
						partialid = stringid.GenerateRandomID()
						ordinal = 1
						firstPartial = false
						totalPartialLogs.Inc(1)
					} else {
						msg.Timestamp = partialTS
					}
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
					ordinal++
					hasMorePartial = true

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
					// The whole buffer was flushed as a partial; start over.
					p = 0
					n = 0
				}
				if eof {
					return
				}
			}
			// Move any unlogged data to the front of the buffer in preparation for another read.
			if p > 0 {
				copy(buf[0:], buf[p:n])
				n -= p
			}
		}
	}
}
  162. // Wait waits until all copying is done
  163. func (c *Copier) Wait() {
  164. c.copyJobs.Wait()
  165. }
  166. // Close closes the copier
  167. func (c *Copier) Close() {
  168. c.closeOnce.Do(func() {
  169. close(c.closed)
  170. })
  171. }