
Merge pull request #43564 from corhere/libcontainerd-overhaul

Refactor libcontainerd to minimize containerd RPCs
Tianon Gravi 2 years ago
parent
commit
0ec426a57b

+ 2 - 1
api/swagger.yaml

@@ -4650,7 +4650,8 @@ definitions:
         example: false
       OOMKilled:
         description: |
-          Whether this container has been killed because it ran out of memory.
+          Whether a process within this container has been killed because it ran
+          out of memory since the container was last started.
         type: "boolean"
         example: false
       Dead:
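
The reworded description reflects a behavioral change made elsewhere in this PR: OOMKilled is now cleared on every container start (see SetRunning in container/state.go) and set when an OOM event is received, instead of being copied from the exit status. A minimal sketch of reading the flag through the Go API client; the container name is a placeholder:

package main

import (
	"context"
	"fmt"

	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// "some-container" is a placeholder. Per the updated description, the
	// flag reports whether any process in the container was OOM-killed
	// since the container was last started.
	info, err := cli.ContainerInspect(context.Background(), "some-container")
	if err != nil {
		panic(err)
	}
	fmt.Println("OOMKilled:", info.State.OOMKilled)
}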

+ 44 - 6
container/container.go

@@ -19,7 +19,6 @@ import (
 	mounttypes "github.com/docker/docker/api/types/mount"
 	swarmtypes "github.com/docker/docker/api/types/swarm"
 	"github.com/docker/docker/container/stream"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
 	"github.com/docker/docker/daemon/logger/local"
@@ -28,6 +27,7 @@ import (
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	"github.com/docker/docker/pkg/containerfs"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/ioutils"
@@ -53,9 +53,6 @@ type ExitStatus struct {
 	// The exit code with which the container exited.
 	ExitCode int
 
-	// Whether the container encountered an OOM.
-	OOMKilled bool
-
 	// Time at which the container died
 	ExitedAt time.Time
 }
@@ -89,7 +86,7 @@ type Container struct {
 	HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
 	MountPoints              map[string]*volumemounts.MountPoint
 	HostConfig               *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
-	ExecCommands             *exec.Store                `json:"-"`
+	ExecCommands             *ExecStore                 `json:"-"`
 	DependencyStore          agentexec.DependencyGetter `json:"-"`
 	SecretReferences         []*swarmtypes.SecretReference
 	ConfigReferences         []*swarmtypes.ConfigReference
@@ -124,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
 	return &Container{
 		ID:            id,
 		State:         NewState(),
-		ExecCommands:  exec.NewStore(),
+		ExecCommands:  NewExecStore(),
 		Root:          root,
 		MountPoints:   make(map[string]*volumemounts.MountPoint),
 		StreamConfig:  stream.NewConfig(),
@@ -755,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
 	return env
 }
 
+// RestoreTask restores the containerd container and task handles and reattaches
+// the IO for the running task. Container state is not synced with containerd's
+// state.
+//
+// An errdefs.NotFound error is returned if the container does not exist in
+// containerd. However, a nil error is returned if the task does not exist in
+// containerd.
+func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
+	container.Lock()
+	defer container.Unlock()
+	var err error
+	container.ctr, err = client.LoadContainer(ctx, container.ID)
+	if err != nil {
+		return err
+	}
+	container.task, err = container.ctr.AttachTask(ctx, container.InitializeStdio)
+	if err != nil && !errdefs.IsNotFound(err) {
+		return err
+	}
+	return nil
+}
+
+// GetRunningTask asserts that the container is running and returns the Task for
+// the container. An errdefs.Conflict error is returned if the container is not
+// in the Running state.
+//
+// A system error is returned if the container is in a bad state: Running is
+// true but the Task is nil.
+//
+// The container lock must be held when calling this method.
+func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
+	if !container.Running {
+		return nil, errdefs.Conflict(fmt.Errorf("container %s is not running", container.ID))
+	}
+	tsk, ok := container.Task()
+	if !ok {
+		return nil, errdefs.System(errors.WithStack(fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)))
+	}
+	return tsk, nil
+}
+
 type rio struct {
 	cio.IO
 

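RestoreTask and GetRunningTask establish the pattern the daemon-side changes below converge on: hold the container lock only long enough to fetch the task handle, then make the containerd RPC without the lock. A sketch of a caller, with an illustrative helper name:

package daemon

import (
	"context"

	"github.com/docker/docker/container"
)

// pauseTask is hypothetical; the real call sites in this PR (pause, kill,
// resize, stats, top) all follow the same shape.
func pauseTask(ctx context.Context, c *container.Container) error {
	c.Lock()
	tsk, err := c.GetRunningTask() // errdefs.Conflict if not running
	c.Unlock()
	if err != nil {
		return err
	}
	// The RPC is issued without holding the container lock.
	return tsk.Pause(ctx)
}
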
+ 37 - 54
daemon/exec/exec.go → container/exec.go

@@ -1,20 +1,20 @@
-package exec // import "github.com/docker/docker/daemon/exec"
+package container // import "github.com/docker/docker/container"
 
 import (
-	"context"
 	"runtime"
 	"sync"
 
 	"github.com/containerd/containerd/cio"
 	"github.com/docker/docker/container/stream"
+	"github.com/docker/docker/libcontainerd/types"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/sirupsen/logrus"
 )
 
-// Config holds the configurations for execs. The Daemon keeps
+// ExecConfig holds the configurations for execs. The Daemon keeps
 // track of both running and finished execs so that they can be
 // examined both during and after completion.
-type Config struct {
+type ExecConfig struct {
 	sync.Mutex
 	Started      chan struct{}
 	StreamConfig *stream.Config
@@ -25,7 +25,7 @@ type Config struct {
 	OpenStderr   bool
 	OpenStdout   bool
 	CanRemove    bool
-	ContainerID  string
+	Container    *Container
 	DetachKeys   []byte
 	Entrypoint   string
 	Args         []string
@@ -34,39 +34,22 @@ type Config struct {
 	User         string
 	WorkingDir   string
 	Env          []string
-	Pid          int
+	Process      types.Process
 	ConsoleSize  *[2]uint
 }
 
-// NewConfig initializes the a new exec configuration
-func NewConfig() *Config {
-	return &Config{
+// NewExecConfig initializes a new exec configuration
+func NewExecConfig(c *Container) *ExecConfig {
+	return &ExecConfig{
 		ID:           stringid.GenerateRandomID(),
+		Container:    c,
 		StreamConfig: stream.NewConfig(),
 		Started:      make(chan struct{}),
 	}
 }
 
-type rio struct {
-	cio.IO
-
-	sc *stream.Config
-}
-
-func (i *rio) Close() error {
-	i.IO.Close()
-
-	return i.sc.CloseStreams()
-}
-
-func (i *rio) Wait() {
-	i.sc.Wait(context.Background())
-
-	i.IO.Wait()
-}
-
 // InitializeStdio is called by libcontainerd to connect the stdio.
-func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
+func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 	c.StreamConfig.CopyToPipe(iop)
 
 	if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
@@ -81,68 +64,68 @@ func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 }
 
 // CloseStreams closes the stdio streams for the exec
-func (c *Config) CloseStreams() error {
+func (c *ExecConfig) CloseStreams() error {
 	return c.StreamConfig.CloseStreams()
 }
 
 // SetExitCode sets the exec config's exit code
-func (c *Config) SetExitCode(code int) {
+func (c *ExecConfig) SetExitCode(code int) {
 	c.ExitCode = &code
 }
 
-// Store keeps track of the exec configurations.
-type Store struct {
-	byID map[string]*Config
-	sync.RWMutex
+// ExecStore keeps track of the exec configurations.
+type ExecStore struct {
+	byID map[string]*ExecConfig
+	mu   sync.RWMutex
 }
 
-// NewStore initializes a new exec store.
-func NewStore() *Store {
-	return &Store{
-		byID: make(map[string]*Config),
+// NewExecStore initializes a new exec store.
+func NewExecStore() *ExecStore {
+	return &ExecStore{
+		byID: make(map[string]*ExecConfig),
 	}
 }
 
 // Commands returns the exec configurations in the store.
-func (e *Store) Commands() map[string]*Config {
-	e.RLock()
-	byID := make(map[string]*Config, len(e.byID))
+func (e *ExecStore) Commands() map[string]*ExecConfig {
+	e.mu.RLock()
+	byID := make(map[string]*ExecConfig, len(e.byID))
 	for id, config := range e.byID {
 		byID[id] = config
 	}
-	e.RUnlock()
+	e.mu.RUnlock()
 	return byID
 }
 
 // Add adds a new exec configuration to the store.
-func (e *Store) Add(id string, Config *Config) {
-	e.Lock()
+func (e *ExecStore) Add(id string, Config *ExecConfig) {
+	e.mu.Lock()
 	e.byID[id] = Config
-	e.Unlock()
+	e.mu.Unlock()
 }
 
 // Get returns an exec configuration by its id.
-func (e *Store) Get(id string) *Config {
-	e.RLock()
+func (e *ExecStore) Get(id string) *ExecConfig {
+	e.mu.RLock()
 	res := e.byID[id]
-	e.RUnlock()
+	e.mu.RUnlock()
 	return res
 }
 
 // Delete removes an exec configuration from the store.
-func (e *Store) Delete(id string, pid int) {
-	e.Lock()
+func (e *ExecStore) Delete(id string) {
+	e.mu.Lock()
 	delete(e.byID, id)
-	e.Unlock()
+	e.mu.Unlock()
 }
 
 // List returns the list of exec ids in the store.
-func (e *Store) List() []string {
+func (e *ExecStore) List() []string {
 	var IDs []string
-	e.RLock()
+	e.mu.RLock()
 	for id := range e.byID {
 		IDs = append(IDs, id)
 	}
-	e.RUnlock()
+	e.mu.RUnlock()
 	return IDs
 }
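
Moving the store into the container package also lets Delete drop its unused pid parameter. A runnable sketch of the store lifecycle:

package main

import (
	"fmt"

	"github.com/docker/docker/container"
)

func main() {
	c := container.NewBaseContainer("demo", "/tmp/demo")
	ec := container.NewExecConfig(c) // generates a random exec ID
	c.ExecCommands.Add(ec.ID, ec)
	fmt.Println(c.ExecCommands.List()) // one generated ID
	c.ExecCommands.Delete(ec.ID)       // pid argument is gone
}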

+ 36 - 4
container/state.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/docker/docker/api/types"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	units "github.com/docker/go-units"
 )
 
@@ -36,6 +37,14 @@ type State struct {
 
 	stopWaiters       []chan<- StateStatus
 	removeOnlyWaiters []chan<- StateStatus
+
+	// The libcontainerd reference fields are unexported to force consumers
+	// to access them through the getter methods with multi-valued returns
+	// so that they can't forget to nil-check: the code won't compile unless
+	// the nil-check result is explicitly consumed or discarded.
+
+	ctr  libcontainerdtypes.Container
+	task libcontainerdtypes.Task
 }
 
 // StateStatus is used to return container wait results.
@@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) {
 }
 
 // SetRunning sets the state of the container to "running".
-func (s *State) SetRunning(pid int, initial bool) {
+func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) {
 	s.ErrorMsg = ""
 	s.Paused = false
 	s.Running = true
@@ -269,7 +278,14 @@ func (s *State) SetRunning(pid int, initial bool) {
 		s.Paused = false
 	}
 	s.ExitCodeValue = 0
-	s.Pid = pid
+	s.ctr = ctr
+	s.task = tsk
+	if tsk != nil {
+		s.Pid = int(tsk.Pid())
+	} else {
+		s.Pid = 0
+	}
+	s.OOMKilled = false
 	if initial {
 		s.StartedAt = time.Now().UTC()
 	}
@@ -287,7 +303,6 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
 		s.FinishedAt = exitStatus.ExitedAt
 	}
 	s.ExitCodeValue = exitStatus.ExitCode
-	s.OOMKilled = exitStatus.OOMKilled
 
 	s.notifyAndClear(&s.stopWaiters)
 }
@@ -303,7 +318,6 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
 	s.Pid = 0
 	s.FinishedAt = time.Now().UTC()
 	s.ExitCodeValue = exitStatus.ExitCode
-	s.OOMKilled = exitStatus.OOMKilled
 
 	s.notifyAndClear(&s.stopWaiters)
 }
@@ -405,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
 	}
 	*waiters = nil
 }
+
+// C8dContainer returns a reference to the libcontainerd Container object for
+// the container and whether the reference is valid.
+//
+// The container lock must be held when calling this method.
+func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) {
+	return s.ctr, s.ctr != nil
+}
+
+// Task returns a reference to the libcontainerd Task object for the container
+// and whether the reference is valid.
+//
+// The container lock must be held when calling this method.
+//
+// See also: (*Container).GetRunningTask().
+func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) {
+	return s.task, s.task != nil
+}
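
Since ctr and task are unexported, code outside the package must go through these getters, and the second return value cannot be silently ignored without a compile error or an explicit blank identifier. A small runnable sketch:

package main

import (
	"fmt"

	"github.com/docker/docker/container"
)

func main() {
	s := container.NewState()
	// No task has been attached yet, so ok is false; the two-valued form
	// makes the nil-check explicit at every call site.
	if tsk, ok := s.Task(); ok {
		fmt.Println("pid:", tsk.Pid())
	} else {
		fmt.Println("no containerd task attached")
	}
}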

+ 12 - 4
container/state_test.go

@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/docker/docker/api/types"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 )
 
 func TestIsValidHealthString(t *testing.T) {
@@ -28,6 +29,13 @@ func TestIsValidHealthString(t *testing.T) {
 	}
 }
 
+type mockTask struct {
+	libcontainerdtypes.Task
+	pid uint32
+}
+
+func (t *mockTask) Pid() uint32 { return t.pid }
+
 func TestStateRunStop(t *testing.T) {
 	s := NewState()
 
@@ -60,7 +68,7 @@ func TestStateRunStop(t *testing.T) {
 
 		// Set the state to "Running".
 		s.Lock()
-		s.SetRunning(i, true)
+		s.SetRunning(nil, &mockTask{pid: uint32(i)}, true)
 		s.Unlock()
 
 		// Assert desired state.
@@ -125,7 +133,7 @@ func TestStateTimeoutWait(t *testing.T) {
 	s := NewState()
 
 	s.Lock()
-	s.SetRunning(0, true)
+	s.SetRunning(nil, nil, true)
 	s.Unlock()
 
 	// Start a wait with a timeout.
@@ -174,7 +182,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
 	s := NewState()
 
 	s.Lock()
-	s.SetRunning(0, true)
+	s.SetRunning(nil, nil, true)
 	s.Unlock()
 
 	waitC := s.Wait(context.Background(), WaitConditionNotRunning)
@@ -185,7 +193,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
 	s.Unlock()
 
 	s.Lock()
-	s.SetRunning(0, true)
+	s.SetRunning(nil, nil, true)
 	s.Unlock()
 
 	got := <-waitC

+ 6 - 3
daemon/checkpoint.go

@@ -57,8 +57,11 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
 		return err
 	}
 
-	if !container.IsRunning() {
-		return fmt.Errorf("Container %s not running", name)
+	container.Lock()
+	tsk, err := container.GetRunningTask()
+	container.Unlock()
+	if err != nil {
+		return err
 	}
 
 	if !validCheckpointNamePattern.MatchString(config.CheckpointID) {
@@ -70,7 +73,7 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
 		return fmt.Errorf("cannot checkpoint container %s: %s", name, err)
 	}
 
-	err = daemon.containerd.CreateCheckpoint(context.Background(), container.ID, checkpointDir, config.Exit)
+	err = tsk.CreateCheckpoint(context.Background(), checkpointDir, config.Exit)
 	if err != nil {
 		os.RemoveAll(checkpointDir)
 		return fmt.Errorf("Cannot checkpoint container %s: %s", name, err)

+ 53 - 53
daemon/daemon.go

@@ -30,7 +30,6 @@ import (
 	"github.com/docker/docker/daemon/config"
 	ctrd "github.com/docker/docker/daemon/containerd"
 	"github.com/docker/docker/daemon/events"
-	"github.com/docker/docker/daemon/exec"
 	_ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers
 	"github.com/docker/docker/daemon/images"
 	"github.com/docker/docker/daemon/logger"
@@ -75,7 +74,7 @@ type Daemon struct {
 	repository            string
 	containers            container.Store
 	containersReplica     container.ViewDB
-	execCommands          *exec.Store
+	execCommands          *container.ExecStore
 	imageService          ImageService
 	configStore           *config.Config
 	statsCollector        *stats.Collector
@@ -317,40 +316,43 @@ func (daemon *Daemon) restore() error {
 
 			logger(c).Debug("restoring container")
 
-			var (
-				err      error
-				alive    bool
-				ec       uint32
-				exitedAt time.Time
-				process  libcontainerdtypes.Process
-			)
+			var es *containerd.ExitStatus
 
-			alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio)
-			if err != nil && !errdefs.IsNotFound(err) {
+			if err := c.RestoreTask(context.Background(), daemon.containerd); err != nil && !errdefs.IsNotFound(err) {
 				logger(c).WithError(err).Error("failed to restore container with containerd")
 				return
 			}
-			logger(c).Debugf("alive: %v", alive)
-			if !alive {
-				// If process is not nil, cleanup dead container from containerd.
-				// If process is nil then the above `containerd.Restore` returned an errdefs.NotFoundError,
-				// and docker's view of the container state will be updated accorrdingly via SetStopped further down.
-				if process != nil {
-					logger(c).Debug("cleaning up dead container process")
-					ec, exitedAt, err = process.Delete(context.Background())
-					if err != nil && !errdefs.IsNotFound(err) {
-						logger(c).WithError(err).Error("failed to delete container from containerd")
-						return
+
+			alive := false
+			status := containerd.Unknown
+			if tsk, ok := c.Task(); ok {
+				s, err := tsk.Status(context.Background())
+				if err != nil {
+					logger(c).WithError(err).Error("failed to get task status")
+				} else {
+					status = s.Status
+					alive = status != containerd.Stopped
+					if !alive {
+						logger(c).Debug("cleaning up dead container process")
+						es, err = tsk.Delete(context.Background())
+						if err != nil && !errdefs.IsNotFound(err) {
+							logger(c).WithError(err).Error("failed to delete task from containerd")
+							return
+						}
+					} else if !daemon.configStore.LiveRestoreEnabled {
+						logger(c).Debug("shutting down container considered alive by containerd")
+						if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
+							log.WithError(err).Error("error shutting down container")
+							return
+						}
+						status = containerd.Stopped
+						alive = false
+						c.ResetRestartManager(false)
 					}
 				}
-			} else if !daemon.configStore.LiveRestoreEnabled {
-				logger(c).Debug("shutting down container considered alive by containerd")
-				if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
-					log.WithError(err).Error("error shutting down container")
-					return
-				}
-				c.ResetRestartManager(false)
 			}
+			// If the containerd task for the container was not found, docker's view of the
+			// container state will be updated accordingly via SetStopped further down.
 
 			if c.IsRunning() || c.IsPaused() {
 				logger(c).Debug("syncing container on disk state with real state")
@@ -359,29 +361,22 @@ func (daemon *Daemon) restore() error {
 
 				switch {
 				case c.IsPaused() && alive:
-					s, err := daemon.containerd.Status(context.Background(), c.ID)
-					if err != nil {
-						logger(c).WithError(err).Error("failed to get container status")
-					} else {
-						logger(c).WithField("state", s).Info("restored container paused")
-						switch s {
-						case containerd.Paused, containerd.Pausing:
-							// nothing to do
-						case containerd.Stopped:
-							alive = false
-						case containerd.Unknown:
-							log.Error("unknown status for paused container during restore")
-						default:
-							// running
-							c.Lock()
-							c.Paused = false
-							daemon.setStateCounter(c)
-							daemon.updateHealthMonitor(c)
-							if err := c.CheckpointTo(daemon.containersReplica); err != nil {
-								log.WithError(err).Error("failed to update paused container state")
-							}
-							c.Unlock()
+					logger(c).WithField("state", status).Info("restored container paused")
+					switch status {
+					case containerd.Paused, containerd.Pausing:
+						// nothing to do
+					case containerd.Unknown, containerd.Stopped, "":
+						log.WithField("status", status).Error("unexpected status for paused container during restore")
+					default:
+						// running
+						c.Lock()
+						c.Paused = false
+						daemon.setStateCounter(c)
+						daemon.updateHealthMonitor(c)
+						if err := c.CheckpointTo(daemon.containersReplica); err != nil {
+							log.WithError(err).Error("failed to update paused container state")
 						}
+						c.Unlock()
 					}
 				case !c.IsPaused() && alive:
 					logger(c).Debug("restoring healthcheck")
@@ -393,7 +388,12 @@ func (daemon *Daemon) restore() error {
 				if !alive {
 					logger(c).Debug("setting stopped state")
 					c.Lock()
-					c.SetStopped(&container.ExitStatus{ExitCode: int(ec), ExitedAt: exitedAt})
+					var ces container.ExitStatus
+					if es != nil {
+						ces.ExitCode = int(es.ExitCode())
+						ces.ExitedAt = es.ExitTime()
+					}
+					c.SetStopped(&ces)
 					daemon.Cleanup(c)
 					if err := c.CheckpointTo(daemon.containersReplica); err != nil {
 						log.WithError(err).Error("failed to update stopped container state")
@@ -956,7 +956,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
 	if d.containersReplica, err = container.NewViewDB(); err != nil {
 		return nil, err
 	}
-	d.execCommands = exec.NewStore()
+	d.execCommands = container.NewExecStore()
 	d.statsCollector = d.newStatsCollector(1 * time.Second)
 
 	d.EventsService = events.New()
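
Both restore() above and handleContainerExit in daemon/monitor.go below convert containerd's *ExitStatus into the daemon's container.ExitStatus, treating a nil status as the zero value. The conversion, pulled into a hypothetical helper for clarity (upstream performs it inline):

package daemon

import (
	"github.com/containerd/containerd"
	"github.com/docker/docker/container"
)

func toExitStatus(es *containerd.ExitStatus) container.ExitStatus {
	if es == nil {
		// The task was already gone: zero exit code, zero time.
		return container.ExitStatus{}
	}
	return container.ExitStatus{
		ExitCode: int(es.ExitCode()),
		ExitedAt: es.ExitTime(),
	}
}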

+ 6 - 3
daemon/daemon_unix.go

@@ -1387,10 +1387,13 @@ func copyBlkioEntry(entries []*statsV1.BlkIOEntry) []types.BlkioStatEntry {
 }
 
 func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
-	if !c.IsRunning() {
-		return nil, errNotRunning(c.ID)
+	c.Lock()
+	task, err := c.GetRunningTask()
+	c.Unlock()
+	if err != nil {
+		return nil, err
 	}
-	cs, err := daemon.containerd.Stats(context.Background(), c.ID)
+	cs, err := task.Stats(context.Background())
 	if err != nil {
 		if strings.Contains(err.Error(), "container not found") {
 			return nil, containerNotFound(c.ID)

+ 8 - 4
daemon/daemon_windows.go

@@ -14,6 +14,7 @@ import (
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/config"
+	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/libcontainerd/local"
 	"github.com/docker/docker/libcontainerd/remote"
 	"github.com/docker/docker/libnetwork"
@@ -515,14 +516,17 @@ func driverOptions(_ *config.Config) nwconfig.Option {
 }
 
 func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
-	if !c.IsRunning() {
-		return nil, errNotRunning(c.ID)
+	c.Lock()
+	task, err := c.GetRunningTask()
+	c.Unlock()
+	if err != nil {
+		return nil, err
 	}
 
 	// Obtain the stats from HCS via libcontainerd
-	stats, err := daemon.containerd.Stats(context.Background(), c.ID)
+	stats, err := task.Stats(context.Background())
 	if err != nil {
-		if strings.Contains(err.Error(), "container not found") {
+		if errdefs.IsNotFound(err) {
 			return nil, containerNotFound(c.ID)
 		}
 		return nil, err
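
The Windows stats path also trades string matching on the error message for errdefs classification, which keeps working if the message text changes. A runnable sketch of the distinction:

package main

import (
	"errors"
	"fmt"

	"github.com/docker/docker/errdefs"
)

func main() {
	err := errdefs.NotFound(errors.New("container abc123 not found"))
	// Structural classification instead of strings.Contains on the text.
	fmt.Println(errdefs.IsNotFound(err)) // true
	fmt.Println(errdefs.IsConflict(err)) // false
}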

+ 8 - 1
daemon/delete.go

@@ -138,7 +138,14 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, config ty
 		container.RWLayer = nil
 	}
 
-	if err := containerfs.EnsureRemoveAll(container.Root); err != nil {
+	// Hold the container lock while deleting the container root directory
+	// so that other goroutines don't attempt to concurrently open files
+	// within it. Having any file open on Windows (without the
+	// FILE_SHARE_DELETE flag) will block it from being deleted.
+	container.Lock()
+	err := containerfs.EnsureRemoveAll(container.Root)
+	container.Unlock()
+	if err != nil {
 		err = errors.Wrapf(err, "unable to remove filesystem for %s", container.ID)
 		container.SetRemovalError(err)
 		return err

+ 1 - 1
daemon/delete_test.go

@@ -52,7 +52,7 @@ func TestContainerDelete(t *testing.T) {
 			fixMsg: "Stop the container before attempting removal or force remove",
 			initContainer: func() *container.Container {
 				c := newContainerWithState(container.NewState())
-				c.SetRunning(0, true)
+				c.SetRunning(nil, nil, true)
 				c.SetRestarting(&container.ExitStatus{})
 				return c
 			}},

+ 34 - 32
daemon/exec.go

@@ -2,18 +2,19 @@ package daemon // import "github.com/docker/docker/daemon"
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"runtime"
 	"strings"
 	"time"
 
+	"github.com/containerd/containerd"
 	"github.com/docker/docker/api/types"
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/strslice"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/container/stream"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/moby/sys/signal"
@@ -23,7 +24,7 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-func (daemon *Daemon) registerExecCommand(container *container.Container, config *exec.Config) {
+func (daemon *Daemon) registerExecCommand(container *container.Container, config *container.ExecConfig) {
 	// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
 	container.ExecCommands.Add(config.ID, config)
 	// Storing execs in daemon for easy access via Engine API.
@@ -41,7 +42,7 @@ func (daemon *Daemon) ExecExists(name string) (bool, error) {
 
 // getExecConfig looks up the exec instance by name. If the container associated
 // with the exec instance is stopped or paused, it will return an error.
-func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
+func (daemon *Daemon) getExecConfig(name string) (*container.ExecConfig, error) {
 	ec := daemon.execCommands.Get(name)
 	if ec == nil {
 		return nil, errExecNotFound(name)
@@ -52,7 +53,7 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
 	// saying the container isn't running, we should return a 404 so that
 	// the user sees the same error now that they will after the
 	// 5 minute clean-up loop is run which erases old/dead execs.
-	ctr := daemon.containers.Get(ec.ContainerID)
+	ctr := daemon.containers.Get(ec.Container.ID)
 	if ctr == nil {
 		return nil, containerNotFound(name)
 	}
@@ -68,9 +69,9 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
 	return ec, nil
 }
 
-func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *exec.Config) {
-	container.ExecCommands.Delete(execConfig.ID, execConfig.Pid)
-	daemon.execCommands.Delete(execConfig.ID, execConfig.Pid)
+func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *container.ExecConfig) {
+	container.ExecCommands.Delete(execConfig.ID)
+	daemon.execCommands.Delete(execConfig.ID)
 }
 
 func (daemon *Daemon) getActiveContainer(name string) (*container.Container, error) {
@@ -110,11 +111,10 @@ func (daemon *Daemon) ContainerExecCreate(name string, config *types.ExecConfig)
 		}
 	}
 
-	execConfig := exec.NewConfig()
+	execConfig := container.NewExecConfig(cntr)
 	execConfig.OpenStdin = config.AttachStdin
 	execConfig.OpenStdout = config.AttachStdout
 	execConfig.OpenStderr = config.AttachStderr
-	execConfig.ContainerID = cntr.ID
 	execConfig.DetachKeys = keys
 	execConfig.Entrypoint = entrypoint
 	execConfig.Args = args
@@ -174,27 +174,23 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 	ec.Running = true
 	ec.Unlock()
 
-	c := daemon.containers.Get(ec.ContainerID)
-	if c == nil {
-		return containerNotFound(ec.ContainerID)
-	}
-	logrus.Debugf("starting exec command %s in container %s", ec.ID, c.ID)
+	logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID)
 	attributes := map[string]string{
 		"execID": ec.ID,
 	}
-	daemon.LogContainerEventWithAttributes(c, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes)
+	daemon.LogContainerEventWithAttributes(ec.Container, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes)
 
 	defer func() {
 		if err != nil {
 			ec.Lock()
+			ec.Container.ExecCommands.Delete(ec.ID)
 			ec.Running = false
 			exitCode := 126
 			ec.ExitCode = &exitCode
 			if err := ec.CloseStreams(); err != nil {
-				logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
+				logrus.Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err)
 			}
 			ec.Unlock()
-			c.ExecCommands.Delete(ec.ID, ec.Pid)
 		}
 	}()
 
@@ -222,15 +218,18 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 
 	p := &specs.Process{}
 	if runtime.GOOS != "windows" {
-		ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.ContainerID)
+		ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.Container.ID)
 		if err != nil {
 			return err
 		}
-		spec, err := ctr.Spec(ctx)
+		md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata)
 		if err != nil {
 			return err
 		}
-		p = spec.Process
+		spec := specs.Spec{Process: p}
+		if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
+			return err
+		}
 	}
 	p.Args = append([]string{ec.Entrypoint}, ec.Args...)
 	p.Env = ec.Env
@@ -253,7 +252,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 		p.Cwd = "/"
 	}
 
-	if err := daemon.execSetPlatformOpt(c, ec, p); err != nil {
+	if err := daemon.execSetPlatformOpt(ctx, ec, p); err != nil {
 		return err
 	}
 
@@ -274,31 +273,34 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 	defer cancel()
 	attachErr := ec.StreamConfig.CopyStreams(copyCtx, &attachConfig)
 
+	ec.Container.Lock()
+	tsk, err := ec.Container.GetRunningTask()
+	ec.Container.Unlock()
+	if err != nil {
+		return err
+	}
+
 	// Synchronize with libcontainerd event loop
 	ec.Lock()
-	c.ExecCommands.Lock()
-	systemPid, err := daemon.containerd.Exec(ctx, c.ID, ec.ID, p, cStdin != nil, ec.InitializeStdio)
+	ec.Process, err = tsk.Exec(ctx, ec.ID, p, cStdin != nil, ec.InitializeStdio)
 	// the exec context should be ready, or error happened.
 	// close the chan to notify readiness
 	close(ec.Started)
 	if err != nil {
-		c.ExecCommands.Unlock()
-		ec.Unlock()
+		defer ec.Unlock()
 		return translateContainerdStartErr(ec.Entrypoint, ec.SetExitCode, err)
 	}
-	ec.Pid = systemPid
-	c.ExecCommands.Unlock()
 	ec.Unlock()
 
 	select {
 	case <-ctx.Done():
 		log := logrus.
-			WithField("container", c.ID).
-			WithField("exec", name)
+			WithField("container", ec.Container.ID).
+			WithField("exec", ec.ID)
 		log.Debug("Sending KILL signal to container process")
 		sigCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
 		defer cancelFunc()
-		err := daemon.containerd.SignalProcess(sigCtx, c.ID, name, signal.SignalMap["KILL"])
+		err := ec.Process.Kill(sigCtx, signal.SignalMap["KILL"])
 		if err != nil {
 			log.WithError(err).Error("Could not send KILL signal to container process")
 		}
@@ -311,7 +313,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
 			attributes := map[string]string{
 				"execID": ec.ID,
 			}
-			daemon.LogContainerEventWithAttributes(c, "exec_detach", attributes)
+			daemon.LogContainerEventWithAttributes(ec.Container, "exec_detach", attributes)
 		}
 	}
 	return nil
@@ -328,7 +330,7 @@ func (daemon *Daemon) execCommandGC() {
 		for id, config := range daemon.execCommands.Commands() {
 			if config.CanRemove {
 				cleaned++
-				daemon.execCommands.Delete(id, config.Pid)
+				daemon.execCommands.Delete(id)
 			} else {
 				if _, exists := liveExecCommands[id]; !exists {
 					config.CanRemove = true
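
The exec start path above now reads the OCI spec out of the container's stored metadata (Info with WithoutRefreshedMetadata) and unmarshals it, rather than calling ctr.Spec, which avoids refreshing the metadata over RPC. A reduced sketch of that step, with a hypothetical helper name:

package daemon

import (
	"context"
	"encoding/json"

	"github.com/containerd/containerd"
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

func processSpec(ctx context.Context, ctr containerd.Container) (*specs.Process, error) {
	md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, err
	}
	var spec specs.Spec
	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
		return nil, err
	}
	return spec.Process, nil
}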

+ 6 - 7
daemon/exec_linux.go

@@ -5,15 +5,14 @@ import (
 
 	"github.com/containerd/containerd/pkg/apparmor"
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/oci/caps"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
-func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error {
+func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
 	if len(ec.User) > 0 {
 		var err error
-		p.User, err = getUser(c, ec.User)
+		p.User, err = getUser(ec.Container, ec.User)
 		if err != nil {
 			return err
 		}
@@ -27,9 +26,9 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
 	}
 	if apparmor.HostSupports() {
 		var appArmorProfile string
-		if c.AppArmorProfile != "" {
-			appArmorProfile = c.AppArmorProfile
-		} else if c.HostConfig.Privileged {
+		if ec.Container.AppArmorProfile != "" {
+			appArmorProfile = ec.Container.AppArmorProfile
+		} else if ec.Container.HostConfig.Privileged {
 			// `docker exec --privileged` does not currently disable AppArmor
 			// profiles. Privileged configuration of the container is inherited
 			appArmorProfile = unconfinedAppArmorProfile
@@ -51,5 +50,5 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
 		p.ApparmorProfile = appArmorProfile
 	}
 	s := &specs.Spec{Process: p}
-	return WithRlimits(daemon, c)(context.Background(), nil, nil, s)
+	return WithRlimits(daemon, ec.Container)(ctx, nil, nil, s)
 }

+ 3 - 3
daemon/exec_linux_test.go

@@ -4,13 +4,13 @@
 package daemon
 
 import (
+	"context"
 	"testing"
 
 	"github.com/containerd/containerd/pkg/apparmor"
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/config"
-	"github.com/docker/docker/daemon/exec"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 	"gotest.tools/v3/assert"
 )
@@ -79,10 +79,10 @@ func TestExecSetPlatformOptAppArmor(t *testing.T) {
 						Privileged: tc.privileged,
 					},
 				}
-				ec := &exec.Config{Privileged: execPrivileged}
+				ec := &container.ExecConfig{Container: c, Privileged: execPrivileged}
 				p := &specs.Process{}
 
-				err := d.execSetPlatformOpt(c, ec, p)
+				err := d.execSetPlatformOpt(context.Background(), ec, p)
 				assert.NilError(t, err)
 				assert.Equal(t, p.ApparmorProfile, tc.expectedProfile)
 			})

+ 4 - 3
daemon/exec_windows.go

@@ -1,13 +1,14 @@
 package daemon // import "github.com/docker/docker/daemon"
 
 import (
+	"context"
+
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
-func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error {
-	if c.OS == "windows" {
+func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
+	if ec.Container.OS == "windows" {
 		p.User.Username = ec.User
 	}
 	return nil

+ 13 - 6
daemon/health.go

@@ -13,7 +13,6 @@ import (
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/strslice"
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/sirupsen/logrus"
 )
 
@@ -69,11 +68,10 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
 		cmdSlice = append(getShell(cntr), cmdSlice...)
 	}
 	entrypoint, args := d.getEntrypointAndArgs(strslice.StrSlice{}, cmdSlice)
-	execConfig := exec.NewConfig()
+	execConfig := container.NewExecConfig(cntr)
 	execConfig.OpenStdin = false
 	execConfig.OpenStdout = true
 	execConfig.OpenStderr = true
-	execConfig.ContainerID = cntr.ID
 	execConfig.DetachKeys = []byte{}
 	execConfig.Entrypoint = entrypoint
 	execConfig.Args = args
@@ -151,14 +149,23 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
 	if err != nil {
 		return nil, err
 	}
-	if info.ExitCode == nil {
-		return nil, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+	exitCode, err := func() (int, error) {
+		info.Lock()
+		defer info.Unlock()
+		if info.ExitCode == nil {
+			return 0, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+		}
+		return *info.ExitCode, nil
+	}()
+	if err != nil {
+		return nil, err
 	}
 	// Note: Go's json package will handle invalid UTF-8 for us
 	out := output.String()
 	return &types.HealthcheckResult{
 		End:      time.Now(),
-		ExitCode: *info.ExitCode,
+		ExitCode: exitCode,
 		Output:   out,
 	}, nil
 }

+ 9 - 3
daemon/inspect.go

@@ -218,11 +218,17 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
 		return nil, errExecNotFound(id)
 	}
 
-	if ctr := daemon.containers.Get(e.ContainerID); ctr == nil {
+	if ctr := daemon.containers.Get(e.Container.ID); ctr == nil {
 		return nil, errExecNotFound(id)
 	}
 
+	e.Lock()
+	defer e.Unlock()
 	pc := inspectExecProcessConfig(e)
+	var pid int
+	if e.Process != nil {
+		pid = int(e.Process.Pid())
+	}
 
 	return &backend.ExecInspect{
 		ID:            e.ID,
@@ -233,9 +239,9 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
 		OpenStdout:    e.OpenStdout,
 		OpenStderr:    e.OpenStderr,
 		CanRemove:     e.CanRemove,
-		ContainerID:   e.ContainerID,
+		ContainerID:   e.Container.ID,
 		DetachKeys:    e.DetachKeys,
-		Pid:           e.Pid,
+		Pid:           pid,
 	}, nil
 }
 

+ 1 - 2
daemon/inspect_linux.go

@@ -5,7 +5,6 @@ import (
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/versions/v1p19"
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 )
 
 // This sets platform-specific fields
@@ -62,7 +61,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*v1p19.ContainerJSON,
 	}, nil
 }
 
-func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {
+func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
 	return &backend.ExecProcessConfig{
 		Tty:        e.Tty,
 		Entrypoint: e.Entrypoint,

+ 1 - 2
daemon/inspect_test.go

@@ -6,7 +6,6 @@ import (
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/config"
-	"github.com/docker/docker/daemon/exec"
 	"gotest.tools/v3/assert"
 	is "gotest.tools/v3/assert/cmp"
 )
@@ -16,7 +15,7 @@ func TestGetInspectData(t *testing.T) {
 		ID:           "inspect-me",
 		HostConfig:   &containertypes.HostConfig{},
 		State:        container.NewState(),
-		ExecCommands: exec.NewStore(),
+		ExecCommands: container.NewExecStore(),
 	}
 
 	d := &Daemon{

+ 1 - 2
daemon/inspect_windows.go

@@ -4,7 +4,6 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 )
 
 // This sets platform-specific fields
@@ -17,7 +16,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON,
 	return daemon.ContainerInspectCurrent(name, false)
 }
 
-func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {
+func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
 	return &backend.ExecProcessConfig{
 		Tty:        e.Tty,
 		Entrypoint: e.Entrypoint,

+ 5 - 6
daemon/kill.go

@@ -9,7 +9,6 @@ import (
 
 	containerpkg "github.com/docker/docker/container"
 	"github.com/docker/docker/errdefs"
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	"github.com/moby/sys/signal"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
@@ -65,8 +64,9 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
 	container.Lock()
 	defer container.Unlock()
 
-	if !container.Running {
-		return errNotRunning(container.ID)
+	task, err := container.GetRunningTask()
+	if err != nil {
+		return err
 	}
 
 	var unpause bool
@@ -96,8 +96,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
 		return nil
 	}
 
-	err := daemon.containerd.SignalProcess(context.Background(), container.ID, libcontainerdtypes.InitProcessName, stopSignal)
-	if err != nil {
+	if err := task.Kill(context.Background(), stopSignal); err != nil {
 		if errdefs.IsNotFound(err) {
 			unpause = false
 			logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'")
@@ -121,7 +120,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
 
 	if unpause {
 		// above kill signal will be sent once resume is finished
-		if err := daemon.containerd.Resume(context.Background(), container.ID); err != nil {
+		if err := task.Resume(context.Background()); err != nil {
 			logrus.Warnf("Cannot unpause container %s: %s", container.ID, err)
 		}
 	}

+ 56 - 18
daemon/monitor.go

@@ -7,6 +7,7 @@ import (
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/container"
+	"github.com/docker/docker/errdefs"
 	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	"github.com/docker/docker/restartmanager"
 	"github.com/pkg/errors"
@@ -25,28 +26,32 @@ func (daemon *Daemon) setStateCounter(c *container.Container) {
 }
 
 func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error {
+	var exitStatus container.ExitStatus
 	c.Lock()
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	ec, et, err := daemon.containerd.DeleteTask(ctx, c.ID)
-	cancel()
-	if err != nil {
-		logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd")
+	tsk, ok := c.Task()
+	if ok {
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		es, err := tsk.Delete(ctx)
+		cancel()
+		if err != nil {
+			logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd")
+		} else {
+			exitStatus = container.ExitStatus{
+				ExitCode: int(es.ExitCode()),
+				ExitedAt: es.ExitTime(),
+			}
+		}
 	}
 
-	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 	c.StreamConfig.Wait(ctx)
 	cancel()
 
 	c.Reset(false)
 
-	exitStatus := container.ExitStatus{
-		ExitCode: int(ec),
-		ExitedAt: et,
-	}
 	if e != nil {
 		exitStatus.ExitCode = int(e.ExitCode)
 		exitStatus.ExitedAt = e.ExitedAt
-		exitStatus.OOMKilled = e.OOMKilled
 		if e.Error != nil {
 			c.SetError(e.Error)
 		}
@@ -54,7 +59,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 
 	daemonShutdown := daemon.IsShuttingDown()
 	execDuration := time.Since(c.StartedAt)
-	restart, wait, err := c.RestartManager().ShouldRestart(ec, daemonShutdown || c.HasBeenManuallyStopped, execDuration)
+	restart, wait, err := c.RestartManager().ShouldRestart(uint32(exitStatus.ExitCode), daemonShutdown || c.HasBeenManuallyStopped, execDuration)
 	if err != nil {
 		logrus.WithError(err).
 			WithField("container", c.ID).
@@ -71,7 +76,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
 	// restarted if/when the container is started again
 	daemon.stopHealthchecks(c)
 	attributes := map[string]string{
-		"exitCode": strconv.Itoa(int(ec)),
+		"exitCode": strconv.Itoa(exitStatus.ExitCode),
 	}
 	daemon.Cleanup(c)
 
@@ -141,6 +146,7 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 
 		c.Lock()
 		defer c.Unlock()
+		c.OOMKilled = true
 		daemon.updateHealthMonitor(c)
 		if err := c.CheckpointTo(daemon.containersReplica); err != nil {
 			return err
@@ -157,6 +163,13 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 			ec := int(ei.ExitCode)
 			execConfig.Lock()
 			defer execConfig.Unlock()
+
+			// Remove the exec command from the container's store only and not the
+			// daemon's store so that the exec command can be inspected. Remove it
+			// before mutating execConfig to maintain the invariant that
+			// c.ExecCommands only contains execs in the Running state.
+			c.ExecCommands.Delete(execConfig.ID)
+
 			execConfig.ExitCode = &ec
 			execConfig.Running = false
 
@@ -168,11 +181,16 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 				logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 			}
 
-			// remove the exec command from the container's store only and not the
-			// daemon's store so that the exec command can be inspected.
-			c.ExecCommands.Delete(execConfig.ID, execConfig.Pid)
-
 			exitCode = ec
+
+			go func() {
+				if _, err := execConfig.Process.Delete(context.Background()); err != nil {
+					logrus.WithError(err).WithFields(logrus.Fields{
+						"container": ei.ContainerID,
+						"process":   ei.ProcessID,
+					}).Warn("failed to delete process")
+				}
+			}()
 		}
 		attributes := map[string]string{
 			"execID":   ei.ProcessID,
@@ -185,7 +203,27 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 
 		// This is here to handle start not generated by docker
 		if !c.Running {
-			c.SetRunning(int(ei.Pid), false)
+			ctr, err := daemon.containerd.LoadContainer(context.Background(), c.ID)
+			if err != nil {
+				if errdefs.IsNotFound(err) {
+					// The container was started by not-docker and so could have been deleted by
+					// not-docker before we got around to loading it from containerd.
+					logrus.WithField("container", c.ID).WithError(err).
+						Debug("could not load containerd container for start event")
+					return nil
+				}
+				return err
+			}
+			tsk, err := ctr.Task(context.Background())
+			if err != nil {
+				if errdefs.IsNotFound(err) {
+					logrus.WithField("container", c.ID).WithError(err).
+						Debug("failed to load task for externally-started container")
+					return nil
+				}
+				return err
+			}
+			c.SetRunning(ctr, tsk, false)
 			c.HasBeenManuallyStopped = false
 			c.HasBeenStartedBefore = true
 			daemon.setStateCounter(c)

+ 5 - 4
daemon/pause.go

@@ -24,8 +24,9 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
 	defer container.Unlock()
 
 	// We cannot Pause the container which is not running
-	if !container.Running {
-		return errNotRunning(container.ID)
+	tsk, err := container.GetRunningTask()
+	if err != nil {
+		return err
 	}
 
 	// We cannot Pause the container which is already paused
@@ -38,8 +39,8 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
 		return errContainerIsRestarting(container.ID)
 	}
 
-	if err := daemon.containerd.Pause(context.Background(), container.ID); err != nil {
-		return fmt.Errorf("Cannot pause container %s: %s", container.ID, err)
+	if err := tsk.Pause(context.Background()); err != nil {
+		return fmt.Errorf("cannot pause container %s: %s", container.ID, err)
 	}
 
 	container.Paused = true

+ 7 - 6
daemon/resize.go

@@ -4,8 +4,6 @@ import (
 	"context"
 	"fmt"
 	"time"
-
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 )
 
 // ContainerResize changes the size of the TTY of the process running
@@ -16,11 +14,14 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error {
 		return err
 	}
 
-	if !container.IsRunning() {
-		return errNotRunning(container.ID)
+	container.Lock()
+	tsk, err := container.GetRunningTask()
+	container.Unlock()
+	if err != nil {
+		return err
 	}
 
-	if err = daemon.containerd.ResizeTerminal(context.Background(), container.ID, libcontainerdtypes.InitProcessName, width, height); err == nil {
+	if err = tsk.Resize(context.Background(), uint32(width), uint32(height)); err == nil {
 		attributes := map[string]string{
 			"height": fmt.Sprintf("%d", height),
 			"width":  fmt.Sprintf("%d", width),
@@ -46,7 +47,7 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
 
 	select {
 	case <-ec.Started:
-		return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height)
+		return ec.Process.Resize(context.Background(), uint32(width), uint32(height))
 	case <-timeout.C:
 		return fmt.Errorf("timeout waiting for exec session ready")
 	}

+ 35 - 39
daemon/resize_test.go

@@ -8,7 +8,7 @@ import (
 	"testing"
 
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
+	"github.com/docker/docker/libcontainerd/types"
 	"gotest.tools/v3/assert"
 )
 
@@ -16,32 +16,28 @@ import (
 func TestExecResizeNoSuchExec(t *testing.T) {
 	n := "TestExecResize"
 	d := &Daemon{
-		execCommands: exec.NewStore(),
+		execCommands: container.NewExecStore(),
 	}
 	c := &container.Container{
-		ExecCommands: exec.NewStore(),
+		ExecCommands: container.NewExecStore(),
 	}
-	ec := &exec.Config{
-		ID: n,
+	ec := &container.ExecConfig{
+		ID:        n,
+		Container: c,
 	}
 	d.registerExecCommand(c, ec)
 	err := d.ContainerExecResize("nil", 24, 8)
 	assert.ErrorContains(t, err, "No such exec instance")
 }
 
-type execResizeMockContainerdClient struct {
-	MockContainerdClient
-	ProcessID   string
-	ContainerID string
-	Width       int
-	Height      int
+type execResizeMockProcess struct {
+	types.Process
+	Width, Height int
 }
 
-func (c *execResizeMockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
-	c.ProcessID = processID
-	c.ContainerID = containerID
-	c.Width = width
-	c.Height = height
+func (p *execResizeMockProcess) Resize(ctx context.Context, width, height uint32) error {
+	p.Width = int(width)
+	p.Height = int(height)
 	return nil
 }
 
@@ -50,30 +46,29 @@ func TestExecResize(t *testing.T) {
 	n := "TestExecResize"
 	width := 24
 	height := 8
-	ec := &exec.Config{
-		ID:          n,
-		ContainerID: n,
-		Started:     make(chan struct{}),
-	}
-	close(ec.Started)
-	mc := &execResizeMockContainerdClient{}
+	mp := &execResizeMockProcess{}
 	d := &Daemon{
-		execCommands: exec.NewStore(),
-		containerd:   mc,
+		execCommands: container.NewExecStore(),
 		containers:   container.NewMemoryStore(),
 	}
 	c := &container.Container{
-		ExecCommands: exec.NewStore(),
+		ID:           n,
+		ExecCommands: container.NewExecStore(),
 		State:        &container.State{Running: true},
 	}
+	ec := &container.ExecConfig{
+		ID:        n,
+		Container: c,
+		Process:   mp,
+		Started:   make(chan struct{}),
+	}
+	close(ec.Started)
 	d.containers.Add(n, c)
 	d.registerExecCommand(c, ec)
 	err := d.ContainerExecResize(n, height, width)
 	assert.NilError(t, err)
-	assert.Equal(t, mc.Width, width)
-	assert.Equal(t, mc.Height, height)
-	assert.Equal(t, mc.ProcessID, n)
-	assert.Equal(t, mc.ContainerID, n)
+	assert.Equal(t, mp.Width, width)
+	assert.Equal(t, mp.Height, height)
 }
 
 // This test is to make sure that when exec context is not ready, a timeout error should happen.
@@ -82,21 +77,22 @@ func TestExecResizeTimeout(t *testing.T) {
 	n := "TestExecResize"
 	width := 24
 	height := 8
-	ec := &exec.Config{
-		ID:          n,
-		ContainerID: n,
-		Started:     make(chan struct{}),
-	}
-	mc := &execResizeMockContainerdClient{}
+	mp := &execResizeMockProcess{}
 	d := &Daemon{
-		execCommands: exec.NewStore(),
-		containerd:   mc,
+		execCommands: container.NewExecStore(),
 		containers:   container.NewMemoryStore(),
 	}
 	c := &container.Container{
-		ExecCommands: exec.NewStore(),
+		ID:           n,
+		ExecCommands: container.NewExecStore(),
 		State:        &container.State{Running: true},
 	}
+	ec := &container.ExecConfig{
+		ID:        n,
+		Container: c,
+		Process:   mp,
+		Started:   make(chan struct{}),
+	}
 	d.containers.Add(n, c)
 	d.registerExecCommand(c, ec)
 	err := d.ContainerExecResize(n, height, width)

+ 14 - 20
daemon/start.go

@@ -9,6 +9,7 @@ import (
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )
@@ -178,28 +179,17 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
 
 	ctx := context.TODO()
 
-	err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions)
+	ctr, err := libcontainerd.ReplaceContainer(ctx, daemon.containerd, container.ID, spec, shim, createOptions)
 	if err != nil {
-		if errdefs.IsConflict(err) {
-			logrus.WithError(err).WithField("container", container.ID).Error("Container not cleaned up from containerd from previous run")
-			// best effort to clean up old container object
-			daemon.containerd.DeleteTask(ctx, container.ID)
-			if err := daemon.containerd.Delete(ctx, container.ID); err != nil && !errdefs.IsNotFound(err) {
-				logrus.WithError(err).WithField("container", container.ID).Error("Error cleaning up stale containerd container object")
-			}
-			err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions)
-		}
-		if err != nil {
-			return translateContainerdStartErr(container.Path, container.SetExitCode, err)
-		}
+		return translateContainerdStartErr(container.Path, container.SetExitCode, err)
 	}
 
 	// TODO(mlaventure): we need to specify checkpoint options here
-	pid, err := daemon.containerd.Start(context.Background(), container.ID, checkpointDir,
+	tsk, err := ctr.Start(ctx, checkpointDir,
 		container.StreamConfig.Stdin() != nil || container.Config.Tty,
 		container.InitializeStdio)
 	if err != nil {
-		if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil {
+		if err := ctr.Delete(context.Background()); err != nil {
 			logrus.WithError(err).WithField("container", container.ID).
 				Error("failed to delete failed start container")
 		}
@@ -207,7 +197,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
 	}
 
 	container.HasBeenManuallyRestarted = false
-	container.SetRunning(pid, true)
+	container.SetRunning(ctr, tsk, true)
 	container.HasBeenStartedBefore = true
 	daemon.setStateCounter(container)
 
@@ -227,6 +217,14 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
 // Cleanup releases any network resources allocated to the container along with any rules
 // around how containers are linked together.  It also unmounts the container's root filesystem.
 func (daemon *Daemon) Cleanup(container *container.Container) {
+	// Microsoft HCS containers get in a bad state if host resources are
+	// released while the container still exists.
+	if ctr, ok := container.C8dContainer(); ok {
+		if err := ctr.Delete(context.Background()); err != nil {
+			logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
+		}
+	}
+
 	daemon.releaseNetwork(container)
 
 	if err := container.UnmountIpcMount(); err != nil {
@@ -260,8 +258,4 @@ func (daemon *Daemon) Cleanup(container *container.Container) {
 	}
 
 	container.CancelAttachContext()
-
-	if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil {
-		logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
-	}
 }
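
libcontainerd.ReplaceContainer is introduced by this PR and subsumes the delete-then-retry conflict handling removed above. A hedged reconstruction of that removed logic in terms of the new handle types; the create call is abstracted behind a caller-supplied function because the client's create signature is not shown in this diff:

package daemon

import (
	"context"

	"github.com/docker/docker/errdefs"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)

func replaceContainer(
	ctx context.Context,
	client libcontainerdtypes.Client,
	id string,
	create func(ctx context.Context) (libcontainerdtypes.Container, error),
) (libcontainerdtypes.Container, error) {
	ctr, err := create(ctx)
	if err == nil || !errdefs.IsConflict(err) {
		return ctr, err
	}
	// A container object survived a previous daemon run: best-effort
	// cleanup of its task and the object itself, then create again.
	if stale, lerr := client.LoadContainer(ctx, id); lerr == nil {
		if tsk, terr := stale.Task(ctx); terr == nil {
			_, _ = tsk.Delete(ctx)
		}
		_ = stale.Delete(ctx)
	}
	return create(ctx)
}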

+ 20 - 6
daemon/top_unix.go

@@ -14,6 +14,7 @@ import (
 
 	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/errdefs"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	"github.com/pkg/errors"
 )
 
@@ -150,18 +151,31 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*container.Conta
 		return nil, err
 	}
 
-	if !ctr.IsRunning() {
-		return nil, errNotRunning(ctr.ID)
-	}
+	tsk, err := func() (libcontainerdtypes.Task, error) {
+		ctr.Lock()
+		defer ctr.Unlock()
 
-	if ctr.IsRestarting() {
-		return nil, errContainerIsRestarting(ctr.ID)
+		tsk, err := ctr.GetRunningTask()
+		if err != nil {
+			return nil, err
+		}
+		if ctr.Restarting {
+			return nil, errContainerIsRestarting(ctr.ID)
+		}
+		return tsk, nil
+	}()
+	if err != nil {
+		return nil, err
 	}
 
-	procs, err := daemon.containerd.ListPids(context.Background(), ctr.ID)
+	infos, err := tsk.Pids(context.Background())
 	if err != nil {
 		return nil, err
 	}
+	procs := make([]uint32, len(infos))
+	for i, p := range infos {
+		procs[i] = p.Pid
+	}
 
 	args := strings.Split(psArgs, " ")
 	pids := psPidsArg(procs)

+ 14 - 7
daemon/top_windows.go

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	containertypes "github.com/docker/docker/api/types/container"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	units "github.com/docker/go-units"
 )
 
@@ -36,15 +37,21 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*containertypes.
 		return nil, err
 	}
 
-	if !container.IsRunning() {
-		return nil, errNotRunning(container.ID)
-	}
+	task, err := func() (libcontainerdtypes.Task, error) {
+		container.Lock()
+		defer container.Unlock()
 
-	if container.IsRestarting() {
-		return nil, errContainerIsRestarting(container.ID)
-	}
+		task, err := container.GetRunningTask()
+		if err != nil {
+			return nil, err
+		}
+		if container.Restarting {
+			return nil, errContainerIsRestarting(container.ID)
+		}
+		return task, nil
+	}()
+	if err != nil {
+		return nil, err
+	}

-	s, err := daemon.containerd.Summary(context.Background(), container.ID)
+	s, err := task.Summary(context.Background())
 	if err != nil {
 		return nil, err
 	}

+ 5 - 1
daemon/unpause.go

@@ -26,8 +26,12 @@ func (daemon *Daemon) containerUnpause(ctr *container.Container) error {
 	if !ctr.Paused {
 		return fmt.Errorf("Container %s is not paused", ctr.ID)
 	}
+	tsk, err := ctr.GetRunningTask()
+	if err != nil {
+		return err
+	}
 
-	if err := daemon.containerd.Resume(context.Background(), ctr.ID); err != nil {
+	if err := tsk.Resume(context.Background()); err != nil {
 		return fmt.Errorf("Cannot unpause container %s: %s", ctr.ID, err)
 	}
 

+ 16 - 7
daemon/update.go

@@ -74,19 +74,28 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
 		ctr.UpdateMonitor(hostConfig.RestartPolicy)
 	}
 
+	defer daemon.LogContainerEvent(ctr, "update")
+
 	// If container is not running, update hostConfig struct is enough,
 	// resources will be updated when the container is started again.
 	// If container is running (including paused), we need to update configs
 	// to the real world.
-	if ctr.IsRunning() && !ctr.IsRestarting() {
-		if err := daemon.containerd.UpdateResources(context.Background(), ctr.ID, toContainerdResources(hostConfig.Resources)); err != nil {
-			restoreConfig = true
-			// TODO: it would be nice if containerd responded with better errors here so we can classify this better.
-			return errCannotUpdate(ctr.ID, errdefs.System(err))
-		}
+	ctr.Lock()
+	isRestarting := ctr.Restarting
+	tsk, err := ctr.GetRunningTask()
+	ctr.Unlock()
+	if errdefs.IsConflict(err) || isRestarting {
+		return nil
+	}
+	if err != nil {
+		return err
 	}
 
-	daemon.LogContainerEvent(ctr, "update")
+	if err := tsk.UpdateResources(context.TODO(), toContainerdResources(hostConfig.Resources)); err != nil {
+		restoreConfig = true
+		// TODO: it would be nice if containerd responded with better errors here so we can classify this better.
+		return errCannotUpdate(ctr.ID, errdefs.System(err))
+	}
 
 	return nil
 }

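The rewritten update path treats a conflict error (no running task) or a restarting container as success: the new resources are already persisted in the HostConfig and take effect the next time the container starts. A condensed sketch of that decision, assuming the task and error were fetched under the container lock as in the hunk above; the helper name is illustrative:

package updateflow

import (
	"context"

	"github.com/docker/docker/errdefs"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)

// applyLiveUpdate mirrors the logic in daemon/update.go: a missing task
// (conflict) or a restarting container is not an error, because the updated
// HostConfig is applied on the next start anyway.
func applyLiveUpdate(ctx context.Context, tsk libcontainerdtypes.Task, taskErr error, restarting bool, res *libcontainerdtypes.Resources) error {
	if errdefs.IsConflict(taskErr) || restarting {
		return nil // defer to the next container start
	}
	if taskErr != nil {
		return taskErr
	}
	return tsk.UpdateResources(ctx, res)
}
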
+ 0 - 74
daemon/util_test.go

@@ -1,74 +0,0 @@
-//go:build linux
-// +build linux
-
-package daemon
-
-import (
-	"context"
-	"syscall"
-	"time"
-
-	"github.com/containerd/containerd"
-	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
-	specs "github.com/opencontainers/runtime-spec/specs-go"
-)
-
-type mockProcess struct {
-}
-
-func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) {
-	return 0, time.Time{}, nil
-}
-
-// Mock containerd client implementation, for unit tests.
-type MockContainerdClient struct {
-}
-
-func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) {
-	return containerd.Version{}, nil
-}
-func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
-	return false, 0, &mockProcess{}, nil
-}
-func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
-	return nil
-}
-func (c *MockContainerdClient) Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) {
-	return 0, nil
-}
-func (c *MockContainerdClient) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error {
-	return nil
-}
-func (c *MockContainerdClient) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
-	return 0, nil
-}
-func (c *MockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
-	return nil
-}
-func (c *MockContainerdClient) CloseStdin(ctx context.Context, containerID, processID string) error {
-	return nil
-}
-func (c *MockContainerdClient) Pause(ctx context.Context, containerID string) error  { return nil }
-func (c *MockContainerdClient) Resume(ctx context.Context, containerID string) error { return nil }
-func (c *MockContainerdClient) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
-	return nil, nil
-}
-func (c *MockContainerdClient) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
-	return nil, nil
-}
-func (c *MockContainerdClient) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
-	return nil, nil
-}
-func (c *MockContainerdClient) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
-	return 0, time.Time{}, nil
-}
-func (c *MockContainerdClient) Delete(ctx context.Context, containerID string) error { return nil }
-func (c *MockContainerdClient) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
-	return "null", nil
-}
-func (c *MockContainerdClient) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
-	return nil
-}
-func (c *MockContainerdClient) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
-	return nil
-}

+ 43 - 23
integration/container/wait_test.go

@@ -108,28 +108,25 @@ func TestWaitConditions(t *testing.T) {
 	cli := request.NewAPIClient(t)
 
 	testCases := []struct {
-		doc          string
-		waitCond     containertypes.WaitCondition
-		expectedCode int64
+		doc      string
+		waitCond containertypes.WaitCondition
+		runOpts  []func(*container.TestContainerConfig)
 	}{
 		{
-			doc:          "default",
-			expectedCode: 99,
+			doc: "default",
 		},
 		{
-			doc:          "not-running",
-			expectedCode: 99,
-			waitCond:     containertypes.WaitConditionNotRunning,
+			doc:      "not-running",
+			waitCond: containertypes.WaitConditionNotRunning,
 		},
 		{
-			doc:          "next-exit",
-			expectedCode: 99,
-			waitCond:     containertypes.WaitConditionNextExit,
+			doc:      "next-exit",
+			waitCond: containertypes.WaitConditionNextExit,
 		},
 		{
-			doc:          "removed",
-			expectedCode: 99,
-			waitCond:     containertypes.WaitConditionRemoved,
+			doc:      "removed",
+			waitCond: containertypes.WaitConditionRemoved,
+			runOpts:  []func(*container.TestContainerConfig){container.WithAutoRemove},
 		},
 	}
 
@@ -138,21 +135,44 @@ func TestWaitConditions(t *testing.T) {
 		t.Run(tc.doc, func(t *testing.T) {
 			t.Parallel()
 			ctx := context.Background()
-			opts := []func(*container.TestContainerConfig){
-				container.WithCmd("sh", "-c", "sleep 1; exit 99"),
-			}
-			if tc.waitCond == containertypes.WaitConditionRemoved {
-				opts = append(opts, container.WithAutoRemove)
-			}
-			containerID := container.Run(ctx, t, cli, opts...)
-			poll.WaitOn(t, container.IsInState(ctx, cli, containerID, "running"), poll.WithTimeout(30*time.Second), poll.WithDelay(100*time.Millisecond))
+			opts := append([]func(*container.TestContainerConfig){
+				container.WithCmd("sh", "-c", "read -r; exit 99"),
+				func(tcc *container.TestContainerConfig) {
+					tcc.Config.AttachStdin = true
+					tcc.Config.OpenStdin = true
+				},
+			}, tc.runOpts...)
+			containerID := container.Create(ctx, t, cli, opts...)
+			t.Logf("ContainerID = %v", containerID)
+
+			streams, err := cli.ContainerAttach(ctx, containerID, types.ContainerAttachOptions{Stream: true, Stdin: true})
+			assert.NilError(t, err)
+			defer streams.Close()
 
+			assert.NilError(t, cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{}))
 			waitResC, errC := cli.ContainerWait(ctx, containerID, tc.waitCond)
+			select {
+			case err := <-errC:
+				t.Fatalf("ContainerWait() err = %v", err)
+			case res := <-waitResC:
+				t.Fatalf("ContainerWait() sent exit code (%v) before ContainerStart()", res)
+			default:
+			}
+
+			info, _ := cli.ContainerInspect(ctx, containerID)
+			assert.Equal(t, "running", info.State.Status)
+
+			_, err = streams.Conn.Write([]byte("\n"))
+			assert.NilError(t, err)
+
 			select {
 			case err := <-errC:
 				assert.NilError(t, err)
 			case waitRes := <-waitResC:
-				assert.Check(t, is.Equal(tc.expectedCode, waitRes.StatusCode))
+				assert.Check(t, is.Equal(int64(99), waitRes.StatusCode))
+			case <-time.After(15 * time.Second):
+				info, _ := cli.ContainerInspect(ctx, containerID)
+				t.Fatalf("Timed out waiting for container exit code (status = %q)", info.State.Status)
 			}
 		})
 	}

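The reworked test keeps the container blocked on read -r so the wait can be registered before the process exits, then releases it by writing a newline over the hijacked attach connection. A trimmed sketch of that trigger step, assuming an API client and a created container are already in hand:

package waitdemo

import (
	"context"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
)

// releaseContainer writes the newline that `sh -c "read -r; exit 99"` is
// blocked on, letting the container proceed to its exit code.
func releaseContainer(ctx context.Context, cli *client.Client, id string) error {
	streams, err := cli.ContainerAttach(ctx, id, types.ContainerAttachOptions{Stream: true, Stdin: true})
	if err != nil {
		return err
	}
	defer streams.Close()
	_, err = streams.Conn.Write([]byte("\n"))
	return err
}
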
The file diff has been suppressed because it is too large
+ 350 - 329
libcontainerd/local/local_windows.go


+ 0 - 4
libcontainerd/local/process_windows.go

@@ -38,7 +38,3 @@ func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteClo
 		return nil
 	})
 }
-
-func (p *process) Cleanup() error {
-	return nil
-}

+ 150 - 329
libcontainerd/remote/client.go

@@ -45,25 +45,34 @@ type client struct {
 	logger   *logrus.Entry
 	ns       string
 
-	backend         libcontainerdtypes.Backend
-	eventQ          queue.Queue
-	oomMu           sync.Mutex
-	oom             map[string]bool
-	v2runcoptionsMu sync.Mutex
-	// v2runcoptions is used for copying options specified on Create() to Start()
-	v2runcoptions map[string]v2runcoptions.Options
+	backend libcontainerdtypes.Backend
+	eventQ  queue.Queue
+}
+
+type container struct {
+	client *client
+	c8dCtr containerd.Container
+
+	v2runcoptions *v2runcoptions.Options
+}
+
+type task struct {
+	containerd.Task
+	ctr *container
+}
+
+type process struct {
+	containerd.Process
 }
 
 // NewClient creates a new libcontainerd client from a containerd client
 func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
 	c := &client{
-		client:        cli,
-		stateDir:      stateDir,
-		logger:        logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
-		ns:            ns,
-		backend:       b,
-		oom:           make(map[string]bool),
-		v2runcoptions: make(map[string]v2runcoptions.Options),
+		client:   cli,
+		stateDir: stateDir,
+		logger:   logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
+		ns:       ns,
+		backend:  b,
 	}
 
 	go c.processEventStream(ctx, ns)
@@ -75,58 +84,36 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
 	return c.client.Version(ctx)
 }
 
-// Restore loads the containerd container.
-// It should not be called concurrently with any other operation for the given ID.
-func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
+func (c *container) newTask(t containerd.Task) *task {
+	return &task{Task: t, ctr: c}
+}
+
+func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
 	var dio *cio.DirectIO
 	defer func() {
 		if err != nil && dio != nil {
 			dio.Cancel()
 			dio.Close()
 		}
-		err = wrapError(err)
 	}()
 
-	ctr, err := c.client.LoadContainer(ctx, id)
-	if err != nil {
-		return false, -1, nil, errors.WithStack(wrapError(err))
-	}
-
 	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
 		// dio must be assigned to the previously defined dio for the defer above
 		// to handle cleanup
-		dio, err = c.newDirectIO(ctx, fifos)
+		dio, err = c.client.newDirectIO(ctx, fifos)
 		if err != nil {
 			return nil, err
 		}
 		return attachStdio(dio)
 	}
-	t, err := ctr.Task(ctx, attachIO)
-	if err != nil && !containerderrors.IsNotFound(err) {
-		return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
-	}
-
-	if t != nil {
-		s, err := t.Status(ctx)
-		if err != nil {
-			return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
-		}
-		alive = s.Status != containerd.Stopped
-		pid = int(t.Pid())
+	t, err := c.c8dCtr.Task(ctx, attachIO)
+	if err != nil {
+		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
 	}
-
-	c.logger.WithFields(logrus.Fields{
-		"container": id,
-		"alive":     alive,
-		"pid":       pid,
-	}).Debug("restored container")
-
-	return alive, pid, &restoredProcess{
-		p: t,
-	}, nil
+	return c.newTask(t), nil
 }
 
-func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
+func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
 	bdir := c.bundleDir(id)
 	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
 
@@ -137,44 +124,43 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shi
 	}
 	opts = append(opts, newOpts...)
 
-	_, err := c.client.NewContainer(ctx, id, opts...)
+	ctr, err := c.client.NewContainer(ctx, id, opts...)
 	if err != nil {
 		if containerderrors.IsAlreadyExists(err) {
-			return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
+			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
 		}
-		return wrapError(err)
+		return nil, wrapError(err)
+	}
+
+	created := container{
+		client: c,
+		c8dCtr: ctr,
 	}
 	if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
-		c.v2runcoptionsMu.Lock()
-		c.v2runcoptions[id] = *x
-		c.v2runcoptionsMu.Unlock()
+		created.v2runcoptions = x
 	}
-	return nil
+	return &created, nil
 }
 
 // Start creates and starts a task for the specified containerd id
-func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
-	ctr, err := c.getContainer(ctx, id)
-	if err != nil {
-		return -1, err
-	}
+func (c *container) Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
 	var (
 		cp             *types.Descriptor
 		t              containerd.Task
 		rio            cio.IO
-		stdinCloseSync = make(chan struct{})
+		stdinCloseSync = make(chan containerd.Process, 1)
 	)
 
 	if checkpointDir != "" {
 		// write checkpoint to the content store
 		tar := archive.Diff(ctx, "", checkpointDir)
-		cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
+		cp, err := c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
 		// remove the checkpoint when we're done
 		defer func() {
 			if cp != nil {
-				err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
+				err := c.client.client.ContentStore().Delete(ctx, cp.Digest)
 				if err != nil {
-					c.logger.WithError(err).WithFields(logrus.Fields{
+					c.client.logger.WithError(err).WithFields(logrus.Fields{
 						"ref":    checkpointDir,
 						"digest": cp.Digest,
 					}).Warnf("failed to delete temporary checkpoint entry")
@@ -182,23 +168,27 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 			}
 		}()
 		if err := tar.Close(); err != nil {
-			return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
+			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
 		}
 		if err != nil {
-			return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
+			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
 		}
 	}
 
-	spec, err := ctr.Spec(ctx)
+	// Optimization: assume the relevant metadata has not changed in the
+	// moment since the container was created. Elide redundant RPC requests
+	// to refresh the metadata separately for spec and labels.
+	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
 	if err != nil {
-		return -1, errors.Wrap(err, "failed to retrieve spec")
+		return nil, errors.Wrap(err, "failed to retrieve metadata")
 	}
-	labels, err := ctr.Labels(ctx)
-	if err != nil {
-		return -1, errors.Wrap(err, "failed to retrieve labels")
+	bundle := md.Labels[DockerContainerBundlePath]
+
+	var spec specs.Spec
+	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
+		return nil, errors.Wrap(err, "failed to retrieve spec")
 	}
-	bundle := labels[DockerContainerBundlePath]
-	uid, gid := getSpecUser(spec)
+	uid, gid := getSpecUser(&spec)
 
 	taskOpts := []containerd.NewTaskOpts{
 		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
@@ -209,10 +199,8 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 
 	if runtime.GOOS != "windows" {
 		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
-			c.v2runcoptionsMu.Lock()
-			opts, ok := c.v2runcoptions[id]
-			c.v2runcoptionsMu.Unlock()
-			if ok {
+			if c.v2runcoptions != nil {
+				opts := *c.v2runcoptions
 				opts.IoUid = uint32(uid)
 				opts.IoGid = uint32(gid)
 				info.Options = &opts
@@ -220,14 +208,14 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 			return nil
 		})
 	} else {
-		taskOpts = append(taskOpts, withLogLevel(c.logger.Level))
+		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
 	}
 
-	t, err = ctr.NewTask(ctx,
+	t, err = c.c8dCtr.NewTask(ctx,
 		func(id string) (cio.IO, error) {
 			fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
 
-			rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
+			rio, err = c.createIO(fifos, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
 			return rio, err
 		},
 		taskOpts...,
@@ -238,21 +226,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 			rio.Cancel()
 			rio.Close()
 		}
-		return -1, wrapError(err)
+		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
 	}
 
 	// Signal c.createIO that it can call CloseIO
-	close(stdinCloseSync)
+	stdinCloseSync <- t
 
 	if err := t.Start(ctx); err != nil {
 		if _, err := t.Delete(ctx); err != nil {
-			c.logger.WithError(err).WithField("container", id).
+			c.client.logger.WithError(err).WithField("container", c.c8dCtr.ID()).
 				Error("failed to delete task after fail start")
 		}
-		return -1, wrapError(err)
+		return nil, wrapError(err)
 	}
 
-	return int(t.Pid()), nil
+	return c.newTask(t), nil
 }
 
 // Exec creates an exec process.
@@ -262,31 +250,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 // for the container main process, the stdin fifo will be created in Create not
 // the Start call. stdinCloseSync channel should be closed after Start exec
 // process.
-func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
-	ctr, err := c.getContainer(ctx, containerID)
-	if err != nil {
-		return -1, err
-	}
-	t, err := ctr.Task(ctx, nil)
-	if err != nil {
-		if containerderrors.IsNotFound(err) {
-			return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
-		}
-		return -1, wrapError(err)
-	}
-
+func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
 	var (
 		p              containerd.Process
 		rio            cio.IO
-		stdinCloseSync = make(chan struct{})
+		stdinCloseSync = make(chan containerd.Process, 1)
 	)
 
-	labels, err := ctr.Labels(ctx)
+	// Optimization: assume the DockerContainerBundlePath label has not been
+	// updated since the container metadata was last loaded/refreshed.
+	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
 	if err != nil {
-		return -1, wrapError(err)
+		return nil, wrapError(err)
 	}
 
-	fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
+	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
 
 	defer func() {
 		if err != nil {
@@ -297,22 +275,22 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
 		}
 	}()
 
-	p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
-		rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
+	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
+		rio, err = t.ctr.createIO(fifos, processID, stdinCloseSync, attachStdio)
 		return rio, err
 	})
 	if err != nil {
 		close(stdinCloseSync)
 		if containerderrors.IsAlreadyExists(err) {
-			return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
+			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
 		}
-		return -1, wrapError(err)
+		return nil, wrapError(err)
 	}
 
 	// Signal c.createIO that it can call CloseIO
 	//
 	// the stdin of exec process will be created after p.Start in containerd
-	defer close(stdinCloseSync)
+	defer func() { stdinCloseSync <- p }()
 
 	if err = p.Start(ctx); err != nil {
 		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
@@ -321,62 +299,29 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
 		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
 		defer cancel()
 		p.Delete(ctx)
-		return -1, wrapError(err)
-	}
-	return int(p.Pid()), nil
-}
-
-func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error {
-	p, err := c.getProcess(ctx, containerID, processID)
-	if err != nil {
-		return err
+		return nil, wrapError(err)
 	}
-	return wrapError(p.Kill(ctx, signal))
+	return process{p}, nil
 }
 
-func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
-	p, err := c.getProcess(ctx, containerID, processID)
-	if err != nil {
-		return err
-	}
-
-	return p.Resize(ctx, uint32(width), uint32(height))
+func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
+	return wrapError(t.Task.Kill(ctx, signal))
 }
 
-func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
-	p, err := c.getProcess(ctx, containerID, processID)
-	if err != nil {
-		return err
-	}
-
-	return p.CloseIO(ctx, containerd.WithStdinCloser)
+func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
+	return wrapError(p.Process.Kill(ctx, signal))
 }
 
-func (c *client) Pause(ctx context.Context, containerID string) error {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return err
-	}
-
-	return wrapError(p.(containerd.Task).Pause(ctx))
+func (t *task) Pause(ctx context.Context) error {
+	return wrapError(t.Task.Pause(ctx))
 }
 
-func (c *client) Resume(ctx context.Context, containerID string) error {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return err
-	}
-
-	return p.(containerd.Task).Resume(ctx)
+func (t *task) Resume(ctx context.Context) error {
+	return wrapError(t.Task.Resume(ctx))
 }
 
-func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return nil, err
-	}
-
-	m, err := p.(containerd.Task).Metrics(ctx)
+func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
+	m, err := t.Metrics(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -388,32 +333,8 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt
 	return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
 }
 
-func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return nil, err
-	}
-
-	pis, err := p.(containerd.Task).Pids(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	var pids []uint32
-	for _, i := range pis {
-		pids = append(pids, i.Pid)
-	}
-
-	return pids, nil
-}
-
-func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return nil, err
-	}
-
-	pis, err := p.(containerd.Task).Pids(ctx)
+func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
+	pis, err := t.Pids(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -434,57 +355,31 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine
 	return infos, nil
 }
 
-type restoredProcess struct {
-	p containerd.Process
-}
-
-func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
-	if p.p == nil {
-		return 255, time.Now(), nil
-	}
-	status, err := p.p.Delete(ctx)
-	if err != nil {
-		return 255, time.Now(), nil
-	}
-	return status.ExitCode(), status.ExitTime(), nil
+func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
+	s, err := t.Task.Delete(ctx)
+	return s, wrapError(err)
 }
 
-func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return 255, time.Now(), nil
-	}
-
-	status, err := p.Delete(ctx)
-	if err != nil {
-		return 255, time.Now(), nil
-	}
-	return status.ExitCode(), status.ExitTime(), nil
+func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
+	s, err := p.Process.Delete(ctx)
+	return s, wrapError(err)
 }
 
-func (c *client) Delete(ctx context.Context, containerID string) error {
-	ctr, err := c.getContainer(ctx, containerID)
-	if err != nil {
-		return err
-	}
-	labels, err := ctr.Labels(ctx)
+func (c *container) Delete(ctx context.Context) error {
+	// Optimization: assume the DockerContainerBundlePath label has not been
+	// updated since the container metadata was last loaded/refreshed.
+	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
 	if err != nil {
 		return err
 	}
-	bundle := labels[DockerContainerBundlePath]
-	if err := ctr.Delete(ctx); err != nil {
+	bundle := md.Labels[DockerContainerBundlePath]
+	if err := c.c8dCtr.Delete(ctx); err != nil {
 		return wrapError(err)
 	}
-	c.oomMu.Lock()
-	delete(c.oom, containerID)
-	c.oomMu.Unlock()
-	c.v2runcoptionsMu.Lock()
-	delete(c.v2runcoptions, containerID)
-	c.v2runcoptionsMu.Unlock()
 	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
 		if err := os.RemoveAll(bundle); err != nil {
-			c.logger.WithError(err).WithFields(logrus.Fields{
-				"container": containerID,
+			c.client.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
+				"container": c.c8dCtr.ID(),
 				"bundle":    bundle,
 			}).Error("failed to remove state dir")
 		}
@@ -492,28 +387,25 @@ func (c *client) Delete(ctx context.Context, containerID string) error {
 	return nil
 }
 
-func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
-	t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return containerd.Unknown, err
-	}
-	s, err := t.Status(ctx)
-	if err != nil {
-		return containerd.Unknown, wrapError(err)
-	}
-	return s.Status, nil
+func (t *task) ForceDelete(ctx context.Context) error {
+	_, err := t.Task.Delete(ctx, containerd.WithProcessKill)
+	return wrapError(err)
+}
+
+func (t *task) Status(ctx context.Context) (containerd.Status, error) {
+	s, err := t.Task.Status(ctx)
+	return s, wrapError(err)
+}
+
+func (p process) Status(ctx context.Context) (containerd.Status, error) {
+	s, err := p.Process.Status(ctx)
+	return s, wrapError(err)
 }
 
-func (c *client) getCheckpointOptions(id string, exit bool) containerd.CheckpointTaskOpts {
+func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
 	return func(r *containerd.CheckpointTaskInfo) error {
-		if r.Options == nil {
-			c.v2runcoptionsMu.Lock()
-			_, ok := c.v2runcoptions[id]
-			c.v2runcoptionsMu.Unlock()
-			if ok {
-				r.Options = &v2runcoptions.CheckpointOptions{Exit: exit}
-			}
-			return nil
+		if r.Options == nil && c.v2runcoptions != nil {
+			r.Options = &v2runcoptions.CheckpointOptions{}
 		}
 
 		switch opts := r.Options.(type) {
@@ -525,27 +417,21 @@ func (c *client) getCheckpointOptions(id string, exit bool) containerd.Checkpoin
 	}
 }
 
-func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return err
-	}
-
-	opts := []containerd.CheckpointTaskOpts{c.getCheckpointOptions(containerID, exit)}
-	img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
+func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
+	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
 	if err != nil {
 		return wrapError(err)
 	}
 	// Whatever happens, delete the checkpoint from containerd
 	defer func() {
-		err := c.client.ImageService().Delete(context.Background(), img.Name())
+		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
 		if err != nil {
-			c.logger.WithError(err).WithField("digest", img.Target().Digest).
+			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
 				Warnf("failed to delete checkpoint image")
 		}
 	}()
 
-	b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
+	b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
 	}
@@ -566,7 +452,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
 	}
 
-	rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
+	rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
 	}
@@ -579,7 +465,8 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 	return err
 }
 
-func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
+// LoadContainer loads the containerd container.
+func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
 	ctr, err := c.client.LoadContainer(ctx, id)
 	if err != nil {
 		if containerderrors.IsNotFound(err) {
@@ -587,42 +474,25 @@ func (c *client) getContainer(ctx context.Context, id string) (containerd.Contai
 		}
 		return nil, wrapError(err)
 	}
-	return ctr, nil
+	return &container{client: c, c8dCtr: ctr}, nil
 }
 
-func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
-	ctr, err := c.getContainer(ctx, containerID)
-	if err != nil {
-		return nil, err
-	}
-	t, err := ctr.Task(ctx, nil)
+func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
+	t, err := c.c8dCtr.Task(ctx, nil)
 	if err != nil {
-		if containerderrors.IsNotFound(err) {
-			return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
-		}
-		return nil, wrapError(err)
-	}
-	if processID == libcontainerdtypes.InitProcessName {
-		return t, nil
-	}
-	p, err := t.LoadProcess(ctx, processID, nil)
-	if err != nil {
-		if containerderrors.IsNotFound(err) {
-			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
-		}
 		return nil, wrapError(err)
 	}
-	return p, nil
+	return c.newTask(t), nil
 }
 
 // createIO creates the io to be used by a process
 // This needs to get a pointer to interface as upon closure the process may not have yet been registered
-func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
+func (c *container) createIO(fifos *cio.FIFOSet, processID string, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
 	var (
 		io  *cio.DirectIO
 		err error
 	)
-	io, err = c.newDirectIO(context.Background(), fifos)
+	io, err = c.client.newDirectIO(context.Background(), fifos)
 	if err != nil {
 		return nil, err
 	}
@@ -639,13 +509,13 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std
 				// Do the rest in a new routine to avoid a deadlock if the
 				// Exec/Start call failed.
 				go func() {
-					<-stdinCloseSync
-					p, err := c.getProcess(context.Background(), containerID, processID)
-					if err == nil {
-						err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
-						if err != nil && strings.Contains(err.Error(), "transport is closing") {
-							err = nil
-						}
+					p, ok := <-stdinCloseSync
+					if !ok {
+						return
+					}
+					err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
+					if err != nil && strings.Contains(err.Error(), "transport is closing") {
+						err = nil
 					}
 				}()
 			})
@@ -665,51 +535,12 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 	c.eventQ.Append(ei.ContainerID, func() {
 		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
 		if err != nil {
-			c.logger.WithError(err).WithFields(logrus.Fields{
+			c.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
 				"container":  ei.ContainerID,
 				"event":      et,
 				"event-info": ei,
 			}).Error("failed to process event")
 		}
-
-		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
-			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
-			if err != nil {
-
-				c.logger.WithError(errors.New("no such process")).
-					WithFields(logrus.Fields{
-						"error":     err,
-						"container": ei.ContainerID,
-						"process":   ei.ProcessID,
-					}).Error("exit event")
-				return
-			}
-
-			ctr, err := c.getContainer(ctx, ei.ContainerID)
-			if err != nil {
-				c.logger.WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"error":     err,
-				}).Error("failed to find container")
-			} else {
-				labels, err := ctr.Labels(ctx)
-				if err != nil {
-					c.logger.WithFields(logrus.Fields{
-						"container": ei.ContainerID,
-						"error":     err,
-					}).Error("failed to get container labels")
-					return
-				}
-				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
-			}
-			_, err = p.Delete(context.Background())
-			if err != nil {
-				c.logger.WithError(err).WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"process":   ei.ProcessID,
-				}).Warn("failed to delete process")
-			}
-		}
 	})
 }
 
@@ -767,7 +598,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 	c.logger.Debug("processing event stream")
 
 	for {
-		var oomKilled bool
 		select {
 		case err = <-errC:
 			if err != nil {
@@ -825,9 +655,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 				et = libcontainerdtypes.EventOOM
 				ei = libcontainerdtypes.EventInfo{
 					ContainerID: t.ContainerID,
-					OOMKilled:   true,
 				}
-				oomKilled = true
 			case *apievents.TaskExecAdded:
 				et = libcontainerdtypes.EventExecAdded
 				ei = libcontainerdtypes.EventInfo{
@@ -866,13 +694,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 				continue
 			}
 
-			c.oomMu.Lock()
-			if oomKilled {
-				c.oom[ei.ContainerID] = true
-			}
-			ei.OOMKilled = c.oom[ei.ContainerID]
-			c.oomMu.Unlock()
-
 			c.processEvent(ctx, et, ei)
 		}
 	}

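The net effect of the client.go rewrite is that callers hold handles instead of passing IDs: a Client yields a Container, a Container yields a Task, and each method maps onto a single containerd RPC on that handle. A hedged sketch of the new call flow against the libcontainerd types; the helper itself is illustrative:

package flowdemo

import (
	"context"

	"github.com/containerd/containerd"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)

// isTaskRunning resolves the container handle once and queries its task
// directly, rather than routing a container ID through client.Status as
// the old interface required.
func isTaskRunning(ctx context.Context, cli libcontainerdtypes.Client, id string) (bool, error) {
	ctr, err := cli.LoadContainer(ctx, id) // one RPC: container metadata
	if err != nil {
		return false, err
	}
	tsk, err := ctr.Task(ctx) // one RPC: the live task, without attaching IO
	if err != nil {
		return false, err
	}
	s, err := tsk.Status(ctx)
	if err != nil {
		return false, err
	}
	return s.Status == containerd.Running, nil
}
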
+ 2 - 7
libcontainerd/remote/client_linux.go

@@ -20,15 +20,10 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
 	return &libcontainerdtypes.Summary{}, nil
 }
 
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return err
-	}
-
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
 	// go doesn't like the alias in 1.8; this means this needs to be
 	// platform specific
-	return p.(containerd.Task).Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
+	return t.Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
 }
 
 func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {

+ 1 - 1
libcontainerd/remote/client_windows.go

@@ -87,7 +87,7 @@ func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.Dire
 	return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
 }
 
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
 	// TODO: (containerd): Not implemented, but don't error.
 	return nil
 }

+ 62 - 0
libcontainerd/replace.go

@@ -0,0 +1,62 @@
+package libcontainerd // import "github.com/docker/docker/libcontainerd"
+
+import (
+	"context"
+
+	"github.com/containerd/containerd"
+	"github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+
+	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/libcontainerd/types"
+)
+
+// ReplaceContainer creates a new container, replacing any existing container
+// with the same id if necessary.
+func ReplaceContainer(ctx context.Context, client types.Client, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (types.Container, error) {
+	newContainer := func() (types.Container, error) {
+		return client.NewContainer(ctx, id, spec, shim, runtimeOptions, opts...)
+	}
+	ctr, err := newContainer()
+	if err == nil || !errdefs.IsConflict(err) {
+		return ctr, err
+	}
+
+	log := logrus.WithContext(ctx).WithField("container", id)
+	log.Debug("A container already exists with the same ID. Attempting to clean up the old container.")
+	ctr, err = client.LoadContainer(ctx, id)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			// Task failed successfully: the container no longer exists,
+			// despite us not doing anything. May as well try to create
+			// the container again. It might succeed.
+			return newContainer()
+		}
+		return nil, errors.Wrap(err, "could not load stale containerd container object")
+	}
+	tsk, err := ctr.Task(ctx)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			goto deleteContainer
+		}
+		// There is no point in trying to delete the container if we
+		// cannot determine whether or not it has a task. The containerd
+		// client would just try to load the task itself, get the same
+		// error, and give up.
+		return nil, errors.Wrap(err, "could not load stale containerd task object")
+	}
+	if err := tsk.ForceDelete(ctx); err != nil {
+		if !errdefs.IsNotFound(err) {
+			return nil, errors.Wrap(err, "could not delete stale containerd task object")
+		}
+		// The task might have exited on its own. Proceed with
+		// attempting to delete the container.
+	}
+deleteContainer:
+	if err := ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+		return nil, errors.Wrap(err, "could not delete stale containerd container object")
+	}
+
+	return newContainer()
+}

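ReplaceContainer gives callers such as the plugin executor below a create-or-clean-up-and-retry primitive in one call. A minimal usage sketch; the wrapper name and the nil runtimeOptions are assumptions for illustration:

package replacedemo

import (
	"context"

	"github.com/docker/docker/libcontainerd"
	"github.com/docker/docker/libcontainerd/types"
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// createFresh guarantees a brand-new containerd container for id, deleting
// any stale task and container left behind by a previous daemon run.
func createFresh(ctx context.Context, cli types.Client, id string, spec *specs.Spec, shim string) (types.Container, error) {
	// runtimeOptions is passed through to the shim untouched; nil is valid here.
	return libcontainerd.ReplaceContainer(ctx, cli, id, spec, shim, nil)
}
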
+ 45 - 20
libcontainerd/types/types.go

@@ -33,7 +33,6 @@ type EventInfo struct {
 	Pid         uint32
 	ExitCode    uint32
 	ExitedAt    time.Time
-	OOMKilled   bool
 	Error       error
 }
 
@@ -44,32 +43,58 @@ type Backend interface {
 
 // Process of a container
 type Process interface {
-	Delete(context.Context) (uint32, time.Time, error)
+	// Pid is the system specific process id
+	Pid() uint32
+	// Kill sends the provided signal to the process
+	Kill(ctx context.Context, signal syscall.Signal) error
+	// Resize changes the width and height of the process's terminal
+	Resize(ctx context.Context, width, height uint32) error
+	// Delete removes the process and any resources allocated returning the exit status
+	Delete(context.Context) (*containerd.ExitStatus, error)
 }
 
 // Client provides access to containerd features.
 type Client interface {
 	Version(ctx context.Context) (containerd.Version, error)
+	// LoadContainer loads the metadata for a container from containerd.
+	LoadContainer(ctx context.Context, containerID string) (Container, error)
+	// NewContainer creates a new containerd container.
+	NewContainer(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (Container, error)
+}
 
-	Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error)
-
-	Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error
-	Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error)
-	SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error
-	Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error)
-	ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error
-	CloseStdin(ctx context.Context, containerID, processID string) error
-	Pause(ctx context.Context, containerID string) error
-	Resume(ctx context.Context, containerID string) error
-	Stats(ctx context.Context, containerID string) (*Stats, error)
-	ListPids(ctx context.Context, containerID string) ([]uint32, error)
-	Summary(ctx context.Context, containerID string) ([]Summary, error)
-	DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
-	Delete(ctx context.Context, containerID string) error
-	Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error)
+// Container provides access to a containerd container.
+type Container interface {
+	Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error)
+	Task(ctx context.Context) (Task, error)
+	// AttachTask returns the current task for the container and reattaches
+	// to the IO for the running task. If no task exists for the container
+	// a NotFound error is returned.
+	//
+	// Clients must make sure that only one reader is attached to the task.
+	AttachTask(ctx context.Context, attachStdio StdioCallback) (Task, error)
+	// Delete removes the container and associated resources
+	Delete(context.Context) error
+}
 
-	UpdateResources(ctx context.Context, containerID string, resources *Resources) error
-	CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error
+// Task provides access to a running containerd container.
+type Task interface {
+	Process
+	// Pause suspends the execution of the task
+	Pause(context.Context) error
+	// Resume the execution of the task
+	Resume(context.Context) error
+	Stats(ctx context.Context) (*Stats, error)
+	// Pids returns a list of system specific process ids inside the task
+	Pids(context.Context) ([]containerd.ProcessInfo, error)
+	Summary(ctx context.Context) ([]Summary, error)
+	// ForceDelete forcefully kills the task's processes and deletes the task
+	ForceDelete(context.Context) error
+	// Status returns the executing status of the task
+	Status(ctx context.Context) (containerd.Status, error)
+	// Exec creates and starts a new process inside the task
+	Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (Process, error)
+	UpdateResources(ctx context.Context, resources *Resources) error
+	CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error
 }
 
 // StdioCallback is called to connect a container or process stdio.

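Because Task embeds Process, helpers written against these interfaces work with any implementation; a signal-then-poll routine needs nothing beyond Kill and Status. A sketch under that assumption; the polling interval is an arbitrary choice, not something the diff prescribes:

package typesdemo

import (
	"context"
	"syscall"
	"time"

	"github.com/containerd/containerd"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)

// killAndWait signals a task, then polls until containerd reports it
// stopped or the context expires.
func killAndWait(ctx context.Context, tsk libcontainerdtypes.Task, sig syscall.Signal) error {
	if err := tsk.Kill(ctx, sig); err != nil {
		return err
	}
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			s, err := tsk.Status(ctx)
			if err != nil {
				return err
			}
			if s.Status == containerd.Stopped {
				return nil
			}
		}
	}
}
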
+ 85 - 41
plugin/executor/containerd/containerd.go

@@ -2,6 +2,7 @@ package containerd // import "github.com/docker/docker/plugin/executor/container
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"sync"
 	"syscall"
@@ -28,6 +29,7 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, ns string,
 		rootDir:     rootDir,
 		exitHandler: exitHandler,
 		runtime:     runtime,
+		plugins:     make(map[string]*c8dPlugin),
 	}
 
 	client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e)
@@ -44,77 +46,112 @@ type Executor struct {
 	client      libcontainerdtypes.Client
 	exitHandler ExitHandler
 	runtime     types.Runtime
+
+	mu      sync.Mutex // Guards plugins map
+	plugins map[string]*c8dPlugin
+}
+
+type c8dPlugin struct {
+	log *logrus.Entry
+	ctr libcontainerdtypes.Container
+	tsk libcontainerdtypes.Task
 }
 
 // deleteTaskAndContainer deletes plugin task and then plugin container from containerd
-func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
-	if p != nil {
-		if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
-			logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
-		}
-	} else {
-		if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
-			logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
+func (p c8dPlugin) deleteTaskAndContainer(ctx context.Context) {
+	if p.tsk != nil {
+		if _, err := p.tsk.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+			p.log.WithError(err).Error("failed to delete plugin task from containerd")
 		}
 	}
-
-	if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) {
-		logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
+	if p.ctr != nil {
+		if err := p.ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
+			p.log.WithError(err).Error("failed to delete plugin container from containerd")
+		}
 	}
 }
 
 // Create creates a new container
 func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
 	ctx := context.Background()
-	err := e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
+	log := logrus.WithField("plugin", id)
+	ctr, err := libcontainerd.ReplaceContainer(ctx, e.client, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
 	if err != nil {
-		status, err2 := e.client.Status(ctx, id)
-		if err2 != nil {
-			if !errdefs.IsNotFound(err2) {
-				logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
-			}
-		} else {
-			if status != containerd.Running && status != containerd.Unknown {
-				if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
-					logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
-				}
-				err = e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
-			}
-		}
-
-		if err != nil {
-			return errors.Wrap(err, "error creating containerd container")
-		}
+		return errors.Wrap(err, "error creating containerd container for plugin")
 	}
 
-	_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
+	p := c8dPlugin{log: log, ctr: ctr}
+	p.tsk, err = ctr.Start(ctx, "", false, attachStreamsFunc(stdout, stderr))
 	if err != nil {
-		deleteTaskAndContainer(ctx, e.client, id, nil)
+		p.deleteTaskAndContainer(ctx)
+		return err
 	}
-	return err
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.plugins[id] = &p
+	return nil
 }
 
 // Restore restores a container
 func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
-	alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
-	if err != nil && !errdefs.IsNotFound(err) {
+	ctx := context.Background()
+	p := c8dPlugin{log: logrus.WithField("plugin", id)}
+	ctr, err := e.client.LoadContainer(ctx, id)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			return false, nil
+		}
 		return false, err
 	}
-	if !alive {
-		deleteTaskAndContainer(context.Background(), e.client, id, p)
+	p.tsk, err = ctr.AttachTask(ctx, attachStreamsFunc(stdout, stderr))
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			p.deleteTaskAndContainer(ctx)
+			return false, nil
+		}
+		return false, err
 	}
-	return alive, nil
+	s, err := p.tsk.Status(ctx)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			// Task vanished after attaching?
+			p.tsk = nil
+			p.deleteTaskAndContainer(ctx)
+			return false, nil
+		}
+		return false, err
+	}
+	if s.Status == containerd.Stopped {
+		p.deleteTaskAndContainer(ctx)
+		return false, nil
+	}
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.plugins[id] = &p
+	return true, nil
 }
 
 // IsRunning returns if the container with the given id is running
 func (e *Executor) IsRunning(id string) (bool, error) {
-	status, err := e.client.Status(context.Background(), id)
-	return status == containerd.Running, err
+	e.mu.Lock()
+	p := e.plugins[id]
+	e.mu.Unlock()
+	if p == nil {
+		return false, errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+	}
+	status, err := p.tsk.Status(context.Background())
+	return status.Status == containerd.Running, err
 }
 
 // Signal sends the specified signal to the container
 func (e *Executor) Signal(id string, signal syscall.Signal) error {
-	return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
+	e.mu.Lock()
+	p := e.plugins[id]
+	e.mu.Unlock()
+	if p == nil {
+		return errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+	}
+	return p.tsk.Kill(context.Background(), signal)
 }
 
 // ProcessEvent handles events from containerd
@@ -122,7 +159,14 @@ func (e *Executor) Signal(id string, signal syscall.Signal) error {
 func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
 	switch et {
 	case libcontainerdtypes.EventExit:
-		deleteTaskAndContainer(context.Background(), e.client, id, nil)
+		e.mu.Lock()
+		p := e.plugins[id]
+		e.mu.Unlock()
+		if p == nil {
+			logrus.WithField("id", id).Warn("Received exit event for an unknown plugin")
+		} else {
+			p.deleteTaskAndContainer(context.Background())
+		}
 		return e.exitHandler.HandleExitEvent(ei.ContainerID)
 	}
 	return nil

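The executor now tracks live container and task handles in a mutex-guarded map keyed by plugin ID, instead of issuing a containerd lookup per call. A distilled sketch of that registry pattern with hypothetical names:

package pluginreg

import (
	"fmt"
	"sync"

	"github.com/docker/docker/errdefs"
)

// handle stands in for the executor's per-plugin bookkeeping entry.
type handle struct{ id string }

// registry mirrors the Executor's bookkeeping: handles are stored once at
// Create/Restore time and later operations do a cheap guarded lookup.
type registry struct {
	mu sync.Mutex
	m  map[string]*handle
}

func (r *registry) put(id string, h *handle) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.m == nil {
		r.m = make(map[string]*handle)
	}
	r.m[id] = h
}

func (r *registry) get(id string) (*handle, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	h, ok := r.m[id]
	if !ok {
		return nil, errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
	}
	return h, nil
}
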
Some files were not shown because too many files have changed