Merge pull request #25849 from darrenstahlmsft/LibcontainerdRaces
Lock all calls to hcsshim to prevent close races
This commit is contained in:
commit
4348878242
7 changed files with 95 additions and 47 deletions
|
@ -121,11 +121,8 @@ func (daemon *Daemon) Kill(container *container.Container) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if container.IsRunning() {
|
if _, err2 := container.WaitStop(2 * time.Second); err2 != nil {
|
||||||
container.WaitStop(2 * time.Second)
|
return err
|
||||||
if container.IsRunning() {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -162,7 +162,9 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
|
||||||
if iop.Stdin != nil {
|
if iop.Stdin != nil {
|
||||||
go func() {
|
go func() {
|
||||||
io.Copy(iop.Stdin, stdin)
|
io.Copy(iop.Stdin, stdin)
|
||||||
iop.Stdin.Close()
|
if err := iop.Stdin.Close(); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -172,7 +174,9 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
|
||||||
if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") {
|
if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") {
|
||||||
// tty is enabled, so don't close containerd's iopipe stdin.
|
// tty is enabled, so don't close containerd's iopipe stdin.
|
||||||
if iop.Stdin != nil {
|
if iop.Stdin != nil {
|
||||||
iop.Stdin.Close()
|
if err := iop.Stdin.Close(); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,9 +46,21 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
|
||||||
stopSignal := container.StopSignal()
|
stopSignal := container.StopSignal()
|
||||||
// 1. Send a stop signal
|
// 1. Send a stop signal
|
||||||
if err := daemon.killPossiblyDeadProcess(container, stopSignal); err != nil {
|
if err := daemon.killPossiblyDeadProcess(container, stopSignal); err != nil {
|
||||||
logrus.Infof("Failed to send signal %d to the process, force killing", stopSignal)
|
// While normally we might "return err" here we're not going to
|
||||||
if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
|
// because if we can't stop the container by this point then
|
||||||
return err
|
// it's probably because it's already stopped. Meaning, between
|
||||||
|
// the time of the IsRunning() call above and now it stopped.
|
||||||
|
// Also, since the err return will be environment specific we can't
|
||||||
|
// look for any particular (common) error that would indicate
|
||||||
|
// that the process is already dead vs something else going wrong.
|
||||||
|
// So, instead we'll give it up to 2 more seconds to complete and if
|
||||||
|
// by that time the container is still running, then the error
|
||||||
|
// we got is probably valid and so we force kill it.
|
||||||
|
if _, err := container.WaitStop(2 * time.Second); err != nil {
|
||||||
|
logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", stopSignal)
|
||||||
|
if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,9 +29,9 @@ func (clnt *client) appendContainer(cont *container) {
|
||||||
clnt.containers[cont.containerID] = cont
|
clnt.containers[cont.containerID] = cont
|
||||||
clnt.mapMutex.Unlock()
|
clnt.mapMutex.Unlock()
|
||||||
}
|
}
|
||||||
func (clnt *client) deleteContainer(friendlyName string) {
|
func (clnt *client) deleteContainer(containerID string) {
|
||||||
clnt.mapMutex.Lock()
|
clnt.mapMutex.Lock()
|
||||||
delete(clnt.containers, friendlyName)
|
delete(clnt.containers, containerID)
|
||||||
clnt.mapMutex.Unlock()
|
clnt.mapMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,8 @@ const defaultOwner = "docker"
|
||||||
// Create is the entrypoint to create a container from a spec, and if successfully
|
// Create is the entrypoint to create a container from a spec, and if successfully
|
||||||
// created, start it too.
|
// created, start it too.
|
||||||
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec Spec, options ...CreateOption) error {
|
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec Spec, options ...CreateOption) error {
|
||||||
|
clnt.lock(containerID)
|
||||||
|
defer clnt.unlock(containerID)
|
||||||
logrus.Debugln("libcontainerd: client.Create() with spec", spec)
|
logrus.Debugln("libcontainerd: client.Create() with spec", spec)
|
||||||
|
|
||||||
configuration := &hcsshim.ContainerConfig{
|
configuration := &hcsshim.ContainerConfig{
|
||||||
|
@ -221,6 +223,13 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pid := newProcess.Pid()
|
||||||
|
openedProcess, err := container.hcsContainer.OpenProcess(pid)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("AddProcess %s OpenProcess() failed %s", containerID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
stdin, stdout, stderr, err = newProcess.Stdio()
|
stdin, stdout, stderr, err = newProcess.Stdio()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("libcontainerd: %s getting std pipes failed %s", containerID, err)
|
logrus.Errorf("libcontainerd: %s getting std pipes failed %s", containerID, err)
|
||||||
|
@ -238,8 +247,6 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
|
||||||
iopipe.Stderr = openReaderFromPipe(stderr)
|
iopipe.Stderr = openReaderFromPipe(stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
pid := newProcess.Pid()
|
|
||||||
|
|
||||||
proc := &process{
|
proc := &process{
|
||||||
processCommon: processCommon{
|
processCommon: processCommon{
|
||||||
containerID: containerID,
|
containerID: containerID,
|
||||||
|
@ -248,7 +255,7 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
|
||||||
systemPid: uint32(pid),
|
systemPid: uint32(pid),
|
||||||
},
|
},
|
||||||
commandLine: createProcessParms.CommandLine,
|
commandLine: createProcessParms.CommandLine,
|
||||||
hcsProcess: newProcess,
|
hcsProcess: openedProcess,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the process to the container's list of processes
|
// Add the process to the container's list of processes
|
||||||
|
@ -280,7 +287,7 @@ func (clnt *client) Signal(containerID string, sig int) error {
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
// Get the container as we need it to find the pid of the process.
|
// Get the container as we need it to get the container handle.
|
||||||
clnt.lock(containerID)
|
clnt.lock(containerID)
|
||||||
defer clnt.unlock(containerID)
|
defer clnt.unlock(containerID)
|
||||||
if cont, err = clnt.getContainer(containerID); err != nil {
|
if cont, err = clnt.getContainer(containerID); err != nil {
|
||||||
|
|
|
@ -35,6 +35,8 @@ func (ctr *container) newProcess(friendlyName string) *process {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start starts a created container.
|
||||||
|
// Caller needs to lock container ID before calling this method.
|
||||||
func (ctr *container) start() error {
|
func (ctr *container) start() error {
|
||||||
var err error
|
var err error
|
||||||
isServicing := false
|
isServicing := false
|
||||||
|
@ -78,7 +80,7 @@ func (ctr *container) start() error {
|
||||||
createProcessParms.CommandLine = strings.Join(ctr.ociSpec.Process.Args, " ")
|
createProcessParms.CommandLine = strings.Join(ctr.ociSpec.Process.Args, " ")
|
||||||
|
|
||||||
// Start the command running in the container.
|
// Start the command running in the container.
|
||||||
hcsProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
|
newProcess, err := ctr.hcsContainer.CreateProcess(createProcessParms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("libcontainerd: CreateProcess() failed %s", err)
|
logrus.Errorf("libcontainerd: CreateProcess() failed %s", err)
|
||||||
if err := ctr.terminate(); err != nil {
|
if err := ctr.terminate(); err != nil {
|
||||||
|
@ -90,10 +92,21 @@ func (ctr *container) start() error {
|
||||||
}
|
}
|
||||||
ctr.startedAt = time.Now()
|
ctr.startedAt = time.Now()
|
||||||
|
|
||||||
|
pid := newProcess.Pid()
|
||||||
|
openedProcess, err := ctr.hcsContainer.OpenProcess(pid)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("OpenProcess() failed %s", err)
|
||||||
|
if err := ctr.terminate(); err != nil {
|
||||||
|
logrus.Errorf("Failed to cleanup after a failed OpenProcess. %s", err)
|
||||||
|
} else {
|
||||||
|
logrus.Debugln("Cleaned up after failed OpenProcess by calling Terminate")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Save the hcs Process and PID
|
// Save the hcs Process and PID
|
||||||
ctr.process.friendlyName = InitFriendlyName
|
ctr.process.friendlyName = InitFriendlyName
|
||||||
pid := hcsProcess.Pid()
|
ctr.process.hcsProcess = openedProcess
|
||||||
ctr.process.hcsProcess = hcsProcess
|
|
||||||
|
|
||||||
// If this is a servicing container, wait on the process synchronously here and
|
// If this is a servicing container, wait on the process synchronously here and
|
||||||
// if it succeeds, wait for it to cleanly shut down and merge into the parent container.
|
// if it succeeds, wait for it to cleanly shut down and merge into the parent container.
|
||||||
|
@ -110,7 +123,7 @@ func (ctr *container) start() error {
|
||||||
|
|
||||||
var stdout, stderr io.ReadCloser
|
var stdout, stderr io.ReadCloser
|
||||||
var stdin io.WriteCloser
|
var stdin io.WriteCloser
|
||||||
stdin, stdout, stderr, err = hcsProcess.Stdio()
|
stdin, stdout, stderr, err = newProcess.Stdio()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("libcontainerd: failed to get stdio pipes: %s", err)
|
logrus.Errorf("libcontainerd: failed to get stdio pipes: %s", err)
|
||||||
if err := ctr.terminate(); err != nil {
|
if err := ctr.terminate(); err != nil {
|
||||||
|
@ -121,7 +134,7 @@ func (ctr *container) start() error {
|
||||||
|
|
||||||
iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal}
|
iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal}
|
||||||
|
|
||||||
iopipe.Stdin = createStdInCloser(stdin, hcsProcess)
|
iopipe.Stdin = createStdInCloser(stdin, newProcess)
|
||||||
|
|
||||||
// Convert io.ReadClosers to io.Readers
|
// Convert io.ReadClosers to io.Readers
|
||||||
if stdout != nil {
|
if stdout != nil {
|
||||||
|
@ -151,6 +164,7 @@ func (ctr *container) start() error {
|
||||||
State: StateStart,
|
State: StateStart,
|
||||||
Pid: ctr.systemPid, // Not sure this is needed? Double-check monitor.go in daemon BUGBUG @jhowardmsft
|
Pid: ctr.systemPid, // Not sure this is needed? Double-check monitor.go in daemon BUGBUG @jhowardmsft
|
||||||
}}
|
}}
|
||||||
|
logrus.Debugf("libcontainerd: start() completed OK, %+v", si)
|
||||||
return ctr.client.backend.StateChanged(ctr.containerID, si)
|
return ctr.client.backend.StateChanged(ctr.containerID, si)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -182,10 +196,6 @@ func (ctr *container) waitProcessExitCode(process *process) int {
|
||||||
// has exited to avoid a container being dropped on the floor.
|
// has exited to avoid a container being dropped on the floor.
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := process.hcsProcess.Close(); err != nil {
|
|
||||||
logrus.Errorf("libcontainerd: hcsProcess.Close(): %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return exitCode
|
return exitCode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,6 +207,8 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
logrus.Debugln("libcontainerd: waitExit() on pid", process.systemPid)
|
logrus.Debugln("libcontainerd: waitExit() on pid", process.systemPid)
|
||||||
|
|
||||||
exitCode := ctr.waitProcessExitCode(process)
|
exitCode := ctr.waitProcessExitCode(process)
|
||||||
|
// Lock the container while shutting down
|
||||||
|
ctr.client.lock(ctr.containerID)
|
||||||
|
|
||||||
// Assume the container has exited
|
// Assume the container has exited
|
||||||
si := StateInfo{
|
si := StateInfo{
|
||||||
|
@ -212,6 +224,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
// But it could have been an exec'd process which exited
|
// But it could have been an exec'd process which exited
|
||||||
if !isFirstProcessToStart {
|
if !isFirstProcessToStart {
|
||||||
si.State = StateExitProcess
|
si.State = StateExitProcess
|
||||||
|
ctr.cleanProcess(process.friendlyName)
|
||||||
} else {
|
} else {
|
||||||
updatePending, err := ctr.hcsContainer.HasPendingUpdates()
|
updatePending, err := ctr.hcsContainer.HasPendingUpdates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -237,6 +250,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
} else if restart {
|
} else if restart {
|
||||||
si.State = StateRestart
|
si.State = StateRestart
|
||||||
ctr.restarting = true
|
ctr.restarting = true
|
||||||
|
ctr.client.deleteContainer(ctr.containerID)
|
||||||
waitRestart = wait
|
waitRestart = wait
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -244,10 +258,17 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
// Remove process from list if we have exited
|
// Remove process from list if we have exited
|
||||||
// We need to do so here in case the Message Handler decides to restart it.
|
// We need to do so here in case the Message Handler decides to restart it.
|
||||||
if si.State == StateExit {
|
if si.State == StateExit {
|
||||||
ctr.client.deleteContainer(ctr.friendlyName)
|
ctr.client.deleteContainer(ctr.containerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := process.hcsProcess.Close(); err != nil {
|
||||||
|
logrus.Errorf("libcontainerd: hcsProcess.Close(): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unlock here before we call back into the daemon to update state
|
||||||
|
ctr.client.unlock(ctr.containerID)
|
||||||
|
|
||||||
// Call into the backend to notify it of the state change.
|
// Call into the backend to notify it of the state change.
|
||||||
logrus.Debugf("libcontainerd: waitExit() calling backend.StateChanged %+v", si)
|
logrus.Debugf("libcontainerd: waitExit() calling backend.StateChanged %+v", si)
|
||||||
if err := ctr.client.backend.StateChanged(ctr.containerID, si); err != nil {
|
if err := ctr.client.backend.StateChanged(ctr.containerID, si); err != nil {
|
||||||
|
@ -257,7 +278,6 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
go func() {
|
go func() {
|
||||||
err := <-waitRestart
|
err := <-waitRestart
|
||||||
ctr.restarting = false
|
ctr.restarting = false
|
||||||
ctr.client.deleteContainer(ctr.friendlyName)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if err = ctr.client.Create(ctr.containerID, "", "", ctr.ociSpec, ctr.options...); err != nil {
|
if err = ctr.client.Create(ctr.containerID, "", "", ctr.ociSpec, ctr.options...); err != nil {
|
||||||
logrus.Errorf("libcontainerd: error restarting %v", err)
|
logrus.Errorf("libcontainerd: error restarting %v", err)
|
||||||
|
@ -276,6 +296,14 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanProcess removes process from the map.
|
||||||
|
// Caller needs to lock container ID before calling this method.
|
||||||
|
func (ctr *container) cleanProcess(id string) {
|
||||||
|
delete(ctr.processes, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// shutdown shuts down the container in HCS
|
||||||
|
// Caller needs to lock container ID before calling this method.
|
||||||
func (ctr *container) shutdown() error {
|
func (ctr *container) shutdown() error {
|
||||||
const shutdownTimeout = time.Minute * 5
|
const shutdownTimeout = time.Minute * 5
|
||||||
err := ctr.hcsContainer.Shutdown()
|
err := ctr.hcsContainer.Shutdown()
|
||||||
|
@ -297,6 +325,8 @@ func (ctr *container) shutdown() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// terminate terminates the container in HCS
|
||||||
|
// Caller needs to lock container ID before calling this method.
|
||||||
func (ctr *container) terminate() error {
|
func (ctr *container) terminate() error {
|
||||||
const terminateTimeout = time.Minute * 5
|
const terminateTimeout = time.Minute * 5
|
||||||
err := ctr.hcsContainer.Terminate()
|
err := ctr.hcsContainer.Terminate()
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/Microsoft/hcsshim"
|
"github.com/Microsoft/hcsshim"
|
||||||
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// process keeps the state for both main container process and exec process.
|
// process keeps the state for both main container process and exec process.
|
||||||
|
@ -29,26 +30,23 @@ func openReaderFromPipe(p io.ReadCloser) io.Reader {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
type stdInCloser struct {
|
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {
|
||||||
io.WriteCloser
|
return ioutils.NewWriteCloserWrapper(pipe, func() error {
|
||||||
hcsshim.Process
|
if err := pipe.Close(); err != nil {
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) *stdInCloser {
|
// We do not need to lock container ID here, even though
|
||||||
return &stdInCloser{
|
// we are calling into hcsshim. This is safe, because the
|
||||||
WriteCloser: pipe,
|
// only place that closes this process handle is this method.
|
||||||
Process: process,
|
err := process.CloseStdin()
|
||||||
}
|
if err != nil && !hcsshim.IsNotExist(err) {
|
||||||
}
|
// This error will occur if the compute system is currently shutting down
|
||||||
|
if perr, ok := err.(*hcsshim.ProcessError); ok && perr.Err != hcsshim.ErrVmcomputeOperationInvalidState {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (stdin *stdInCloser) Close() error {
|
return process.Close()
|
||||||
if err := stdin.WriteCloser.Close(); err != nil {
|
})
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return stdin.Process.CloseStdin()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (stdin *stdInCloser) Write(p []byte) (n int, err error) {
|
|
||||||
return stdin.WriteCloser.Write(p)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue