da643c0b8a
For current implementation of Checkpoint Restore (C/R) in docker, it will write the checkpoint to content store. However, when restoring libcontainerd uses .Digest().Encoded(), which will remove the info of alg, leading to error. Signed-off-by: huang-jl <1046678590@qq.com>
750 lines
21 KiB
Go
750 lines
21 KiB
Go
package remote // import "github.com/docker/docker/libcontainerd/remote"
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd"
|
|
apievents "github.com/containerd/containerd/api/events"
|
|
"github.com/containerd/containerd/api/types"
|
|
"github.com/containerd/containerd/archive"
|
|
"github.com/containerd/containerd/cio"
|
|
"github.com/containerd/containerd/content"
|
|
cerrdefs "github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/protobuf"
|
|
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
|
"github.com/containerd/log"
|
|
"github.com/containerd/typeurl/v2"
|
|
"github.com/docker/docker/errdefs"
|
|
"github.com/docker/docker/libcontainerd/queue"
|
|
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// DockerContainerBundlePath is the label key pointing to the container's bundle path
|
|
const DockerContainerBundlePath = "com.docker/engine.bundle.path"
|
|
|
|
type client struct {
|
|
client *containerd.Client
|
|
stateDir string
|
|
logger *log.Entry
|
|
ns string
|
|
|
|
backend libcontainerdtypes.Backend
|
|
eventQ queue.Queue
|
|
}
|
|
|
|
type container struct {
|
|
client *client
|
|
c8dCtr containerd.Container
|
|
|
|
v2runcoptions *v2runcoptions.Options
|
|
}
|
|
|
|
type task struct {
|
|
containerd.Task
|
|
ctr *container
|
|
}
|
|
|
|
type process struct {
|
|
containerd.Process
|
|
}
|
|
|
|
// NewClient creates a new libcontainerd client from a containerd client
|
|
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
|
|
c := &client{
|
|
client: cli,
|
|
stateDir: stateDir,
|
|
logger: log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns),
|
|
ns: ns,
|
|
backend: b,
|
|
}
|
|
|
|
go c.processEventStream(ctx, ns)
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
|
|
return c.client.Version(ctx)
|
|
}
|
|
|
|
func (c *container) newTask(t containerd.Task) *task {
|
|
return &task{Task: t, ctr: c}
|
|
}
|
|
|
|
func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
|
|
var dio *cio.DirectIO
|
|
defer func() {
|
|
if err != nil && dio != nil {
|
|
dio.Cancel()
|
|
dio.Close()
|
|
}
|
|
}()
|
|
|
|
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
|
|
// dio must be assigned to the previously defined dio for the defer above
|
|
// to handle cleanup
|
|
dio, err = c.client.newDirectIO(ctx, fifos)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return attachStdio(dio)
|
|
}
|
|
t, err := c.c8dCtr.Task(ctx, attachIO)
|
|
if err != nil {
|
|
return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
|
|
}
|
|
return c.newTask(t), nil
|
|
}
|
|
|
|
func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
|
|
bdir := c.bundleDir(id)
|
|
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
|
|
|
|
newOpts := []containerd.NewContainerOpts{
|
|
containerd.WithSpec(ociSpec),
|
|
containerd.WithRuntime(shim, runtimeOptions),
|
|
WithBundle(bdir, ociSpec),
|
|
}
|
|
opts = append(opts, newOpts...)
|
|
|
|
ctr, err := c.client.NewContainer(ctx, id, opts...)
|
|
if err != nil {
|
|
if cerrdefs.IsAlreadyExists(err) {
|
|
return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
|
|
}
|
|
return nil, wrapError(err)
|
|
}
|
|
|
|
created := container{
|
|
client: c,
|
|
c8dCtr: ctr,
|
|
}
|
|
if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
|
|
created.v2runcoptions = x
|
|
}
|
|
return &created, nil
|
|
}
|
|
|
|
// NewTask creates a task for the specified containerd id
|
|
func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
|
|
var (
|
|
checkpoint *types.Descriptor
|
|
t containerd.Task
|
|
rio cio.IO
|
|
stdinCloseSync = make(chan containerd.Process, 1)
|
|
)
|
|
|
|
if checkpointDir != "" {
|
|
// write checkpoint to the content store
|
|
tar := archive.Diff(ctx, "", checkpointDir)
|
|
var err error
|
|
checkpoint, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
|
|
// remove the checkpoint when we're done
|
|
defer func() {
|
|
if checkpoint != nil {
|
|
err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
|
|
if err != nil {
|
|
c.client.logger.WithError(err).WithFields(log.Fields{
|
|
"ref": checkpointDir,
|
|
"digest": checkpoint.Digest,
|
|
}).Warnf("failed to delete temporary checkpoint entry")
|
|
}
|
|
}
|
|
}()
|
|
if err := tar.Close(); err != nil {
|
|
return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
|
|
}
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
|
|
}
|
|
}
|
|
|
|
// Optimization: assume the relevant metadata has not changed in the
|
|
// moment since the container was created. Elide redundant RPC requests
|
|
// to refresh the metadata separately for spec and labels.
|
|
md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to retrieve metadata")
|
|
}
|
|
bundle := md.Labels[DockerContainerBundlePath]
|
|
|
|
var spec specs.Spec
|
|
if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
|
|
return nil, errors.Wrap(err, "failed to retrieve spec")
|
|
}
|
|
uid, gid := getSpecUser(&spec)
|
|
|
|
taskOpts := []containerd.NewTaskOpts{
|
|
func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
|
|
info.Checkpoint = checkpoint
|
|
return nil
|
|
},
|
|
}
|
|
|
|
if runtime.GOOS != "windows" {
|
|
taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
|
|
if c.v2runcoptions != nil {
|
|
opts := proto.Clone(c.v2runcoptions).(*v2runcoptions.Options)
|
|
opts.IoUid = uint32(uid)
|
|
opts.IoGid = uint32(gid)
|
|
info.Options = opts
|
|
}
|
|
return nil
|
|
})
|
|
} else {
|
|
taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
|
|
}
|
|
|
|
t, err = c.c8dCtr.NewTask(ctx,
|
|
func(id string) (cio.IO, error) {
|
|
fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)
|
|
|
|
rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
|
|
return rio, err
|
|
},
|
|
taskOpts...,
|
|
)
|
|
if err != nil {
|
|
close(stdinCloseSync)
|
|
if rio != nil {
|
|
rio.Cancel()
|
|
rio.Close()
|
|
}
|
|
return nil, errors.Wrap(wrapError(err), "failed to create task for container")
|
|
}
|
|
|
|
// Signal c.createIO that it can call CloseIO
|
|
stdinCloseSync <- t
|
|
|
|
return c.newTask(t), nil
|
|
}
|
|
|
|
func (t *task) Start(ctx context.Context) error {
|
|
return wrapError(t.Task.Start(ctx))
|
|
|
|
}
|
|
|
|
// Exec creates exec process.
|
|
//
|
|
// The containerd client calls Exec to register the exec config in the shim side.
|
|
// When the client calls Start, the shim will create stdin fifo if needs. But
|
|
// for the container main process, the stdin fifo will be created in Create not
|
|
// the Start call. stdinCloseSync channel should be closed after Start exec
|
|
// process.
|
|
func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
|
|
var (
|
|
p containerd.Process
|
|
rio cio.IO
|
|
stdinCloseSync = make(chan containerd.Process, 1)
|
|
)
|
|
|
|
// Optimization: assume the DockerContainerBundlePath label has not been
|
|
// updated since the container metadata was last loaded/refreshed.
|
|
md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
|
|
fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
if rio != nil {
|
|
rio.Cancel()
|
|
rio.Close()
|
|
}
|
|
}
|
|
}()
|
|
|
|
p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
|
|
rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
|
|
return rio, err
|
|
})
|
|
if err != nil {
|
|
close(stdinCloseSync)
|
|
if cerrdefs.IsAlreadyExists(err) {
|
|
return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
|
|
}
|
|
return nil, wrapError(err)
|
|
}
|
|
|
|
// Signal c.createIO that it can call CloseIO
|
|
//
|
|
// the stdin of exec process will be created after p.Start in containerd
|
|
defer func() { stdinCloseSync <- p }()
|
|
|
|
if err = p.Start(ctx); err != nil {
|
|
// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
|
|
// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
|
|
// older containerd-shim
|
|
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
|
|
defer cancel()
|
|
p.Delete(ctx)
|
|
return nil, wrapError(err)
|
|
}
|
|
return process{p}, nil
|
|
}
|
|
|
|
func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
|
|
return wrapError(t.Task.Kill(ctx, signal))
|
|
}
|
|
|
|
func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
|
|
return wrapError(p.Process.Kill(ctx, signal))
|
|
}
|
|
|
|
func (t *task) Pause(ctx context.Context) error {
|
|
return wrapError(t.Task.Pause(ctx))
|
|
}
|
|
|
|
func (t *task) Resume(ctx context.Context) error {
|
|
return wrapError(t.Task.Resume(ctx))
|
|
}
|
|
|
|
func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
|
|
m, err := t.Metrics(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v, err := typeurl.UnmarshalAny(m.Data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return libcontainerdtypes.InterfaceToStats(protobuf.FromTimestamp(m.Timestamp), v), nil
|
|
}
|
|
|
|
func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
|
|
pis, err := t.Pids(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var infos []libcontainerdtypes.Summary
|
|
for _, pi := range pis {
|
|
i, err := typeurl.UnmarshalAny(pi.Info)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "unable to decode process details")
|
|
}
|
|
s, err := summaryFromInterface(i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
infos = append(infos, *s)
|
|
}
|
|
|
|
return infos, nil
|
|
}
|
|
|
|
func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
|
|
s, err := t.Task.Delete(ctx)
|
|
return s, wrapError(err)
|
|
}
|
|
|
|
func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
|
|
s, err := p.Process.Delete(ctx)
|
|
return s, wrapError(err)
|
|
}
|
|
|
|
func (c *container) Delete(ctx context.Context) error {
|
|
// Optimization: assume the DockerContainerBundlePath label has not been
|
|
// updated since the container metadata was last loaded/refreshed.
|
|
md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bundle := md.Labels[DockerContainerBundlePath]
|
|
if err := c.c8dCtr.Delete(ctx); err != nil {
|
|
return wrapError(err)
|
|
}
|
|
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
|
|
if err := os.RemoveAll(bundle); err != nil {
|
|
c.client.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
|
|
"container": c.c8dCtr.ID(),
|
|
"bundle": bundle,
|
|
}).Error("failed to remove state dir")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *task) ForceDelete(ctx context.Context) error {
|
|
_, err := t.Task.Delete(ctx, containerd.WithProcessKill)
|
|
return wrapError(err)
|
|
}
|
|
|
|
func (t *task) Status(ctx context.Context) (containerd.Status, error) {
|
|
s, err := t.Task.Status(ctx)
|
|
return s, wrapError(err)
|
|
}
|
|
|
|
func (p process) Status(ctx context.Context) (containerd.Status, error) {
|
|
s, err := p.Process.Status(ctx)
|
|
return s, wrapError(err)
|
|
}
|
|
|
|
func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
|
|
return func(r *containerd.CheckpointTaskInfo) error {
|
|
if r.Options == nil && c.v2runcoptions != nil {
|
|
r.Options = &v2runcoptions.CheckpointOptions{}
|
|
}
|
|
|
|
switch opts := r.Options.(type) {
|
|
case *v2runcoptions.CheckpointOptions:
|
|
opts.Exit = exit
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
|
|
img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
|
|
if err != nil {
|
|
return wrapError(err)
|
|
}
|
|
// Whatever happens, delete the checkpoint from containerd
|
|
defer func() {
|
|
err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
|
|
if err != nil {
|
|
t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
|
|
Warnf("failed to delete checkpoint image")
|
|
}
|
|
}()
|
|
|
|
b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
|
|
if err != nil {
|
|
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
|
|
}
|
|
var index ocispec.Index
|
|
if err := json.Unmarshal(b, &index); err != nil {
|
|
return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
|
|
}
|
|
|
|
var cpDesc *ocispec.Descriptor
|
|
for _, m := range index.Manifests {
|
|
m := m
|
|
if m.MediaType == images.MediaTypeContainerd1Checkpoint {
|
|
cpDesc = &m //nolint:gosec
|
|
break
|
|
}
|
|
}
|
|
if cpDesc == nil {
|
|
return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
|
|
}
|
|
|
|
rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
|
|
if err != nil {
|
|
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
|
|
}
|
|
defer rat.Close()
|
|
_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
|
|
if err != nil {
|
|
return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// LoadContainer loads the containerd container.
|
|
func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
|
|
ctr, err := c.client.LoadContainer(ctx, id)
|
|
if err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
|
|
}
|
|
return nil, wrapError(err)
|
|
}
|
|
return &container{client: c, c8dCtr: ctr}, nil
|
|
}
|
|
|
|
func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
|
|
t, err := c.c8dCtr.Task(ctx, nil)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
return c.newTask(t), nil
|
|
}
|
|
|
|
// createIO creates the io to be used by a process
|
|
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
|
|
func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
|
|
var (
|
|
io *cio.DirectIO
|
|
err error
|
|
)
|
|
io, err = c.client.newDirectIO(context.Background(), fifos)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if io.Stdin != nil {
|
|
var (
|
|
closeErr error
|
|
stdinOnce sync.Once
|
|
)
|
|
pipe := io.Stdin
|
|
io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
|
|
stdinOnce.Do(func() {
|
|
closeErr = pipe.Close()
|
|
|
|
select {
|
|
case p, ok := <-stdinCloseSync:
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := closeStdin(context.Background(), p); err != nil {
|
|
if closeErr != nil {
|
|
closeErr = multierror.Append(closeErr, err)
|
|
} else {
|
|
// Avoid wrapping a single error in a multierror.
|
|
closeErr = err
|
|
}
|
|
}
|
|
default:
|
|
// The process wasn't ready. Close its stdin asynchronously.
|
|
go func() {
|
|
p, ok := <-stdinCloseSync
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := closeStdin(context.Background(), p); err != nil {
|
|
c.client.logger.WithError(err).
|
|
WithField("container", c.c8dCtr.ID()).
|
|
Error("failed to close container stdin")
|
|
}
|
|
}()
|
|
}
|
|
})
|
|
return closeErr
|
|
})
|
|
}
|
|
|
|
rio, err := attachStdio(io)
|
|
if err != nil {
|
|
io.Cancel()
|
|
io.Close()
|
|
}
|
|
return rio, err
|
|
}
|
|
|
|
func closeStdin(ctx context.Context, p containerd.Process) error {
|
|
err := p.CloseIO(ctx, containerd.WithStdinCloser)
|
|
if err != nil && strings.Contains(err.Error(), "transport is closing") {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
|
|
c.eventQ.Append(ei.ContainerID, func() {
|
|
err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
|
|
if err != nil {
|
|
c.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
|
|
"container": ei.ContainerID,
|
|
"event": et,
|
|
"event-info": ei,
|
|
}).Error("failed to process event")
|
|
}
|
|
})
|
|
}
|
|
|
|
func (c *client) waitServe(ctx context.Context) bool {
|
|
t := 100 * time.Millisecond
|
|
delay := time.NewTimer(t)
|
|
if !delay.Stop() {
|
|
<-delay.C
|
|
}
|
|
defer delay.Stop()
|
|
|
|
// `IsServing` will actually block until the service is ready.
|
|
// However it can return early, so we'll loop with a delay to handle it.
|
|
for {
|
|
serving, err := c.client.IsServing(ctx)
|
|
if err != nil {
|
|
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
|
return false
|
|
}
|
|
log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
|
|
}
|
|
|
|
if serving {
|
|
return true
|
|
}
|
|
|
|
delay.Reset(t)
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case <-delay.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
// Create a new context specifically for this subscription.
|
|
// The context must be cancelled to cancel the subscription.
|
|
// In cases where we have to restart event stream processing,
|
|
// we'll need the original context b/c this one will be cancelled
|
|
subCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Filter on both namespace *and* topic. To create an "and" filter,
|
|
// this must be a single, comma-separated string
|
|
eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
|
|
|
|
c.logger.Debug("processing event stream")
|
|
|
|
for {
|
|
select {
|
|
case err := <-errC:
|
|
if err != nil {
|
|
errStatus, ok := status.FromError(err)
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
c.logger.WithError(err).Error("Failed to get event")
|
|
c.logger.Info("Waiting for containerd to be ready to restart event processing")
|
|
if c.waitServe(ctx) {
|
|
go c.processEventStream(ctx, ns)
|
|
return
|
|
}
|
|
}
|
|
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|
|
}
|
|
return
|
|
case ev := <-eventStream:
|
|
if ev.Event == nil {
|
|
c.logger.WithField("event", ev).Warn("invalid event")
|
|
continue
|
|
}
|
|
|
|
v, err := typeurl.UnmarshalAny(ev.Event)
|
|
if err != nil {
|
|
c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
|
|
continue
|
|
}
|
|
|
|
c.logger.WithField("topic", ev.Topic).Debug("event")
|
|
|
|
switch t := v.(type) {
|
|
case *apievents.TaskCreate:
|
|
c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
ProcessID: t.ContainerID,
|
|
Pid: t.Pid,
|
|
})
|
|
case *apievents.TaskStart:
|
|
c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
ProcessID: t.ContainerID,
|
|
Pid: t.Pid,
|
|
})
|
|
case *apievents.TaskExit:
|
|
c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
ProcessID: t.ID,
|
|
Pid: t.Pid,
|
|
ExitCode: t.ExitStatus,
|
|
ExitedAt: protobuf.FromTimestamp(t.ExitedAt),
|
|
})
|
|
case *apievents.TaskOOM:
|
|
c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
})
|
|
case *apievents.TaskExecAdded:
|
|
c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
ProcessID: t.ExecID,
|
|
})
|
|
case *apievents.TaskExecStarted:
|
|
c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
ProcessID: t.ExecID,
|
|
Pid: t.Pid,
|
|
})
|
|
case *apievents.TaskPaused:
|
|
c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
})
|
|
case *apievents.TaskResumed:
|
|
c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
|
|
ContainerID: t.ContainerID,
|
|
})
|
|
case *apievents.TaskDelete:
|
|
c.logger.WithFields(log.Fields{
|
|
"topic": ev.Topic,
|
|
"type": reflect.TypeOf(t),
|
|
"container": t.ContainerID,
|
|
}).Info("ignoring event")
|
|
default:
|
|
c.logger.WithFields(log.Fields{
|
|
"topic": ev.Topic,
|
|
"type": reflect.TypeOf(t),
|
|
}).Info("ignoring event")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
|
|
writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer writer.Close()
|
|
size, err := io.Copy(writer, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
labels := map[string]string{
|
|
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.Descriptor{
|
|
MediaType: mediaType,
|
|
Digest: writer.Digest().String(),
|
|
Size: size,
|
|
}, nil
|
|
}
|
|
|
|
func (c *client) bundleDir(id string) string {
|
|
return filepath.Join(c.stateDir, id)
|
|
}
|
|
|
|
func wrapError(err error) error {
|
|
switch {
|
|
case err == nil:
|
|
return nil
|
|
case cerrdefs.IsNotFound(err):
|
|
return errdefs.NotFound(err)
|
|
}
|
|
|
|
msg := err.Error()
|
|
for _, s := range []string{"container does not exist", "not found", "no such container"} {
|
|
if strings.Contains(msg, s) {
|
|
return errdefs.NotFound(err)
|
|
}
|
|
}
|
|
return err
|
|
}
|