From 647cec4324186faa3183bd6a7bc72a032a86c8c9 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Fri, 15 Dec 2017 11:32:08 -0500
Subject: [PATCH] Fix some missing synchronization in libcontainerd

Signed-off-by: Brian Goff
---
Review notes (free text after the three-dash marker; not part of the
commit message):

* container's embedded sync.Mutex becomes the named field mu. In the hunks
  below, the task, execs and oomKilled fields are accessed through the new
  helpers (setTask/getTask, addProcess/deleteProcess/getProcess,
  setOOMKilled/getOOMKilled), each of which holds c.mu for the access.
* The removed lines assigned the task field under the client lock (c.Lock)
  in Start and DeleteTask, read it with no lock in Start, Exec, Status and
  getProcess, and deleted from ctr.execs under c.Lock in processEvent while
  Exec and getProcess used ctr.Lock.
* Status now returns a not-found error ("no such task") when no task is
  set, instead of calling Status on a nil task.
* client.getProcess no longer returns the "no execs" error; a missing exec
  is reported as "no such exec" whether or not the execs map exists.
* NOTE(review): Start calls getTask() and only later setTask(), and Exec
  calls getProcess() and only later addProcess(). The check and the update
  are separate critical sections - confirm callers serialize Start/Exec
  per container.
* NOTE(review): line breaks and indentation in this copy were restored
  from the hunk line counts; compare the context lines against
  libcontainerd/client_daemon.go before applying.

 libcontainerd/client_daemon.go | 142 ++++++++++++++++++++-------------
 1 file changed, 87 insertions(+), 55 deletions(-)

diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go
index 78b1412068..a9f7c11dd1 100644
--- a/libcontainerd/client_daemon.go
+++ b/libcontainerd/client_daemon.go
@@ -43,7 +43,7 @@ import (
 const InitProcessName = "init"
 
 type container struct {
-	sync.Mutex
+	mu sync.Mutex
 
 	bundleDir string
 	ctr       containerd.Container
@@ -52,6 +52,54 @@ type container struct {
 	oomKilled bool
 }
 
+func (c *container) setTask(t containerd.Task) {
+	c.mu.Lock()
+	c.task = t
+	c.mu.Unlock()
+}
+
+func (c *container) getTask() containerd.Task {
+	c.mu.Lock()
+	t := c.task
+	c.mu.Unlock()
+	return t
+}
+
+func (c *container) addProcess(id string, p containerd.Process) {
+	c.mu.Lock()
+	if c.execs == nil {
+		c.execs = make(map[string]containerd.Process)
+	}
+	c.execs[id] = p
+	c.mu.Unlock()
+}
+
+func (c *container) deleteProcess(id string) {
+	c.mu.Lock()
+	delete(c.execs, id)
+	c.mu.Unlock()
+}
+
+func (c *container) getProcess(id string) containerd.Process {
+	c.mu.Lock()
+	p := c.execs[id]
+	c.mu.Unlock()
+	return p
+}
+
+func (c *container) setOOMKilled(killed bool) {
+	c.mu.Lock()
+	c.oomKilled = killed
+	c.mu.Unlock()
+}
+
+func (c *container) getOOMKilled() bool {
+	c.mu.Lock()
+	killed := c.oomKilled
+	c.mu.Unlock()
+	return killed
+}
+
 type client struct {
 	sync.RWMutex // protects containers map
 
@@ -161,10 +209,10 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
 // Start create and start a task for the specified containerd id
 func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
 	ctr := c.getContainer(id)
-	switch {
-	case ctr == nil:
+	if ctr == nil {
 		return -1, errors.WithStack(newNotFoundError("no such container"))
-	case ctr.task != nil:
+	}
+	if t := ctr.getTask(); t != nil {
 		return -1, errors.WithStack(newConflictError("container already started"))
 	}
 
@@ -228,9 +276,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 		return -1, err
 	}
 
-	c.Lock()
-	c.containers[id].task = t
-	c.Unlock()
+	ctr.setTask(t)
 
 	// Signal c.createIO that it can call CloseIO
 	close(stdinCloseSync)
@@ -240,9 +286,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 			c.logger.WithError(err).WithField("container", id).
 				Error("failed to delete task after fail start")
 		}
-		c.Lock()
-		c.containers[id].task = nil
-		c.Unlock()
+		ctr.setTask(nil)
 		return -1, err
 	}
 
@@ -251,12 +295,15 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 
 func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
 	ctr := c.getContainer(containerID)
-	switch {
-	case ctr == nil:
+	if ctr == nil {
 		return -1, errors.WithStack(newNotFoundError("no such container"))
-	case ctr.task == nil:
+	}
+	t := ctr.getTask()
+	if t == nil {
 		return -1, errors.WithStack(newInvalidParameterError("container is not running"))
-	case ctr.execs != nil && ctr.execs[processID] != nil:
+	}
+
+	if p := ctr.getProcess(processID); p != nil {
 		return -1, errors.WithStack(newConflictError("id already in use"))
 	}
 
@@ -279,7 +326,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
 		}
 	}()
 
-	p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
+	p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
 		rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
 		return rio, err
 	})
@@ -292,21 +339,14 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
 		return -1, err
 	}
 
-	ctr.Lock()
-	if ctr.execs == nil {
-		ctr.execs = make(map[string]containerd.Process)
-	}
-	ctr.execs[processID] = p
-	ctr.Unlock()
+	ctr.addProcess(processID, p)
 
 	// Signal c.createIO that it can call CloseIO
 	close(stdinCloseSync)
 
 	if err = p.Start(ctx); err != nil {
 		p.Delete(context.Background())
-		ctr.Lock()
-		delete(ctr.execs, processID)
-		ctr.Unlock()
+		ctr.deleteProcess(processID)
 		return -1, err
 	}
 
@@ -432,12 +472,9 @@ func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, ti
 		return 255, time.Now(), nil
 	}
 
-	c.Lock()
-	if ctr, ok := c.containers[containerID]; ok {
-		ctr.task = nil
+	if ctr := c.getContainer(containerID); ctr != nil {
+		ctr.setTask(nil)
 	}
-	c.Unlock()
-
 	return status.ExitCode(), status.ExitTime(), nil
 }
 
@@ -471,7 +508,12 @@ func (c *client) Status(ctx context.Context, containerID string) (Status, error)
 		return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
 	}
 
-	s, err := ctr.task.Status(ctx)
+	t := ctr.getTask()
+	if t == nil {
+		return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
+	}
+
+	s, err := t.Status(ctx)
 	if err != nil {
 		return StatusUnknown, err
 	}
@@ -547,26 +589,22 @@ func (c *client) removeContainer(id string) {
 
 func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
 	ctr := c.getContainer(containerID)
-	switch {
-	case ctr == nil:
+	if ctr == nil {
 		return nil, errors.WithStack(newNotFoundError("no such container"))
-	case ctr.task == nil:
-		return nil, errors.WithStack(newNotFoundError("container is not running"))
-	case processID == InitProcessName:
-		return ctr.task, nil
-	default:
-		ctr.Lock()
-		defer ctr.Unlock()
-		if ctr.execs == nil {
-			return nil, errors.WithStack(newNotFoundError("no execs"))
-		}
 	}
 
-	p := ctr.execs[processID]
+	t := ctr.getTask()
+	if t == nil {
+		return nil, errors.WithStack(newNotFoundError("container is not running"))
+	}
+	if processID == InitProcessName {
+		return t, nil
+	}
+
+	p := ctr.getProcess(processID)
 	if p == nil {
 		return nil, errors.WithStack(newNotFoundError("no such exec"))
 	}
-
 	return p, nil
 }
 
@@ -624,12 +662,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
 	}
 
 	if et == EventExit && ei.ProcessID != ei.ContainerID {
-		var p containerd.Process
-		ctr.Lock()
-		if ctr.execs != nil {
-			p = ctr.execs[ei.ProcessID]
-		}
-		ctr.Unlock()
+		p := ctr.getProcess(ei.ProcessID)
 		if p == nil {
 			c.logger.WithError(errors.New("no such process")).
 				WithFields(logrus.Fields{
@@ -645,9 +678,8 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
 				"process":   ei.ProcessID,
 			}).Warn("failed to delete process")
 		}
-		c.Lock()
-		delete(ctr.execs, ei.ProcessID)
-		c.Unlock()
+		ctr.deleteProcess(ei.ProcessID)
+
 		ctr := c.getContainer(ei.ContainerID)
 		if ctr == nil {
 			c.logger.WithFields(logrus.Fields{
@@ -784,10 +816,10 @@ func (c *client) processEventStream(ctx context.Context) {
 		}
 
 		if oomKilled {
-			ctr.oomKilled = true
+			ctr.setOOMKilled(true)
 			oomKilled = false
 		}
-		ei.OOMKilled = ctr.oomKilled
+		ei.OOMKilled = ctr.getOOMKilled()
 
 		c.processEvent(ctr, et, ei)
 	}