0a48d26fbc
This was using `errors.Wrap` when there was no error to wrap, meanwhile we are supposed to be creating a new error. Found this while investigating some log corruption issues and unexpectedly getting a nil reader and a nil error from `getTailReader`. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
210 lines
4.7 KiB
Go
210 lines
4.7 KiB
Go
package local
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/docker/docker/api/types/plugins/logdriver"
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
"github.com/docker/docker/errdefs"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// maxMsgLen is the maximum size of the logger.Message after serialization.
|
|
// logger.defaultBufSize caps the size of Line field.
|
|
const maxMsgLen int = 1e6 // 1MB.
|
|
|
|
func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
return d.logfile.ReadLogs(config)
|
|
}
|
|
|
|
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
|
|
size := r.Size()
|
|
if req < 0 {
|
|
return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
|
|
}
|
|
|
|
if size < (encodeBinaryLen*2)+1 {
|
|
return bytes.NewReader(nil), 0, nil
|
|
}
|
|
|
|
const encodeBinaryLen64 = int64(encodeBinaryLen)
|
|
var found int
|
|
|
|
buf := make([]byte, encodeBinaryLen)
|
|
|
|
offset := size
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, 0, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
|
|
if err != nil && err != io.EOF {
|
|
return nil, 0, errors.Wrap(err, "error reading log message footer")
|
|
}
|
|
|
|
if n != encodeBinaryLen {
|
|
return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
|
|
}
|
|
|
|
msgLen := binary.BigEndian.Uint32(buf)
|
|
|
|
n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
|
|
if err != nil && err != io.EOF {
|
|
return nil, 0, errors.Wrap(err, "error reading log message header")
|
|
}
|
|
|
|
if n != encodeBinaryLen {
|
|
return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
|
|
}
|
|
|
|
if msgLen != binary.BigEndian.Uint32(buf) {
|
|
return nil, 0, errdefs.DataLoss(errors.New("log message header and footer indicate different message sizes"))
|
|
}
|
|
|
|
found++
|
|
offset -= int64(msgLen)
|
|
offset -= encodeBinaryLen64 * 2
|
|
if found == req {
|
|
break
|
|
}
|
|
if offset <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
return io.NewSectionReader(r, offset, size), found, nil
|
|
}
|
|
|
|
type decoder struct {
|
|
rdr io.Reader
|
|
proto *logdriver.LogEntry
|
|
// buf keeps bytes from rdr.
|
|
buf []byte
|
|
// offset is the position in buf.
|
|
// If offset > 0, buf[offset:] has bytes which are read but haven't used.
|
|
offset int
|
|
// nextMsgLen is the length of the next log message.
|
|
// If nextMsgLen = 0, a new value must be read from rdr.
|
|
nextMsgLen int
|
|
}
|
|
|
|
func (d *decoder) readRecord(size int) error {
|
|
var err error
|
|
for i := 0; i < maxDecodeRetry; i++ {
|
|
var n int
|
|
n, err = io.ReadFull(d.rdr, d.buf[d.offset:size])
|
|
d.offset += n
|
|
if err != nil {
|
|
if err != io.ErrUnexpectedEOF {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.offset = 0
|
|
return nil
|
|
}
|
|
|
|
func (d *decoder) Decode() (*logger.Message, error) {
|
|
if d.proto == nil {
|
|
d.proto = &logdriver.LogEntry{}
|
|
} else {
|
|
resetProto(d.proto)
|
|
}
|
|
if d.buf == nil {
|
|
d.buf = make([]byte, initialBufSize)
|
|
}
|
|
|
|
if d.nextMsgLen == 0 {
|
|
msgLen, err := d.decodeSizeHeader()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if msgLen > maxMsgLen {
|
|
return nil, fmt.Errorf("log message is too large (%d > %d)", msgLen, maxMsgLen)
|
|
}
|
|
|
|
if len(d.buf) < msgLen+encodeBinaryLen {
|
|
d.buf = make([]byte, msgLen+encodeBinaryLen)
|
|
} else if msgLen <= initialBufSize {
|
|
d.buf = d.buf[:initialBufSize]
|
|
} else {
|
|
d.buf = d.buf[:msgLen+encodeBinaryLen]
|
|
}
|
|
|
|
d.nextMsgLen = msgLen
|
|
}
|
|
return d.decodeLogEntry()
|
|
}
|
|
|
|
func (d *decoder) Reset(rdr io.Reader) {
|
|
if d.rdr == rdr {
|
|
return
|
|
}
|
|
|
|
d.rdr = rdr
|
|
if d.proto != nil {
|
|
resetProto(d.proto)
|
|
}
|
|
if d.buf != nil {
|
|
d.buf = d.buf[:initialBufSize]
|
|
}
|
|
d.offset = 0
|
|
d.nextMsgLen = 0
|
|
}
|
|
|
|
func (d *decoder) Close() {
|
|
d.buf = d.buf[:0]
|
|
d.buf = nil
|
|
if d.proto != nil {
|
|
resetProto(d.proto)
|
|
}
|
|
d.rdr = nil
|
|
}
|
|
|
|
func decodeFunc(rdr io.Reader) loggerutils.Decoder {
|
|
return &decoder{rdr: rdr}
|
|
}
|
|
|
|
func (d *decoder) decodeSizeHeader() (int, error) {
|
|
err := d.readRecord(encodeBinaryLen)
|
|
if err != nil {
|
|
return 0, errors.Wrap(err, "could not read a size header")
|
|
}
|
|
|
|
msgLen := int(binary.BigEndian.Uint32(d.buf[:encodeBinaryLen]))
|
|
return msgLen, nil
|
|
}
|
|
|
|
func (d *decoder) decodeLogEntry() (*logger.Message, error) {
|
|
msgLen := d.nextMsgLen
|
|
err := d.readRecord(msgLen + encodeBinaryLen)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not read a log entry (size=%d+%d)", msgLen, encodeBinaryLen)
|
|
}
|
|
d.nextMsgLen = 0
|
|
|
|
if err := d.proto.Unmarshal(d.buf[:msgLen]); err != nil {
|
|
return nil, errors.Wrapf(err, "error unmarshalling log entry (size=%d)", msgLen)
|
|
}
|
|
|
|
msg := protoToMessage(d.proto)
|
|
if msg.PLogMetaData == nil || msg.PLogMetaData.Last {
|
|
msg.Line = append(msg.Line, '\n')
|
|
}
|
|
|
|
return msg, nil
|
|
}
|