From f81172b9031160218e51fb2a7dbeee19962a60a9 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Tue, 27 Mar 2018 10:03:53 -0400 Subject: [PATCH] Make sure plugin container is removed on failure Signed-off-by: Brian Goff --- libcontainerd/client_daemon.go | 18 +-- plugin/executor/containerd/containerd.go | 50 +++++- plugin/executor/containerd/containerd_test.go | 148 ++++++++++++++++++ 3 files changed, 200 insertions(+), 16 deletions(-) create mode 100644 plugin/executor/containerd/containerd_test.go diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go index 2c914469a1..348aeb9d9f 100644 --- a/libcontainerd/client_daemon.go +++ b/libcontainerd/client_daemon.go @@ -205,7 +205,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run // TODO(mlaventure): when containerd support lcow, revisit runtime value containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions)) if err != nil { - return err + return wrapError(err) } c.Lock() @@ -286,7 +286,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin rio.Cancel() rio.Close() } - return -1, err + return -1, wrapError(err) } ctr.setTask(t) @@ -300,7 +300,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin Error("failed to delete task after fail start") } ctr.setTask(nil) - return -1, err + return -1, wrapError(err) } return int(t.Pid()), nil @@ -344,7 +344,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * }) if err != nil { close(stdinCloseSync) - return -1, err + return -1, wrapError(err) } ctr.addProcess(processID, p) @@ -355,7 +355,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * if err = p.Start(ctx); err != nil { p.Delete(context.Background()) ctr.deleteProcess(processID) - return -1, err + return -1, wrapError(err) } return int(p.Pid()), nil @@ -393,7 +393,7 @@ func (c *client) Pause(ctx 
context.Context, containerID string) error { return err } - return p.(containerd.Task).Pause(ctx) + return wrapError(p.(containerd.Task).Pause(ctx)) } func (c *client) Resume(ctx context.Context, containerID string) error { @@ -493,7 +493,7 @@ func (c *client) Delete(ctx context.Context, containerID string) error { } if err := ctr.ctr.Delete(ctx); err != nil { - return err + return wrapError(err) } if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" { @@ -523,7 +523,7 @@ func (c *client) Status(ctx context.Context, containerID string) (Status, error) s, err := t.Status(ctx) if err != nil { - return StatusUnknown, err + return StatusUnknown, wrapError(err) } return Status(s.Status), nil @@ -537,7 +537,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi img, err := p.(containerd.Task).Checkpoint(ctx) if err != nil { - return err + return wrapError(err) } // Whatever happens, delete the checkpoint from containerd defer func() { diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index 55313065bf..e490ef0a9e 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -5,6 +5,7 @@ import ( "io" "path/filepath" "sync" + "time" "github.com/containerd/containerd/cio" "github.com/containerd/containerd/linux/runctypes" @@ -15,21 +16,34 @@ import ( "github.com/sirupsen/logrus" ) -// PluginNamespace is the name used for the plugins namespace -var PluginNamespace = "plugins.moby" +// pluginNamespace is the name used for the plugins namespace +const pluginNamespace = "plugins.moby" // ExitHandler represents an object that is called when the exit event is received from containerd type ExitHandler interface { HandleExitEvent(id string) error } +// Client is used by the executor to perform operations. +// TODO(@cpuguy83): This should really just be based off the containerd client interface. 
+// However right now this whole package is tied to github.com/docker/docker/libcontainerd +type Client interface { + Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error + Restore(ctx context.Context, containerID string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error) + Status(ctx context.Context, containerID string) (libcontainerd.Status, error) + Delete(ctx context.Context, containerID string) error + DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) + Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error) + SignalProcess(ctx context.Context, containerID, processID string, signal int) error +} + // New creates a new containerd plugin executor func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) { e := &Executor{ rootDir: rootDir, exitHandler: exitHandler, } - client, err := remote.NewClient(PluginNamespace, e) + client, err := remote.NewClient(pluginNamespace, e) if err != nil { return nil, errors.Wrap(err, "error creating containerd exec client") } @@ -40,7 +54,7 @@ func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) ( // Executor is the containerd client implementation of a plugin executor type Executor struct { rootDir string - client libcontainerd.Client + client Client exitHandler ExitHandler } @@ -52,10 +66,34 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo ctx := context.Background() err := e.client.Create(ctx, id, &spec, &opts) if err != nil { - return err + status, err2 := e.client.Status(ctx, id) + if err2 != nil { + if !errdefs.IsNotFound(err2) { + logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status") + } + } else { + if status != libcontainerd.StatusRunning && status != libcontainerd.StatusUnknown { + if 
err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { + logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container") + } + err = e.client.Create(ctx, id, &spec, &opts) + } + } + + if err != nil { + return errors.Wrap(err, "error creating containerd container") + } } _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) + if err != nil { + if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { + logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start") + } + if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { + logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start") + } + } return err } @@ -69,13 +107,11 @@ func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error { _, _, err = e.client.DeleteTask(context.Background(), id) if err != nil && !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id) - return err } err = e.client.Delete(context.Background(), id) if err != nil && !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id) - return err } } return nil diff --git a/plugin/executor/containerd/containerd_test.go b/plugin/executor/containerd/containerd_test.go new file mode 100644 index 0000000000..cd1a51bd35 --- /dev/null +++ b/plugin/executor/containerd/containerd_test.go @@ -0,0 +1,148 @@ +package containerd + +import ( + "context" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/docker/docker/libcontainerd" + "github.com/gotestyourself/gotestyourself/assert" + specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +func TestLifeCycle(t *testing.T) { + t.Parallel() 
+ + mock := newMockClient() + exec, cleanup := setupTest(t, mock, mock) + defer cleanup() + + id := "test-create" + mock.simulateStartError(true, id) + err := exec.Create(id, specs.Spec{}, nil, nil) + assert.Assert(t, err != nil) + mock.simulateStartError(false, id) + + err = exec.Create(id, specs.Spec{}, nil, nil) + assert.Assert(t, err) + running, _ := exec.IsRunning(id) + assert.Assert(t, running) + + // create with the same ID + err = exec.Create(id, specs.Spec{}, nil, nil) + assert.Assert(t, err != nil) + + mock.HandleExitEvent(id) // simulate a plugin that exits + + err = exec.Create(id, specs.Spec{}, nil, nil) + assert.Assert(t, err) +} + +func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) { + rootDir, err := ioutil.TempDir("", "test-daemon") + assert.Assert(t, err) + assert.Assert(t, client != nil) + assert.Assert(t, eh != nil) + + return &Executor{ + rootDir: rootDir, + client: client, + exitHandler: eh, + }, func() { + assert.Assert(t, os.RemoveAll(rootDir)) + } +} + +type mockClient struct { + mu sync.Mutex + containers map[string]bool + errorOnStart map[string]bool +} + +func newMockClient() *mockClient { + return &mockClient{ + containers: make(map[string]bool), + errorOnStart: make(map[string]bool), + } +} + +func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error { + c.mu.Lock() + defer c.mu.Unlock() + + if _, ok := c.containers[id]; ok { + return errors.New("exists") + } + + c.containers[id] = false + return nil +} + +func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error) { + return false, 0, nil +} + +func (c *mockClient) Status(ctx context.Context, id string) (libcontainerd.Status, error) { + c.mu.Lock() + defer c.mu.Unlock() + + running, ok := c.containers[id] + if !ok { + return libcontainerd.StatusUnknown, errors.New("not found") + } + if running { + return libcontainerd.StatusRunning, nil + } + 
return libcontainerd.StatusStopped, nil +} + +func (c *mockClient) Delete(ctx context.Context, id string) error { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.containers, id) + return nil +} + +func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) { + return 0, time.Time{}, nil +} + +func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if _, ok := c.containers[id]; !ok { + return 0, errors.New("not found") + } + + if c.errorOnStart[id] { + return 0, errors.New("some startup error") + } + c.containers[id] = true + return 1, nil +} + +func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error { + return nil +} + +func (c *mockClient) simulateStartError(sim bool, id string) { + c.mu.Lock() + defer c.mu.Unlock() + if sim { + c.errorOnStart[id] = sim + return + } + delete(c.errorOnStart, id) +} + +func (c *mockClient) HandleExitEvent(id string) error { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.containers, id) + return nil +}