Remove inmemory container map

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-03-25 16:17:17 -04:00
parent adb15c2899
commit b9b5dc37e3
9 changed files with 213 additions and 412 deletions

View file

@ -325,16 +325,17 @@ func (daemon *Daemon) restore() error {
alive bool
ec uint32
exitedAt time.Time
process libcontainerdtypes.Process
)
alive, _, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio)
alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio)
if err != nil && !errdefs.IsNotFound(err) {
logrus.Errorf("Failed to restore container %s with containerd: %s", c.ID, err)
return
}
if !alive {
ec, exitedAt, err = daemon.containerd.DeleteTask(context.Background(), c.ID)
if err != nil && !errdefs.IsNotFound(err) {
if !alive && process != nil {
ec, exitedAt, err = process.Delete(context.Background())
if err != nil {
logrus.WithError(err).Errorf("Failed to delete container %s from containerd", c.ID)
return
}

View file

@ -11,6 +11,13 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
type mockProcess struct {
}
func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) {
return 0, time.Time{}, nil
}
// Mock containerd client implementation, for unit tests.
type MockContainerdClient struct {
}
@ -18,8 +25,8 @@ type MockContainerdClient struct {
func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) {
return containerd.Version{}, nil
}
func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) {
return false, 0, nil
func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
return false, 0, &mockProcess{}, nil
}
func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error {
return nil

View file

@ -1085,7 +1085,7 @@ func (c *client) Stats(_ context.Context, containerID string) (*libcontainerdtyp
}
// Restore is the handler for restoring a container
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, error) {
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (bool, int, libcontainerdtypes.Process, error) {
c.logger.WithField("container", id).Debug("restore()")
// TODO Windows: On RS1, a re-attach isn't possible.
@ -1107,10 +1107,13 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine
if err != nil {
c.logger.WithField("container", id).WithError(err).Debug("terminate failed on restore")
return false, -1, err
return false, -1, nil, err
}
}
return false, -1, nil
return false, -1, &restoredProcess{
c: c,
id: id,
}, nil
}
// GetPidsForContainer returns a list of process IDs running in a container.
@ -1153,6 +1156,15 @@ func (c *client) Summary(_ context.Context, containerID string) ([]libcontainerd
return pl, nil
}
type restoredProcess struct {
id string
c *client
}
func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
return p.c.DeleteTask(ctx, p.id)
}
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
ec := -1
ctr := c.getContainer(containerID)

View file

@ -38,86 +38,30 @@ import (
"google.golang.org/grpc/status"
)
type container struct {
mu sync.Mutex
bundleDir string
ctr containerd.Container
task containerd.Task
execs map[string]containerd.Process
oomKilled bool
}
func (c *container) setTask(t containerd.Task) {
c.mu.Lock()
c.task = t
c.mu.Unlock()
}
func (c *container) getTask() containerd.Task {
c.mu.Lock()
t := c.task
c.mu.Unlock()
return t
}
func (c *container) addProcess(id string, p containerd.Process) {
c.mu.Lock()
if c.execs == nil {
c.execs = make(map[string]containerd.Process)
}
c.execs[id] = p
c.mu.Unlock()
}
func (c *container) deleteProcess(id string) {
c.mu.Lock()
delete(c.execs, id)
c.mu.Unlock()
}
func (c *container) getProcess(id string) containerd.Process {
c.mu.Lock()
p := c.execs[id]
c.mu.Unlock()
return p
}
func (c *container) setOOMKilled(killed bool) {
c.mu.Lock()
c.oomKilled = killed
c.mu.Unlock()
}
func (c *container) getOOMKilled() bool {
c.mu.Lock()
killed := c.oomKilled
c.mu.Unlock()
return killed
}
// DockerContainerBundlePath is the label key pointing to the container's bundle path
const DockerContainerBundlePath = "com.docker/engine.bundle.path"
type client struct {
sync.RWMutex // protects containers map
client *containerd.Client
stateDir string
logger *logrus.Entry
ns string
backend libcontainerdtypes.Backend
eventQ queue.Queue
containers map[string]*container
backend libcontainerdtypes.Backend
eventQ queue.Queue
oomMu sync.Mutex
oom map[string]bool
}
// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
c := &client{
client: cli,
stateDir: stateDir,
logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
ns: ns,
backend: b,
containers: make(map[string]*container),
client: cli,
stateDir: stateDir,
logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
ns: ns,
backend: b,
oom: make(map[string]bool),
}
go c.processEventStream(ctx, ns)
@ -131,29 +75,7 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
// Restore loads the containerd container.
// It should not be called concurrently with any other operation for the given ID.
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) {
c.Lock()
_, ok := c.containers[id]
if ok {
c.Unlock()
return false, 0, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
cntr := &container{}
c.containers[id] = cntr
cntr.mu.Lock()
defer cntr.mu.Unlock()
c.Unlock()
defer func() {
if err != nil {
c.Lock()
delete(c.containers, id)
c.Unlock()
}
}()
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
var dio *cio.DirectIO
defer func() {
if err != nil && dio != nil {
@ -165,13 +87,12 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine
ctr, err := c.client.LoadContainer(ctx, id)
if err != nil {
return false, -1, errors.WithStack(wrapError(err))
return false, -1, nil, errors.WithStack(wrapError(err))
}
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
// dio must be assigned to the previously defined dio for the defer above
// to handle cleanup
dio, err = c.newDirectIO(ctx, fifos)
if err != nil {
return nil, err
@ -180,75 +101,57 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio libcontaine
}
t, err := ctr.Task(ctx, attachIO)
if err != nil && !containerderrors.IsNotFound(err) {
return false, -1, errors.Wrap(wrapError(err), "error getting containerd task for container")
return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
}
if t != nil {
s, err := t.Status(ctx)
if err != nil {
return false, -1, errors.Wrap(wrapError(err), "error getting task status")
return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
}
alive = s.Status != containerd.Stopped
pid = int(t.Pid())
}
cntr.bundleDir = filepath.Join(c.stateDir, id)
cntr.ctr = ctr
cntr.task = t
// TODO(mlaventure): load execs
c.logger.WithFields(logrus.Fields{
"container": id,
"alive": alive,
"pid": pid,
}).Debug("restored container")
return alive, pid, nil
return alive, pid, &restoredProcess{
p: t,
}, nil
}
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
if ctr := c.getContainer(id); ctr != nil {
return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
bdir := filepath.Join(c.stateDir, id)
bdir := c.bundleDir(id)
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
cdCtr, err := c.client.NewContainer(ctx, id,
_, err := c.client.NewContainer(ctx, id,
containerd.WithSpec(ociSpec),
containerd.WithRuntime(runtimeName, runtimeOptions),
WithBundle(bdir, ociSpec),
)
if err != nil {
if containerderrors.IsAlreadyExists(err) {
return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
return wrapError(err)
}
c.Lock()
c.containers[id] = &container{
bundleDir: bdir,
ctr: cdCtr,
}
c.Unlock()
return nil
}
// Start create and start a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
ctr := c.getContainer(id)
if ctr == nil {
return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
ctr, err := c.getContainer(ctx, id)
if err != nil {
return -1, err
}
if t := ctr.getTask(); t != nil {
return -1, errors.WithStack(errdefs.Conflict(errors.New("container already started")))
}
var (
cp *types.Descriptor
t containerd.Task
rio cio.IO
err error
stdinCloseSync = make(chan struct{})
)
@ -276,14 +179,19 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
}
}
spec, err := ctr.ctr.Spec(ctx)
spec, err := ctr.Spec(ctx)
if err != nil {
return -1, errors.Wrap(err, "failed to retrieve spec")
}
labels, err := ctr.Labels(ctx)
if err != nil {
return -1, errors.Wrap(err, "failed to retreive labels")
}
bundle := labels[DockerContainerBundlePath]
uid, gid := getSpecUser(spec)
t, err = ctr.ctr.NewTask(ctx,
t, err = ctr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(ctr.bundleDir, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
return rio, err
@ -313,8 +221,6 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
return -1, wrapError(err)
}
ctr.setTask(t)
// Signal c.createIO that it can call CloseIO
close(stdinCloseSync)
@ -323,7 +229,6 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
c.logger.WithError(err).WithField("container", id).
Error("failed to delete task after fail start")
}
ctr.setTask(nil)
return -1, wrapError(err)
}
@ -338,27 +243,30 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
// the Start call. stdinCloseSync channel should be closed after Start exec
// process.
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return -1, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
ctr, err := c.getContainer(ctx, containerID)
if err != nil {
return -1, err
}
t := ctr.getTask()
if t == nil {
return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
}
if p := ctr.getProcess(processID); p != nil {
return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
t, err := ctr.Task(ctx, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
}
return -1, wrapError(err)
}
var (
p containerd.Process
rio cio.IO
err error
stdinCloseSync = make(chan struct{})
)
fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
labels, err := ctr.Labels(ctx)
if err != nil {
return -1, wrapError(err)
}
fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
defer func() {
if err != nil {
@ -375,11 +283,12 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
})
if err != nil {
close(stdinCloseSync)
if containerderrors.IsAlreadyExists(err) {
return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
}
return -1, wrapError(err)
}
ctr.addProcess(processID, p)
// Signal c.createIO that it can call CloseIO
//
// the stdin of exec process will be created after p.Start in containerd
@ -392,15 +301,13 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
defer cancel()
p.Delete(ctx)
ctr.deleteProcess(processID)
return -1, wrapError(err)
}
return int(p.Pid()), nil
}
func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
p, err := c.getProcess(containerID, processID)
p, err := c.getProcess(ctx, containerID, processID)
if err != nil {
return err
}
@ -408,7 +315,7 @@ func (c *client) SignalProcess(ctx context.Context, containerID, processID strin
}
func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
p, err := c.getProcess(containerID, processID)
p, err := c.getProcess(ctx, containerID, processID)
if err != nil {
return err
}
@ -417,7 +324,7 @@ func (c *client) ResizeTerminal(ctx context.Context, containerID, processID stri
}
func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
p, err := c.getProcess(containerID, processID)
p, err := c.getProcess(ctx, containerID, processID)
if err != nil {
return err
}
@ -426,7 +333,7 @@ func (c *client) CloseStdin(ctx context.Context, containerID, processID string)
}
func (c *client) Pause(ctx context.Context, containerID string) error {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return err
}
@ -435,7 +342,7 @@ func (c *client) Pause(ctx context.Context, containerID string) error {
}
func (c *client) Resume(ctx context.Context, containerID string) error {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return err
}
@ -444,7 +351,7 @@ func (c *client) Resume(ctx context.Context, containerID string) error {
}
func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return nil, err
}
@ -462,7 +369,7 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt
}
func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return nil, err
}
@ -481,7 +388,7 @@ func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, er
}
func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return nil, err
}
@ -507,68 +414,75 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine
return infos, nil
}
type restoredProcess struct {
p containerd.Process
}
func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
if p.p == nil {
return 255, time.Now(), nil
}
status, err := p.p.Delete(ctx)
if err != nil {
return 255, time.Now(), nil
}
return status.ExitCode(), status.ExitTime(), nil
}
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return 255, time.Now(), nil
}
status, err := p.(containerd.Task).Delete(ctx)
status, err := p.Delete(ctx)
if err != nil {
return 255, time.Now(), nil
}
if ctr := c.getContainer(containerID); ctr != nil {
ctr.setTask(nil)
}
return status.ExitCode(), status.ExitTime(), nil
}
func (c *client) Delete(ctx context.Context, containerID string) error {
ctr := c.getContainer(containerID)
if ctr == nil {
return errors.WithStack(errdefs.NotFound(errors.New("no such container")))
ctr, err := c.getContainer(ctx, containerID)
if err != nil {
return err
}
if err := ctr.ctr.Delete(ctx); err != nil {
labels, err := ctr.Labels(ctx)
if err != nil {
return err
}
bundle := labels[DockerContainerBundlePath]
if err := ctr.Delete(ctx); err != nil {
return wrapError(err)
}
c.oomMu.Lock()
delete(c.oom, containerID)
c.oomMu.Unlock()
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
if err := os.RemoveAll(ctr.bundleDir); err != nil {
if err := os.RemoveAll(bundle); err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": containerID,
"bundle": ctr.bundleDir,
"bundle": bundle,
}).Error("failed to remove state dir")
}
}
c.removeContainer(containerID)
return nil
}
func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return containerd.Unknown, err
}
t := ctr.getTask()
if t == nil {
return containerd.Unknown, errors.WithStack(errdefs.NotFound(errors.New("no such task")))
}
s, err := t.Status(ctx)
if err != nil {
return containerd.Unknown, wrapError(err)
}
return s.Status, nil
}
func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return err
}
@ -633,37 +547,38 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return err
}
func (c *client) getContainer(id string) *container {
c.RLock()
ctr := c.containers[id]
c.RUnlock()
return ctr
}
func (c *client) removeContainer(id string) {
c.Lock()
delete(c.containers, id)
c.Unlock()
}
func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
ctr, err := c.client.LoadContainer(ctx, id)
if err != nil {
if containerderrors.IsNotFound(err) {
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
}
return nil, wrapError(err)
}
return ctr, nil
}
t := ctr.getTask()
if t == nil {
return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
ctr, err := c.getContainer(ctx, containerID)
if err != nil {
return nil, err
}
t, err := ctr.Task(ctx, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
}
return nil, wrapError(err)
}
if processID == libcontainerdtypes.InitProcessName {
return t, nil
}
p := ctr.getProcess(processID)
if p == nil {
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
p, err := t.LoadProcess(ctx, processID, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
}
return nil, wrapError(err)
}
return p, nil
}
@ -693,7 +608,7 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std
// Exec/Start call failed.
go func() {
<-stdinCloseSync
p, err := c.getProcess(containerID, processID)
p, err := c.getProcess(context.Background(), containerID, processID)
if err == nil {
err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
if err != nil && strings.Contains(err.Error(), "transport is closing") {
@ -714,7 +629,7 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std
return rio, err
}
func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
c.eventQ.Append(ei.ContainerID, func() {
err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
if err != nil {
@ -726,10 +641,12 @@ func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, e
}
if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
p := ctr.getProcess(ei.ProcessID)
if p == nil {
p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
if err != nil {
c.logger.WithError(errors.New("no such process")).
WithFields(logrus.Fields{
"error": err,
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Error("exit event")
@ -742,15 +659,23 @@ func (c *client) processEvent(ctr *container, et libcontainerdtypes.EventType, e
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
ctr.deleteProcess(ei.ProcessID)
ctr := c.getContainer(ei.ContainerID)
if ctr == nil {
ctr, err := c.getContainer(ctx, ei.ContainerID)
if err != nil {
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
} else {
newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
labels, err := ctr.Labels(ctx)
if err != nil {
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
return
}
newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
}
}
})
@ -762,7 +687,6 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
ev *events.Envelope
et libcontainerdtypes.EventType
ei libcontainerdtypes.EventInfo
ctr *container
)
// Filter on both namespace *and* topic. To create an "and" filter,
@ -771,8 +695,8 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
c.logger.Debug("processing event stream")
var oomKilled bool
for {
var oomKilled bool
select {
case err = <-errC:
if err != nil {
@ -861,19 +785,14 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
continue
}
ctr = c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
continue
}
c.oomMu.Lock()
if oomKilled {
ctr.setOOMKilled(true)
oomKilled = false
c.oom[ei.ContainerID] = true
}
ei.OOMKilled = ctr.getOOMKilled()
ei.OOMKilled = c.oom[ei.ContainerID]
c.oomMu.Unlock()
c.processEvent(ctr, et, ei)
c.processEvent(ctx, et, ei)
}
}
}
@ -901,6 +820,10 @@ func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.R
}, nil
}
func (c *client) bundleDir(id string) string {
return filepath.Join(c.stateDir, id)
}
func wrapError(err error) error {
switch {
case err == nil:

View file

@ -23,7 +23,7 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
}
func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
p, err := c.getProcess(containerID, libcontainerdtypes.InitProcessName)
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return err
}
@ -62,8 +62,12 @@ func getSpecUser(ociSpec *specs.Spec) (int, int) {
// WithBundle creates the bundle for the container
func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
if c.Labels == nil {
c.Labels = make(map[string]string)
}
uid, gid := getSpecUser(ociSpec)
if uid == 0 && gid == 0 {
c.Labels[DockerContainerBundlePath] = bundleDir
return idtools.MkdirAllAndChownNew(bundleDir, 0755, idtools.Identity{UID: 0, GID: 0})
}
@ -82,6 +86,10 @@ func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOp
}
}
}
if c.Labels == nil {
c.Labels = make(map[string]string)
}
c.Labels[DockerContainerBundlePath] = p
return nil
}
}

View file

@ -41,6 +41,10 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
func WithBundle(bundleDir string, ociSpec *specs.Spec) containerd.NewContainerOpts {
return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
// TODO: (containerd) Determine if we need to use system.MkdirAllWithACL here
if c.Labels == nil {
c.Labels = make(map[string]string)
}
c.Labels[DockerContainerBundlePath] = bundleDir
return os.MkdirAll(bundleDir, 0755)
}
}

View file

@ -41,11 +41,16 @@ type Backend interface {
ProcessEvent(containerID string, event EventType, ei EventInfo) error
}
// Process of a container
type Process interface {
Delete(context.Context) (uint32, time.Time, error)
}
// Client provides access to containerd features.
type Client interface {
Version(ctx context.Context) (containerd.Version, error)
Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, err error)
Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error)
Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error)

View file

@ -5,7 +5,6 @@ import (
"io"
"path/filepath"
"sync"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
@ -26,19 +25,6 @@ type ExitHandler interface {
HandleExitEvent(id string) error
}
// Client is used by the exector to perform operations.
// TODO(@cpuguy83): This should really just be based off the containerd client interface.
// However right now this whole package is tied to github.com/docker/docker/libcontainerd
type Client interface {
Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error)
Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error)
Delete(ctx context.Context, containerID string) error
DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error)
SignalProcess(ctx context.Context, containerID, processID string, signal int) error
}
// New creates a new containerd plugin executor
func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
e := &Executor{
@ -57,19 +43,23 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandle
// Executor is the containerd client implementation of a plugin executor
type Executor struct {
rootDir string
client Client
client libcontainerdtypes.Client
exitHandler ExitHandler
}
// deleteTaskAndContainer deletes plugin task and then plugin container from containerd
func deleteTaskAndContainer(ctx context.Context, cli Client, id string) {
_, _, err := cli.DeleteTask(ctx, id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
if p != nil {
if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
}
} else {
if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
}
}
err = cli.Delete(ctx, id)
if err != nil && !errdefs.IsNotFound(err) {
if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
}
}
@ -103,19 +93,19 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
if err != nil {
deleteTaskAndContainer(ctx, e.client, id)
deleteTaskAndContainer(ctx, e.client, id, nil)
}
return err
}
// Restore restores a container
func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
if err != nil && !errdefs.IsNotFound(err) {
return false, err
}
if !alive {
deleteTaskAndContainer(context.Background(), e.client, id)
deleteTaskAndContainer(context.Background(), e.client, id, p)
}
return alive, nil
}
@ -136,7 +126,7 @@ func (e *Executor) Signal(id string, signal int) error {
func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
switch et {
case libcontainerdtypes.EventExit:
deleteTaskAndContainer(context.Background(), e.client, id)
deleteTaskAndContainer(context.Background(), e.client, id, nil)
return e.exitHandler.HandleExitEvent(ei.ContainerID)
}
return nil

View file

@ -1,149 +0,0 @@
package containerd
import (
"context"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/containerd/containerd"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"gotest.tools/assert"
)
func TestLifeCycle(t *testing.T) {
t.Parallel()
mock := newMockClient()
exec, cleanup := setupTest(t, mock, mock)
defer cleanup()
id := "test-create"
mock.simulateStartError(true, id)
err := exec.Create(id, specs.Spec{}, nil, nil)
assert.Assert(t, err != nil)
mock.simulateStartError(false, id)
err = exec.Create(id, specs.Spec{}, nil, nil)
assert.NilError(t, err)
running, _ := exec.IsRunning(id)
assert.Assert(t, running)
// create with the same ID
err = exec.Create(id, specs.Spec{}, nil, nil)
assert.Assert(t, err != nil)
mock.HandleExitEvent(id) // simulate a plugin that exits
err = exec.Create(id, specs.Spec{}, nil, nil)
assert.NilError(t, err)
}
func setupTest(t *testing.T, client Client, eh ExitHandler) (*Executor, func()) {
rootDir, err := ioutil.TempDir("", "test-daemon")
assert.NilError(t, err)
assert.Assert(t, client != nil)
assert.Assert(t, eh != nil)
return &Executor{
rootDir: rootDir,
client: client,
exitHandler: eh,
}, func() {
assert.Assert(t, os.RemoveAll(rootDir))
}
}
type mockClient struct {
mu sync.Mutex
containers map[string]bool
errorOnStart map[string]bool
}
func newMockClient() *mockClient {
return &mockClient{
containers: make(map[string]bool),
errorOnStart: make(map[string]bool),
}
}
func (c *mockClient) Create(ctx context.Context, id string, _ *specs.Spec, _ interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.containers[id]; ok {
return errors.New("exists")
}
c.containers[id] = false
return nil
}
func (c *mockClient) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error) {
return false, 0, nil
}
func (c *mockClient) Status(ctx context.Context, id string) (containerd.ProcessStatus, error) {
c.mu.Lock()
defer c.mu.Unlock()
running, ok := c.containers[id]
if !ok {
return containerd.Unknown, errors.New("not found")
}
if running {
return containerd.Running, nil
}
return containerd.Stopped, nil
}
func (c *mockClient) Delete(ctx context.Context, id string) error {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.containers, id)
return nil
}
func (c *mockClient) DeleteTask(ctx context.Context, id string) (uint32, time.Time, error) {
return 0, time.Time{}, nil
}
func (c *mockClient) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.containers[id]; !ok {
return 0, errors.New("not found")
}
if c.errorOnStart[id] {
return 0, errors.New("some startup error")
}
c.containers[id] = true
return 1, nil
}
func (c *mockClient) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
return nil
}
func (c *mockClient) simulateStartError(sim bool, id string) {
c.mu.Lock()
defer c.mu.Unlock()
if sim {
c.errorOnStart[id] = sim
return
}
delete(c.errorOnStart, id)
}
func (c *mockClient) HandleExitEvent(id string) error {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.containers, id)
return nil
}