diff --git a/container/container.go b/container/container.go
index 9809124d8e..f8b6fabc6e 100644
--- a/container/container.go
+++ b/container/container.go
@@ -664,14 +664,14 @@ func (container *Container) StderrPipe() io.ReadCloser {
 }
 
 // CloseStreams closes the container's stdio streams
-func (container *Container) CloseStreams() error {
-	return container.StreamConfig.CloseStreams()
+func (container *Container) CloseStreams(ctx context.Context) error {
+	return container.StreamConfig.CloseStreams(ctx)
 }
 
 // InitializeStdio is called by libcontainerd to connect the stdio.
 func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 	if err := container.startLogging(); err != nil {
-		container.Reset(false)
+		container.Reset(context.Background(), false)
 		return nil, err
 	}
 
@@ -810,7 +810,7 @@ type rio struct {
 func (i *rio) Close() error {
 	i.IO.Close()
 
-	return i.sc.CloseStreams()
+	return i.sc.CloseStreams(context.Background())
 }
 
 func (i *rio) Wait() {
diff --git a/container/exec.go b/container/exec.go
index 328b596b04..d4bef0c547 100644
--- a/container/exec.go
+++ b/container/exec.go
@@ -65,8 +65,8 @@ func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 }
 
 // CloseStreams closes the stdio streams for the exec
-func (c *ExecConfig) CloseStreams() error {
-	return c.StreamConfig.CloseStreams()
+func (c *ExecConfig) CloseStreams(ctx context.Context) error {
+	return c.StreamConfig.CloseStreams(ctx)
 }
 
 // SetExitCode sets the exec config's exit code
diff --git a/container/monitor.go b/container/monitor.go
index 60a6228ee4..98dc4d024c 100644
--- a/container/monitor.go
+++ b/container/monitor.go
@@ -12,14 +12,14 @@ const (
 )
 
 // Reset puts a container into a state where it can be restarted again.
-func (container *Container) Reset(lock bool) {
+func (container *Container) Reset(ctx context.Context, lock bool) {
 	if lock {
 		container.Lock()
 		defer container.Unlock()
 	}
 
-	if err := container.CloseStreams(); err != nil {
-		log.G(context.TODO()).Errorf("%s: %s", container.ID, err)
+	if err := container.CloseStreams(ctx); err != nil {
+		log.G(ctx).Errorf("%s: %s", container.ID, err)
 	}
 
 	// Re-create a brand new stdin pipe once the container exited
@@ -39,7 +39,7 @@
 			defer timer.Stop()
 			select {
 			case <-timer.C:
-				log.G(context.TODO()).Warn("Logger didn't exit in time: logs may be truncated")
+				log.G(ctx).Warn("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			}
 		}
diff --git a/container/stream/streams.go b/container/stream/streams.go
index 78ec048396..b6d7d26844 100644
--- a/container/stream/streams.go
+++ b/container/stream/streams.go
@@ -90,7 +90,7 @@ func (c *Config) NewNopInputPipe() {
 }
 
 // CloseStreams ensures that the configured streams are properly closed.
-func (c *Config) CloseStreams() error {
+func (c *Config) CloseStreams(ctx context.Context) error {
 	var errors []string
 
 	if c.stdin != nil {
@@ -99,11 +99,11 @@
 		}
 	}
 
-	if err := c.stdout.Clean(); err != nil {
+	if err := c.stdout.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
 	}
 
-	if err := c.stderr.Clean(); err != nil {
+	if err := c.stderr.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
 	}
 
diff --git a/daemon/exec.go b/daemon/exec.go
index 2ab0b6b409..1de3502f43 100644
--- a/daemon/exec.go
+++ b/daemon/exec.go
@@ -185,7 +185,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 			ec.Running = false
 			exitCode := 126
 			ec.ExitCode = &exitCode
-			if err := ec.CloseStreams(); err != nil {
+			if err := ec.CloseStreams(ctx); err != nil {
 				log.G(ctx).Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err)
 			}
 			ec.Unlock()
diff --git a/daemon/monitor.go b/daemon/monitor.go
index 7c47ae0786..c66758e8e0 100644
--- a/daemon/monitor.go
+++ b/daemon/monitor.go
@@ -28,7 +28,10 @@ func (daemon *Daemon) setStateCounter(c *container.Container) {
 }
 
 func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error {
-	var exitStatus container.ExitStatus
+	var (
+		exitStatus       container.ExitStatus
+		taskDeletionDone chan struct{}
+	)
 
 	c.Lock()
 	cfg := daemon.config()
@@ -39,19 +42,33 @@
 
 	tsk, ok := c.Task()
 	if ok {
-		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-		es, err := tsk.Delete(ctx)
-		cancel()
-		if err != nil {
-			log.G(ctx).WithFields(log.Fields{
-				"error":     err,
-				"container": c.ID,
-			}).Warn("failed to delete container from containerd")
-		} else {
-			exitStatus = container.ExitStatus{
-				ExitCode: int(es.ExitCode()),
-				ExitedAt: es.ExitTime(),
+		taskDeletionDone = make(chan struct{})
+		go func() {
+			defer close(taskDeletionDone)
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			es, err := tsk.Delete(ctx)
+			cancel()
+			if err != nil {
+				log.G(ctx).WithFields(log.Fields{
+					"error":     err,
+					"container": c.ID,
+				}).Warn("failed to delete container from containerd")
+			} else {
+				exitStatus = container.ExitStatus{
+					ExitCode: int(es.ExitCode()),
+					ExitedAt: es.ExitTime(),
+				}
 			}
+		}()
+
+		deletionIOCloseTimeout := time.NewTimer(3 * time.Second)
+		select {
+		case <-taskDeletionDone:
+			deletionIOCloseTimeout.Stop()
+		case <-deletionIOCloseTimeout.C:
+			// if tsk.Delete(ctx) has not returned after 3 seconds, try to close
+			// the IO streams - they may be blocking the deletion - and continue
+			// waiting after that
 		}
 	}
 
@@ -59,7 +76,13 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 	c.StreamConfig.Wait(ctx)
 	cancel()
 
-	c.Reset(false)
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+	c.Reset(ctx, false)
+	cancel()
+
+	if taskDeletionDone != nil {
+		<-taskDeletionDone
+	}
 
 	if e != nil {
 		exitStatus.ExitCode = int(e.ExitCode)
@@ -194,9 +217,11 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 			execConfig.StreamConfig.Wait(ctx)
 			cancel()
 
-			if err := execConfig.CloseStreams(); err != nil {
+			ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+			if err := execConfig.CloseStreams(ctx); err != nil {
 				log.G(ctx).Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 			}
+			cancel()
 
 			exitCode = ec
 
diff --git a/daemon/start.go b/daemon/start.go
index b967947af2..0bb75a0595 100644
--- a/daemon/start.go
+++ b/daemon/start.go
@@ -96,7 +96,7 @@ func (daemon *Daemon) containerStart(ctx context.Context, daemonCfg *configStore
 		if err := container.CheckpointTo(daemon.containersReplica); err != nil {
 			log.G(ctx).Errorf("%s: failed saving state on start failure: %v", container.ID, err)
 		}
-		container.Reset(false)
+		container.Reset(ctx, false)
 
 		daemon.Cleanup(compatcontext.WithoutCancel(ctx), container)
 		// if containers AutoRemove flag is set, remove it after clean up
diff --git a/pkg/broadcaster/unbuffered.go b/pkg/broadcaster/unbuffered.go
index 6bb285123f..ec5584a0c6 100644
--- a/pkg/broadcaster/unbuffered.go
+++ b/pkg/broadcaster/unbuffered.go
@@ -1,49 +1,141 @@
 package broadcaster // import "github.com/docker/docker/pkg/broadcaster"
 
 import (
+	"context"
 	"io"
 	"sync"
 )
 
 // Unbuffered accumulates multiple io.WriteCloser by stream.
 type Unbuffered struct {
-	mu      sync.Mutex
-	writers []io.WriteCloser
+	writersUpdateMu sync.Mutex
+
+	writersWriteMu         chan struct{}
+	initWritersWriteMuOnce sync.Once
+
+	writers *[]io.WriteCloser
 }
 
 // Add adds new io.WriteCloser.
 func (w *Unbuffered) Add(writer io.WriteCloser) {
-	w.mu.Lock()
-	w.writers = append(w.writers, writer)
-	w.mu.Unlock()
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.writersUpdateMu.Lock()
+	defer w.writersUpdateMu.Unlock()
+
+	if w.writers == nil {
+		w.writers = &[]io.WriteCloser{}
+	}
+	*w.writers = append(*w.writers, writer)
 }
 
 // Write writes bytes to all writers. Failed writers will be evicted during
 // this call.
 func (w *Unbuffered) Write(p []byte) (n int, err error) {
-	w.mu.Lock()
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.writersUpdateMu.Lock()
+
+	// make a copy of w.writers. Even if .CleanContext sets w.writers, this
+	// Write call will still use its valid copy
+	writers := w.writers
+
+	// release w.writersUpdateMu. This allows CleanContext to set w.writers
+	// to nil - but this Write call won't be affected by this, as it uses a copy
+	// of that pointer. We will also be able to safely iterate over *writers:
+	// clean methods never resize the slice (they only may set pointer to nil),
+	// and Add and Write require w.writersWriteMu, which we are still holding
+	w.writersUpdateMu.Unlock()
+
+	if writers == nil {
+		return
+	}
+
 	var evict []int
-	for i, sw := range w.writers {
+	for i, sw := range *writers {
 		if n, err := sw.Write(p); err != nil || n != len(p) {
 			// On error, evict the writer
 			evict = append(evict, i)
 		}
 	}
+
+	w.writersUpdateMu.Lock()
+	// at this point w.writers might have already been set to nil, but we're
+	// not affected by this as we are using a copy
 	for n, i := range evict {
-		w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
+		*writers = append((*writers)[:i-n], (*writers)[i-n+1:]...)
 	}
-	w.mu.Unlock()
+	w.writersUpdateMu.Unlock()
+
 	return len(p), nil
 }
 
+func (w *Unbuffered) cleanUnlocked() {
+	if w.writers == nil {
+		return
+	}
+	for _, sw := range *w.writers {
+		sw.Close()
+	}
+	w.writers = nil
+}
+
+func (w *Unbuffered) cleanWithWriteLock() {
+	w.writersUpdateMu.Lock()
+	defer w.writersUpdateMu.Unlock()
+
+	w.cleanUnlocked()
+}
+
 // Clean closes and removes all writers. Last non-eol-terminated part of data
 // will be saved.
 func (w *Unbuffered) Clean() error {
-	w.mu.Lock()
-	for _, sw := range w.writers {
-		sw.Close()
-	}
-	w.writers = nil
-	w.mu.Unlock()
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.cleanWithWriteLock()
 	return nil
 }
+
+// CleanContext closes and removes all writers.
+// CleanContext supports timeouts via the context to unblock and forcefully
+// close the io streams. This function should only be used if all writers
+// added to Unbuffered support concurrent calls to Close and Write: it will
+// call Close while Write may be in progress in order to forcefully close
+// writers
+func (w *Unbuffered) CleanContext(ctx context.Context) error {
+	writersWriteMu := w.getWritersWriteMu()
+
+	select {
+	case writersWriteMu <- struct{}{}:
+		defer w.unlockWritersWriteMu()
+	case <-ctx.Done():
+		// forceful cleanup - we will call w.cleanWithWriteLock() without
+		// actually holding w.writersWriteMu. This may call .Close() on a
+		// WriteCloser which is being blocked inside a Write call
+	}
+
+	w.cleanWithWriteLock()
+	return ctx.Err()
+}
+
+func (w *Unbuffered) initWritersWriteMu() {
+	w.writersWriteMu = make(chan struct{}, 1)
+}
+
+func (w *Unbuffered) getWritersWriteMu() chan struct{} {
+	w.initWritersWriteMuOnce.Do(w.initWritersWriteMu)
+	return w.writersWriteMu
+}
+
+func (w *Unbuffered) lockWritersWriteMu() {
+	w.getWritersWriteMu() <- struct{}{}
+}
+
+func (w *Unbuffered) unlockWritersWriteMu() {
+	// this is never called before 'getWritersWriteMu()', so w.writersWriteMu is
+	// guaranteed to be initialized
+	<-w.writersWriteMu
+}
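Note (not part of the patch): a minimal sketch of the behaviour the new CleanContext is meant to provide. The blockingWriter type is hypothetical and only stands in for a stream consumer whose Write never returns; Unbuffered, Add, Write and CleanContext are the APIs changed above. Clean would wait behind the in-flight Write, while CleanContext stops waiting once the context expires, closes the writers anyway, and thereby unblocks the stuck Write.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/docker/docker/pkg/broadcaster"
)

// blockingWriter is a hypothetical io.WriteCloser whose Write blocks until
// Close is called, simulating a stuck stream consumer.
type blockingWriter struct {
	unblock chan struct{}
}

func (b *blockingWriter) Write(p []byte) (int, error) {
	<-b.unblock // stays here until Close is called
	return 0, fmt.Errorf("writer closed")
}

func (b *blockingWriter) Close() error {
	close(b.unblock)
	return nil
}

func main() {
	var u broadcaster.Unbuffered
	bw := &blockingWriter{unblock: make(chan struct{})}
	u.Add(bw)

	// This Write never returns on its own and keeps holding the
	// broadcaster's write lock.
	go u.Write([]byte("hello"))
	time.Sleep(100 * time.Millisecond)

	// Clean would block behind the stuck Write. CleanContext gives up on
	// the write lock when the context expires, closes the writers anyway,
	// and the stuck Write is unblocked by that Close.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := u.CleanContext(ctx)
	fmt.Println("CleanContext returned:", err) // context.DeadlineExceeded on the forced path
}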