Support configuration of log cacher.
Configuration over the API per container is intentionally left out for the time being, but is supported to configure the default from the daemon config. Signed-off-by: Brian Goff <cpuguy83@gmail.com> (cherry picked from commit cbecf48bc352e680a5390a7ca9cff53098cd16d7) Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
parent
e2ceb83a53
commit
750f0d1648
11 changed files with 282 additions and 54 deletions
|
@ -423,18 +423,21 @@ func (container *Container) StartLogger() (logger.Logger, error) {
|
|||
}
|
||||
|
||||
if _, ok := l.(logger.LogReader); !ok {
|
||||
logPath, err := container.GetRootResourcePath("container-cached.log")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
info.LogPath = logPath
|
||||
if cache.ShouldUseCache(cfg.Config) {
|
||||
logPath, err := container.GetRootResourcePath("container-cached.log")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !container.LocalLogCacheMeta.HaveNotifyEnabled {
|
||||
logrus.WithField("container", container.ID).Info("Configured log driver does not support reads, enabling local file cache for container logs")
|
||||
}
|
||||
l, err = cache.WithLocalCache(l, info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error setting up local container log cache")
|
||||
if !container.LocalLogCacheMeta.HaveNotifyEnabled {
|
||||
logrus.WithField("container", container.ID).WithField("driver", container.HostConfig.LogConfig.Type).Info("Configured log driver does not support reads, enabling local file cache for container logs")
|
||||
container.LocalLogCacheMeta.HaveNotifyEnabled = true
|
||||
}
|
||||
info.LogPath = logPath
|
||||
l, err = cache.WithLocalCache(l, info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error setting up local container log cache")
|
||||
}
|
||||
}
|
||||
}
|
||||
return l, nil
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
|
||||
_ "github.com/docker/docker/daemon/logger/local"
|
||||
_ "github.com/docker/docker/daemon/logger/logentries"
|
||||
_ "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
_ "github.com/docker/docker/daemon/logger/splunk"
|
||||
_ "github.com/docker/docker/daemon/logger/syslog"
|
||||
)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
_ "github.com/docker/docker/daemon/logger/gelf"
|
||||
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
|
||||
_ "github.com/docker/docker/daemon/logger/logentries"
|
||||
_ "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
_ "github.com/docker/docker/daemon/logger/splunk"
|
||||
_ "github.com/docker/docker/daemon/logger/syslog"
|
||||
)
|
||||
|
|
|
@ -143,6 +143,10 @@ func ValidateLogOpts(name string, cfg map[string]string) error {
|
|||
}
|
||||
}
|
||||
|
||||
if err := validateExternal(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !factory.driverRegistered(name) {
|
||||
return fmt.Errorf("logger: no log driver named '%s' is registered", name)
|
||||
}
|
||||
|
|
29
daemon/logger/log_cache_opts.go
Normal file
29
daemon/logger/log_cache_opts.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package logger
|
||||
|
||||
var externalValidators []LogOptValidator
|
||||
|
||||
// RegisterExternalValidator adds the validator to the list of external validators.
|
||||
// External validators are used by packages outside this package that need to add their own validation logic.
|
||||
// This should only be called on package initialization.
|
||||
func RegisterExternalValidator(v LogOptValidator) {
|
||||
externalValidators = append(externalValidators, v)
|
||||
}
|
||||
|
||||
// AddBuiltinLogOpts updates the list of built-in log opts. This allows other packages to supplement additional log options
|
||||
// without having to register an actual log driver. This is used by things that are more proxy log drivers and should
|
||||
// not be exposed as a usable log driver to the API.
|
||||
// This should only be called on package initialization.
|
||||
func AddBuiltinLogOpts(opts map[string]bool) {
|
||||
for k, v := range opts {
|
||||
builtInLogOpts[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func validateExternal(cfg map[string]string) error {
|
||||
for _, v := range externalValidators {
|
||||
if err := v(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
61
daemon/logger/loggerutils/cache/local_cache.go
vendored
61
daemon/logger/loggerutils/cache/local_cache.go
vendored
|
@ -1,22 +1,55 @@
|
|||
package cache // import "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/local"
|
||||
units "github.com/docker/go-units"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
// DriverName is the name of the driver used for local log caching
|
||||
DriverName = local.Name
|
||||
|
||||
cachePrefix = "cache-"
|
||||
cacheDisabledKey = cachePrefix + "disabled"
|
||||
)
|
||||
|
||||
var builtInCacheLogOpts = map[string]bool{
|
||||
cacheDisabledKey: true,
|
||||
}
|
||||
|
||||
// WithLocalCache wraps the passed in logger with a logger caches all writes locally
|
||||
// in addition to writing to the passed in logger.
|
||||
func WithLocalCache(l logger.Logger, logInfo logger.Info) (logger.Logger, error) {
|
||||
localLogger, err := local.New(logInfo)
|
||||
func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) {
|
||||
initLogger, err := logger.GetLogDriver(DriverName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cacher, err := initLogger(info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error initializing local log cache driver")
|
||||
}
|
||||
|
||||
if info.Config["mode"] == container.LogModeUnset || container.LogMode(info.Config["mode"]) == container.LogModeNonBlock {
|
||||
var size int64 = -1
|
||||
if s, exists := info.Config["max-buffer-size"]; exists {
|
||||
size, err = units.RAMInBytes(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cacher = logger.NewRingLogger(cacher, info, size)
|
||||
}
|
||||
|
||||
return &loggerWithCache{
|
||||
l: l,
|
||||
// TODO(@cpuguy83): Should this be configurable?
|
||||
cache: logger.NewRingLogger(localLogger, logInfo, -1),
|
||||
l: l,
|
||||
cache: cacher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -26,9 +59,10 @@ type loggerWithCache struct {
|
|||
}
|
||||
|
||||
func (l *loggerWithCache) Log(msg *logger.Message) error {
|
||||
// copy the message since the underlying logger will return the passed in message to the message pool
|
||||
// copy the message as the original will be reset once the call to `Log` is complete
|
||||
dup := logger.NewMessage()
|
||||
dumbCopyMessage(dup, msg)
|
||||
|
||||
if err := l.l.Log(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -51,6 +85,19 @@ func (l *loggerWithCache) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// ShouldUseCache reads the log opts to determine if caching should be enabled
|
||||
func ShouldUseCache(cfg map[string]string) bool {
|
||||
if cfg[cacheDisabledKey] == "" {
|
||||
return true
|
||||
}
|
||||
b, err := strconv.ParseBool(cfg[cacheDisabledKey])
|
||||
if err != nil {
|
||||
// This shouldn't happen since the values are validated before hand.
|
||||
return false
|
||||
}
|
||||
return !b
|
||||
}
|
||||
|
||||
// dumbCopyMessage is a bit of a fake copy but avoids extra allocations which
|
||||
// are not necessary for this use case.
|
||||
func dumbCopyMessage(dst, src *logger.Message) {
|
||||
|
@ -59,5 +106,5 @@ func dumbCopyMessage(dst, src *logger.Message) {
|
|||
dst.PLogMetaData = src.PLogMetaData
|
||||
dst.Err = src.Err
|
||||
dst.Attrs = src.Attrs
|
||||
dst.Line = src.Line
|
||||
dst.Line = append(dst.Line[:0], src.Line...)
|
||||
}
|
||||
|
|
81
daemon/logger/loggerutils/cache/log_cache_test.go
vendored
Normal file
81
daemon/logger/loggerutils/cache/log_cache_test.go
vendored
Normal file
|
@ -0,0 +1,81 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"time"
|
||||
|
||||
"bytes"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"gotest.tools/v3/assert"
|
||||
"gotest.tools/v3/assert/cmp"
|
||||
)
|
||||
|
||||
type fakeLogger struct {
|
||||
messages chan logger.Message
|
||||
close chan struct{}
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Log(msg *logger.Message) error {
|
||||
select {
|
||||
case l.messages <- *msg:
|
||||
case <-l.close:
|
||||
}
|
||||
logger.PutMessage(msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Name() string {
|
||||
return "fake"
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Close() error {
|
||||
close(l.close)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLog(t *testing.T) {
|
||||
cacher := &fakeLogger{make(chan logger.Message), make(chan struct{})}
|
||||
l := &loggerWithCache{
|
||||
l: &fakeLogger{make(chan logger.Message, 100), make(chan struct{})},
|
||||
cache: cacher,
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
var messages []logger.Message
|
||||
for i := 0; i < 100; i++ {
|
||||
messages = append(messages, logger.Message{
|
||||
Timestamp: time.Now(),
|
||||
Line: append(bytes.Repeat([]byte("a"), 100), '\n'),
|
||||
})
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for _, msg := range messages {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m := logger.NewMessage()
|
||||
dumbCopyMessage(m, &msg)
|
||||
l.Log(m)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, m := range messages {
|
||||
var msg logger.Message
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timed out waiting for messages... this is probably a test implementation error")
|
||||
case msg = <-cacher.messages:
|
||||
assert.Assert(t, cmp.DeepEqual(msg, m))
|
||||
}
|
||||
}
|
||||
}
|
40
daemon/logger/loggerutils/cache/validate.go
vendored
Normal file
40
daemon/logger/loggerutils/cache/validate.go
vendored
Normal file
|
@ -0,0 +1,40 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/local"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
for k, v := range local.LogOptKeys {
|
||||
builtInCacheLogOpts[cachePrefix+k] = v
|
||||
}
|
||||
logger.AddBuiltinLogOpts(builtInCacheLogOpts)
|
||||
logger.RegisterExternalValidator(validateLogCacheOpts)
|
||||
}
|
||||
|
||||
func validateLogCacheOpts(cfg map[string]string) error {
|
||||
if v := cfg[cacheDisabledKey]; v != "" {
|
||||
_, err := strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return errors.Errorf("invalid value for option %s: %s", cacheDisabledKey, cfg[cacheDisabledKey])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MergeDefaultLogConfig reads the default log opts and makes sure that any caching related keys that exist there are
|
||||
// added to dst.
|
||||
func MergeDefaultLogConfig(dst, defaults map[string]string) {
|
||||
for k, v := range defaults {
|
||||
if !builtInCacheLogOpts[k] {
|
||||
continue
|
||||
}
|
||||
if _, exists := dst[k]; !exists {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
timetypes "github.com/docker/docker/api/types/time"
|
||||
"github.com/docker/docker/container"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
logcache "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -190,6 +191,8 @@ func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) err
|
|||
}
|
||||
}
|
||||
|
||||
logcache.MergeDefaultLogConfig(cfg.Config, daemon.defaultLogConfig.Config)
|
||||
|
||||
return logger.ValidateLogOpts(cfg.Type, cfg.Config)
|
||||
}
|
||||
|
||||
|
@ -204,6 +207,7 @@ func (daemon *Daemon) setupDefaultLogConfig() error {
|
|||
Type: config.LogConfig.Type,
|
||||
Config: config.LogConfig.Config,
|
||||
}
|
||||
|
||||
logrus.Debugf("Using default logging driver %s", daemon.defaultLogConfig.Type)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ func handle(mux *http.ServeMux) {
|
|||
if f := d.logs[req.File]; f != nil {
|
||||
f.Close()
|
||||
}
|
||||
d.mu.Unlock()
|
||||
respond(nil, w)
|
||||
})
|
||||
|
||||
|
|
|
@ -29,45 +29,62 @@ func TestReadPluginNoRead(t *testing.T) {
|
|||
createPlugin(t, client, "test", "discard", asLogDriver)
|
||||
|
||||
ctx := context.Background()
|
||||
defer func() {
|
||||
err = client.PluginRemove(ctx, "test", types.PluginRemoveOptions{Force: true})
|
||||
assert.Check(t, err)
|
||||
}()
|
||||
|
||||
err = client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30})
|
||||
assert.Check(t, err)
|
||||
d.Stop(t)
|
||||
|
||||
c, err := client.ContainerCreate(ctx,
|
||||
&container.Config{
|
||||
Image: "busybox",
|
||||
Cmd: []string{"/bin/echo", "hello world"},
|
||||
},
|
||||
&container.HostConfig{LogConfig: container.LogConfig{Type: "test"}},
|
||||
nil,
|
||||
"",
|
||||
)
|
||||
assert.Assert(t, err)
|
||||
|
||||
err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{})
|
||||
assert.Assert(t, err)
|
||||
|
||||
logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true})
|
||||
assert.Assert(t, err)
|
||||
defer logs.Close()
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
_, err := stdcopy.StdCopy(buf, buf, logs)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(60 * time.Second):
|
||||
t.Fatal("timeout waiting for IO to complete")
|
||||
case err := <-errCh:
|
||||
assert.Assert(t, err)
|
||||
cfg := &container.Config{
|
||||
Image: "busybox",
|
||||
Cmd: []string{"/bin/echo", "hello world"},
|
||||
}
|
||||
assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes())
|
||||
for desc, test := range map[string]struct {
|
||||
dOpts []string
|
||||
logsSupported bool
|
||||
}{
|
||||
"default": {logsSupported: true},
|
||||
"disabled caching": {[]string{"--log-opt=cache-disabled=true"}, false},
|
||||
"explicitly enabled caching": {[]string{"--log-opt=cache-disabled=false"}, true},
|
||||
} {
|
||||
t.Run(desc, func(t *testing.T) {
|
||||
d.Start(t, append([]string{"--iptables=false"}, test.dOpts...)...)
|
||||
defer d.Stop(t)
|
||||
c, err := client.ContainerCreate(ctx,
|
||||
cfg,
|
||||
&container.HostConfig{LogConfig: container.LogConfig{Type: "test"}},
|
||||
nil,
|
||||
"",
|
||||
)
|
||||
assert.Assert(t, err)
|
||||
defer client.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true})
|
||||
|
||||
err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{})
|
||||
assert.Assert(t, err)
|
||||
|
||||
logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true})
|
||||
if !test.logsSupported {
|
||||
assert.Assert(t, err != nil)
|
||||
return
|
||||
}
|
||||
assert.Assert(t, err)
|
||||
defer logs.Close()
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
_, err := stdcopy.StdCopy(buf, buf, logs)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(60 * time.Second):
|
||||
t.Fatal("timeout waiting for IO to complete")
|
||||
case err := <-errCh:
|
||||
assert.Assert(t, err)
|
||||
}
|
||||
assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes())
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue