123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package logger // import "github.com/docker/docker/daemon/logger"
- import (
- "bytes"
- "context"
- "io"
- "sync"
- "time"
- "github.com/containerd/log"
- types "github.com/docker/docker/api/types/backend"
- "github.com/docker/docker/pkg/stringid"
- )
// Buffer sizing for the copy loop in copySrc.
const (
	// readSize is the maximum bytes read during a single read
	// operation.
	readSize = 2 * 1024

	// defaultBufSize provides a reasonable default for loggers that do
	// not have an external limit to impose on log line size.
	defaultBufSize = 16 * 1024
)
// Copier can copy logs from specified sources to Logger and attach Timestamp.
// Writes are concurrent, so you need implement some sync in your logger.
type Copier struct {
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
	srcs map[string]io.Reader
	// dst receives every message read from srcs.
	dst Logger
	// copyJobs counts one goroutine per source; Wait blocks on it.
	copyJobs sync.WaitGroup
	// closeOnce makes Close idempotent (guards closing the closed channel).
	closeOnce sync.Once
	// closed, once closed, signals every copy goroutine to stop.
	closed chan struct{}
}
- // NewCopier creates a new Copier
- func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
- return &Copier{
- srcs: srcs,
- dst: dst,
- closed: make(chan struct{}),
- }
- }
- // Run starts logs copying
- func (c *Copier) Run() {
- for src, w := range c.srcs {
- c.copyJobs.Add(1)
- go c.copySrc(src, w)
- }
- }
// copySrc reads log data from src and forwards it line-by-line to c.dst,
// tagging each message with the source name (e.g. "stdout"/"stderr").
// A line longer than the buffer is emitted as a chain of partial messages
// that share one generated ID, an incrementing ordinal, and the first
// partial's timestamp. The loop runs until EOF, a non-EOF read error, or
// the copier's closed channel is closed.
func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()

	// Honor the destination's line-size limit when it exposes one.
	bufSize := defaultBufSize
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
		size := sizedLogger.BufSize()
		// Loggers that wrap another loggers would have BufSize(), but cannot return the size
		// when the wrapped loggers doesn't have BufSize().
		if size > 0 {
			bufSize = size
		}
	}
	buf := make([]byte, bufSize)

	n := 0       // count of buffered-but-not-yet-logged bytes in buf
	eof := false // set once src.Read returns io.EOF

	// State for a long line currently being emitted as partials.
	var partialid string    // ID shared by all partials of the current line
	var partialTS time.Time // first partial's timestamp, reused for the chain
	var ordinal int         // 1-based position of the next partial
	firstPartial := true
	hasMorePartial := false

	for {
		select {
		case <-c.closed:
			return
		default:
			// Work out how much more data we are okay with reading this time.
			upto := n + readSize
			if upto > cap(buf) {
				upto = cap(buf)
			}
			// Try to read that data.
			if upto > n {
				read, err := src.Read(buf[n:upto])
				if err != nil {
					if err != io.EOF {
						logReadsFailedCount.Inc(1)
						log.G(context.TODO()).Errorf("Error scanning log stream: %s", err)
						return
					}
					eof = true
				}
				// Read may return data with io.EOF, so count it either way.
				n += read
			}
			// If we have no data to log, and there's no more coming, we're done.
			if n == 0 && eof {
				return
			}
			// Break up the data that we've buffered up into lines, and log each in turn.
			p := 0
			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
				select {
				case <-c.closed:
					return
				default:
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:p+q]...)

					if hasMorePartial {
						// This newline completes a line whose earlier bytes were
						// already sent as partials: mark it the last piece of the
						// chain, then reset the partial-tracking state.
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}

						// reset
						partialid = ""
						ordinal = 0
						firstPartial = true
						hasMorePartial = false
					}
					if msg.PLogMetaData == nil {
						msg.Timestamp = time.Now().UTC()
					} else {
						// Keep the whole partial chain on the first partial's timestamp.
						msg.Timestamp = partialTS
					}

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
				}
				p += q + 1
			}
			// If there's no more coming, or the buffer is full but
			// has no newlines, log whatever we haven't logged yet,
			// noting that it's a partial log line.
			if eof || (p == 0 && n == len(buf)) {
				if p < n {
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:n]...)

					// Generate unique partialID for first partial. Use it across partials.
					// Record timestamp for first partial. Use it across partials.
					// Initialize Ordinal for first partial. Increment it across partials.
					if firstPartial {
						msg.Timestamp = time.Now().UTC()
						partialTS = msg.Timestamp
						partialid = stringid.GenerateRandomID()
						ordinal = 1
						firstPartial = false
						totalPartialLogs.Inc(1)
					} else {
						msg.Timestamp = partialTS
					}
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
					ordinal++
					hasMorePartial = true

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
					// The buffer has been fully flushed as a partial.
					p = 0
					n = 0
				}
				if eof {
					return
				}
			}
			// Move any unlogged data to the front of the buffer in preparation for another read.
			if p > 0 {
				copy(buf[0:], buf[p:n])
				n -= p
			}
		}
	}
}
- // Wait waits until all copying is done
- func (c *Copier) Wait() {
- c.copyJobs.Wait()
- }
- // Close closes the copier
- func (c *Copier) Close() {
- c.closeOnce.Do(func() {
- close(c.closed)
- })
- }
|