Merge pull request #21523 from LK4D4/cancellable_routes
Embed context.Context to routes with usage of CloseNotify
This commit is contained in:
commit
be390c30cd
12 changed files with 58 additions and 51 deletions
|
@ -24,6 +24,6 @@ func (r *buildRouter) Routes() []router.Route {
|
|||
|
||||
func (r *buildRouter) initRoutes() {
|
||||
r.routes = []router.Route{
|
||||
router.NewPostRoute("/build", r.postBuild),
|
||||
router.Cancellable(router.NewPostRoute("/build", r.postBuild)),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -184,21 +184,6 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
|
|||
stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf}
|
||||
stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf}
|
||||
|
||||
finished := make(chan struct{})
|
||||
defer close(finished)
|
||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
||||
notifyContext, cancel := context.WithCancel(ctx)
|
||||
closeNotifier := notifier.CloseNotify()
|
||||
go func() {
|
||||
select {
|
||||
case <-closeNotifier:
|
||||
cancel()
|
||||
case <-finished:
|
||||
}
|
||||
}()
|
||||
ctx = notifyContext
|
||||
}
|
||||
|
||||
imgID, err := br.backend.Build(ctx, buildOptions,
|
||||
builder.DockerIgnoreContext{ModifiableContext: buildContext},
|
||||
stdout, stderr, out)
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"io"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/version"
|
||||
|
@ -49,8 +51,8 @@ type stateBackend interface {
|
|||
type monitorBackend interface {
|
||||
ContainerChanges(name string) ([]archive.Change, error)
|
||||
ContainerInspect(name string, size bool, version version.Version) (interface{}, error)
|
||||
ContainerLogs(name string, config *backend.ContainerLogsConfig, started chan struct{}) error
|
||||
ContainerStats(name string, config *backend.ContainerStatsConfig) error
|
||||
ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error
|
||||
ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error
|
||||
ContainerTop(name string, psArgs string) (*types.ContainerProcessList, error)
|
||||
|
||||
Containers(config *types.ContainerListOptions) ([]*types.Container, error)
|
||||
|
|
|
@ -33,8 +33,8 @@ func (r *containerRouter) initRoutes() {
|
|||
router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
|
||||
router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
|
||||
router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
|
||||
router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs),
|
||||
router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats),
|
||||
router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
|
||||
router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
|
||||
router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
|
||||
router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
|
||||
router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
|
||||
|
|
|
@ -67,19 +67,13 @@ func (s *containerRouter) getContainersStats(ctx context.Context, w http.Respons
|
|||
w.Header().Set("Content-Type", "application/json")
|
||||
}
|
||||
|
||||
var closeNotifier <-chan bool
|
||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
||||
closeNotifier = notifier.CloseNotify()
|
||||
}
|
||||
|
||||
config := &backend.ContainerStatsConfig{
|
||||
Stream: stream,
|
||||
OutStream: w,
|
||||
Stop: closeNotifier,
|
||||
Version: string(httputils.VersionFromContext(ctx)),
|
||||
}
|
||||
|
||||
return s.backend.ContainerStats(vars["name"], config)
|
||||
return s.backend.ContainerStats(ctx, vars["name"], config)
|
||||
}
|
||||
|
||||
func (s *containerRouter) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||
|
@ -97,11 +91,6 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
|
|||
return fmt.Errorf("Bad parameters: you must choose at least one stream")
|
||||
}
|
||||
|
||||
var closeNotifier <-chan bool
|
||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
||||
closeNotifier = notifier.CloseNotify()
|
||||
}
|
||||
|
||||
containerName := vars["name"]
|
||||
logsConfig := &backend.ContainerLogsConfig{
|
||||
ContainerLogsOptions: types.ContainerLogsOptions{
|
||||
|
@ -113,11 +102,10 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
|
|||
ShowStderr: stderr,
|
||||
},
|
||||
OutStream: w,
|
||||
Stop: closeNotifier,
|
||||
}
|
||||
|
||||
chStarted := make(chan struct{})
|
||||
if err := s.backend.ContainerLogs(containerName, logsConfig, chStarted); err != nil {
|
||||
if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
|
||||
select {
|
||||
case <-chStarted:
|
||||
// The client may be expecting all of the data we're sending to
|
||||
|
|
|
@ -34,9 +34,9 @@ func (r *imageRouter) initRoutes() {
|
|||
router.NewGetRoute("/images/{name:.*}/json", r.getImagesByName),
|
||||
// POST
|
||||
router.NewPostRoute("/commit", r.postCommit),
|
||||
router.NewPostRoute("/images/create", r.postImagesCreate),
|
||||
router.NewPostRoute("/images/load", r.postImagesLoad),
|
||||
router.NewPostRoute("/images/{name:.*}/push", r.postImagesPush),
|
||||
router.Cancellable(router.NewPostRoute("/images/create", r.postImagesCreate)),
|
||||
router.Cancellable(router.NewPostRoute("/images/{name:.*}/push", r.postImagesPush)),
|
||||
router.NewPostRoute("/images/{name:.*}/tag", r.postImagesTag),
|
||||
// DELETE
|
||||
router.NewDeleteRoute("/images/{name:.*}", r.deleteImages),
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
package router
|
||||
|
||||
import "github.com/docker/docker/api/server/httputils"
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/docker/docker/api/server/httputils"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// localRoute defines an individual API route to connect
|
||||
// with the docker daemon. It implements Route.
|
||||
|
@ -59,3 +64,33 @@ func NewOptionsRoute(path string, handler httputils.APIFunc) Route {
|
|||
func NewHeadRoute(path string, handler httputils.APIFunc) Route {
|
||||
return NewRoute("HEAD", path, handler)
|
||||
}
|
||||
|
||||
func cancellableHandler(h httputils.APIFunc) httputils.APIFunc {
|
||||
return func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
||||
notify := notifier.CloseNotify()
|
||||
notifyCtx, cancel := context.WithCancel(ctx)
|
||||
finished := make(chan struct{})
|
||||
defer close(finished)
|
||||
ctx = notifyCtx
|
||||
go func() {
|
||||
select {
|
||||
case <-notify:
|
||||
cancel()
|
||||
case <-finished:
|
||||
}
|
||||
}()
|
||||
}
|
||||
return h(ctx, w, r, vars)
|
||||
}
|
||||
}
|
||||
|
||||
// Cancellable makes new route which embeds http.CloseNotifier feature to
|
||||
// context.Context of handler.
|
||||
func Cancellable(r Route) Route {
|
||||
return localRoute{
|
||||
method: r.Method(),
|
||||
path: r.Path(),
|
||||
handler: cancellableHandler(r.Handler()),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ func NewRouter(b Backend) router.Router {
|
|||
r.routes = []router.Route{
|
||||
router.NewOptionsRoute("/{anyroute:.*}", optionsHandler),
|
||||
router.NewGetRoute("/_ping", pingHandler),
|
||||
router.NewGetRoute("/events", r.getEvents),
|
||||
router.Cancellable(router.NewGetRoute("/events", r.getEvents)),
|
||||
router.NewGetRoute("/info", r.getInfo),
|
||||
router.NewGetRoute("/version", r.getVersion),
|
||||
router.NewPostRoute("/auth", r.postAuth),
|
||||
|
|
|
@ -83,11 +83,6 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
|||
}
|
||||
}
|
||||
|
||||
var closeNotify <-chan bool
|
||||
if closeNotifier, ok := w.(http.CloseNotifier); ok {
|
||||
closeNotify = closeNotifier.CloseNotify()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case ev := <-l:
|
||||
|
@ -101,8 +96,8 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
|||
}
|
||||
case <-timer.C:
|
||||
return nil
|
||||
case <-closeNotify:
|
||||
logrus.Debug("Client disconnected, stop sending events")
|
||||
case <-ctx.Done():
|
||||
logrus.Debug("Client context cancelled, stop sending events")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ type ContainerAttachConfig struct {
|
|||
type ContainerLogsConfig struct {
|
||||
types.ContainerLogsOptions
|
||||
OutStream io.Writer
|
||||
Stop <-chan bool
|
||||
}
|
||||
|
||||
// ContainerStatsConfig holds information for configuring the runtime
|
||||
|
@ -39,7 +38,6 @@ type ContainerLogsConfig struct {
|
|||
type ContainerStatsConfig struct {
|
||||
Stream bool
|
||||
OutStream io.Writer
|
||||
Stop <-chan bool
|
||||
Version string
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/container"
|
||||
|
@ -19,7 +21,7 @@ import (
|
|||
|
||||
// ContainerLogs hooks up a container's stdout and stderr streams
|
||||
// configured with the given struct.
|
||||
func (daemon *Daemon) ContainerLogs(containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
||||
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
||||
container, err := daemon.GetContainer(containerName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -78,7 +80,7 @@ func (daemon *Daemon) ContainerLogs(containerName string, config *backend.Contai
|
|||
case err := <-logs.Err:
|
||||
logrus.Errorf("Error streaming logs: %v", err)
|
||||
return nil
|
||||
case <-config.Stop:
|
||||
case <-ctx.Done():
|
||||
logs.Close()
|
||||
return nil
|
||||
case msg, ok := <-logs.Msg:
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"errors"
|
||||
"runtime"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/pkg/ioutils"
|
||||
"github.com/docker/docker/pkg/version"
|
||||
|
@ -14,7 +16,7 @@ import (
|
|||
|
||||
// ContainerStats writes information about the container to the stream
|
||||
// given in the config object.
|
||||
func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.ContainerStatsConfig) error {
|
||||
func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, config *backend.ContainerStatsConfig) error {
|
||||
if runtime.GOOS == "windows" {
|
||||
return errors.New("Windows does not support stats")
|
||||
}
|
||||
|
@ -114,7 +116,7 @@ func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.Contai
|
|||
if !config.Stream {
|
||||
return nil
|
||||
}
|
||||
case <-config.Stop:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue