diff --git a/container/container.go b/container/container.go index c2b04c2161..d7d6222b21 100644 --- a/container/container.go +++ b/container/container.go @@ -423,18 +423,21 @@ func (container *Container) StartLogger() (logger.Logger, error) { } if _, ok := l.(logger.LogReader); !ok { - logPath, err := container.GetRootResourcePath("container-cached.log") - if err != nil { - return nil, err - } - info.LogPath = logPath + if cache.ShouldUseCache(cfg.Config) { + logPath, err := container.GetRootResourcePath("container-cached.log") + if err != nil { + return nil, err + } - if !container.LocalLogCacheMeta.HaveNotifyEnabled { - logrus.WithField("container", container.ID).Info("Configured log driver does not support reads, enabling local file cache for container logs") - } - l, err = cache.WithLocalCache(l, info) - if err != nil { - return nil, errors.Wrap(err, "error setting up local container log cache") + if !container.LocalLogCacheMeta.HaveNotifyEnabled { + logrus.WithField("container", container.ID).WithField("driver", container.HostConfig.LogConfig.Type).Info("Configured log driver does not support reads, enabling local file cache for container logs") + container.LocalLogCacheMeta.HaveNotifyEnabled = true + } + info.LogPath = logPath + l, err = cache.WithLocalCache(l, info) + if err != nil { + return nil, errors.Wrap(err, "error setting up local container log cache") + } } } return l, nil diff --git a/daemon/logdrivers_linux.go b/daemon/logdrivers_linux.go index 67154a7a98..425f412b20 100644 --- a/daemon/logdrivers_linux.go +++ b/daemon/logdrivers_linux.go @@ -11,6 +11,7 @@ import ( _ "github.com/docker/docker/daemon/logger/jsonfilelog" _ "github.com/docker/docker/daemon/logger/local" _ "github.com/docker/docker/daemon/logger/logentries" + _ "github.com/docker/docker/daemon/logger/loggerutils/cache" _ "github.com/docker/docker/daemon/logger/splunk" _ "github.com/docker/docker/daemon/logger/syslog" ) diff --git a/daemon/logdrivers_windows.go b/daemon/logdrivers_windows.go index 425a4d89b6..6c9d97f785 100644 --- a/daemon/logdrivers_windows.go +++ b/daemon/logdrivers_windows.go @@ -10,6 +10,7 @@ import ( _ "github.com/docker/docker/daemon/logger/gelf" _ "github.com/docker/docker/daemon/logger/jsonfilelog" _ "github.com/docker/docker/daemon/logger/logentries" + _ "github.com/docker/docker/daemon/logger/loggerutils/cache" _ "github.com/docker/docker/daemon/logger/splunk" _ "github.com/docker/docker/daemon/logger/syslog" ) diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 9723f7fc0c..d0bedac4ad 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -143,6 +143,10 @@ func ValidateLogOpts(name string, cfg map[string]string) error { } } + if err := validateExternal(cfg); err != nil { + return err + } + if !factory.driverRegistered(name) { return fmt.Errorf("logger: no log driver named '%s' is registered", name) } diff --git a/daemon/logger/log_cache_opts.go b/daemon/logger/log_cache_opts.go new file mode 100644 index 0000000000..8d09c489ed --- /dev/null +++ b/daemon/logger/log_cache_opts.go @@ -0,0 +1,29 @@ +package logger + +var externalValidators []LogOptValidator + +// RegisterExternalValidator adds the validator to the list of external validators. +// External validators are used by packages outside this package that need to add their own validation logic. +// This should only be called on package initialization. +func RegisterExternalValidator(v LogOptValidator) { + externalValidators = append(externalValidators, v) +} + +// AddBuiltinLogOpts updates the list of built-in log opts. This allows other packages to supplement additional log options +// without having to register an actual log driver. This is used by things that are more proxy log drivers and should +// not be exposed as a usable log driver to the API. +// This should only be called on package initialization. +func AddBuiltinLogOpts(opts map[string]bool) { + for k, v := range opts { + builtInLogOpts[k] = v + } +} + +func validateExternal(cfg map[string]string) error { + for _, v := range externalValidators { + if err := v(cfg); err != nil { + return err + } + } + return nil +} diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index 842c2bfc9e..750dc498ab 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -1,22 +1,55 @@ package cache // import "github.com/docker/docker/daemon/logger/loggerutils/cache" import ( + "strconv" + + "github.com/docker/docker/api/types/container" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/local" + units "github.com/docker/go-units" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +const ( + // DriverName is the name of the driver used for local log caching + DriverName = local.Name + + cachePrefix = "cache-" + cacheDisabledKey = cachePrefix + "disabled" +) + +var builtInCacheLogOpts = map[string]bool{ + cacheDisabledKey: true, +} + // WithLocalCache wraps the passed in logger with a logger caches all writes locally // in addition to writing to the passed in logger. -func WithLocalCache(l logger.Logger, logInfo logger.Info) (logger.Logger, error) { - localLogger, err := local.New(logInfo) +func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) { + initLogger, err := logger.GetLogDriver(DriverName) if err != nil { return nil, err } + + cacher, err := initLogger(info) + if err != nil { + return nil, errors.Wrap(err, "error initializing local log cache driver") + } + + if info.Config["mode"] == container.LogModeUnset || container.LogMode(info.Config["mode"]) == container.LogModeNonBlock { + var size int64 = -1 + if s, exists := info.Config["max-buffer-size"]; exists { + size, err = units.RAMInBytes(s) + if err != nil { + return nil, err + } + } + cacher = logger.NewRingLogger(cacher, info, size) + } + return &loggerWithCache{ - l: l, - // TODO(@cpuguy83): Should this be configurable? - cache: logger.NewRingLogger(localLogger, logInfo, -1), + l: l, + cache: cacher, }, nil } @@ -26,9 +59,10 @@ type loggerWithCache struct { } func (l *loggerWithCache) Log(msg *logger.Message) error { - // copy the message since the underlying logger will return the passed in message to the message pool + // copy the message as the original will be reset once the call to `Log` is complete dup := logger.NewMessage() dumbCopyMessage(dup, msg) + if err := l.l.Log(msg); err != nil { return err } @@ -51,6 +85,19 @@ func (l *loggerWithCache) Close() error { return err } +// ShouldUseCache reads the log opts to determine if caching should be enabled +func ShouldUseCache(cfg map[string]string) bool { + if cfg[cacheDisabledKey] == "" { + return true + } + b, err := strconv.ParseBool(cfg[cacheDisabledKey]) + if err != nil { + // This shouldn't happen since the values are validated before hand. + return false + } + return !b +} + // dumbCopyMessage is a bit of a fake copy but avoids extra allocations which // are not necessary for this use case. func dumbCopyMessage(dst, src *logger.Message) { @@ -59,5 +106,5 @@ func dumbCopyMessage(dst, src *logger.Message) { dst.PLogMetaData = src.PLogMetaData dst.Err = src.Err dst.Attrs = src.Attrs - dst.Line = src.Line + dst.Line = append(dst.Line[:0], src.Line...) } diff --git a/daemon/logger/loggerutils/cache/log_cache_test.go b/daemon/logger/loggerutils/cache/log_cache_test.go new file mode 100644 index 0000000000..ef4be26f6f --- /dev/null +++ b/daemon/logger/loggerutils/cache/log_cache_test.go @@ -0,0 +1,81 @@ +package cache + +import ( + "context" + "testing" + + "time" + + "bytes" + + "github.com/docker/docker/daemon/logger" + "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" +) + +type fakeLogger struct { + messages chan logger.Message + close chan struct{} +} + +func (l *fakeLogger) Log(msg *logger.Message) error { + select { + case l.messages <- *msg: + case <-l.close: + } + logger.PutMessage(msg) + return nil +} + +func (l *fakeLogger) Name() string { + return "fake" +} + +func (l *fakeLogger) Close() error { + close(l.close) + return nil +} + +func TestLog(t *testing.T) { + cacher := &fakeLogger{make(chan logger.Message), make(chan struct{})} + l := &loggerWithCache{ + l: &fakeLogger{make(chan logger.Message, 100), make(chan struct{})}, + cache: cacher, + } + defer l.Close() + + var messages []logger.Message + for i := 0; i < 100; i++ { + messages = append(messages, logger.Message{ + Timestamp: time.Now(), + Line: append(bytes.Repeat([]byte("a"), 100), '\n'), + }) + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + go func() { + for _, msg := range messages { + select { + case <-ctx.Done(): + return + default: + } + + m := logger.NewMessage() + dumbCopyMessage(m, &msg) + l.Log(m) + } + }() + + for _, m := range messages { + var msg logger.Message + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for messages... this is probably a test implementation error") + case msg = <-cacher.messages: + assert.Assert(t, cmp.DeepEqual(msg, m)) + } + } +} diff --git a/daemon/logger/loggerutils/cache/validate.go b/daemon/logger/loggerutils/cache/validate.go new file mode 100644 index 0000000000..70bd8810d0 --- /dev/null +++ b/daemon/logger/loggerutils/cache/validate.go @@ -0,0 +1,40 @@ +package cache + +import ( + "strconv" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/local" + "github.com/pkg/errors" +) + +func init() { + for k, v := range local.LogOptKeys { + builtInCacheLogOpts[cachePrefix+k] = v + } + logger.AddBuiltinLogOpts(builtInCacheLogOpts) + logger.RegisterExternalValidator(validateLogCacheOpts) +} + +func validateLogCacheOpts(cfg map[string]string) error { + if v := cfg[cacheDisabledKey]; v != "" { + _, err := strconv.ParseBool(v) + if err != nil { + return errors.Errorf("invalid value for option %s: %s", cacheDisabledKey, cfg[cacheDisabledKey]) + } + } + return nil +} + +// MergeDefaultLogConfig reads the default log opts and makes sure that any caching related keys that exist there are +// added to dst. +func MergeDefaultLogConfig(dst, defaults map[string]string) { + for k, v := range defaults { + if !builtInCacheLogOpts[k] { + continue + } + if _, exists := dst[k]; !exists { + dst[k] = v + } + } +} diff --git a/daemon/logs.go b/daemon/logs.go index 668a75c778..a4105d3e78 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -11,6 +11,7 @@ import ( timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" + logcache "github.com/docker/docker/daemon/logger/loggerutils/cache" "github.com/docker/docker/errdefs" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -190,6 +191,8 @@ func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) err } } + logcache.MergeDefaultLogConfig(cfg.Config, daemon.defaultLogConfig.Config) + return logger.ValidateLogOpts(cfg.Type, cfg.Config) } @@ -204,6 +207,7 @@ func (daemon *Daemon) setupDefaultLogConfig() error { Type: config.LogConfig.Type, Config: config.LogConfig.Config, } + logrus.Debugf("Using default logging driver %s", daemon.defaultLogConfig.Type) return nil } diff --git a/integration/plugin/logging/cmd/discard/driver.go b/integration/plugin/logging/cmd/discard/driver.go index ccf2849cca..e02b56e88f 100644 --- a/integration/plugin/logging/cmd/discard/driver.go +++ b/integration/plugin/logging/cmd/discard/driver.go @@ -62,6 +62,7 @@ func handle(mux *http.ServeMux) { if f := d.logs[req.File]; f != nil { f.Close() } + d.mu.Unlock() respond(nil, w) }) diff --git a/integration/plugin/logging/read_test.go b/integration/plugin/logging/read_test.go index 8671618fea..cd57a5d864 100644 --- a/integration/plugin/logging/read_test.go +++ b/integration/plugin/logging/read_test.go @@ -29,45 +29,62 @@ func TestReadPluginNoRead(t *testing.T) { createPlugin(t, client, "test", "discard", asLogDriver) ctx := context.Background() - defer func() { - err = client.PluginRemove(ctx, "test", types.PluginRemoveOptions{Force: true}) - assert.Check(t, err) - }() err = client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30}) assert.Check(t, err) + d.Stop(t) - c, err := client.ContainerCreate(ctx, - &container.Config{ - Image: "busybox", - Cmd: []string{"/bin/echo", "hello world"}, - }, - &container.HostConfig{LogConfig: container.LogConfig{Type: "test"}}, - nil, - "", - ) - assert.Assert(t, err) - - err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}) - assert.Assert(t, err) - - logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true}) - assert.Assert(t, err) - defer logs.Close() - - buf := bytes.NewBuffer(nil) - - errCh := make(chan error) - go func() { - _, err := stdcopy.StdCopy(buf, buf, logs) - errCh <- err - }() - - select { - case <-time.After(60 * time.Second): - t.Fatal("timeout waiting for IO to complete") - case err := <-errCh: - assert.Assert(t, err) + cfg := &container.Config{ + Image: "busybox", + Cmd: []string{"/bin/echo", "hello world"}, } - assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes()) + for desc, test := range map[string]struct { + dOpts []string + logsSupported bool + }{ + "default": {logsSupported: true}, + "disabled caching": {[]string{"--log-opt=cache-disabled=true"}, false}, + "explicitly enabled caching": {[]string{"--log-opt=cache-disabled=false"}, true}, + } { + t.Run(desc, func(t *testing.T) { + d.Start(t, append([]string{"--iptables=false"}, test.dOpts...)...) + defer d.Stop(t) + c, err := client.ContainerCreate(ctx, + cfg, + &container.HostConfig{LogConfig: container.LogConfig{Type: "test"}}, + nil, + "", + ) + assert.Assert(t, err) + defer client.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true}) + + err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}) + assert.Assert(t, err) + + logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true}) + if !test.logsSupported { + assert.Assert(t, err != nil) + return + } + assert.Assert(t, err) + defer logs.Close() + + buf := bytes.NewBuffer(nil) + + errCh := make(chan error) + go func() { + _, err := stdcopy.StdCopy(buf, buf, logs) + errCh <- err + }() + + select { + case <-time.After(60 * time.Second): + t.Fatal("timeout waiting for IO to complete") + case err := <-errCh: + assert.Assert(t, err) + } + assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes()) + }) + } + }