Merge pull request #36715 from cpuguy83/plugin_exec_fixes
Make sure plugin container is removed on failure
This commit is contained in:
commit
859e43e64c
3 changed files with 200 additions and 16 deletions
|
@ -205,7 +205,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
|
|||
// TODO(mlaventure): when containerd supports lcow, revisit runtime value
|
||||
containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
|
||||
if err != nil {
|
||||
return err
|
||||
return wrapError(err)
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
|
@ -286,7 +286,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
|
|||
rio.Cancel()
|
||||
rio.Close()
|
||||
}
|
||||
return -1, err
|
||||
return -1, wrapError(err)
|
||||
}
|
||||
|
||||
ctr.setTask(t)
|
||||
|
@ -300,7 +300,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
|
|||
Error("failed to delete task after fail start")
|
||||
}
|
||||
ctr.setTask(nil)
|
||||
return -1, err
|
||||
return -1, wrapError(err)
|
||||
}
|
||||
|
||||
return int(t.Pid()), nil
|
||||
|
@ -344,7 +344,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
|
|||
})
|
||||
if err != nil {
|
||||
close(stdinCloseSync)
|
||||
return -1, err
|
||||
return -1, wrapError(err)
|
||||
}
|
||||
|
||||
ctr.addProcess(processID, p)
|
||||
|
@ -355,7 +355,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
|
|||
if err = p.Start(ctx); err != nil {
|
||||
p.Delete(context.Background())
|
||||
ctr.deleteProcess(processID)
|
||||
return -1, err
|
||||
return -1, wrapError(err)
|
||||
}
|
||||
|
||||
return int(p.Pid()), nil
|
||||
|
@ -393,7 +393,7 @@ func (c *client) Pause(ctx context.Context, containerID string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return p.(containerd.Task).Pause(ctx)
|
||||
return wrapError(p.(containerd.Task).Pause(ctx))
|
||||
}
|
||||
|
||||
func (c *client) Resume(ctx context.Context, containerID string) error {
|
||||
|
@ -493,7 +493,7 @@ func (c *client) Delete(ctx context.Context, containerID string) error {
|
|||
}
|
||||
|
||||
if err := ctr.ctr.Delete(ctx); err != nil {
|
||||
return err
|
||||
return wrapError(err)
|
||||
}
|
||||
|
||||
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
|
||||
|
@ -523,7 +523,7 @@ func (c *client) Status(ctx context.Context, containerID string) (Status, error)
|
|||
|
||||
s, err := t.Status(ctx)
|
||||
if err != nil {
|
||||
return StatusUnknown, err
|
||||
return StatusUnknown, wrapError(err)
|
||||
}
|
||||
|
||||
return Status(s.Status), nil
|
||||
|
@ -537,7 +537,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
|
|||
|
||||
img, err := p.(containerd.Task).Checkpoint(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return wrapError(err)
|
||||
}
|
||||
// Whatever happens, delete the checkpoint from containerd
|
||||
defer func() {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/linux/runctypes"
|
||||
|
@ -15,21 +16,34 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// PluginNamespace is the name used for the plugins namespace
|
||||
var PluginNamespace = "plugins.moby"
|
||||
// pluginNamespace is the name used for the plugins namespace
|
||||
const pluginNamespace = "plugins.moby"
|
||||
|
||||
// ExitHandler represents an object that is called when the exit event is received from containerd
|
||||
type ExitHandler interface {
|
||||
HandleExitEvent(id string) error
|
||||
}
|
||||
|
||||
// Client is used by the executor to perform operations.
|
||||
// TODO(@cpuguy83): This should really just be based off the containerd client interface.
|
||||
// However right now this whole package is tied to github.com/docker/docker/libcontainerd
|
||||
type Client interface {
|
||||
Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
|
||||
Restore(ctx context.Context, containerID string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error)
|
||||
Status(ctx context.Context, containerID string) (libcontainerd.Status, error)
|
||||
Delete(ctx context.Context, containerID string) error
|
||||
DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
|
||||
Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error)
|
||||
SignalProcess(ctx context.Context, containerID, processID string, signal int) error
|
||||
}
|
||||
|
||||
// New creates a new containerd plugin executor
|
||||
func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
|
||||
e := &Executor{
|
||||
rootDir: rootDir,
|
||||
exitHandler: exitHandler,
|
||||
}
|
||||
client, err := remote.NewClient(PluginNamespace, e)
|
||||
client, err := remote.NewClient(pluginNamespace, e)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating containerd exec client")
|
||||
}
|
||||
|
@ -40,7 +54,7 @@ func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (
|
|||
// Executor is the containerd client implementation of a plugin executor
|
||||
type Executor struct {
|
||||
rootDir string
|
||||
client libcontainerd.Client
|
||||
client Client
|
||||
exitHandler ExitHandler
|
||||
}
|
||||
|
||||
|
@ -52,10 +66,34 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
|
|||
ctx := context.Background()
|
||||
err := e.client.Create(ctx, id, &spec, &opts)
|
||||
if err != nil {
|
||||
return err
|
||||
status, err2 := e.client.Status(ctx, id)
|
||||
if err2 != nil {
|
||||
if !errdefs.IsNotFound(err2) {
|
||||
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
|
||||
}
|
||||
} else {
|
||||
if status != libcontainerd.StatusRunning && status != libcontainerd.StatusUnknown {
|
||||
if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
||||
logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
|
||||
}
|
||||
err = e.client.Create(ctx, id, &spec, &opts)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error creating containerd container")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
|
||||
if err != nil {
|
||||
if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
||||
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
|
||||
}
|
||||
if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
||||
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -69,13 +107,11 @@ func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
|
|||
_, _, err = e.client.DeleteTask(context.Background(), id)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.client.Delete(context.Background(), id)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
148
plugin/executor/containerd/containerd_test.go
Normal file
148
plugin/executor/containerd/containerd_test.go
Normal file
|
@ -0,0 +1,148 @@
|
|||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/libcontainerd"
|
||||
"github.com/gotestyourself/gotestyourself/assert"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func TestLifeCycle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
mock := newMockClient()
|
||||
exec, cleanup := setupTest(t, mock, mock)
|
||||
defer cleanup()
|
||||
|
||||
id := "test-create"
|
||||
mock.simulateStartError(true, id)
|
||||
err := exec.Create(id, specs.Spec{}, nil, nil)
|
||||
assert.Assert(t, err != nil)
|
||||
mock.simulateStartError(false, id)
|
||||
|
||||
err = exec.Create(id, specs.Spec{}, nil, nil)
|
||||
assert.Assert(t, err)
|
||||
running, _ := exec.IsRunning(id)
|
||||
assert.Assert(t, running)
|
||||
|
||||
// create with the same ID
|
||||
err = exec.Create(id, specs.Spec{}, nil, nil)
|
||||
assert.Assert(t, err != nil)
|
||||
|
||||
mock.HandleExitEvent(id) // simulate a plugin that exits
|
||||
|
||||
err = exec.Create(id, specs.Spec{}, nil, nil)
|
||||
assert.Assert(t, err)
|
||||
}
|
||||
|
||||
func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) {
|
||||
rootDir, err := ioutil.TempDir("", "test-daemon")
|
||||
assert.Assert(t, err)
|
||||
assert.Assert(t, client != nil)
|
||||
assert.Assert(t, eh != nil)
|
||||
|
||||
return &Executor{
|
||||
rootDir: rootDir,
|
||||
client: client,
|
||||
exitHandler: eh,
|
||||
}, func() {
|
||||
assert.Assert(t, os.RemoveAll(rootDir))
|
||||
}
|
||||
}
|
||||
|
||||
// mockClient is an in-memory stand-in for the containerd client used by the
// executor tests. It also provides HandleExitEvent, so the same value can be
// passed as the exit handler.
type mockClient struct {
	// mu is held by every method that reads or writes the maps below.
	mu sync.Mutex

	// containers is keyed by container id. The value is false after Create
	// and true once Start has succeeded.
	containers map[string]bool

	// errorOnStart holds the ids for which Start should return an error.
	errorOnStart map[string]bool
}
|
||||
|
||||
func newMockClient() *mockClient {
|
||||
return &mockClient{
|
||||
containers: make(map[string]bool),
|
||||
errorOnStart: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if _, ok := c.containers[id]; ok {
|
||||
return errors.New("exists")
|
||||
}
|
||||
|
||||
c.containers[id] = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error) {
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
func (c *mockClient) Status(ctx context.Context, id string) (libcontainerd.Status, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
running, ok := c.containers[id]
|
||||
if !ok {
|
||||
return libcontainerd.StatusUnknown, errors.New("not found")
|
||||
}
|
||||
if running {
|
||||
return libcontainerd.StatusRunning, nil
|
||||
}
|
||||
return libcontainerd.StatusStopped, nil
|
||||
}
|
||||
|
||||
func (c *mockClient) Delete(ctx context.Context, id string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.containers, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) {
|
||||
return 0, time.Time{}, nil
|
||||
}
|
||||
|
||||
func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if _, ok := c.containers[id]; !ok {
|
||||
return 0, errors.New("not found")
|
||||
}
|
||||
|
||||
if c.errorOnStart[id] {
|
||||
return 0, errors.New("some startup error")
|
||||
}
|
||||
c.containers[id] = true
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockClient) simulateStartError(sim bool, id string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if sim {
|
||||
c.errorOnStart[id] = sim
|
||||
return
|
||||
}
|
||||
delete(c.errorOnStart, id)
|
||||
}
|
||||
|
||||
func (c *mockClient) HandleExitEvent(id string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.containers, id)
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue