libcontainerd: create unstarted tasks

Split task creation and start into two separate method calls in the
libcontainerd API. Clients now have the opportunity to inspect the
freshly-created task and customize its runtime environment before
starting execution of the user-specified binary.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2024-01-09 13:32:31 -05:00
parent afc7e581e6
commit 659d7b190f
5 changed files with 42 additions and 19 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/events"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
"github.com/docker/docker/internal/compatcontext"
"github.com/docker/docker/libcontainerd" "github.com/docker/docker/libcontainerd"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -198,16 +199,32 @@ func (daemon *Daemon) containerStart(ctx context.Context, daemonCfg *configStore
if err != nil { if err != nil {
return setExitCodeFromError(container.SetExitCode, err) return setExitCodeFromError(container.SetExitCode, err)
} }
defer func() {
if retErr != nil {
if err := ctr.Delete(compatcontext.WithoutCancel(ctx)); err != nil {
log.G(ctx).WithError(err).WithField("container", container.ID).
Error("failed to delete failed start container")
}
}
}()
// TODO(mlaventure): we need to specify checkpoint options here // TODO(mlaventure): we need to specify checkpoint options here
tsk, err := ctr.Start(context.TODO(), // Passing ctx to ctr.Start caused integration tests to be stuck in the cleanup phase tsk, err := ctr.NewTask(context.TODO(), // Passing ctx caused integration tests to be stuck in the cleanup phase
checkpointDir, container.StreamConfig.Stdin() != nil || container.Config.Tty, checkpointDir, container.StreamConfig.Stdin() != nil || container.Config.Tty,
container.InitializeStdio) container.InitializeStdio)
if err != nil { if err != nil {
if err := ctr.Delete(context.Background()); err != nil { return setExitCodeFromError(container.SetExitCode, err)
log.G(ctx).WithError(err).WithField("container", container.ID). }
Error("failed to delete failed start container") defer func() {
if retErr != nil {
if err := tsk.ForceDelete(compatcontext.WithoutCancel(ctx)); err != nil {
log.G(ctx).WithError(err).WithField("container", container.ID).
Error("failed to delete task after fail start")
}
} }
}()
if err := tsk.Start(context.TODO()); err != nil { // passing ctx caused integration tests to be stuck in the cleanup phase
return setExitCodeFromError(container.SetExitCode, err) return setExitCodeFromError(container.SetExitCode, err)
} }

View file

@ -387,7 +387,7 @@ func (c *client) extractResourcesFromSpec(spec *specs.Spec, configuration *hcssh
} }
} }
func (ctr *container) Start(_ context.Context, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, retErr error) { func (ctr *container) NewTask(_ context.Context, _ string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, retErr error) {
ctr.mu.Lock() ctr.mu.Lock()
defer ctr.mu.Unlock() defer ctr.mu.Unlock()
@ -514,6 +514,11 @@ func (ctr *container) Start(_ context.Context, _ string, withStdin bool, attachS
return t, nil return t, nil
} }
func (*task) Start(context.Context) error {
// No-op on Windows.
return nil
}
func (ctr *container) Task(context.Context) (libcontainerdtypes.Task, error) { func (ctr *container) Task(context.Context) (libcontainerdtypes.Task, error) {
ctr.mu.Lock() ctr.mu.Lock()
defer ctr.mu.Unlock() defer ctr.mu.Unlock()

View file

@ -145,8 +145,8 @@ func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spe
return &created, nil return &created, nil
} }
// Start create and start a task for the specified containerd id // NewTask creates a task for the specified containerd id
func (c *container) Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) { func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
var ( var (
checkpoint *types.Descriptor checkpoint *types.Descriptor
t containerd.Task t containerd.Task
@ -236,19 +236,14 @@ func (c *container) Start(ctx context.Context, checkpointDir string, withStdin b
// Signal c.createIO that it can call CloseIO // Signal c.createIO that it can call CloseIO
stdinCloseSync <- t stdinCloseSync <- t
if err := t.Start(ctx); err != nil {
// Only Stopped tasks can be deleted. Created tasks have to be
// killed first, to transition them to Stopped.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil {
c.client.logger.WithError(err).WithField("container", c.c8dCtr.ID()).
Error("failed to delete task after fail start")
}
return nil, wrapError(err)
}
return c.newTask(t), nil return c.newTask(t), nil
} }
func (t *task) Start(ctx context.Context) error {
return wrapError(t.Task.Start(ctx))
}
// Exec creates exec process. // Exec creates exec process.
// //
// The containerd client calls Exec to register the exec config in the shim side. // The containerd client calls Exec to register the exec config in the shim side.

View file

@ -64,7 +64,7 @@ type Client interface {
// Container provides access to a containerd container. // Container provides access to a containerd container.
type Container interface { type Container interface {
Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error)
Task(ctx context.Context) (Task, error) Task(ctx context.Context) (Task, error)
// AttachTask returns the current task for the container and reattaches // AttachTask returns the current task for the container and reattaches
// to the IO for the running task. If no task exists for the container // to the IO for the running task. If no task exists for the container
@ -79,6 +79,8 @@ type Container interface {
// Task provides access to a running containerd container. // Task provides access to a running containerd container.
type Task interface { type Task interface {
Process Process
// Start begins execution of the task
Start(context.Context) error
// Pause suspends the execution of the task // Pause suspends the execution of the task
Pause(context.Context) error Pause(context.Context) error
// Resume the execution of the task // Resume the execution of the task

View file

@ -81,11 +81,15 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
} }
p := c8dPlugin{log: log.G(ctx).WithField("plugin", id), ctr: ctr} p := c8dPlugin{log: log.G(ctx).WithField("plugin", id), ctr: ctr}
p.tsk, err = ctr.Start(ctx, "", false, attachStreamsFunc(stdout, stderr)) p.tsk, err = ctr.NewTask(ctx, "", false, attachStreamsFunc(stdout, stderr))
if err != nil { if err != nil {
p.deleteTaskAndContainer(ctx) p.deleteTaskAndContainer(ctx)
return err return err
} }
if err := p.tsk.Start(ctx); err != nil {
p.deleteTaskAndContainer(ctx)
return err
}
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
e.plugins[id] = &p e.plugins[id] = &p