local.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package local // import "github.com/docker/docker/daemon/logger/local"
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "math/bits"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/docker/docker/api/types/backend"
  10. "github.com/docker/docker/api/types/plugins/logdriver"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/daemon/logger/loggerutils"
  13. "github.com/docker/docker/errdefs"
  14. units "github.com/docker/go-units"
  15. "github.com/pkg/errors"
  16. )
  17. const (
  18. // Name is the name of the driver
  19. Name = "local"
  20. encodeBinaryLen = 4
  21. initialBufSize = 2048
  22. maxDecodeRetry = 20000
  23. defaultMaxFileSize int64 = 20 * 1024 * 1024
  24. defaultMaxFileCount = 5
  25. defaultCompressLogs = true
  26. )
  27. var buffersPool = sync.Pool{New: func() interface{} {
  28. b := make([]byte, initialBufSize)
  29. return &b
  30. }}
  31. // LogOptKeys are the keys names used for log opts passed in to initialize the driver.
  32. var LogOptKeys = map[string]bool{
  33. "max-file": true,
  34. "max-size": true,
  35. "compress": true,
  36. }
  37. // ValidateLogOpt looks for log driver specific options.
  38. func ValidateLogOpt(cfg map[string]string) error {
  39. for key := range cfg {
  40. if !LogOptKeys[key] {
  41. return errors.Errorf("unknown log opt '%s' for log driver %s", key, Name)
  42. }
  43. }
  44. return nil
  45. }
  46. func init() {
  47. if err := logger.RegisterLogDriver(Name, New); err != nil {
  48. panic(err)
  49. }
  50. if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
  51. panic(err)
  52. }
  53. }
  54. type driver struct {
  55. logfile *loggerutils.LogFile
  56. }
  57. // New creates a new local logger
  58. // You must provide the `LogPath` in the passed in info argument, this is the file path that logs are written to.
  59. func New(info logger.Info) (logger.Logger, error) {
  60. if info.LogPath == "" {
  61. return nil, errdefs.System(errors.New("log path is missing -- this is a bug and should not happen"))
  62. }
  63. cfg := newDefaultConfig()
  64. if capacity, ok := info.Config["max-size"]; ok {
  65. var err error
  66. cfg.MaxFileSize, err = units.FromHumanSize(capacity)
  67. if err != nil {
  68. return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-size: %s", capacity))
  69. }
  70. }
  71. if userMaxFileCount, ok := info.Config["max-file"]; ok {
  72. var err error
  73. cfg.MaxFileCount, err = strconv.Atoi(userMaxFileCount)
  74. if err != nil {
  75. return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-file: %s", userMaxFileCount))
  76. }
  77. }
  78. if userCompress, ok := info.Config["compress"]; ok {
  79. compressLogs, err := strconv.ParseBool(userCompress)
  80. if err != nil {
  81. return nil, errdefs.InvalidParameter(errors.Wrap(err, "error reading compress log option"))
  82. }
  83. cfg.DisableCompression = !compressLogs
  84. }
  85. return newDriver(info.LogPath, cfg)
  86. }
  87. func marshal(m *logger.Message, buffer *[]byte) error {
  88. proto := logdriver.LogEntry{}
  89. md := logdriver.PartialLogEntryMetadata{}
  90. resetProto(&proto)
  91. messageToProto(m, &proto, &md)
  92. protoSize := proto.Size()
  93. writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter)
  94. buf := *buffer
  95. if writeLen > cap(buf) {
  96. // If we already need to reallocate the buffer, make it larger to be more reusable.
  97. // Round to the next power of two.
  98. capacity := 1 << (bits.Len(uint(writeLen)) + 1)
  99. buf = make([]byte, writeLen, capacity)
  100. } else {
  101. buf = buf[:writeLen]
  102. }
  103. *buffer = buf
  104. binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
  105. n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
  106. if err != nil {
  107. return errors.Wrap(err, "error marshaling log entry")
  108. }
  109. if n+(encodeBinaryLen*2) != writeLen {
  110. return io.ErrShortWrite
  111. }
  112. binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
  113. return nil
  114. }
  115. func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
  116. if err := validateConfig(cfg); err != nil {
  117. return nil, errdefs.InvalidParameter(err)
  118. }
  119. lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, decodeFunc, 0o640, getTailReader)
  120. if err != nil {
  121. return nil, err
  122. }
  123. return &driver{
  124. logfile: lf,
  125. }, nil
  126. }
  127. func (d *driver) Name() string {
  128. return Name
  129. }
  130. func (d *driver) Log(msg *logger.Message) error {
  131. buf := buffersPool.Get().(*[]byte)
  132. defer buffersPool.Put(buf)
  133. timestamp := msg.Timestamp
  134. err := marshal(msg, buf)
  135. logger.PutMessage(msg)
  136. if err != nil {
  137. return errors.Wrap(err, "error marshalling logger.Message")
  138. }
  139. return d.logfile.WriteLogEntry(timestamp, *buf)
  140. }
  141. func (d *driver) Close() error {
  142. return d.logfile.Close()
  143. }
  144. func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) {
  145. proto.Source = msg.Source
  146. proto.TimeNano = msg.Timestamp.UnixNano()
  147. proto.Line = append(proto.Line[:0], msg.Line...)
  148. proto.Partial = msg.PLogMetaData != nil
  149. if proto.Partial {
  150. partial.Ordinal = int32(msg.PLogMetaData.Ordinal)
  151. partial.Last = msg.PLogMetaData.Last
  152. partial.Id = msg.PLogMetaData.ID
  153. proto.PartialLogMetadata = partial
  154. } else {
  155. proto.PartialLogMetadata = nil
  156. }
  157. }
  158. func protoToMessage(proto *logdriver.LogEntry) *logger.Message {
  159. msg := &logger.Message{
  160. Source: proto.Source,
  161. Timestamp: time.Unix(0, proto.TimeNano),
  162. }
  163. if proto.Partial {
  164. var md backend.PartialLogMetaData
  165. md.Last = proto.GetPartialLogMetadata().GetLast()
  166. md.ID = proto.GetPartialLogMetadata().GetId()
  167. md.Ordinal = int(proto.GetPartialLogMetadata().GetOrdinal())
  168. msg.PLogMetaData = &md
  169. }
  170. msg.Line = append(msg.Line[:0], proto.Line...)
  171. return msg
  172. }
  173. func resetProto(proto *logdriver.LogEntry) {
  174. proto.Source = ""
  175. proto.Line = proto.Line[:0]
  176. proto.TimeNano = 0
  177. proto.Partial = false
  178. if proto.PartialLogMetadata != nil {
  179. proto.PartialLogMetadata.Id = ""
  180. proto.PartialLogMetadata.Last = false
  181. proto.PartialLogMetadata.Ordinal = 0
  182. }
  183. proto.PartialLogMetadata = nil
  184. }