Merge pull request #43751 from vvoland/fix-exitcode-wait
state/Wait: Fix race when reading exit status
This commit is contained in:
commit
f34567bf41
9 changed files with 255 additions and 73 deletions
|
@ -86,6 +86,7 @@ type Container struct {
|
|||
RestartCount int
|
||||
HasBeenStartedBefore bool
|
||||
HasBeenManuallyStopped bool // used for unless-stopped restart policy
|
||||
HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
|
||||
MountPoints map[string]*volumemounts.MountPoint
|
||||
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
|
||||
ExecCommands *exec.Store `json:"-"`
|
||||
|
|
|
@ -32,9 +32,10 @@ type State struct {
|
|||
StartedAt time.Time
|
||||
FinishedAt time.Time
|
||||
Health *Health
|
||||
Removed bool `json:"-"`
|
||||
|
||||
waitStop chan struct{}
|
||||
waitRemove chan struct{}
|
||||
stopWaiters []chan<- StateStatus
|
||||
removeOnlyWaiters []chan<- StateStatus
|
||||
}
|
||||
|
||||
// StateStatus is used to return container wait results.
|
||||
|
@ -57,12 +58,9 @@ func (s StateStatus) Err() error {
|
|||
return s.err
|
||||
}
|
||||
|
||||
// NewState creates a default state object with a fresh channel for state changes.
|
||||
// NewState creates a default state object.
|
||||
func NewState() *State {
|
||||
return &State{
|
||||
waitStop: make(chan struct{}),
|
||||
waitRemove: make(chan struct{}),
|
||||
}
|
||||
return &State{}
|
||||
}
|
||||
|
||||
// String returns a human-readable description of the state
|
||||
|
@ -182,11 +180,10 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if condition == WaitConditionNotRunning && !s.Running {
|
||||
// Buffer so we can put it in the channel now.
|
||||
// Buffer so we can put status and finish even nobody receives it.
|
||||
resultC := make(chan StateStatus, 1)
|
||||
|
||||
// Send the current status.
|
||||
if s.conditionAlreadyMet(condition) {
|
||||
resultC <- StateStatus{
|
||||
exitCode: s.ExitCode(),
|
||||
err: s.Err(),
|
||||
|
@ -195,20 +192,17 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|||
return resultC
|
||||
}
|
||||
|
||||
// If we are waiting only for removal, the waitStop channel should
|
||||
// remain nil and block forever.
|
||||
var waitStop chan struct{}
|
||||
if condition < WaitConditionRemoved {
|
||||
waitStop = s.waitStop
|
||||
waitC := make(chan StateStatus, 1)
|
||||
|
||||
// Removal wakes up both removeOnlyWaiters and stopWaiters
|
||||
// Container could be removed while still in "created" state
|
||||
// in which case it is never actually stopped
|
||||
if condition == WaitConditionRemoved {
|
||||
s.removeOnlyWaiters = append(s.removeOnlyWaiters, waitC)
|
||||
} else {
|
||||
s.stopWaiters = append(s.stopWaiters, waitC)
|
||||
}
|
||||
|
||||
// Always wait for removal, just in case the container gets removed
|
||||
// while it is still in a "created" state, in which case it is never
|
||||
// actually stopped.
|
||||
waitRemove := s.waitRemove
|
||||
|
||||
resultC := make(chan StateStatus, 1)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -218,23 +212,25 @@ func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateS
|
|||
err: ctx.Err(),
|
||||
}
|
||||
return
|
||||
case <-waitStop:
|
||||
case <-waitRemove:
|
||||
case status := <-waitC:
|
||||
resultC <- status
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
result := StateStatus{
|
||||
exitCode: s.ExitCode(),
|
||||
err: s.Err(),
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
resultC <- result
|
||||
}()
|
||||
|
||||
return resultC
|
||||
}
|
||||
|
||||
func (s *State) conditionAlreadyMet(condition WaitCondition) bool {
|
||||
switch condition {
|
||||
case WaitConditionNotRunning:
|
||||
return !s.Running
|
||||
case WaitConditionRemoved:
|
||||
return s.Removed
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
|
||||
func (s *State) IsRunning() bool {
|
||||
s.Lock()
|
||||
|
@ -292,8 +288,8 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
|
|||
}
|
||||
s.ExitCodeValue = exitStatus.ExitCode
|
||||
s.OOMKilled = exitStatus.OOMKilled
|
||||
close(s.waitStop) // fire waiters for stop
|
||||
s.waitStop = make(chan struct{})
|
||||
|
||||
s.notifyAndClear(&s.stopWaiters)
|
||||
}
|
||||
|
||||
// SetRestarting sets the container state to "restarting" without locking.
|
||||
|
@ -308,8 +304,8 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
|
|||
s.FinishedAt = time.Now().UTC()
|
||||
s.ExitCodeValue = exitStatus.ExitCode
|
||||
s.OOMKilled = exitStatus.OOMKilled
|
||||
close(s.waitStop) // fire waiters for stop
|
||||
s.waitStop = make(chan struct{})
|
||||
|
||||
s.notifyAndClear(&s.stopWaiters)
|
||||
}
|
||||
|
||||
// SetError sets the container's error state. This is useful when we want to
|
||||
|
@ -374,22 +370,19 @@ func (s *State) IsDead() bool {
|
|||
return res
|
||||
}
|
||||
|
||||
// SetRemoved assumes this container is already in the "dead" state and
|
||||
// closes the internal waitRemove channel to unblock callers waiting for a
|
||||
// container to be removed.
|
||||
// SetRemoved assumes this container is already in the "dead" state and notifies all waiters.
|
||||
func (s *State) SetRemoved() {
|
||||
s.SetRemovalError(nil)
|
||||
}
|
||||
|
||||
// SetRemovalError is to be called in case a container remove failed.
|
||||
// It sets an error and closes the internal waitRemove channel to unblock
|
||||
// callers waiting for the container to be removed.
|
||||
// It sets an error and notifies all waiters.
|
||||
func (s *State) SetRemovalError(err error) {
|
||||
s.SetError(err)
|
||||
s.Lock()
|
||||
close(s.waitRemove) // Unblock those waiting on remove.
|
||||
// Recreate the channel so next ContainerWait will work
|
||||
s.waitRemove = make(chan struct{})
|
||||
s.Removed = true
|
||||
s.notifyAndClear(&s.removeOnlyWaiters)
|
||||
s.notifyAndClear(&s.stopWaiters)
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
|
@ -400,3 +393,15 @@ func (s *State) Err() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
|
||||
result := StateStatus{
|
||||
exitCode: s.ExitCodeValue,
|
||||
err: s.Err(),
|
||||
}
|
||||
|
||||
for _, c := range *waiters {
|
||||
c <- result
|
||||
}
|
||||
*waiters = nil
|
||||
}
|
||||
|
|
|
@ -169,6 +169,31 @@ func TestStateTimeoutWait(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Related issue: #39352
|
||||
func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
|
||||
s := NewState()
|
||||
|
||||
s.Lock()
|
||||
s.SetRunning(0, true)
|
||||
s.Unlock()
|
||||
|
||||
waitC := s.Wait(context.Background(), WaitConditionNotRunning)
|
||||
want := ExitStatus{ExitCode: 10, ExitedAt: time.Now()}
|
||||
|
||||
s.Lock()
|
||||
s.SetRestarting(&want)
|
||||
s.Unlock()
|
||||
|
||||
s.Lock()
|
||||
s.SetRunning(0, true)
|
||||
s.Unlock()
|
||||
|
||||
got := <-waitC
|
||||
if got.exitCode != want.ExitCode {
|
||||
t.Fatalf("expected exit code %v, got %v", want.ExitCode, got.exitCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsValidStateString(t *testing.T) {
|
||||
states := []struct {
|
||||
state string
|
||||
|
|
|
@ -52,7 +52,20 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
|
|||
}
|
||||
}
|
||||
|
||||
restart, wait, err := c.RestartManager().ShouldRestart(ec, daemon.IsShuttingDown() || c.HasBeenManuallyStopped, time.Since(c.StartedAt))
|
||||
daemonShutdown := daemon.IsShuttingDown()
|
||||
execDuration := time.Since(c.StartedAt)
|
||||
restart, wait, err := c.RestartManager().ShouldRestart(ec, daemonShutdown || c.HasBeenManuallyStopped, execDuration)
|
||||
if err != nil {
|
||||
logrus.WithError(err).
|
||||
WithField("container", c.ID).
|
||||
WithField("restartCount", c.RestartCount).
|
||||
WithField("exitStatus", exitStatus).
|
||||
WithField("daemonShuttingDown", daemonShutdown).
|
||||
WithField("hasBeenManuallyStopped", c.HasBeenManuallyStopped).
|
||||
WithField("execDuration", execDuration).
|
||||
Warn("ShouldRestart failed, container will not be restarted")
|
||||
restart = false
|
||||
}
|
||||
|
||||
// cancel healthcheck here, they will be automatically
|
||||
// restarted if/when the container is started again
|
||||
|
@ -62,13 +75,20 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
|
|||
}
|
||||
daemon.Cleanup(c)
|
||||
|
||||
if err == nil && restart {
|
||||
if restart {
|
||||
c.RestartCount++
|
||||
logrus.WithField("container", c.ID).
|
||||
WithField("restartCount", c.RestartCount).
|
||||
WithField("exitStatus", exitStatus).
|
||||
WithField("manualRestart", c.HasBeenManuallyRestarted).
|
||||
Debug("Restarting container")
|
||||
c.SetRestarting(&exitStatus)
|
||||
} else {
|
||||
c.SetStopped(&exitStatus)
|
||||
if !c.HasBeenManuallyRestarted {
|
||||
defer daemon.autoRemove(c)
|
||||
}
|
||||
}
|
||||
defer c.Unlock() // needs to be called before autoRemove
|
||||
|
||||
daemon.setStateCounter(c)
|
||||
|
@ -76,7 +96,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
|
|||
|
||||
daemon.LogContainerEventWithAttributes(c, "die", attributes)
|
||||
|
||||
if err == nil && restart {
|
||||
if restart {
|
||||
go func() {
|
||||
err := <-wait
|
||||
if err == nil {
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/container"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ContainerRestart stops and starts a container. It attempts to
|
||||
|
@ -52,19 +51,11 @@ func (daemon *Daemon) containerRestart(ctx context.Context, container *container
|
|||
}
|
||||
|
||||
if container.IsRunning() {
|
||||
// set AutoRemove flag to false before stop so the container won't be
|
||||
// removed during restart process
|
||||
autoRemove := container.HostConfig.AutoRemove
|
||||
container.Lock()
|
||||
container.HasBeenManuallyRestarted = true
|
||||
container.Unlock()
|
||||
|
||||
container.HostConfig.AutoRemove = false
|
||||
err := daemon.containerStop(ctx, container, options)
|
||||
// restore AutoRemove irrespective of whether the stop worked or not
|
||||
container.HostConfig.AutoRemove = autoRemove
|
||||
// containerStop will write HostConfig to disk, we shall restore AutoRemove
|
||||
// in disk too
|
||||
if toDiskErr := daemon.checkpointAndSave(container); toDiskErr != nil {
|
||||
logrus.Errorf("Write container to disk error: %v", toDiskErr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -206,6 +206,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
|
|||
return translateContainerdStartErr(container.Path, container.SetExitCode, err)
|
||||
}
|
||||
|
||||
container.HasBeenManuallyRestarted = false
|
||||
container.SetRunning(pid, true)
|
||||
container.HasBeenStartedBefore = true
|
||||
daemon.setStateCounter(container)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/client"
|
||||
testContainer "github.com/docker/docker/integration/internal/container"
|
||||
"github.com/docker/docker/testutil/daemon"
|
||||
"gotest.tools/v3/assert"
|
||||
"gotest.tools/v3/poll"
|
||||
|
@ -153,3 +154,60 @@ func pollForNewHealthCheck(ctx context.Context, client *client.Client, startTime
|
|||
return poll.Continue("waiting for a new container healthcheck")
|
||||
}
|
||||
}
|
||||
|
||||
// Container started with --rm should be able to be restarted.
|
||||
// It should be removed only if killed or stopped
|
||||
func TestContainerWithAutoRemoveCanBeRestarted(t *testing.T) {
|
||||
defer setupTest(t)()
|
||||
cli := testEnv.APIClient()
|
||||
ctx := context.Background()
|
||||
|
||||
noWaitTimeout := 0
|
||||
|
||||
for _, tc := range []struct {
|
||||
desc string
|
||||
doSth func(ctx context.Context, containerID string) error
|
||||
}{
|
||||
{
|
||||
desc: "kill",
|
||||
doSth: func(ctx context.Context, containerID string) error {
|
||||
return cli.ContainerKill(ctx, containerID, "SIGKILL")
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "stop",
|
||||
doSth: func(ctx context.Context, containerID string) error {
|
||||
return cli.ContainerStop(ctx, containerID, container.StopOptions{Timeout: &noWaitTimeout})
|
||||
},
|
||||
},
|
||||
} {
|
||||
tc := tc
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
cID := testContainer.Run(ctx, t, cli,
|
||||
testContainer.WithName("autoremove-restart-and-"+tc.desc),
|
||||
testContainer.WithAutoRemove,
|
||||
)
|
||||
defer func() {
|
||||
err := cli.ContainerRemove(ctx, cID, types.ContainerRemoveOptions{Force: true})
|
||||
if t.Failed() && err != nil {
|
||||
t.Logf("Cleaning up test container failed with error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err := cli.ContainerRestart(ctx, cID, container.StopOptions{Timeout: &noWaitTimeout})
|
||||
assert.NilError(t, err)
|
||||
|
||||
inspect, err := cli.ContainerInspect(ctx, cID)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, inspect.State.Status != "removing", "Container should not be removing yet")
|
||||
|
||||
poll.WaitOn(t, testContainer.IsInState(ctx, cli, cID, "running"))
|
||||
|
||||
err = tc.doSth(ctx, cID)
|
||||
assert.NilError(t, err)
|
||||
|
||||
poll.WaitOn(t, testContainer.IsRemoved(ctx, cli, cID))
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/integration/internal/container"
|
||||
"github.com/docker/docker/testutil/request"
|
||||
|
@ -156,3 +157,69 @@ func TestWaitConditions(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitRestartedContainer(t *testing.T) {
|
||||
defer setupTest(t)()
|
||||
cli := request.NewAPIClient(t)
|
||||
|
||||
testCases := []struct {
|
||||
doc string
|
||||
waitCond containertypes.WaitCondition
|
||||
}{
|
||||
{
|
||||
doc: "default",
|
||||
},
|
||||
{
|
||||
doc: "not-running",
|
||||
waitCond: containertypes.WaitConditionNotRunning,
|
||||
},
|
||||
{
|
||||
doc: "next-exit",
|
||||
waitCond: containertypes.WaitConditionNextExit,
|
||||
},
|
||||
}
|
||||
|
||||
// We can't catch the SIGTERM in the Windows based busybox image
|
||||
isWindowDaemon := testEnv.DaemonInfo.OSType == "windows"
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.doc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
containerID := container.Run(ctx, t, cli,
|
||||
container.WithCmd("sh", "-c", "trap 'exit 5' SIGTERM; while true; do sleep 0.1; done"),
|
||||
)
|
||||
defer cli.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true})
|
||||
|
||||
poll.WaitOn(t, container.IsInState(ctx, cli, containerID, "running"), poll.WithTimeout(30*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
// Container is running now, wait for exit
|
||||
waitResC, errC := cli.ContainerWait(ctx, containerID, tc.waitCond)
|
||||
|
||||
timeout := 5
|
||||
// On Windows it will always timeout, because our process won't receive SIGTERM
|
||||
// Skip to force killing immediately
|
||||
if isWindowDaemon {
|
||||
timeout = 0
|
||||
}
|
||||
|
||||
err := cli.ContainerRestart(ctx, containerID, containertypes.StopOptions{Timeout: &timeout, Signal: "SIGTERM"})
|
||||
assert.NilError(t, err)
|
||||
|
||||
select {
|
||||
case err := <-errC:
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
case <-time.After(time.Second * 3):
|
||||
t.Fatalf("Wait should end after restart")
|
||||
case waitRes := <-waitResC:
|
||||
expectedCode := int64(5)
|
||||
|
||||
if !isWindowDaemon {
|
||||
assert.Check(t, is.Equal(expectedCode, waitRes.StatusCode))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -57,3 +57,17 @@ func IsSuccessful(ctx context.Context, client client.APIClient, containerID stri
|
|||
return poll.Continue("waiting for container to be \"exited\", currently %s", inspect.State.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// IsRemoved verifies the container has been removed
|
||||
func IsRemoved(ctx context.Context, cli client.APIClient, containerID string) func(log poll.LogT) poll.Result {
|
||||
return func(log poll.LogT) poll.Result {
|
||||
inspect, err := cli.ContainerInspect(ctx, containerID)
|
||||
if err != nil {
|
||||
if client.IsErrNotFound(err) {
|
||||
return poll.Success()
|
||||
}
|
||||
return poll.Error(err)
|
||||
}
|
||||
return poll.Continue("waiting for container to be removed, currently %s", inspect.State.Status)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue