From 750f0d16482c7d6ec9bb3dd77227f18f57569eee Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 9 Apr 2018 15:35:24 -0400 Subject: [PATCH] Support configuration of log cacher. Configuration over the API per container is intentionally left out for the time being, but is supported to configure the default from the daemon config. Signed-off-by: Brian Goff (cherry picked from commit cbecf48bc352e680a5390a7ca9cff53098cd16d7) Signed-off-by: Madhu Venugopal --- container/container.go | 25 +++--- daemon/logdrivers_linux.go | 1 + daemon/logdrivers_windows.go | 1 + daemon/logger/factory.go | 4 + daemon/logger/log_cache_opts.go | 29 ++++++ .../logger/loggerutils/cache/local_cache.go | 61 +++++++++++-- .../loggerutils/cache/log_cache_test.go | 81 +++++++++++++++++ daemon/logger/loggerutils/cache/validate.go | 40 +++++++++ daemon/logs.go | 4 + .../plugin/logging/cmd/discard/driver.go | 1 + integration/plugin/logging/read_test.go | 89 +++++++++++-------- 11 files changed, 282 insertions(+), 54 deletions(-) create mode 100644 daemon/logger/log_cache_opts.go create mode 100644 daemon/logger/loggerutils/cache/log_cache_test.go create mode 100644 daemon/logger/loggerutils/cache/validate.go diff --git a/container/container.go b/container/container.go index c2b04c2161..d7d6222b21 100644 --- a/container/container.go +++ b/container/container.go @@ -423,18 +423,21 @@ func (container *Container) StartLogger() (logger.Logger, error) { } if _, ok := l.(logger.LogReader); !ok { - logPath, err := container.GetRootResourcePath("container-cached.log") - if err != nil { - return nil, err - } - info.LogPath = logPath + if cache.ShouldUseCache(cfg.Config) { + logPath, err := container.GetRootResourcePath("container-cached.log") + if err != nil { + return nil, err + } - if !container.LocalLogCacheMeta.HaveNotifyEnabled { - logrus.WithField("container", container.ID).Info("Configured log driver does not support reads, enabling local file cache for container logs") - } - l, err = cache.WithLocalCache(l, info) - if err != nil { - return nil, errors.Wrap(err, "error setting up local container log cache") + if !container.LocalLogCacheMeta.HaveNotifyEnabled { + logrus.WithField("container", container.ID).WithField("driver", container.HostConfig.LogConfig.Type).Info("Configured log driver does not support reads, enabling local file cache for container logs") + container.LocalLogCacheMeta.HaveNotifyEnabled = true + } + info.LogPath = logPath + l, err = cache.WithLocalCache(l, info) + if err != nil { + return nil, errors.Wrap(err, "error setting up local container log cache") + } } } return l, nil diff --git a/daemon/logdrivers_linux.go b/daemon/logdrivers_linux.go index 67154a7a98..425f412b20 100644 --- a/daemon/logdrivers_linux.go +++ b/daemon/logdrivers_linux.go @@ -11,6 +11,7 @@ import ( _ "github.com/docker/docker/daemon/logger/jsonfilelog" _ "github.com/docker/docker/daemon/logger/local" _ "github.com/docker/docker/daemon/logger/logentries" + _ "github.com/docker/docker/daemon/logger/loggerutils/cache" _ "github.com/docker/docker/daemon/logger/splunk" _ "github.com/docker/docker/daemon/logger/syslog" ) diff --git a/daemon/logdrivers_windows.go b/daemon/logdrivers_windows.go index 425a4d89b6..6c9d97f785 100644 --- a/daemon/logdrivers_windows.go +++ b/daemon/logdrivers_windows.go @@ -10,6 +10,7 @@ import ( _ "github.com/docker/docker/daemon/logger/gelf" _ "github.com/docker/docker/daemon/logger/jsonfilelog" _ "github.com/docker/docker/daemon/logger/logentries" + _ "github.com/docker/docker/daemon/logger/loggerutils/cache" _ "github.com/docker/docker/daemon/logger/splunk" _ "github.com/docker/docker/daemon/logger/syslog" ) diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 9723f7fc0c..d0bedac4ad 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -143,6 +143,10 @@ func ValidateLogOpts(name string, cfg map[string]string) error { } } + if err := validateExternal(cfg); err != nil { + return err + } + if !factory.driverRegistered(name) { return fmt.Errorf("logger: no log driver named '%s' is registered", name) } diff --git a/daemon/logger/log_cache_opts.go b/daemon/logger/log_cache_opts.go new file mode 100644 index 0000000000..8d09c489ed --- /dev/null +++ b/daemon/logger/log_cache_opts.go @@ -0,0 +1,29 @@ +package logger + +var externalValidators []LogOptValidator + +// RegisterExternalValidator adds the validator to the list of external validators. +// External validators are used by packages outside this package that need to add their own validation logic. +// This should only be called on package initialization. +func RegisterExternalValidator(v LogOptValidator) { + externalValidators = append(externalValidators, v) +} + +// AddBuiltinLogOpts updates the list of built-in log opts. This allows other packages to supplement additional log options +// without having to register an actual log driver. This is used by things that are more proxy log drivers and should +// not be exposed as a usable log driver to the API. +// This should only be called on package initialization. +func AddBuiltinLogOpts(opts map[string]bool) { + for k, v := range opts { + builtInLogOpts[k] = v + } +} + +func validateExternal(cfg map[string]string) error { + for _, v := range externalValidators { + if err := v(cfg); err != nil { + return err + } + } + return nil +} diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index 842c2bfc9e..750dc498ab 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -1,22 +1,55 @@ package cache // import "github.com/docker/docker/daemon/logger/loggerutils/cache" import ( + "strconv" + + "github.com/docker/docker/api/types/container" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/local" + units "github.com/docker/go-units" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +const ( + // DriverName is the name of the driver used for local log caching + DriverName = local.Name + + cachePrefix = "cache-" + cacheDisabledKey = cachePrefix + "disabled" +) + +var builtInCacheLogOpts = map[string]bool{ + cacheDisabledKey: true, +} + // WithLocalCache wraps the passed in logger with a logger caches all writes locally // in addition to writing to the passed in logger. -func WithLocalCache(l logger.Logger, logInfo logger.Info) (logger.Logger, error) { - localLogger, err := local.New(logInfo) +func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) { + initLogger, err := logger.GetLogDriver(DriverName) if err != nil { return nil, err } + + cacher, err := initLogger(info) + if err != nil { + return nil, errors.Wrap(err, "error initializing local log cache driver") + } + + if info.Config["mode"] == container.LogModeUnset || container.LogMode(info.Config["mode"]) == container.LogModeNonBlock { + var size int64 = -1 + if s, exists := info.Config["max-buffer-size"]; exists { + size, err = units.RAMInBytes(s) + if err != nil { + return nil, err + } + } + cacher = logger.NewRingLogger(cacher, info, size) + } + return &loggerWithCache{ - l: l, - // TODO(@cpuguy83): Should this be configurable? - cache: logger.NewRingLogger(localLogger, logInfo, -1), + l: l, + cache: cacher, }, nil } @@ -26,9 +59,10 @@ type loggerWithCache struct { } func (l *loggerWithCache) Log(msg *logger.Message) error { - // copy the message since the underlying logger will return the passed in message to the message pool + // copy the message as the original will be reset once the call to `Log` is complete dup := logger.NewMessage() dumbCopyMessage(dup, msg) + if err := l.l.Log(msg); err != nil { return err } @@ -51,6 +85,19 @@ func (l *loggerWithCache) Close() error { return err } +// ShouldUseCache reads the log opts to determine if caching should be enabled +func ShouldUseCache(cfg map[string]string) bool { + if cfg[cacheDisabledKey] == "" { + return true + } + b, err := strconv.ParseBool(cfg[cacheDisabledKey]) + if err != nil { + // This shouldn't happen since the values are validated before hand. + return false + } + return !b +} + // dumbCopyMessage is a bit of a fake copy but avoids extra allocations which // are not necessary for this use case. func dumbCopyMessage(dst, src *logger.Message) { @@ -59,5 +106,5 @@ func dumbCopyMessage(dst, src *logger.Message) { dst.PLogMetaData = src.PLogMetaData dst.Err = src.Err dst.Attrs = src.Attrs - dst.Line = src.Line + dst.Line = append(dst.Line[:0], src.Line...) } diff --git a/daemon/logger/loggerutils/cache/log_cache_test.go b/daemon/logger/loggerutils/cache/log_cache_test.go new file mode 100644 index 0000000000..ef4be26f6f --- /dev/null +++ b/daemon/logger/loggerutils/cache/log_cache_test.go @@ -0,0 +1,81 @@ +package cache + +import ( + "context" + "testing" + + "time" + + "bytes" + + "github.com/docker/docker/daemon/logger" + "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" +) + +type fakeLogger struct { + messages chan logger.Message + close chan struct{} +} + +func (l *fakeLogger) Log(msg *logger.Message) error { + select { + case l.messages <- *msg: + case <-l.close: + } + logger.PutMessage(msg) + return nil +} + +func (l *fakeLogger) Name() string { + return "fake" +} + +func (l *fakeLogger) Close() error { + close(l.close) + return nil +} + +func TestLog(t *testing.T) { + cacher := &fakeLogger{make(chan logger.Message), make(chan struct{})} + l := &loggerWithCache{ + l: &fakeLogger{make(chan logger.Message, 100), make(chan struct{})}, + cache: cacher, + } + defer l.Close() + + var messages []logger.Message + for i := 0; i < 100; i++ { + messages = append(messages, logger.Message{ + Timestamp: time.Now(), + Line: append(bytes.Repeat([]byte("a"), 100), '\n'), + }) + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + go func() { + for _, msg := range messages { + select { + case <-ctx.Done(): + return + default: + } + + m := logger.NewMessage() + dumbCopyMessage(m, &msg) + l.Log(m) + } + }() + + for _, m := range messages { + var msg logger.Message + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for messages... this is probably a test implementation error") + case msg = <-cacher.messages: + assert.Assert(t, cmp.DeepEqual(msg, m)) + } + } +} diff --git a/daemon/logger/loggerutils/cache/validate.go b/daemon/logger/loggerutils/cache/validate.go new file mode 100644 index 0000000000..70bd8810d0 --- /dev/null +++ b/daemon/logger/loggerutils/cache/validate.go @@ -0,0 +1,40 @@ +package cache + +import ( + "strconv" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/local" + "github.com/pkg/errors" +) + +func init() { + for k, v := range local.LogOptKeys { + builtInCacheLogOpts[cachePrefix+k] = v + } + logger.AddBuiltinLogOpts(builtInCacheLogOpts) + logger.RegisterExternalValidator(validateLogCacheOpts) +} + +func validateLogCacheOpts(cfg map[string]string) error { + if v := cfg[cacheDisabledKey]; v != "" { + _, err := strconv.ParseBool(v) + if err != nil { + return errors.Errorf("invalid value for option %s: %s", cacheDisabledKey, cfg[cacheDisabledKey]) + } + } + return nil +} + +// MergeDefaultLogConfig reads the default log opts and makes sure that any caching related keys that exist there are +// added to dst. +func MergeDefaultLogConfig(dst, defaults map[string]string) { + for k, v := range defaults { + if !builtInCacheLogOpts[k] { + continue + } + if _, exists := dst[k]; !exists { + dst[k] = v + } + } +} diff --git a/daemon/logs.go b/daemon/logs.go index 668a75c778..a4105d3e78 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -11,6 +11,7 @@ import ( timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" + logcache "github.com/docker/docker/daemon/logger/loggerutils/cache" "github.com/docker/docker/errdefs" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -190,6 +191,8 @@ func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) err } } + logcache.MergeDefaultLogConfig(cfg.Config, daemon.defaultLogConfig.Config) + return logger.ValidateLogOpts(cfg.Type, cfg.Config) } @@ -204,6 +207,7 @@ func (daemon *Daemon) setupDefaultLogConfig() error { Type: config.LogConfig.Type, Config: config.LogConfig.Config, } + logrus.Debugf("Using default logging driver %s", daemon.defaultLogConfig.Type) return nil } diff --git a/integration/plugin/logging/cmd/discard/driver.go b/integration/plugin/logging/cmd/discard/driver.go index ccf2849cca..e02b56e88f 100644 --- a/integration/plugin/logging/cmd/discard/driver.go +++ b/integration/plugin/logging/cmd/discard/driver.go @@ -62,6 +62,7 @@ func handle(mux *http.ServeMux) { if f := d.logs[req.File]; f != nil { f.Close() } + d.mu.Unlock() respond(nil, w) }) diff --git a/integration/plugin/logging/read_test.go b/integration/plugin/logging/read_test.go index 8671618fea..cd57a5d864 100644 --- a/integration/plugin/logging/read_test.go +++ b/integration/plugin/logging/read_test.go @@ -29,45 +29,62 @@ func TestReadPluginNoRead(t *testing.T) { createPlugin(t, client, "test", "discard", asLogDriver) ctx := context.Background() - defer func() { - err = client.PluginRemove(ctx, "test", types.PluginRemoveOptions{Force: true}) - assert.Check(t, err) - }() err = client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30}) assert.Check(t, err) + d.Stop(t) - c, err := client.ContainerCreate(ctx, - &container.Config{ - Image: "busybox", - Cmd: []string{"/bin/echo", "hello world"}, - }, - &container.HostConfig{LogConfig: container.LogConfig{Type: "test"}}, - nil, - "", - ) - assert.Assert(t, err) - - err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}) - assert.Assert(t, err) - - logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true}) - assert.Assert(t, err) - defer logs.Close() - - buf := bytes.NewBuffer(nil) - - errCh := make(chan error) - go func() { - _, err := stdcopy.StdCopy(buf, buf, logs) - errCh <- err - }() - - select { - case <-time.After(60 * time.Second): - t.Fatal("timeout waiting for IO to complete") - case err := <-errCh: - assert.Assert(t, err) + cfg := &container.Config{ + Image: "busybox", + Cmd: []string{"/bin/echo", "hello world"}, } - assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes()) + for desc, test := range map[string]struct { + dOpts []string + logsSupported bool + }{ + "default": {logsSupported: true}, + "disabled caching": {[]string{"--log-opt=cache-disabled=true"}, false}, + "explicitly enabled caching": {[]string{"--log-opt=cache-disabled=false"}, true}, + } { + t.Run(desc, func(t *testing.T) { + d.Start(t, append([]string{"--iptables=false"}, test.dOpts...)...) + defer d.Stop(t) + c, err := client.ContainerCreate(ctx, + cfg, + &container.HostConfig{LogConfig: container.LogConfig{Type: "test"}}, + nil, + "", + ) + assert.Assert(t, err) + defer client.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true}) + + err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}) + assert.Assert(t, err) + + logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true}) + if !test.logsSupported { + assert.Assert(t, err != nil) + return + } + assert.Assert(t, err) + defer logs.Close() + + buf := bytes.NewBuffer(nil) + + errCh := make(chan error) + go func() { + _, err := stdcopy.StdCopy(buf, buf, logs) + errCh <- err + }() + + select { + case <-time.After(60 * time.Second): + t.Fatal("timeout waiting for IO to complete") + case err := <-errCh: + assert.Assert(t, err) + } + assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes()) + }) + } + }