package remote // import "github.com/docker/docker/libcontainerd/remote"

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/containerd"
	apievents "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types"
	"github.com/containerd/containerd/archive"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/content"
	cerrdefs "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/protobuf"
	v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/log"
	"github.com/containerd/typeurl/v2"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/queue"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/hashicorp/go-multierror"
	"github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)

// DockerContainerBundlePath is the label key pointing to the container's bundle path
const DockerContainerBundlePath = "com.docker/engine.bundle.path"

type client struct {
	client   *containerd.Client
	stateDir string
	logger   *log.Entry
	ns       string

	backend libcontainerdtypes.Backend
	eventQ  queue.Queue
}

type container struct {
	client *client
	c8dCtr containerd.Container

	v2runcoptions *v2runcoptions.Options
}

type task struct {
	containerd.Task
	ctr *container
}

type process struct {
	containerd.Process
}

// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	c := &client{
		client:   cli,
		stateDir: stateDir,
		logger:   log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns),
		ns:       ns,
		backend:  b,
	}

	go c.processEventStream(ctx, ns)

	return c, nil
}

func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	return c.client.Version(ctx)
}

func (c *container) newTask(t containerd.Task) *task {
	return &task{Task: t, ctr: c}
}

func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
	var dio *cio.DirectIO
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
	}()

	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the previously defined dio for the defer above
		// to handle cleanup
		dio, err = c.client.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	t, err := c.c8dCtr.Task(ctx, attachIO)
	if err != nil {
		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}
	return c.newTask(t), nil
}
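// NewContainer registers a new container with containerd using the given OCI
// spec, shim (runtime) name, and runtime options. The bundle directory is
// attached to the containerd record (see WithBundle) so that later calls can
// locate it through the DockerContainerBundlePath label. If a container with
// the same id already exists, a Conflict error is returned.
//
// A minimal usage sketch, not part of this package; identifiers such as cli,
// ociSpec and attachStdio are assumed to be provided by the caller:
//
//	ctr, err := cli.NewContainer(ctx, "my-container", ociSpec,
//		"io.containerd.runc.v2", &v2runcoptions.Options{})
//	if err != nil { /* handle error */ }
//	tsk, err := ctr.NewTask(ctx, "", false, attachStdio)
//	if err != nil { /* handle error */ }
//	if err := tsk.Start(ctx); err != nil { /* handle error */ }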
func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
	bdir := c.bundleDir(id)
	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

	newOpts := []containerd.NewContainerOpts{
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(shim, runtimeOptions),
		WithBundle(bdir, ociSpec),
	}
	opts = append(opts, newOpts...)

	ctr, err := c.client.NewContainer(ctx, id, opts...)
	if err != nil {
		if cerrdefs.IsAlreadyExists(err) {
			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return nil, wrapError(err)
	}

	created := container{
		client: c,
		c8dCtr: ctr,
	}
	if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
		created.v2runcoptions = x
	}
	return &created, nil
}

// NewTask creates a task for the specified containerd id
func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
	var (
		checkpoint     *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		var err error
		checkpoint, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if checkpoint != nil {
				err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
				if err != nil {
					c.client.logger.WithError(err).WithFields(log.Fields{
						"ref":    checkpointDir,
						"digest": checkpoint.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		if err := tar.Close(); err != nil {
			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}

	// Optimization: assume the relevant metadata has not changed in the
	// moment since the container was created. Elide redundant RPC requests
	// to refresh the metadata separately for spec and labels.
	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, errors.Wrap(err, "failed to retrieve metadata")
	}
	bundle := md.Labels[DockerContainerBundlePath]

	var spec specs.Spec
	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
		return nil, errors.Wrap(err, "failed to retrieve spec")
	}
	uid, gid := getSpecUser(&spec)

	taskOpts := []containerd.NewTaskOpts{
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = checkpoint
			return nil
		},
	}

	if runtime.GOOS != "windows" {
		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			if c.v2runcoptions != nil {
				opts := proto.Clone(c.v2runcoptions).(*v2runcoptions.Options)
				opts.IoUid = uint32(uid)
				opts.IoGid = uint32(gid)
				info.Options = opts
			}
			return nil
		})
	} else {
		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
	}

	t, err = c.c8dCtr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)

			rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
			return rio, err
		},
		taskOpts...,
	)
	if err != nil {
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
	}

	// Signal c.createIO that it can call CloseIO
	stdinCloseSync <- t

	return c.newTask(t), nil
}

func (t *task) Start(ctx context.Context) error {
	return wrapError(t.Task.Start(ctx))
}

// Exec creates an exec process.
//
// The containerd client calls Exec to register the exec config on the shim
// side. When the client calls Start, the shim will create the stdin fifo if
// needed. But for the container main process, the stdin fifo is created in
// Create, not in the Start call. The stdinCloseSync channel should be closed
// after the exec process has been started.
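//
// A hedged sketch of the expected call sequence; execID, procSpec and
// attachStdio are caller-supplied and illustrative only:
//
//	p, err := tsk.Exec(ctx, execID, procSpec, false, attachStdio)
//	if err != nil { /* handle error */ }
//	// p has already been started; clean it up with p.Delete(ctx) once it exits.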
func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
	var (
		p              containerd.Process
		rio            cio.IO
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, wrapError(err)
	}

	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)

	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()

	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		close(stdinCloseSync)
		if cerrdefs.IsAlreadyExists(err) {
			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return nil, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of exec process will be created after p.Start in containerd
	defer func() { stdinCloseSync <- p }()

	if err = p.Start(ctx); err != nil {
		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
		// older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		p.Delete(ctx)
		return nil, wrapError(err)
	}
	return process{p}, nil
}

func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
	return wrapError(t.Task.Kill(ctx, signal))
}

func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
	return wrapError(p.Process.Kill(ctx, signal))
}

func (t *task) Pause(ctx context.Context) error {
	return wrapError(t.Task.Pause(ctx))
}

func (t *task) Resume(ctx context.Context) error {
	return wrapError(t.Task.Resume(ctx))
}

func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
	m, err := t.Metrics(ctx)
	if err != nil {
		return nil, err
	}

	v, err := typeurl.UnmarshalAny(m.Data)
	if err != nil {
		return nil, err
	}
	return libcontainerdtypes.InterfaceToStats(protobuf.FromTimestamp(m.Timestamp), v), nil
}

func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
	pis, err := t.Pids(ctx)
	if err != nil {
		return nil, err
	}

	var infos []libcontainerdtypes.Summary
	for _, pi := range pis {
		i, err := typeurl.UnmarshalAny(pi.Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}
		s, err := summaryFromInterface(i)
		if err != nil {
			return nil, err
		}
		infos = append(infos, *s)
	}

	return infos, nil
}

func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	s, err := t.Task.Delete(ctx)
	return s, wrapError(err)
}

func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	s, err := p.Process.Delete(ctx)
	return s, wrapError(err)
}
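// Delete removes the container record from containerd. Unless the
// LIBCONTAINERD_NOCLEAN environment variable is set to "1", the on-disk bundle
// directory recorded under the DockerContainerBundlePath label is removed as
// well; a failure to remove it is logged but does not fail the call.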
func (c *container) Delete(ctx context.Context) error {
	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return err
	}
	bundle := md.Labels[DockerContainerBundlePath]
	if err := c.c8dCtr.Delete(ctx); err != nil {
		return wrapError(err)
	}
	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
		if err := os.RemoveAll(bundle); err != nil {
			c.client.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
				"container": c.c8dCtr.ID(),
				"bundle":    bundle,
			}).Error("failed to remove state dir")
		}
	}
	return nil
}

func (t *task) ForceDelete(ctx context.Context) error {
	_, err := t.Task.Delete(ctx, containerd.WithProcessKill)
	return wrapError(err)
}

func (t *task) Status(ctx context.Context) (containerd.Status, error) {
	s, err := t.Task.Status(ctx)
	return s, wrapError(err)
}

func (p process) Status(ctx context.Context) (containerd.Status, error) {
	s, err := p.Process.Status(ctx)
	return s, wrapError(err)
}

func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
	return func(r *containerd.CheckpointTaskInfo) error {
		if r.Options == nil && c.v2runcoptions != nil {
			r.Options = &v2runcoptions.CheckpointOptions{}
		}

		switch opts := r.Options.(type) {
		case *v2runcoptions.CheckpointOptions:
			opts.Exit = exit
		}

		return nil
	}
}
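// CreateCheckpoint checkpoints the task through containerd and unpacks the
// resulting checkpoint data into checkpointDir. When exit is true, the task is
// stopped as part of the checkpoint. The intermediate checkpoint image is
// always removed from containerd afterwards.
//
// A rough sketch of the checkpoint/restore round trip; names are illustrative,
// not part of this package's API:
//
//	// checkpoint a running task into a directory, stopping it afterwards
//	if err := tsk.CreateCheckpoint(ctx, checkpointDir, true); err != nil { /* handle error */ }
//
//	// later, restore it by creating a new task from that directory
//	restored, err := ctr.NewTask(ctx, checkpointDir, false, attachStdio)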
func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
		if err != nil {
			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()

	b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
	}
	var index ocispec.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
	}

	var cpDesc *ocispec.Descriptor
	for _, m := range index.Manifests {
		m := m
		if m.MediaType == images.MediaTypeContainerd1Checkpoint {
			cpDesc = &m //nolint:gosec
			break
		}
	}
	if cpDesc == nil {
		// err is nil at this point; construct a new error so the caller
		// actually sees the failure instead of a nil return.
		return errdefs.System(errors.New("invalid checkpoint"))
	}

	rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()
	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
	}

	return err
}

// LoadContainer loads the containerd container.
func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		if cerrdefs.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
		}
		return nil, wrapError(err)
	}
	return &container{client: c, c8dCtr: ctr}, nil
}

func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
	t, err := c.c8dCtr.Task(ctx, nil)
	if err != nil {
		return nil, wrapError(err)
	}
	return c.newTask(t), nil
}

// createIO creates the io to be used by a process.
// This needs to get a pointer to the interface because, at the time of closure,
// the process may not have been registered yet.
func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		io  *cio.DirectIO
		err error
	)
	io, err = c.client.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}

	if io.Stdin != nil {
		var (
			closeErr  error
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				closeErr = pipe.Close()

				select {
				case p, ok := <-stdinCloseSync:
					if !ok {
						return
					}
					if err := closeStdin(context.Background(), p); err != nil {
						if closeErr != nil {
							closeErr = multierror.Append(closeErr, err)
						} else {
							// Avoid wrapping a single error in a multierror.
							closeErr = err
						}
					}
				default:
					// The process wasn't ready. Close its stdin asynchronously.
					go func() {
						p, ok := <-stdinCloseSync
						if !ok {
							return
						}
						if err := closeStdin(context.Background(), p); err != nil {
							c.client.logger.WithError(err).
								WithField("container", c.c8dCtr.ID()).
								Error("failed to close container stdin")
						}
					}()
				}
			})
			return closeErr
		})
	}

	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}

func closeStdin(ctx context.Context, p containerd.Process) error {
	err := p.CloseIO(ctx, containerd.WithStdinCloser)
	if err != nil && strings.Contains(err.Error(), "transport is closing") {
		err = nil
	}
	return err
}

func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	c.eventQ.Append(ei.ContainerID, func() {
		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
		if err != nil {
			c.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
				"container":  ei.ContainerID,
				"event":      et,
				"event-info": ei,
			}).Error("failed to process event")
		}
	})
}

func (c *client) waitServe(ctx context.Context) bool {
	t := 100 * time.Millisecond
	delay := time.NewTimer(t)
	if !delay.Stop() {
		<-delay.C
	}
	defer delay.Stop()

	// `IsServing` will actually block until the service is ready.
	// However it can return early, so we'll loop with a delay to handle it.
	for {
		serving, err := c.client.IsServing(ctx)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				return false
			}
			log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
		}

		if serving {
			return true
		}

		delay.Reset(t)
		select {
		case <-ctx.Done():
			return false
		case <-delay.C:
		}
	}
}
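// processEventStream subscribes to containerd task events for the given
// namespace and forwards them to the backend. Events are appended to a
// per-container queue (c.eventQ), so events for the same container are
// handled in the order they were received. If the subscription fails for a
// reason other than cancellation, the client waits for containerd to become
// ready again and restarts event processing.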
func (c *client) processEventStream(ctx context.Context, ns string) {
	// Create a new context specifically for this subscription.
	// The context must be cancelled to cancel the subscription.
	// In cases where we have to restart event stream processing,
	// we'll need the original context b/c this one will be cancelled
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")

	c.logger.Debug("processing event stream")

	for {
		select {
		case err := <-errC:
			if err != nil {
				errStatus, ok := status.FromError(err)
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("Failed to get event")
					c.logger.Info("Waiting for containerd to be ready to restart event processing")
					if c.waitServe(ctx) {
						go c.processEventStream(ctx, ns)
						return
					}
				}
				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
			}
			return
		case ev := <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}

			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}

			c.logger.WithField("topic", ev.Topic).Debug("event")

			switch t := v.(type) {
			case *apievents.TaskCreate:
				c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				})
			case *apievents.TaskStart:
				c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				})
			case *apievents.TaskExit:
				c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    protobuf.FromTimestamp(t.ExitedAt),
				})
			case *apievents.TaskOOM:
				c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskExecAdded:
				c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
				})
			case *apievents.TaskExecStarted:
				c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
					Pid:         t.Pid,
				})
			case *apievents.TaskPaused:
				c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskResumed:
				c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskDelete:
				c.logger.WithFields(log.Fields{
					"topic":     ev.Topic,
					"type":      reflect.TypeOf(t),
					"container": t.ContainerID,
				}).Info("ignoring event")
			default:
				c.logger.WithFields(log.Fields{
					"topic": ev.Topic,
					"type":  reflect.TypeOf(t),
				}).Info("ignoring event")
			}
		}
	}
}

func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer writer.Close()
	size, err := io.Copy(writer, r)
	if err != nil {
		return nil, err
	}
	labels := map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	}
	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
		return nil, err
	}
	return &types.Descriptor{
		MediaType: mediaType,
		Digest:    writer.Digest().String(),
		Size:      size,
	}, nil
}

func (c *client) bundleDir(id string) string {
	return filepath.Join(c.stateDir, id)
}
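// wrapError translates containerd client errors into the daemon's errdefs
// types: containerd "not found" errors, and errors whose message indicates a
// missing container, become errdefs.NotFound; all other errors are returned
// unchanged. A minimal, hypothetical usage sketch:
//
//	if _, err := ctr.Task(ctx, nil); err != nil {
//		if errdefs.IsNotFound(wrapError(err)) {
//			// the container (or its task) no longer exists
//		}
//	}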
container"} { if strings.Contains(msg, s) { return errdefs.NotFound(err) } } return err }