read.go 4.7 KB


  1. package local
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. "github.com/docker/docker/daemon/logger"
  10. "github.com/docker/docker/daemon/logger/loggerutils"
  11. "github.com/docker/docker/errdefs"
  12. "github.com/pkg/errors"
  13. )
  14. // maxMsgLen is the maximum size of the logger.Message after serialization.
  15. // logger.defaultBufSize caps the size of Line field.
  16. const maxMsgLen int = 1e6 // 1MB.
  17. func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  18. return d.logfile.ReadLogs(config)
  19. }
  20. func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
  21. size := r.Size()
  22. if req < 0 {
  23. return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
  24. }
  25. if size < (encodeBinaryLen*2)+1 {
  26. return bytes.NewReader(nil), 0, nil
  27. }
  28. const encodeBinaryLen64 = int64(encodeBinaryLen)
  29. var found int
  30. buf := make([]byte, encodeBinaryLen)
  31. offset := size
  32. for {
  33. select {
  34. case <-ctx.Done():
  35. return nil, 0, ctx.Err()
  36. default:
  37. }
  38. n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
  39. if err != nil && err != io.EOF {
  40. return nil, 0, errors.Wrap(err, "error reading log message footer")
  41. }
  42. if n != encodeBinaryLen {
  43. return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
  44. }
  45. msgLen := binary.BigEndian.Uint32(buf)
  46. n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
  47. if err != nil && err != io.EOF {
  48. return nil, 0, errors.Wrap(err, "error reading log message header")
  49. }
  50. if n != encodeBinaryLen {
  51. return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
  52. }
  53. if msgLen != binary.BigEndian.Uint32(buf) {
  54. return nil, 0, errdefs.DataLoss(errors.Wrap(err, "log message header and footer indicate different message sizes"))
  55. }
  56. found++
  57. offset -= int64(msgLen)
  58. offset -= encodeBinaryLen64 * 2
  59. if found == req {
  60. break
  61. }
  62. if offset <= 0 {
  63. break
  64. }
  65. }
  66. return io.NewSectionReader(r, offset, size), found, nil
  67. }
  68. type decoder struct {
  69. rdr io.Reader
  70. proto *logdriver.LogEntry
  71. // buf keeps bytes from rdr.
  72. buf []byte
  73. // offset is the position in buf.
  74. // If offset > 0, buf[offset:] has bytes which are read but haven't used.
  75. offset int
  76. // nextMsgLen is the length of the next log message.
  77. // If nextMsgLen = 0, a new value must be read from rdr.
  78. nextMsgLen int
  79. }
  80. func (d *decoder) readRecord(size int) error {
  81. var err error
  82. for i := 0; i < maxDecodeRetry; i++ {
  83. var n int
  84. n, err = io.ReadFull(d.rdr, d.buf[d.offset:size])
  85. d.offset += n
  86. if err != nil {
  87. if err != io.ErrUnexpectedEOF {
  88. return err
  89. }
  90. continue
  91. }
  92. break
  93. }
  94. if err != nil {
  95. return err
  96. }
  97. d.offset = 0
  98. return nil
  99. }
  100. func (d *decoder) Decode() (*logger.Message, error) {
  101. if d.proto == nil {
  102. d.proto = &logdriver.LogEntry{}
  103. } else {
  104. resetProto(d.proto)
  105. }
  106. if d.buf == nil {
  107. d.buf = make([]byte, initialBufSize)
  108. }
  109. if d.nextMsgLen == 0 {
  110. msgLen, err := d.decodeSizeHeader()
  111. if err != nil {
  112. return nil, err
  113. }
  114. if msgLen > maxMsgLen {
  115. return nil, fmt.Errorf("log message is too large (%d > %d)", msgLen, maxMsgLen)
  116. }
  117. if len(d.buf) < msgLen+encodeBinaryLen {
  118. d.buf = make([]byte, msgLen+encodeBinaryLen)
  119. } else if msgLen <= initialBufSize {
  120. d.buf = d.buf[:initialBufSize]
  121. } else {
  122. d.buf = d.buf[:msgLen+encodeBinaryLen]
  123. }
  124. d.nextMsgLen = msgLen
  125. }
  126. return d.decodeLogEntry()
  127. }
  128. func (d *decoder) Reset(rdr io.Reader) {
  129. if d.rdr == rdr {
  130. return
  131. }
  132. d.rdr = rdr
  133. if d.proto != nil {
  134. resetProto(d.proto)
  135. }
  136. if d.buf != nil {
  137. d.buf = d.buf[:initialBufSize]
  138. }
  139. d.offset = 0
  140. d.nextMsgLen = 0
  141. }
  142. func (d *decoder) Close() {
  143. d.buf = d.buf[:0]
  144. d.buf = nil
  145. if d.proto != nil {
  146. resetProto(d.proto)
  147. }
  148. d.rdr = nil
  149. }
  150. func decodeFunc(rdr io.Reader) loggerutils.Decoder {
  151. return &decoder{rdr: rdr}
  152. }
  153. func (d *decoder) decodeSizeHeader() (int, error) {
  154. err := d.readRecord(encodeBinaryLen)
  155. if err != nil {
  156. return 0, errors.Wrap(err, "could not read a size header")
  157. }
  158. msgLen := int(binary.BigEndian.Uint32(d.buf[:encodeBinaryLen]))
  159. return msgLen, nil
  160. }
  161. func (d *decoder) decodeLogEntry() (*logger.Message, error) {
  162. msgLen := d.nextMsgLen
  163. err := d.readRecord(msgLen + encodeBinaryLen)
  164. if err != nil {
  165. return nil, errors.Wrapf(err, "could not read a log entry (size=%d+%d)", msgLen, encodeBinaryLen)
  166. }
  167. d.nextMsgLen = 0
  168. if err := d.proto.Unmarshal(d.buf[:msgLen]); err != nil {
  169. return nil, errors.Wrapf(err, "error unmarshalling log entry (size=%d)", msgLen)
  170. }
  171. msg := protoToMessage(d.proto)
  172. if msg.PLogMetaData == nil || msg.PLogMetaData.Last {
  173. msg.Line = append(msg.Line, '\n')
  174. }
  175. return msg, nil
  176. }