Fix some missing synchronization in libcontainerd

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2017-12-15 11:32:08 -05:00
parent e55bead518
commit 647cec4324

View file

@ -43,7 +43,7 @@ import (
const InitProcessName = "init"
type container struct {
sync.Mutex
mu sync.Mutex
bundleDir string
ctr containerd.Container
@ -52,6 +52,54 @@ type container struct {
oomKilled bool
}
func (c *container) setTask(t containerd.Task) {
c.mu.Lock()
c.task = t
c.mu.Unlock()
}
func (c *container) getTask() containerd.Task {
c.mu.Lock()
t := c.task
c.mu.Unlock()
return t
}
func (c *container) addProcess(id string, p containerd.Process) {
c.mu.Lock()
if c.execs == nil {
c.execs = make(map[string]containerd.Process)
}
c.execs[id] = p
c.mu.Unlock()
}
func (c *container) deleteProcess(id string) {
c.mu.Lock()
delete(c.execs, id)
c.mu.Unlock()
}
func (c *container) getProcess(id string) containerd.Process {
c.mu.Lock()
p := c.execs[id]
c.mu.Unlock()
return p
}
func (c *container) setOOMKilled(killed bool) {
c.mu.Lock()
c.oomKilled = killed
c.mu.Unlock()
}
func (c *container) getOOMKilled() bool {
c.mu.Lock()
killed := c.oomKilled
c.mu.Unlock()
return killed
}
type client struct {
sync.RWMutex // protects containers map
@ -161,10 +209,10 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
// Start create and start a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(id)
switch {
case ctr == nil:
if ctr == nil {
return -1, errors.WithStack(newNotFoundError("no such container"))
case ctr.task != nil:
}
if t := ctr.getTask(); t != nil {
return -1, errors.WithStack(newConflictError("container already started"))
}
@ -228,9 +276,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
return -1, err
}
c.Lock()
c.containers[id].task = t
c.Unlock()
ctr.setTask(t)
// Signal c.createIO that it can call CloseIO
close(stdinCloseSync)
@ -240,9 +286,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
c.logger.WithError(err).WithField("container", id).
Error("failed to delete task after fail start")
}
c.Lock()
c.containers[id].task = nil
c.Unlock()
ctr.setTask(nil)
return -1, err
}
@ -251,12 +295,15 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(containerID)
switch {
case ctr == nil:
if ctr == nil {
return -1, errors.WithStack(newNotFoundError("no such container"))
case ctr.task == nil:
}
t := ctr.getTask()
if t == nil {
return -1, errors.WithStack(newInvalidParameterError("container is not running"))
case ctr.execs != nil && ctr.execs[processID] != nil:
}
if p := ctr.getProcess(processID); p != nil {
return -1, errors.WithStack(newConflictError("id already in use"))
}
@ -279,7 +326,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
}
}()
p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
return rio, err
})
@ -292,21 +339,14 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
return -1, err
}
ctr.Lock()
if ctr.execs == nil {
ctr.execs = make(map[string]containerd.Process)
}
ctr.execs[processID] = p
ctr.Unlock()
ctr.addProcess(processID, p)
// Signal c.createIO that it can call CloseIO
close(stdinCloseSync)
if err = p.Start(ctx); err != nil {
p.Delete(context.Background())
ctr.Lock()
delete(ctr.execs, processID)
ctr.Unlock()
ctr.deleteProcess(processID)
return -1, err
}
@ -432,12 +472,9 @@ func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, ti
return 255, time.Now(), nil
}
c.Lock()
if ctr, ok := c.containers[containerID]; ok {
ctr.task = nil
if ctr := c.getContainer(containerID); ctr != nil {
ctr.setTask(nil)
}
c.Unlock()
return status.ExitCode(), status.ExitTime(), nil
}
@ -471,7 +508,12 @@ func (c *client) Status(ctx context.Context, containerID string) (Status, error)
return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
}
s, err := ctr.task.Status(ctx)
t := ctr.getTask()
if t == nil {
return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
}
s, err := t.Status(ctx)
if err != nil {
return StatusUnknown, err
}
@ -547,26 +589,22 @@ func (c *client) removeContainer(id string) {
func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
ctr := c.getContainer(containerID)
switch {
case ctr == nil:
if ctr == nil {
return nil, errors.WithStack(newNotFoundError("no such container"))
case ctr.task == nil:
return nil, errors.WithStack(newNotFoundError("container is not running"))
case processID == InitProcessName:
return ctr.task, nil
default:
ctr.Lock()
defer ctr.Unlock()
if ctr.execs == nil {
return nil, errors.WithStack(newNotFoundError("no execs"))
}
}
p := ctr.execs[processID]
t := ctr.getTask()
if t == nil {
return nil, errors.WithStack(newNotFoundError("container is not running"))
}
if processID == InitProcessName {
return t, nil
}
p := ctr.getProcess(processID)
if p == nil {
return nil, errors.WithStack(newNotFoundError("no such exec"))
}
return p, nil
}
@ -624,12 +662,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
}
if et == EventExit && ei.ProcessID != ei.ContainerID {
var p containerd.Process
ctr.Lock()
if ctr.execs != nil {
p = ctr.execs[ei.ProcessID]
}
ctr.Unlock()
p := ctr.getProcess(ei.ProcessID)
if p == nil {
c.logger.WithError(errors.New("no such process")).
WithFields(logrus.Fields{
@ -645,9 +678,8 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
c.Lock()
delete(ctr.execs, ei.ProcessID)
c.Unlock()
ctr.deleteProcess(ei.ProcessID)
ctr := c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithFields(logrus.Fields{
@ -784,10 +816,10 @@ func (c *client) processEventStream(ctx context.Context) {
}
if oomKilled {
ctr.oomKilled = true
ctr.setOOMKilled(true)
oomKilled = false
}
ei.OOMKilled = ctr.oomKilled
ei.OOMKilled = ctr.getOOMKilled()
c.processEvent(ctr, et, ei)
}