
Merge bc906c8a12895b438887abe4df468854bb21160b into ee8b788538ea2c6d46d65f17be156de65bc21bb9

asterite3 1 year ago
parent
commit
d534bb13d8
8 changed files with 162 additions and 45 deletions
  1. container/container.go (+4, -4)
  2. container/exec.go (+2, -2)
  3. container/monitor.go (+4, -4)
  4. container/stream/streams.go (+3, -3)
  5. daemon/exec.go (+1, -1)
  6. daemon/monitor.go (+40, -15)
  7. daemon/start.go (+1, -1)
  8. pkg/broadcaster/unbuffered.go (+107, -15)

+ 4 - 4
container/container.go

@@ -664,14 +664,14 @@ func (container *Container) StderrPipe() io.ReadCloser {
 }
 
 // CloseStreams closes the container's stdio streams
-func (container *Container) CloseStreams() error {
-	return container.StreamConfig.CloseStreams()
+func (container *Container) CloseStreams(ctx context.Context) error {
+	return container.StreamConfig.CloseStreams(ctx)
 }
 
 // InitializeStdio is called by libcontainerd to connect the stdio.
 func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 	if err := container.startLogging(); err != nil {
-		container.Reset(false)
+		container.Reset(context.Background(), false)
 		return nil, err
 	}
 
@@ -810,7 +810,7 @@ type rio struct {
 func (i *rio) Close() error {
 	i.IO.Close()
 
-	return i.sc.CloseStreams()
+	return i.sc.CloseStreams(context.Background())
 }
 
 func (i *rio) Wait() {

+ 2 - 2
container/exec.go

@@ -65,8 +65,8 @@ func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 }
 
 // CloseStreams closes the stdio streams for the exec
-func (c *ExecConfig) CloseStreams() error {
-	return c.StreamConfig.CloseStreams()
+func (c *ExecConfig) CloseStreams(ctx context.Context) error {
+	return c.StreamConfig.CloseStreams(ctx)
 }
 
 // SetExitCode sets the exec config's exit code

+ 4 - 4
container/monitor.go

@@ -12,14 +12,14 @@ const (
 )
 
 // Reset puts a container into a state where it can be restarted again.
-func (container *Container) Reset(lock bool) {
+func (container *Container) Reset(ctx context.Context, lock bool) {
 	if lock {
 		container.Lock()
 		defer container.Unlock()
 	}
 
-	if err := container.CloseStreams(); err != nil {
-		log.G(context.TODO()).Errorf("%s: %s", container.ID, err)
+	if err := container.CloseStreams(ctx); err != nil {
+		log.G(ctx).Errorf("%s: %s", container.ID, err)
 	}
 
 	// Re-create a brand new stdin pipe once the container exited
@@ -39,7 +39,7 @@ func (container *Container) Reset(lock bool) {
 			defer timer.Stop()
 			select {
 			case <-timer.C:
-				log.G(context.TODO()).Warn("Logger didn't exit in time: logs may be truncated")
+				log.G(ctx).Warn("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			}
 		}

+ 3 - 3
container/stream/streams.go

@@ -90,7 +90,7 @@ func (c *Config) NewNopInputPipe() {
 }
 
 // CloseStreams ensures that the configured streams are properly closed.
-func (c *Config) CloseStreams() error {
+func (c *Config) CloseStreams(ctx context.Context) error {
 	var errors []string
 
 	if c.stdin != nil {
@@ -99,11 +99,11 @@ func (c *Config) CloseStreams() error {
 		}
 	}
 
-	if err := c.stdout.Clean(); err != nil {
+	if err := c.stdout.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
 	}
 
-	if err := c.stderr.Clean(); err != nil {
+	if err := c.stderr.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
 	}
 

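For reference, a minimal standalone sketch of how a caller can bound the new context-aware close with a deadline, so that a writer stuck in Write cannot hold up cleanup forever. Everything in it (streamCloser, fakeStreams, the 2-second timeout) is an illustrative assumption, not part of this change:

// Sketch only: CloseStreams now accepts a context, so callers can put a
// deadline on stream cleanup. fakeStreams stands in for *stream.Config.
package main

import (
	"context"
	"fmt"
	"time"
)

type streamCloser interface {
	CloseStreams(ctx context.Context) error
}

type fakeStreams struct{}

func (fakeStreams) CloseStreams(ctx context.Context) error {
	<-ctx.Done() // pretend a blocked writer delays the close
	return ctx.Err()
}

func main() {
	var sc streamCloser = fakeStreams{} // stand-in for a container's stream config
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := sc.CloseStreams(ctx); err != nil {
		fmt.Println("streams did not close cleanly:", err)
	}
}
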
+ 1 - 1
daemon/exec.go

@@ -185,7 +185,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 			ec.Running = false
 			exitCode := 126
 			ec.ExitCode = &exitCode
-			if err := ec.CloseStreams(); err != nil {
+			if err := ec.CloseStreams(ctx); err != nil {
 				log.G(ctx).Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err)
 			}
 			ec.Unlock()

+ 40 - 15
daemon/monitor.go

@@ -28,7 +28,10 @@ func (daemon *Daemon) setStateCounter(c *container.Container) {
 }
 
 func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error {
-	var exitStatus container.ExitStatus
+	var (
+		exitStatus       container.ExitStatus
+		taskDeletionDone chan struct{}
+	)
 	c.Lock()
 
 	cfg := daemon.config()
@@ -39,19 +42,33 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 
 	tsk, ok := c.Task()
 	if ok {
-		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-		es, err := tsk.Delete(ctx)
-		cancel()
-		if err != nil {
-			log.G(ctx).WithFields(log.Fields{
-				"error":     err,
-				"container": c.ID,
-			}).Warn("failed to delete container from containerd")
-		} else {
-			exitStatus = container.ExitStatus{
-				ExitCode: int(es.ExitCode()),
-				ExitedAt: es.ExitTime(),
+		taskDeletionDone = make(chan struct{})
+		go func() {
+			defer close(taskDeletionDone)
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			es, err := tsk.Delete(ctx)
+			cancel()
+			if err != nil {
+				log.G(ctx).WithFields(log.Fields{
+					"error":     err,
+					"container": c.ID,
+				}).Warn("failed to delete container from containerd")
+			} else {
+				exitStatus = container.ExitStatus{
+					ExitCode: int(es.ExitCode()),
+					ExitedAt: es.ExitTime(),
+				}
 			}
+		}()
+
+		deletionIOCloseTimeout := time.NewTimer(3 * time.Second)
+		select {
+		case <-taskDeletionDone:
+			deletionIOCloseTimeout.Stop()
+		case <-deletionIOCloseTimeout.C:
+			// if tsk.Delete(ctx) has not returned after 3 seconds, try to close
+			// the IO streams - they may be blocking the deletion - and then
+			// continue waiting
 		}
 	}
 
@@ -59,7 +76,13 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 	c.StreamConfig.Wait(ctx)
 	cancel()
 
-	c.Reset(false)
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+	c.Reset(ctx, false)
+	cancel()
+
+	if taskDeletionDone != nil {
+		<-taskDeletionDone
+	}
 
 	if e != nil {
 		exitStatus.ExitCode = int(e.ExitCode)
@@ -194,9 +217,11 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 			execConfig.StreamConfig.Wait(ctx)
 			cancel()
 
-			if err := execConfig.CloseStreams(); err != nil {
+			ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+			if err := execConfig.CloseStreams(ctx); err != nil {
 				log.G(ctx).Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 			}
+			cancel()
 
 			exitCode = ec
 

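The flow in handleContainerExit above boils down to a small reusable pattern: run the slow containerd task deletion in the background, give it a short grace period, force-close the I/O that may be blocking it, and then keep waiting for the deletion to finish. A standalone sketch of just that pattern; the names and the simulated 4-second delay are illustrative assumptions, not daemon code:

package main

import (
	"fmt"
	"time"
)

// waitWithForcedIOClose runs slowCleanup in the background. If it has not
// finished within the grace period, forceCloseIO is called to unblock it,
// and then we keep waiting for the cleanup to finish.
func waitWithForcedIOClose(grace time.Duration, slowCleanup, forceCloseIO func()) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		slowCleanup()
	}()

	timer := time.NewTimer(grace)
	defer timer.Stop()
	select {
	case <-done:
		return
	case <-timer.C:
		// cleanup appears stuck; closing the I/O streams may unblock it
		forceCloseIO()
	}
	<-done // continue waiting after the forced close
}

func main() {
	waitWithForcedIOClose(
		3*time.Second,
		func() { time.Sleep(4 * time.Second); fmt.Println("task deleted") },
		func() { fmt.Println("grace period expired, force-closing I/O") },
	)
}
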
+ 1 - 1
daemon/start.go

@@ -96,7 +96,7 @@ func (daemon *Daemon) containerStart(ctx context.Context, daemonCfg *configStore
 			if err := container.CheckpointTo(daemon.containersReplica); err != nil {
 				log.G(ctx).Errorf("%s: failed saving state on start failure: %v", container.ID, err)
 			}
-			container.Reset(false)
+			container.Reset(ctx, false)
 
 			daemon.Cleanup(compatcontext.WithoutCancel(ctx), container)
 			// if containers AutoRemove flag is set, remove it after clean up

+ 107 - 15
pkg/broadcaster/unbuffered.go

@@ -1,49 +1,141 @@
 package broadcaster // import "github.com/docker/docker/pkg/broadcaster"
 
 import (
+	"context"
 	"io"
 	"sync"
 )
 
 // Unbuffered accumulates multiple io.WriteCloser by stream.
 type Unbuffered struct {
-	mu      sync.Mutex
-	writers []io.WriteCloser
+	writersUpdateMu sync.Mutex
+
+	writersWriteMu         chan struct{}
+	initWritersWriteMuOnce sync.Once
+
+	writers *[]io.WriteCloser
 }
 
 // Add adds new io.WriteCloser.
 func (w *Unbuffered) Add(writer io.WriteCloser) {
-	w.mu.Lock()
-	w.writers = append(w.writers, writer)
-	w.mu.Unlock()
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.writersUpdateMu.Lock()
+	defer w.writersUpdateMu.Unlock()
+
+	if w.writers == nil {
+		w.writers = &[]io.WriteCloser{}
+	}
+	*w.writers = append(*w.writers, writer)
 }
 
 // Write writes bytes to all writers. Failed writers will be evicted during
 // this call.
 func (w *Unbuffered) Write(p []byte) (n int, err error) {
-	w.mu.Lock()
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.writersUpdateMu.Lock()
+
+	// make a copy of the w.writers pointer. Even if CleanContext sets w.writers,
+	// this Write call will still use its valid copy
+	writers := w.writers
+
+	// release w.writersUpdateMu. This allows CleanContext to set w.writers
+	// to nil - but this Write call won't be affected by this, as it uses a copy
+	// of that pointer. We will also be able to safely iterate over *writers:
+	// clean methods never resize the slice (they may only set the pointer to nil),
+	// and Add and Write require w.writersWriteMu, which we are still holding
+	w.writersUpdateMu.Unlock()
+
+	if writers == nil {
+		return
+	}
+
 	var evict []int
-	for i, sw := range w.writers {
+	for i, sw := range *writers {
 		if n, err := sw.Write(p); err != nil || n != len(p) {
 			// On error, evict the writer
 			evict = append(evict, i)
 		}
 	}
+
+	w.writersUpdateMu.Lock()
+	// at this point w.writers might have already been set to nil, but we're
+	// not affected by this as we are using a copy
 	for n, i := range evict {
-		w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
+		*writers = append((*writers)[:i-n], (*writers)[i-n+1:]...)
 	}
-	w.mu.Unlock()
+	w.writersUpdateMu.Unlock()
+
 	return len(p), nil
 }
 
-// Clean closes and removes all writers. Last non-eol-terminated part of data
-// will be saved.
-func (w *Unbuffered) Clean() error {
-	w.mu.Lock()
-	for _, sw := range w.writers {
+func (w *Unbuffered) cleanUnlocked() {
+	if w.writers == nil {
+		return
+	}
+	for _, sw := range *w.writers {
 		sw.Close()
 	}
 	w.writers = nil
-	w.mu.Unlock()
+}
+
+func (w *Unbuffered) cleanWithWriteLock() {
+	w.writersUpdateMu.Lock()
+	defer w.writersUpdateMu.Unlock()
+
+	w.cleanUnlocked()
+}
+
+// Clean closes and removes all writers. Last non-eol-terminated part of data
+// will be saved.
+func (w *Unbuffered) Clean() error {
+	w.lockWritersWriteMu()
+	defer w.unlockWritersWriteMu()
+
+	w.cleanWithWriteLock()
 	return nil
 }
+
+// CleanContext closes and removes all writers.
+// CleanContext supports timeouts via the context to unblock and forcefully
+// close the io streams. This function should only be used if all writers
+// added to Unbuffered support concurrent calls to Close and Write: it will
+// call Close while Write may be in progress in order to forcefully close
+// writers
+func (w *Unbuffered) CleanContext(ctx context.Context) error {
+	writersWriteMu := w.getWritersWriteMu()
+
+	select {
+	case writersWriteMu <- struct{}{}:
+		defer w.unlockWritersWriteMu()
+	case <-ctx.Done():
+		// forceful cleanup - we will call w.cleanWithWriteLock() without
+		// actually holding w.writersWriteMu. This may call .Close() on a
+		// WriteCloser that is currently blocked inside a Write call
+	}
+
+	w.cleanWithWriteLock()
+	return ctx.Err()
+}
+
+func (w *Unbuffered) initWritersWriteMu() {
+	w.writersWriteMu = make(chan struct{}, 1)
+}
+
+func (w *Unbuffered) getWritersWriteMu() chan struct{} {
+	w.initWritersWriteMuOnce.Do(w.initWritersWriteMu)
+	return w.writersWriteMu
+}
+
+func (w *Unbuffered) lockWritersWriteMu() {
+	w.getWritersWriteMu() <- struct{}{}
+}
+
+func (w *Unbuffered) unlockWritersWriteMu() {
+	// this is never called before 'getWritersWriteMu()', so w.writersWriteMu is
+	// guaranteed to be initialized
+	<-w.writersWriteMu
+}
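
writersWriteMu is a buffered channel used as a mutex precisely so that acquiring it can race a context: that is what lets CleanContext give up on waiting for an in-flight Write and force the close instead. A minimal standalone sketch of that channel-as-mutex idea; all names here are illustrative, not part of this package:

package main

import (
	"context"
	"fmt"
	"time"
)

// ctxMutex is a buffered channel of capacity 1 used as a lock whose
// acquisition can be abandoned when a context expires.
type ctxMutex chan struct{}

func newCtxMutex() ctxMutex { return make(ctxMutex, 1) }

func (m ctxMutex) Lock()   { m <- struct{}{} }
func (m ctxMutex) Unlock() { <-m }

// LockContext reports whether the lock was acquired before ctx expired.
func (m ctxMutex) LockContext(ctx context.Context) bool {
	select {
	case m <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	mu := newCtxMutex()
	mu.Lock() // simulate a writer holding the lock inside a blocked Write

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if !mu.LockContext(ctx) {
		fmt.Println("lock not acquired in time; falling back to forced cleanup")
	}
}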