Forráskód Böngészése

container: cleanup stdout/stderr with context

Add context argument to CloseStreams method of
container/stream.Config and methods Reset and CloseStreams
of container.Container: use that context to limit the time waiting
for stdout and stderr of stream.Config to cleanup (i. e. for all
their attached writers to close).

Signed-off-by: Daniil Sigalov <asterite@seclab.cs.msu.ru>
Daniil Sigalov 4 éve
szülő
commit
e9bfffa378

+ 4 - 4
container/container.go

@@ -657,14 +657,14 @@ func (container *Container) StderrPipe() io.ReadCloser {
 }
 }
 
 
 // CloseStreams closes the container's stdio streams
 // CloseStreams closes the container's stdio streams
-func (container *Container) CloseStreams() error {
-	return container.StreamConfig.CloseStreams()
+func (container *Container) CloseStreams(ctx context.Context) error {
+	return container.StreamConfig.CloseStreams(ctx)
 }
 }
 
 
 // InitializeStdio is called by libcontainerd to connect the stdio.
 // InitializeStdio is called by libcontainerd to connect the stdio.
 func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 	if err := container.startLogging(); err != nil {
 	if err := container.startLogging(); err != nil {
-		container.Reset(false)
+		container.Reset(context.Background(), false)
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -803,7 +803,7 @@ type rio struct {
 func (i *rio) Close() error {
 func (i *rio) Close() error {
 	i.IO.Close()
 	i.IO.Close()
 
 
-	return i.sc.CloseStreams()
+	return i.sc.CloseStreams(context.Background())
 }
 }
 
 
 func (i *rio) Wait() {
 func (i *rio) Wait() {

+ 1 - 1
container/exec.go

@@ -66,7 +66,7 @@ func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 
 
 // CloseStreams closes the stdio streams for the exec
 // CloseStreams closes the stdio streams for the exec
 func (c *ExecConfig) CloseStreams() error {
 func (c *ExecConfig) CloseStreams() error {
-	return c.StreamConfig.CloseStreams()
+	return c.StreamConfig.CloseStreams(context.Background())
 }
 }
 
 
 // SetExitCode sets the exec config's exit code
 // SetExitCode sets the exec config's exit code

+ 4 - 4
container/monitor.go

@@ -12,14 +12,14 @@ const (
 )
 )
 
 
 // Reset puts a container into a state where it can be restarted again.
 // Reset puts a container into a state where it can be restarted again.
-func (container *Container) Reset(lock bool) {
+func (container *Container) Reset(ctx context.Context, lock bool) {
 	if lock {
 	if lock {
 		container.Lock()
 		container.Lock()
 		defer container.Unlock()
 		defer container.Unlock()
 	}
 	}
 
 
-	if err := container.CloseStreams(); err != nil {
-		log.G(context.TODO()).Errorf("%s: %s", container.ID, err)
+	if err := container.CloseStreams(ctx); err != nil {
+		log.G(ctx).Errorf("%s: %s", container.ID, err)
 	}
 	}
 
 
 	// Re-create a brand new stdin pipe once the container exited
 	// Re-create a brand new stdin pipe once the container exited
@@ -39,7 +39,7 @@ func (container *Container) Reset(lock bool) {
 			defer timer.Stop()
 			defer timer.Stop()
 			select {
 			select {
 			case <-timer.C:
 			case <-timer.C:
-				log.G(context.TODO()).Warn("Logger didn't exit in time: logs may be truncated")
+				log.G(ctx).Warn("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			case <-exit:
 			}
 			}
 		}
 		}

+ 3 - 3
container/stream/streams.go

@@ -90,7 +90,7 @@ func (c *Config) NewNopInputPipe() {
 }
 }
 
 
 // CloseStreams ensures that the configured streams are properly closed.
 // CloseStreams ensures that the configured streams are properly closed.
-func (c *Config) CloseStreams() error {
+func (c *Config) CloseStreams(ctx context.Context) error {
 	var errors []string
 	var errors []string
 
 
 	if c.stdin != nil {
 	if c.stdin != nil {
@@ -99,11 +99,11 @@ func (c *Config) CloseStreams() error {
 		}
 		}
 	}
 	}
 
 
-	if err := c.stdout.Clean(); err != nil {
+	if err := c.stdout.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
 		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
 	}
 	}
 
 
-	if err := c.stderr.Clean(); err != nil {
+	if err := c.stderr.CleanContext(ctx); err != nil {
 		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
 		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
 	}
 	}
 
 

+ 3 - 1
daemon/monitor.go

@@ -58,7 +58,9 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 	c.StreamConfig.Wait(ctx)
 	c.StreamConfig.Wait(ctx)
 	cancel()
 	cancel()
 
 
-	c.Reset(false)
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+	c.Reset(ctx, false)
+	cancel()
 
 
 	if e != nil {
 	if e != nil {
 		exitStatus.ExitCode = int(e.ExitCode)
 		exitStatus.ExitCode = int(e.ExitCode)

+ 1 - 1
daemon/start.go

@@ -130,7 +130,7 @@ func (daemon *Daemon) containerStart(ctx context.Context, daemonCfg *configStore
 			if err := container.CheckpointTo(daemon.containersReplica); err != nil {
 			if err := container.CheckpointTo(daemon.containersReplica); err != nil {
 				log.G(ctx).Errorf("%s: failed saving state on start failure: %v", container.ID, err)
 				log.G(ctx).Errorf("%s: failed saving state on start failure: %v", container.ID, err)
 			}
 			}
-			container.Reset(false)
+			container.Reset(ctx, false)
 
 
 			daemon.Cleanup(container)
 			daemon.Cleanup(container)
 			// if containers AutoRemove flag is set, remove it after clean up
 			// if containers AutoRemove flag is set, remove it after clean up