Update Container Wait Backend
This patch consolidates the two WaitStop and WaitWithContext methods on the container.State type. Now there is a single method, Wait, which takes a context and a bool specifying whether to wait for not just a container exit but also removal. The behavior has been changed slightly so that a wait call during a Created state will not return immediately but instead wait for the container to be started and then exited. The interface has been changed to no longer block, but instead returns a channel on which the caller can receive a *StateStatus value which indicates the ExitCode or an error if there was one (like a context timeout or state transition error). These changes have been propagated through the rest of the deamon to preserve all other existing behavior. Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)
This commit is contained in:
parent
1290ec2d4b
commit
cfdf84d5d0
16 changed files with 286 additions and 166 deletions
|
@ -2,7 +2,6 @@ package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"time"
|
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
@ -10,6 +9,7 @@ import (
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
|
containerpkg "github.com/docker/docker/container"
|
||||||
"github.com/docker/docker/pkg/archive"
|
"github.com/docker/docker/pkg/archive"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ type stateBackend interface {
|
||||||
ContainerStop(name string, seconds *int) error
|
ContainerStop(name string, seconds *int) error
|
||||||
ContainerUnpause(name string) error
|
ContainerUnpause(name string) error
|
||||||
ContainerUpdate(name string, hostConfig *container.HostConfig) (container.ContainerUpdateOKBody, error)
|
ContainerUpdate(name string, hostConfig *container.HostConfig) (container.ContainerUpdateOKBody, error)
|
||||||
ContainerWait(name string, timeout time.Duration) (int, error)
|
ContainerWait(ctx context.Context, name string, untilRemoved bool) (<-chan *containerpkg.StateStatus, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// monitorBackend includes functions to implement to provide containers monitoring functionality.
|
// monitorBackend includes functions to implement to provide containers monitoring functionality.
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/api"
|
"github.com/docker/docker/api"
|
||||||
|
@ -284,13 +283,15 @@ func (s *containerRouter) postContainersUnpause(ctx context.Context, w http.Resp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *containerRouter) postContainersWait(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
func (s *containerRouter) postContainersWait(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||||
status, err := s.backend.ContainerWait(vars["name"], -1*time.Second)
|
waitC, err := s.backend.ContainerWait(ctx, vars["name"], false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
status := <-waitC
|
||||||
|
|
||||||
return httputils.WriteJSON(w, http.StatusOK, &container.ContainerWaitOKBody{
|
return httputils.WriteJSON(w, http.StatusOK, &container.ContainerWaitOKBody{
|
||||||
StatusCode: int64(status),
|
StatusCode: int64(status.ExitCode()),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,12 +6,13 @@ package builder
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"time"
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"golang.org/x/net/context"
|
containerpkg "github.com/docker/docker/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -49,7 +50,7 @@ type Backend interface {
|
||||||
// ContainerStart starts a new container
|
// ContainerStart starts a new container
|
||||||
ContainerStart(containerID string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
|
ContainerStart(containerID string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
|
||||||
// ContainerWait stops processing until the given container is stopped.
|
// ContainerWait stops processing until the given container is stopped.
|
||||||
ContainerWait(containerID string, timeout time.Duration) (int, error)
|
ContainerWait(ctx context.Context, name string, untilRemoved bool) (<-chan *containerpkg.StateStatus, error)
|
||||||
// ContainerCreateWorkdir creates the workdir
|
// ContainerCreateWorkdir creates the workdir
|
||||||
ContainerCreateWorkdir(containerID string) error
|
ContainerCreateWorkdir(containerID string) error
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ package dockerfile
|
||||||
// non-contiguous functionality. Please read the comments.
|
// non-contiguous functionality. Please read the comments.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -596,16 +597,25 @@ func (b *Builder) run(cID string, cmd []string) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if ret, _ := b.docker.ContainerWait(cID, -1); ret != 0 {
|
waitC, err := b.docker.ContainerWait(context.Background(), cID, false)
|
||||||
|
if err != nil {
|
||||||
|
// Unable to begin waiting for container.
|
||||||
close(finished)
|
close(finished)
|
||||||
if cancelErr := <-cancelErrCh; cancelErr != nil {
|
if cancelErr := <-cancelErrCh; cancelErr != nil {
|
||||||
logrus.Debugf("Build cancelled (%v) and got a non-zero code from ContainerWait: %d",
|
logrus.Debugf("Build cancelled (%v) and unable to begin ContainerWait: %d", cancelErr, err)
|
||||||
cancelErr, ret)
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if status := <-waitC; status.ExitCode() != 0 {
|
||||||
|
close(finished)
|
||||||
|
if cancelErr := <-cancelErrCh; cancelErr != nil {
|
||||||
|
logrus.Debugf("Build cancelled (%v) and got a non-zero code from ContainerWait: %d", cancelErr, status.ExitCode())
|
||||||
}
|
}
|
||||||
// TODO: change error type, because jsonmessage.JSONError assumes HTTP
|
// TODO: change error type, because jsonmessage.JSONError assumes HTTP
|
||||||
return &jsonmessage.JSONError{
|
return &jsonmessage.JSONError{
|
||||||
Message: fmt.Sprintf("The command '%s' returned a non-zero code: %d", strings.Join(cmd, " "), ret),
|
Message: fmt.Sprintf("The command '%s' returned a non-zero code: %d", strings.Join(cmd, " "), status.ExitCode()),
|
||||||
Code: ret,
|
Code: status.ExitCode(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(finished)
|
close(finished)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -29,23 +30,25 @@ type State struct {
|
||||||
ErrorMsg string `json:"Error"` // contains last known error when starting the container
|
ErrorMsg string `json:"Error"` // contains last known error when starting the container
|
||||||
StartedAt time.Time
|
StartedAt time.Time
|
||||||
FinishedAt time.Time
|
FinishedAt time.Time
|
||||||
waitChan chan struct{}
|
|
||||||
Health *Health
|
Health *Health
|
||||||
|
|
||||||
|
waitStop chan struct{}
|
||||||
|
waitRemove chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateStatus is used to return an error type implementing both
|
// StateStatus is used to return container wait results.
|
||||||
// exec.ExitCode and error.
|
// Implements exec.ExitCode interface.
|
||||||
// This type is needed as State include a sync.Mutex field which make
|
// This type is needed as State include a sync.Mutex field which make
|
||||||
// copying it unsafe.
|
// copying it unsafe.
|
||||||
type StateStatus struct {
|
type StateStatus struct {
|
||||||
exitCode int
|
exitCode int
|
||||||
error string
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStateStatus(ec int, err string) *StateStatus {
|
func newStateStatus(ec int, err error) *StateStatus {
|
||||||
return &StateStatus{
|
return &StateStatus{
|
||||||
exitCode: ec,
|
exitCode: ec,
|
||||||
error: err,
|
err: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,15 +57,17 @@ func (ss *StateStatus) ExitCode() int {
|
||||||
return ss.exitCode
|
return ss.exitCode
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error returns current error for the state.
|
// Err returns current error for the state. Returns nil if the container had
|
||||||
func (ss *StateStatus) Error() string {
|
// exited on its own.
|
||||||
return ss.error
|
func (ss *StateStatus) Err() error {
|
||||||
|
return ss.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewState creates a default state object with a fresh channel for state changes.
|
// NewState creates a default state object with a fresh channel for state changes.
|
||||||
func NewState() *State {
|
func NewState() *State {
|
||||||
return &State{
|
return &State{
|
||||||
waitChan: make(chan struct{}),
|
waitStop: make(chan struct{}),
|
||||||
|
waitRemove: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,64 +165,73 @@ func IsValidStateString(s string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func wait(waitChan <-chan struct{}, timeout time.Duration) error {
|
func (s *State) isStopped() bool {
|
||||||
if timeout < 0 {
|
// The state is not considered "stopped" if it is either "created",
|
||||||
<-waitChan
|
// "running", or "paused".
|
||||||
return nil
|
switch s.StateString() {
|
||||||
}
|
case "created", "running", "paused":
|
||||||
select {
|
return false
|
||||||
case <-time.After(timeout):
|
default:
|
||||||
return fmt.Errorf("Timed out: %v", timeout)
|
return true
|
||||||
case <-waitChan:
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitStop waits until state is stopped. If state already stopped it returns
|
// Wait waits until the continer is in a "stopped" state. A context can be used
|
||||||
// immediately. If you want wait forever you must supply negative timeout.
|
// for cancelling the request or controlling timeouts. If untilRemoved is true,
|
||||||
// Returns exit code, that was passed to SetStopped
|
// Wait will block until the SetRemoved() method has been called. Wait must be
|
||||||
func (s *State) WaitStop(timeout time.Duration) (int, error) {
|
// called without holding the state lock. Returns a channel which can be used
|
||||||
ctx := context.Background()
|
// to receive the result. If the container exited on its own, the result's Err() method wil be nil and
|
||||||
if timeout >= 0 {
|
// its ExitCode() method will return the conatiners exit code, otherwise, the
|
||||||
var cancel func()
|
// results Err() method will return an error indicating why the wait operation
|
||||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
// failed.
|
||||||
defer cancel()
|
func (s *State) Wait(ctx context.Context, untilRemoved bool) <-chan *StateStatus {
|
||||||
}
|
|
||||||
if err := s.WaitWithContext(ctx); err != nil {
|
|
||||||
if status, ok := err.(*StateStatus); ok {
|
|
||||||
return status.ExitCode(), nil
|
|
||||||
}
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitWithContext waits for the container to stop. Optional context can be
|
|
||||||
// passed for canceling the request.
|
|
||||||
func (s *State) WaitWithContext(ctx context.Context) error {
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
if !s.Running {
|
|
||||||
state := newStateStatus(s.ExitCode(), s.Error())
|
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
if state.ExitCode() == 0 {
|
|
||||||
return nil
|
if !untilRemoved && s.isStopped() {
|
||||||
|
// We are not waiting for removal and the container is already
|
||||||
|
// in a stopped state so just return the current state.
|
||||||
|
result := newStateStatus(s.ExitCode(), s.Err())
|
||||||
|
|
||||||
|
// Buffer so we don't block putting it in the channel.
|
||||||
|
resultC := make(chan *StateStatus, 1)
|
||||||
|
resultC <- result
|
||||||
|
|
||||||
|
return resultC
|
||||||
}
|
}
|
||||||
return state
|
|
||||||
|
// The waitStop chan will remain nil if we are waiting for removal, in
|
||||||
|
// which case it would block forever.
|
||||||
|
var waitStop chan struct{}
|
||||||
|
if !untilRemoved {
|
||||||
|
waitStop = s.waitStop
|
||||||
}
|
}
|
||||||
waitChan := s.waitChan
|
|
||||||
s.Unlock()
|
// Always wait for removal, just in case the container gets removed
|
||||||
|
// while it is still in a "created" state, in which case it is never
|
||||||
|
// actually stopped.
|
||||||
|
waitRemove := s.waitRemove
|
||||||
|
|
||||||
|
resultC := make(chan *StateStatus)
|
||||||
|
|
||||||
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-waitChan:
|
|
||||||
s.Lock()
|
|
||||||
state := newStateStatus(s.ExitCode(), s.Error())
|
|
||||||
s.Unlock()
|
|
||||||
if state.ExitCode() == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return state
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
// Context timeout or cancellation.
|
||||||
|
resultC <- newStateStatus(-1, ctx.Err())
|
||||||
|
return
|
||||||
|
case <-waitStop:
|
||||||
|
case <-waitRemove:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.Lock()
|
||||||
|
result := newStateStatus(s.ExitCode(), s.Err())
|
||||||
|
s.Unlock()
|
||||||
|
|
||||||
|
resultC <- result
|
||||||
|
}()
|
||||||
|
|
||||||
|
return resultC
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
|
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
|
||||||
|
@ -268,8 +282,8 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
|
||||||
s.Pid = 0
|
s.Pid = 0
|
||||||
s.FinishedAt = time.Now().UTC()
|
s.FinishedAt = time.Now().UTC()
|
||||||
s.setFromExitStatus(exitStatus)
|
s.setFromExitStatus(exitStatus)
|
||||||
close(s.waitChan) // fire waiters for stop
|
close(s.waitStop) // Fire waiters for stop
|
||||||
s.waitChan = make(chan struct{})
|
s.waitStop = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRestarting sets the container state to "restarting" without locking.
|
// SetRestarting sets the container state to "restarting" without locking.
|
||||||
|
@ -282,8 +296,8 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
|
||||||
s.Pid = 0
|
s.Pid = 0
|
||||||
s.FinishedAt = time.Now().UTC()
|
s.FinishedAt = time.Now().UTC()
|
||||||
s.setFromExitStatus(exitStatus)
|
s.setFromExitStatus(exitStatus)
|
||||||
close(s.waitChan) // fire waiters for stop
|
close(s.waitStop) // Fire waiters for stop
|
||||||
s.waitChan = make(chan struct{})
|
s.waitStop = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetError sets the container's error state. This is useful when we want to
|
// SetError sets the container's error state. This is useful when we want to
|
||||||
|
@ -335,6 +349,23 @@ func (s *State) SetDead() {
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRemoved assumes this container is already in the "dead" state and
|
||||||
|
// closes the internal waitRemove channel to unblock callers waiting for a
|
||||||
|
// container to be removed.
|
||||||
|
func (s *State) SetRemoved() {
|
||||||
|
s.Lock()
|
||||||
|
close(s.waitRemove) // Unblock those waiting on remove.
|
||||||
|
s.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns an error if there is one.
|
||||||
|
func (s *State) Err() error {
|
||||||
|
if s.ErrorMsg != "" {
|
||||||
|
return errors.New(s.ErrorMsg)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Error returns current error for the state.
|
// Error returns current error for the state.
|
||||||
func (s *State) Error() string {
|
func (s *State) Error() string {
|
||||||
return s.ErrorMsg
|
return s.ErrorMsg
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync/atomic"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,31 +30,49 @@ func TestIsValidHealthString(t *testing.T) {
|
||||||
|
|
||||||
func TestStateRunStop(t *testing.T) {
|
func TestStateRunStop(t *testing.T) {
|
||||||
s := NewState()
|
s := NewState()
|
||||||
for i := 1; i < 3; i++ { // full lifecycle two times
|
|
||||||
|
// An initial wait (in "created" state) should block until the
|
||||||
|
// container has started and exited. It shouldn't take more than 100
|
||||||
|
// milliseconds.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
initialWait := s.Wait(ctx, false)
|
||||||
|
|
||||||
|
// Begin another wait for the final removed state. It should complete
|
||||||
|
// within 200 milliseconds.
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
removalWait := s.Wait(ctx, true)
|
||||||
|
|
||||||
|
// Full lifecycle two times.
|
||||||
|
for i := 1; i <= 2; i++ {
|
||||||
|
// Set the state to "Running".
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.SetRunning(i+100, false)
|
s.SetRunning(i, true)
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
|
// Assert desired state.
|
||||||
if !s.IsRunning() {
|
if !s.IsRunning() {
|
||||||
t.Fatal("State not running")
|
t.Fatal("State not running")
|
||||||
}
|
}
|
||||||
if s.Pid != i+100 {
|
if s.Pid != i {
|
||||||
t.Fatalf("Pid %v, expected %v", s.Pid, i+100)
|
t.Fatalf("Pid %v, expected %v", s.Pid, i)
|
||||||
}
|
}
|
||||||
if s.ExitCode() != 0 {
|
if s.ExitCode() != 0 {
|
||||||
t.Fatalf("ExitCode %v, expected 0", s.ExitCode())
|
t.Fatalf("ExitCode %v, expected 0", s.ExitCode())
|
||||||
}
|
}
|
||||||
|
|
||||||
stopped := make(chan struct{})
|
// Async wait up to 50 milliseconds for the exit status.
|
||||||
var exit int64
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
go func() {
|
defer cancel()
|
||||||
exitCode, _ := s.WaitStop(-1 * time.Second)
|
exitWait := s.Wait(ctx, false)
|
||||||
atomic.StoreInt64(&exit, int64(exitCode))
|
|
||||||
close(stopped)
|
// Set the state to "Exited".
|
||||||
}()
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.SetStopped(&ExitStatus{ExitCode: i})
|
s.SetStopped(&ExitStatus{ExitCode: i})
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
|
// Assert desired state.
|
||||||
if s.IsRunning() {
|
if s.IsRunning() {
|
||||||
t.Fatal("State is running")
|
t.Fatal("State is running")
|
||||||
}
|
}
|
||||||
|
@ -64,50 +82,86 @@ func TestStateRunStop(t *testing.T) {
|
||||||
if s.Pid != 0 {
|
if s.Pid != 0 {
|
||||||
t.Fatalf("Pid %v, expected 0", s.Pid)
|
t.Fatalf("Pid %v, expected 0", s.Pid)
|
||||||
}
|
}
|
||||||
select {
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
// Receive the exitWait result.
|
||||||
t.Fatal("Stop callback doesn't fire in 100 milliseconds")
|
status := <-exitWait
|
||||||
case <-stopped:
|
if status.ExitCode() != i {
|
||||||
t.Log("Stop callback fired")
|
t.Fatalf("ExitCode %v, expected %v, err %q", status.ExitCode(), i, status.Err())
|
||||||
}
|
}
|
||||||
exitCode := int(atomic.LoadInt64(&exit))
|
|
||||||
if exitCode != i {
|
// A repeated call to Wait() should not block at this point.
|
||||||
t.Fatalf("ExitCode %v, expected %v", exitCode, i)
|
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
exitWait = s.Wait(ctx, false)
|
||||||
|
|
||||||
|
status = <-exitWait
|
||||||
|
if status.ExitCode() != i {
|
||||||
|
t.Fatalf("ExitCode %v, expected %v, err %q", status.ExitCode(), i, status.Err())
|
||||||
}
|
}
|
||||||
if exitCode, err := s.WaitStop(-1 * time.Second); err != nil || exitCode != i {
|
|
||||||
t.Fatalf("WaitStop returned exitCode: %v, err: %v, expected exitCode: %v, err: %v", exitCode, err, i, nil)
|
if i == 1 {
|
||||||
|
// Make sure our initial wait also succeeds.
|
||||||
|
status = <-initialWait
|
||||||
|
if status.ExitCode() != i {
|
||||||
|
// Should have the exit code from this first loop.
|
||||||
|
t.Fatalf("Initial wait exitCode %v, expected %v, err %q", status.ExitCode(), i, status.Err())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the state to dead and removed.
|
||||||
|
s.SetDead()
|
||||||
|
s.SetRemoved()
|
||||||
|
|
||||||
|
// Wait for removed status or timeout.
|
||||||
|
status := <-removalWait
|
||||||
|
if status.ExitCode() != 2 {
|
||||||
|
// Should have the final exit code from the loop.
|
||||||
|
t.Fatalf("Removal wait exitCode %v, expected %v, err %q", status.ExitCode(), 2, status.Err())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateTimeoutWait(t *testing.T) {
|
func TestStateTimeoutWait(t *testing.T) {
|
||||||
s := NewState()
|
s := NewState()
|
||||||
stopped := make(chan struct{})
|
|
||||||
go func() {
|
// Start a wait with a timeout.
|
||||||
s.WaitStop(100 * time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
close(stopped)
|
defer cancel()
|
||||||
}()
|
waitC := s.Wait(ctx, false)
|
||||||
|
|
||||||
|
// It should timeout *before* this 200ms timer does.
|
||||||
select {
|
select {
|
||||||
case <-time.After(200 * time.Millisecond):
|
case <-time.After(200 * time.Millisecond):
|
||||||
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
|
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
|
||||||
case <-stopped:
|
case status := <-waitC:
|
||||||
t.Log("Stop callback fired")
|
t.Log("Stop callback fired")
|
||||||
|
// Should be a timeout error.
|
||||||
|
if status.Err() == nil {
|
||||||
|
t.Fatal("expected timeout error, got nil")
|
||||||
|
}
|
||||||
|
if status.ExitCode() != -1 {
|
||||||
|
t.Fatalf("expected exit code %v, got %v", -1, status.ExitCode())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.SetStopped(&ExitStatus{ExitCode: 1})
|
s.SetRunning(0, true)
|
||||||
|
s.SetStopped(&ExitStatus{ExitCode: 0})
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
stopped = make(chan struct{})
|
// Start another wait with a timeout. This one should return
|
||||||
go func() {
|
// immediately.
|
||||||
s.WaitStop(100 * time.Millisecond)
|
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
close(stopped)
|
defer cancel()
|
||||||
}()
|
waitC = s.Wait(ctx, false)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(200 * time.Millisecond):
|
case <-time.After(200 * time.Millisecond):
|
||||||
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
|
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
|
||||||
case <-stopped:
|
case status := <-waitC:
|
||||||
t.Log("Stop callback fired")
|
t.Log("Stop callback fired")
|
||||||
|
if status.ExitCode() != 0 {
|
||||||
|
t.Fatalf("expected exit code %v, got %v, err %q", 0, status.ExitCode(), status.Err())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/api/errors"
|
"github.com/docker/docker/api/errors"
|
||||||
|
@ -160,14 +160,11 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
|
||||||
cfg.Stdin = nil
|
cfg.Stdin = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
waitChan := make(chan struct{})
|
|
||||||
if c.Config.StdinOnce && !c.Config.Tty {
|
if c.Config.StdinOnce && !c.Config.Tty {
|
||||||
|
// Wait for the container to stop before returning.
|
||||||
|
waitChan := c.Wait(context.Background(), false)
|
||||||
defer func() {
|
defer func() {
|
||||||
<-waitChan
|
_ = <-waitChan // Ignore returned exit code.
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
c.WaitStop(-1 * time.Second)
|
|
||||||
close(waitChan)
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/api/types/network"
|
"github.com/docker/docker/api/types/network"
|
||||||
swarmtypes "github.com/docker/docker/api/types/swarm"
|
swarmtypes "github.com/docker/docker/api/types/swarm"
|
||||||
|
containerpkg "github.com/docker/docker/container"
|
||||||
clustertypes "github.com/docker/docker/daemon/cluster/provider"
|
clustertypes "github.com/docker/docker/daemon/cluster/provider"
|
||||||
"github.com/docker/docker/plugin"
|
"github.com/docker/docker/plugin"
|
||||||
"github.com/docker/libnetwork"
|
"github.com/docker/libnetwork"
|
||||||
|
@ -39,7 +40,7 @@ type Backend interface {
|
||||||
DeactivateContainerServiceBinding(containerName string) error
|
DeactivateContainerServiceBinding(containerName string) error
|
||||||
UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
|
UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
|
||||||
ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
|
ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
|
||||||
ContainerWaitWithContext(ctx context.Context, name string) error
|
ContainerWait(ctx context.Context, name string, untilRemoved bool) (<-chan *containerpkg.StateStatus, error)
|
||||||
ContainerRm(name string, config *types.ContainerRmConfig) error
|
ContainerRm(name string, config *types.ContainerRmConfig) error
|
||||||
ContainerKill(name string, sig uint64) error
|
ContainerKill(name string, sig uint64) error
|
||||||
SetContainerDependencyStore(name string, store exec.DependencyGetter) error
|
SetContainerDependencyStore(name string, store exec.DependencyGetter) error
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
containertypes "github.com/docker/docker/api/types/container"
|
containertypes "github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/api/types/events"
|
"github.com/docker/docker/api/types/events"
|
||||||
|
containerpkg "github.com/docker/docker/container"
|
||||||
"github.com/docker/docker/daemon/cluster/convert"
|
"github.com/docker/docker/daemon/cluster/convert"
|
||||||
executorpkg "github.com/docker/docker/daemon/cluster/executor"
|
executorpkg "github.com/docker/docker/daemon/cluster/executor"
|
||||||
"github.com/docker/libnetwork"
|
"github.com/docker/libnetwork"
|
||||||
|
@ -337,8 +338,8 @@ func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
|
||||||
return eventsq
|
return eventsq
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *containerAdapter) wait(ctx context.Context) error {
|
func (c *containerAdapter) wait(ctx context.Context) (<-chan *containerpkg.StateStatus, error) {
|
||||||
return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
|
return c.backend.ContainerWait(ctx, c.container.nameOrID(), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *containerAdapter) shutdown(ctx context.Context) error {
|
func (c *containerAdapter) shutdown(ctx context.Context) error {
|
||||||
|
|
|
@ -279,25 +279,27 @@ func (r *controller) Wait(pctx context.Context) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err := r.adapter.wait(ctx)
|
waitC, err := r.adapter.wait(ctx)
|
||||||
if ctx.Err() != nil {
|
if err != nil {
|
||||||
return ctx.Err()
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if status := <-waitC; status.ExitCode() != 0 {
|
||||||
ee := &exitError{}
|
exitErr := &exitError{
|
||||||
if ec, ok := err.(exec.ExitCoder); ok {
|
code: status.ExitCode(),
|
||||||
ee.code = ec.ExitCode()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the cause if it is knowable.
|
||||||
select {
|
select {
|
||||||
case e := <-healthErr:
|
case e := <-healthErr:
|
||||||
ee.cause = e
|
exitErr.cause = e
|
||||||
default:
|
default:
|
||||||
if err.Error() != "" {
|
if status.Err() != nil {
|
||||||
ee.cause = err
|
exitErr.cause = status.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ee
|
|
||||||
|
return exitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
@ -291,7 +292,12 @@ func (daemon *Daemon) setupConfigDir(c *container.Container) (setupErr error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func killProcessDirectly(container *container.Container) error {
|
func killProcessDirectly(container *container.Container) error {
|
||||||
if _, err := container.WaitStop(10 * time.Second); err != nil {
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Block until the container to stops or timeout.
|
||||||
|
status := <-container.Wait(ctx, false)
|
||||||
|
if status.Err() != nil {
|
||||||
// Ensure that we don't kill ourselves
|
// Ensure that we don't kill ourselves
|
||||||
if pid := container.GetPID(); pid != 0 {
|
if pid := container.GetPID(); pid != 0 {
|
||||||
logrus.Infof("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", stringid.TruncateID(container.ID))
|
logrus.Infof("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", stringid.TruncateID(container.ID))
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
@ -773,7 +774,12 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
|
||||||
if err := daemon.containerUnpause(c); err != nil {
|
if err := daemon.containerUnpause(c); err != nil {
|
||||||
return fmt.Errorf("Failed to unpause container %s with error: %v", c.ID, err)
|
return fmt.Errorf("Failed to unpause container %s with error: %v", c.ID, err)
|
||||||
}
|
}
|
||||||
if _, err := c.WaitStop(time.Duration(stopTimeout) * time.Second); err != nil {
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(stopTimeout)*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Wait with timeout for container to exit.
|
||||||
|
if status := <-c.Wait(ctx, false); status.Err() != nil {
|
||||||
logrus.Debugf("container %s failed to exit in %d second of SIGTERM, sending SIGKILL to force", c.ID, stopTimeout)
|
logrus.Debugf("container %s failed to exit in %d second of SIGTERM, sending SIGKILL to force", c.ID, stopTimeout)
|
||||||
sig, ok := signal.SignalMap["KILL"]
|
sig, ok := signal.SignalMap["KILL"]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -782,8 +788,10 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
|
||||||
if err := daemon.kill(c, int(sig)); err != nil {
|
if err := daemon.kill(c, int(sig)); err != nil {
|
||||||
logrus.Errorf("Failed to SIGKILL container %s", c.ID)
|
logrus.Errorf("Failed to SIGKILL container %s", c.ID)
|
||||||
}
|
}
|
||||||
c.WaitStop(-1 * time.Second)
|
// Wait for exit again without a timeout.
|
||||||
return err
|
// Explicitly ignore the result.
|
||||||
|
_ = <-c.Wait(context.Background(), false)
|
||||||
|
return status.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If container failed to exit in stopTimeout seconds of SIGTERM, then using the force
|
// If container failed to exit in stopTimeout seconds of SIGTERM, then using the force
|
||||||
|
@ -791,7 +799,9 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
|
||||||
return fmt.Errorf("Failed to stop container %s with error: %v", c.ID, err)
|
return fmt.Errorf("Failed to stop container %s with error: %v", c.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.WaitStop(-1 * time.Second)
|
// Wait without timeout for the container to exit.
|
||||||
|
// Ignore the result.
|
||||||
|
_ = <-c.Wait(context.Background(), false)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,6 +134,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
|
||||||
if e := daemon.removeMountPoints(container, removeVolume); e != nil {
|
if e := daemon.removeMountPoints(container, removeVolume); e != nil {
|
||||||
logrus.Error(e)
|
logrus.Error(e)
|
||||||
}
|
}
|
||||||
|
container.SetRemoved()
|
||||||
stateCtr.del(container.ID)
|
stateCtr.del(container.ID)
|
||||||
daemon.LogContainerEvent(container, "destroy")
|
daemon.LogContainerEvent(container, "destroy")
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -131,7 +132,10 @@ func (daemon *Daemon) Kill(container *container.Container) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err2 := container.WaitStop(2 * time.Second); err2 != nil {
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if status := <-container.Wait(ctx, false); status.Err() != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,7 +148,10 @@ func (daemon *Daemon) Kill(container *container.Container) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
container.WaitStop(-1 * time.Second)
|
// Wait for exit with no timeout.
|
||||||
|
// Ignore returned status.
|
||||||
|
_ = <-container.Wait(context.Background(), false)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
@ -60,7 +61,10 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
|
||||||
// So, instead we'll give it up to 2 more seconds to complete and if
|
// So, instead we'll give it up to 2 more seconds to complete and if
|
||||||
// by that time the container is still running, then the error
|
// by that time the container is still running, then the error
|
||||||
// we got is probably valid and so we force kill it.
|
// we got is probably valid and so we force kill it.
|
||||||
if _, err := container.WaitStop(2 * time.Second); err != nil {
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if status := <-container.Wait(ctx, false); status.Err() != nil {
|
||||||
logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", stopSignal)
|
logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", stopSignal)
|
||||||
if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
|
if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -69,11 +73,15 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Wait for the process to exit on its own
|
// 2. Wait for the process to exit on its own
|
||||||
if _, err := container.WaitStop(time.Duration(seconds) * time.Second); err != nil {
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if status := <-container.Wait(ctx, false); status.Err() != nil {
|
||||||
logrus.Infof("Container %v failed to exit within %d seconds of signal %d - using the force", container.ID, seconds, stopSignal)
|
logrus.Infof("Container %v failed to exit within %d seconds of signal %d - using the force", container.ID, seconds, stopSignal)
|
||||||
// 3. If it doesn't, then send SIGKILL
|
// 3. If it doesn't, then send SIGKILL
|
||||||
if err := daemon.Kill(container); err != nil {
|
if err := daemon.Kill(container); err != nil {
|
||||||
container.WaitStop(-1 * time.Second)
|
// Wait without a timeout, ignore result.
|
||||||
|
_ = <-container.Wait(context.Background(), false)
|
||||||
logrus.Warn(err) // Don't return error because we only care that container is stopped, not what function stopped it
|
logrus.Warn(err) // Don't return error because we only care that container is stopped, not what function stopped it
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +1,22 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"github.com/docker/docker/container"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContainerWait stops processing until the given container is
|
// ContainerWait stops processing until the given container is stopped or
|
||||||
// stopped. If the container is not found, an error is returned. On a
|
// removed (if untilRemoved is true). If the container is not found, a nil
|
||||||
// successful stop, the exit code of the container is returned. On a
|
// channel and non-nil error is returned immediately. If the container is
|
||||||
// timeout, an error is returned. If you want to wait forever, supply
|
// found, a status result will be sent on the returned channel once the wait
|
||||||
// a negative duration for the timeout.
|
// condition is met or if an error occurs waiting for the container (such as a
|
||||||
func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, error) {
|
// context timeout or cancellation). On a successful stop, the exit code of the
|
||||||
|
// container is returned in the status with a non-nil Err() value.
|
||||||
|
func (daemon *Daemon) ContainerWait(ctx context.Context, name string, untilRemoved bool) (<-chan *container.StateStatus, error) {
|
||||||
container, err := daemon.GetContainer(name)
|
container, err := daemon.GetContainer(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return container.WaitStop(timeout)
|
return container.Wait(ctx, untilRemoved), nil
|
||||||
}
|
|
||||||
|
|
||||||
// ContainerWaitWithContext returns a channel where exit code is sent
|
|
||||||
// when container stops. Channel can be cancelled with a context.
|
|
||||||
func (daemon *Daemon) ContainerWaitWithContext(ctx context.Context, name string) error {
|
|
||||||
container, err := daemon.GetContainer(name)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return container.WaitWithContext(ctx)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue