scheduler.go

package scheduler

import (
    "context"
    "time"

    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/api/genericresource"
    "github.com/docker/swarmkit/log"
    "github.com/docker/swarmkit/manager/state"
    "github.com/docker/swarmkit/manager/state/store"
    "github.com/docker/swarmkit/protobuf/ptypes"
)

const (
    // monitorFailures is the lookback period for counting failures of
    // a task to determine if a node is faulty for a particular service.
    monitorFailures = 5 * time.Minute

    // maxFailures is the number of failures within monitorFailures that
    // triggers downweighting of a node in the sorting function.
    maxFailures = 5
)

type schedulingDecision struct {
    old *api.Task
    new *api.Task
}

// Scheduler assigns tasks to nodes.
type Scheduler struct {
    store           *store.MemoryStore
    unassignedTasks map[string]*api.Task
    // pendingPreassignedTasks already have NodeID, need resource validation
    pendingPreassignedTasks map[string]*api.Task
    // preassignedTasks tracks tasks that were preassigned, including those
    // past the pending state.
    preassignedTasks map[string]struct{}
    nodeSet          nodeSet
    allTasks         map[string]*api.Task
    pipeline         *Pipeline

    // stopChan signals to the state machine to stop running
    stopChan chan struct{}
    // doneChan is closed when the state machine terminates
    doneChan chan struct{}
}

// New creates a new scheduler.
func New(store *store.MemoryStore) *Scheduler {
    return &Scheduler{
        store:                   store,
        unassignedTasks:         make(map[string]*api.Task),
        pendingPreassignedTasks: make(map[string]*api.Task),
        preassignedTasks:        make(map[string]struct{}),
        allTasks:                make(map[string]*api.Task),
        stopChan:                make(chan struct{}),
        doneChan:                make(chan struct{}),
        pipeline:                NewPipeline(),
    }
}

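// setupTasksList does the initial population of the scheduler's state from a
// read transaction: it indexes every resource-consuming task, queues
// unassigned tasks, records pending preassigned tasks for validation, and
// builds the node set from the remaining tasks grouped by node.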
func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
    tasks, err := store.FindTasks(tx, store.All)
    if err != nil {
        return err
    }

    tasksByNode := make(map[string]map[string]*api.Task)
    for _, t := range tasks {
        // Ignore all tasks that have not reached PENDING
        // state and tasks that no longer consume resources.
        if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
            continue
        }

        // Also ignore tasks that have not yet been assigned but whose desired
        // state is beyond TaskStateCompleted. This can happen if you update,
        // delete, or scale down a service before its tasks were assigned.
        if t.Status.State == api.TaskStatePending && t.DesiredState > api.TaskStateCompleted {
            continue
        }

        s.allTasks[t.ID] = t
        if t.NodeID == "" {
            s.enqueue(t)
            continue
        }

        // Preassigned tasks need their resource requirements validated
        // against the corresponding node.
        if t.Status.State == api.TaskStatePending {
            s.preassignedTasks[t.ID] = struct{}{}
            s.pendingPreassignedTasks[t.ID] = t
            continue
        }

        if tasksByNode[t.NodeID] == nil {
            tasksByNode[t.NodeID] = make(map[string]*api.Task)
        }
        tasksByNode[t.NodeID][t.ID] = t
    }

    return s.buildNodeSet(tx, tasksByNode)
}

// Run is the scheduler event loop.
func (s *Scheduler) Run(ctx context.Context) error {
    defer close(s.doneChan)

    updates, cancel, err := store.ViewAndWatch(s.store, s.setupTasksList)
    if err != nil {
        log.G(ctx).WithError(err).Errorf("snapshot store update failed")
        return err
    }
    defer cancel()

    // Validate resources for preassigned tasks before handling other tasks,
    // because preassigned tasks (for example, those belonging to global
    // services) should start before other tasks.
    s.processPreassignedTasks(ctx)

    // Queue all unassigned tasks before processing changes.
    s.tick(ctx)

    const (
        // commitDebounceGap is the amount of time to wait between
        // commit events to debounce them.
        commitDebounceGap = 50 * time.Millisecond
        // maxLatency is a time limit on the debouncing.
        maxLatency = time.Second
    )

    var (
        debouncingStarted     time.Time
        commitDebounceTimer   *time.Timer
        commitDebounceTimeout <-chan time.Time
    )

    tickRequired := false

    schedule := func() {
        if len(s.pendingPreassignedTasks) > 0 {
            s.processPreassignedTasks(ctx)
        }
        if tickRequired {
            s.tick(ctx)
            tickRequired = false
        }
    }

    // Watch for changes.
    for {
        select {
        case event := <-updates:
            switch v := event.(type) {
            case api.EventCreateTask:
                if s.createTask(ctx, v.Task) {
                    tickRequired = true
                }
            case api.EventUpdateTask:
                if s.updateTask(ctx, v.Task) {
                    tickRequired = true
                }
            case api.EventDeleteTask:
                if s.deleteTask(v.Task) {
                    // Deleting a task may free up node resources; pending
                    // tasks should be re-evaluated.
                    tickRequired = true
                }
            case api.EventCreateNode:
                s.createOrUpdateNode(v.Node)
                tickRequired = true
            case api.EventUpdateNode:
                s.createOrUpdateNode(v.Node)
                tickRequired = true
            case api.EventDeleteNode:
                s.nodeSet.remove(v.Node.ID)
            case state.EventCommit:
                if commitDebounceTimer != nil {
                    if time.Since(debouncingStarted) > maxLatency {
                        commitDebounceTimer.Stop()
                        commitDebounceTimer = nil
                        commitDebounceTimeout = nil
                        schedule()
                    } else {
                        commitDebounceTimer.Reset(commitDebounceGap)
                    }
                } else {
                    commitDebounceTimer = time.NewTimer(commitDebounceGap)
                    commitDebounceTimeout = commitDebounceTimer.C
                    debouncingStarted = time.Now()
                }
            }
        case <-commitDebounceTimeout:
            schedule()
            commitDebounceTimer = nil
            commitDebounceTimeout = nil
        case <-s.stopChan:
            return nil
        }
    }
}

// Stop causes the scheduler event loop to stop running.
func (s *Scheduler) Stop() {
    close(s.stopChan)
    <-s.doneChan
}

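// Illustrative usage sketch (not part of this file): a caller such as the
// manager is assumed to drive the New/Run/Stop lifecycle roughly like this,
// where ctx is a context.Context and memoryStore is an open
// *store.MemoryStore:
//
//    sched := New(memoryStore)
//    go func() {
//        if err := sched.Run(ctx); err != nil {
//            // handle or log the event-loop error
//        }
//    }()
//    // ... during shutdown:
//    sched.Stop() // closes stopChan and waits for doneChan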
// enqueue queues a task for scheduling.
func (s *Scheduler) enqueue(t *api.Task) {
    s.unassignedTasks[t.ID] = t
}

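// createTask handles a task creation event. It returns true if the task was
// queued as unassigned, meaning a scheduling pass (tick) is required.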
func (s *Scheduler) createTask(ctx context.Context, t *api.Task) bool {
    // Ignore all tasks that have not reached PENDING
    // state, and tasks that no longer consume resources.
    if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
        return false
    }

    s.allTasks[t.ID] = t
    if t.NodeID == "" {
        // unassigned task
        s.enqueue(t)
        return true
    }

    if t.Status.State == api.TaskStatePending {
        s.preassignedTasks[t.ID] = struct{}{}
        s.pendingPreassignedTasks[t.ID] = t
        // preassigned tasks do not contribute to running tasks count
        return false
    }

    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.addTask(t) {
        s.nodeSet.updateNode(nodeInfo)
    }

    return false
}

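// updateTask handles a task update event. It reconciles the scheduler's
// in-memory view of the task and returns true if the change requires a
// scheduling pass, for example because the task terminated and freed node
// resources, or because it became unassigned.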
func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
    // Ignore all tasks that have not reached PENDING
    // state.
    if t.Status.State < api.TaskStatePending {
        return false
    }

    oldTask := s.allTasks[t.ID]

    // Tasks past the RUNNING state no longer consume resources; release
    // them from their node and keep track of failures.
    if t.Status.State > api.TaskStateRunning {
        if oldTask == nil {
            return false
        }

        if t.Status.State != oldTask.Status.State &&
            (t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
            // Keep track of task failures, so other nodes can be preferred
            // for scheduling this service if it looks like the service is
            // failing in a loop on this node. However, skip this for
            // preassigned tasks, because the scheduler does not choose
            // which nodes those run on.
            if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
                nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
                if err == nil {
                    nodeInfo.taskFailed(ctx, t)
                    s.nodeSet.updateNode(nodeInfo)
                }
            }
        }

        s.deleteTask(oldTask)

        return true
    }

    if t.NodeID == "" {
        // unassigned task
        if oldTask != nil {
            s.deleteTask(oldTask)
        }
        s.allTasks[t.ID] = t
        s.enqueue(t)
        return true
    }

    if t.Status.State == api.TaskStatePending {
        if oldTask != nil {
            s.deleteTask(oldTask)
        }
        s.preassignedTasks[t.ID] = struct{}{}
        s.allTasks[t.ID] = t
        s.pendingPreassignedTasks[t.ID] = t
        // preassigned tasks do not contribute to running tasks count
        return false
    }

    s.allTasks[t.ID] = t
    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.addTask(t) {
        s.nodeSet.updateNode(nodeInfo)
    }

    return false
}

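// deleteTask removes a task from the scheduler's bookkeeping maps and
// releases its resources on the node it was assigned to. It returns true if
// node resources were actually freed, in which case pending tasks may now
// fit and should be re-evaluated.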
func (s *Scheduler) deleteTask(t *api.Task) bool {
    delete(s.allTasks, t.ID)
    delete(s.preassignedTasks, t.ID)
    delete(s.pendingPreassignedTasks, t.ID)

    nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
    if err == nil && nodeInfo.removeTask(t) {
        s.nodeSet.updateNode(nodeInfo)
        return true
    }
    return false
}

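// createOrUpdateNode refreshes the scheduler's view of a node. It recomputes
// the node's available resources by subtracting the reservations of every
// task already placed on it, then adds or updates the node in the node set.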
func (s *Scheduler) createOrUpdateNode(n *api.Node) {
    nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID)
    var resources *api.Resources
    if n.Description != nil && n.Description.Resources != nil {
        resources = n.Description.Resources.Copy()
        // reconcile resources by looping over all tasks in this node
        if nodeInfoErr == nil {
            for _, task := range nodeInfo.Tasks {
                reservations := taskReservations(task.Spec)

                resources.MemoryBytes -= reservations.MemoryBytes
                resources.NanoCPUs -= reservations.NanoCPUs

                genericresource.ConsumeNodeResources(&resources.Generic,
                    task.AssignedGenericResources)
            }
        }
    } else {
        resources = &api.Resources{}
    }

    if nodeInfoErr != nil {
        nodeInfo = newNodeInfo(n, nil, *resources)
    } else {
        nodeInfo.Node = n
        nodeInfo.AvailableResources = resources
    }
    s.nodeSet.addOrUpdateNode(nodeInfo)
}

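// processPreassignedTasks validates that each pending preassigned task still
// fits on the node it was assigned to, commits the resulting decisions to
// the store, and rolls back the in-memory node state for any decisions that
// failed to commit.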
func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
    schedulingDecisions := make(map[string]schedulingDecision, len(s.pendingPreassignedTasks))
    for _, t := range s.pendingPreassignedTasks {
        newT := s.taskFitNode(ctx, t, t.NodeID)
        if newT == nil {
            continue
        }
        schedulingDecisions[t.ID] = schedulingDecision{old: t, new: newT}
    }

    successful, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)

    for _, decision := range successful {
        if decision.new.Status.State == api.TaskStateAssigned {
            delete(s.pendingPreassignedTasks, decision.old.ID)
        }
    }
    for _, decision := range failed {
        s.allTasks[decision.old.ID] = decision.old
        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
        if err == nil && nodeInfo.removeTask(decision.new) {
            s.nodeSet.updateNode(nodeInfo)
        }
    }
}

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
    type commonSpecKey struct {
        serviceID   string
        specVersion api.Version
    }
    tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
    var oneOffTasks []*api.Task
    schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

    for taskID, t := range s.unassignedTasks {
        if t == nil || t.NodeID != "" {
            // task deleted or already assigned
            delete(s.unassignedTasks, taskID)
            continue
        }

        // Group tasks with common specs
        if t.SpecVersion != nil {
            taskGroupKey := commonSpecKey{
                serviceID:   t.ServiceID,
                specVersion: *t.SpecVersion,
            }

            if tasksByCommonSpec[taskGroupKey] == nil {
                tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
            }
            tasksByCommonSpec[taskGroupKey][taskID] = t
        } else {
            // This task doesn't have a spec version. We have to
            // schedule it as a one-off.
            oneOffTasks = append(oneOffTasks, t)
        }
        delete(s.unassignedTasks, taskID)
    }

    for _, taskGroup := range tasksByCommonSpec {
        s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
    }
    for _, t := range oneOffTasks {
        s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
    }

    _, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
    for _, decision := range failed {
        s.allTasks[decision.old.ID] = decision.old

        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
        if err == nil && nodeInfo.removeTask(decision.new) {
            s.nodeSet.updateNode(nodeInfo)
        }

        // enqueue task for next scheduling attempt
        s.enqueue(decision.old)
    }
}

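// applySchedulingDecisions writes the given scheduling decisions back to the
// store in a batch, updating exactly one task per transaction. A decision is
// reported as failed when the scheduler's view of the target node is missing
// or out of date, or when the task update cannot be committed; if the whole
// batch fails, every decision is reported as failed.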
func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
    if len(schedulingDecisions) == 0 {
        return
    }

    successful = make([]schedulingDecision, 0, len(schedulingDecisions))

    // Apply changes to master store
    err := s.store.Batch(func(batch *store.Batch) error {
        for len(schedulingDecisions) > 0 {
            err := batch.Update(func(tx store.Tx) error {
                // Update exactly one task inside this Update
                // callback.
                for taskID, decision := range schedulingDecisions {
                    delete(schedulingDecisions, taskID)

                    t := store.GetTask(tx, taskID)
                    if t == nil {
                        // Task no longer exists
                        s.deleteTask(decision.new)
                        continue
                    }

                    if t.Status.State == decision.new.Status.State &&
                        t.Status.Message == decision.new.Status.Message &&
                        t.Status.Err == decision.new.Status.Err {
                        // No changes, ignore
                        continue
                    }

                    if t.Status.State >= api.TaskStateAssigned {
                        nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
                        if err != nil {
                            failed = append(failed, decision)
                            continue
                        }
                        node := store.GetNode(tx, decision.new.NodeID)
                        if node == nil || node.Meta.Version != nodeInfo.Meta.Version {
                            // node is out of date
                            failed = append(failed, decision)
                            continue
                        }
                    }

                    if err := store.UpdateTask(tx, decision.new); err != nil {
                        log.G(ctx).Debugf("scheduler failed to update task %s; will retry", taskID)
                        failed = append(failed, decision)
                        continue
                    }
                    successful = append(successful, decision)
                    return nil
                }
                return nil
            })
            if err != nil {
                return err
            }
        }
        return nil
    })

    if err != nil {
        log.G(ctx).WithError(err).Error("scheduler tick transaction failed")
        failed = append(failed, successful...)
        successful = nil
    }
    return
}

// taskFitNode checks if a node has enough resources to accommodate a task.
func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
    nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
    if err != nil {
        // node does not exist in set (it may have been deleted)
        return nil
    }
    newT := *t
    s.pipeline.SetTask(t)
    if !s.pipeline.Process(&nodeInfo) {
        // this node cannot accommodate this task
        newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
        newT.Status.Err = s.pipeline.Explain()
        s.allTasks[t.ID] = &newT

        return &newT
    }
    newT.Status = api.TaskStatus{
        State:     api.TaskStateAssigned,
        Timestamp: ptypes.MustTimestampProto(time.Now()),
        Message:   "scheduler confirmed task can run on preassigned node",
    }
    s.allTasks[t.ID] = &newT

    if nodeInfo.addTask(&newT) {
        s.nodeSet.updateNode(nodeInfo)
    }
    return &newT
}

// scheduleTaskGroup schedules a batch of tasks that are part of the same
// service and share the same version of the spec.
func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
    // Pick a task at random from taskGroup to use for constraint
    // evaluation. It doesn't matter which one we pick because all the
    // tasks in the group are equal in terms of the fields the constraint
    // filters consider.
    var t *api.Task
    for _, t = range taskGroup {
        break
    }

    s.pipeline.SetTask(t)

    now := time.Now()
    nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
        // If either node has at least maxFailures recent failures,
        // that's the deciding factor.
        recentFailuresA := a.countRecentFailures(now, t)
        recentFailuresB := b.countRecentFailures(now, t)

        if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
            if recentFailuresA > recentFailuresB {
                return false
            }
            if recentFailuresB > recentFailuresA {
                return true
            }
        }

        tasksByServiceA := a.ActiveTasksCountByService[t.ServiceID]
        tasksByServiceB := b.ActiveTasksCountByService[t.ServiceID]

        if tasksByServiceA < tasksByServiceB {
            return true
        }
        if tasksByServiceA > tasksByServiceB {
            return false
        }

        // Total number of tasks breaks ties.
        return a.ActiveTasksCount < b.ActiveTasksCount
    }

    var prefs []*api.PlacementPreference
    if t.Spec.Placement != nil {
        prefs = t.Spec.Placement.Preferences
    }

    tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)

    s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
    if len(taskGroup) != 0 {
        s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
    }
}

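// scheduleNTasksOnSubtree schedules up to n tasks from taskGroup onto the
// given decision (sub)tree. Leaves are scheduled directly onto their ordered
// node lists; for interior nodes, tasks are spread across branches to keep
// them as even as possible, skipping branches that run out of room. It
// returns the number of tasks actually scheduled.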
func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
    if tree.next == nil {
        nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
        if len(nodes) == 0 {
            return 0
        }

        return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
    }

    // Walk the tree and figure out how the tasks should be split at each
    // level.
    tasksScheduled := 0
    tasksInUsableBranches := tree.tasks
    var noRoom map[*decisionTree]struct{}

    // Try to make branches even until either all branches are
    // full, or all tasks have been scheduled.
    for tasksScheduled != n && len(noRoom) != len(tree.next) {
        desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
        remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))

        for _, subtree := range tree.next {
            if noRoom != nil {
                if _, ok := noRoom[subtree]; ok {
                    continue
                }
            }
            subtreeTasks := subtree.tasks
            if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
                tasksToAssign := desiredTasksPerBranch - subtreeTasks
                if remainder > 0 {
                    tasksToAssign++
                }
                res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
                if res < tasksToAssign {
                    if noRoom == nil {
                        noRoom = make(map[*decisionTree]struct{})
                    }
                    noRoom[subtree] = struct{}{}
                    tasksInUsableBranches -= subtreeTasks
                } else if remainder > 0 {
                    remainder--
                }
                tasksScheduled += res
            }
        }
    }

    return tasksScheduled
}

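// scheduleNTasksOnNodes assigns up to n tasks from taskGroup to nodes from
// the given ordered list, advancing through the list as nodes fill up or
// fail the pipeline's constraints. It records a scheduling decision for each
// assignment, updates the in-memory node state, and returns the number of
// tasks scheduled.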
func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
    tasksScheduled := 0
    failedConstraints := make(map[int]bool) // key is index in nodes slice
    nodeIter := 0
    nodeCount := len(nodes)
    for taskID, t := range taskGroup {
        // Skip tasks which were already scheduled because they ended
        // up in two groups at once.
        if _, exists := schedulingDecisions[taskID]; exists {
            continue
        }

        node := &nodes[nodeIter%nodeCount]

        log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
        newT := *t
        newT.NodeID = node.ID
        newT.Status = api.TaskStatus{
            State:     api.TaskStateAssigned,
            Timestamp: ptypes.MustTimestampProto(time.Now()),
            Message:   "scheduler assigned task to node",
        }
        s.allTasks[t.ID] = &newT

        nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
        if err == nil && nodeInfo.addTask(&newT) {
            s.nodeSet.updateNode(nodeInfo)
            nodes[nodeIter%nodeCount] = nodeInfo
        }

        schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
        delete(taskGroup, taskID)
        tasksScheduled++
        if tasksScheduled == n {
            return tasksScheduled
        }

        if nodeIter+1 < nodeCount {
            // First pass fills the nodes until they have the same
            // number of tasks from this service.
            nextNode := nodes[(nodeIter+1)%nodeCount]
            if nodeLess(&nextNode, &nodeInfo) {
                nodeIter++
            }
        } else {
            // In later passes, we just assign one task at a time
            // to each node that still meets the constraints.
            nodeIter++
        }

        origNodeIter := nodeIter
        for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
            failedConstraints[nodeIter%nodeCount] = true
            nodeIter++
            if nodeIter-origNodeIter == nodeCount {
                // None of the nodes meet the constraints anymore.
                return tasksScheduled
            }
        }
    }

    return tasksScheduled
}

// noSuitableNode checks unassigned tasks and makes sure they have an existing
// service in the store before updating the task status and adding them back
// to schedulingDecisions, unassignedTasks, and allTasks.
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
    explanation := s.pipeline.Explain()
    for _, t := range taskGroup {
        var service *api.Service
        s.store.View(func(tx store.ReadTx) {
            service = store.GetService(tx, t.ServiceID)
        })
        if service == nil {
            log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
            continue
        }

        log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

        newT := *t
        newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
        sv := service.SpecVersion
        tv := newT.SpecVersion
        if sv != nil && tv != nil && sv.Index > tv.Index {
            log.G(ctx).WithField("task.id", t.ID).Debug(
                "task belongs to old revision of service",
            )
            if t.Status.State == api.TaskStatePending && t.DesiredState >= api.TaskStateShutdown {
                log.G(ctx).WithField("task.id", t.ID).Debug(
                    "task is desired shutdown, scheduler will go ahead and do so",
                )
                newT.Status.State = api.TaskStateShutdown
                newT.Status.Err = ""
            }
        } else {
            if explanation != "" {
                newT.Status.Err = "no suitable node (" + explanation + ")"
            } else {
                newT.Status.Err = "no suitable node"
            }

            // re-enqueue a task that should still be attempted
            s.enqueue(&newT)
        }

        s.allTasks[t.ID] = &newT
        schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}
    }
}

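// buildNodeSet initializes the scheduler's node set from the store, creating
// a NodeInfo for every known node together with the tasks already placed on
// it and its advertised resources.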
func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[string]*api.Task) error {
    nodes, err := store.FindNodes(tx, store.All)
    if err != nil {
        return err
    }

    s.nodeSet.alloc(len(nodes))

    for _, n := range nodes {
        var resources api.Resources
        if n.Description != nil && n.Description.Resources != nil {
            resources = *n.Description.Resources
        }
        s.nodeSet.addOrUpdateNode(newNodeInfo(n, tasksByNode[n.ID], resources))
    }

    return nil
}