Merge pull request #37092 from cpuguy83/local_logger

Add "local" log driver
commit e0ad6d045c
Sebastiaan van Stijn 2018-08-20 07:01:41 +01:00 committed by GitHub
17 changed files with 1587 additions and 117 deletions


@@ -10,6 +10,7 @@
It has these top-level messages:
LogEntry
PartialLogEntryMetadata
*/
package logdriver
@@ -31,10 +32,11 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type LogEntry struct {
Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
TimeNano int64 `protobuf:"varint,2,opt,name=time_nano,json=timeNano,proto3" json:"time_nano,omitempty"`
Line []byte `protobuf:"bytes,3,opt,name=line,proto3" json:"line,omitempty"`
Partial bool `protobuf:"varint,4,opt,name=partial,proto3" json:"partial,omitempty"`
Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
TimeNano int64 `protobuf:"varint,2,opt,name=time_nano,json=timeNano,proto3" json:"time_nano,omitempty"`
Line []byte `protobuf:"bytes,3,opt,name=line,proto3" json:"line,omitempty"`
Partial bool `protobuf:"varint,4,opt,name=partial,proto3" json:"partial,omitempty"`
PartialLogMetadata *PartialLogEntryMetadata `protobuf:"bytes,5,opt,name=partial_log_metadata,json=partialLogMetadata" json:"partial_log_metadata,omitempty"`
}
func (m *LogEntry) Reset() { *m = LogEntry{} }
@@ -70,8 +72,48 @@ func (m *LogEntry) GetPartial() bool {
return false
}
func (m *LogEntry) GetPartialLogMetadata() *PartialLogEntryMetadata {
if m != nil {
return m.PartialLogMetadata
}
return nil
}
type PartialLogEntryMetadata struct {
Last bool `protobuf:"varint,1,opt,name=last,proto3" json:"last,omitempty"`
Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
Ordinal int32 `protobuf:"varint,3,opt,name=ordinal,proto3" json:"ordinal,omitempty"`
}
func (m *PartialLogEntryMetadata) Reset() { *m = PartialLogEntryMetadata{} }
func (m *PartialLogEntryMetadata) String() string { return proto.CompactTextString(m) }
func (*PartialLogEntryMetadata) ProtoMessage() {}
func (*PartialLogEntryMetadata) Descriptor() ([]byte, []int) { return fileDescriptorEntry, []int{1} }
func (m *PartialLogEntryMetadata) GetLast() bool {
if m != nil {
return m.Last
}
return false
}
func (m *PartialLogEntryMetadata) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *PartialLogEntryMetadata) GetOrdinal() int32 {
if m != nil {
return m.Ordinal
}
return 0
}
func init() {
proto.RegisterType((*LogEntry)(nil), "LogEntry")
proto.RegisterType((*PartialLogEntryMetadata)(nil), "PartialLogEntryMetadata")
}
func (m *LogEntry) Marshal() (dAtA []byte, err error) {
size := m.Size()
@@ -115,6 +157,55 @@ func (m *LogEntry) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if m.PartialLogMetadata != nil {
dAtA[i] = 0x2a
i++
i = encodeVarintEntry(dAtA, i, uint64(m.PartialLogMetadata.Size()))
n1, err := m.PartialLogMetadata.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
func (m *PartialLogEntryMetadata) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PartialLogEntryMetadata) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Last {
dAtA[i] = 0x8
i++
if m.Last {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
if len(m.Id) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintEntry(dAtA, i, uint64(len(m.Id)))
i += copy(dAtA[i:], m.Id)
}
if m.Ordinal != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintEntry(dAtA, i, uint64(m.Ordinal))
}
return i, nil
}
@@ -162,6 +253,26 @@ func (m *LogEntry) Size() (n int) {
if m.Partial {
n += 2
}
if m.PartialLogMetadata != nil {
l = m.PartialLogMetadata.Size()
n += 1 + l + sovEntry(uint64(l))
}
return n
}
func (m *PartialLogEntryMetadata) Size() (n int) {
var l int
_ = l
if m.Last {
n += 2
}
l = len(m.Id)
if l > 0 {
n += 1 + l + sovEntry(uint64(l))
}
if m.Ordinal != 0 {
n += 1 + sovEntry(uint64(m.Ordinal))
}
return n
}
@@ -306,6 +417,157 @@ func (m *LogEntry) Unmarshal(dAtA []byte) error {
}
}
m.Partial = bool(v != 0)
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PartialLogMetadata", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEntry
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.PartialLogMetadata == nil {
m.PartialLogMetadata = &PartialLogEntryMetadata{}
}
if err := m.PartialLogMetadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEntry(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEntry
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PartialLogEntryMetadata) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PartialLogEntryMetadata: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PartialLogEntryMetadata: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Last", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Last = bool(v != 0)
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEntry
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Id = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ordinal", wireType)
}
m.Ordinal = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Ordinal |= (int32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipEntry(dAtA[iNdEx:])
@@ -435,15 +697,20 @@ var (
func init() { proto.RegisterFile("entry.proto", fileDescriptorEntry) }
var fileDescriptorEntry = []byte{
// 149 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0xca, 0xe5, 0xe2, 0xf0, 0xc9, 0x4f, 0x77, 0x05,
0x89, 0x08, 0x89, 0x71, 0xb1, 0x15, 0xe7, 0x97, 0x16, 0x25, 0xa7, 0x4a, 0x30, 0x2a, 0x30, 0x6a,
0x70, 0x06, 0x41, 0x79, 0x42, 0xd2, 0x5c, 0x9c, 0x25, 0x99, 0xb9, 0xa9, 0xf1, 0x79, 0x89, 0x79,
0xf9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x1c, 0x20, 0x01, 0xbf, 0xc4, 0xbc, 0x7c, 0x21,
0x21, 0x2e, 0x96, 0x9c, 0xcc, 0xbc, 0x54, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b,
0x48, 0x82, 0x8b, 0xbd, 0x20, 0xb1, 0xa8, 0x24, 0x33, 0x31, 0x47, 0x82, 0x45, 0x81, 0x51, 0x83,
0x23, 0x08, 0xc6, 0x75, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f,
0xe4, 0x18, 0x93, 0xd8, 0xc0, 0x6e, 0x30, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x2d, 0x24, 0x5a,
0xd4, 0x92, 0x00, 0x00, 0x00,
// 237 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x90, 0xbd, 0x4a, 0x04, 0x31,
0x14, 0x85, 0xb9, 0xb3, 0x3f, 0xce, 0xdc, 0x5d, 0x2c, 0x82, 0x68, 0x40, 0x18, 0xc2, 0x56, 0xa9,
0xb6, 0xd0, 0x37, 0x10, 0x6c, 0x44, 0x45, 0xd2, 0x58, 0x0e, 0x57, 0x27, 0x2c, 0x81, 0xd9, 0xdc,
0x21, 0x13, 0x0b, 0x1f, 0xcd, 0x37, 0xb0, 0xf4, 0x11, 0x64, 0x9e, 0x44, 0x26, 0x4e, 0xec, 0xec,
0xce, 0x39, 0x5f, 0x8a, 0x2f, 0x17, 0x37, 0xd6, 0xc7, 0xf0, 0xbe, 0xef, 0x03, 0x47, 0xde, 0x7d,
0x00, 0x96, 0xf7, 0x7c, 0xb8, 0x9d, 0x26, 0x71, 0x8e, 0xeb, 0x81, 0xdf, 0xc2, 0xab, 0x95, 0xa0,
0x40, 0x57, 0x66, 0x6e, 0xe2, 0x12, 0xab, 0xe8, 0x8e, 0xb6, 0xf1, 0xe4, 0x59, 0x16, 0x0a, 0xf4,
0xc2, 0x94, 0xd3, 0xf0, 0x48, 0x9e, 0x85, 0xc0, 0x65, 0xe7, 0xbc, 0x95, 0x0b, 0x05, 0x7a, 0x6b,
0x52, 0x16, 0x12, 0x4f, 0x7a, 0x0a, 0xd1, 0x51, 0x27, 0x97, 0x0a, 0x74, 0x69, 0x72, 0x15, 0x77,
0x78, 0x36, 0xc7, 0xa6, 0xe3, 0x43, 0x73, 0xb4, 0x91, 0x5a, 0x8a, 0x24, 0x57, 0x0a, 0xf4, 0xe6,
0x4a, 0xee, 0x9f, 0x7e, 0x61, 0x56, 0x7a, 0x98, 0xb9, 0x11, 0xfd, 0x1f, 0xc8, 0xdb, 0xee, 0x19,
0x2f, 0xfe, 0x79, 0x9e, 0xa4, 0x68, 0x88, 0xe9, 0x1f, 0xa5, 0x49, 0x59, 0x9c, 0x62, 0xe1, 0xda,
0xa4, 0x5f, 0x99, 0xc2, 0xb5, 0x93, 0x24, 0x87, 0xd6, 0x79, 0xea, 0x92, 0xfb, 0xca, 0xe4, 0x7a,
0xb3, 0xfd, 0x1c, 0x6b, 0xf8, 0x1a, 0x6b, 0xf8, 0x1e, 0x6b, 0x78, 0x59, 0xa7, 0x4b, 0x5d, 0xff,
0x04, 0x00, 0x00, 0xff, 0xff, 0x8f, 0xed, 0x9f, 0xb6, 0x38, 0x01, 0x00, 0x00,
}


@@ -5,4 +5,12 @@ message LogEntry {
int64 time_nano = 2;
bytes line = 3;
bool partial = 4;
PartialLogEntryMetadata partial_log_metadata = 5;
}
message PartialLogEntryMetadata {
bool last = 1;
string id = 2;
int32 ordinal = 3;
}
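
The new field ties split-up (partial) lines back together. For illustration, the first piece of a long line could be represented like this with the generated Go types (all field values here are hypothetical):

	entry := &logdriver.LogEntry{
		Source:   "stdout",
		TimeNano: time.Now().UnixNano(),
		Line:     []byte("first piece of a long line"),
		Partial:  true,
		PartialLogMetadata: &logdriver.PartialLogEntryMetadata{
			Id:      "0001", // shared by every piece of one logical line
			Ordinal: 1,      // position of this piece within the line
			Last:    false,  // true only on the final piece
		},
	}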


@@ -22,7 +22,9 @@ import (
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/daemon/logger/local"
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/containerfs"
@@ -375,13 +377,27 @@ func (container *Container) StartLogger() (logger.Logger, error) {
}
// Set logging file for "json-logger"
if cfg.Type == jsonfilelog.Name {
// TODO(@cpuguy83): Setup here based on log driver is a little weird.
switch cfg.Type {
case jsonfilelog.Name:
info.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
if err != nil {
return nil, err
}
container.LogPath = info.LogPath
case local.Name:
// Do not set container.LogPath for the local driver.
// Doing so would expose the value via the API, which should not be done because the
// log file implementation would then become a stable API that cannot change.
logDir, err := container.GetRootResourcePath("local-logs")
if err != nil {
return nil, err
}
if err := os.MkdirAll(logDir, 0700); err != nil {
return nil, errdefs.System(errors.Wrap(err, "error creating local logs dir"))
}
info.LogPath = filepath.Join(logDir, "container.log")
}
l, err := initDriver(info)


@@ -9,6 +9,7 @@ import (
_ "github.com/docker/docker/daemon/logger/gelf"
_ "github.com/docker/docker/daemon/logger/journald"
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
_ "github.com/docker/docker/daemon/logger/local"
_ "github.com/docker/docker/daemon/logger/logentries"
_ "github.com/docker/docker/daemon/logger/splunk"
_ "github.com/docker/docker/daemon/logger/syslog"


@@ -110,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
return b, nil
}
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}


@@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -107,7 +108,10 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
ContainerID: "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657",
LogPath: tmp.Join("container.log"),
Config: map[string]string{
"labels": "first,second",
"labels": "first,second",
"max-file": "10",
"compress": "true",
"max-size": "20m",
},
ContainerLabels: map[string]string{
"first": "label_value",
@@ -117,21 +121,34 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
assert.NilError(b, err)
defer jsonlogger.Close()
msg := &logger.Message{
Line: []byte("Line that thinks that it is log line from docker\n"),
Source: "stderr",
Timestamp: time.Now().UTC(),
}
t := time.Now().UTC()
for _, data := range [][]byte{
[]byte(""),
[]byte("a short string"),
bytes.Repeat([]byte("a long string"), 100),
bytes.Repeat([]byte("a really long string"), 10000),
} {
b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
testMsg := &logger.Message{
Line: data,
Source: "stderr",
Timestamp: t,
}
buf := bytes.NewBuffer(nil)
assert.NilError(b, marshalMessage(msg, nil, buf))
b.SetBytes(int64(buf.Len()))
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := jsonlogger.Log(msg); err != nil {
b.Fatal(err)
}
buf := bytes.NewBuffer(nil)
assert.NilError(b, marshalMessage(testMsg, nil, buf))
b.SetBytes(int64(buf.Len()))
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := logger.NewMessage()
msg.Line = testMsg.Line
msg.Timestamp = testMsg.Timestamp
msg.Source = testMsg.Source
if err := jsonlogger.Log(msg); err != nil {
b.Fatal(err)
}
}
})
}
}


@@ -1,12 +1,16 @@
package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
import (
"context"
"encoding/json"
"io"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/tailfile"
"github.com/sirupsen/logrus"
)
const maxJSONDecodeRetry = 20000
@@ -63,14 +67,14 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
return func() (msg *logger.Message, err error) {
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
msg, err = decodeLogLine(dec, l)
if err == nil {
if err == nil || err == io.EOF {
break
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
// try again, could be due to an incomplete json object as we read
if _, ok := err.(*json.SyntaxError); ok {
dec = json.NewDecoder(rdr)
retries++
continue
}
@@ -81,9 +85,13 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
if err == io.ErrUnexpectedEOF {
reader := io.MultiReader(dec.Buffered(), rdr)
dec = json.NewDecoder(reader)
retries++
continue
}
}
return msg, err
}
}
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, req)
}


@@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import (
"bytes"
"io"
"testing"
"time"
@@ -62,3 +63,32 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
}
}
}
func TestEncodeDecode(t *testing.T) {
t.Parallel()
m1 := &logger.Message{Line: []byte("hello 1"), Timestamp: time.Now(), Source: "stdout"}
m2 := &logger.Message{Line: []byte("hello 2"), Timestamp: time.Now(), Source: "stdout"}
m3 := &logger.Message{Line: []byte("hello 3"), Timestamp: time.Now(), Source: "stdout"}
buf := bytes.NewBuffer(nil)
assert.Assert(t, marshalMessage(m1, nil, buf))
assert.Assert(t, marshalMessage(m2, nil, buf))
assert.Assert(t, marshalMessage(m3, nil, buf))
decode := decodeFunc(buf)
msg, err := decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line))
msg, err = decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 2\n")
msg, err = decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 3\n")
_, err = decode()
assert.Assert(t, err == io.EOF)
}


@@ -0,0 +1,36 @@
package local
import (
"github.com/pkg/errors"
)
// CreateConfig is used to configure new instances of the driver
type CreateConfig struct {
DisableCompression bool
MaxFileSize int64
MaxFileCount int
}
func newDefaultConfig() *CreateConfig {
return &CreateConfig{
MaxFileSize: defaultMaxFileSize,
MaxFileCount: defaultMaxFileCount,
DisableCompression: !defaultCompressLogs,
}
}
func validateConfig(cfg *CreateConfig) error {
if cfg.MaxFileSize < 0 {
return errors.New("max size cannot be a negative number")
}
if cfg.MaxFileCount < 0 {
return errors.New("max file count cannot be less than 0")
}
if !cfg.DisableCompression {
if cfg.MaxFileCount <= 1 {
return errors.New("compression cannot be enabled when max file count is 1")
}
}
return nil
}
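
A quick sketch of how these checks combine with the defaults from driver.go (illustrative only):

	cfg := newDefaultConfig() // compression on, 5 files of 20MB each: valid
	cfg.MaxFileCount = 1      // rotation effectively disabled...
	err := validateConfig(cfg)
	// err: "compression cannot be enabled when max file count is 1"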


@@ -0,0 +1,9 @@
// Package local provides a logger implementation that stores logs on disk.
//
// Log messages are encoded as protobufs with a header and footer for each message.
// The header and footer are big-endian binary encoded uint32 values which indicate the size of the log message.
// The header and footer of each message allow a file to be read efficiently either forwards or
// backwards (as is the case when tailing a file).
//
// Example log message format: [22][This is a log message.][22][28][This is another log message.][28]
package local // import "github.com/docker/docker/daemon/logger/local"
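
The framing is simple enough to demonstrate standalone. A minimal sketch, using a raw byte payload in place of the marshaled protobuf and hypothetical helper names:

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
	)

	// writeFrame appends [size][payload][size] to buf, mirroring the layout above.
	func writeFrame(buf *bytes.Buffer, payload []byte) {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(payload)))
		buf.Write(size[:]) // header
		buf.Write(payload)
		buf.Write(size[:]) // footer makes backwards reads possible
	}

	// readLastFrame uses the trailing footer to locate the final payload
	// without scanning the file from the start.
	func readLastFrame(b []byte) []byte {
		n := int(binary.BigEndian.Uint32(b[len(b)-4:]))
		return b[len(b)-4-n : len(b)-4]
	}

	func main() {
		var buf bytes.Buffer
		writeFrame(&buf, []byte("This is a log message."))
		writeFrame(&buf, []byte("This is another log message."))
		fmt.Printf("%s\n", readLastFrame(buf.Bytes())) // This is another log message.
	}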


@@ -0,0 +1,218 @@
package local // import "github.com/docker/docker/daemon/logger/local"
import (
"encoding/binary"
"io"
"strconv"
"sync"
"time"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/errdefs"
"github.com/docker/go-units"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
// Name is the name of the driver
Name = "local"
encodeBinaryLen = 4
initialBufSize = 2048
maxDecodeRetry = 20000
defaultMaxFileSize int64 = 20 * 1024 * 1024
defaultMaxFileCount = 5
defaultCompressLogs = true
)
// LogOptKeys are the key names used for log opts passed in to initialize the driver.
var LogOptKeys = map[string]bool{
"max-file": true,
"max-size": true,
"compress": true,
}
// ValidateLogOpt looks for log driver specific options.
func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
if !LogOptKeys[key] {
return errors.Errorf("unknown log opt '%s' for log driver %s", key, Name)
}
}
return nil
}
func init() {
if err := logger.RegisterLogDriver(Name, New); err != nil {
logrus.Fatal(err)
}
if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
logrus.Fatal(err)
}
}
type driver struct {
mu sync.Mutex
closed bool
logfile *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
}
// New creates a new local logger.
// You must provide the `LogPath` in the passed-in info argument; this is the file path that logs are written to.
func New(info logger.Info) (logger.Logger, error) {
if info.LogPath == "" {
return nil, errdefs.System(errors.New("log path is missing -- this is a bug and should not happen"))
}
cfg := newDefaultConfig()
if capacity, ok := info.Config["max-size"]; ok {
var err error
cfg.MaxFileSize, err = units.FromHumanSize(capacity)
if err != nil {
return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-size: %s", capacity))
}
}
if userMaxFileCount, ok := info.Config["max-file"]; ok {
var err error
cfg.MaxFileCount, err = strconv.Atoi(userMaxFileCount)
if err != nil {
return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-file: %s", userMaxFileCount))
}
}
if userCompress, ok := info.Config["compress"]; ok {
compressLogs, err := strconv.ParseBool(userCompress)
if err != nil {
return nil, errdefs.InvalidParameter(errors.Wrap(err, "error reading compress log option"))
}
cfg.DisableCompression = !compressLogs
}
return newDriver(info.LogPath, cfg)
}
func makeMarshaller() func(m *logger.Message) ([]byte, error) {
buf := make([]byte, initialBufSize)
// allocate the partial log entry separately, which allows for easier re-use
proto := &logdriver.LogEntry{}
md := &logdriver.PartialLogEntryMetadata{}
return func(m *logger.Message) ([]byte, error) {
resetProto(proto)
messageToProto(m, proto, md)
protoSize := proto.Size()
writeLen := protoSize + (2 * encodeBinaryLen) //+ len(messageDelimiter)
if writeLen > len(buf) {
buf = make([]byte, writeLen)
} else {
// shrink the buffer back down
if writeLen <= initialBufSize {
buf = buf[:initialBufSize]
} else {
buf = buf[:writeLen]
}
}
binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
if err != nil {
return nil, errors.Wrap(err, "error marshaling log entry")
}
if n+(encodeBinaryLen*2) != writeLen {
return nil, io.ErrShortWrite
}
binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
return buf[:writeLen], nil
}
}
func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
if err := validateConfig(cfg); err != nil {
return nil, errdefs.InvalidParameter(err)
}
lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
return &driver{
logfile: lf,
readers: make(map[*logger.LogWatcher]struct{}),
}, nil
}
func (d *driver) Name() string {
return Name
}
func (d *driver) Log(msg *logger.Message) error {
d.mu.Lock()
err := d.logfile.WriteLogEntry(msg)
d.mu.Unlock()
return err
}
func (d *driver) Close() error {
d.mu.Lock()
d.closed = true
err := d.logfile.Close()
for r := range d.readers {
r.Close()
delete(d.readers, r)
}
d.mu.Unlock()
return err
}
func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) {
proto.Source = msg.Source
proto.TimeNano = msg.Timestamp.UnixNano()
proto.Line = append(proto.Line[:0], msg.Line...)
proto.Partial = msg.PLogMetaData != nil
if proto.Partial {
partial.Ordinal = int32(msg.PLogMetaData.Ordinal)
partial.Last = msg.PLogMetaData.Last
partial.Id = msg.PLogMetaData.ID
proto.PartialLogMetadata = partial
} else {
proto.PartialLogMetadata = nil
}
}
func protoToMessage(proto *logdriver.LogEntry) *logger.Message {
msg := &logger.Message{
Source: proto.Source,
Timestamp: time.Unix(0, proto.TimeNano),
}
if proto.Partial {
var md backend.PartialLogMetaData
md.Last = proto.GetPartialLogMetadata().GetLast()
md.ID = proto.GetPartialLogMetadata().GetId()
md.Ordinal = int(proto.GetPartialLogMetadata().GetOrdinal())
msg.PLogMetaData = &md
}
msg.Line = append(msg.Line[:0], proto.Line...)
return msg
}
func resetProto(proto *logdriver.LogEntry) {
proto.Source = ""
proto.Line = proto.Line[:0]
proto.TimeNano = 0
proto.Partial = false
if proto.PartialLogMetadata != nil {
proto.PartialLogMetadata.Id = ""
proto.PartialLogMetadata.Last = false
proto.PartialLogMetadata.Ordinal = 0
}
proto.PartialLogMetadata = nil
}
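
Putting it together, daemon-side construction and a single write might look roughly like this (the path and options are illustrative; error handling elided):

	info := logger.Info{
		LogPath: "/var/lib/docker/containers/<id>/local-logs/container.log",
		Config: map[string]string{
			"max-size": "10m",
			"max-file": "3",
			"compress": "true",
		},
	}
	l, _ := local.New(info)
	defer l.Close()

	msg := logger.NewMessage() // messages come from a pool; the writer returns them to it
	msg.Source = "stdout"
	msg.Timestamp = time.Now()
	msg.Line = append(msg.Line, "hello"...)
	_ = l.Log(msg)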


@@ -0,0 +1,220 @@
package local
import (
"context"
"encoding/binary"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"bytes"
"fmt"
"strings"
"io"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
protoio "github.com/gogo/protobuf/io"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
)
func TestWriteLog(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", t.Name())
assert.Assert(t, err)
defer os.RemoveAll(dir)
logPath := filepath.Join(dir, "test.log")
l, err := New(logger.Info{LogPath: logPath})
assert.Assert(t, err)
defer l.Close()
m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("message 1")}
m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("message 2"), PLogMetaData: &backend.PartialLogMetaData{Last: true, ID: "0001", Ordinal: 1}}
m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("message 3")}
// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
err = l.Log(copyLogMessage(&m1))
assert.Assert(t, err)
err = l.Log(copyLogMessage(&m2))
assert.Assert(t, err)
err = l.Log(copyLogMessage(&m3))
assert.Assert(t, err)
f, err := os.Open(logPath)
assert.Assert(t, err)
defer f.Close()
dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
var (
proto logdriver.LogEntry
testProto logdriver.LogEntry
partial logdriver.PartialLogEntryMetadata
)
lenBuf := make([]byte, encodeBinaryLen)
seekMsgLen := func() {
io.ReadFull(f, lenBuf)
}
err = dec.ReadMsg(&proto)
assert.Assert(t, err)
messageToProto(&m1, &testProto, &partial)
assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
seekMsgLen()
err = dec.ReadMsg(&proto)
assert.Assert(t, err)
messageToProto(&m2, &testProto, &partial)
assert.Check(t, is.DeepEqual(testProto, proto))
seekMsgLen()
err = dec.ReadMsg(&proto)
assert.Assert(t, err)
messageToProto(&m3, &testProto, &partial)
assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
}
func TestReadLog(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", t.Name())
assert.Assert(t, err)
defer os.RemoveAll(dir)
logPath := filepath.Join(dir, "test.log")
l, err := New(logger.Info{LogPath: logPath})
assert.Assert(t, err)
defer l.Close()
m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}
// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
err = l.Log(copyLogMessage(&m1))
assert.Assert(t, err)
err = l.Log(copyLogMessage(&m2))
assert.Assert(t, err)
err = l.Log(copyLogMessage(&m3))
assert.Assert(t, err)
err = l.Log(copyLogMessage(&m4))
assert.Assert(t, err)
lr := l.(logger.LogReader)
testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
select {
case <-ctx.Done():
assert.Assert(t, ctx.Err())
case err := <-lw.Err:
assert.Assert(t, err)
case msg, open := <-lw.Msg:
if !open {
select {
case err := <-lw.Err:
assert.Assert(t, err)
default:
assert.Assert(t, m == nil)
return
}
}
assert.Assert(t, m != nil)
if m.PLogMetaData == nil {
// a `\n` is appended on read to make this work with the existing APIs when the message is not a partial.
// make sure it's the last byte in the line, and then truncate it for the deep equal below.
assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
msg.Line = msg.Line[:len(msg.Line)-1]
}
assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
}
}
t.Run("tail exact", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})
testMessage(t, lw, &m1)
testMessage(t, lw, &m2)
testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})
t.Run("tail less than available", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})
t.Run("tail more than available", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
testMessage(t, lw, &m1)
testMessage(t, lw, &m2)
testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})
}
func BenchmarkLogWrite(b *testing.B) {
f, err := ioutil.TempFile("", b.Name())
assert.Assert(b, err)
defer os.Remove(f.Name())
f.Close()
local, err := New(logger.Info{LogPath: f.Name()})
assert.Assert(b, err)
defer local.Close()
t := time.Now().UTC()
for _, data := range [][]byte{
[]byte(""),
[]byte("a short string"),
bytes.Repeat([]byte("a long string"), 100),
bytes.Repeat([]byte("a really long string"), 10000),
} {
b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
entry := &logdriver.LogEntry{Line: data, Source: "stdout", TimeNano: t.UnixNano()}
b.SetBytes(int64(entry.Size() + encodeBinaryLen + encodeBinaryLen))
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := logger.NewMessage()
msg.Line = data
msg.Timestamp = t
msg.Source = "stdout"
if err := local.Log(msg); err != nil {
b.Fatal(err)
}
}
})
}
}
func copyLogMessage(src *logger.Message) *logger.Message {
dst := logger.NewMessage()
dst.Source = src.Source
dst.Timestamp = src.Timestamp
dst.Attrs = src.Attrs
dst.Err = src.Err
dst.Line = append(dst.Line, src.Line...)
if src.PLogMetaData != nil {
lm := *src.PLogMetaData // make a real copy; &(*src.PLogMetaData) would alias the same struct
dst.PLogMetaData = &lm
}
return dst
}

daemon/logger/local/read.go (new file)

@@ -0,0 +1,174 @@
package local
import (
"context"
"encoding/binary"
"io"
"bytes"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/errdefs"
"github.com/pkg/errors"
)
func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
logWatcher := logger.NewLogWatcher()
go d.readLogs(logWatcher, config)
return logWatcher
}
func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(watcher.Msg)
d.mu.Lock()
d.readers[watcher] = struct{}{}
d.mu.Unlock()
d.logfile.ReadLogs(config, watcher)
d.mu.Lock()
delete(d.readers, watcher)
d.mu.Unlock()
}
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
size := r.Size()
if req < 0 {
return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
}
if size < (encodeBinaryLen*2)+1 {
return bytes.NewReader(nil), 0, nil
}
const encodeBinaryLen64 = int64(encodeBinaryLen)
var found int
buf := make([]byte, encodeBinaryLen)
offset := size
for {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
default:
}
n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
if err != nil && err != io.EOF {
return nil, 0, errors.Wrap(err, "error reading log message footer")
}
if n != encodeBinaryLen {
return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
}
msgLen := binary.BigEndian.Uint32(buf)
n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
if err != nil && err != io.EOF {
return nil, 0, errors.Wrap(err, "error reading log message header")
}
if n != encodeBinaryLen {
return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
}
if msgLen != binary.BigEndian.Uint32(buf) {
return nil, 0, errdefs.DataLoss(errors.New("log message header and footer indicate different message sizes"))
}
found++
offset -= int64(msgLen)
offset -= encodeBinaryLen64 * 2
if found == req {
break
}
if offset <= 0 {
break
}
}
return io.NewSectionReader(r, offset, size), found, nil
}
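
// To make the walk above concrete: for two messages of 22 and 28 bytes the
// file layout is (sizes are big-endian uint32):
//
// [0x00000016][22-byte proto][0x00000016][0x0000001c][28-byte proto][0x0000001c]
//
// Each iteration reads the trailing footer, jumps back over the message, and
// checks that the header matches before moving to the previous message.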
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
proto := &logdriver.LogEntry{}
buf := make([]byte, initialBufSize)
return func() (*logger.Message, error) {
var (
read int
err error
)
resetProto(proto)
for i := 0; i < maxDecodeRetry; i++ {
var n int
n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen])
if err != nil {
if err != io.ErrUnexpectedEOF {
return nil, errors.Wrap(err, "error reading log message length")
}
read += n
continue
}
read += n
break
}
if err != nil {
return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
}
msgLen := int(binary.BigEndian.Uint32(buf[:read]))
if len(buf) < msgLen+encodeBinaryLen {
buf = make([]byte, msgLen+encodeBinaryLen)
} else {
if msgLen <= initialBufSize {
buf = buf[:initialBufSize]
} else {
buf = buf[:msgLen+encodeBinaryLen]
}
}
return decodeLogEntry(rdr, proto, buf, msgLen)
}
}
func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) {
var (
read int
err error
)
for i := 0; i < maxDecodeRetry; i++ {
var n int
n, err = io.ReadFull(rdr, buf[read:msgLen+encodeBinaryLen])
if err != nil {
if err != io.ErrUnexpectedEOF {
return nil, errors.Wrap(err, "could not decode log entry")
}
read += n
continue
}
break
}
if err != nil {
return nil, errors.Wrapf(err, "could not decode entry: read %d, expected: %d", read, msgLen)
}
if err := proto.Unmarshal(buf[:msgLen]); err != nil {
return nil, errors.Wrap(err, "error unmarshalling log entry")
}
msg := protoToMessage(proto)
if msg.PLogMetaData == nil {
msg.Line = append(msg.Line, '\n')
}
return msg, nil
}


@@ -1,7 +1,6 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
@@ -15,11 +14,9 @@ import (
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -93,13 +90,27 @@ type LogFile struct {
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
getTailReader GetTailReaderFunc
perms os.FileMode
}
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
io.ReaderAt
Size() int64
}
// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed-in number of log lines.
// It returns the section reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
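
// For example, a line-oriented decoder can satisfy GetTailReaderFunc with the
// tailfile helper, as the json-file and local drivers shown earlier in this diff do:
//
// func getTailReader(ctx context.Context, r SizeReaderAt, req int) (io.Reader, int, error) {
// return tailfile.NewTailReader(ctx, r, req)
// }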
// NewLogFile creates a new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
@@ -121,6 +132,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
getTailReader: getTailReader,
}, nil
}
@@ -310,34 +322,46 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
}
if config.Tail != 0 {
// TODO(@cpuguy83): Instead of opening every file, only get the files which
// are needed to tail.
// This is especially costly when compression is enabled.
files, err := w.openRotatedFiles(config)
w.mu.RUnlock()
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
w.mu.RUnlock()
seekers := make([]io.ReadSeeker, 0, len(files)+1)
for _, f := range files {
seekers = append(seekers, f)
}
if currentChunk.Size() > 0 {
seekers = append(seekers, currentChunk)
}
if len(seekers) > 0 {
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
}
for _, f := range files {
f.Close()
fileName := f.Name()
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference log file %q: %v", fileName, err)
closeFiles := func() {
for _, f := range files {
f.Close()
fileName := f.Name()
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
}
}
}
}
readers := make([]SizeReaderAt, 0, len(files)+1)
for _, f := range files {
stat, err := f.Stat()
if err != nil {
watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
closeFiles()
return
}
readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
}
if currentChunk.Size() > 0 {
readers = append(readers, currentChunk)
}
tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
closeFiles()
w.mu.RLock()
}
@@ -455,19 +479,39 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
return io.NewSectionReader(f, 0, size), nil
}
type decodeFunc func() (*logger.Message, error)
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
nLines := config.Tail
func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
var rdr io.Reader = f
if config.Tail > 0 {
ls, err := tailfile.TailFile(f, config.Tail)
if err != nil {
watcher.Err <- err
return
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
go func() {
select {
case <-ctx.Done():
case <-watcher.WatchClose():
cancel()
}
}()
readers := make([]io.Reader, 0, len(files))
if config.Tail > 0 {
for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
tail, n, err := getTailReader(ctx, files[i], nLines)
if err != nil {
watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
return
}
nLines -= n
readers = append([]io.Reader{tail}, readers...)
}
} else {
for _, r := range files {
readers = append(readers, &wrappedReaderAt{ReaderAt: r})
}
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
}
rdr := io.MultiReader(readers...)
decodeLogLine := createDecoder(rdr)
for {
msg, err := decodeLogLine()
@@ -484,7 +528,7 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec
return
}
select {
case <-watcher.WatchClose():
case <-ctx.Done():
return
case watcher.Msg <- msg:
}
@@ -678,3 +722,14 @@ func watchFile(name string) (filenotify.FileWatcher, error) {
return fileWatcher, nil
}
type wrappedReaderAt struct {
io.ReaderAt
pos int64
}
func (r *wrappedReaderAt) Read(p []byte) (int, error) {
n, err := r.ReaderAt.ReadAt(p, r.pos)
r.pos += int64(n)
return n, err
}


@@ -0,0 +1,76 @@
package loggerutils
import (
"bufio"
"context"
"io"
"strings"
"testing"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/tailfile"
"gotest.tools/assert"
)
func TestTailFiles(t *testing.T) {
s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
files := []SizeReaderAt{s1, s2, s3}
watcher := logger.NewLogWatcher()
createDecoder := func(r io.Reader) func() (*logger.Message, error) {
scanner := bufio.NewScanner(r)
return func() (*logger.Message, error) {
if !scanner.Scan() {
return nil, scanner.Err()
}
// wrap each scanned line in a message; scanner.Bytes is only valid until the next Scan
return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
}
}
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, lines)
}
for desc, config := range map[string]logger.ReadConfig{} {
t.Run(desc, func(t *testing.T) {
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, createDecoder, tailReader, config)
}()
<-started
})
}
config := logger.ReadConfig{Tail: 2}
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, createDecoder, tailReader, config)
}()
<-started
select {
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for tail line")
case err := <-watcher.Err:
assert.Assert(t, err)
case msg := <-watcher.Msg:
assert.Assert(t, msg != nil)
assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
}
select {
case <-time.After(60 * time.Second):
t.Fatal("timeout waiting for tail line")
case err := <-watcher.Err:
assert.Assert(t, err)
case msg := <-watcher.Msg:
assert.Assert(t, msg != nil)
assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
}
}


@@ -3,7 +3,9 @@
package tailfile // import "github.com/docker/docker/pkg/tailfile"
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"os"
@@ -16,51 +18,205 @@ var eol = []byte("\n")
// ErrNonPositiveLinesNumber is an error returned if the requested number of lines is not positive.
var ErrNonPositiveLinesNumber = errors.New("The number of lines to extract from the file must be positive")
//TailFile returns last n lines of reader f (could be a nil).
func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
if n <= 0 {
return nil, ErrNonPositiveLinesNumber
}
size, err := f.Seek(0, os.SEEK_END)
//TailFile returns last n lines of the passed in file.
func TailFile(f *os.File, n int) ([][]byte, error) {
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
block := -1
var data []byte
var cnt int
for {
var b []byte
step := int64(block * blockSize)
left := size + step // how many bytes to beginning
if left < 0 {
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
return nil, err
}
b = make([]byte, blockSize+left)
if _, err := f.Read(b); err != nil {
return nil, err
}
data = append(b, data...)
break
} else {
b = make([]byte, blockSize)
if _, err := f.Seek(left, os.SEEK_SET); err != nil {
return nil, err
}
if _, err := f.Read(b); err != nil {
return nil, err
}
data = append(b, data...)
}
cnt += bytes.Count(b, eol)
if cnt > n {
break
}
block--
rAt := io.NewSectionReader(f, 0, size)
r, nLines, err := NewTailReader(context.Background(), rAt, n)
if err != nil {
return nil, err
}
lines := bytes.Split(data, eol)
if n < len(lines) {
return lines[len(lines)-n-1 : len(lines)-1], nil
buf := make([][]byte, 0, nLines)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
buf = append(buf, scanner.Bytes())
}
return buf, nil
}
// SizeReaderAt is an interface used to get a ReaderAt as well as the size of the underlying reader.
// Note that the size of the underlying reader should not change when using this interface.
type SizeReaderAt interface {
io.ReaderAt
Size() int64
}
// NewTailReader scopes the passed-in reader to just the last reqLines lines, delimited by newlines
func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) {
return NewTailReaderWithDelimiter(ctx, r, reqLines, eol)
}
// NewTailReaderWithDelimiter scopes the passed-in reader to just the last reqLines lines.
// In this case a "line" is defined by the passed-in delimiter.
//
// Delimiter lengths should generally be small, no more than 12 bytes.
func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) {
if reqLines < 1 {
return nil, 0, ErrNonPositiveLinesNumber
}
if len(delimiter) == 0 {
return nil, 0, errors.New("must provide a delimiter")
}
var (
size = r.Size()
tailStart int64
tailEnd = size
found int
)
if int64(len(delimiter)) >= size {
return bytes.NewReader(nil), 0, nil
}
scanner := newScanner(r, delimiter)
for scanner.Scan(ctx) {
if err := scanner.Err(); err != nil {
return nil, 0, scanner.Err()
}
found++
if found == 1 {
tailEnd = scanner.End()
}
if found == reqLines {
break
}
}
tailStart = scanner.Start(ctx)
if found == 0 {
return bytes.NewReader(nil), 0, nil
}
if found < reqLines && tailStart != 0 {
tailStart = 0
}
return io.NewSectionReader(r, tailStart, tailEnd-tailStart), found, nil
}
func newScanner(r SizeReaderAt, delim []byte) *scanner {
size := r.Size()
readSize := blockSize
if readSize > int(size) {
readSize = int(size)
}
// edge case: the delimiter is large relative to the read size; make sure the buffer holds at least two delimiters
if len(delim) >= readSize/2 {
readSize = len(delim)*2 + 2
}
return &scanner{
r: r,
pos: size,
buf: make([]byte, readSize),
delim: delim,
}
}
type scanner struct {
r SizeReaderAt
pos int64
buf []byte
delim []byte
err error
idx int
done bool
}
func (s *scanner) Start(ctx context.Context) int64 {
if s.idx > 0 {
idx := bytes.LastIndex(s.buf[:s.idx], s.delim)
if idx >= 0 {
return s.pos + int64(idx) + int64(len(s.delim))
}
}
// slow path
buf := make([]byte, len(s.buf))
copy(buf, s.buf)
readAhead := &scanner{
r: s.r,
pos: s.pos,
delim: s.delim,
idx: s.idx,
buf: buf,
}
if !readAhead.Scan(ctx) {
return 0
}
return readAhead.End()
}
func (s *scanner) End() int64 {
return s.pos + int64(s.idx) + int64(len(s.delim))
}
func (s *scanner) Err() error {
return s.err
}
func (s *scanner) Scan(ctx context.Context) bool {
if s.err != nil {
return false
}
for {
select {
case <-ctx.Done():
s.err = ctx.Err()
return false
default:
}
idx := s.idx - len(s.delim)
if idx < 0 {
readSize := int(s.pos)
if readSize > len(s.buf) {
readSize = len(s.buf)
}
if readSize < len(s.delim) {
return false
}
offset := s.pos - int64(readSize)
n, err := s.r.ReadAt(s.buf[:readSize], offset)
if err != nil && err != io.EOF {
s.err = err
return false
}
s.pos -= int64(n)
idx = n
}
s.idx = bytes.LastIndex(s.buf[:idx], s.delim)
if s.idx >= 0 {
return true
}
if len(s.delim) > 1 && s.pos > 0 {
// in this case, there may be a partial delimiter at the front of the buffer, so set the position forward
// up to the maximum size partial that could be there so it can be read again in the next iteration with any
// potential remainder.
// An example where delimiter is `####`:
// [##asdfqwerty]
// ^
// This resets the position to where the arrow is pointing.
// It could actually check whether a partial delimiter exists at the front, but that is pretty
// similar to the indexing code above, though a bit more complex since up to `len(delimiter)-1`
// byte positions would have to be checked.
// It's much simpler and cleaner to just re-read `len(delimiter)-1` bytes again.
s.pos += int64(len(s.delim)) - 1
}
}
return lines[:len(lines)-1], nil
}
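
A minimal usage sketch of the new reader (a strings.Reader already satisfies the SizeReaderAt interface):

	package main

	import (
		"context"
		"fmt"
		"io/ioutil"
		"strings"

		"github.com/docker/docker/pkg/tailfile"
	)

	func main() {
		r := strings.NewReader("aaa;\nbbb;\nccc;\n")
		// tail the last 2 "lines", where a line ends with the 2-byte delimiter ";\n"
		tail, n, err := tailfile.NewTailReaderWithDelimiter(context.Background(), r, 2, []byte(";\n"))
		if err != nil {
			panic(err)
		}
		b, _ := ioutil.ReadAll(tail)
		fmt.Printf("%d lines: %q\n", n, b) // 2 lines: "bbb;\nccc;\n"
	}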


@ -1,9 +1,17 @@
package tailfile // import "github.com/docker/docker/pkg/tailfile"
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"testing"
"gotest.tools/assert"
)
func TestTailFile(t *testing.T) {
@@ -42,7 +50,7 @@ truncated line`)
if _, err := f.Write(testFile); err != nil {
t.Fatal(err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
if _, err := f.Seek(0, io.SeekStart); err != nil {
t.Fatal(err)
}
expected := []string{"last fourth line", "last fifth line"}
@@ -50,10 +58,12 @@ truncated line`)
if err != nil {
t.Fatal(err)
}
if len(res) != len(expected) {
t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res)
}
for i, l := range res {
t.Logf("%s", l)
if expected[i] != string(l) {
t.Fatalf("Expected line %s, got %s", expected[i], l)
t.Fatalf("Expected line %q, got %q", expected[i], l)
}
}
}
@@ -71,7 +81,7 @@ truncated line`)
if _, err := f.Write(testFile); err != nil {
t.Fatal(err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
if _, err := f.Seek(0, io.SeekStart); err != nil {
t.Fatal(err)
}
expected := []string{"first line", "second line"}
@@ -79,8 +89,10 @@ truncated line`)
if err != nil {
t.Fatal(err)
}
if len(expected) != len(res) {
t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res)
}
for i, l := range res {
t.Logf("%s", l)
if expected[i] != string(l) {
t.Fatalf("Expected line %s, got %s", expected[i], l)
}
@@ -116,11 +128,11 @@ truncated line`)
if _, err := f.Write(testFile); err != nil {
t.Fatal(err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
if _, err := f.Seek(0, io.SeekStart); err != nil {
t.Fatal(err)
}
if _, err := TailFile(f, -1); err != ErrNonPositiveLinesNumber {
t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err)
t.Fatalf("Expected ErrNonPositiveLinesNumber, got %v", err)
}
if _, err := TailFile(f, 0); err != ErrNonPositiveLinesNumber {
t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err)
@@ -146,3 +158,170 @@ func BenchmarkTail(b *testing.B) {
}
}
}
func TestNewTailReader(t *testing.T) {
t.Parallel()
ctx := context.Background()
for dName, delim := range map[string][]byte{
"no delimiter": {},
"single byte delimiter": {'\n'},
"2 byte delimiter": []byte(";\n"),
"4 byte delimiter": []byte("####"),
"8 byte delimiter": []byte("########"),
"12 byte delimiter": []byte("############"),
} {
t.Run(dName, func(t *testing.T) {
delim := delim
t.Parallel()
s1 := "Hello world."
s2 := "Today is a fine day."
s3 := "So long, and thanks for all the fish!"
s4 := strings.Repeat("a", blockSize/2) // half the block size
s5 := strings.Repeat("a", blockSize) // exactly the block size
s6 := strings.Repeat("a", blockSize*2) // bigger than block size
s7 := strings.Repeat("a", blockSize-1) // single line same as block
s8 := `{"log":"Don't panic!\n","stream":"stdout","time":"2018-04-04T20:28:44.7207062Z"}`
jsonTest := make([]string, 0, 20)
for i := 0; i < 20; i++ {
jsonTest = append(jsonTest, s8)
}
for _, test := range []struct {
desc string
data []string
}{
{desc: "one small entry", data: []string{s1}},
{desc: "several small entries", data: []string{s1, s2, s3}},
{desc: "various sizes", data: []string{s1, s2, s3, s4, s5, s1, s2, s3, s7, s6}},
{desc: "multiple lines with one more than block", data: []string{s5, s5, s5, s5, s5}},
{desc: "multiple lines much bigger than block", data: []string{s6, s6, s6, s6, s6}},
{desc: "multiple lines same as block", data: []string{s4, s4, s4, s4, s4}},
{desc: "single line same as block", data: []string{s7}},
{desc: "single line half block", data: []string{s4}},
{desc: "single line twice block", data: []string{s6}},
{desc: "json encoded values", data: jsonTest},
{desc: "no lines", data: []string{}},
{desc: "same length as delimiter", data: []string{strings.Repeat("a", len(delim))}},
} {
t.Run(test.desc, func(t *testing.T) {
test := test
t.Parallel()
max := len(test.data)
if max > 10 {
max = 10
}
s := strings.Join(test.data, string(delim))
if len(test.data) > 0 {
s += string(delim)
}
for i := 1; i <= max; i++ {
t.Run(fmt.Sprintf("%d lines", i), func(t *testing.T) {
i := i
t.Parallel()
r := strings.NewReader(s)
tr, lines, err := NewTailReaderWithDelimiter(ctx, r, i, delim)
if len(delim) == 0 {
assert.Assert(t, err != nil)
assert.Assert(t, lines == 0)
return
}
assert.Assert(t, err)
assert.Check(t, lines == i, "%d -- %d", lines, i)
b, err := ioutil.ReadAll(tr)
assert.Assert(t, err)
expectLines := test.data[len(test.data)-i:]
assert.Check(t, len(expectLines) == i)
expect := strings.Join(expectLines, string(delim)) + string(delim)
assert.Check(t, string(b) == expect, "\n%v\n%v", b, []byte(expect))
})
}
t.Run("request more lines than available", func(t *testing.T) {
t.Parallel()
r := strings.NewReader(s)
tr, lines, err := NewTailReaderWithDelimiter(ctx, r, len(test.data)*2, delim)
if len(delim) == 0 {
assert.Assert(t, err != nil)
assert.Assert(t, lines == 0)
return
}
if len(test.data) == 0 {
assert.Assert(t, err == ErrNonPositiveLinesNumber, err)
return
}
assert.Assert(t, err)
assert.Check(t, lines == len(test.data), "%d -- %d", lines, len(test.data))
b, err := ioutil.ReadAll(tr)
assert.Assert(t, err)
assert.Check(t, bytes.Equal(b, []byte(s)), "\n%v\n%v", b, []byte(s))
})
})
}
})
}
t.Run("truncated last line", func(t *testing.T) {
t.Run("more than available", func(t *testing.T) {
tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 3)
assert.Assert(t, err)
assert.Check(t, nLines == 2, nLines)
rdr := bufio.NewReader(tail)
data, _, err := rdr.ReadLine()
assert.Assert(t, err)
assert.Check(t, string(data) == "a", string(data))
data, _, err = rdr.ReadLine()
assert.Assert(t, err)
assert.Check(t, string(data) == "b", string(data))
_, _, err = rdr.ReadLine()
assert.Assert(t, err == io.EOF, err)
})
})
t.Run("truncated last line", func(t *testing.T) {
t.Run("exact", func(t *testing.T) {
tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 2)
assert.Assert(t, err)
assert.Check(t, nLines == 2, nLines)
rdr := bufio.NewReader(tail)
data, _, err := rdr.ReadLine()
assert.Assert(t, err)
assert.Check(t, string(data) == "a", string(data))
data, _, err = rdr.ReadLine()
assert.Assert(t, err)
assert.Check(t, string(data) == "b", string(data))
_, _, err = rdr.ReadLine()
assert.Assert(t, err == io.EOF, err)
})
})
t.Run("truncated last line", func(t *testing.T) {
t.Run("one line", func(t *testing.T) {
tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 1)
assert.Assert(t, err)
assert.Check(t, nLines == 1, nLines)
rdr := bufio.NewReader(tail)
data, _, err := rdr.ReadLine()
assert.Assert(t, err)
assert.Check(t, string(data) == "b", string(data))
_, _, err = rdr.ReadLine()
assert.Assert(t, err == io.EOF, err)
})
})
}