diff --git a/daemon/daemon.go b/daemon/daemon.go index 842248e113..e6a1fb1456 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -325,16 +325,17 @@ func (daemon *Daemon) restore() error { alive bool ec uint32 exitedAt time.Time + process libcontainerdtypes.Process ) - alive, _, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio) + alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio) if err != nil && !errdefs.IsNotFound(err) { logrus.Errorf("Failed to restore container %s with containerd: %s", c.ID, err) return } - if !alive { - ec, exitedAt, err = daemon.containerd.DeleteTask(context.Background(), c.ID) - if err != nil && !errdefs.IsNotFound(err) { + if !alive && process != nil { + ec, exitedAt, err = process.Delete(context.Background()) + if err != nil { logrus.WithError(err).Errorf("Failed to delete container %s from containerd", c.ID) return } diff --git a/daemon/util_test.go b/daemon/util_test.go index 020ab51b20..ce293bae12 100644 --- a/daemon/util_test.go +++ b/daemon/util_test.go @@ -11,6 +11,13 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) +type mockProcess struct { +} + +func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) { + return 0, time.Time{}, nil +} + // Mock containerd client implementation, for unit tests. type MockContainerdClient struct { } @@ -18,8 +25,8 @@ type MockContainerdClient struct { func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) { return containerd.Version{}, nil } -func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) { - return false, 0, nil +func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) { + return false, 0, &mockProcess{}, nil } func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error { return nil diff --git a/libcontainerd/local/local_windows.go b/libcontainerd/local/local_windows.go index afc6590179..95c41b445d 100644 --- a/libcontainerd/local/local_windows.go +++ b/libcontainerd/local/local_windows.go @@ -1085,7 +1085,7 @@ func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtyp } // Restore is the handler for restoring a container -func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, error) { +func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, libcontainerdtypes.Process, error) { c.logger.WithField("container", id).Debug("restore()") // TODO Windows: On RS1, a re-attach isn't possible. @@ -1107,10 +1107,13 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine if err != nil { c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore") - return false, -1, err + return false, -1, nil, err } } - return false, -1, nil + return false, -1, &restoredProcess{ + c: c, + id: id, + }, nil } // GetPidsForContainer returns a list of process IDs running in a container. @@ -1153,6 +1156,15 @@ func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerd return pl, nil } +type restoredProcess struct { + id string + c *client +} + +func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) { + return p.c.DeleteTask(ctx, p.id) +} + func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { ec := -1 ctr := c.getContainer(containerID) diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go index 11a9ebd2cb..07389e3f17 100644 --- a/libcontainerd/remote/client.go +++ b/libcontainerd/remote/client.go @@ -38,86 +38,30 @@ import ( "google.golang.org/grpc/status" ) -type container struct { - mu sync.Mutex - - bundleDir string - ctr containerd.Container - task containerd.Task - execs map[string]containerd.Process - oomKilled bool -} - -func (c *container) setTask(t containerd.Task) { - c.mu.Lock() - c.task = t - c.mu.Unlock() -} - -func (c *container) getTask() containerd.Task { - c.mu.Lock() - t := c.task - c.mu.Unlock() - return t -} - -func (c *container) addProcess(id string, p containerd.Process) { - c.mu.Lock() - if c.execs == nil { - c.execs = make(map[string]containerd.Process) - } - c.execs[id] = p - c.mu.Unlock() -} - -func (c *container) deleteProcess(id string) { - c.mu.Lock() - delete(c.execs, id) - c.mu.Unlock() -} - -func (c *container) getProcess(id string) containerd.Process { - c.mu.Lock() - p := c.execs[id] - c.mu.Unlock() - return p -} - -func (c *container) setOOMKilled(killed bool) { - c.mu.Lock() - c.oomKilled = killed - c.mu.Unlock() -} - -func (c *container) getOOMKilled() bool { - c.mu.Lock() - killed := c.oomKilled - c.mu.Unlock() - return killed -} +// DockerContainerBundlePath is the label key pointing to the container's bundle path +const DockerContainerBundlePath = "com.docker/engine.bundle.path" type client struct { - sync.RWMutex // protects containers map - client *containerd.Client stateDir string logger *logrus.Entry ns string - backend libcontainerdtypes.Backend - eventQ queue.Queue - containers map[string]*container + backend libcontainerdtypes.Backend + eventQ queue.Queue + oomMu sync.Mutex + oom map[string]bool } // NewClient creates a new libcontainerd client from a containerd client func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) { c := &client{ - client: cli, - stateDir: stateDir, - logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), - ns: ns, - backend: b, - containers: make(map[string]*container), + client: cli, + stateDir: stateDir, + logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), + ns: ns, + backend: b, + oom: make(map[string]bool), } go c.processEventStream(ctx, ns) @@ -131,29 +75,7 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) { // Restore loads the containerd container. // It should not be called concurrently with any other operation for the given ID. -func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) { - c.Lock() - _, ok := c.containers[id] - if ok { - c.Unlock() - return false, 0, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) - } - - cntr := &container{} - c.containers[id] = cntr - cntr.mu.Lock() - defer cntr.mu.Unlock() - - c.Unlock() - - defer func() { - if err != nil { - c.Lock() - delete(c.containers, id) - c.Unlock() - } - }() - +func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) { var dio *cio.DirectIO defer func() { if err != nil && dio != nil { @@ -165,13 +87,12 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine ctr, err := c.client.LoadContainer(ctx, id) if err != nil { - return false, -1, errors.WithStack(wrapError(err)) + return false, -1, nil, errors.WithStack(wrapError(err)) } attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) { // dio must be assigned to the previously defined dio for the defer above // to handle cleanup - dio, err = c.newDirectIO(ctx, fifos) if err != nil { return nil, err @@ -180,77 +101,57 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine } t, err := ctr.Task(ctx, attachIO) if err != nil && !containerderrors.IsNotFound(err) { - return false, -1, errors.Wrap(wrapError(err), "error getting containerd task for container") + return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container") } if t != nil { s, err := t.Status(ctx) if err != nil { - return false, -1, errors.Wrap(wrapError(err), "error getting task status") + return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status") } - alive = s.Status != containerd.Stopped pid = int(t.Pid()) } - cntr.bundleDir = filepath.Join(c.stateDir, id) - cntr.ctr = ctr - cntr.task = t - // TODO(mlaventure): load execs - c.logger.WithFields(logrus.Fields{ "container": id, "alive": alive, "pid": pid, }).Debug("restored container") - return alive, pid, nil + return alive, pid, &restoredProcess{ + p: t, + }, nil } func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error { - if ctr := c.getContainer(id); ctr != nil { - return errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) - } - - bdir, err := prepareBundleDir(filepath.Join(c.stateDir, id), ociSpec) - if err != nil { - return errdefs.System(errors.Wrap(err, "prepare bundle dir failed")) - } - + bdir := c.bundleDir(id) c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created") - cdCtr, err := c.client.NewContainer(ctx, id, + _, err := c.client.NewContainer(ctx, id, containerd.WithSpec(ociSpec), - containerd.WithRuntime(runtimeName, runtimeOptions)) + containerd.WithRuntime(runtimeName, runtimeOptions), + WithBundle(bdir, ociSpec), + ) if err != nil { + if containerderrors.IsAlreadyExists(err) { + return errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) + } return wrapError(err) } - - c.Lock() - c.containers[id] = &container{ - bundleDir: bdir, - ctr: cdCtr, - } - c.Unlock() - return nil } // Start create and start a task for the specified containerd id func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr := c.getContainer(id) - if ctr == nil { - return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) + ctr, err := c.getContainer(ctx, id) + if err != nil { + return -1, err } - if t := ctr.getTask(); t != nil { - return -1, errors.WithStack(errdefs.Conflict(errors.New("container already started"))) - } - var ( cp *types.Descriptor t containerd.Task rio cio.IO - err error stdinCloseSync = make(chan struct{}) ) @@ -278,14 +179,19 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin } } - spec, err := ctr.ctr.Spec(ctx) + spec, err := ctr.Spec(ctx) if err != nil { return -1, errors.Wrap(err, "failed to retrieve spec") } + labels, err := ctr.Labels(ctx) + if err != nil { + return -1, errors.Wrap(err, "failed to retreive labels") + } + bundle := labels[DockerContainerBundlePath] uid, gid := getSpecUser(spec) - t, err = ctr.ctr.NewTask(ctx, + t, err = ctr.NewTask(ctx, func(id string) (cio.IO, error) { - fifos := newFIFOSet(ctr.bundleDir, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal) + fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal) rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio) return rio, err @@ -315,8 +221,6 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin return -1, wrapError(err) } - ctr.setTask(t) - // Signal c.createIO that it can call CloseIO close(stdinCloseSync) @@ -325,7 +229,6 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin c.logger.WithError(err).WithField("container", id). Error("failed to delete task after fail start") } - ctr.setTask(nil) return -1, wrapError(err) } @@ -340,27 +243,30 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin // the Start call. stdinCloseSync channel should be closed after Start exec // process. func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { - ctr := c.getContainer(containerID) - if ctr == nil { - return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) + ctr, err := c.getContainer(ctx, containerID) + if err != nil { + return -1, err } - t := ctr.getTask() - if t == nil { - return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running"))) - } - - if p := ctr.getProcess(processID); p != nil { - return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) + t, err := ctr.Task(ctx, nil) + if err != nil { + if containerderrors.IsNotFound(err) { + return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running"))) + } + return -1, wrapError(err) } var ( p containerd.Process rio cio.IO - err error stdinCloseSync = make(chan struct{}) ) - fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal) + labels, err := ctr.Labels(ctx) + if err != nil { + return -1, wrapError(err) + } + + fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal) defer func() { if err != nil { @@ -377,11 +283,12 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * }) if err != nil { close(stdinCloseSync) + if containerderrors.IsAlreadyExists(err) { + return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) + } return -1, wrapError(err) } - ctr.addProcess(processID, p) - // Signal c.createIO that it can call CloseIO // // the stdin of exec process will be created after p.Start in containerd @@ -394,15 +301,13 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second) defer cancel() p.Delete(ctx) - ctr.deleteProcess(processID) return -1, wrapError(err) } - return int(p.Pid()), nil } func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error { - p, err := c.getProcess(containerID, processID) + p, err := c.getProcess(ctx, containerID, processID) if err != nil { return err } @@ -410,7 +315,7 @@ func (c *client) SignalProcess(ctx context.Context, containerID, processID strin } func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { - p, err := c.getProcess(containerID, processID) + p, err := c.getProcess(ctx, containerID, processID) if err != nil { return err } @@ -419,7 +324,7 @@ func (c *client) ResizeTerminal(ctx context.Context, containerID, processID stri } func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error { - p, err := c.getProcess(containerID, processID) + p, err := c.getProcess(ctx, containerID, processID) if err != nil { return err } @@ -428,7 +333,7 @@ func (c *client) CloseStdin(ctx context.Context, containerID, processID string) } func (c *client) Pause(ctx context.Context, containerID string) error { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return err } @@ -437,7 +342,7 @@ func (c *client) Pause(ctx context.Context, containerID string) error { } func (c *client) Resume(ctx context.Context, containerID string) error { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return err } @@ -446,7 +351,7 @@ func (c *client) Resume(ctx context.Context, containerID string) error { } func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return nil, err } @@ -464,7 +369,7 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt } func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return nil, err } @@ -483,7 +388,7 @@ func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, er } func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return nil, err } @@ -509,68 +414,75 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine return infos, nil } +type restoredProcess struct { + p containerd.Process +} + +func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) { + if p.p == nil { + return 255, time.Now(), nil + } + status, err := p.p.Delete(ctx) + if err != nil { + return 255, time.Now(), nil + } + return status.ExitCode(), status.ExitTime(), nil +} + func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return 255, time.Now(), nil } - status, err := p.(containerd.Task).Delete(ctx) + status, err := p.Delete(ctx) if err != nil { return 255, time.Now(), nil } - - if ctr := c.getContainer(containerID); ctr != nil { - ctr.setTask(nil) - } return status.ExitCode(), status.ExitTime(), nil } func (c *client) Delete(ctx context.Context, containerID string) error { - ctr := c.getContainer(containerID) - if ctr == nil { - return errors.WithStack(errdefs.NotFound(errors.New("no such container"))) + ctr, err := c.getContainer(ctx, containerID) + if err != nil { + return err } - - if err := ctr.ctr.Delete(ctx); err != nil { + labels, err := ctr.Labels(ctx) + if err != nil { + return err + } + bundle := labels[DockerContainerBundlePath] + if err := ctr.Delete(ctx); err != nil { return wrapError(err) } - + c.oomMu.Lock() + delete(c.oom, containerID) + c.oomMu.Unlock() if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" { - if err := os.RemoveAll(ctr.bundleDir); err != nil { + if err := os.RemoveAll(bundle); err != nil { c.logger.WithError(err).WithFields(logrus.Fields{ "container": containerID, - "bundle": ctr.bundleDir, + "bundle": bundle, }).Error("failed to remove state dir") } } - - c.removeContainer(containerID) - return nil } func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) { - ctr := c.getContainer(containerID) - if ctr == nil { - return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) + t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) + if err != nil { + return containerd.Unknown, err } - - t := ctr.getTask() - if t == nil { - return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such task"))) - } - s, err := t.Status(ctx) if err != nil { return containerd.Unknown, wrapError(err) } - return s.Status, nil } func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return err } @@ -635,37 +547,38 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi return err } -func (c *client) getContainer(id string) *container { - c.RLock() - ctr := c.containers[id] - c.RUnlock() - - return ctr -} - -func (c *client) removeContainer(id string) { - c.Lock() - delete(c.containers, id) - c.Unlock() -} - -func (c *client) getProcess(containerID, processID string) (containerd.Process, error) { - ctr := c.getContainer(containerID) - if ctr == nil { - return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) +func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) { + ctr, err := c.client.LoadContainer(ctx, id) + if err != nil { + if containerderrors.IsNotFound(err) { + return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container"))) + } + return nil, wrapError(err) } + return ctr, nil +} - t := ctr.getTask() - if t == nil { - return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running"))) +func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) { + ctr, err := c.getContainer(ctx, containerID) + if err != nil { + return nil, err + } + t, err := ctr.Task(ctx, nil) + if err != nil { + if containerderrors.IsNotFound(err) { + return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running"))) + } + return nil, wrapError(err) } if processID == libcontainerdtypes.InitProcessName { return t, nil } - - p := ctr.getProcess(processID) - if p == nil { - return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec"))) + p, err := t.LoadProcess(ctx, processID, nil) + if err != nil { + if containerderrors.IsNotFound(err) { + return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec"))) + } + return nil, wrapError(err) } return p, nil } @@ -695,7 +608,7 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std // Exec/Start call failed. go func() { <-stdinCloseSync - p, err := c.getProcess(containerID, processID) + p, err := c.getProcess(context.Background(), containerID, processID) if err == nil { err = p.CloseIO(context.Background(), containerd.WithStdinCloser) if err != nil && strings.Contains(err.Error(), "transport is closing") { @@ -716,7 +629,7 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std return rio, err } -func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) { +func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) { c.eventQ.Append(ei.ContainerID, func() { err := c.backend.ProcessEvent(ei.ContainerID, et, ei) if err != nil { @@ -728,10 +641,12 @@ func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, e } if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID { - p := ctr.getProcess(ei.ProcessID) - if p == nil { + p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID) + if err != nil { + c.logger.WithError(errors.New("no such process")). WithFields(logrus.Fields{ + "error": err, "container": ei.ContainerID, "process": ei.ProcessID, }).Error("exit event") @@ -744,15 +659,23 @@ func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, e "process": ei.ProcessID, }).Warn("failed to delete process") } - ctr.deleteProcess(ei.ProcessID) - ctr := c.getContainer(ei.ContainerID) - if ctr == nil { + ctr, err := c.getContainer(ctx, ei.ContainerID) + if err != nil { c.logger.WithFields(logrus.Fields{ "container": ei.ContainerID, + "error": err, }).Error("failed to find container") } else { - newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close() + labels, err := ctr.Labels(ctx) + if err != nil { + c.logger.WithFields(logrus.Fields{ + "container": ei.ContainerID, + "error": err, + }).Error("failed to find container") + return + } + newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close() } } }) @@ -764,7 +687,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) { ev *events.Envelope et libcontainerdtypes.EventType ei libcontainerdtypes.EventInfo - ctr *container ) // Filter on both namespace *and* topic. To create an "and" filter, @@ -773,8 +695,8 @@ func (c *client) processEventStream(ctx context.Context, ns string) { c.logger.Debug("processing event stream") - var oomKilled bool for { + var oomKilled bool select { case err = <-errC: if err != nil { @@ -863,19 +785,14 @@ func (c *client) processEventStream(ctx context.Context, ns string) { continue } - ctr = c.getContainer(ei.ContainerID) - if ctr == nil { - c.logger.WithField("container", ei.ContainerID).Warn("unknown container") - continue - } - + c.oomMu.Lock() if oomKilled { - ctr.setOOMKilled(true) - oomKilled = false + c.oom[ei.ContainerID] = true } - ei.OOMKilled = ctr.getOOMKilled() + ei.OOMKilled = c.oom[ei.ContainerID] + c.oomMu.Unlock() - c.processEvent(ctr, et, ei) + c.processEvent(ctx, et, ei) } } } @@ -903,6 +820,10 @@ func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.R }, nil } +func (c *client) bundleDir(id string) string { + return filepath.Join(c.stateDir, id) +} + func wrapError(err error) error { switch { case err == nil: diff --git a/libcontainerd/remote/client_linux.go b/libcontainerd/remote/client_linux.go index 22e764fbd1..1c43ef61c3 100644 --- a/libcontainerd/remote/client_linux.go +++ b/libcontainerd/remote/client_linux.go @@ -9,6 +9,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/containers" libcontainerdtypes "github.com/docker/docker/libcontainerd/types" "github.com/docker/docker/pkg/idtools" "github.com/opencontainers/runtime-spec/specs-go" @@ -22,7 +23,7 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) { } func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error { - p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName) + p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) if err != nil { return err } @@ -58,29 +59,39 @@ func getSpecUser(ociSpec *specs.Spec) (int, int) { return uid, gid } -func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) { - uid, gid := getSpecUser(ociSpec) - if uid == 0 && gid == 0 { - return bundleDir, idtools.MkdirAllAndChownNew(bundleDir, 0755, idtools.Identity{UID: 0, GID: 0}) - } - - p := string(filepath.Separator) - components := strings.Split(bundleDir, string(filepath.Separator)) - for _, d := range components[1:] { - p = filepath.Join(p, d) - fi, err := os.Stat(p) - if err != nil && !os.IsNotExist(err) { - return "", err +// WithBundle creates the bundle for the container +func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts { + return func(ctx context.Context, client *containerd.Client, c *containers.Container) error { + if c.Labels == nil { + c.Labels = make(map[string]string) } - if os.IsNotExist(err) || fi.Mode()&1 == 0 { - p = fmt.Sprintf("%s.%d.%d", p, uid, gid) - if err := idtools.MkdirAndChown(p, 0700, idtools.Identity{UID: uid, GID: gid}); err != nil && !os.IsExist(err) { - return "", err + uid, gid := getSpecUser(ociSpec) + if uid == 0 && gid == 0 { + c.Labels[DockerContainerBundlePath] = bundleDir + return idtools.MkdirAllAndChownNew(bundleDir, 0755, idtools.Identity{UID: 0, GID: 0}) + } + + p := string(filepath.Separator) + components := strings.Split(bundleDir, string(filepath.Separator)) + for _, d := range components[1:] { + p = filepath.Join(p, d) + fi, err := os.Stat(p) + if err != nil && !os.IsNotExist(err) { + return err + } + if os.IsNotExist(err) || fi.Mode()&1 == 0 { + p = fmt.Sprintf("%s.%d.%d", p, uid, gid) + if err := idtools.MkdirAndChown(p, 0700, idtools.Identity{UID: uid, GID: gid}); err != nil && !os.IsExist(err) { + return err + } } } + if c.Labels == nil { + c.Labels = make(map[string]string) + } + c.Labels[DockerContainerBundlePath] = p + return nil } - - return p, nil } func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet { diff --git a/libcontainerd/remote/client_windows.go b/libcontainerd/remote/client_windows.go index d6885622f9..35b9a190a8 100644 --- a/libcontainerd/remote/client_windows.go +++ b/libcontainerd/remote/client_windows.go @@ -7,7 +7,9 @@ import ( "path/filepath" "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options" + "github.com/containerd/containerd" "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/containers" libcontainerdtypes "github.com/docker/docker/libcontainerd/types" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -35,9 +37,16 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) { } } -func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) { - // TODO: (containerd) Determine if we need to use system.MkdirAllWithACL here - return bundleDir, os.MkdirAll(bundleDir, 0755) +// WithBundle creates the bundle for the container +func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts { + return func(ctx context.Context, client *containerd.Client, c *containers.Container) error { + // TODO: (containerd) Determine if we need to use system.MkdirAllWithACL here + if c.Labels == nil { + c.Labels = make(map[string]string) + } + c.Labels[DockerContainerBundlePath] = bundleDir + return os.MkdirAll(bundleDir, 0755) + } } func pipeName(containerID, processID, name string) string { diff --git a/libcontainerd/types/types.go b/libcontainerd/types/types.go index 89290647ab..42d5f24476 100644 --- a/libcontainerd/types/types.go +++ b/libcontainerd/types/types.go @@ -41,11 +41,16 @@ type Backend interface { ProcessEvent(containerID string, event EventType, ei EventInfo) error } +// Process of a container +type Process interface { + Delete(context.Context) (uint32, time.Time, error) +} + // Client provides access to containerd features. type Client interface { Version(ctx context.Context) (containerd.Version, error) - Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, err error) + Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error) Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error) diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index 61aaedf3ee..23418558d8 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -5,7 +5,6 @@ import ( "io" "path/filepath" "sync" - "time" "github.com/containerd/containerd" "github.com/containerd/containerd/cio" @@ -26,19 +25,6 @@ type ExitHandler interface { HandleExitEvent(id string) error } -// Client is used by the exector to perform operations. -// TODO(@cpuguy83): This should really just be based off the containerd client interface. -// However right now this whole package is tied to github.com/docker/docker/libcontainerd -type Client interface { - Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error - Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) - Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) - Delete(ctx context.Context, containerID string) error - DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) - Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) - SignalProcess(ctx context.Context, containerID, processID string, signal int) error -} - // New creates a new containerd plugin executor func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) { e := &Executor{ @@ -57,19 +43,23 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandle // Executor is the containerd client implementation of a plugin executor type Executor struct { rootDir string - client Client + client libcontainerdtypes.Client exitHandler ExitHandler } // deleteTaskAndContainer deletes plugin task and then plugin container from containerd -func deleteTaskAndContainer(ctx context.Context, cli Client, id string) { - _, _, err := cli.DeleteTask(ctx, id) - if err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") +func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) { + if p != nil { + if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { + logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") + } + } else { + if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) { + logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") + } } - err = cli.Delete(ctx, id) - if err != nil && !errdefs.IsNotFound(err) { + if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) { logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd") } } @@ -103,19 +93,19 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) if err != nil { - deleteTaskAndContainer(ctx, e.client, id) + deleteTaskAndContainer(ctx, e.client, id, nil) } return err } // Restore restores a container func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) { - alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr)) + alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr)) if err != nil && !errdefs.IsNotFound(err) { return false, err } if !alive { - deleteTaskAndContainer(context.Background(), e.client, id) + deleteTaskAndContainer(context.Background(), e.client, id, p) } return alive, nil } @@ -136,7 +126,7 @@ func (e *Executor) Signal(id string, signal int) error { func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error { switch et { case libcontainerdtypes.EventExit: - deleteTaskAndContainer(context.Background(), e.client, id) + deleteTaskAndContainer(context.Background(), e.client, id, nil) return e.exitHandler.HandleExitEvent(ei.ContainerID) } return nil diff --git a/plugin/executor/containerd/containerd_test.go b/plugin/executor/containerd/containerd_test.go deleted file mode 100644 index 8e2d616784..0000000000 --- a/plugin/executor/containerd/containerd_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package containerd - -import ( - "context" - "io/ioutil" - "os" - "sync" - "testing" - "time" - - "github.com/containerd/containerd" - libcontainerdtypes "github.com/docker/docker/libcontainerd/types" - "github.com/opencontainers/runtime-spec/specs-go" - "github.com/pkg/errors" - "gotest.tools/assert" -) - -func TestLifeCycle(t *testing.T) { - t.Parallel() - - mock := newMockClient() - exec, cleanup := setupTest(t, mock, mock) - defer cleanup() - - id := "test-create" - mock.simulateStartError(true, id) - err := exec.Create(id, specs.Spec{}, nil, nil) - assert.Assert(t, err != nil) - mock.simulateStartError(false, id) - - err = exec.Create(id, specs.Spec{}, nil, nil) - assert.NilError(t, err) - running, _ := exec.IsRunning(id) - assert.Assert(t, running) - - // create with the same ID - err = exec.Create(id, specs.Spec{}, nil, nil) - assert.Assert(t, err != nil) - - mock.HandleExitEvent(id) // simulate a plugin that exits - - err = exec.Create(id, specs.Spec{}, nil, nil) - assert.NilError(t, err) -} - -func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) { - rootDir, err := ioutil.TempDir("", "test-daemon") - assert.NilError(t, err) - assert.Assert(t, client != nil) - assert.Assert(t, eh != nil) - - return &Executor{ - rootDir: rootDir, - client: client, - exitHandler: eh, - }, func() { - assert.Assert(t, os.RemoveAll(rootDir)) - } -} - -type mockClient struct { - mu sync.Mutex - containers map[string]bool - errorOnStart map[string]bool -} - -func newMockClient() *mockClient { - return &mockClient{ - containers: make(map[string]bool), - errorOnStart: make(map[string]bool), - } -} - -func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error { - c.mu.Lock() - defer c.mu.Unlock() - - if _, ok := c.containers[id]; ok { - return errors.New("exists") - } - - c.containers[id] = false - return nil -} - -func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) { - return false, 0, nil -} - -func (c *mockClient) Status(ctx context.Context, id string) (containerd.ProcessStatus, error) { - c.mu.Lock() - defer c.mu.Unlock() - - running, ok := c.containers[id] - if !ok { - return containerd.Unknown, errors.New("not found") - } - if running { - return containerd.Running, nil - } - return containerd.Stopped, nil -} - -func (c *mockClient) Delete(ctx context.Context, id string) error { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.containers, id) - return nil -} - -func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) { - return 0, time.Time{}, nil -} - -func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) { - c.mu.Lock() - defer c.mu.Unlock() - - if _, ok := c.containers[id]; !ok { - return 0, errors.New("not found") - } - - if c.errorOnStart[id] { - return 0, errors.New("some startup error") - } - c.containers[id] = true - return 1, nil -} - -func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error { - return nil -} - -func (c *mockClient) simulateStartError(sim bool, id string) { - c.mu.Lock() - defer c.mu.Unlock() - if sim { - c.errorOnStart[id] = sim - return - } - delete(c.errorOnStart, id) -} - -func (c *mockClient) HandleExitEvent(id string) error { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.containers, id) - return nil -}