Merge pull request #32237 from jlhawn/update_container_wait

Update Container Wait
This commit is contained in:
Sebastiaan van Stijn 2017-05-17 02:39:52 +02:00 committed by GitHub
commit c053a2069e
31 changed files with 560 additions and 286 deletions

View file

@ -2,7 +2,6 @@ package container
import (
"io"
"time"
"golang.org/x/net/context"
@ -10,6 +9,7 @@ import (
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/pkg/archive"
)
@ -44,7 +44,7 @@ type stateBackend interface {
ContainerStop(name string, seconds *int) error
ContainerUnpause(name string) error
ContainerUpdate(name string, hostConfig *container.HostConfig) (container.ContainerUpdateOKBody, error)
ContainerWait(name string, timeout time.Duration) (int, error)
ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)
}
// monitorBackend includes functions to implement to provide containers monitoring functionality.

View file

@ -59,7 +59,7 @@ func (r *containerRouter) initRoutes() {
router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait, router.WithCancel),
router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12

View file

@ -7,7 +7,6 @@ import (
"net/http"
"strconv"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
@ -17,6 +16,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/versions"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/signal"
"golang.org/x/net/context"
@ -284,13 +284,48 @@ func (s *containerRouter) postContainersUnpause(ctx context.Context, w http.Resp
}
func (s *containerRouter) postContainersWait(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
status, err := s.backend.ContainerWait(vars["name"], -1*time.Second)
// Behavior changed in version 1.30 to handle wait condition and to
// return headers immediately.
version := httputils.VersionFromContext(ctx)
legacyBehavior := versions.LessThan(version, "1.30")
// The wait condition defaults to "not-running".
waitCondition := containerpkg.WaitConditionNotRunning
if !legacyBehavior {
if err := httputils.ParseForm(r); err != nil {
return err
}
switch container.WaitCondition(r.Form.Get("condition")) {
case container.WaitConditionNextExit:
waitCondition = containerpkg.WaitConditionNextExit
case container.WaitConditionRemoved:
waitCondition = containerpkg.WaitConditionRemoved
}
}
// Note: the context should get canceled if the client closes the
// connection since this handler has been wrapped by the
// router.WithCancel() wrapper.
waitC, err := s.backend.ContainerWait(ctx, vars["name"], waitCondition)
if err != nil {
return err
}
return httputils.WriteJSON(w, http.StatusOK, &container.ContainerWaitOKBody{
StatusCode: int64(status),
w.Header().Set("Content-Type", "application/json")
if !legacyBehavior {
// Write response header immediately.
w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
// Block on the result of the wait operation.
status := <-waitC
return json.NewEncoder(w).Encode(&container.ContainerWaitOKBody{
StatusCode: int64(status.ExitCode()),
})
}

View file

@ -4270,6 +4270,11 @@ paths:
required: true
description: "ID or name of the container"
type: "string"
- name: "condition"
in: "query"
description: "Wait until a container state reaches the given condition, either 'not-running' (default), 'next-exit', or 'removed'."
type: "string"
default: "not-running"
tags: ["Container"]
/containers/{id}:
delete:

View file

@ -0,0 +1,22 @@
package container
// WaitCondition is a type used to specify a container state for which
// to wait.
type WaitCondition string
// Possible WaitCondition Values.
//
// WaitConditionNotRunning (default) is used to wait for any of the non-running
// states: "created", "exited", "dead", "removing", or "removed".
//
// WaitConditionNextExit is used to wait for the next time the state changes
// to a non-running state. If the state is currently "created" or "exited",
// this would cause Wait() to block until either the container runs and exits
// or is removed.
//
// WaitConditionRemoved is used to wait for the container to be removed.
const (
WaitConditionNotRunning WaitCondition = "not-running"
WaitConditionNextExit WaitCondition = "next-exit"
WaitConditionRemoved WaitCondition = "removed"
)

View file

@ -6,12 +6,13 @@ package builder
import (
"io"
"time"
"golang.org/x/net/context"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/container"
"golang.org/x/net/context"
containerpkg "github.com/docker/docker/container"
)
const (
@ -49,7 +50,7 @@ type Backend interface {
// ContainerStart starts a new container
ContainerStart(containerID string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
// ContainerWait stops processing until the given container is stopped.
ContainerWait(containerID string, timeout time.Duration) (int, error)
ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)
// ContainerCreateWorkdir creates the workdir
ContainerCreateWorkdir(containerID string) error

View file

@ -23,6 +23,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/builder"
"github.com/docker/docker/builder/remotecontext"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/pkg/httputils"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonmessage"
@ -596,16 +597,25 @@ func (b *Builder) run(cID string, cmd []string) (err error) {
return err
}
if ret, _ := b.docker.ContainerWait(cID, -1); ret != 0 {
waitC, err := b.docker.ContainerWait(b.clientCtx, cID, containerpkg.WaitConditionNotRunning)
if err != nil {
// Unable to begin waiting for container.
close(finished)
if cancelErr := <-cancelErrCh; cancelErr != nil {
logrus.Debugf("Build cancelled (%v) and got a non-zero code from ContainerWait: %d",
cancelErr, ret)
logrus.Debugf("Build cancelled (%v) and unable to begin ContainerWait: %d", cancelErr, err)
}
return err
}
if status := <-waitC; status.ExitCode() != 0 {
close(finished)
if cancelErr := <-cancelErrCh; cancelErr != nil {
logrus.Debugf("Build cancelled (%v) and got a non-zero code from ContainerWait: %d", cancelErr, status.ExitCode())
}
// TODO: change error type, because jsonmessage.JSONError assumes HTTP
return &jsonmessage.JSONError{
Message: fmt.Sprintf("The command '%s' returned a non-zero code: %d", strings.Join(cmd, " "), ret),
Code: ret,
Message: fmt.Sprintf("The command '%s' returned a non-zero code: %d", strings.Join(cmd, " "), status.ExitCode()),
Code: status.ExitCode(),
}
}
close(finished)

View file

@ -2,13 +2,13 @@ package dockerfile
import (
"io"
"time"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/builder"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/image"
"golang.org/x/net/context"
)
@ -54,8 +54,8 @@ func (m *MockBackend) ContainerStart(containerID string, hostConfig *container.H
return nil
}
func (m *MockBackend) ContainerWait(containerID string, timeout time.Duration) (int, error) {
return 0, nil
func (m *MockBackend) ContainerWait(ctx context.Context, containerID string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error) {
return nil, nil
}
func (m *MockBackend) ContainerCreateWorkdir(containerID string) error {

View file

@ -2,25 +2,83 @@ package client
import (
"encoding/json"
"net/url"
"golang.org/x/net/context"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/versions"
)
// ContainerWait pauses execution until a container exits.
// It returns the API status code as response of its readiness.
func (cli *Client) ContainerWait(ctx context.Context, containerID string) (int64, error) {
resp, err := cli.post(ctx, "/containers/"+containerID+"/wait", nil, nil, nil)
// ContainerWait waits until the specified continer is in a certain state
// indicated by the given condition, either "not-running" (default),
// "next-exit", or "removed".
//
// If this client's API version is beforer 1.30, condition is ignored and
// ContainerWait will return immediately with the two channels, as the server
// will wait as if the condition were "not-running".
//
// If this client's API version is at least 1.30, ContainerWait blocks until
// the request has been acknowledged by the server (with a response header),
// then returns two channels on which the caller can wait for the exit status
// of the container or an error if there was a problem either beginning the
// wait request or in getting the response. This allows the caller to
// sychronize ContainerWait with other calls, such as specifying a
// "next-exit" condition before issuing a ContainerStart request.
func (cli *Client) ContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) (<-chan container.ContainerWaitOKBody, <-chan error) {
if versions.LessThan(cli.ClientVersion(), "1.30") {
return cli.legacyContainerWait(ctx, containerID)
}
resultC := make(chan container.ContainerWaitOKBody)
errC := make(chan error)
query := url.Values{}
query.Set("condition", string(condition))
resp, err := cli.post(ctx, "/containers/"+containerID+"/wait", query, nil, nil)
if err != nil {
return -1, err
}
defer ensureReaderClosed(resp)
var res container.ContainerWaitOKBody
if err := json.NewDecoder(resp.body).Decode(&res); err != nil {
return -1, err
defer ensureReaderClosed(resp)
errC <- err
return resultC, errC
}
return res.StatusCode, nil
go func() {
defer ensureReaderClosed(resp)
var res container.ContainerWaitOKBody
if err := json.NewDecoder(resp.body).Decode(&res); err != nil {
errC <- err
return
}
resultC <- res
}()
return resultC, errC
}
// legacyContainerWait returns immediately and doesn't have an option to wait
// until the container is removed.
func (cli *Client) legacyContainerWait(ctx context.Context, containerID string) (<-chan container.ContainerWaitOKBody, <-chan error) {
resultC := make(chan container.ContainerWaitOKBody)
errC := make(chan error)
go func() {
resp, err := cli.post(ctx, "/containers/"+containerID+"/wait", nil, nil, nil)
if err != nil {
errC <- err
return
}
defer ensureReaderClosed(resp)
var res container.ContainerWaitOKBody
if err := json.NewDecoder(resp.body).Decode(&res); err != nil {
errC <- err
return
}
resultC <- res
}()
return resultC, errC
}

View file

@ -20,12 +20,14 @@ func TestContainerWaitError(t *testing.T) {
client := &Client{
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
}
code, err := client.ContainerWait(context.Background(), "nothing")
if err == nil || err.Error() != "Error response from daemon: Server error" {
t.Fatalf("expected a Server Error, got %v", err)
}
if code != -1 {
t.Fatalf("expected a status code equal to '-1', got %d", code)
resultC, errC := client.ContainerWait(context.Background(), "nothing", "")
select {
case result := <-resultC:
t.Fatalf("expected to not get a wait result, got %d", result.StatusCode)
case err := <-errC:
if err.Error() != "Error response from daemon: Server error" {
t.Fatalf("expected a Server Error, got %v", err)
}
}
}
@ -49,12 +51,14 @@ func TestContainerWait(t *testing.T) {
}),
}
code, err := client.ContainerWait(context.Background(), "container_id")
if err != nil {
resultC, errC := client.ContainerWait(context.Background(), "container_id", "")
select {
case err := <-errC:
t.Fatal(err)
}
if code != 15 {
t.Fatalf("expected a status code equal to '15', got %d", code)
case result := <-resultC:
if result.StatusCode != 15 {
t.Fatalf("expected a status code equal to '15', got %d", result.StatusCode)
}
}
}
@ -63,8 +67,8 @@ func ExampleClient_ContainerWait_withTimeout() {
defer cancel()
client, _ := NewEnvClient()
_, err := client.ContainerWait(ctx, "container_id")
if err != nil {
_, errC := client.ContainerWait(ctx, "container_id", "")
if err := <-errC; err != nil {
log.Fatal(err)
}
}

View file

@ -64,7 +64,7 @@ type ContainerAPIClient interface {
ContainerTop(ctx context.Context, container string, arguments []string) (container.ContainerTopOKBody, error)
ContainerUnpause(ctx context.Context, container string) error
ContainerUpdate(ctx context.Context, container string, updateConfig container.UpdateConfig) (container.ContainerUpdateOKBody, error)
ContainerWait(ctx context.Context, container string) (int64, error)
ContainerWait(ctx context.Context, container string, condition container.WaitCondition) (<-chan container.ContainerWaitOKBody, <-chan error)
CopyFromContainer(ctx context.Context, container, srcPath string) (io.ReadCloser, types.ContainerPathStat, error)
CopyToContainer(ctx context.Context, container, path string, content io.Reader, options types.CopyToContainerOptions) error
ContainersPrune(ctx context.Context, pruneFilters filters.Args) (types.ContainersPruneReport, error)

View file

@ -1,6 +1,7 @@
package container
import (
"errors"
"fmt"
"sync"
"time"
@ -29,40 +30,37 @@ type State struct {
ErrorMsg string `json:"Error"` // contains last known error when starting the container
StartedAt time.Time
FinishedAt time.Time
waitChan chan struct{}
Health *Health
waitStop chan struct{}
waitRemove chan struct{}
}
// StateStatus is used to return an error type implementing both
// exec.ExitCode and error.
// StateStatus is used to return container wait results.
// Implements exec.ExitCode interface.
// This type is needed as State include a sync.Mutex field which make
// copying it unsafe.
type StateStatus struct {
exitCode int
error string
}
func newStateStatus(ec int, err string) *StateStatus {
return &StateStatus{
exitCode: ec,
error: err,
}
err error
}
// ExitCode returns current exitcode for the state.
func (ss *StateStatus) ExitCode() int {
return ss.exitCode
func (s StateStatus) ExitCode() int {
return s.exitCode
}
// Error returns current error for the state.
func (ss *StateStatus) Error() string {
return ss.error
// Err returns current error for the state. Returns nil if the container had
// exited on its own.
func (s StateStatus) Err() error {
return s.err
}
// NewState creates a default state object with a fresh channel for state changes.
func NewState() *State {
return &State{
waitChan: make(chan struct{}),
waitStop: make(chan struct{}),
waitRemove: make(chan struct{}),
}
}
@ -160,64 +158,89 @@ func IsValidStateString(s string) bool {
return true
}
func wait(waitChan <-chan struct{}, timeout time.Duration) error {
if timeout < 0 {
<-waitChan
return nil
}
select {
case <-time.After(timeout):
return fmt.Errorf("Timed out: %v", timeout)
case <-waitChan:
return nil
}
}
// WaitCondition is an enum type for different states to wait for.
type WaitCondition int
// WaitStop waits until state is stopped. If state already stopped it returns
// immediately. If you want wait forever you must supply negative timeout.
// Returns exit code, that was passed to SetStopped
func (s *State) WaitStop(timeout time.Duration) (int, error) {
ctx := context.Background()
if timeout >= 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
if err := s.WaitWithContext(ctx); err != nil {
if status, ok := err.(*StateStatus); ok {
return status.ExitCode(), nil
}
return -1, err
}
return 0, nil
}
// Possible WaitCondition Values.
//
// WaitConditionNotRunning (default) is used to wait for any of the non-running
// states: "created", "exited", "dead", "removing", or "removed".
//
// WaitConditionNextExit is used to wait for the next time the state changes
// to a non-running state. If the state is currently "created" or "exited",
// this would cause Wait() to block until either the container runs and exits
// or is removed.
//
// WaitConditionRemoved is used to wait for the container to be removed.
const (
WaitConditionNotRunning WaitCondition = iota
WaitConditionNextExit
WaitConditionRemoved
)
// WaitWithContext waits for the container to stop. Optional context can be
// passed for canceling the request.
func (s *State) WaitWithContext(ctx context.Context) error {
// Wait waits until the continer is in a certain state indicated by the given
// condition. A context must be used for cancelling the request, controlling
// timeouts, and avoiding goroutine leaks. Wait must be called without holding
// the state lock. Returns a channel from which the caller will receive the
// result. If the container exited on its own, the result's Err() method will
// be nil and its ExitCode() method will return the conatiners exit code,
// otherwise, the results Err() method will return an error indicating why the
// wait operation failed.
func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateStatus {
s.Lock()
if !s.Running {
state := newStateStatus(s.ExitCode(), s.Error())
defer s.Unlock()
if state.ExitCode() == 0 {
return nil
defer s.Unlock()
if condition == WaitConditionNotRunning && !s.Running {
// Buffer so we can put it in the channel now.
resultC := make(chan StateStatus, 1)
// Send the current status.
resultC <- StateStatus{
exitCode: s.ExitCode(),
err: s.Err(),
}
return state
return resultC
}
waitChan := s.waitChan
s.Unlock()
select {
case <-waitChan:
// If we are waiting only for removal, the waitStop channel should
// remain nil and block forever.
var waitStop chan struct{}
if condition < WaitConditionRemoved {
waitStop = s.waitStop
}
// Always wait for removal, just in case the container gets removed
// while it is still in a "created" state, in which case it is never
// actually stopped.
waitRemove := s.waitRemove
resultC := make(chan StateStatus)
go func() {
select {
case <-ctx.Done():
// Context timeout or cancellation.
resultC <- StateStatus{
exitCode: -1,
err: ctx.Err(),
}
return
case <-waitStop:
case <-waitRemove:
}
s.Lock()
state := newStateStatus(s.ExitCode(), s.Error())
s.Unlock()
if state.ExitCode() == 0 {
return nil
result := StateStatus{
exitCode: s.ExitCode(),
err: s.Err(),
}
return state
case <-ctx.Done():
return ctx.Err()
}
s.Unlock()
resultC <- result
}()
return resultC
}
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
@ -268,8 +291,8 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
s.Pid = 0
s.FinishedAt = time.Now().UTC()
s.setFromExitStatus(exitStatus)
close(s.waitChan) // fire waiters for stop
s.waitChan = make(chan struct{})
close(s.waitStop) // Fire waiters for stop
s.waitStop = make(chan struct{})
}
// SetRestarting sets the container state to "restarting" without locking.
@ -282,8 +305,8 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
s.Pid = 0
s.FinishedAt = time.Now().UTC()
s.setFromExitStatus(exitStatus)
close(s.waitChan) // fire waiters for stop
s.waitChan = make(chan struct{})
close(s.waitStop) // Fire waiters for stop
s.waitStop = make(chan struct{})
}
// SetError sets the container's error state. This is useful when we want to
@ -335,7 +358,19 @@ func (s *State) SetDead() {
s.Unlock()
}
// Error returns current error for the state.
func (s *State) Error() string {
return s.ErrorMsg
// SetRemoved assumes this container is already in the "dead" state and
// closes the internal waitRemove channel to unblock callers waiting for a
// container to be removed.
func (s *State) SetRemoved() {
s.Lock()
close(s.waitRemove) // Unblock those waiting on remove.
s.Unlock()
}
// Err returns an error if there is one.
func (s *State) Err() error {
if s.ErrorMsg != "" {
return errors.New(s.ErrorMsg)
}
return nil
}

View file

@ -1,7 +1,7 @@
package container
import (
"sync/atomic"
"context"
"testing"
"time"
@ -30,31 +30,63 @@ func TestIsValidHealthString(t *testing.T) {
func TestStateRunStop(t *testing.T) {
s := NewState()
for i := 1; i < 3; i++ { // full lifecycle two times
// Begin another wait with WaitConditionRemoved. It should complete
// within 200 milliseconds.
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
removalWait := s.Wait(ctx, WaitConditionRemoved)
// Full lifecycle two times.
for i := 1; i <= 2; i++ {
// A wait with WaitConditionNotRunning should return
// immediately since the state is now either "created" (on the
// first iteration) or "exited" (on the second iteration). It
// shouldn't take more than 50 milliseconds.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
// Expectx exit code to be i-1 since it should be the exit
// code from the previous loop or 0 for the created state.
if status := <-s.Wait(ctx, WaitConditionNotRunning); status.ExitCode() != i-1 {
t.Fatalf("ExitCode %v, expected %v, err %q", status.ExitCode(), i-1, status.Err())
}
// A wait with WaitConditionNextExit should block until the
// container has started and exited. It shouldn't take more
// than 100 milliseconds.
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
initialWait := s.Wait(ctx, WaitConditionNextExit)
// Set the state to "Running".
s.Lock()
s.SetRunning(i+100, false)
s.SetRunning(i, true)
s.Unlock()
// Assert desired state.
if !s.IsRunning() {
t.Fatal("State not running")
}
if s.Pid != i+100 {
t.Fatalf("Pid %v, expected %v", s.Pid, i+100)
if s.Pid != i {
t.Fatalf("Pid %v, expected %v", s.Pid, i)
}
if s.ExitCode() != 0 {
t.Fatalf("ExitCode %v, expected 0", s.ExitCode())
}
stopped := make(chan struct{})
var exit int64
go func() {
exitCode, _ := s.WaitStop(-1 * time.Second)
atomic.StoreInt64(&exit, int64(exitCode))
close(stopped)
}()
// Now that it's running, a wait with WaitConditionNotRunning
// should block until we stop the container. It shouldn't take
// more than 100 milliseconds.
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
exitWait := s.Wait(ctx, WaitConditionNotRunning)
// Set the state to "Exited".
s.Lock()
s.SetStopped(&ExitStatus{ExitCode: i})
s.Unlock()
// Assert desired state.
if s.IsRunning() {
t.Fatal("State is running")
}
@ -64,50 +96,73 @@ func TestStateRunStop(t *testing.T) {
if s.Pid != 0 {
t.Fatalf("Pid %v, expected 0", s.Pid)
}
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("Stop callback doesn't fire in 100 milliseconds")
case <-stopped:
t.Log("Stop callback fired")
// Receive the initialWait result.
if status := <-initialWait; status.ExitCode() != i {
t.Fatalf("ExitCode %v, expected %v, err %q", status.ExitCode(), i, status.Err())
}
exitCode := int(atomic.LoadInt64(&exit))
if exitCode != i {
t.Fatalf("ExitCode %v, expected %v", exitCode, i)
}
if exitCode, err := s.WaitStop(-1 * time.Second); err != nil || exitCode != i {
t.Fatalf("WaitStop returned exitCode: %v, err: %v, expected exitCode: %v, err: %v", exitCode, err, i, nil)
// Receive the exitWait result.
if status := <-exitWait; status.ExitCode() != i {
t.Fatalf("ExitCode %v, expected %v, err %q", status.ExitCode(), i, status.Err())
}
}
// Set the state to dead and removed.
s.SetDead()
s.SetRemoved()
// Wait for removed status or timeout.
if status := <-removalWait; status.ExitCode() != 2 {
// Should have the final exit code from the loop.
t.Fatalf("Removal wait exitCode %v, expected %v, err %q", status.ExitCode(), 2, status.Err())
}
}
func TestStateTimeoutWait(t *testing.T) {
s := NewState()
stopped := make(chan struct{})
go func() {
s.WaitStop(100 * time.Millisecond)
close(stopped)
}()
s.Lock()
s.SetRunning(0, true)
s.Unlock()
// Start a wait with a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
waitC := s.Wait(ctx, WaitConditionNotRunning)
// It should timeout *before* this 200ms timer does.
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
case <-stopped:
case status := <-waitC:
t.Log("Stop callback fired")
// Should be a timeout error.
if status.Err() == nil {
t.Fatal("expected timeout error, got nil")
}
if status.ExitCode() != -1 {
t.Fatalf("expected exit code %v, got %v", -1, status.ExitCode())
}
}
s.Lock()
s.SetStopped(&ExitStatus{ExitCode: 1})
s.SetStopped(&ExitStatus{ExitCode: 0})
s.Unlock()
stopped = make(chan struct{})
go func() {
s.WaitStop(100 * time.Millisecond)
close(stopped)
}()
// Start another wait with a timeout. This one should return
// immediately.
ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
waitC = s.Wait(ctx, WaitConditionNotRunning)
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("Stop callback doesn't fire in 200 milliseconds")
case <-stopped:
case status := <-waitC:
t.Log("Stop callback fired")
if status.ExitCode() != 0 {
t.Fatalf("expected exit code %v, got %v, err %q", 0, status.ExitCode(), status.Err())
}
}
}

View file

@ -8,17 +8,11 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/term"
)
var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
// DetachError is special error which returned in case of container detach.
type DetachError struct{}
func (DetachError) Error() string {
return "detached from container"
}
// AttachConfig is the config struct used to attach a client to a stream's stdio
type AttachConfig struct {
// Tells the attach copier that the stream's stdin is a TTY and to look for
@ -173,62 +167,11 @@ func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error
})
}
// ttyProxy is used only for attaches with a TTY. It is used to proxy
// stdin keypresses from the underlying reader and look for the passed in
// escape key sequence to signal a detach.
type ttyProxy struct {
escapeKeys []byte
escapeKeyPos int
r io.Reader
}
func (r *ttyProxy) Read(buf []byte) (int, error) {
nr, err := r.r.Read(buf)
preserve := func() {
// this preserves the original key presses in the passed in buffer
nr += r.escapeKeyPos
preserve := make([]byte, 0, r.escapeKeyPos+len(buf))
preserve = append(preserve, r.escapeKeys[:r.escapeKeyPos]...)
preserve = append(preserve, buf...)
r.escapeKeyPos = 0
copy(buf[0:nr], preserve)
}
if nr != 1 || err != nil {
if r.escapeKeyPos > 0 {
preserve()
}
return nr, err
}
if buf[0] != r.escapeKeys[r.escapeKeyPos] {
if r.escapeKeyPos > 0 {
preserve()
}
return nr, nil
}
if r.escapeKeyPos == len(r.escapeKeys)-1 {
return 0, DetachError{}
}
// Looks like we've got an escape key, but we need to match again on the next
// read.
// Store the current escape key we found so we can look for the next one on
// the next read.
// Since this is an escape key, make sure we don't let the caller read it
// If later on we find that this is not the escape sequence, we'll add the
// keys back
r.escapeKeyPos++
return nr - r.escapeKeyPos, nil
}
func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
if len(keys) == 0 {
keys = defaultEscapeSequence
}
pr := &ttyProxy{escapeKeys: keys, r: src}
pr := term.NewEscapeProxy(src, keys)
defer src.Close()
return io.Copy(dst, pr)

View file

@ -1,9 +1,9 @@
package daemon
import (
"context"
"fmt"
"io"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/errors"
@ -160,21 +160,18 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
cfg.Stdin = nil
}
waitChan := make(chan struct{})
if c.Config.StdinOnce && !c.Config.Tty {
// Wait for the container to stop before returning.
waitChan := c.Wait(context.Background(), container.WaitConditionNotRunning)
defer func() {
<-waitChan
}()
go func() {
c.WaitStop(-1 * time.Second)
close(waitChan)
_ = <-waitChan // Ignore returned exit code.
}()
}
ctx := c.InitAttachContext()
err := <-c.StreamConfig.CopyStreams(ctx, cfg)
if err != nil {
if _, ok := err.(stream.DetachError); ok {
if _, ok := err.(term.EscapeError); ok {
daemon.LogContainerEvent(c, "detach")
} else {
logrus.Errorf("attach failed with error: %v", err)

View file

@ -13,6 +13,7 @@ import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
containerpkg "github.com/docker/docker/container"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/docker/plugin"
"github.com/docker/libnetwork"
@ -39,7 +40,7 @@ type Backend interface {
DeactivateContainerServiceBinding(containerName string) error
UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
ContainerWaitWithContext(ctx context.Context, name string) error
ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)
ContainerRm(name string, config *types.ContainerRmConfig) error
ContainerKill(name string, sig uint64) error
SetContainerDependencyStore(name string, store exec.DependencyGetter) error

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/docker/api/types/backend"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/daemon/cluster/convert"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/libnetwork"
@ -337,8 +338,8 @@ func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
return eventsq
}
func (c *containerAdapter) wait(ctx context.Context) error {
return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
}
func (c *containerAdapter) shutdown(ctx context.Context) error {

View file

@ -279,25 +279,27 @@ func (r *controller) Wait(pctx context.Context) error {
}
}()
err := r.adapter.wait(ctx)
if ctx.Err() != nil {
return ctx.Err()
waitC, err := r.adapter.wait(ctx)
if err != nil {
return err
}
if err != nil {
ee := &exitError{}
if ec, ok := err.(exec.ExitCoder); ok {
ee.code = ec.ExitCode()
if status := <-waitC; status.ExitCode() != 0 {
exitErr := &exitError{
code: status.ExitCode(),
}
// Set the cause if it is knowable.
select {
case e := <-healthErr:
ee.cause = e
exitErr.cause = e
default:
if err.Error() != "" {
ee.cause = err
if status.Err() != nil {
exitErr.cause = status.Err()
}
}
return ee
return exitErr
}
return nil

View file

@ -3,6 +3,7 @@
package daemon
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -290,11 +291,16 @@ func (daemon *Daemon) setupConfigDir(c *container.Container) (setupErr error) {
return nil
}
func killProcessDirectly(container *container.Container) error {
if _, err := container.WaitStop(10 * time.Second); err != nil {
func killProcessDirectly(cntr *container.Container) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Block until the container to stops or timeout.
status := <-cntr.Wait(ctx, container.WaitConditionNotRunning)
if status.Err() != nil {
// Ensure that we don't kill ourselves
if pid := container.GetPID(); pid != 0 {
logrus.Infof("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", stringid.TruncateID(container.ID))
if pid := cntr.GetPID(); pid != 0 {
logrus.Infof("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", stringid.TruncateID(cntr.ID))
if err := syscall.Kill(pid, 9); err != nil {
if err != syscall.ESRCH {
return err

View file

@ -6,6 +6,7 @@
package daemon
import (
"context"
"fmt"
"io/ioutil"
"net"
@ -773,7 +774,12 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
if err := daemon.containerUnpause(c); err != nil {
return fmt.Errorf("Failed to unpause container %s with error: %v", c.ID, err)
}
if _, err := c.WaitStop(time.Duration(stopTimeout) * time.Second); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(stopTimeout)*time.Second)
defer cancel()
// Wait with timeout for container to exit.
if status := <-c.Wait(ctx, container.WaitConditionNotRunning); status.Err() != nil {
logrus.Debugf("container %s failed to exit in %d second of SIGTERM, sending SIGKILL to force", c.ID, stopTimeout)
sig, ok := signal.SignalMap["KILL"]
if !ok {
@ -782,8 +788,10 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
if err := daemon.kill(c, int(sig)); err != nil {
logrus.Errorf("Failed to SIGKILL container %s", c.ID)
}
c.WaitStop(-1 * time.Second)
return err
// Wait for exit again without a timeout.
// Explicitly ignore the result.
_ = <-c.Wait(context.Background(), container.WaitConditionNotRunning)
return status.Err()
}
}
// If container failed to exit in stopTimeout seconds of SIGTERM, then using the force
@ -791,7 +799,9 @@ func (daemon *Daemon) shutdownContainer(c *container.Container) error {
return fmt.Errorf("Failed to stop container %s with error: %v", c.ID, err)
}
c.WaitStop(-1 * time.Second)
// Wait without timeout for the container to exit.
// Ignore the result.
_ = <-c.Wait(context.Background(), container.WaitConditionNotRunning)
return nil
}

View file

@ -134,6 +134,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
if e := daemon.removeMountPoints(container, removeVolume); e != nil {
logrus.Error(e)
}
container.SetRemoved()
stateCtr.del(container.ID)
daemon.LogContainerEvent(container, "destroy")
return nil

View file

@ -253,7 +253,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
return fmt.Errorf("context cancelled")
case err := <-attachErr:
if err != nil {
if _, ok := err.(stream.DetachError); !ok {
if _, ok := err.(term.EscapeError); !ok {
return fmt.Errorf("exec attach failed with error: %v", err)
}
d.LogContainerEvent(c, "exec_detach")

View file

@ -153,7 +153,7 @@ func (daemon *Daemon) getInspectData(container *container.Container) (*types.Con
Dead: container.State.Dead,
Pid: container.State.Pid,
ExitCode: container.State.ExitCode(),
Error: container.State.Error(),
Error: container.State.ErrorMsg,
StartedAt: container.State.StartedAt.Format(time.RFC3339Nano),
FinishedAt: container.State.FinishedAt.Format(time.RFC3339Nano),
Health: containerHealth,

View file

@ -1,6 +1,7 @@
package daemon
import (
"context"
"fmt"
"runtime"
"strings"
@ -8,7 +9,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/container"
containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/pkg/signal"
)
@ -54,7 +55,7 @@ func (daemon *Daemon) ContainerKill(name string, sig uint64) error {
// to send the signal. An error is returned if the container is paused
// or not running, or if there is a problem returned from the
// underlying kill command.
func (daemon *Daemon) killWithSignal(container *container.Container, sig int) error {
func (daemon *Daemon) killWithSignal(container *containerpkg.Container, sig int) error {
logrus.Debugf("Sending kill signal %d to container %s", sig, container.ID)
container.Lock()
defer container.Unlock()
@ -110,7 +111,7 @@ func (daemon *Daemon) killWithSignal(container *container.Container, sig int) er
}
// Kill forcefully terminates a container.
func (daemon *Daemon) Kill(container *container.Container) error {
func (daemon *Daemon) Kill(container *containerpkg.Container) error {
if !container.IsRunning() {
return errNotRunning{container.ID}
}
@ -131,7 +132,10 @@ func (daemon *Daemon) Kill(container *container.Container) error {
return nil
}
if _, err2 := container.WaitStop(2 * time.Second); err2 != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if status := <-container.Wait(ctx, containerpkg.WaitConditionNotRunning); status.Err() != nil {
return err
}
}
@ -144,12 +148,15 @@ func (daemon *Daemon) Kill(container *container.Container) error {
return err
}
container.WaitStop(-1 * time.Second)
// Wait for exit with no timeout.
// Ignore returned status.
_ = <-container.Wait(context.Background(), containerpkg.WaitConditionNotRunning)
return nil
}
// killPossibleDeadProcess is a wrapper around killSig() suppressing "no such process" error.
func (daemon *Daemon) killPossiblyDeadProcess(container *container.Container, sig int) error {
func (daemon *Daemon) killPossiblyDeadProcess(container *containerpkg.Container, sig int) error {
err := daemon.killWithSignal(container, sig)
if err == syscall.ESRCH {
e := errNoSuchProcess{container.GetPID(), sig}
@ -159,6 +166,6 @@ func (daemon *Daemon) killPossiblyDeadProcess(container *container.Container, si
return err
}
func (daemon *Daemon) kill(c *container.Container, sig int) error {
func (daemon *Daemon) kill(c *containerpkg.Container, sig int) error {
return daemon.containerd.Signal(c.ID, sig)
}

View file

@ -1,13 +1,14 @@
package daemon
import (
"context"
"fmt"
"net/http"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/errors"
"github.com/docker/docker/container"
containerpkg "github.com/docker/docker/container"
)
// ContainerStop looks for the given container and terminates it,
@ -40,7 +41,7 @@ func (daemon *Daemon) ContainerStop(name string, seconds *int) error {
// process to exit. If a negative duration is given, Stop will wait
// for the initial signal forever. If the container is not running Stop returns
// immediately.
func (daemon *Daemon) containerStop(container *container.Container, seconds int) error {
func (daemon *Daemon) containerStop(container *containerpkg.Container, seconds int) error {
if !container.IsRunning() {
return nil
}
@ -60,7 +61,10 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
// So, instead we'll give it up to 2 more seconds to complete and if
// by that time the container is still running, then the error
// we got is probably valid and so we force kill it.
if _, err := container.WaitStop(2 * time.Second); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if status := <-container.Wait(ctx, containerpkg.WaitConditionNotRunning); status.Err() != nil {
logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", stopSignal)
if err := daemon.killPossiblyDeadProcess(container, 9); err != nil {
return err
@ -69,11 +73,15 @@ func (daemon *Daemon) containerStop(container *container.Container, seconds int)
}
// 2. Wait for the process to exit on its own
if _, err := container.WaitStop(time.Duration(seconds) * time.Second); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
defer cancel()
if status := <-container.Wait(ctx, containerpkg.WaitConditionNotRunning); status.Err() != nil {
logrus.Infof("Container %v failed to exit within %d seconds of signal %d - using the force", container.ID, seconds, stopSignal)
// 3. If it doesn't, then send SIGKILL
if err := daemon.Kill(container); err != nil {
container.WaitStop(-1 * time.Second)
// Wait without a timeout, ignore result.
_ = <-container.Wait(context.Background(), containerpkg.WaitConditionNotRunning)
logrus.Warn(err) // Don't return error because we only care that container is stopped, not what function stopped it
}
}

View file

@ -1,32 +1,22 @@
package daemon
import (
"time"
"github.com/docker/docker/container"
"golang.org/x/net/context"
)
// ContainerWait stops processing until the given container is
// stopped. If the container is not found, an error is returned. On a
// successful stop, the exit code of the container is returned. On a
// timeout, an error is returned. If you want to wait forever, supply
// a negative duration for the timeout.
func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, error) {
container, err := daemon.GetContainer(name)
// ContainerWait waits until the given container is in a certain state
// indicated by the given condition. If the container is not found, a nil
// channel and non-nil error is returned immediately. If the container is
// found, a status result will be sent on the returned channel once the wait
// condition is met or if an error occurs waiting for the container (such as a
// context timeout or cancellation). On a successful wait, the exit code of the
// container is returned in the status with a non-nil Err() value.
func (daemon *Daemon) ContainerWait(ctx context.Context, name string, condition container.WaitCondition) (<-chan container.StateStatus, error) {
cntr, err := daemon.GetContainer(name)
if err != nil {
return -1, err
return nil, err
}
return container.WaitStop(timeout)
}
// ContainerWaitWithContext returns a channel where exit code is sent
// when container stops. Channel can be cancelled with a context.
func (daemon *Daemon) ContainerWaitWithContext(ctx context.Context, name string) error {
container, err := daemon.GetContainer(name)
if err != nil {
return err
}
return container.WaitWithContext(ctx)
return cntr.Wait(ctx, condition), nil
}

View file

@ -28,6 +28,7 @@ keywords: "API, Docker, rcli, REST, documentation"
the swarm, the desired CA key for the swarm (if not using an external certificate), and an optional parameter to force swarm to
generate and rotate to a new CA certificate/key pair.
* `POST /service/create` and `POST /services/(id or name)/update` now take the field `Platforms` as part of the service `Placement`, allowing to specify platforms supported by the service.
* `POST /containers/(name)/wait` now accepts a `condition` query parameter to indicate which state change condition to wait for. Also, response headers are now returned immediately to acknowledge that the server has registered a wait callback for the client.
## v1.29 API changes

View file

@ -21,7 +21,7 @@ Usage: docker wait CONTAINER [CONTAINER...]
Block until one or more containers stop, then print their exit codes
Options:
--help Print usage
--help Print usage
```
> **Note**: `docker wait` returns `0` when run against a container which had

View file

@ -83,11 +83,13 @@ func privilegedTestChunkExecutor(autoRemove bool) testChunkExecutor {
}
var b bytes.Buffer
teeContainerStream(&b, os.Stdout, os.Stderr, stream)
rc, err := cli.ContainerWait(context.Background(), id)
if err != nil {
resultC, errC := cli.ContainerWait(context.Background(), id, "")
select {
case err := <-errC:
return 0, "", err
case result := <-resultC:
return result.StatusCode, b.String(), nil
}
return rc, b.String(), nil
}
}

View file

@ -188,5 +188,11 @@ func waitForContainerCompletion(cli *client.Client, stdout, stderr io.Writer, co
}
stdcopy.StdCopy(stdout, stderr, stream)
stream.Close()
return cli.ContainerWait(context.Background(), containerID)
resultC, errC := cli.ContainerWait(context.Background(), containerID, "")
select {
case err := <-errC:
return 1, err
case result := <-resultC:
return result.StatusCode, nil
}
}

74
pkg/term/proxy.go Normal file
View file

@ -0,0 +1,74 @@
package term
import (
"io"
)
// EscapeError is special error which returned by a TTY proxy reader's Read()
// method in case its detach escape sequence is read.
type EscapeError struct{}
func (EscapeError) Error() string {
return "read escape sequence"
}
// escapeProxy is used only for attaches with a TTY. It is used to proxy
// stdin keypresses from the underlying reader and look for the passed in
// escape key sequence to signal a detach.
type escapeProxy struct {
escapeKeys []byte
escapeKeyPos int
r io.Reader
}
// NewEscapeProxy returns a new TTY proxy reader which wraps the given reader
// and detects when the specified escape keys are read, in which case the Read
// method will return an error of type EscapeError.
func NewEscapeProxy(r io.Reader, escapeKeys []byte) io.Reader {
return &escapeProxy{
escapeKeys: escapeKeys,
r: r,
}
}
func (r *escapeProxy) Read(buf []byte) (int, error) {
nr, err := r.r.Read(buf)
preserve := func() {
// this preserves the original key presses in the passed in buffer
nr += r.escapeKeyPos
preserve := make([]byte, 0, r.escapeKeyPos+len(buf))
preserve = append(preserve, r.escapeKeys[:r.escapeKeyPos]...)
preserve = append(preserve, buf...)
r.escapeKeyPos = 0
copy(buf[0:nr], preserve)
}
if nr != 1 || err != nil {
if r.escapeKeyPos > 0 {
preserve()
}
return nr, err
}
if buf[0] != r.escapeKeys[r.escapeKeyPos] {
if r.escapeKeyPos > 0 {
preserve()
}
return nr, nil
}
if r.escapeKeyPos == len(r.escapeKeys)-1 {
return 0, EscapeError{}
}
// Looks like we've got an escape key, but we need to match again on the next
// read.
// Store the current escape key we found so we can look for the next one on
// the next read.
// Since this is an escape key, make sure we don't let the caller read it
// If later on we find that this is not the escape sequence, we'll add the
// keys back
r.escapeKeyPos++
return nr - r.escapeKeyPos, nil
}