|
@@ -5,6 +5,7 @@ import (
|
|
|
"io"
|
|
|
"path/filepath"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/containerd/containerd/cio"
|
|
|
"github.com/containerd/containerd/linux/runctypes"
|
|
@@ -15,21 +16,34 @@ import (
|
|
|
"github.com/sirupsen/logrus"
|
|
|
)
|
|
|
|
|
|
-// PluginNamespace is the name used for the plugins namespace
|
|
|
-var PluginNamespace = "plugins.moby"
|
|
|
+// pluginNamespace is the name used for the plugins namespace
|
|
|
+const pluginNamespace = "plugins.moby"
|
|
|
|
|
|
// ExitHandler represents an object that is called when the exit event is received from containerd
|
|
|
type ExitHandler interface {
|
|
|
HandleExitEvent(id string) error
|
|
|
}
|
|
|
|
|
|
+// Client is used by the exector to perform operations.
|
|
|
+// TODO(@cpuguy83): This should really just be based off the containerd client interface.
|
|
|
+// However right now this whole package is tied to github.com/docker/docker/libcontainerd
|
|
|
+type Client interface {
|
|
|
+ Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
|
|
|
+ Restore(ctx context.Context, containerID string, attachStdio libcontainerd.StdioCallback) (alive bool, pid int, err error)
|
|
|
+ Status(ctx context.Context, containerID string) (libcontainerd.Status, error)
|
|
|
+ Delete(ctx context.Context, containerID string) error
|
|
|
+ DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
|
|
|
+ Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerd.StdioCallback) (pid int, err error)
|
|
|
+ SignalProcess(ctx context.Context, containerID, processID string, signal int) error
|
|
|
+}
|
|
|
+
|
|
|
// New creates a new containerd plugin executor
|
|
|
func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
|
|
|
e := &Executor{
|
|
|
rootDir: rootDir,
|
|
|
exitHandler: exitHandler,
|
|
|
}
|
|
|
- client, err := remote.NewClient(PluginNamespace, e)
|
|
|
+ client, err := remote.NewClient(pluginNamespace, e)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "error creating containerd exec client")
|
|
|
}
|
|
@@ -40,7 +54,7 @@ func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (
|
|
|
// Executor is the containerd client implementation of a plugin executor
|
|
|
type Executor struct {
|
|
|
rootDir string
|
|
|
- client libcontainerd.Client
|
|
|
+ client Client
|
|
|
exitHandler ExitHandler
|
|
|
}
|
|
|
|
|
@@ -52,10 +66,34 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
|
|
|
ctx := context.Background()
|
|
|
err := e.client.Create(ctx, id, &spec, &opts)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ status, err2 := e.client.Status(ctx, id)
|
|
|
+ if err2 != nil {
|
|
|
+ if !errdefs.IsNotFound(err2) {
|
|
|
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if status != libcontainerd.StatusRunning && status != libcontainerd.StatusUnknown {
|
|
|
+ if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
|
|
+ logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
|
|
|
+ }
|
|
|
+ err = e.client.Create(ctx, id, &spec, &opts)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "error creating containerd container")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
|
|
|
+ if err != nil {
|
|
|
+ if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
|
|
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
|
|
|
+ }
|
|
|
+ if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
|
|
|
+ logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
|
|
|
+ }
|
|
|
+ }
|
|
|
return err
|
|
|
}
|
|
|
|
|
@@ -69,13 +107,11 @@ func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
|
|
|
_, _, err = e.client.DeleteTask(context.Background(), id)
|
|
|
if err != nil && !errdefs.IsNotFound(err) {
|
|
|
logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
|
|
|
- return err
|
|
|
}
|
|
|
|
|
|
err = e.client.Delete(context.Background(), id)
|
|
|
if err != nil && !errdefs.IsNotFound(err) {
|
|
|
logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
|
|
|
- return err
|
|
|
}
|
|
|
}
|
|
|
return nil
|