diff --git a/daemon/logger/plugin_unix.go b/daemon/logger/plugin_unix.go index 6192f52eb8..e9a16af9b1 100644 --- a/daemon/logger/plugin_unix.go +++ b/daemon/logger/plugin_unix.go @@ -12,7 +12,10 @@ import ( ) func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) { - f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_WRONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700) + // Make sure to also open with read (in addition to write) to avoid borken pipe errors on plugin failure. + // It is up to the plugin to keep track of pipes that it should re-attach to, however. + // If the plugin doesn't open for reads, then the container will block once the pipe is full. + f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700) if err != nil { return nil, errors.Wrapf(err, "error creating i/o pipe for log plugin: %s", a.Name()) } diff --git a/integration/internal/container/ops.go b/integration/internal/container/ops.go index b7d1a7bda9..df5598b62f 100644 --- a/integration/internal/container/ops.go +++ b/integration/internal/container/ops.go @@ -116,3 +116,21 @@ func WithIPv6(network, ip string) func(*TestContainerConfig) { c.NetworkingConfig.EndpointsConfig[network].IPAMConfig.IPv6Address = ip } } + +// WithLogDriver sets the log driver to use for the container +func WithLogDriver(driver string) func(*TestContainerConfig) { + return func(c *TestContainerConfig) { + if c.HostConfig == nil { + c.HostConfig = &containertypes.HostConfig{} + } + c.HostConfig.LogConfig.Type = driver + } +} + +// WithAutoRemove sets the container to be removed on exit +func WithAutoRemove(c *TestContainerConfig) { + if c.HostConfig == nil { + c.HostConfig = &containertypes.HostConfig{} + } + c.HostConfig.AutoRemove = true +} diff --git a/integration/plugin/logging/cmd/close_on_start/main.go b/integration/plugin/logging/cmd/close_on_start/main.go new file mode 100644 index 0000000000..6891d6a995 --- /dev/null +++ b/integration/plugin/logging/cmd/close_on_start/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "os" +) + +type start struct { + File string +} + +func main() { + l, err := net.Listen("unix", "/run/docker/plugins/plugin.sock") + if err != nil { + panic(err) + } + + mux := http.NewServeMux() + mux.HandleFunc("/LogDriver.StartLogging", func(w http.ResponseWriter, req *http.Request) { + startReq := &start{} + if err := json.NewDecoder(req.Body).Decode(startReq); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + f, err := os.OpenFile(startReq.File, os.O_RDONLY, 0600) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Close the file immediately, this allows us to test what happens in the daemon when the plugin has closed the + // file or, for example, the plugin has crashed. + f.Close() + + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, `{}`) + }) + server := http.Server{ + Addr: l.Addr().String(), + Handler: mux, + } + + server.Serve(l) +} diff --git a/integration/plugin/logging/cmd/close_on_start/main_test.go b/integration/plugin/logging/cmd/close_on_start/main_test.go new file mode 100644 index 0000000000..06ab7d0f9a --- /dev/null +++ b/integration/plugin/logging/cmd/close_on_start/main_test.go @@ -0,0 +1 @@ +package main diff --git a/integration/plugin/logging/logging_test.go b/integration/plugin/logging/logging_test.go new file mode 100644 index 0000000000..7c118caa29 --- /dev/null +++ b/integration/plugin/logging/logging_test.go @@ -0,0 +1,82 @@ +package logging + +import ( + "bufio" + "context" + "os" + "strings" + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/integration/internal/container" + "github.com/docker/docker/internal/test/daemon" + "github.com/gotestyourself/gotestyourself/assert" +) + +func TestContinueAfterPluginCrash(t *testing.T) { + t.Parallel() + + d := daemon.New(t) + d.StartWithBusybox(t, "--iptables=false", "--init") + defer d.Stop(t) + + client := d.NewClientT(t) + createPlugin(t, client, "test", "close_on_start", asLogDriver) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + assert.Assert(t, client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30})) + cancel() + defer client.PluginRemove(context.Background(), "test", types.PluginRemoveOptions{Force: true}) + + v, err := client.VolumeCreate(context.Background(), volume.VolumesCreateBody{}) + assert.Assert(t, err) + defer client.VolumeRemove(context.Background(), v.Name, true) + + ctx, cancel = context.WithTimeout(context.Background(), 60*time.Second) + + id := container.Run(t, ctx, client, + container.WithAutoRemove, + container.WithLogDriver("test"), + container.WithCmd( + "/bin/sh", "-c", "while true; do sleep 1; echo hello; done", + ), + ) + cancel() + defer client.ContainerRemove(context.Background(), id, types.ContainerRemoveOptions{Force: true}) + + // Attach to the container to make sure it's written a few times to stdout + attach, err := client.ContainerAttach(context.Background(), id, types.ContainerAttachOptions{Stream: true, Stdout: true}) + assert.Assert(t, err) + + chErr := make(chan error) + go func() { + defer close(chErr) + rdr := bufio.NewReader(attach.Reader) + for i := 0; i < 5; i++ { + _, _, err := rdr.ReadLine() + if err != nil { + chErr <- err + return + } + } + }() + + select { + case err := <-chErr: + assert.Assert(t, err) + case <-time.After(60 * time.Second): + t.Fatal("timeout waiting for container i/o") + } + + // check daemon logs for "broken pipe" + // TODO(@cpuguy83): This is horribly hacky but is the only way to really test this case right now. + // It would be nice if there was a way to know that a broken pipe has occurred without looking through the logs. + log, err := os.Open(d.LogFileName()) + assert.Assert(t, err) + scanner := bufio.NewScanner(log) + for scanner.Scan() { + assert.Assert(t, !strings.Contains(scanner.Text(), "broken pipe")) + } +}