123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- package agent
- import (
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/boltdb/bolt"
- "github.com/docker/swarmkit/agent/exec"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/log"
- "github.com/docker/swarmkit/watch"
- "golang.org/x/net/context"
- )
// Worker implements the core task management logic and persistence. It
// coordinates the set of assignments with the executor.
type Worker interface {
	// Init prepares the worker for task assignment.
	Init(ctx context.Context) error

	// Close performs worker cleanup when no longer needed.
	//
	// It is not safe to call any worker function after that.
	Close()

	// Assign assigns a complete set of tasks and configs/secrets to a
	// worker. Any items not included in this set will be removed.
	Assign(ctx context.Context, assignments []*api.AssignmentChange) error

	// Update applies an incremental set of changes to the tasks or
	// configs/secrets of the worker. Any items not included either in added
	// or removed will remain untouched.
	Update(ctx context.Context, assignments []*api.AssignmentChange) error

	// Listen to updates about tasks controlled by the worker. When first
	// called, the reporter will receive all updates for all tasks controlled
	// by the worker.
	//
	// The listener will be removed if the context is cancelled.
	Listen(ctx context.Context, reporter StatusReporter)

	// Subscribe to log messages matching the subscription.
	Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error

	// Wait blocks until all task managers have closed, or until the context
	// is done, in which case the context's error is returned.
	Wait(ctx context.Context) error
}
// statusReporterKey wraps a StatusReporter so that the listeners map can be
// keyed by a pointer to the wrapper rather than by the reporter itself.
//
// This protects map insertion and removal from panicking: an interface value
// used directly as a map key panics at runtime if its dynamic type is not
// comparable (for example a function type), whereas a pointer key is always
// comparable.
type statusReporterKey struct {
	StatusReporter
}
// worker is the Worker implementation in this file. It persists task state
// in a bolt database and runs one taskManager per assigned task.
type worker struct {
	// db stores tasks, their assignment flags and their statuses.
	db *bolt.DB
	// executor resolves task controllers; it may optionally also implement
	// exec.SecretsProvider and exec.ConfigsProvider.
	executor exec.Executor
	// publisher is not referenced by the code visible in this file section.
	publisher exec.LogPublisher
	// listeners holds the status reporters registered through Listen.
	listeners map[*statusReporterKey]struct{}
	// taskevents receives a copy of every task whose manager was started;
	// Subscribe watches it in follow mode.
	taskevents *watch.Queue
	// publisherProvider creates the log publisher for each subscription.
	publisherProvider exec.LogPublisherProvider
	// taskManagers maps task ID to the manager currently running that task.
	taskManagers map[string]*taskManager
	// mu is taken around accesses to listeners, taskManagers and closed.
	mu sync.RWMutex

	// closed is set by Close; Assign and Update return ErrClosed afterwards.
	closed bool
	// closers is incremented for each task manager created and decremented
	// once that manager has been closed; Wait blocks on it.
	closers sync.WaitGroup // keeps track of active closers
}
- func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
- return &worker{
- db: db,
- executor: executor,
- publisherProvider: publisherProvider,
- taskevents: watch.NewQueue(),
- listeners: make(map[*statusReporterKey]struct{}),
- taskManagers: make(map[string]*taskManager),
- }
- }
- // Init prepares the worker for assignments.
- func (w *worker) Init(ctx context.Context) error {
- w.mu.Lock()
- defer w.mu.Unlock()
- ctx = log.WithModule(ctx, "worker")
- // TODO(stevvooe): Start task cleanup process.
- // read the tasks from the database and start any task managers that may be needed.
- return w.db.Update(func(tx *bolt.Tx) error {
- return WalkTasks(tx, func(task *api.Task) error {
- if !TaskAssigned(tx, task.ID) {
- // NOTE(stevvooe): If tasks can survive worker restart, we need
- // to startup the controller and ensure they are removed. For
- // now, we can simply remove them from the database.
- if err := DeleteTask(tx, task.ID); err != nil {
- log.G(ctx).WithError(err).Errorf("error removing task %v", task.ID)
- }
- return nil
- }
- status, err := GetTaskStatus(tx, task.ID)
- if err != nil {
- log.G(ctx).WithError(err).Error("unable to read tasks status")
- return nil
- }
- task.Status = *status // merges the status into the task, ensuring we start at the right point.
- return w.startTask(ctx, tx, task)
- })
- })
- }
- // Close performs worker cleanup when no longer needed.
- func (w *worker) Close() {
- w.mu.Lock()
- w.closed = true
- w.mu.Unlock()
- w.taskevents.Close()
- }
- // Assign assigns a full set of tasks, configs, and secrets to the worker.
- // Any tasks not previously known will be started. Any tasks that are in the task set
- // and already running will be updated, if possible. Any tasks currently running on
- // the worker outside the task set will be terminated.
- // Anything not in the set of assignments will be removed.
- func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange) error {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return ErrClosed
- }
- log.G(ctx).WithFields(logrus.Fields{
- "len(assignments)": len(assignments),
- }).Debug("(*worker).Assign")
- // Need to update dependencies before tasks
- err := reconcileSecrets(ctx, w, assignments, true)
- if err != nil {
- return err
- }
- err = reconcileConfigs(ctx, w, assignments, true)
- if err != nil {
- return err
- }
- return reconcileTaskState(ctx, w, assignments, true)
- }
- // Update updates the set of tasks, configs, and secrets for the worker.
- // Tasks in the added set will be added to the worker, and tasks in the removed set
- // will be removed from the worker
- // Secrets in the added set will be added to the worker, and secrets in the removed set
- // will be removed from the worker.
- // Configs in the added set will be added to the worker, and configs in the removed set
- // will be removed from the worker.
- func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return ErrClosed
- }
- log.G(ctx).WithFields(logrus.Fields{
- "len(assignments)": len(assignments),
- }).Debug("(*worker).Update")
- err := reconcileSecrets(ctx, w, assignments, false)
- if err != nil {
- return err
- }
- err = reconcileConfigs(ctx, w, assignments, false)
- if err != nil {
- return err
- }
- return reconcileTaskState(ctx, w, assignments, false)
- }
- func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
- var (
- updatedTasks []*api.Task
- removedTasks []*api.Task
- )
- for _, a := range assignments {
- if t := a.Assignment.GetTask(); t != nil {
- switch a.Action {
- case api.AssignmentChange_AssignmentActionUpdate:
- updatedTasks = append(updatedTasks, t)
- case api.AssignmentChange_AssignmentActionRemove:
- removedTasks = append(removedTasks, t)
- }
- }
- }
- log.G(ctx).WithFields(logrus.Fields{
- "len(updatedTasks)": len(updatedTasks),
- "len(removedTasks)": len(removedTasks),
- }).Debug("(*worker).reconcileTaskState")
- tx, err := w.db.Begin(true)
- if err != nil {
- log.G(ctx).WithError(err).Error("failed starting transaction against task database")
- return err
- }
- defer tx.Rollback()
- assigned := map[string]struct{}{}
- for _, task := range updatedTasks {
- log.G(ctx).WithFields(
- logrus.Fields{
- "task.id": task.ID,
- "task.desiredstate": task.DesiredState}).Debug("assigned")
- if err := PutTask(tx, task); err != nil {
- return err
- }
- if err := SetTaskAssignment(tx, task.ID, true); err != nil {
- return err
- }
- if mgr, ok := w.taskManagers[task.ID]; ok {
- if err := mgr.Update(ctx, task); err != nil && err != ErrClosed {
- log.G(ctx).WithError(err).Error("failed updating assigned task")
- }
- } else {
- // we may have still seen the task, let's grab the status from
- // storage and replace it with our status, if we have it.
- status, err := GetTaskStatus(tx, task.ID)
- if err != nil {
- if err != errTaskUnknown {
- return err
- }
- // never seen before, register the provided status
- if err := PutTaskStatus(tx, task.ID, &task.Status); err != nil {
- return err
- }
- } else {
- task.Status = *status
- }
- w.startTask(ctx, tx, task)
- }
- assigned[task.ID] = struct{}{}
- }
- closeManager := func(tm *taskManager) {
- go func(tm *taskManager) {
- defer w.closers.Done()
- // when a task is no longer assigned, we shutdown the task manager
- if err := tm.Close(); err != nil {
- log.G(ctx).WithError(err).Error("error closing task manager")
- }
- }(tm)
- // make an attempt at removing. this is best effort. any errors will be
- // retried by the reaper later.
- if err := tm.ctlr.Remove(ctx); err != nil {
- log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
- }
- if err := tm.ctlr.Close(); err != nil {
- log.G(ctx).WithError(err).Error("error closing controller")
- }
- }
- removeTaskAssignment := func(taskID string) error {
- ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
- if err := SetTaskAssignment(tx, taskID, false); err != nil {
- log.G(ctx).WithError(err).Error("error setting task assignment in database")
- }
- return err
- }
- // If this was a complete set of assignments, we're going to remove all the remaining
- // tasks.
- if fullSnapshot {
- for id, tm := range w.taskManagers {
- if _, ok := assigned[id]; ok {
- continue
- }
- err := removeTaskAssignment(id)
- if err == nil {
- delete(w.taskManagers, id)
- go closeManager(tm)
- }
- }
- } else {
- // If this was an incremental set of assignments, we're going to remove only the tasks
- // in the removed set
- for _, task := range removedTasks {
- err := removeTaskAssignment(task.ID)
- if err != nil {
- continue
- }
- tm, ok := w.taskManagers[task.ID]
- if ok {
- delete(w.taskManagers, task.ID)
- go closeManager(tm)
- }
- }
- }
- return tx.Commit()
- }
- func reconcileSecrets(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
- var (
- updatedSecrets []api.Secret
- removedSecrets []string
- )
- for _, a := range assignments {
- if s := a.Assignment.GetSecret(); s != nil {
- switch a.Action {
- case api.AssignmentChange_AssignmentActionUpdate:
- updatedSecrets = append(updatedSecrets, *s)
- case api.AssignmentChange_AssignmentActionRemove:
- removedSecrets = append(removedSecrets, s.ID)
- }
- }
- }
- secretsProvider, ok := w.executor.(exec.SecretsProvider)
- if !ok {
- if len(updatedSecrets) != 0 || len(removedSecrets) != 0 {
- log.G(ctx).Warn("secrets update ignored; executor does not support secrets")
- }
- return nil
- }
- secrets := secretsProvider.Secrets()
- log.G(ctx).WithFields(logrus.Fields{
- "len(updatedSecrets)": len(updatedSecrets),
- "len(removedSecrets)": len(removedSecrets),
- }).Debug("(*worker).reconcileSecrets")
- // If this was a complete set of secrets, we're going to clear the secrets map and add all of them
- if fullSnapshot {
- secrets.Reset()
- } else {
- secrets.Remove(removedSecrets)
- }
- secrets.Add(updatedSecrets...)
- return nil
- }
- func reconcileConfigs(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
- var (
- updatedConfigs []api.Config
- removedConfigs []string
- )
- for _, a := range assignments {
- if r := a.Assignment.GetConfig(); r != nil {
- switch a.Action {
- case api.AssignmentChange_AssignmentActionUpdate:
- updatedConfigs = append(updatedConfigs, *r)
- case api.AssignmentChange_AssignmentActionRemove:
- removedConfigs = append(removedConfigs, r.ID)
- }
- }
- }
- configsProvider, ok := w.executor.(exec.ConfigsProvider)
- if !ok {
- if len(updatedConfigs) != 0 || len(removedConfigs) != 0 {
- log.G(ctx).Warn("configs update ignored; executor does not support configs")
- }
- return nil
- }
- configs := configsProvider.Configs()
- log.G(ctx).WithFields(logrus.Fields{
- "len(updatedConfigs)": len(updatedConfigs),
- "len(removedConfigs)": len(removedConfigs),
- }).Debug("(*worker).reconcileConfigs")
- // If this was a complete set of configs, we're going to clear the configs map and add all of them
- if fullSnapshot {
- configs.Reset()
- } else {
- configs.Remove(removedConfigs)
- }
- configs.Add(updatedConfigs...)
- return nil
- }
- func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
- w.mu.Lock()
- defer w.mu.Unlock()
- key := &statusReporterKey{reporter}
- w.listeners[key] = struct{}{}
- go func() {
- <-ctx.Done()
- w.mu.Lock()
- defer w.mu.Unlock()
- delete(w.listeners, key) // remove the listener if the context is closed.
- }()
- // report the current statuses to the new listener
- if err := w.db.View(func(tx *bolt.Tx) error {
- return WalkTaskStatus(tx, func(id string, status *api.TaskStatus) error {
- return reporter.UpdateTaskStatus(ctx, id, status)
- })
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("failed reporting initial statuses to registered listener %v", reporter)
- }
- }
- func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error {
- _, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation.
- if err != nil {
- log.G(ctx).WithError(err).Error("failed to start taskManager")
- // we ignore this error: it gets reported in the taskStatus within
- // `newTaskManager`. We log it here and move on. If their is an
- // attempted restart, the lack of taskManager will have this retry
- // again.
- return nil
- }
- // only publish if controller resolution was successful.
- w.taskevents.Publish(task.Copy())
- return nil
- }
- func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (*taskManager, error) {
- if tm, ok := w.taskManagers[task.ID]; ok {
- return tm, nil
- }
- tm, err := w.newTaskManager(ctx, tx, task)
- if err != nil {
- return nil, err
- }
- w.taskManagers[task.ID] = tm
- // keep track of active tasks
- w.closers.Add(1)
- return tm, nil
- }
- func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (*taskManager, error) {
- ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
- "task.id": task.ID,
- "service.id": task.ServiceID,
- }))
- ctlr, status, err := exec.Resolve(ctx, task, w.executor)
- if err := w.updateTaskStatus(ctx, tx, task.ID, status); err != nil {
- log.G(ctx).WithError(err).Error("error updating task status after controller resolution")
- }
- if err != nil {
- log.G(ctx).WithError(err).Error("controller resolution failed")
- return nil, err
- }
- return newTaskManager(ctx, task, ctlr, statusReporterFunc(func(ctx context.Context, taskID string, status *api.TaskStatus) error {
- w.mu.RLock()
- defer w.mu.RUnlock()
- return w.db.Update(func(tx *bolt.Tx) error {
- return w.updateTaskStatus(ctx, tx, taskID, status)
- })
- })), nil
- }
- // updateTaskStatus reports statuses to listeners, read lock must be held.
- func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID string, status *api.TaskStatus) error {
- if err := PutTaskStatus(tx, taskID, status); err != nil {
- log.G(ctx).WithError(err).Error("failed writing status to disk")
- return err
- }
- // broadcast the task status out.
- for key := range w.listeners {
- if err := key.StatusReporter.UpdateTaskStatus(ctx, taskID, status); err != nil {
- log.G(ctx).WithError(err).Errorf("failed updating status for reporter %v", key.StatusReporter)
- }
- }
- return nil
- }
- // Subscribe to log messages matching the subscription.
- func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
- log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)
- publisher, cancel, err := w.publisherProvider.Publisher(ctx, subscription.ID)
- if err != nil {
- return err
- }
- // Send a close once we're done
- defer cancel()
- match := func(t *api.Task) bool {
- // TODO(aluzzardi): Consider using maps to limit the iterations.
- for _, tid := range subscription.Selector.TaskIDs {
- if t.ID == tid {
- return true
- }
- }
- for _, sid := range subscription.Selector.ServiceIDs {
- if t.ServiceID == sid {
- return true
- }
- }
- for _, nid := range subscription.Selector.NodeIDs {
- if t.NodeID == nid {
- return true
- }
- }
- return false
- }
- wg := sync.WaitGroup{}
- w.mu.Lock()
- for _, tm := range w.taskManagers {
- if match(tm.task) {
- wg.Add(1)
- go func(tm *taskManager) {
- defer wg.Done()
- tm.Logs(ctx, *subscription.Options, publisher)
- }(tm)
- }
- }
- w.mu.Unlock()
- // If follow mode is disabled, wait for the current set of matched tasks
- // to finish publishing logs, then close the subscription by returning.
- if subscription.Options == nil || !subscription.Options.Follow {
- waitCh := make(chan struct{})
- go func() {
- defer close(waitCh)
- wg.Wait()
- }()
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-waitCh:
- return nil
- }
- }
- // In follow mode, watch for new tasks. Don't close the subscription
- // until it's cancelled.
- ch, cancel := w.taskevents.Watch()
- defer cancel()
- for {
- select {
- case v := <-ch:
- task := v.(*api.Task)
- if match(task) {
- w.mu.RLock()
- tm, ok := w.taskManagers[task.ID]
- w.mu.RUnlock()
- if !ok {
- continue
- }
- go tm.Logs(ctx, *subscription.Options, publisher)
- }
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- }
- func (w *worker) Wait(ctx context.Context) error {
- ch := make(chan struct{})
- go func() {
- w.closers.Wait()
- close(ch)
- }()
- select {
- case <-ch:
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
|