diff --git a/api/swagger.yaml b/api/swagger.yaml
index 181c80a418..622441b268 100644
--- a/api/swagger.yaml
+++ b/api/swagger.yaml
@@ -4650,7 +4650,8 @@ definitions:
         example: false
       OOMKilled:
         description: |
-          Whether this container has been killed because it ran out of memory.
+          Whether a process within this container has been killed because it ran
+          out of memory since the container was last started.
         type: "boolean"
         example: false
       Dead:
diff --git a/container/container.go b/container/container.go
index d0f70e139e..09fab63511 100644
--- a/container/container.go
+++ b/container/container.go
@@ -19,7 +19,6 @@ import (
 	mounttypes "github.com/docker/docker/api/types/mount"
 	swarmtypes "github.com/docker/docker/api/types/swarm"
 	"github.com/docker/docker/container/stream"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
 	"github.com/docker/docker/daemon/logger/local"
@@ -28,6 +27,7 @@ import (
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	"github.com/docker/docker/pkg/containerfs"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/ioutils"
@@ -53,9 +53,6 @@ type ExitStatus struct {
 	// The exit code with which the container exited.
 	ExitCode int

-	// Whether the container encountered an OOM.
-	OOMKilled bool
-
 	// Time at which the container died
 	ExitedAt time.Time
 }
@@ -89,7 +86,7 @@ type Container struct {
 	HasBeenManuallyRestarted bool                       `json:"-"` // used to distinguish restart caused by restart policy from the manual one
 	MountPoints              map[string]*volumemounts.MountPoint
 	HostConfig               *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
-	ExecCommands             *exec.Store                `json:"-"`
+	ExecCommands             *ExecStore                 `json:"-"`
 	DependencyStore          agentexec.DependencyGetter `json:"-"`
 	SecretReferences         []*swarmtypes.SecretReference
 	ConfigReferences         []*swarmtypes.ConfigReference
@@ -124,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
 	return &Container{
 		ID:            id,
 		State:         NewState(),
-		ExecCommands:  exec.NewStore(),
+		ExecCommands:  NewExecStore(),
 		Root:          root,
 		MountPoints:   make(map[string]*volumemounts.MountPoint),
 		StreamConfig:  stream.NewConfig(),
@@ -755,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
 	return env
 }

+// RestoreTask restores the containerd container and task handles and reattaches
+// the IO for the running task. Container state is not synced with containerd's
+// state.
+//
+// An errdefs.NotFound error is returned if the container does not exist in
+// containerd. However, a nil error is returned if the task does not exist in
+// containerd.
+func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
+	container.Lock()
+	defer container.Unlock()
+	var err error
+	container.ctr, err = client.LoadContainer(ctx, container.ID)
+	if err != nil {
+		return err
+	}
+	container.task, err = container.ctr.AttachTask(ctx, container.InitializeStdio)
+	if err != nil && !errdefs.IsNotFound(err) {
+		return err
+	}
+	return nil
+}
+
+// GetRunningTask asserts that the container is running and returns the Task for
+// the container. An errdefs.Conflict error is returned if the container is not
+// in the Running state.
+//
+// A system error is returned if the container is in a bad state: Running is true
+// but has a nil Task.
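+// Such a mismatch indicates an inconsistency in the daemon's own state.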
+//
+// The container lock must be held when calling this method.
+func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
+	if !container.Running {
+		return nil, errdefs.Conflict(fmt.Errorf("container %s is not running", container.ID))
+	}
+	tsk, ok := container.Task()
+	if !ok {
+		return nil, errdefs.System(errors.WithStack(fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)))
+	}
+	return tsk, nil
+}
+
 type rio struct {
 	cio.IO
diff --git a/daemon/exec/exec.go b/container/exec.go
similarity index 59%
rename from daemon/exec/exec.go
rename to container/exec.go
index 2cf1833d7d..18e86c6a4f 100644
--- a/daemon/exec/exec.go
+++ b/container/exec.go
@@ -1,20 +1,20 @@
-package exec // import "github.com/docker/docker/daemon/exec"
+package container // import "github.com/docker/docker/container"

 import (
-	"context"
 	"runtime"
 	"sync"

 	"github.com/containerd/containerd/cio"
 	"github.com/docker/docker/container/stream"
+	"github.com/docker/docker/libcontainerd/types"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/sirupsen/logrus"
 )

-// Config holds the configurations for execs. The Daemon keeps
+// ExecConfig holds the configurations for execs. The Daemon keeps
 // track of both running and finished execs so that they can be
 // examined both during and after completion.
-type Config struct {
+type ExecConfig struct {
 	sync.Mutex
 	Started      chan struct{}
 	StreamConfig *stream.Config
@@ -25,7 +25,7 @@ type Config struct {
 	OpenStderr   bool
 	OpenStdout   bool
 	CanRemove    bool
-	ContainerID  string
+	Container    *Container
 	DetachKeys   []byte
 	Entrypoint   string
 	Args         []string
@@ -34,39 +34,22 @@ type Config struct {
 	User         string
 	WorkingDir   string
 	Env          []string
-	Pid          int
+	Process      types.Process
 	ConsoleSize  *[2]uint
 }

-// NewConfig initializes the a new exec configuration
-func NewConfig() *Config {
-	return &Config{
+// NewExecConfig initializes a new exec configuration
+func NewExecConfig(c *Container) *ExecConfig {
+	return &ExecConfig{
 		ID:           stringid.GenerateRandomID(),
+		Container:    c,
 		StreamConfig: stream.NewConfig(),
 		Started:      make(chan struct{}),
 	}
 }

-type rio struct {
-	cio.IO
-
-	sc *stream.Config
-}
-
-func (i *rio) Close() error {
-	i.IO.Close()
-
-	return i.sc.CloseStreams()
-}
-
-func (i *rio) Wait() {
-	i.sc.Wait(context.Background())
-
-	i.IO.Wait()
-}
-
 // InitializeStdio is called by libcontainerd to connect the stdio.
-func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
+func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 	c.StreamConfig.CopyToPipe(iop)

 	if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
@@ -81,68 +64,68 @@ func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
 }

 // CloseStreams closes the stdio streams for the exec
-func (c *Config) CloseStreams() error {
+func (c *ExecConfig) CloseStreams() error {
 	return c.StreamConfig.CloseStreams()
 }

 // SetExitCode sets the exec config's exit code
-func (c *Config) SetExitCode(code int) {
+func (c *ExecConfig) SetExitCode(code int) {
 	c.ExitCode = &code
 }

-// Store keeps track of the exec configurations.
-type Store struct {
-	byID map[string]*Config
-	sync.RWMutex
+// ExecStore keeps track of the exec configurations.
+type ExecStore struct {
+	byID map[string]*ExecConfig
+	mu   sync.RWMutex
 }

-// NewStore initializes a new exec store.
-func NewStore() *Store {
-	return &Store{
-		byID: make(map[string]*Config),
+// NewExecStore initializes a new exec store.
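+// The returned store is safe for concurrent use.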
+func NewExecStore() *ExecStore { + return &ExecStore{ + byID: make(map[string]*ExecConfig), } } // Commands returns the exec configurations in the store. -func (e *Store) Commands() map[string]*Config { - e.RLock() - byID := make(map[string]*Config, len(e.byID)) +func (e *ExecStore) Commands() map[string]*ExecConfig { + e.mu.RLock() + byID := make(map[string]*ExecConfig, len(e.byID)) for id, config := range e.byID { byID[id] = config } - e.RUnlock() + e.mu.RUnlock() return byID } // Add adds a new exec configuration to the store. -func (e *Store) Add(id string, Config *Config) { - e.Lock() +func (e *ExecStore) Add(id string, Config *ExecConfig) { + e.mu.Lock() e.byID[id] = Config - e.Unlock() + e.mu.Unlock() } // Get returns an exec configuration by its id. -func (e *Store) Get(id string) *Config { - e.RLock() +func (e *ExecStore) Get(id string) *ExecConfig { + e.mu.RLock() res := e.byID[id] - e.RUnlock() + e.mu.RUnlock() return res } // Delete removes an exec configuration from the store. -func (e *Store) Delete(id string, pid int) { - e.Lock() +func (e *ExecStore) Delete(id string) { + e.mu.Lock() delete(e.byID, id) - e.Unlock() + e.mu.Unlock() } // List returns the list of exec ids in the store. -func (e *Store) List() []string { +func (e *ExecStore) List() []string { var IDs []string - e.RLock() + e.mu.RLock() for id := range e.byID { IDs = append(IDs, id) } - e.RUnlock() + e.mu.RUnlock() return IDs } diff --git a/container/state.go b/container/state.go index 1267c8694a..cdf88fe371 100644 --- a/container/state.go +++ b/container/state.go @@ -8,6 +8,7 @@ import ( "time" "github.com/docker/docker/api/types" + libcontainerdtypes "github.com/docker/docker/libcontainerd/types" units "github.com/docker/go-units" ) @@ -36,6 +37,14 @@ type State struct { stopWaiters []chan<- StateStatus removeOnlyWaiters []chan<- StateStatus + + // The libcontainerd reference fields are unexported to force consumers + // to access them through the getter methods with multi-valued returns + // so that they can't forget to nil-check: the code won't compile unless + // the nil-check result is explicitly consumed or discarded. + + ctr libcontainerdtypes.Container + task libcontainerdtypes.Task } // StateStatus is used to return container wait results. @@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) { } // SetRunning sets the state of the container to "running". 
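+// It also records the libcontainerd container and task handles; when a task is
+// supplied, Pid is taken from it, and OOMKilled is reset for the new run.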
-func (s *State) SetRunning(pid int, initial bool) { +func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) { s.ErrorMsg = "" s.Paused = false s.Running = true @@ -269,7 +278,14 @@ func (s *State) SetRunning(pid int, initial bool) { s.Paused = false } s.ExitCodeValue = 0 - s.Pid = pid + s.ctr = ctr + s.task = tsk + if tsk != nil { + s.Pid = int(tsk.Pid()) + } else { + s.Pid = 0 + } + s.OOMKilled = false if initial { s.StartedAt = time.Now().UTC() } @@ -287,7 +303,6 @@ func (s *State) SetStopped(exitStatus *ExitStatus) { s.FinishedAt = exitStatus.ExitedAt } s.ExitCodeValue = exitStatus.ExitCode - s.OOMKilled = exitStatus.OOMKilled s.notifyAndClear(&s.stopWaiters) } @@ -303,7 +318,6 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) { s.Pid = 0 s.FinishedAt = time.Now().UTC() s.ExitCodeValue = exitStatus.ExitCode - s.OOMKilled = exitStatus.OOMKilled s.notifyAndClear(&s.stopWaiters) } @@ -405,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) { } *waiters = nil } + +// C8dContainer returns a reference to the libcontainerd Container object for +// the container and whether the reference is valid. +// +// The container lock must be held when calling this method. +func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) { + return s.ctr, s.ctr != nil +} + +// Task returns a reference to the libcontainerd Task object for the container +// and whether the reference is valid. +// +// The container lock must be held when calling this method. +// +// See also: (*Container).GetRunningTask(). +func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) { + return s.task, s.task != nil +} diff --git a/container/state_test.go b/container/state_test.go index 09dfb56089..f4f22f70d6 100644 --- a/container/state_test.go +++ b/container/state_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/docker/docker/api/types" + libcontainerdtypes "github.com/docker/docker/libcontainerd/types" ) func TestIsValidHealthString(t *testing.T) { @@ -28,6 +29,13 @@ func TestIsValidHealthString(t *testing.T) { } } +type mockTask struct { + libcontainerdtypes.Task + pid uint32 +} + +func (t *mockTask) Pid() uint32 { return t.pid } + func TestStateRunStop(t *testing.T) { s := NewState() @@ -60,7 +68,7 @@ func TestStateRunStop(t *testing.T) { // Set the state to "Running". s.Lock() - s.SetRunning(i, true) + s.SetRunning(nil, &mockTask{pid: uint32(i)}, true) s.Unlock() // Assert desired state. @@ -125,7 +133,7 @@ func TestStateTimeoutWait(t *testing.T) { s := NewState() s.Lock() - s.SetRunning(0, true) + s.SetRunning(nil, nil, true) s.Unlock() // Start a wait with a timeout. 
@@ -174,7 +182,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) { s := NewState() s.Lock() - s.SetRunning(0, true) + s.SetRunning(nil, nil, true) s.Unlock() waitC := s.Wait(context.Background(), WaitConditionNotRunning) @@ -185,7 +193,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) { s.Unlock() s.Lock() - s.SetRunning(0, true) + s.SetRunning(nil, nil, true) s.Unlock() got := <-waitC diff --git a/daemon/checkpoint.go b/daemon/checkpoint.go index 5cbe8574ab..97acc1d897 100644 --- a/daemon/checkpoint.go +++ b/daemon/checkpoint.go @@ -57,8 +57,11 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat return err } - if !container.IsRunning() { - return fmt.Errorf("Container %s not running", name) + container.Lock() + tsk, err := container.GetRunningTask() + container.Unlock() + if err != nil { + return err } if !validCheckpointNamePattern.MatchString(config.CheckpointID) { @@ -70,7 +73,7 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat return fmt.Errorf("cannot checkpoint container %s: %s", name, err) } - err = daemon.containerd.CreateCheckpoint(context.Background(), container.ID, checkpointDir, config.Exit) + err = tsk.CreateCheckpoint(context.Background(), checkpointDir, config.Exit) if err != nil { os.RemoveAll(checkpointDir) return fmt.Errorf("Cannot checkpoint container %s: %s", name, err) diff --git a/daemon/daemon.go b/daemon/daemon.go index dfacd217e5..f0fa951bf6 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -30,7 +30,6 @@ import ( "github.com/docker/docker/daemon/config" ctrd "github.com/docker/docker/daemon/containerd" "github.com/docker/docker/daemon/events" - "github.com/docker/docker/daemon/exec" _ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers "github.com/docker/docker/daemon/images" "github.com/docker/docker/daemon/logger" @@ -75,7 +74,7 @@ type Daemon struct { repository string containers container.Store containersReplica container.ViewDB - execCommands *exec.Store + execCommands *container.ExecStore imageService ImageService configStore *config.Config statsCollector *stats.Collector @@ -317,40 +316,43 @@ func (daemon *Daemon) restore() error { logger(c).Debug("restoring container") - var ( - err error - alive bool - ec uint32 - exitedAt time.Time - process libcontainerdtypes.Process - ) + var es *containerd.ExitStatus - alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio) - if err != nil && !errdefs.IsNotFound(err) { + if err := c.RestoreTask(context.Background(), daemon.containerd); err != nil && !errdefs.IsNotFound(err) { logger(c).WithError(err).Error("failed to restore container with containerd") return } - logger(c).Debugf("alive: %v", alive) - if !alive { - // If process is not nil, cleanup dead container from containerd. - // If process is nil then the above `containerd.Restore` returned an errdefs.NotFoundError, - // and docker's view of the container state will be updated accorrdingly via SetStopped further down. 
- if process != nil { - logger(c).Debug("cleaning up dead container process") - ec, exitedAt, err = process.Delete(context.Background()) - if err != nil && !errdefs.IsNotFound(err) { - logger(c).WithError(err).Error("failed to delete container from containerd") - return + + alive := false + status := containerd.Unknown + if tsk, ok := c.Task(); ok { + s, err := tsk.Status(context.Background()) + if err != nil { + logger(c).WithError(err).Error("failed to get task status") + } else { + status = s.Status + alive = status != containerd.Stopped + if !alive { + logger(c).Debug("cleaning up dead container process") + es, err = tsk.Delete(context.Background()) + if err != nil && !errdefs.IsNotFound(err) { + logger(c).WithError(err).Error("failed to delete task from containerd") + return + } + } else if !daemon.configStore.LiveRestoreEnabled { + logger(c).Debug("shutting down container considered alive by containerd") + if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) { + log.WithError(err).Error("error shutting down container") + return + } + status = containerd.Stopped + alive = false + c.ResetRestartManager(false) } } - } else if !daemon.configStore.LiveRestoreEnabled { - logger(c).Debug("shutting down container considered alive by containerd") - if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) { - log.WithError(err).Error("error shutting down container") - return - } - c.ResetRestartManager(false) } + // If the containerd task for the container was not found, docker's view of the + // container state will be updated accordingly via SetStopped further down. if c.IsRunning() || c.IsPaused() { logger(c).Debug("syncing container on disk state with real state") @@ -359,29 +361,22 @@ func (daemon *Daemon) restore() error { switch { case c.IsPaused() && alive: - s, err := daemon.containerd.Status(context.Background(), c.ID) - if err != nil { - logger(c).WithError(err).Error("failed to get container status") - } else { - logger(c).WithField("state", s).Info("restored container paused") - switch s { - case containerd.Paused, containerd.Pausing: - // nothing to do - case containerd.Stopped: - alive = false - case containerd.Unknown: - log.Error("unknown status for paused container during restore") - default: - // running - c.Lock() - c.Paused = false - daemon.setStateCounter(c) - daemon.updateHealthMonitor(c) - if err := c.CheckpointTo(daemon.containersReplica); err != nil { - log.WithError(err).Error("failed to update paused container state") - } - c.Unlock() + logger(c).WithField("state", status).Info("restored container paused") + switch status { + case containerd.Paused, containerd.Pausing: + // nothing to do + case containerd.Unknown, containerd.Stopped, "": + log.WithField("status", status).Error("unexpected status for paused container during restore") + default: + // running + c.Lock() + c.Paused = false + daemon.setStateCounter(c) + daemon.updateHealthMonitor(c) + if err := c.CheckpointTo(daemon.containersReplica); err != nil { + log.WithError(err).Error("failed to update paused container state") } + c.Unlock() } case !c.IsPaused() && alive: logger(c).Debug("restoring healthcheck") @@ -393,7 +388,12 @@ func (daemon *Daemon) restore() error { if !alive { logger(c).Debug("setting stopped state") c.Lock() - c.SetStopped(&container.ExitStatus{ExitCode: int(ec), ExitedAt: exitedAt}) + var ces container.ExitStatus + if es != nil { + ces.ExitCode = int(es.ExitCode()) + ces.ExitedAt = es.ExitTime() + } + c.SetStopped(&ces) daemon.Cleanup(c) if 
err := c.CheckpointTo(daemon.containersReplica); err != nil { log.WithError(err).Error("failed to update stopped container state") @@ -956,7 +956,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S if d.containersReplica, err = container.NewViewDB(); err != nil { return nil, err } - d.execCommands = exec.NewStore() + d.execCommands = container.NewExecStore() d.statsCollector = d.newStatsCollector(1 * time.Second) d.EventsService = events.New() diff --git a/daemon/daemon_unix.go b/daemon/daemon_unix.go index d0b71d1c65..acc2e85ba6 100644 --- a/daemon/daemon_unix.go +++ b/daemon/daemon_unix.go @@ -1387,10 +1387,13 @@ func copyBlkioEntry(entries []*statsV1.BlkIOEntry) []types.BlkioStatEntry { } func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) { - if !c.IsRunning() { - return nil, errNotRunning(c.ID) + c.Lock() + task, err := c.GetRunningTask() + c.Unlock() + if err != nil { + return nil, err } - cs, err := daemon.containerd.Stats(context.Background(), c.ID) + cs, err := task.Stats(context.Background()) if err != nil { if strings.Contains(err.Error(), "container not found") { return nil, containerNotFound(c.ID) diff --git a/daemon/daemon_windows.go b/daemon/daemon_windows.go index 3dc8e09aeb..12f014c890 100644 --- a/daemon/daemon_windows.go +++ b/daemon/daemon_windows.go @@ -14,6 +14,7 @@ import ( containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/container" "github.com/docker/docker/daemon/config" + "github.com/docker/docker/errdefs" "github.com/docker/docker/libcontainerd/local" "github.com/docker/docker/libcontainerd/remote" "github.com/docker/docker/libnetwork" @@ -515,14 +516,17 @@ func driverOptions(_ *config.Config) nwconfig.Option { } func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) { - if !c.IsRunning() { - return nil, errNotRunning(c.ID) + c.Lock() + task, err := c.GetRunningTask() + c.Unlock() + if err != nil { + return nil, err } // Obtain the stats from HCS via libcontainerd - stats, err := daemon.containerd.Stats(context.Background(), c.ID) + stats, err := task.Stats(context.Background()) if err != nil { - if strings.Contains(err.Error(), "container not found") { + if errdefs.IsNotFound(err) { return nil, containerNotFound(c.ID) } return nil, err diff --git a/daemon/delete.go b/daemon/delete.go index db04705bef..e10c668352 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -138,7 +138,14 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, config ty container.RWLayer = nil } - if err := containerfs.EnsureRemoveAll(container.Root); err != nil { + // Hold the container lock while deleting the container root directory + // so that other goroutines don't attempt to concurrently open files + // within it. Having any file open on Windows (without the + // FILE_SHARE_DELETE flag) will block it from being deleted. 
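+	// (POSIX platforms, by contrast, allow a file to be unlinked while open.)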
+ container.Lock() + err := containerfs.EnsureRemoveAll(container.Root) + container.Unlock() + if err != nil { err = errors.Wrapf(err, "unable to remove filesystem for %s", container.ID) container.SetRemovalError(err) return err diff --git a/daemon/delete_test.go b/daemon/delete_test.go index c95309e012..de7bbdc486 100644 --- a/daemon/delete_test.go +++ b/daemon/delete_test.go @@ -52,7 +52,7 @@ func TestContainerDelete(t *testing.T) { fixMsg: "Stop the container before attempting removal or force remove", initContainer: func() *container.Container { c := newContainerWithState(container.NewState()) - c.SetRunning(0, true) + c.SetRunning(nil, nil, true) c.SetRestarting(&container.ExitStatus{}) return c }}, diff --git a/daemon/exec.go b/daemon/exec.go index 4675ee4557..d4e5ab3df2 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -2,18 +2,19 @@ package daemon // import "github.com/docker/docker/daemon" import ( "context" + "encoding/json" "fmt" "io" "runtime" "strings" "time" + "github.com/containerd/containerd" "github.com/docker/docker/api/types" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/strslice" "github.com/docker/docker/container" "github.com/docker/docker/container/stream" - "github.com/docker/docker/daemon/exec" "github.com/docker/docker/errdefs" "github.com/docker/docker/pkg/pools" "github.com/moby/sys/signal" @@ -23,7 +24,7 @@ import ( "github.com/sirupsen/logrus" ) -func (daemon *Daemon) registerExecCommand(container *container.Container, config *exec.Config) { +func (daemon *Daemon) registerExecCommand(container *container.Container, config *container.ExecConfig) { // Storing execs in container in order to kill them gracefully whenever the container is stopped or removed. container.ExecCommands.Add(config.ID, config) // Storing execs in daemon for easy access via Engine API. @@ -41,7 +42,7 @@ func (daemon *Daemon) ExecExists(name string) (bool, error) { // getExecConfig looks up the exec instance by name. If the container associated // with the exec instance is stopped or paused, it will return an error. -func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) { +func (daemon *Daemon) getExecConfig(name string) (*container.ExecConfig, error) { ec := daemon.execCommands.Get(name) if ec == nil { return nil, errExecNotFound(name) @@ -52,7 +53,7 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) { // saying the container isn't running, we should return a 404 so that // the user sees the same error now that they will after the // 5 minute clean-up loop is run which erases old/dead execs. 
- ctr := daemon.containers.Get(ec.ContainerID) + ctr := daemon.containers.Get(ec.Container.ID) if ctr == nil { return nil, containerNotFound(name) } @@ -68,9 +69,9 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) { return ec, nil } -func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *exec.Config) { - container.ExecCommands.Delete(execConfig.ID, execConfig.Pid) - daemon.execCommands.Delete(execConfig.ID, execConfig.Pid) +func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *container.ExecConfig) { + container.ExecCommands.Delete(execConfig.ID) + daemon.execCommands.Delete(execConfig.ID) } func (daemon *Daemon) getActiveContainer(name string) (*container.Container, error) { @@ -110,11 +111,10 @@ func (daemon *Daemon) ContainerExecCreate(name string, config *types.ExecConfig) } } - execConfig := exec.NewConfig() + execConfig := container.NewExecConfig(cntr) execConfig.OpenStdin = config.AttachStdin execConfig.OpenStdout = config.AttachStdout execConfig.OpenStderr = config.AttachStderr - execConfig.ContainerID = cntr.ID execConfig.DetachKeys = keys execConfig.Entrypoint = entrypoint execConfig.Args = args @@ -174,27 +174,23 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio ec.Running = true ec.Unlock() - c := daemon.containers.Get(ec.ContainerID) - if c == nil { - return containerNotFound(ec.ContainerID) - } - logrus.Debugf("starting exec command %s in container %s", ec.ID, c.ID) + logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID) attributes := map[string]string{ "execID": ec.ID, } - daemon.LogContainerEventWithAttributes(c, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes) + daemon.LogContainerEventWithAttributes(ec.Container, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes) defer func() { if err != nil { ec.Lock() + ec.Container.ExecCommands.Delete(ec.ID) ec.Running = false exitCode := 126 ec.ExitCode = &exitCode if err := ec.CloseStreams(); err != nil { - logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err) + logrus.Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err) } ec.Unlock() - c.ExecCommands.Delete(ec.ID, ec.Pid) } }() @@ -222,15 +218,18 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio p := &specs.Process{} if runtime.GOOS != "windows" { - ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.ContainerID) + ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.Container.ID) if err != nil { return err } - spec, err := ctr.Spec(ctx) + md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata) if err != nil { return err } - p = spec.Process + spec := specs.Spec{Process: p} + if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil { + return err + } } p.Args = append([]string{ec.Entrypoint}, ec.Args...) 
p.Env = ec.Env @@ -253,7 +252,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio p.Cwd = "/" } - if err := daemon.execSetPlatformOpt(c, ec, p); err != nil { + if err := daemon.execSetPlatformOpt(ctx, ec, p); err != nil { return err } @@ -274,31 +273,34 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio defer cancel() attachErr := ec.StreamConfig.CopyStreams(copyCtx, &attachConfig) + ec.Container.Lock() + tsk, err := ec.Container.GetRunningTask() + ec.Container.Unlock() + if err != nil { + return err + } + // Synchronize with libcontainerd event loop ec.Lock() - c.ExecCommands.Lock() - systemPid, err := daemon.containerd.Exec(ctx, c.ID, ec.ID, p, cStdin != nil, ec.InitializeStdio) + ec.Process, err = tsk.Exec(ctx, ec.ID, p, cStdin != nil, ec.InitializeStdio) // the exec context should be ready, or error happened. // close the chan to notify readiness close(ec.Started) if err != nil { - c.ExecCommands.Unlock() - ec.Unlock() + defer ec.Unlock() return translateContainerdStartErr(ec.Entrypoint, ec.SetExitCode, err) } - ec.Pid = systemPid - c.ExecCommands.Unlock() ec.Unlock() select { case <-ctx.Done(): log := logrus. - WithField("container", c.ID). - WithField("exec", name) + WithField("container", ec.Container.ID). + WithField("exec", ec.ID) log.Debug("Sending KILL signal to container process") sigCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFunc() - err := daemon.containerd.SignalProcess(sigCtx, c.ID, name, signal.SignalMap["KILL"]) + err := ec.Process.Kill(sigCtx, signal.SignalMap["KILL"]) if err != nil { log.WithError(err).Error("Could not send KILL signal to container process") } @@ -311,7 +313,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio attributes := map[string]string{ "execID": ec.ID, } - daemon.LogContainerEventWithAttributes(c, "exec_detach", attributes) + daemon.LogContainerEventWithAttributes(ec.Container, "exec_detach", attributes) } } return nil @@ -328,7 +330,7 @@ func (daemon *Daemon) execCommandGC() { for id, config := range daemon.execCommands.Commands() { if config.CanRemove { cleaned++ - daemon.execCommands.Delete(id, config.Pid) + daemon.execCommands.Delete(id) } else { if _, exists := liveExecCommands[id]; !exists { config.CanRemove = true diff --git a/daemon/exec_linux.go b/daemon/exec_linux.go index d0090d6097..46ed4309ff 100644 --- a/daemon/exec_linux.go +++ b/daemon/exec_linux.go @@ -5,15 +5,14 @@ import ( "github.com/containerd/containerd/pkg/apparmor" "github.com/docker/docker/container" - "github.com/docker/docker/daemon/exec" "github.com/docker/docker/oci/caps" specs "github.com/opencontainers/runtime-spec/specs-go" ) -func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error { +func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error { if len(ec.User) > 0 { var err error - p.User, err = getUser(c, ec.User) + p.User, err = getUser(ec.Container, ec.User) if err != nil { return err } @@ -27,9 +26,9 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config } if apparmor.HostSupports() { var appArmorProfile string - if c.AppArmorProfile != "" { - appArmorProfile = c.AppArmorProfile - } else if c.HostConfig.Privileged { + if ec.Container.AppArmorProfile != "" { + appArmorProfile = ec.Container.AppArmorProfile + } else if ec.Container.HostConfig.Privileged { // `docker exec --privileged` does 
not currently disable AppArmor // profiles. Privileged configuration of the container is inherited appArmorProfile = unconfinedAppArmorProfile @@ -51,5 +50,5 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config p.ApparmorProfile = appArmorProfile } s := &specs.Spec{Process: p} - return WithRlimits(daemon, c)(context.Background(), nil, nil, s) + return WithRlimits(daemon, ec.Container)(ctx, nil, nil, s) } diff --git a/daemon/exec_linux_test.go b/daemon/exec_linux_test.go index ffef343898..17df7e16ad 100644 --- a/daemon/exec_linux_test.go +++ b/daemon/exec_linux_test.go @@ -4,13 +4,13 @@ package daemon import ( + "context" "testing" "github.com/containerd/containerd/pkg/apparmor" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/container" "github.com/docker/docker/daemon/config" - "github.com/docker/docker/daemon/exec" specs "github.com/opencontainers/runtime-spec/specs-go" "gotest.tools/v3/assert" ) @@ -79,10 +79,10 @@ func TestExecSetPlatformOptAppArmor(t *testing.T) { Privileged: tc.privileged, }, } - ec := &exec.Config{Privileged: execPrivileged} + ec := &container.ExecConfig{Container: c, Privileged: execPrivileged} p := &specs.Process{} - err := d.execSetPlatformOpt(c, ec, p) + err := d.execSetPlatformOpt(context.Background(), ec, p) assert.NilError(t, err) assert.Equal(t, p.ApparmorProfile, tc.expectedProfile) }) diff --git a/daemon/exec_windows.go b/daemon/exec_windows.go index 32f16e9282..a4a8696aed 100644 --- a/daemon/exec_windows.go +++ b/daemon/exec_windows.go @@ -1,13 +1,14 @@ package daemon // import "github.com/docker/docker/daemon" import ( + "context" + "github.com/docker/docker/container" - "github.com/docker/docker/daemon/exec" specs "github.com/opencontainers/runtime-spec/specs-go" ) -func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error { - if c.OS == "windows" { +func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error { + if ec.Container.OS == "windows" { p.User.Username = ec.User } return nil diff --git a/daemon/health.go b/daemon/health.go index 5f5779526b..2d94b3b1d9 100644 --- a/daemon/health.go +++ b/daemon/health.go @@ -13,7 +13,6 @@ import ( containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/strslice" "github.com/docker/docker/container" - "github.com/docker/docker/daemon/exec" "github.com/sirupsen/logrus" ) @@ -69,11 +68,10 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container cmdSlice = append(getShell(cntr), cmdSlice...) 
 	}
 	entrypoint, args := d.getEntrypointAndArgs(strslice.StrSlice{}, cmdSlice)
-	execConfig := exec.NewConfig()
+	execConfig := container.NewExecConfig(cntr)
 	execConfig.OpenStdin = false
 	execConfig.OpenStdout = true
 	execConfig.OpenStderr = true
-	execConfig.ContainerID = cntr.ID
 	execConfig.DetachKeys = []byte{}
 	execConfig.Entrypoint = entrypoint
 	execConfig.Args = args
@@ -151,14 +149,22 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
 	if err != nil {
 		return nil, err
 	}
-	if info.ExitCode == nil {
-		return nil, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+	exitCode, err := func() (int, error) {
+		info.Lock()
+		defer info.Unlock()
+		if info.ExitCode == nil {
+			return 0, fmt.Errorf("healthcheck for container %s has no exit code", cntr.ID)
+		}
+		return *info.ExitCode, nil
+	}()
+	if err != nil {
+		return nil, err
 	}
 	// Note: Go's json package will handle invalid UTF-8 for us
 	out := output.String()
 	return &types.HealthcheckResult{
 		End:      time.Now(),
-		ExitCode: *info.ExitCode,
+		ExitCode: exitCode,
 		Output:   out,
 	}, nil
 }
diff --git a/daemon/inspect.go b/daemon/inspect.go
index 3fc3de2806..75be2ee2b3 100644
--- a/daemon/inspect.go
+++ b/daemon/inspect.go
@@ -218,11 +218,17 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
 		return nil, errExecNotFound(id)
 	}

-	if ctr := daemon.containers.Get(e.ContainerID); ctr == nil {
+	if ctr := daemon.containers.Get(e.Container.ID); ctr == nil {
 		return nil, errExecNotFound(id)
 	}

+	e.Lock()
+	defer e.Unlock()
 	pc := inspectExecProcessConfig(e)
+	var pid int
+	if e.Process != nil {
+		pid = int(e.Process.Pid())
+	}

 	return &backend.ExecInspect{
 		ID:       e.ID,
@@ -233,9 +239,9 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
 		OpenStdout:  e.OpenStdout,
 		OpenStderr:  e.OpenStderr,
 		CanRemove:   e.CanRemove,
-		ContainerID: e.ContainerID,
+		ContainerID: e.Container.ID,
 		DetachKeys:  e.DetachKeys,
-		Pid:         e.Pid,
+		Pid:         pid,
 	}, nil
 }
diff --git a/daemon/inspect_linux.go b/daemon/inspect_linux.go
index 049a7f743f..9c2c513d0e 100644
--- a/daemon/inspect_linux.go
+++ b/daemon/inspect_linux.go
@@ -5,7 +5,6 @@ import (
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/versions/v1p19"
 	"github.com/docker/docker/container"
-	"github.com/docker/docker/daemon/exec"
 )

 // This sets platform-specific fields
@@ -62,7 +61,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*v1p19.ContainerJSON,
 	}, nil
 }

-func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {
+func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
 	return &backend.ExecProcessConfig{
 		Tty:        e.Tty,
 		Entrypoint: e.Entrypoint,
diff --git a/daemon/inspect_test.go b/daemon/inspect_test.go
index 07c026b723..e55af45bea 100644
--- a/daemon/inspect_test.go
+++ b/daemon/inspect_test.go
@@ -6,7 +6,6 @@ import (
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/config"
-	"github.com/docker/docker/daemon/exec"
 	"gotest.tools/v3/assert"
 	is "gotest.tools/v3/assert/cmp"
 )
@@ -16,7 +15,7 @@ func TestGetInspectData(t *testing.T) {
 		ID:           "inspect-me",
 		HostConfig:   &containertypes.HostConfig{},
 		State:        container.NewState(),
-		ExecCommands: exec.NewStore(),
+		ExecCommands: container.NewExecStore(),
 	}

 	d := &Daemon{
diff --git a/daemon/inspect_windows.go b/daemon/inspect_windows.go
index 12fda670df..9b219d8b8c 100644
--- a/daemon/inspect_windows.go
+++ 
b/daemon/inspect_windows.go @@ -4,7 +4,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/container" - "github.com/docker/docker/daemon/exec" ) // This sets platform-specific fields @@ -17,7 +16,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON, return daemon.ContainerInspectCurrent(name, false) } -func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig { +func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig { return &backend.ExecProcessConfig{ Tty: e.Tty, Entrypoint: e.Entrypoint, diff --git a/daemon/kill.go b/daemon/kill.go index 383393e24f..953249c627 100644 --- a/daemon/kill.go +++ b/daemon/kill.go @@ -9,7 +9,6 @@ import ( containerpkg "github.com/docker/docker/container" "github.com/docker/docker/errdefs" - libcontainerdtypes "github.com/docker/docker/libcontainerd/types" "github.com/moby/sys/signal" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -65,8 +64,9 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign container.Lock() defer container.Unlock() - if !container.Running { - return errNotRunning(container.ID) + task, err := container.GetRunningTask() + if err != nil { + return err } var unpause bool @@ -96,8 +96,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign return nil } - err := daemon.containerd.SignalProcess(context.Background(), container.ID, libcontainerdtypes.InitProcessName, stopSignal) - if err != nil { + if err := task.Kill(context.Background(), stopSignal); err != nil { if errdefs.IsNotFound(err) { unpause = false logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'") @@ -121,7 +120,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign if unpause { // above kill signal will be sent once resume is finished - if err := daemon.containerd.Resume(context.Background(), container.ID); err != nil { + if err := task.Resume(context.Background()); err != nil { logrus.Warnf("Cannot unpause container %s: %s", container.ID, err) } } diff --git a/daemon/monitor.go b/daemon/monitor.go index 9a087283c8..1e96c1d9d8 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/container" + "github.com/docker/docker/errdefs" libcontainerdtypes "github.com/docker/docker/libcontainerd/types" "github.com/docker/docker/restartmanager" "github.com/pkg/errors" @@ -25,28 +26,32 @@ func (daemon *Daemon) setStateCounter(c *container.Container) { } func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error { + var exitStatus container.ExitStatus c.Lock() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - ec, et, err := daemon.containerd.DeleteTask(ctx, c.ID) - cancel() - if err != nil { - logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd") + tsk, ok := c.Task() + if ok { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + es, err := tsk.Delete(ctx) + cancel() + if err != nil { + logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd") + } else { + exitStatus = container.ExitStatus{ + ExitCode: int(es.ExitCode()), + ExitedAt: es.ExitTime(), + } + } } - ctx, 
cancel = context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) c.StreamConfig.Wait(ctx) cancel() c.Reset(false) - exitStatus := container.ExitStatus{ - ExitCode: int(ec), - ExitedAt: et, - } if e != nil { exitStatus.ExitCode = int(e.ExitCode) exitStatus.ExitedAt = e.ExitedAt - exitStatus.OOMKilled = e.OOMKilled if e.Error != nil { c.SetError(e.Error) } @@ -54,7 +59,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine daemonShutdown := daemon.IsShuttingDown() execDuration := time.Since(c.StartedAt) - restart, wait, err := c.RestartManager().ShouldRestart(ec, daemonShutdown || c.HasBeenManuallyStopped, execDuration) + restart, wait, err := c.RestartManager().ShouldRestart(uint32(exitStatus.ExitCode), daemonShutdown || c.HasBeenManuallyStopped, execDuration) if err != nil { logrus.WithError(err). WithField("container", c.ID). @@ -71,7 +76,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine // restarted if/when the container is started again daemon.stopHealthchecks(c) attributes := map[string]string{ - "exitCode": strconv.Itoa(int(ec)), + "exitCode": strconv.Itoa(exitStatus.ExitCode), } daemon.Cleanup(c) @@ -141,6 +146,7 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei c.Lock() defer c.Unlock() + c.OOMKilled = true daemon.updateHealthMonitor(c) if err := c.CheckpointTo(daemon.containersReplica); err != nil { return err @@ -157,6 +163,13 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei ec := int(ei.ExitCode) execConfig.Lock() defer execConfig.Unlock() + + // Remove the exec command from the container's store only and not the + // daemon's store so that the exec command can be inspected. Remove it + // before mutating execConfig to maintain the invariant that + // c.ExecCommands only contain execs in the Running state. + c.ExecCommands.Delete(execConfig.ID) + execConfig.ExitCode = &ec execConfig.Running = false @@ -168,11 +181,16 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err) } - // remove the exec command from the container's store only and not the - // daemon's store so that the exec command can be inspected. - c.ExecCommands.Delete(execConfig.ID, execConfig.Pid) - exitCode = ec + + go func() { + if _, err := execConfig.Process.Delete(context.Background()); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "container": ei.ContainerID, + "process": ei.ProcessID, + }).Warn("failed to delete process") + } + }() } attributes := map[string]string{ "execID": ei.ProcessID, @@ -185,7 +203,27 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei // This is here to handle start not generated by docker if !c.Running { - c.SetRunning(int(ei.Pid), false) + ctr, err := daemon.containerd.LoadContainer(context.Background(), c.ID) + if err != nil { + if errdefs.IsNotFound(err) { + // The container was started by not-docker and so could have been deleted by + // not-docker before we got around to loading it from containerd. + logrus.WithField("container", c.ID).WithError(err). + Debug("could not load containerd container for start event") + return nil + } + return err + } + tsk, err := ctr.Task(context.Background()) + if err != nil { + if errdefs.IsNotFound(err) { + logrus.WithField("container", c.ID).WithError(err). 
+ Debug("failed to load task for externally-started container") + return nil + } + return err + } + c.SetRunning(ctr, tsk, false) c.HasBeenManuallyStopped = false c.HasBeenStartedBefore = true daemon.setStateCounter(c) diff --git a/daemon/pause.go b/daemon/pause.go index 51004e6c15..976531e527 100644 --- a/daemon/pause.go +++ b/daemon/pause.go @@ -24,8 +24,9 @@ func (daemon *Daemon) containerPause(container *container.Container) error { defer container.Unlock() // We cannot Pause the container which is not running - if !container.Running { - return errNotRunning(container.ID) + tsk, err := container.GetRunningTask() + if err != nil { + return err } // We cannot Pause the container which is already paused @@ -38,8 +39,8 @@ func (daemon *Daemon) containerPause(container *container.Container) error { return errContainerIsRestarting(container.ID) } - if err := daemon.containerd.Pause(context.Background(), container.ID); err != nil { - return fmt.Errorf("Cannot pause container %s: %s", container.ID, err) + if err := tsk.Pause(context.Background()); err != nil { + return fmt.Errorf("cannot pause container %s: %s", container.ID, err) } container.Paused = true diff --git a/daemon/resize.go b/daemon/resize.go index ac9395379b..2fd427ae9e 100644 --- a/daemon/resize.go +++ b/daemon/resize.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "time" - - libcontainerdtypes "github.com/docker/docker/libcontainerd/types" ) // ContainerResize changes the size of the TTY of the process running @@ -16,11 +14,14 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error { return err } - if !container.IsRunning() { - return errNotRunning(container.ID) + container.Lock() + tsk, err := container.GetRunningTask() + container.Unlock() + if err != nil { + return err } - if err = daemon.containerd.ResizeTerminal(context.Background(), container.ID, libcontainerdtypes.InitProcessName, width, height); err == nil { + if err = tsk.Resize(context.Background(), uint32(width), uint32(height)); err == nil { attributes := map[string]string{ "height": fmt.Sprintf("%d", height), "width": fmt.Sprintf("%d", width), @@ -46,7 +47,7 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error select { case <-ec.Started: - return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height) + return ec.Process.Resize(context.Background(), uint32(width), uint32(height)) case <-timeout.C: return fmt.Errorf("timeout waiting for exec session ready") } diff --git a/daemon/resize_test.go b/daemon/resize_test.go index 50a96778a7..b17e1fc3d0 100644 --- a/daemon/resize_test.go +++ b/daemon/resize_test.go @@ -8,7 +8,7 @@ import ( "testing" "github.com/docker/docker/container" - "github.com/docker/docker/daemon/exec" + "github.com/docker/docker/libcontainerd/types" "gotest.tools/v3/assert" ) @@ -16,32 +16,28 @@ import ( func TestExecResizeNoSuchExec(t *testing.T) { n := "TestExecResize" d := &Daemon{ - execCommands: exec.NewStore(), + execCommands: container.NewExecStore(), } c := &container.Container{ - ExecCommands: exec.NewStore(), + ExecCommands: container.NewExecStore(), } - ec := &exec.Config{ - ID: n, + ec := &container.ExecConfig{ + ID: n, + Container: c, } d.registerExecCommand(c, ec) err := d.ContainerExecResize("nil", 24, 8) assert.ErrorContains(t, err, "No such exec instance") } -type execResizeMockContainerdClient struct { - MockContainerdClient - ProcessID string - ContainerID string - Width int - Height int +type execResizeMockProcess struct { + types.Process + 
Width, Height int } -func (c *execResizeMockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { - c.ProcessID = processID - c.ContainerID = containerID - c.Width = width - c.Height = height +func (p *execResizeMockProcess) Resize(ctx context.Context, width, height uint32) error { + p.Width = int(width) + p.Height = int(height) return nil } @@ -50,30 +46,29 @@ func TestExecResize(t *testing.T) { n := "TestExecResize" width := 24 height := 8 - ec := &exec.Config{ - ID: n, - ContainerID: n, - Started: make(chan struct{}), - } - close(ec.Started) - mc := &execResizeMockContainerdClient{} + mp := &execResizeMockProcess{} d := &Daemon{ - execCommands: exec.NewStore(), - containerd: mc, + execCommands: container.NewExecStore(), containers: container.NewMemoryStore(), } c := &container.Container{ - ExecCommands: exec.NewStore(), + ID: n, + ExecCommands: container.NewExecStore(), State: &container.State{Running: true}, } + ec := &container.ExecConfig{ + ID: n, + Container: c, + Process: mp, + Started: make(chan struct{}), + } + close(ec.Started) d.containers.Add(n, c) d.registerExecCommand(c, ec) err := d.ContainerExecResize(n, height, width) assert.NilError(t, err) - assert.Equal(t, mc.Width, width) - assert.Equal(t, mc.Height, height) - assert.Equal(t, mc.ProcessID, n) - assert.Equal(t, mc.ContainerID, n) + assert.Equal(t, mp.Width, width) + assert.Equal(t, mp.Height, height) } // This test is to make sure that when exec context is not ready, a timeout error should happen. @@ -82,21 +77,22 @@ func TestExecResizeTimeout(t *testing.T) { n := "TestExecResize" width := 24 height := 8 - ec := &exec.Config{ - ID: n, - ContainerID: n, - Started: make(chan struct{}), - } - mc := &execResizeMockContainerdClient{} + mp := &execResizeMockProcess{} d := &Daemon{ - execCommands: exec.NewStore(), - containerd: mc, + execCommands: container.NewExecStore(), containers: container.NewMemoryStore(), } c := &container.Container{ - ExecCommands: exec.NewStore(), + ID: n, + ExecCommands: container.NewExecStore(), State: &container.State{Running: true}, } + ec := &container.ExecConfig{ + ID: n, + Container: c, + Process: mp, + Started: make(chan struct{}), + } d.containers.Add(n, c) d.registerExecCommand(c, ec) err := d.ContainerExecResize(n, height, width) diff --git a/daemon/start.go b/daemon/start.go index ecc0f8b8af..bbdefb0173 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -9,6 +9,7 @@ import ( containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/container" "github.com/docker/docker/errdefs" + "github.com/docker/docker/libcontainerd" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -178,28 +179,17 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint ctx := context.TODO() - err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions) + ctr, err := libcontainerd.ReplaceContainer(ctx, daemon.containerd, container.ID, spec, shim, createOptions) if err != nil { - if errdefs.IsConflict(err) { - logrus.WithError(err).WithField("container", container.ID).Error("Container not cleaned up from containerd from previous run") - // best effort to clean up old container object - daemon.containerd.DeleteTask(ctx, container.ID) - if err := daemon.containerd.Delete(ctx, container.ID); err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).WithField("container", container.ID).Error("Error cleaning up stale containerd container object") - } - err = 
daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions) - } - if err != nil { - return translateContainerdStartErr(container.Path, container.SetExitCode, err) - } + return translateContainerdStartErr(container.Path, container.SetExitCode, err) } // TODO(mlaventure): we need to specify checkpoint options here - pid, err := daemon.containerd.Start(context.Background(), container.ID, checkpointDir, + tsk, err := ctr.Start(ctx, checkpointDir, container.StreamConfig.Stdin() != nil || container.Config.Tty, container.InitializeStdio) if err != nil { - if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil { + if err := ctr.Delete(context.Background()); err != nil { logrus.WithError(err).WithField("container", container.ID). Error("failed to delete failed start container") } @@ -207,7 +197,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint } container.HasBeenManuallyRestarted = false - container.SetRunning(pid, true) + container.SetRunning(ctr, tsk, true) container.HasBeenStartedBefore = true daemon.setStateCounter(container) @@ -227,6 +217,14 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint // Cleanup releases any network resources allocated to the container along with any rules // around how containers are linked together. It also unmounts the container's root filesystem. func (daemon *Daemon) Cleanup(container *container.Container) { + // Microsoft HCS containers get in a bad state if host resources are + // released while the container still exists. + if ctr, ok := container.C8dContainer(); ok { + if err := ctr.Delete(context.Background()); err != nil { + logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err) + } + } + daemon.releaseNetwork(container) if err := container.UnmountIpcMount(); err != nil { @@ -260,8 +258,4 @@ func (daemon *Daemon) Cleanup(container *container.Container) { } container.CancelAttachContext() - - if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil { - logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err) - } } diff --git a/daemon/top_unix.go b/daemon/top_unix.go index 0287acaf7a..68da2596e4 100644 --- a/daemon/top_unix.go +++ b/daemon/top_unix.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/errdefs" + libcontainerdtypes "github.com/docker/docker/libcontainerd/types" "github.com/pkg/errors" ) @@ -150,19 +151,32 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*container.Conta return nil, err } - if !ctr.IsRunning() { - return nil, errNotRunning(ctr.ID) - } + tsk, err := func() (libcontainerdtypes.Task, error) { + ctr.Lock() + defer ctr.Unlock() - if ctr.IsRestarting() { - return nil, errContainerIsRestarting(ctr.ID) - } - - procs, err := daemon.containerd.ListPids(context.Background(), ctr.ID) + tsk, err := ctr.GetRunningTask() + if err != nil { + return nil, err + } + if ctr.Restarting { + return nil, errContainerIsRestarting(ctr.ID) + } + return tsk, nil + }() if err != nil { return nil, err } + infos, err := tsk.Pids(context.Background()) + if err != nil { + return nil, err + } + procs := make([]uint32, len(infos)) + for i, p := range infos { + procs[i] = p.Pid + } + args := strings.Split(psArgs, " ") pids := psPidsArg(procs) output, err := exec.Command("ps", append(args, pids)...).Output() diff --git a/daemon/top_windows.go b/daemon/top_windows.go index 
eaaad4f771..203a5b7c62 100644
--- a/daemon/top_windows.go
+++ b/daemon/top_windows.go
@@ -7,6 +7,7 @@ import (
 	"time"

 	containertypes "github.com/docker/docker/api/types/container"
+	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
 	units "github.com/docker/go-units"
 )

@@ -36,15 +37,24 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*containertypes.
 		return nil, err
 	}

-	if !container.IsRunning() {
-		return nil, errNotRunning(container.ID)
-	}
+	task, err := func() (libcontainerdtypes.Task, error) {
+		container.Lock()
+		defer container.Unlock()

-	if container.IsRestarting() {
-		return nil, errContainerIsRestarting(container.ID)
-	}
+		task, err := container.GetRunningTask()
+		if err != nil {
+			return nil, err
+		}
+		if container.Restarting {
+			return nil, errContainerIsRestarting(container.ID)
+		}
+		return task, nil
+	}()
+	if err != nil {
+		return nil, err
+	}

-	s, err := daemon.containerd.Summary(context.Background(), container.ID)
+	s, err := task.Summary(context.Background())
 	if err != nil {
 		return nil, err
 	}
diff --git a/daemon/unpause.go b/daemon/unpause.go
index fbcf7a589e..eb52256771 100644
--- a/daemon/unpause.go
+++ b/daemon/unpause.go
@@ -26,8 +26,12 @@ func (daemon *Daemon) containerUnpause(ctr *container.Container) error {
 	if !ctr.Paused {
 		return fmt.Errorf("Container %s is not paused", ctr.ID)
 	}
+	tsk, err := ctr.GetRunningTask()
+	if err != nil {
+		return err
+	}

-	if err := daemon.containerd.Resume(context.Background(), ctr.ID); err != nil {
+	if err := tsk.Resume(context.Background()); err != nil {
 		return fmt.Errorf("Cannot unpause container %s: %s", ctr.ID, err)
 	}
diff --git a/daemon/update.go b/daemon/update.go
index bd8479fc05..f01635e49e 100644
--- a/daemon/update.go
+++ b/daemon/update.go
@@ -74,19 +74,28 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
 		ctr.UpdateMonitor(hostConfig.RestartPolicy)
 	}

+	defer daemon.LogContainerEvent(ctr, "update")
+
 	// If container is not running, update hostConfig struct is enough,
 	// resources will be updated when the container is started again.
 	// If container is running (including paused), we need to update configs
 	// to the real world.
-	if ctr.IsRunning() && !ctr.IsRestarting() {
-		if err := daemon.containerd.UpdateResources(context.Background(), ctr.ID, toContainerdResources(hostConfig.Resources)); err != nil {
-			restoreConfig = true
-			// TODO: it would be nice if containerd responded with better errors here so we can classify this better.
-			return errCannotUpdate(ctr.ID, errdefs.System(err))
-		}
+	ctr.Lock()
+	isRestarting := ctr.Restarting
+	tsk, err := ctr.GetRunningTask()
+	ctr.Unlock()
+	if errdefs.IsConflict(err) || isRestarting {
+		return nil
+	}
+	if err != nil {
+		return err
 	}

-	daemon.LogContainerEvent(ctr, "update")
+	if err := tsk.UpdateResources(context.TODO(), toContainerdResources(hostConfig.Resources)); err != nil {
+		restoreConfig = true
+		// TODO: it would be nice if containerd responded with better errors here so we can classify this better.
+ return errCannotUpdate(ctr.ID, errdefs.System(err)) + } return nil } diff --git a/daemon/util_test.go b/daemon/util_test.go deleted file mode 100644 index 5ac47fef3b..0000000000 --- a/daemon/util_test.go +++ /dev/null @@ -1,74 +0,0 @@ -//go:build linux -// +build linux - -package daemon - -import ( - "context" - "syscall" - "time" - - "github.com/containerd/containerd" - libcontainerdtypes "github.com/docker/docker/libcontainerd/types" - specs "github.com/opencontainers/runtime-spec/specs-go" -) - -type mockProcess struct { -} - -func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) { - return 0, time.Time{}, nil -} - -// Mock containerd client implementation, for unit tests. -type MockContainerdClient struct { -} - -func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) { - return containerd.Version{}, nil -} -func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) { - return false, 0, &mockProcess{}, nil -} -func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error { - return nil -} -func (c *MockContainerdClient) Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) { - return 0, nil -} -func (c *MockContainerdClient) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error { - return nil -} -func (c *MockContainerdClient) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - return 0, nil -} -func (c *MockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { - return nil -} -func (c *MockContainerdClient) CloseStdin(ctx context.Context, containerID, processID string) error { - return nil -} -func (c *MockContainerdClient) Pause(ctx context.Context, containerID string) error { return nil } -func (c *MockContainerdClient) Resume(ctx context.Context, containerID string) error { return nil } -func (c *MockContainerdClient) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) { - return nil, nil -} -func (c *MockContainerdClient) ListPids(ctx context.Context, containerID string) ([]uint32, error) { - return nil, nil -} -func (c *MockContainerdClient) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) { - return nil, nil -} -func (c *MockContainerdClient) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { - return 0, time.Time{}, nil -} -func (c *MockContainerdClient) Delete(ctx context.Context, containerID string) error { return nil } -func (c *MockContainerdClient) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) { - return "null", nil -} -func (c *MockContainerdClient) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error { - return nil -} -func (c *MockContainerdClient) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error { - return nil -} diff --git a/integration/container/wait_test.go b/integration/container/wait_test.go index 12a9313caa..f8e64f0458 100644 --- 
a/integration/container/wait_test.go +++ b/integration/container/wait_test.go @@ -108,28 +108,25 @@ func TestWaitConditions(t *testing.T) { cli := request.NewAPIClient(t) testCases := []struct { - doc string - waitCond containertypes.WaitCondition - expectedCode int64 + doc string + waitCond containertypes.WaitCondition + runOpts []func(*container.TestContainerConfig) }{ { - doc: "default", - expectedCode: 99, + doc: "default", }, { - doc: "not-running", - expectedCode: 99, - waitCond: containertypes.WaitConditionNotRunning, + doc: "not-running", + waitCond: containertypes.WaitConditionNotRunning, }, { - doc: "next-exit", - expectedCode: 99, - waitCond: containertypes.WaitConditionNextExit, + doc: "next-exit", + waitCond: containertypes.WaitConditionNextExit, }, { - doc: "removed", - expectedCode: 99, - waitCond: containertypes.WaitConditionRemoved, + doc: "removed", + waitCond: containertypes.WaitConditionRemoved, + runOpts: []func(*container.TestContainerConfig){container.WithAutoRemove}, }, } @@ -138,21 +135,44 @@ func TestWaitConditions(t *testing.T) { t.Run(tc.doc, func(t *testing.T) { t.Parallel() ctx := context.Background() - opts := []func(*container.TestContainerConfig){ - container.WithCmd("sh", "-c", "sleep 1; exit 99"), - } - if tc.waitCond == containertypes.WaitConditionRemoved { - opts = append(opts, container.WithAutoRemove) - } - containerID := container.Run(ctx, t, cli, opts...) - poll.WaitOn(t, container.IsInState(ctx, cli, containerID, "running"), poll.WithTimeout(30*time.Second), poll.WithDelay(100*time.Millisecond)) + opts := append([]func(*container.TestContainerConfig){ + container.WithCmd("sh", "-c", "read -r; exit 99"), + func(tcc *container.TestContainerConfig) { + tcc.Config.AttachStdin = true + tcc.Config.OpenStdin = true + }, + }, tc.runOpts...) + containerID := container.Create(ctx, t, cli, opts...) + t.Logf("ContainerID = %v", containerID) + streams, err := cli.ContainerAttach(ctx, containerID, types.ContainerAttachOptions{Stream: true, Stdin: true}) + assert.NilError(t, err) + defer streams.Close() + + assert.NilError(t, cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{})) waitResC, errC := cli.ContainerWait(ctx, containerID, tc.waitCond) + select { + case err := <-errC: + t.Fatalf("ContainerWait() err = %v", err) + case res := <-waitResC: + t.Fatalf("ContainerWait() sent exit code (%v) before ContainerStart()", res) + default: + } + + info, _ := cli.ContainerInspect(ctx, containerID) + assert.Equal(t, "running", info.State.Status) + + _, err = streams.Conn.Write([]byte("\n")) + assert.NilError(t, err) + select { case err := <-errC: assert.NilError(t, err) case waitRes := <-waitResC: - assert.Check(t, is.Equal(tc.expectedCode, waitRes.StatusCode)) + assert.Check(t, is.Equal(int64(99), waitRes.StatusCode)) + case <-time.After(15 * time.Second): + info, _ := cli.ContainerInspect(ctx, containerID) + t.Fatalf("Timed out waiting for container exit code (status = %q)", info.State.Status) } }) } diff --git a/libcontainerd/local/local_windows.go b/libcontainerd/local/local_windows.go index 5cd075d51e..da71805dbd 100644 --- a/libcontainerd/local/local_windows.go +++ b/libcontainerd/local/local_windows.go @@ -32,29 +32,44 @@ import ( ) type process struct { - id string - pid int - hcsProcess hcsshim.Process + // mu guards the mutable fields of this struct. + // + // Always lock mu before ctr's mutex to prevent deadlocks. 
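The lock-ordering comment above is the entire deadlock-avoidance argument for these two mutexes: as long as every path that needs both takes the process lock before the container lock, no two goroutines can ever hold them in opposite orders. A toy illustration of the rule; the types and field names are hypothetical:

package main

import "sync"

type container struct{ mu sync.Mutex }

type process struct {
	mu  sync.Mutex
	ctr *container
}

// snapshot acquires both locks in the documented order: p.mu first,
// then p.ctr.mu. Every other two-lock path must do the same.
func (p *process) snapshot() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.ctr.mu.Lock()
	defer p.ctr.mu.Unlock()
	// ... read fields guarded by both locks ...
}

func main() {
	p := &process{ctr: &container{}}
	p.snapshot()
}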
+ mu sync.Mutex + id string // Invariants: immutable + ctr *container // Invariants: immutable, ctr != nil + hcsProcess hcsshim.Process // Is set to nil on process exit + exited *containerd.ExitStatus // Valid iff waitCh is closed + waitCh chan struct{} +} + +type task struct { + process } type container struct { - sync.Mutex + mu sync.Mutex // The ociSpec is required, as client.Create() needs a spec, but can // be called from the RestartManager context which does not otherwise // have access to the Spec + // + // A container value with ociSpec == nil represents a container which + // has been loaded with (*client).LoadContainer, and is ineligible to + // be Start()ed. ociSpec *specs.Spec - hcsContainer hcsshim.Container + hcsContainer hcsshim.Container // Is set to nil on container delete + isPaused bool + client *client id string - status containerd.ProcessStatus - exitedAt time.Time - exitCode uint32 - waitCh chan struct{} - init *process - execs map[string]*process terminateInvoked bool + + // task is a reference to the current task for the container. As a + // corollary, when task == nil the container has no current task: the + // container was never Start()ed or the task was Delete()d. + task *task } // defaultOwner is a tag passed to HCS to allow it to differentiate between @@ -63,22 +78,18 @@ type container struct { const defaultOwner = "docker" type client struct { - sync.Mutex - - stateDir string - backend libcontainerdtypes.Backend - logger *logrus.Entry - eventQ queue.Queue - containers map[string]*container + stateDir string + backend libcontainerdtypes.Backend + logger *logrus.Entry + eventQ queue.Queue } // NewClient creates a new local executor for windows func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) { c := &client{ - stateDir: stateDir, - backend: b, - logger: logrus.WithField("module", "libcontainerd").WithField("module", "libcontainerd").WithField("namespace", ns), - containers: make(map[string]*container), + stateDir: stateDir, + backend: b, + logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), } return c, nil @@ -88,7 +99,7 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) { return containerd.Version{}, errors.New("not implemented on Windows") } -// Create is the entrypoint to create a container from a spec. +// NewContainer is the entrypoint to create a container from a spec. // Table below shows the fields required for HCS JSON calling parameters, // where if not populated, is omitted. 
// +-----------------+--------------------------------------------+---------------------------------------------------+ @@ -139,16 +150,12 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) { // "ImagePath": "C:\\\\control\\\\windowsfilter\\\\65bf96e5760a09edf1790cb229e2dfb2dbd0fcdc0bf7451bae099106bfbfea0c\\\\UtilityVM" // }, // } -func (c *client) Create(_ context.Context, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error { - if ctr := c.getContainer(id); ctr != nil { - return errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) - } - +func (c *client) NewContainer(_ context.Context, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) { var err error if spec.Linux != nil { - return errors.New("linux containers are not supported on this platform") + return nil, errors.New("linux containers are not supported on this platform") } - err = c.createWindows(id, spec, runtimeOptions) + ctr, err := c.createWindows(id, spec, runtimeOptions) if err == nil { c.eventQ.Append(id, func() { @@ -168,10 +175,10 @@ func (c *client) Create(_ context.Context, id string, spec *specs.Spec, shim str } }) } - return err + return ctr, err } -func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions interface{}) error { +func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions interface{}) (*container, error) { logger := c.logger.WithField("container", id) configuration := &hcsshim.ContainerConfig{ SystemType: "Container", @@ -215,7 +222,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter // We must have least two layers in the spec, the bottom one being a // base image, the top one being the RW layer. if spec.Windows.LayerFolders == nil || len(spec.Windows.LayerFolders) < 2 { - return fmt.Errorf("OCI spec is invalid - at least two LayerFolders must be supplied to the runtime") + return nil, fmt.Errorf("OCI spec is invalid - at least two LayerFolders must be supplied to the runtime") } // Strip off the top-most layer as that's passed in separately to HCS @@ -226,7 +233,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter // We don't currently support setting the utility VM image explicitly. // TODO circa RS5, this may be re-locatable. if spec.Windows.HyperV.UtilityVMPath != "" { - return errors.New("runtime does not support an explicit utility VM path for Hyper-V containers") + return nil, errors.New("runtime does not support an explicit utility VM path for Hyper-V containers") } // Find the upper-most utility VM image. 
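A shape that recurs throughout this rename: createWindows hands the constructed *container back to its caller instead of registering it in a client-owned map, which is why every early `return err` above becomes `return nil, err`. The ownership change in miniature, with hypothetical types:

package main

import (
	"errors"
	"fmt"
)

type container struct{ id string }

// newContainer returns the object to the caller rather than storing it
// in a shared registry, so lifetime and locking stay with the caller.
func newContainer(id string) (*container, error) {
	if id == "" {
		return nil, errors.New("id must not be empty")
	}
	return &container{id: id}, nil
}

func main() {
	ctr, err := newContainer("abc123")
	if err != nil {
		panic(err)
	}
	fmt.Println(ctr.id)
}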
@@ -239,35 +246,35 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter break } if !os.IsNotExist(err) { - return err + return nil, err } } if uvmImagePath == "" { - return errors.New("utility VM image could not be found") + return nil, errors.New("utility VM image could not be found") } configuration.HvRuntime = &hcsshim.HvRuntime{ImagePath: uvmImagePath} if spec.Root.Path != "" { - return errors.New("OCI spec is invalid - Root.Path must be omitted for a Hyper-V container") + return nil, errors.New("OCI spec is invalid - Root.Path must be omitted for a Hyper-V container") } } else { const volumeGUIDRegex = `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}\\$` if _, err := regexp.MatchString(volumeGUIDRegex, spec.Root.Path); err != nil { - return fmt.Errorf(`OCI spec is invalid - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, spec.Root.Path) + return nil, fmt.Errorf(`OCI spec is invalid - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, spec.Root.Path) } // HCS API requires the trailing backslash to be removed configuration.VolumePath = spec.Root.Path[:len(spec.Root.Path)-1] } if spec.Root.Readonly { - return errors.New(`OCI spec is invalid - Root.Readonly must not be set on Windows`) + return nil, errors.New(`OCI spec is invalid - Root.Readonly must not be set on Windows`) } for _, layerPath := range layerFolders { _, filename := filepath.Split(layerPath) g, err := hcsshim.NameToGuid(filename) if err != nil { - return err + return nil, err } configuration.Layers = append(configuration.Layers, hcsshim.Layer{ ID: g.ToString(), @@ -281,7 +288,7 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter for _, mount := range spec.Mounts { const pipePrefix = `\\.\pipe\` if mount.Type != "" { - return fmt.Errorf("OCI spec is invalid - Mount.Type '%s' must not be set", mount.Type) + return nil, fmt.Errorf("OCI spec is invalid - Mount.Type '%s' must not be set", mount.Type) } if strings.HasPrefix(mount.Destination, pipePrefix) { mp := hcsshim.MappedPipe{ @@ -309,13 +316,13 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter if len(spec.Windows.Devices) > 0 { // Add any device assignments if configuration.HvPartition { - return errors.New("device assignment is not supported for HyperV containers") + return nil, errors.New("device assignment is not supported for HyperV containers") } for _, d := range spec.Windows.Devices { // Per https://github.com/microsoft/hcsshim/blob/v0.9.2/internal/uvm/virtual_device.go#L17-L18, // these represent an Interface Class GUID. if d.IDType != "class" && d.IDType != "vpci-class-guid" { - return errors.Errorf("device assignment of type '%s' is not supported", d.IDType) + return nil, errors.Errorf("device assignment of type '%s' is not supported", d.IDType) } configuration.AssignedDevices = append(configuration.AssignedDevices, hcsshim.AssignedDevice{InterfaceClassGUID: d.ID}) } @@ -323,38 +330,32 @@ func (c *client) createWindows(id string, spec *specs.Spec, runtimeOptions inter hcsContainer, err := hcsshim.CreateContainer(id, configuration) if err != nil { - return err + return nil, err } // Construct a container object for calling start on it. 
ctr := &container{ + client: c, id: id, - execs: make(map[string]*process), ociSpec: spec, hcsContainer: hcsContainer, - status: containerd.Created, - waitCh: make(chan struct{}), } logger.Debug("starting container") - if err = hcsContainer.Start(); err != nil { - c.logger.WithError(err).Error("failed to start container") - ctr.Lock() - if err := c.terminateContainer(ctr); err != nil { - c.logger.WithError(err).Error("failed to cleanup after a failed Start") + if err := ctr.hcsContainer.Start(); err != nil { + logger.WithError(err).Error("failed to start container") + ctr.mu.Lock() + if err := ctr.terminateContainer(); err != nil { + logger.WithError(err).Error("failed to cleanup after a failed Start") } else { - c.logger.Debug("cleaned up after failed Start by calling Terminate") + logger.Debug("cleaned up after failed Start by calling Terminate") } - ctr.Unlock() - return err + ctr.mu.Unlock() + return nil, err } - c.Lock() - c.containers[id] = ctr - c.Unlock() - logger.Debug("createWindows() completed successfully") - return nil + return ctr, nil } @@ -388,16 +389,18 @@ func (c *client) extractResourcesFromSpec(spec *specs.Spec, configuration *hcssh } } -func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr := c.getContainer(id) +func (ctr *container) Start(_ context.Context, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) { + ctr.mu.Lock() + defer ctr.mu.Unlock() + switch { - case ctr == nil: - return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) - case ctr.init != nil: - return -1, errors.WithStack(errdefs.NotModified(errors.New("container already started"))) + case ctr.ociSpec == nil: + return nil, errors.WithStack(errdefs.NotImplemented(errors.New("a restored container cannot be started"))) + case ctr.task != nil: + return nil, errors.WithStack(errdefs.NotModified(containerderrdefs.ErrAlreadyExists)) } - logger := c.logger.WithField("container", id) + logger := ctr.client.logger.WithField("container", ctr.id) // Note we always tell HCS to create stdout as it's required // regardless of '-i' or '-t' options, so that docker can always grab @@ -435,32 +438,13 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt createProcessParms.User = ctr.ociSpec.Process.User.Username - ctr.Lock() - // Start the command running in the container. newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms) if err != nil { logger.WithError(err).Error("CreateProcess() failed") - // Fix for https://github.com/moby/moby/issues/38719. - // If the init process failed to launch, we still need to reap the - // container to avoid leaking it. - // - // Note we use the explicit exit code of 127 which is the - // Linux shell equivalent of "command not found". Windows cannot - // know ahead of time whether or not the command exists, especially - // in the case of Hyper-V containers. 
- ctr.Unlock() - exitedAt := time.Now() - p := &process{ - id: libcontainerdtypes.InitProcessName, - pid: 0, - } - c.reapContainer(ctr, p, 127, exitedAt, nil, logger) - return -1, err + return nil, err } - defer ctr.Unlock() - defer func() { if err != nil { if err := newProcess.Kill(); err != nil { @@ -476,55 +460,69 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt }() } }() - p := &process{ - hcsProcess: newProcess, + t := &task{process: process{ id: libcontainerdtypes.InitProcessName, - pid: newProcess.Pid(), - } - logger.WithField("pid", p.pid).Debug("init process started") + ctr: ctr, + hcsProcess: newProcess, + waitCh: make(chan struct{}), + }} + pid := t.Pid() + logger.WithField("pid", pid).Debug("init process started") - ctr.status = containerd.Running - ctr.init = p - - // Spin up a go routine waiting for exit to handle cleanup - go c.reapProcess(ctr, p) + // Spin up a goroutine to notify the backend and clean up resources when + // the task exits. Defer until after the start event is sent so that the + // exit event is not sent out-of-order. + defer func() { go t.reap() }() // Don't shadow err here due to our deferred clean-up. var dio *cio.DirectIO dio, err = newIOFromProcess(newProcess, ctr.ociSpec.Process.Terminal) if err != nil { logger.WithError(err).Error("failed to get stdio pipes") - return -1, err + return nil, err } _, err = attachStdio(dio) if err != nil { logger.WithError(err).Error("failed to attach stdio") - return -1, err + return nil, err } + // All fallible operations have succeeded so it is now safe to set the + // container's current task. + ctr.task = t + // Generate the associated event - c.eventQ.Append(id, func() { + ctr.client.eventQ.Append(ctr.id, func() { ei := libcontainerdtypes.EventInfo{ - ContainerID: id, + ContainerID: ctr.id, ProcessID: libcontainerdtypes.InitProcessName, - Pid: uint32(p.pid), + Pid: pid, } - c.logger.WithFields(logrus.Fields{ + ctr.client.logger.WithFields(logrus.Fields{ "container": ctr.id, "event": libcontainerdtypes.EventStart, "event-info": ei, }).Info("sending event") - err := c.backend.ProcessEvent(ei.ContainerID, libcontainerdtypes.EventStart, ei) + err := ctr.client.backend.ProcessEvent(ei.ContainerID, libcontainerdtypes.EventStart, ei) if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": id, + ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": ei.ContainerID, "event": libcontainerdtypes.EventStart, "event-info": ei, }).Error("failed to process event") } }) logger.Debug("start() completed") - return p.pid, nil + return t, nil +} + +func (ctr *container) Task(context.Context) (libcontainerdtypes.Task, error) { + ctr.mu.Lock() + defer ctr.mu.Unlock() + if ctr.task == nil { + return nil, errdefs.NotFound(containerderrdefs.ErrNotFound) + } + return ctr.task, nil } // setCommandLineAndArgs configures the HCS ProcessConfig based on an OCI process spec @@ -554,19 +552,18 @@ func newIOFromProcess(newProcess hcsshim.Process, terminal bool) (*cio.DirectIO, return dio, nil } -// Exec adds a process in an running container -func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr := c.getContainer(containerID) - switch { - case ctr == nil: - return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) - case ctr.hcsContainer == nil: - return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is 
not running"))) - case ctr.execs != nil && ctr.execs[processID] != nil: - return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) +// Exec launches a process in a running container. +// +// The processID argument is entirely informational. As there is no mechanism +// (exposed through the libcontainerd interfaces) to enumerate or reference an +// exec'd process by ID, uniqueness is not currently enforced. +func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) { + hcsContainer, err := t.getHCSContainer() + if err != nil { + return nil, err } - logger := c.logger.WithFields(logrus.Fields{ - "container": containerID, + logger := t.ctr.client.logger.WithFields(logrus.Fields{ + "container": t.ctr.id, "exec": processID, }) @@ -593,7 +590,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * if spec.Cwd != "" { createProcessParms.WorkingDirectory = spec.Cwd } else { - createProcessParms.WorkingDirectory = ctr.ociSpec.Process.Cwd + createProcessParms.WorkingDirectory = t.ctr.ociSpec.Process.Cwd } // Configure the environment for the process @@ -606,10 +603,10 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * createProcessParms.User = spec.User.Username // Start the command running in the container. - newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms) + newProcess, err := hcsContainer.CreateProcess(createProcessParms) if err != nil { logger.WithError(err).Errorf("exec's CreateProcess() failed") - return -1, err + return nil, err } pid := newProcess.Pid() defer func() { @@ -631,163 +628,180 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * dio, err := newIOFromProcess(newProcess, spec.Terminal) if err != nil { logger.WithError(err).Error("failed to get stdio pipes") - return -1, err + return nil, err } // Tell the engine to attach streams back to the client _, err = attachStdio(dio) if err != nil { - return -1, err + return nil, err } p := &process{ id: processID, - pid: pid, + ctr: t.ctr, hcsProcess: newProcess, + waitCh: make(chan struct{}), } - // Add the process to the container's list of processes - ctr.Lock() - ctr.execs[processID] = p - ctr.Unlock() + // Spin up a goroutine to notify the backend and clean up resources when + // the process exits. Defer until after the start event is sent so that + // the exit event is not sent out-of-order. 
+ defer func() { go p.reap() }() - // Spin up a go routine waiting for exit to handle cleanup - go c.reapProcess(ctr, p) - - c.eventQ.Append(ctr.id, func() { + t.ctr.client.eventQ.Append(t.ctr.id, func() { ei := libcontainerdtypes.EventInfo{ - ContainerID: ctr.id, + ContainerID: t.ctr.id, ProcessID: p.id, - Pid: uint32(p.pid), + Pid: uint32(pid), } - c.logger.WithFields(logrus.Fields{ - "container": ctr.id, + t.ctr.client.logger.WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventExecAdded, "event-info": ei, }).Info("sending event") - err := c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExecAdded, ei) + err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventExecAdded, ei) if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": ctr.id, + t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventExecAdded, "event-info": ei, }).Error("failed to process event") } - err = c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExecStarted, ei) + err = t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventExecStarted, ei) if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": ctr.id, + t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventExecStarted, "event-info": ei, }).Error("failed to process event") } }) - return pid, nil + return p, nil } -// SignalProcess handles `docker stop` on Windows. While Linux has support for +func (p *process) Pid() uint32 { + p.mu.Lock() + hcsProcess := p.hcsProcess + p.mu.Unlock() + if hcsProcess == nil { + return 0 + } + return uint32(hcsProcess.Pid()) +} + +func (p *process) Kill(_ context.Context, signal syscall.Signal) error { + p.mu.Lock() + hcsProcess := p.hcsProcess + p.mu.Unlock() + if hcsProcess == nil { + return errors.WithStack(errdefs.NotFound(errors.New("process not found"))) + } + return hcsProcess.Kill() +} + +// Kill handles `docker stop` on Windows. While Linux has support for // the full range of signals, signals aren't really implemented on Windows. // We fake supporting regular stop and -9 to force kill. 
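Per the comment above, only two behaviours are really on offer: SIGKILL maps to an HCS Terminate of the compute system, and everything else to a graceful Shutdown. The mapping in isolation, with the HCS calls reduced to hypothetical operation names:

package main

import (
	"fmt"
	"syscall"
)

// kill mirrors the two-way split in task.Kill: SIGKILL terminates the
// compute system, any other signal shuts it down gracefully.
func kill(signal syscall.Signal) (op string) {
	if signal == syscall.SIGKILL {
		return "terminate"
	}
	return "shutdown"
}

func main() {
	fmt.Println(kill(syscall.SIGKILL)) // terminate
	fmt.Println(kill(syscall.SIGTERM)) // shutdown
}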
-func (c *client) SignalProcess(_ context.Context, containerID, processID string, signal syscall.Signal) error { - ctr, p, err := c.getProcess(containerID, processID) +func (t *task) Kill(_ context.Context, signal syscall.Signal) error { + hcsContainer, err := t.getHCSContainer() if err != nil { return err } - logger := c.logger.WithFields(logrus.Fields{ - "container": containerID, - "process": processID, - "pid": p.pid, + logger := t.ctr.client.logger.WithFields(logrus.Fields{ + "container": t.ctr.id, + "process": t.id, + "pid": t.Pid(), "signal": signal, }) logger.Debug("Signal()") - if processID == libcontainerdtypes.InitProcessName { - if syscall.Signal(signal) == syscall.SIGKILL { - // Terminate the compute system - ctr.Lock() - ctr.terminateInvoked = true - if err := ctr.hcsContainer.Terminate(); err != nil { - if !hcsshim.IsPending(err) { - logger.WithError(err).Error("failed to terminate hccshim container") - } - } - ctr.Unlock() - } else { - // Shut down the container - if err := ctr.hcsContainer.Shutdown(); err != nil { - if !hcsshim.IsPending(err) && !hcsshim.IsAlreadyStopped(err) { - // ignore errors - logger.WithError(err).Error("failed to shutdown hccshim container") - } - } - } + var op string + if signal == syscall.SIGKILL { + // Terminate the compute system + t.ctr.mu.Lock() + t.ctr.terminateInvoked = true + t.ctr.mu.Unlock() + op, err = "terminate", hcsContainer.Terminate() } else { - return p.hcsProcess.Kill() + // Shut down the container + op, err = "shutdown", hcsContainer.Shutdown() + } + if err != nil { + if !hcsshim.IsPending(err) && !hcsshim.IsAlreadyStopped(err) { + // ignore errors + logger.WithError(err).Errorf("failed to %s hcsshim container", op) + } } return nil } -// ResizeTerminal handles a CLI event to resize an interactive docker run or docker +// Resize handles a CLI event to resize an interactive docker run or docker // exec window.
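Resize, Kill, and CloseStdin on process all use one access pattern: copy the hcsProcess handle out while holding p.mu, then call into HCS on the copy with the lock released, treating a nil handle as an already-reaped process. Reduced to its essentials; the handle type below is a stand-in for hcsshim.Process:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type handle struct{} // stand-in for hcsshim.Process

func (*handle) CloseStdin() error { return nil }

type process struct {
	mu sync.Mutex
	h  *handle // reap() sets this to nil
}

// closeStdin copies the handle out under the lock; the nil check makes
// calls that race with reaping fail cleanly instead of panicking.
func (p *process) closeStdin() error {
	p.mu.Lock()
	h := p.h
	p.mu.Unlock()
	if h == nil {
		return errors.New("process not found")
	}
	return h.CloseStdin()
}

func main() {
	p := &process{h: &handle{}}
	fmt.Println(p.closeStdin()) // <nil>
	p.h = nil                   // what reap() does, under p.mu
	fmt.Println(p.closeStdin()) // process not found
}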
-func (c *client) ResizeTerminal(_ context.Context, containerID, processID string, width, height int) error { - _, p, err := c.getProcess(containerID, processID) - if err != nil { - return err +func (p *process) Resize(_ context.Context, width, height uint32) error { + p.mu.Lock() + hcsProcess := p.hcsProcess + p.mu.Unlock() + if hcsProcess == nil { + return errors.WithStack(errdefs.NotFound(errors.New("process not found"))) } - c.logger.WithFields(logrus.Fields{ - "container": containerID, - "process": processID, + p.ctr.client.logger.WithFields(logrus.Fields{ + "container": p.ctr.id, + "process": p.id, "height": height, "width": width, - "pid": p.pid, + "pid": hcsProcess.Pid(), }).Debug("resizing") - return p.hcsProcess.ResizeConsole(uint16(width), uint16(height)) + return hcsProcess.ResizeConsole(uint16(width), uint16(height)) } -func (c *client) CloseStdin(_ context.Context, containerID, processID string) error { - _, p, err := c.getProcess(containerID, processID) - if err != nil { - return err +func (p *process) CloseStdin(context.Context) error { + p.mu.Lock() + hcsProcess := p.hcsProcess + p.mu.Unlock() + if hcsProcess == nil { + return errors.WithStack(errdefs.NotFound(errors.New("process not found"))) } - return p.hcsProcess.CloseStdin() + return hcsProcess.CloseStdin() } // Pause handles pause requests for containers -func (c *client) Pause(_ context.Context, containerID string) error { - ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return err - } - - if ctr.ociSpec.Windows.HyperV == nil { +func (t *task) Pause(_ context.Context) error { + if t.ctr.ociSpec.Windows.HyperV == nil { return containerderrdefs.ErrNotImplemented } - ctr.Lock() - defer ctr.Unlock() + t.ctr.mu.Lock() + defer t.ctr.mu.Unlock() - if err = ctr.hcsContainer.Pause(); err != nil { + if err := t.assertIsCurrentTask(); err != nil { + return err + } + if t.ctr.hcsContainer == nil { + return errdefs.NotFound(errors.WithStack(fmt.Errorf("container %q not found", t.ctr.id))) + } + if err := t.ctr.hcsContainer.Pause(); err != nil { return err } - ctr.status = containerd.Paused + t.ctr.isPaused = true - c.eventQ.Append(containerID, func() { - err := c.backend.ProcessEvent(containerID, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{ - ContainerID: containerID, + t.ctr.client.eventQ.Append(t.ctr.id, func() { + err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{ + ContainerID: t.ctr.id, ProcessID: libcontainerdtypes.InitProcessName, }) - c.logger.WithFields(logrus.Fields{ - "container": ctr.id, + t.ctr.client.logger.WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventPaused, }).Info("sending event") if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": containerID, + t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventPaused, }).Error("failed to process event") } @@ -797,37 +811,38 @@ func (c *client) Pause(_ context.Context, containerID string) error { } // Resume handles resume requests for containers -func (c *client) Resume(_ context.Context, containerID string) error { - ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return err - } - - if ctr.ociSpec.Windows.HyperV == nil { +func (t *task) Resume(ctx context.Context) error { + if t.ctr.ociSpec.Windows.HyperV == nil { return errors.New("cannot resume Windows Server 
Containers") } - ctr.Lock() - defer ctr.Unlock() + t.ctr.mu.Lock() + defer t.ctr.mu.Unlock() - if err = ctr.hcsContainer.Resume(); err != nil { + if err := t.assertIsCurrentTask(); err != nil { + return err + } + if t.ctr.hcsContainer == nil { + return errdefs.NotFound(errors.WithStack(fmt.Errorf("container %q not found", t.ctr.id))) + } + if err := t.ctr.hcsContainer.Resume(); err != nil { return err } - ctr.status = containerd.Running + t.ctr.isPaused = false - c.eventQ.Append(containerID, func() { - err := c.backend.ProcessEvent(containerID, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{ - ContainerID: containerID, + t.ctr.client.eventQ.Append(t.ctr.id, func() { + err := t.ctr.client.backend.ProcessEvent(t.ctr.id, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{ + ContainerID: t.ctr.id, ProcessID: libcontainerdtypes.InitProcessName, }) - c.logger.WithFields(logrus.Fields{ - "container": ctr.id, + t.ctr.client.logger.WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventResumed, }).Info("sending event") if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": containerID, + t.ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": t.ctr.id, "event": libcontainerdtypes.EventResumed, }).Error("failed to process event") } @@ -837,14 +852,14 @@ func (c *client) Resume(_ context.Context, containerID string) error { } // Stats handles stats requests for containers -func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtypes.Stats, error) { - ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) +func (t *task) Stats(_ context.Context) (*libcontainerdtypes.Stats, error) { + hc, err := t.getHCSContainer() if err != nil { return nil, err } readAt := time.Now() - s, err := ctr.hcsContainer.Statistics() + s, err := hc.Statistics() if err != nil { return nil, err } @@ -854,9 +869,9 @@ func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtyp }, nil } -// Restore is the handler for restoring a container -func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, libcontainerdtypes.Process, error) { - c.logger.WithField("container", id).Debug("restore()") +// LoadContainer is the handler for restoring a container +func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) { + c.logger.WithField("container", id).Debug("LoadContainer()") // TODO Windows: On RS1, a re-attach isn't possible. // However, there is a scenario in which there is an issue. @@ -865,30 +880,40 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine // For consistence, we call in to shoot it regardless if HCS knows about it // We explicitly just log a warning if the terminate fails. // Then we tell the backend the container exited. 
- if hc, err := hcsshim.OpenContainer(id); err == nil { - const terminateTimeout = time.Minute * 2 - err := hc.Terminate() - - if hcsshim.IsPending(err) { - err = hc.WaitTimeout(terminateTimeout) - } else if hcsshim.IsAlreadyStopped(err) { - err = nil - } - - if err != nil { - c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore") - return false, -1, nil, err - } + hc, err := hcsshim.OpenContainer(id) + if err != nil { + return nil, errdefs.NotFound(errors.New("container not found")) } - return false, -1, &restoredProcess{ - c: c, - id: id, + const terminateTimeout = time.Minute * 2 + err = hc.Terminate() + + if hcsshim.IsPending(err) { + err = hc.WaitTimeout(terminateTimeout) + } else if hcsshim.IsAlreadyStopped(err) { + err = nil + } + + if err != nil { + c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore") + return nil, err + } + return &container{ + client: c, + hcsContainer: hc, + id: id, }, nil } -// ListPids returns a list of process IDs running in a container. It is not +// AttachTask is only called by the daemon when restoring containers. As +// re-attach isn't possible (see LoadContainer), a NotFound error is +// unconditionally returned to allow restore to make progress. +func (*container) AttachTask(context.Context, libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) { + return nil, errdefs.NotFound(containerderrdefs.ErrNotImplemented) +} + +// Pids returns a list of process IDs running in a container. It is not // implemented on Windows. -func (c *client) ListPids(_ context.Context, _ string) ([]uint32, error) { +func (t *task) Pids(context.Context) ([]containerd.ProcessInfo, error) { return nil, errors.New("not implemented on Windows") } @@ -898,13 +923,13 @@ func (c *client) ListPids(_ context.Context, _ string) ([]uint32, error) { // the containers could be Hyper-V containers, they would not be // visible on the container host. However, libcontainerd does have // that information. 
-func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerdtypes.Summary, error) { - ctr, _, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) +func (t *task) Summary(_ context.Context) ([]libcontainerdtypes.Summary, error) { + hc, err := t.getHCSContainer() if err != nil { return nil, err } - p, err := ctr.hcsContainer.ProcessList() + p, err := hc.ProcessList() if err != nil { return nil, err } @@ -926,118 +951,114 @@ func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerd return pl, nil } -type restoredProcess struct { - id string - c *client +func (p *process) Delete(ctx context.Context) (*containerd.ExitStatus, error) { + select { + case <-ctx.Done(): + return nil, errors.WithStack(ctx.Err()) + case <-p.waitCh: + default: + return nil, errdefs.Conflict(errors.New("process is running")) + } + return p.exited, nil } -func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) { - return p.c.DeleteTask(ctx, p.id) +func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) { + select { + case <-ctx.Done(): + return nil, errors.WithStack(ctx.Err()) + case <-t.waitCh: + default: + return nil, errdefs.Conflict(errors.New("container is not stopped")) + } + + t.ctr.mu.Lock() + defer t.ctr.mu.Unlock() + if err := t.assertIsCurrentTask(); err != nil { + return nil, err + } + t.ctr.task = nil + return t.exited, nil } -func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { - ec := -1 - ctr := c.getContainer(containerID) - if ctr == nil { - return uint32(ec), time.Now(), errors.WithStack(errdefs.NotFound(errors.New("no such container"))) +func (t *task) ForceDelete(ctx context.Context) error { + select { + case <-t.waitCh: // Task is already stopped. 
+ _, err := t.Delete(ctx) + return err + default: + } + + if err := t.Kill(ctx, syscall.SIGKILL); err != nil { + return errors.Wrap(err, "could not force-kill task") } select { case <-ctx.Done(): - return uint32(ec), time.Now(), errors.WithStack(ctx.Err()) - case <-ctr.waitCh: + return ctx.Err() + case <-t.waitCh: + _, err := t.Delete(ctx) + return err + } +} + +func (t *task) Status(ctx context.Context) (containerd.Status, error) { + select { + case <-t.waitCh: + return containerd.Status{ + Status: containerd.Stopped, + ExitStatus: t.exited.ExitCode(), + ExitTime: t.exited.ExitTime(), + }, nil default: - return uint32(ec), time.Now(), errors.New("container is not stopped") } - ctr.Lock() - defer ctr.Unlock() - return ctr.exitCode, ctr.exitedAt, nil + t.ctr.mu.Lock() + defer t.ctr.mu.Unlock() + s := containerd.Running + if t.ctr.isPaused { + s = containerd.Paused + } + return containerd.Status{Status: s}, nil } -func (c *client) Delete(_ context.Context, containerID string) error { - c.Lock() - defer c.Unlock() - ctr := c.containers[containerID] - if ctr == nil { - return errors.WithStack(errdefs.NotFound(errors.New("no such container"))) - } - - ctr.Lock() - defer ctr.Unlock() - - switch ctr.status { - case containerd.Created: - if err := c.shutdownContainer(ctr); err != nil { - return err - } - fallthrough - case containerd.Stopped: - delete(c.containers, containerID) - return nil - } - - return errors.WithStack(errdefs.InvalidParameter(errors.New("container is not stopped"))) -} - -func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) { - c.Lock() - defer c.Unlock() - ctr := c.containers[containerID] - if ctr == nil { - return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) - } - - ctr.Lock() - defer ctr.Unlock() - return ctr.status, nil -} - -func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error { +func (*task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error { // Updating resource isn't supported on Windows // but we should return nil for enabling updating container return nil } -func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error { +func (*task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error { return errors.New("Windows: Containers do not support checkpoints") } -func (c *client) getContainer(id string) *container { - c.Lock() - ctr := c.containers[id] - c.Unlock() - - return ctr +// assertIsCurrentTask returns a non-nil error if the task has been deleted. 
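The assertion below is pure pointer identity: a task is current exactly while ctr.task still points at this task value; Delete() clearing the field (or a later Start() installing a new task) invalidates every stale handle at once. Sketched with minimal stand-in types:

package main

import (
	"errors"
	"fmt"
)

type container struct{ task *task }

type task struct{ ctr *container }

// assertIsCurrentTask: identity comparison distinguishes a live task
// from one that Delete() has already unlinked from its container.
func (t *task) assertIsCurrentTask() error {
	if t.ctr.task != t {
		return errors.New("task not found")
	}
	return nil
}

func main() {
	c := &container{}
	t := &task{ctr: c}
	c.task = t
	fmt.Println(t.assertIsCurrentTask()) // <nil>
	c.task = nil                         // what Delete() does
	fmt.Println(t.assertIsCurrentTask()) // task not found
}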
+func (t *task) assertIsCurrentTask() error { + if t.ctr.task != t { + return errors.WithStack(errdefs.NotFound(fmt.Errorf("task %q not found", t.id))) + } + return nil } -func (c *client) getProcess(containerID, processID string) (*container, *process, error) { - ctr := c.getContainer(containerID) - switch { - case ctr == nil: - return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) - case ctr.init == nil: - return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running"))) - case processID == libcontainerdtypes.InitProcessName: - return ctr, ctr.init, nil - default: - ctr.Lock() - defer ctr.Unlock() - if ctr.execs == nil { - return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no execs"))) - } +// getHCSContainer returns a reference to the hcsshim Container for the task's +// container if neither the task nor container have been deleted. +// +// t.ctr.mu must not be locked by the calling goroutine when calling this +// function. +func (t *task) getHCSContainer() (hcsshim.Container, error) { + t.ctr.mu.Lock() + defer t.ctr.mu.Unlock() + if err := t.assertIsCurrentTask(); err != nil { + return nil, err } - - p := ctr.execs[processID] - if p == nil { - return nil, nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec"))) + hc := t.ctr.hcsContainer + if hc == nil { + return nil, errors.WithStack(errdefs.NotFound(fmt.Errorf("container %q not found", t.ctr.id))) } - - return ctr, p, nil + return hc, nil } // ctr mutex must be held when calling this function. -func (c *client) shutdownContainer(ctr *container) error { +func (ctr *container) shutdownContainer() error { var err error const waitTimeout = time.Minute * 5 @@ -1052,11 +1073,11 @@ func (c *client) shutdownContainer(ctr *container) error { } if err != nil { - c.logger.WithError(err).WithField("container", ctr.id). + ctr.client.logger.WithError(err).WithField("container", ctr.id). Debug("failed to shutdown container, terminating it") - terminateErr := c.terminateContainer(ctr) + terminateErr := ctr.terminateContainer() if terminateErr != nil { - c.logger.WithError(terminateErr).WithField("container", ctr.id). + ctr.client.logger.WithError(terminateErr).WithField("container", ctr.id). Error("failed to shutdown container, and subsequent terminate also failed") return fmt.Errorf("%s: subsequent terminate failed %s", err, terminateErr) } @@ -1067,7 +1088,7 @@ func (c *client) shutdownContainer(ctr *container) error { } // ctr mutex must be held when calling this function. -func (c *client) terminateContainer(ctr *container) error { +func (ctr *container) terminateContainer() error { const terminateTimeout = time.Minute * 5 ctr.terminateInvoked = true err := ctr.hcsContainer.Terminate() @@ -1079,7 +1100,7 @@ func (c *client) terminateContainer(ctr *container) error { } if err != nil { - c.logger.WithError(err).WithField("container", ctr.id). + ctr.client.logger.WithError(err).WithField("container", ctr.id). 
Debug("failed to terminate container") return err } @@ -1087,9 +1108,9 @@ func (c *client) terminateContainer(ctr *container) error { return nil } -func (c *client) reapProcess(ctr *container, p *process) int { - logger := c.logger.WithFields(logrus.Fields{ - "container": ctr.id, +func (p *process) reap() { + logger := p.ctr.client.logger.WithFields(logrus.Fields{ + "container": p.ctr.id, "process": p.id, }) @@ -1100,10 +1121,9 @@ func (c *client) reapProcess(ctr *container, p *process) int { if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != windows.ERROR_BROKEN_PIPE { logger.WithError(err).Warnf("Wait() failed (container may have been killed)") } - // Fall through here, do not return. This ensures we attempt to - // continue the shutdown in HCS and tell the docker engine that the - // process/container has exited to avoid a container being dropped on - // the floor. + // Fall through here, do not return. This ensures we tell the + // docker engine that the process/container has exited to avoid + // a container being dropped on the floor. } exitedAt := time.Now() @@ -1116,87 +1136,88 @@ func (c *client) reapProcess(ctr *container, p *process) int { // code we return doesn't incorrectly indicate success. exitCode = -1 - // Fall through here, do not return. This ensures we attempt to - // continue the shutdown in HCS and tell the docker engine that the - // process/container has exited to avoid a container being dropped on - // the floor. + // Fall through here, do not return. This ensures we tell the + // docker engine that the process/container has exited to avoid + // a container being dropped on the floor. } - if err := p.hcsProcess.Close(); err != nil { + p.mu.Lock() + hcsProcess := p.hcsProcess + p.hcsProcess = nil + p.mu.Unlock() + + if err := hcsProcess.Close(); err != nil { logger.WithError(err).Warnf("failed to cleanup hcs process resources") exitCode = -1 eventErr = fmt.Errorf("hcsProcess.Close() failed %s", err) } - if p.id == libcontainerdtypes.InitProcessName { - exitCode, eventErr = c.reapContainer(ctr, p, exitCode, exitedAt, eventErr, logger) - } + // Explicit locking is not required as reads from exited are + // synchronized using waitCh. + p.exited = containerd.NewExitStatus(uint32(exitCode), exitedAt, nil) + close(p.waitCh) - c.eventQ.Append(ctr.id, func() { + p.ctr.client.eventQ.Append(p.ctr.id, func() { ei := libcontainerdtypes.EventInfo{ - ContainerID: ctr.id, + ContainerID: p.ctr.id, ProcessID: p.id, - Pid: uint32(p.pid), + Pid: uint32(hcsProcess.Pid()), ExitCode: uint32(exitCode), ExitedAt: exitedAt, Error: eventErr, } - c.logger.WithFields(logrus.Fields{ - "container": ctr.id, + p.ctr.client.logger.WithFields(logrus.Fields{ + "container": p.ctr.id, "event": libcontainerdtypes.EventExit, "event-info": ei, }).Info("sending event") - err := c.backend.ProcessEvent(ctr.id, libcontainerdtypes.EventExit, ei) + err := p.ctr.client.backend.ProcessEvent(p.ctr.id, libcontainerdtypes.EventExit, ei) if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": ctr.id, + p.ctr.client.logger.WithError(err).WithFields(logrus.Fields{ + "container": p.ctr.id, "event": libcontainerdtypes.EventExit, "event-info": ei, }).Error("failed to process event") } - if p.id != libcontainerdtypes.InitProcessName { - ctr.Lock() - delete(ctr.execs, p.id) - ctr.Unlock() - } }) - - return exitCode } -// reapContainer shuts down the container and releases associated resources. It returns -// the error to be logged in the eventInfo sent back to the monitor. 
-func (c *client) reapContainer(ctr *container, p *process, exitCode int, exitedAt time.Time, eventErr error, logger *logrus.Entry) (int, error) { - // Update container status - ctr.Lock() - ctr.status = containerd.Stopped - ctr.exitedAt = exitedAt - ctr.exitCode = uint32(exitCode) - close(ctr.waitCh) +func (ctr *container) Delete(context.Context) error { + ctr.mu.Lock() + defer ctr.mu.Unlock() - if err := c.shutdownContainer(ctr); err != nil { - exitCode = -1 - logger.WithError(err).Warn("failed to shutdown container") - thisErr := errors.Wrap(err, "failed to shutdown container") - if eventErr != nil { - eventErr = errors.Wrap(eventErr, thisErr.Error()) - } else { - eventErr = thisErr + if ctr.hcsContainer == nil { + return errors.WithStack(errdefs.NotFound(fmt.Errorf("container %q not found", ctr.id))) + } + + // Check that there is no task currently running. + if ctr.task != nil { + select { + case <-ctr.task.waitCh: + default: + return errors.WithStack(errdefs.Conflict(errors.New("container is not stopped"))) } + } + + var ( + logger = ctr.client.logger.WithFields(logrus.Fields{ + "container": ctr.id, + }) + thisErr error + ) + + if err := ctr.shutdownContainer(); err != nil { + logger.WithError(err).Warn("failed to shutdown container") + thisErr = errors.Wrap(err, "failed to shutdown container") } else { logger.Debug("completed container shutdown") } - ctr.Unlock() if err := ctr.hcsContainer.Close(); err != nil { - exitCode = -1 logger.WithError(err).Error("failed to clean hcs container resources") - thisErr := errors.Wrap(err, "failed to terminate container") - if eventErr != nil { - eventErr = errors.Wrap(eventErr, thisErr.Error()) - } else { - eventErr = thisErr - } + thisErr = errors.Wrap(err, "failed to terminate container") } - return exitCode, eventErr + + ctr.hcsContainer = nil + return thisErr } diff --git a/libcontainerd/local/process_windows.go b/libcontainerd/local/process_windows.go index 6ff9f7e83e..c8164be987 100644 --- a/libcontainerd/local/process_windows.go +++ b/libcontainerd/local/process_windows.go @@ -38,7 +38,3 @@ func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteClo return nil }) } - -func (p *process) Cleanup() error { - return nil -} diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go index a2b895b328..4af29300ab 100644 --- a/libcontainerd/remote/client.go +++ b/libcontainerd/remote/client.go @@ -45,25 +45,34 @@ type client struct { logger *logrus.Entry ns string - backend libcontainerdtypes.Backend - eventQ queue.Queue - oomMu sync.Mutex - oom map[string]bool - v2runcoptionsMu sync.Mutex - // v2runcoptions is used for copying options specified on Create() to Start() - v2runcoptions map[string]v2runcoptions.Options + backend libcontainerdtypes.Backend + eventQ queue.Queue +} + +type container struct { + client *client + c8dCtr containerd.Container + + v2runcoptions *v2runcoptions.Options +} + +type task struct { + containerd.Task + ctr *container +} + +type process struct { + containerd.Process } // NewClient creates a new libcontainerd client from a containerd client func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) { c := &client{ - client: cli, - stateDir: stateDir, - logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), - ns: ns, - backend: b, - oom: make(map[string]bool), - v2runcoptions: make(map[string]v2runcoptions.Options), + client: cli, + stateDir: stateDir, + logger: 
logrus.WithField("module", "libcontainerd").WithField("namespace", ns), + ns: ns, + backend: b, } go c.processEventStream(ctx, ns) @@ -75,58 +84,36 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) { return c.client.Version(ctx) } -// Restore loads the containerd container. -// It should not be called concurrently with any other operation for the given ID. -func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) { +func (c *container) newTask(t containerd.Task) *task { + return &task{Task: t, ctr: c} +} + +func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) { var dio *cio.DirectIO defer func() { if err != nil && dio != nil { dio.Cancel() dio.Close() } - err = wrapError(err) }() - ctr, err := c.client.LoadContainer(ctx, id) - if err != nil { - return false, -1, nil, errors.WithStack(wrapError(err)) - } - attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) { // dio must be assigned to the previously defined dio for the defer above // to handle cleanup - dio, err = c.newDirectIO(ctx, fifos) + dio, err = c.client.newDirectIO(ctx, fifos) if err != nil { return nil, err } return attachStdio(dio) } - t, err := ctr.Task(ctx, attachIO) - if err != nil && !containerderrors.IsNotFound(err) { - return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container") + t, err := c.c8dCtr.Task(ctx, attachIO) + if err != nil { + return nil, errors.Wrap(wrapError(err), "error getting containerd task for container") } - - if t != nil { - s, err := t.Status(ctx) - if err != nil { - return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status") - } - alive = s.Status != containerd.Stopped - pid = int(t.Pid()) - } - - c.logger.WithFields(logrus.Fields{ - "container": id, - "alive": alive, - "pid": pid, - }).Debug("restored container") - - return alive, pid, &restoredProcess{ - p: t, - }, nil + return c.newTask(t), nil } -func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error { +func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) { bdir := c.bundleDir(id) c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created") @@ -137,44 +124,43 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shi } opts = append(opts, newOpts...) - _, err := c.client.NewContainer(ctx, id, opts...) + ctr, err := c.client.NewContainer(ctx, id, opts...) 
if err != nil { if containerderrors.IsAlreadyExists(err) { - return errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) + return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) } - return wrapError(err) + return nil, wrapError(err) + } + + created := container{ + client: c, + c8dCtr: ctr, } if x, ok := runtimeOptions.(*v2runcoptions.Options); ok { - c.v2runcoptionsMu.Lock() - c.v2runcoptions[id] = *x - c.v2runcoptionsMu.Unlock() + created.v2runcoptions = x } - return nil + return &created, nil } // Start create and start a task for the specified containerd id -func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr, err := c.getContainer(ctx, id) - if err != nil { - return -1, err - } +func (c *container) Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) { var ( cp *types.Descriptor t containerd.Task rio cio.IO - stdinCloseSync = make(chan struct{}) + stdinCloseSync = make(chan containerd.Process, 1) ) if checkpointDir != "" { // write checkpoint to the content store tar := archive.Diff(ctx, "", checkpointDir) - cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar) + var err error + cp, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar) // remove the checkpoint when we're done defer func() { if cp != nil { - err := c.client.ContentStore().Delete(context.Background(), cp.Digest) + err := c.client.client.ContentStore().Delete(ctx, cp.Digest) if err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ + c.client.logger.WithError(err).WithFields(logrus.Fields{ "ref": checkpointDir, "digest": cp.Digest, }).Warnf("failed to delete temporary checkpoint entry") @@ -182,23 +168,27 @@ } }() if err := tar.Close(); err != nil { - return -1, errors.Wrap(err, "failed to close checkpoint tar stream") + return nil, errors.Wrap(err, "failed to close checkpoint tar stream") } if err != nil { - return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd") + return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd") } } - spec, err := ctr.Spec(ctx) + // Optimization: assume the relevant metadata has not changed in the + // moment since the container was created. Elide redundant RPC requests + // to refresh the metadata separately for spec and labels.
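With the single cached Info read that follows, the OCI spec comes back as opaque bytes inside the container metadata and has to be decoded by hand, rather than being fetched through a separate Spec RPC. Roughly what that decode step amounts to, assuming a stored JSON document shaped like containerd's:

package main

import (
	"encoding/json"
	"fmt"
)

// spec is a pared-down stand-in for the handful of OCI spec fields
// Start actually reads (e.g. whether the init process wants a TTY).
type spec struct {
	Process struct {
		Terminal bool `json:"terminal"`
	} `json:"process"`
}

func main() {
	// The metadata store hands the spec back as raw bytes; the caller
	// unmarshals locally instead of issuing a second RPC.
	raw := []byte(`{"process":{"terminal":true}}`)
	var s spec
	if err := json.Unmarshal(raw, &s); err != nil {
		panic(err)
	}
	fmt.Println(s.Process.Terminal) // true
}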
+ md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata) if err != nil { - return -1, errors.Wrap(err, "failed to retrieve spec") + return nil, errors.Wrap(err, "failed to retrieve metadata") } - labels, err := ctr.Labels(ctx) - if err != nil { - return -1, errors.Wrap(err, "failed to retrieve labels") + bundle := md.Labels[DockerContainerBundlePath] + + var spec specs.Spec + if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil { + return nil, errors.Wrap(err, "failed to retrieve spec") } - bundle := labels[DockerContainerBundlePath] - uid, gid := getSpecUser(spec) + uid, gid := getSpecUser(&spec) taskOpts := []containerd.NewTaskOpts{ func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error { @@ -209,10 +199,8 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin if runtime.GOOS != "windows" { taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error { - c.v2runcoptionsMu.Lock() - opts, ok := c.v2runcoptions[id] - c.v2runcoptionsMu.Unlock() - if ok { + if c.v2runcoptions != nil { + opts := *c.v2runcoptions opts.IoUid = uint32(uid) opts.IoGid = uint32(gid) info.Options = &opts @@ -220,14 +208,14 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin return nil }) } else { - taskOpts = append(taskOpts, withLogLevel(c.logger.Level)) + taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level)) } - t, err = ctr.NewTask(ctx, + t, err = c.c8dCtr.NewTask(ctx, func(id string) (cio.IO, error) { fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal) - rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio) + rio, err = c.createIO(fifos, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio) return rio, err }, taskOpts..., @@ -238,21 +226,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin rio.Cancel() rio.Close() } - return -1, wrapError(err) + return nil, errors.Wrap(wrapError(err), "failed to create task for container") } // Signal c.createIO that it can call CloseIO - close(stdinCloseSync) + stdinCloseSync <- t if err := t.Start(ctx); err != nil { if _, err := t.Delete(ctx); err != nil { - c.logger.WithError(err).WithField("container", id). + c.client.logger.WithError(err).WithField("container", c.c8dCtr.ID()). Error("failed to delete task after fail start") } - return -1, wrapError(err) + return nil, wrapError(err) } - return int(t.Pid()), nil + return c.newTask(t), nil } // Exec creates exec process. @@ -262,31 +250,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin // for the container main process, the stdin fifo will be created in Create not // the Start call. stdinCloseSync channel should be closed after Start exec // process. 
-func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr, err := c.getContainer(ctx, containerID) - if err != nil { - return -1, err - } - t, err := ctr.Task(ctx, nil) - if err != nil { - if containerderrors.IsNotFound(err) { - return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running"))) - } - return -1, wrapError(err) - } - +func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) { var ( p containerd.Process rio cio.IO - stdinCloseSync = make(chan struct{}) + stdinCloseSync = make(chan containerd.Process, 1) ) - labels, err := ctr.Labels(ctx) + // Optimization: assume the DockerContainerBundlePath label has not been + // updated since the container metadata was last loaded/refreshed. + md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata) if err != nil { - return -1, wrapError(err) + return nil, wrapError(err) } - fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal) + fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal) defer func() { if err != nil { @@ -297,22 +275,22 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * } }() - p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) { - rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio) + p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) { + rio, err = t.ctr.createIO(fifos, processID, stdinCloseSync, attachStdio) return rio, err }) if err != nil { close(stdinCloseSync) if containerderrors.IsAlreadyExists(err) { - return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) + return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) } - return -1, wrapError(err) + return nil, wrapError(err) } // Signal c.createIO that it can call CloseIO // // the stdin of exec process will be created after p.Start in containerd - defer close(stdinCloseSync) + defer func() { stdinCloseSync <- p }() if err = p.Start(ctx); err != nil { // use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure @@ -321,62 +299,29 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second) defer cancel() p.Delete(ctx) - return -1, wrapError(err) + return nil, wrapError(err) } - return int(p.Pid()), nil + return process{p}, nil } -func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error { - p, err := c.getProcess(ctx, containerID, processID) - if err != nil { - return err - } - return wrapError(p.Kill(ctx, signal)) +func (t *task) Kill(ctx context.Context, signal syscall.Signal) error { + return wrapError(t.Task.Kill(ctx, signal)) } -func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { - p, err := c.getProcess(ctx, containerID, processID) - if err != nil { - return err - } - - return p.Resize(ctx, uint32(width), uint32(height)) +func (p process) Kill(ctx context.Context, signal syscall.Signal) error { + return wrapError(p.Process.Kill(ctx, signal)) } -func (c *client) CloseStdin(ctx context.Context, containerID, 
processID string) error { - p, err := c.getProcess(ctx, containerID, processID) - if err != nil { - return err - } - - return p.CloseIO(ctx, containerd.WithStdinCloser) +func (t *task) Pause(ctx context.Context) error { + return wrapError(t.Task.Pause(ctx)) } -func (c *client) Pause(ctx context.Context, containerID string) error { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return err - } - - return wrapError(p.(containerd.Task).Pause(ctx)) +func (t *task) Resume(ctx context.Context) error { + return wrapError(t.Task.Resume(ctx)) } -func (c *client) Resume(ctx context.Context, containerID string) error { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return err - } - - return p.(containerd.Task).Resume(ctx) -} - -func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return nil, err - } - - m, err := p.(containerd.Task).Metrics(ctx) +func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) { + m, err := t.Metrics(ctx) if err != nil { return nil, err } @@ -388,32 +333,8 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil } -func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return nil, err - } - - pis, err := p.(containerd.Task).Pids(ctx) - if err != nil { - return nil, err - } - - var pids []uint32 - for _, i := range pis { - pids = append(pids, i.Pid) - } - - return pids, nil -} - -func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return nil, err - } - - pis, err := p.(containerd.Task).Pids(ctx) +func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) { + pis, err := t.Pids(ctx) if err != nil { return nil, err } @@ -434,57 +355,31 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine return infos, nil } -type restoredProcess struct { - p containerd.Process +func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) { + s, err := t.Task.Delete(ctx) + return s, wrapError(err) } -func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) { - if p.p == nil { - return 255, time.Now(), nil - } - status, err := p.p.Delete(ctx) - if err != nil { - return 255, time.Now(), nil - } - return status.ExitCode(), status.ExitTime(), nil +func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) { + s, err := p.Process.Delete(ctx) + return s, wrapError(err) } -func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return 255, time.Now(), nil - } - - status, err := p.Delete(ctx) - if err != nil { - return 255, time.Now(), nil - } - return status.ExitCode(), status.ExitTime(), nil -} - -func (c *client) Delete(ctx context.Context, containerID string) error { - ctr, err := c.getContainer(ctx, containerID) +func (c *container) Delete(ctx context.Context) error { + // Optimization: assume the 
DockerContainerBundlePath label has not been + // updated since the container metadata was last loaded/refreshed. + md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata) if err != nil { return err } - labels, err := ctr.Labels(ctx) - if err != nil { - return err - } - bundle := labels[DockerContainerBundlePath] - if err := ctr.Delete(ctx); err != nil { + bundle := md.Labels[DockerContainerBundlePath] + if err := c.c8dCtr.Delete(ctx); err != nil { return wrapError(err) } - c.oomMu.Lock() - delete(c.oom, containerID) - c.oomMu.Unlock() - c.v2runcoptionsMu.Lock() - delete(c.v2runcoptions, containerID) - c.v2runcoptionsMu.Unlock() if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" { if err := os.RemoveAll(bundle); err != nil { - c.logger.WithError(err).WithFields(logrus.Fields{ - "container": containerID, + c.client.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{ + "container": c.c8dCtr.ID(), "bundle": bundle, }).Error("failed to remove state dir") } @@ -492,28 +387,25 @@ func (c *client) Delete(ctx context.Context, containerID string) error { return nil } -func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) { - t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return containerd.Unknown, err - } - s, err := t.Status(ctx) - if err != nil { - return containerd.Unknown, wrapError(err) - } - return s.Status, nil +func (t *task) ForceDelete(ctx context.Context) error { + _, err := t.Task.Delete(ctx, containerd.WithProcessKill) + return wrapError(err) } -func (c *client) getCheckpointOptions(id string, exit bool) containerd.CheckpointTaskOpts { +func (t *task) Status(ctx context.Context) (containerd.Status, error) { + s, err := t.Task.Status(ctx) + return s, wrapError(err) +} + +func (p process) Status(ctx context.Context) (containerd.Status, error) { + s, err := p.Process.Status(ctx) + return s, wrapError(err) +} + +func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts { return func(r *containerd.CheckpointTaskInfo) error { - if r.Options == nil { - c.v2runcoptionsMu.Lock() - _, ok := c.v2runcoptions[id] - c.v2runcoptionsMu.Unlock() - if ok { - r.Options = &v2runcoptions.CheckpointOptions{Exit: exit} - } - return nil + if r.Options == nil && c.v2runcoptions != nil { + r.Options = &v2runcoptions.CheckpointOptions{} } switch opts := r.Options.(type) { @@ -525,27 +417,21 @@ func (c *client) getCheckpointOptions(id string, exit bool) containerd.Checkpoin } } -func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error { - p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) - if err != nil { - return err - } - - opts := []containerd.CheckpointTaskOpts{c.getCheckpointOptions(containerID, exit)} - img, err := p.(containerd.Task).Checkpoint(ctx, opts...) +func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error { + img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit)) if err != nil { return wrapError(err) } // Whatever happens, delete the checkpoint from containerd defer func() { - err := c.client.ImageService().Delete(context.Background(), img.Name()) + err := t.ctr.client.client.ImageService().Delete(ctx, img.Name()) if err != nil { - c.logger.WithError(err).WithField("digest", img.Target().Digest). + t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest). 
Warnf("failed to delete checkpoint image") } }() - b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target()) + b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target()) if err != nil { return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data")) } @@ -566,7 +452,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi return errdefs.System(errors.Wrapf(err, "invalid checkpoint")) } - rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc) + rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc) if err != nil { return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader")) } @@ -579,7 +465,8 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi return err } -func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) { +// LoadContainer loads the containerd container. +func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) { ctr, err := c.client.LoadContainer(ctx, id) if err != nil { if containerderrors.IsNotFound(err) { @@ -587,42 +474,25 @@ func (c *client) getContainer(ctx context.Context, id string) (containerd.Contai } return nil, wrapError(err) } - return ctr, nil + return &container{client: c, c8dCtr: ctr}, nil } -func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) { - ctr, err := c.getContainer(ctx, containerID) +func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) { + t, err := c.c8dCtr.Task(ctx, nil) if err != nil { - return nil, err - } - t, err := ctr.Task(ctx, nil) - if err != nil { - if containerderrors.IsNotFound(err) { - return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running"))) - } return nil, wrapError(err) } - if processID == libcontainerdtypes.InitProcessName { - return t, nil - } - p, err := t.LoadProcess(ctx, processID, nil) - if err != nil { - if containerderrors.IsNotFound(err) { - return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec"))) - } - return nil, wrapError(err) - } - return p, nil + return c.newTask(t), nil } // createIO creates the io to be used by a process // This needs to get a pointer to interface as upon closure the process may not have yet been registered -func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) { +func (c *container) createIO(fifos *cio.FIFOSet, processID string, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) { var ( io *cio.DirectIO err error ) - io, err = c.newDirectIO(context.Background(), fifos) + io, err = c.client.newDirectIO(context.Background(), fifos) if err != nil { return nil, err } @@ -639,13 +509,13 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std // Do the rest in a new routine to avoid a deadlock if the // Exec/Start call failed. 
go func() {
-					<-stdinCloseSync
-					p, err := c.getProcess(context.Background(), containerID, processID)
-					if err == nil {
-						err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
-						if err != nil && strings.Contains(err.Error(), "transport is closing") {
-							err = nil
-						}
+					p, ok := <-stdinCloseSync
+					if !ok {
+						return
+					}
+					// Scope err to this goroutine: assigning to createIO's
+					// captured err from here would be a data race, and that
+					// value is never read after createIO returns.
+					err := p.CloseIO(context.Background(), containerd.WithStdinCloser)
+					if err != nil && !strings.Contains(err.Error(), "transport is closing") {
+						c.client.logger.WithError(err).WithField("process", processID).Warn("failed to close stdin of containerd process")
 					}
 				}()
 			})
@@ -665,51 +535,12 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 	c.eventQ.Append(ei.ContainerID, func() {
 		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
 		if err != nil {
-			c.logger.WithError(err).WithFields(logrus.Fields{
+			c.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
 				"container":  ei.ContainerID,
 				"event":      et,
 				"event-info": ei,
 			}).Error("failed to process event")
 		}
-
-		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
-			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
-			if err != nil {
-
-				c.logger.WithError(errors.New("no such process")).
-					WithFields(logrus.Fields{
-						"error":     err,
-						"container": ei.ContainerID,
-						"process":   ei.ProcessID,
-					}).Error("exit event")
-				return
-			}
-
-			ctr, err := c.getContainer(ctx, ei.ContainerID)
-			if err != nil {
-				c.logger.WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"error":     err,
-				}).Error("failed to find container")
-			} else {
-				labels, err := ctr.Labels(ctx)
-				if err != nil {
-					c.logger.WithFields(logrus.Fields{
-						"container": ei.ContainerID,
-						"error":     err,
-					}).Error("failed to get container labels")
-					return
-				}
-				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
-			}
-			_, err = p.Delete(context.Background())
-			if err != nil {
-				c.logger.WithError(err).WithFields(logrus.Fields{
-					"container": ei.ContainerID,
-					"process":   ei.ProcessID,
-				}).Warn("failed to delete process")
-			}
-		}
 	})
 }
 
@@ -767,7 +598,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 	c.logger.Debug("processing event stream")
 
 	for {
-		var oomKilled bool
 		select {
 		case err = <-errC:
 			if err != nil {
@@ -825,9 +655,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 			et = libcontainerdtypes.EventOOM
 			ei = libcontainerdtypes.EventInfo{
 				ContainerID: t.ContainerID,
-				OOMKilled:   true,
 			}
-			oomKilled = true
 		case *apievents.TaskExecAdded:
 			et = libcontainerdtypes.EventExecAdded
 			ei = libcontainerdtypes.EventInfo{
@@ -866,13 +694,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
 			continue
 		}
 
-		c.oomMu.Lock()
-		if oomKilled {
-			c.oom[ei.ContainerID] = true
-		}
-		ei.OOMKilled = c.oom[ei.ContainerID]
-		c.oomMu.Unlock()
-
 		c.processEvent(ctx, et, ei)
 	}
 }
diff --git a/libcontainerd/remote/client_linux.go b/libcontainerd/remote/client_linux.go
index e45d140b2f..dd7aee8fe8 100644
--- a/libcontainerd/remote/client_linux.go
+++ b/libcontainerd/remote/client_linux.go
@@ -20,15 +20,10 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
 	return &libcontainerdtypes.Summary{}, nil
 }
 
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
-	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
-	if err != nil {
-		return err
-	}
-
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
 	// go doesn't like the alias in 1.8, this means this need to be
 	// platform specific
-	return p.(containerd.Task).Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
+	return t.Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
 }
 
 func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {
diff --git a/libcontainerd/remote/client_windows.go b/libcontainerd/remote/client_windows.go
index 87fb0e119d..4591124430 100644
--- a/libcontainerd/remote/client_windows.go
+++ b/libcontainerd/remote/client_windows.go
@@ -87,7 +87,7 @@ func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.Dire
 	return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
 }
 
-func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
+func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
 	// TODO: (containerd): Not implemented, but don't error.
 	return nil
 }
diff --git a/libcontainerd/replace.go b/libcontainerd/replace.go
new file mode 100644
index 0000000000..6ef6141e98
--- /dev/null
+++ b/libcontainerd/replace.go
@@ -0,0 +1,62 @@
+package libcontainerd // import "github.com/docker/docker/libcontainerd"
+
+import (
+	"context"
+
+	"github.com/containerd/containerd"
+	"github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+
+	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/libcontainerd/types"
+)
+
+// ReplaceContainer creates a new container, replacing any existing container
+// with the same id if necessary.
+func ReplaceContainer(ctx context.Context, client types.Client, id string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (types.Container, error) {
+	newContainer := func() (types.Container, error) {
+		return client.NewContainer(ctx, id, spec, shim, runtimeOptions, opts...)
+	}
+	ctr, err := newContainer()
+	if err == nil || !errdefs.IsConflict(err) {
+		return ctr, err
+	}
+
+	log := logrus.WithContext(ctx).WithField("container", id)
+	log.Debug("A container already exists with the same ID. Attempting to clean up the old container.")
+	ctr, err = client.LoadContainer(ctx, id)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			// Task failed successfully: the container no longer exists,
+			// despite us not doing anything. May as well try to create
+			// the container again. It might succeed.
+			return newContainer()
+		}
+		return nil, errors.Wrap(err, "could not load stale containerd container object")
+	}
+	tsk, err := ctr.Task(ctx)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			goto deleteContainer
+		}
+		// There is no point in trying to delete the container if we
+		// cannot determine whether or not it has a task. The containerd
+		// client would just try to load the task itself, get the same
+		// error, and give up.
+		return nil, errors.Wrap(err, "could not load stale containerd task object")
+	}
+	if err := tsk.ForceDelete(ctx); err != nil {
+		if !errdefs.IsNotFound(err) {
+			return nil, errors.Wrap(err, "could not delete stale containerd task object")
+		}
+		// The task might have exited on its own. Proceed with
+		// attempting to delete the container.
+ } +deleteContainer: + if err := ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { + return nil, errors.Wrap(err, "could not delete stale containerd container object") + } + + return newContainer() +} diff --git a/libcontainerd/types/types.go b/libcontainerd/types/types.go index 71082f7661..673b184c03 100644 --- a/libcontainerd/types/types.go +++ b/libcontainerd/types/types.go @@ -33,7 +33,6 @@ type EventInfo struct { Pid uint32 ExitCode uint32 ExitedAt time.Time - OOMKilled bool Error error } @@ -44,32 +43,58 @@ type Backend interface { // Process of a container type Process interface { - Delete(context.Context) (uint32, time.Time, error) + // Pid is the system specific process id + Pid() uint32 + // Kill sends the provided signal to the process + Kill(ctx context.Context, signal syscall.Signal) error + // Resize changes the width and height of the process's terminal + Resize(ctx context.Context, width, height uint32) error + // Delete removes the process and any resources allocated returning the exit status + Delete(context.Context) (*containerd.ExitStatus, error) } // Client provides access to containerd features. type Client interface { Version(ctx context.Context) (containerd.Version, error) + // LoadContainer loads the metadata for a container from containerd. + LoadContainer(ctx context.Context, containerID string) (Container, error) + // NewContainer creates a new containerd container. + NewContainer(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (Container, error) +} - Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error) +// Container provides access to a containerd container. +type Container interface { + Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error) + Task(ctx context.Context) (Task, error) + // AttachTask returns the current task for the container and reattaches + // to the IO for the running task. If no task exists for the container + // a NotFound error is returned. + // + // Clients must make sure that only one reader is attached to the task. 
+ AttachTask(ctx context.Context, attachStdio StdioCallback) (Task, error) + // Delete removes the container and associated resources + Delete(context.Context) error +} - Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error - Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error) - SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error - Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) - ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error - CloseStdin(ctx context.Context, containerID, processID string) error - Pause(ctx context.Context, containerID string) error - Resume(ctx context.Context, containerID string) error - Stats(ctx context.Context, containerID string) (*Stats, error) - ListPids(ctx context.Context, containerID string) ([]uint32, error) - Summary(ctx context.Context, containerID string) ([]Summary, error) - DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) - Delete(ctx context.Context, containerID string) error - Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) - - UpdateResources(ctx context.Context, containerID string, resources *Resources) error - CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error +// Task provides access to a running containerd container. +type Task interface { + Process + // Pause suspends the execution of the task + Pause(context.Context) error + // Resume the execution of the task + Resume(context.Context) error + Stats(ctx context.Context) (*Stats, error) + // Pids returns a list of system specific process ids inside the task + Pids(context.Context) ([]containerd.ProcessInfo, error) + Summary(ctx context.Context) ([]Summary, error) + // ForceDelete forcefully kills the task's processes and deletes the task + ForceDelete(context.Context) error + // Status returns the executing status of the task + Status(ctx context.Context) (containerd.Status, error) + // Exec creates and starts a new process inside the task + Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (Process, error) + UpdateResources(ctx context.Context, resources *Resources) error + CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error } // StdioCallback is called to connect a container or process stdio. 
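Taken together, the `Client`, `Container`, and `Task` interfaces above replace the old flat, ID-keyed client API with stateful handles, so callers stop threading `containerID`/`processID` strings through every call. A minimal sketch of how a caller might drive the new surface — only the method signatures come from the interfaces above; the package name, shim string, and stdio wiring are illustrative assumptions, not code from this patch:

```go
package example

import (
	"context"
	"syscall"

	specs "github.com/opencontainers/runtime-spec/specs-go"

	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/types"
)

// createAndStart pairs Client.NewContainer with Container.Start. The shim
// name and nil runtime options are placeholders, not values from the patch.
func createAndStart(ctx context.Context, client types.Client, id string, spec *specs.Spec, stdio types.StdioCallback) (types.Container, types.Task, error) {
	ctr, err := client.NewContainer(ctx, id, spec, "io.containerd.runc.v2", nil)
	if err != nil {
		return nil, nil, err
	}
	tsk, err := ctr.Start(ctx, "" /* no checkpoint */, false, stdio)
	if err != nil {
		_ = ctr.Delete(ctx) // best-effort cleanup of the never-started container
		return nil, nil, err
	}
	return ctr, tsk, nil
}

// stopAndDelete tears the pair down again. Exit notifications still arrive
// through the Backend event path; in real code the caller would wait for
// the exit event before calling Delete, which fails on a running task.
func stopAndDelete(ctx context.Context, ctr types.Container, tsk types.Task) error {
	if err := tsk.Kill(ctx, syscall.SIGTERM); err != nil && !errdefs.IsNotFound(err) {
		return err
	}
	if _, err := tsk.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
		return err
	}
	return ctr.Delete(ctx)
}
```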
diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index 92983056d0..0327e65dc4 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -2,6 +2,7 @@ package containerd // import "github.com/docker/docker/plugin/executor/container import ( "context" + "fmt" "io" "sync" "syscall" @@ -28,6 +29,7 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, ns string, rootDir: rootDir, exitHandler: exitHandler, runtime: runtime, + plugins: make(map[string]*c8dPlugin), } client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e) @@ -44,77 +46,112 @@ type Executor struct { client libcontainerdtypes.Client exitHandler ExitHandler runtime types.Runtime + + mu sync.Mutex // Guards plugins map + plugins map[string]*c8dPlugin +} + +type c8dPlugin struct { + log *logrus.Entry + ctr libcontainerdtypes.Container + tsk libcontainerdtypes.Task } // deleteTaskAndContainer deletes plugin task and then plugin container from containerd -func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) { - if p != nil { - if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") - } - } else { - if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") +func (p c8dPlugin) deleteTaskAndContainer(ctx context.Context) { + if p.tsk != nil { + if _, err := p.tsk.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { + p.log.WithError(err).Error("failed to delete plugin task from containerd") } } - - if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd") + if p.ctr != nil { + if err := p.ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { + p.log.WithError(err).Error("failed to delete plugin container from containerd") + } } } // Create creates a new container func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { ctx := context.Background() - err := e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts) + log := logrus.WithField("plugin", id) + ctr, err := libcontainerd.ReplaceContainer(ctx, e.client, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts) if err != nil { - status, err2 := e.client.Status(ctx, id) - if err2 != nil { - if !errdefs.IsNotFound(err2) { - logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status") - } - } else { - if status != containerd.Running && status != containerd.Unknown { - if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { - logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container") - } - err = e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts) - } - } - - if err != nil { - return errors.Wrap(err, "error creating containerd container") - } + return errors.Wrap(err, "error creating containerd container for plugin") } - _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) + p := c8dPlugin{log: log, ctr: ctr} + p.tsk, err = ctr.Start(ctx, "", false, attachStreamsFunc(stdout, stderr)) if err != nil { - deleteTaskAndContainer(ctx, e.client, id, nil) + 
p.deleteTaskAndContainer(ctx)
+		return err
 	}
-	return err
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.plugins[id] = &p
+	return nil
 }
 
 // Restore restores a container
 func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
-	alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
-	if err != nil && !errdefs.IsNotFound(err) {
+	ctx := context.Background()
+	p := c8dPlugin{log: logrus.WithField("plugin", id)}
+	ctr, err := e.client.LoadContainer(ctx, id)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			return false, nil
+		}
 		return false, err
 	}
-	if !alive {
-		deleteTaskAndContainer(context.Background(), e.client, id, p)
+	// Record the container handle on the plugin; without this, p.ctr stays
+	// nil and deleteTaskAndContainer can never remove the stale container.
+	p.ctr = ctr
+	p.tsk, err = ctr.AttachTask(ctx, attachStreamsFunc(stdout, stderr))
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			p.deleteTaskAndContainer(ctx)
+			return false, nil
+		}
+		return false, err
 	}
-	return alive, nil
+	s, err := p.tsk.Status(ctx)
+	if err != nil {
+		if errdefs.IsNotFound(err) {
+			// Task vanished after attaching?
+			p.tsk = nil
+			p.deleteTaskAndContainer(ctx)
+			return false, nil
+		}
+		return false, err
+	}
+	if s.Status == containerd.Stopped {
+		p.deleteTaskAndContainer(ctx)
+		return false, nil
+	}
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.plugins[id] = &p
+	return true, nil
 }
 
 // IsRunning returns if the container with the given id is running
 func (e *Executor) IsRunning(id string) (bool, error) {
-	status, err := e.client.Status(context.Background(), id)
-	return status == containerd.Running, err
+	e.mu.Lock()
+	p := e.plugins[id]
+	e.mu.Unlock()
+	if p == nil {
+		return false, errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+	}
+	status, err := p.tsk.Status(context.Background())
+	return status.Status == containerd.Running, err
 }
 
 // Signal sends the specified signal to the container
 func (e *Executor) Signal(id string, signal syscall.Signal) error {
-	return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
+	e.mu.Lock()
+	p := e.plugins[id]
+	e.mu.Unlock()
+	if p == nil {
+		return errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
+	}
+	return p.tsk.Kill(context.Background(), signal)
 }
 
 // ProcessEvent handles events from containerd
@@ -122,7 +159,14 @@ func (e *Executor) Signal(id string, signal syscall.Signal) error {
 func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
 	switch et {
 	case libcontainerdtypes.EventExit:
-		deleteTaskAndContainer(context.Background(), e.client, id, nil)
+		e.mu.Lock()
+		p := e.plugins[id]
+		e.mu.Unlock()
+		if p == nil {
+			logrus.WithField("id", id).Warn("Received exit event for an unknown plugin")
+		} else {
+			p.deleteTaskAndContainer(context.Background())
+		}
 		return e.exitHandler.HandleExitEvent(ei.ContainerID)
 	}
 	return nil
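The executor's Create path above is also the first consumer of the new `libcontainerd.ReplaceContainer` helper added in this patch. A hedged sketch of the call shape it enables — `ReplaceContainer`, `Start`, and `Delete` come from the patch, while the spec, shim name, and stdio hookup are placeholders:

```go
package example

import (
	"context"

	"github.com/containerd/containerd/cio"
	specs "github.com/opencontainers/runtime-spec/specs-go"

	"github.com/docker/docker/libcontainerd"
	"github.com/docker/docker/libcontainerd/types"
)

// createFresh mirrors the plugin executor's Create flow: ReplaceContainer
// tears down any stale container (and task) left under the same ID before
// creating a new one, so callers no longer hand-roll the old
// Status/Delete/retry dance that this patch removes.
func createFresh(ctx context.Context, client types.Client, id string, spec *specs.Spec) (types.Task, error) {
	// "io.containerd.runc.v2" and the nil runtime options are placeholder
	// values for the sketch, not taken from the executor.
	ctr, err := libcontainerd.ReplaceContainer(ctx, client, id, spec, "io.containerd.runc.v2", nil)
	if err != nil {
		return nil, err
	}
	// Trivial stdio hookup: hand the FIFOs straight back to containerd.
	attach := func(iop *cio.DirectIO) (cio.IO, error) { return iop, nil }
	tsk, err := ctr.Start(ctx, "", false, attach)
	if err != nil {
		_ = ctr.Delete(ctx) // best-effort cleanup, as in the executor's error path
		return nil, err
	}
	return tsk, nil
}
```

Restore takes the complementary path — `LoadContainer` plus `AttachTask` — and drops stopped tasks rather than reattaching, so the executor's `plugins` map only ever holds live tasks.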