// worker.go

  1. package agent
  2. import (
  3. "sync"
  4. "github.com/Sirupsen/logrus"
  5. "github.com/boltdb/bolt"
  6. "github.com/docker/swarmkit/agent/exec"
  7. "github.com/docker/swarmkit/api"
  8. "github.com/docker/swarmkit/log"
  9. "github.com/docker/swarmkit/watch"
  10. "golang.org/x/net/context"
  11. )
// Worker implements the core task management logic and persistence. It
// coordinates the set of assignments with the executor.
type Worker interface {
	// Init prepares the worker for task assignment.
	Init(ctx context.Context) error

	// Close performs worker cleanup when no longer needed.
	//
	// It is not safe to call any worker function after that.
	Close()

	// Assign assigns a complete set of tasks and configs/secrets to a
	// worker. Any items not included in this set will be removed.
	Assign(ctx context.Context, assignments []*api.AssignmentChange) error

	// Update updates an incremental set of tasks or configs/secrets of
	// the worker. Any items not included either in added or removed will
	// remain untouched.
	Update(ctx context.Context, assignments []*api.AssignmentChange) error

	// Listen registers reporter for updates about tasks controlled by the
	// worker. When first called, the reporter will receive all updates for
	// all tasks controlled by the worker.
	//
	// The listener will be removed if the context is cancelled.
	Listen(ctx context.Context, reporter StatusReporter)

	// Subscribe publishes log messages of the tasks matching the
	// subscription.
	Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error

	// Wait blocks until all task managers have closed or ctx is done.
	Wait(ctx context.Context) error
}
// statusReporterKey wraps a StatusReporter so that listeners are keyed by the
// wrapper's pointer identity in the worker's listener map. Keying the map by
// the reporter interface itself would panic at runtime whenever the reporter's
// dynamic type is not hashable (for example a func-based reporter such as
// statusReporterFunc), both on insertion and on removal.
type statusReporterKey struct {
	StatusReporter
}
// worker is the Worker implementation in this file. It persists task state
// in a bolt database and runs one taskManager per assigned task.
type worker struct {
	// db stores tasks, their statuses, and their assignment flags.
	db *bolt.DB

	// executor resolves task controllers and, when it implements the
	// corresponding provider interfaces, holds secrets and configs.
	executor exec.Executor

	// publisher is not referenced by the code visible in this file section.
	// NOTE(review): possibly unused; confirm against the rest of the package.
	publisher exec.LogPublisher

	// listeners is the set of registered status reporters. Entries are added
	// by Listen and removed once the listener's context is done.
	listeners map[*statusReporterKey]struct{}

	// taskevents receives a copy of each task whose manager was started;
	// Subscribe watches it in follow mode.
	taskevents *watch.Queue

	// publisherProvider creates the log publisher for each subscription.
	publisherProvider exec.LogPublisherProvider

	// taskManagers maps task IDs to their running task manager.
	taskManagers map[string]*taskManager

	// mu guards closed, listeners, and taskManagers.
	mu sync.RWMutex

	// closed is set by Close; Assign and Update return ErrClosed afterwards.
	closed bool

	// closers counts task managers that have been created and not yet fully
	// closed; Wait blocks on it.
	closers sync.WaitGroup
}
  55. func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
  56. return &worker{
  57. db: db,
  58. executor: executor,
  59. publisherProvider: publisherProvider,
  60. taskevents: watch.NewQueue(),
  61. listeners: make(map[*statusReporterKey]struct{}),
  62. taskManagers: make(map[string]*taskManager),
  63. }
  64. }
  65. // Init prepares the worker for assignments.
  66. func (w *worker) Init(ctx context.Context) error {
  67. w.mu.Lock()
  68. defer w.mu.Unlock()
  69. ctx = log.WithModule(ctx, "worker")
  70. // TODO(stevvooe): Start task cleanup process.
  71. // read the tasks from the database and start any task managers that may be needed.
  72. return w.db.Update(func(tx *bolt.Tx) error {
  73. return WalkTasks(tx, func(task *api.Task) error {
  74. if !TaskAssigned(tx, task.ID) {
  75. // NOTE(stevvooe): If tasks can survive worker restart, we need
  76. // to startup the controller and ensure they are removed. For
  77. // now, we can simply remove them from the database.
  78. if err := DeleteTask(tx, task.ID); err != nil {
  79. log.G(ctx).WithError(err).Errorf("error removing task %v", task.ID)
  80. }
  81. return nil
  82. }
  83. status, err := GetTaskStatus(tx, task.ID)
  84. if err != nil {
  85. log.G(ctx).WithError(err).Error("unable to read tasks status")
  86. return nil
  87. }
  88. task.Status = *status // merges the status into the task, ensuring we start at the right point.
  89. return w.startTask(ctx, tx, task)
  90. })
  91. })
  92. }
  93. // Close performs worker cleanup when no longer needed.
  94. func (w *worker) Close() {
  95. w.mu.Lock()
  96. w.closed = true
  97. w.mu.Unlock()
  98. w.taskevents.Close()
  99. }
  100. // Assign assigns a full set of tasks, configs, and secrets to the worker.
  101. // Any tasks not previously known will be started. Any tasks that are in the task set
  102. // and already running will be updated, if possible. Any tasks currently running on
  103. // the worker outside the task set will be terminated.
  104. // Anything not in the set of assignments will be removed.
  105. func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange) error {
  106. w.mu.Lock()
  107. defer w.mu.Unlock()
  108. if w.closed {
  109. return ErrClosed
  110. }
  111. log.G(ctx).WithFields(logrus.Fields{
  112. "len(assignments)": len(assignments),
  113. }).Debug("(*worker).Assign")
  114. // Need to update dependencies before tasks
  115. err := reconcileSecrets(ctx, w, assignments, true)
  116. if err != nil {
  117. return err
  118. }
  119. err = reconcileConfigs(ctx, w, assignments, true)
  120. if err != nil {
  121. return err
  122. }
  123. return reconcileTaskState(ctx, w, assignments, true)
  124. }
  125. // Update updates the set of tasks, configs, and secrets for the worker.
  126. // Tasks in the added set will be added to the worker, and tasks in the removed set
  127. // will be removed from the worker
  128. // Secrets in the added set will be added to the worker, and secrets in the removed set
  129. // will be removed from the worker.
  130. // Configs in the added set will be added to the worker, and configs in the removed set
  131. // will be removed from the worker.
  132. func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
  133. w.mu.Lock()
  134. defer w.mu.Unlock()
  135. if w.closed {
  136. return ErrClosed
  137. }
  138. log.G(ctx).WithFields(logrus.Fields{
  139. "len(assignments)": len(assignments),
  140. }).Debug("(*worker).Update")
  141. err := reconcileSecrets(ctx, w, assignments, false)
  142. if err != nil {
  143. return err
  144. }
  145. err = reconcileConfigs(ctx, w, assignments, false)
  146. if err != nil {
  147. return err
  148. }
  149. return reconcileTaskState(ctx, w, assignments, false)
  150. }
  151. func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
  152. var (
  153. updatedTasks []*api.Task
  154. removedTasks []*api.Task
  155. )
  156. for _, a := range assignments {
  157. if t := a.Assignment.GetTask(); t != nil {
  158. switch a.Action {
  159. case api.AssignmentChange_AssignmentActionUpdate:
  160. updatedTasks = append(updatedTasks, t)
  161. case api.AssignmentChange_AssignmentActionRemove:
  162. removedTasks = append(removedTasks, t)
  163. }
  164. }
  165. }
  166. log.G(ctx).WithFields(logrus.Fields{
  167. "len(updatedTasks)": len(updatedTasks),
  168. "len(removedTasks)": len(removedTasks),
  169. }).Debug("(*worker).reconcileTaskState")
  170. tx, err := w.db.Begin(true)
  171. if err != nil {
  172. log.G(ctx).WithError(err).Error("failed starting transaction against task database")
  173. return err
  174. }
  175. defer tx.Rollback()
  176. assigned := map[string]struct{}{}
  177. for _, task := range updatedTasks {
  178. log.G(ctx).WithFields(
  179. logrus.Fields{
  180. "task.id": task.ID,
  181. "task.desiredstate": task.DesiredState}).Debug("assigned")
  182. if err := PutTask(tx, task); err != nil {
  183. return err
  184. }
  185. if err := SetTaskAssignment(tx, task.ID, true); err != nil {
  186. return err
  187. }
  188. if mgr, ok := w.taskManagers[task.ID]; ok {
  189. if err := mgr.Update(ctx, task); err != nil && err != ErrClosed {
  190. log.G(ctx).WithError(err).Error("failed updating assigned task")
  191. }
  192. } else {
  193. // we may have still seen the task, let's grab the status from
  194. // storage and replace it with our status, if we have it.
  195. status, err := GetTaskStatus(tx, task.ID)
  196. if err != nil {
  197. if err != errTaskUnknown {
  198. return err
  199. }
  200. // never seen before, register the provided status
  201. if err := PutTaskStatus(tx, task.ID, &task.Status); err != nil {
  202. return err
  203. }
  204. } else {
  205. task.Status = *status
  206. }
  207. w.startTask(ctx, tx, task)
  208. }
  209. assigned[task.ID] = struct{}{}
  210. }
  211. closeManager := func(tm *taskManager) {
  212. go func(tm *taskManager) {
  213. defer w.closers.Done()
  214. // when a task is no longer assigned, we shutdown the task manager
  215. if err := tm.Close(); err != nil {
  216. log.G(ctx).WithError(err).Error("error closing task manager")
  217. }
  218. }(tm)
  219. // make an attempt at removing. this is best effort. any errors will be
  220. // retried by the reaper later.
  221. if err := tm.ctlr.Remove(ctx); err != nil {
  222. log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
  223. }
  224. if err := tm.ctlr.Close(); err != nil {
  225. log.G(ctx).WithError(err).Error("error closing controller")
  226. }
  227. }
  228. removeTaskAssignment := func(taskID string) error {
  229. ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
  230. if err := SetTaskAssignment(tx, taskID, false); err != nil {
  231. log.G(ctx).WithError(err).Error("error setting task assignment in database")
  232. }
  233. return err
  234. }
  235. // If this was a complete set of assignments, we're going to remove all the remaining
  236. // tasks.
  237. if fullSnapshot {
  238. for id, tm := range w.taskManagers {
  239. if _, ok := assigned[id]; ok {
  240. continue
  241. }
  242. err := removeTaskAssignment(id)
  243. if err == nil {
  244. delete(w.taskManagers, id)
  245. go closeManager(tm)
  246. }
  247. }
  248. } else {
  249. // If this was an incremental set of assignments, we're going to remove only the tasks
  250. // in the removed set
  251. for _, task := range removedTasks {
  252. err := removeTaskAssignment(task.ID)
  253. if err != nil {
  254. continue
  255. }
  256. tm, ok := w.taskManagers[task.ID]
  257. if ok {
  258. delete(w.taskManagers, task.ID)
  259. go closeManager(tm)
  260. }
  261. }
  262. }
  263. return tx.Commit()
  264. }
  265. func reconcileSecrets(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
  266. var (
  267. updatedSecrets []api.Secret
  268. removedSecrets []string
  269. )
  270. for _, a := range assignments {
  271. if s := a.Assignment.GetSecret(); s != nil {
  272. switch a.Action {
  273. case api.AssignmentChange_AssignmentActionUpdate:
  274. updatedSecrets = append(updatedSecrets, *s)
  275. case api.AssignmentChange_AssignmentActionRemove:
  276. removedSecrets = append(removedSecrets, s.ID)
  277. }
  278. }
  279. }
  280. secretsProvider, ok := w.executor.(exec.SecretsProvider)
  281. if !ok {
  282. if len(updatedSecrets) != 0 || len(removedSecrets) != 0 {
  283. log.G(ctx).Warn("secrets update ignored; executor does not support secrets")
  284. }
  285. return nil
  286. }
  287. secrets := secretsProvider.Secrets()
  288. log.G(ctx).WithFields(logrus.Fields{
  289. "len(updatedSecrets)": len(updatedSecrets),
  290. "len(removedSecrets)": len(removedSecrets),
  291. }).Debug("(*worker).reconcileSecrets")
  292. // If this was a complete set of secrets, we're going to clear the secrets map and add all of them
  293. if fullSnapshot {
  294. secrets.Reset()
  295. } else {
  296. secrets.Remove(removedSecrets)
  297. }
  298. secrets.Add(updatedSecrets...)
  299. return nil
  300. }
  301. func reconcileConfigs(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
  302. var (
  303. updatedConfigs []api.Config
  304. removedConfigs []string
  305. )
  306. for _, a := range assignments {
  307. if r := a.Assignment.GetConfig(); r != nil {
  308. switch a.Action {
  309. case api.AssignmentChange_AssignmentActionUpdate:
  310. updatedConfigs = append(updatedConfigs, *r)
  311. case api.AssignmentChange_AssignmentActionRemove:
  312. removedConfigs = append(removedConfigs, r.ID)
  313. }
  314. }
  315. }
  316. configsProvider, ok := w.executor.(exec.ConfigsProvider)
  317. if !ok {
  318. if len(updatedConfigs) != 0 || len(removedConfigs) != 0 {
  319. log.G(ctx).Warn("configs update ignored; executor does not support configs")
  320. }
  321. return nil
  322. }
  323. configs := configsProvider.Configs()
  324. log.G(ctx).WithFields(logrus.Fields{
  325. "len(updatedConfigs)": len(updatedConfigs),
  326. "len(removedConfigs)": len(removedConfigs),
  327. }).Debug("(*worker).reconcileConfigs")
  328. // If this was a complete set of configs, we're going to clear the configs map and add all of them
  329. if fullSnapshot {
  330. configs.Reset()
  331. } else {
  332. configs.Remove(removedConfigs)
  333. }
  334. configs.Add(updatedConfigs...)
  335. return nil
  336. }
  337. func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
  338. w.mu.Lock()
  339. defer w.mu.Unlock()
  340. key := &statusReporterKey{reporter}
  341. w.listeners[key] = struct{}{}
  342. go func() {
  343. <-ctx.Done()
  344. w.mu.Lock()
  345. defer w.mu.Unlock()
  346. delete(w.listeners, key) // remove the listener if the context is closed.
  347. }()
  348. // report the current statuses to the new listener
  349. if err := w.db.View(func(tx *bolt.Tx) error {
  350. return WalkTaskStatus(tx, func(id string, status *api.TaskStatus) error {
  351. return reporter.UpdateTaskStatus(ctx, id, status)
  352. })
  353. }); err != nil {
  354. log.G(ctx).WithError(err).Errorf("failed reporting initial statuses to registered listener %v", reporter)
  355. }
  356. }
  357. func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error {
  358. _, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation.
  359. if err != nil {
  360. log.G(ctx).WithError(err).Error("failed to start taskManager")
  361. // we ignore this error: it gets reported in the taskStatus within
  362. // `newTaskManager`. We log it here and move on. If their is an
  363. // attempted restart, the lack of taskManager will have this retry
  364. // again.
  365. return nil
  366. }
  367. // only publish if controller resolution was successful.
  368. w.taskevents.Publish(task.Copy())
  369. return nil
  370. }
  371. func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (*taskManager, error) {
  372. if tm, ok := w.taskManagers[task.ID]; ok {
  373. return tm, nil
  374. }
  375. tm, err := w.newTaskManager(ctx, tx, task)
  376. if err != nil {
  377. return nil, err
  378. }
  379. w.taskManagers[task.ID] = tm
  380. // keep track of active tasks
  381. w.closers.Add(1)
  382. return tm, nil
  383. }
  384. func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (*taskManager, error) {
  385. ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
  386. "task.id": task.ID,
  387. "service.id": task.ServiceID,
  388. }))
  389. ctlr, status, err := exec.Resolve(ctx, task, w.executor)
  390. if err := w.updateTaskStatus(ctx, tx, task.ID, status); err != nil {
  391. log.G(ctx).WithError(err).Error("error updating task status after controller resolution")
  392. }
  393. if err != nil {
  394. log.G(ctx).WithError(err).Error("controller resolution failed")
  395. return nil, err
  396. }
  397. return newTaskManager(ctx, task, ctlr, statusReporterFunc(func(ctx context.Context, taskID string, status *api.TaskStatus) error {
  398. w.mu.RLock()
  399. defer w.mu.RUnlock()
  400. return w.db.Update(func(tx *bolt.Tx) error {
  401. return w.updateTaskStatus(ctx, tx, taskID, status)
  402. })
  403. })), nil
  404. }
  405. // updateTaskStatus reports statuses to listeners, read lock must be held.
  406. func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID string, status *api.TaskStatus) error {
  407. if err := PutTaskStatus(tx, taskID, status); err != nil {
  408. log.G(ctx).WithError(err).Error("failed writing status to disk")
  409. return err
  410. }
  411. // broadcast the task status out.
  412. for key := range w.listeners {
  413. if err := key.StatusReporter.UpdateTaskStatus(ctx, taskID, status); err != nil {
  414. log.G(ctx).WithError(err).Errorf("failed updating status for reporter %v", key.StatusReporter)
  415. }
  416. }
  417. return nil
  418. }
  419. // Subscribe to log messages matching the subscription.
  420. func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
  421. log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)
  422. publisher, cancel, err := w.publisherProvider.Publisher(ctx, subscription.ID)
  423. if err != nil {
  424. return err
  425. }
  426. // Send a close once we're done
  427. defer cancel()
  428. match := func(t *api.Task) bool {
  429. // TODO(aluzzardi): Consider using maps to limit the iterations.
  430. for _, tid := range subscription.Selector.TaskIDs {
  431. if t.ID == tid {
  432. return true
  433. }
  434. }
  435. for _, sid := range subscription.Selector.ServiceIDs {
  436. if t.ServiceID == sid {
  437. return true
  438. }
  439. }
  440. for _, nid := range subscription.Selector.NodeIDs {
  441. if t.NodeID == nid {
  442. return true
  443. }
  444. }
  445. return false
  446. }
  447. wg := sync.WaitGroup{}
  448. w.mu.Lock()
  449. for _, tm := range w.taskManagers {
  450. if match(tm.task) {
  451. wg.Add(1)
  452. go func(tm *taskManager) {
  453. defer wg.Done()
  454. tm.Logs(ctx, *subscription.Options, publisher)
  455. }(tm)
  456. }
  457. }
  458. w.mu.Unlock()
  459. // If follow mode is disabled, wait for the current set of matched tasks
  460. // to finish publishing logs, then close the subscription by returning.
  461. if subscription.Options == nil || !subscription.Options.Follow {
  462. waitCh := make(chan struct{})
  463. go func() {
  464. defer close(waitCh)
  465. wg.Wait()
  466. }()
  467. select {
  468. case <-ctx.Done():
  469. return ctx.Err()
  470. case <-waitCh:
  471. return nil
  472. }
  473. }
  474. // In follow mode, watch for new tasks. Don't close the subscription
  475. // until it's cancelled.
  476. ch, cancel := w.taskevents.Watch()
  477. defer cancel()
  478. for {
  479. select {
  480. case v := <-ch:
  481. task := v.(*api.Task)
  482. if match(task) {
  483. w.mu.RLock()
  484. tm, ok := w.taskManagers[task.ID]
  485. w.mu.RUnlock()
  486. if !ok {
  487. continue
  488. }
  489. go tm.Logs(ctx, *subscription.Options, publisher)
  490. }
  491. case <-ctx.Done():
  492. return ctx.Err()
  493. }
  494. }
  495. }
  496. func (w *worker) Wait(ctx context.Context) error {
  497. ch := make(chan struct{})
  498. go func() {
  499. w.closers.Wait()
  500. close(ch)
  501. }()
  502. select {
  503. case <-ch:
  504. return nil
  505. case <-ctx.Done():
  506. return ctx.Err()
  507. }
  508. }