123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- package agent
- import (
- "sync"
- "time"
- "github.com/docker/swarmkit/agent/exec"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/api/equality"
- "github.com/docker/swarmkit/log"
- "golang.org/x/net/context"
- )
- // taskManager manages all aspects of task execution and reporting for an agent
- // through state management.
- type taskManager struct {
- task *api.Task
- ctlr exec.Controller
- reporter StatusReporter
- updateq chan *api.Task
- shutdown chan struct{}
- shutdownOnce sync.Once
- closed chan struct{}
- closeOnce sync.Once
- }
- func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
- t := &taskManager{
- task: task.Copy(),
- ctlr: ctlr,
- reporter: reporter,
- updateq: make(chan *api.Task),
- shutdown: make(chan struct{}),
- closed: make(chan struct{}),
- }
- go t.run(ctx)
- return t
- }
- // Update the task data.
- func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
- select {
- case tm.updateq <- task:
- return nil
- case <-tm.closed:
- return ErrClosed
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- // Close shuts down the task manager, blocking until it is closed.
- func (tm *taskManager) Close() error {
- tm.shutdownOnce.Do(func() {
- close(tm.shutdown)
- })
- <-tm.closed
- return nil
- }
- func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
- ctx = log.WithModule(ctx, "taskmanager")
- logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
- if !ok {
- return // no logs available
- }
- if err := logCtlr.Logs(ctx, publisher, options); err != nil {
- log.G(ctx).WithError(err).Errorf("logs call failed")
- }
- }
- func (tm *taskManager) run(ctx context.Context) {
- ctx, cancelAll := context.WithCancel(ctx)
- defer cancelAll() // cancel all child operations on exit.
- ctx = log.WithModule(ctx, "taskmanager")
- var (
- opctx context.Context
- cancel context.CancelFunc
- run = make(chan struct{}, 1)
- statusq = make(chan *api.TaskStatus)
- errs = make(chan error)
- shutdown = tm.shutdown
- updated bool // true if the task was updated.
- )
- defer func() {
- // closure picks up current value of cancel.
- if cancel != nil {
- cancel()
- }
- }()
- run <- struct{}{} // prime the pump
- for {
- select {
- case <-run:
- // always check for shutdown before running.
- select {
- case <-tm.shutdown:
- shutdown = tm.shutdown // a little questionable
- continue // ignore run request and handle shutdown
- case <-tm.closed:
- continue
- default:
- }
- opctx, cancel = context.WithCancel(ctx)
- // Several variables need to be snapshotted for the closure below.
- opcancel := cancel // fork for the closure
- running := tm.task.Copy() // clone the task before dispatch
- statusqLocal := statusq
- updatedLocal := updated // capture state of update for goroutine
- updated = false
- go runctx(ctx, tm.closed, errs, func(ctx context.Context) error {
- defer opcancel()
- if updatedLocal {
- // before we do anything, update the task for the controller.
- // always update the controller before running.
- if err := tm.ctlr.Update(opctx, running); err != nil {
- log.G(ctx).WithError(err).Error("updating task controller failed")
- return err
- }
- }
- status, err := exec.Do(opctx, running, tm.ctlr)
- if status != nil {
- // always report the status if we get one back. This
- // returns to the manager loop, then reports the status
- // upstream.
- select {
- case statusqLocal <- status:
- case <-ctx.Done(): // not opctx, since that may have been cancelled.
- }
- if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
- log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
- }
- }
- return err
- })
- case err := <-errs:
- // This branch is always executed when an operations completes. The
- // goal is to decide whether or not we re-dispatch the operation.
- cancel = nil
- select {
- case <-tm.shutdown:
- shutdown = tm.shutdown // re-enable the shutdown branch
- continue // no dispatch if we are in shutdown.
- default:
- }
- switch err {
- case exec.ErrTaskNoop:
- if !updated {
- continue // wait till getting pumped via update.
- }
- case exec.ErrTaskRetry:
- // TODO(stevvooe): Add exponential backoff with random jitter
- // here. For now, this backoff is enough to keep the task
- // manager from running away with the CPU.
- time.AfterFunc(time.Second, func() {
- errs <- nil // repump this branch, with no err
- })
- continue
- case nil, context.Canceled, context.DeadlineExceeded:
- // no log in this case
- default:
- log.G(ctx).WithError(err).Error("task operation failed")
- }
- select {
- case run <- struct{}{}:
- default:
- }
- case status := <-statusq:
- tm.task.Status = *status
- case task := <-tm.updateq:
- if equality.TasksEqualStable(task, tm.task) {
- continue // ignore the update
- }
- if task.ID != tm.task.ID {
- log.G(ctx).WithField("task.update.id", task.ID).Error("received update for incorrect task")
- continue
- }
- if task.DesiredState < tm.task.DesiredState {
- log.G(ctx).WithField("task.update.desiredstate", task.DesiredState).
- Error("ignoring task update with invalid desired state")
- continue
- }
- task = task.Copy()
- task.Status = tm.task.Status // overwrite our status, as it is canonical.
- tm.task = task
- updated = true
- // we have accepted the task update
- if cancel != nil {
- cancel() // cancel outstanding if necessary.
- } else {
- // If this channel op fails, it means there is already a
- // message on the run queue.
- select {
- case run <- struct{}{}:
- default:
- }
- }
- case <-shutdown:
- if cancel != nil {
- // cancel outstanding operation.
- cancel()
- // subtle: after a cancellation, we want to avoid busy wait
- // here. this gets renabled in the errs branch and we'll come
- // back around and try shutdown again.
- shutdown = nil // turn off this branch until op proceeds
- continue // wait until operation actually exits.
- }
- // disable everything, and prepare for closing.
- statusq = nil
- errs = nil
- shutdown = nil
- tm.closeOnce.Do(func() {
- close(tm.closed)
- })
- case <-tm.closed:
- return
- case <-ctx.Done():
- tm.closeOnce.Do(func() {
- close(tm.closed)
- })
- return
- }
- }
- }
|