scheduler.go

package scheduler

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"golang.org/x/net/context"
)

const (
	// monitorFailures is the lookback period for counting failures of
	// a task to determine if a node is faulty for a particular service.
	monitorFailures = 5 * time.Minute

	// maxFailures is the number of failures within monitorFailures that
	// triggers downweighting of a node in the sorting function.
	maxFailures = 5
)
type schedulingDecision struct {
	old *api.Task
	new *api.Task
}

// Scheduler assigns tasks to nodes.
type Scheduler struct {
	store           *store.MemoryStore
	unassignedTasks map[string]*api.Task
	// preassignedTasks already have NodeID, need resource validation
	preassignedTasks map[string]*api.Task
	nodeSet          nodeSet
	allTasks         map[string]*api.Task
	pipeline         *Pipeline

	// stopChan signals to the state machine to stop running
	stopChan chan struct{}
	// doneChan is closed when the state machine terminates
	doneChan chan struct{}
}
// New creates a new scheduler.
func New(store *store.MemoryStore) *Scheduler {
	return &Scheduler{
		store:            store,
		unassignedTasks:  make(map[string]*api.Task),
		preassignedTasks: make(map[string]*api.Task),
		allTasks:         make(map[string]*api.Task),
		stopChan:         make(chan struct{}),
		doneChan:         make(chan struct{}),
		pipeline:         NewPipeline(),
	}
}
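
// Illustrative usage (not part of the original source): the scheduler is
// constructed with New, driven by Run in its own goroutine, and shut down
// with Stop. The surrounding manager may wire this up differently; the
// sketch below only uses the exported API defined in this file.
//
//	s := New(memoryStore)
//	go func() {
//		_ = s.Run(ctx)
//	}()
//	// ...
//	s.Stop()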

func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
	tasks, err := store.FindTasks(tx, store.All)
	if err != nil {
		return err
	}

	tasksByNode := make(map[string]map[string]*api.Task)
	for _, t := range tasks {
		// Ignore all tasks that have not reached PENDING
		// state and tasks that no longer consume resources.
		if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
			continue
		}

		s.allTasks[t.ID] = t
		if t.NodeID == "" {
			s.enqueue(t)
			continue
		}

		// Preassigned tasks need their resource requirements validated
		// against the corresponding node.
		if t.Status.State == api.TaskStatePending {
			s.preassignedTasks[t.ID] = t
			continue
		}

		if tasksByNode[t.NodeID] == nil {
			tasksByNode[t.NodeID] = make(map[string]*api.Task)
		}
		tasksByNode[t.NodeID][t.ID] = t
	}

	if err := s.buildNodeSet(tx, tasksByNode); err != nil {
		return err
	}

	return nil
}

// Run is the scheduler event loop.
func (s *Scheduler) Run(ctx context.Context) error {
	defer close(s.doneChan)

	updates, cancel, err := store.ViewAndWatch(s.store, s.setupTasksList)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("snapshot store update failed")
		return err
	}
	defer cancel()

	// Validate resources for preassigned tasks before handling other tasks,
	// because preassigned tasks (such as those belonging to global services)
	// should start before other tasks.
	s.processPreassignedTasks(ctx)

	// Queue all unassigned tasks before processing changes.
	s.tick(ctx)

	const (
		// commitDebounceGap is the amount of time to wait between
		// commit events to debounce them.
		commitDebounceGap = 50 * time.Millisecond
		// maxLatency is a time limit on the debouncing.
		maxLatency = time.Second
	)
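
	// Debounce semantics of the event loop below: each commit event arms
	// (or re-arms) a timer for commitDebounceGap, so scheduling runs only
	// once commits pause briefly; a continuous stream of commits is flushed
	// anyway once debouncing has gone on for longer than maxLatency.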
	var (
		debouncingStarted     time.Time
		commitDebounceTimer   *time.Timer
		commitDebounceTimeout <-chan time.Time
	)

	tickRequired := false

	schedule := func() {
		if len(s.preassignedTasks) > 0 {
			s.processPreassignedTasks(ctx)
		}
		if tickRequired {
			s.tick(ctx)
			tickRequired = false
		}
	}

	// Watch for changes.
	for {
		select {
		case event := <-updates:
			switch v := event.(type) {
			case api.EventCreateTask:
				if s.createTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventUpdateTask:
				if s.updateTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventDeleteTask:
				if s.deleteTask(ctx, v.Task) {
					// Deleting tasks may free up node resources; pending
					// tasks should be re-evaluated.
					tickRequired = true
				}
			case api.EventCreateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventUpdateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventDeleteNode:
				s.nodeSet.remove(v.Node.ID)
			case state.EventCommit:
				if commitDebounceTimer != nil {
					if time.Since(debouncingStarted) > maxLatency {
						commitDebounceTimer.Stop()
						commitDebounceTimer = nil
						commitDebounceTimeout = nil
						schedule()
					} else {
						commitDebounceTimer.Reset(commitDebounceGap)
					}
				} else {
					commitDebounceTimer = time.NewTimer(commitDebounceGap)
					commitDebounceTimeout = commitDebounceTimer.C
					debouncingStarted = time.Now()
				}
			}
		case <-commitDebounceTimeout:
			schedule()
			commitDebounceTimer = nil
			commitDebounceTimeout = nil
		case <-s.stopChan:
			return nil
		}
	}
}

// Stop causes the scheduler event loop to stop running.
func (s *Scheduler) Stop() {
	close(s.stopChan)
	<-s.doneChan
}

// enqueue queues a task for scheduling.
func (s *Scheduler) enqueue(t *api.Task) {
	s.unassignedTasks[t.ID] = t
}

func (s *Scheduler) createTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state, and tasks that no longer consume resources.
	if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
		return false
	}

	s.allTasks[t.ID] = t
	if t.NodeID == "" {
		// unassigned task
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		s.preassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}

func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state.
	if t.Status.State < api.TaskStatePending {
		return false
	}

	oldTask := s.allTasks[t.ID]

	// Tasks that have progressed past RUNNING no longer consume resources;
	// remove them from tracking and record failures against the node.
	if t.Status.State > api.TaskStateRunning {
		if oldTask == nil {
			return false
		}
		s.deleteTask(ctx, oldTask)
		if t.Status.State != oldTask.Status.State &&
			(t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
			nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
			if err == nil {
				nodeInfo.taskFailed(ctx, t.ServiceID)
				s.nodeSet.updateNode(nodeInfo)
			}
		}
		return true
	}

	if t.NodeID == "" {
		// unassigned task
		if oldTask != nil {
			s.deleteTask(ctx, oldTask)
		}
		s.allTasks[t.ID] = t
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		if oldTask != nil {
			s.deleteTask(ctx, oldTask)
		}
		s.allTasks[t.ID] = t
		s.preassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	s.allTasks[t.ID] = t
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}

func (s *Scheduler) deleteTask(ctx context.Context, t *api.Task) bool {
	delete(s.allTasks, t.ID)
	delete(s.preassignedTasks, t.ID)
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.removeTask(t) {
		s.nodeSet.updateNode(nodeInfo)
		return true
	}
	return false
}

func (s *Scheduler) createOrUpdateNode(n *api.Node) {
	nodeInfo, _ := s.nodeSet.nodeInfo(n.ID)
	var resources api.Resources
	if n.Description != nil && n.Description.Resources != nil {
		resources = *n.Description.Resources
		// reconcile resources by looping over all tasks in this node
		for _, task := range nodeInfo.Tasks {
			reservations := taskReservations(task.Spec)
			resources.MemoryBytes -= reservations.MemoryBytes
			resources.NanoCPUs -= reservations.NanoCPUs
		}
	}
	nodeInfo.Node = n
	nodeInfo.AvailableResources = resources
	s.nodeSet.addOrUpdateNode(nodeInfo)
}

func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
	schedulingDecisions := make(map[string]schedulingDecision, len(s.preassignedTasks))
	for _, t := range s.preassignedTasks {
		newT := s.taskFitNode(ctx, t, t.NodeID)
		if newT == nil {
			continue
		}
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: newT}
	}

	successful, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
	for _, decision := range successful {
		if decision.new.Status.State == api.TaskStateAssigned {
			delete(s.preassignedTasks, decision.old.ID)
		}
	}
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old
		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}
	}
}

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
	type commonSpecKey struct {
		serviceID   string
		specVersion api.Version
	}
	tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
	var oneOffTasks []*api.Task
	schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

	for taskID, t := range s.unassignedTasks {
		if t == nil || t.NodeID != "" {
			// task deleted or already assigned
			delete(s.unassignedTasks, taskID)
			continue
		}

		// Group tasks with common specs
		if t.SpecVersion != nil {
			taskGroupKey := commonSpecKey{
				serviceID:   t.ServiceID,
				specVersion: *t.SpecVersion,
			}

			if tasksByCommonSpec[taskGroupKey] == nil {
				tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
			}
			tasksByCommonSpec[taskGroupKey][taskID] = t
		} else {
			// This task doesn't have a spec version. We have to
			// schedule it as a one-off.
			oneOffTasks = append(oneOffTasks, t)
		}
		delete(s.unassignedTasks, taskID)
	}

	for _, taskGroup := range tasksByCommonSpec {
		s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
	}
	for _, t := range oneOffTasks {
		s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
	}

	_, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old

		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}

		// enqueue task for next scheduling attempt
		s.enqueue(decision.old)
	}
}

func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
	if len(schedulingDecisions) == 0 {
		return
	}

	successful = make([]schedulingDecision, 0, len(schedulingDecisions))

	// Apply changes to master store
	err := s.store.Batch(func(batch *store.Batch) error {
		for len(schedulingDecisions) > 0 {
			err := batch.Update(func(tx store.Tx) error {
				// Update exactly one task inside this Update
				// callback.
				for taskID, decision := range schedulingDecisions {
					delete(schedulingDecisions, taskID)

					t := store.GetTask(tx, taskID)
					if t == nil {
						// Task no longer exists
						nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
						if err == nil && nodeInfo.removeTask(decision.new) {
							s.nodeSet.updateNode(nodeInfo)
						}
						delete(s.allTasks, decision.old.ID)
						continue
					}

					if t.Status.State == decision.new.Status.State && t.Status.Message == decision.new.Status.Message {
						// No changes, ignore
						continue
					}

					if t.Status.State >= api.TaskStateAssigned {
						nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
						if err != nil {
							failed = append(failed, decision)
							continue
						}
						node := store.GetNode(tx, decision.new.NodeID)
						if node == nil || node.Meta.Version != nodeInfo.Meta.Version {
							// node is out of date
							failed = append(failed, decision)
							continue
						}
					}

					if err := store.UpdateTask(tx, decision.new); err != nil {
						log.G(ctx).Debugf("scheduler failed to update task %s; will retry", taskID)
						failed = append(failed, decision)
						continue
					}
					successful = append(successful, decision)
					return nil
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Error("scheduler tick transaction failed")
		failed = append(failed, successful...)
		successful = nil
	}
	return
}

// taskFitNode checks if a node has enough resources to accommodate a task.
func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
	nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
	if err != nil {
		// node does not exist in set (it may have been deleted)
		return nil
	}
	newT := *t
	s.pipeline.SetTask(t)
	if !s.pipeline.Process(&nodeInfo) {
		// this node cannot accommodate this task
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		newT.Status.Message = s.pipeline.Explain()
		s.allTasks[t.ID] = &newT

		return &newT
	}
	newT.Status = api.TaskStatus{
		State:     api.TaskStateAssigned,
		Timestamp: ptypes.MustTimestampProto(time.Now()),
		Message:   "scheduler confirmed task can run on preassigned node",
	}
	s.allTasks[t.ID] = &newT

	if nodeInfo.addTask(&newT) {
		s.nodeSet.updateNode(nodeInfo)
	}
	return &newT
}

// scheduleTaskGroup schedules a batch of tasks that are part of the same
// service and share the same version of the spec.
func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	// Pick a task at random from taskGroup to use for constraint
	// evaluation. It doesn't matter which one we pick because all the
	// tasks in the group are equal in terms of the fields the constraint
	// filters consider.
	var t *api.Task
	for _, t = range taskGroup {
		break
	}

	s.pipeline.SetTask(t)

	now := time.Now()
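
	// Ordering note (added for clarity): recent failures only become the
	// deciding factor once at least one of the two nodes has reached
	// maxFailures for this service; for example, a node with five recent
	// failures sorts after a node with none, even if it currently runs
	// fewer tasks of the service. Below that threshold, ordering falls
	// back to per-service task count, then total task count.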
	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
		// If either node has at least maxFailures recent failures,
		// that's the deciding factor.
		recentFailuresA := a.countRecentFailures(now, t.ServiceID)
		recentFailuresB := b.countRecentFailures(now, t.ServiceID)

		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
			if recentFailuresA > recentFailuresB {
				return false
			}
			if recentFailuresB > recentFailuresA {
				return true
			}
		}

		tasksByServiceA := a.ActiveTasksCountByService[t.ServiceID]
		tasksByServiceB := b.ActiveTasksCountByService[t.ServiceID]

		if tasksByServiceA < tasksByServiceB {
			return true
		}
		if tasksByServiceA > tasksByServiceB {
			return false
		}

		// Total number of tasks breaks ties.
		return a.ActiveTasksCount < b.ActiveTasksCount
	}

	var prefs []*api.PlacementPreference
	if t.Spec.Placement != nil {
		prefs = t.Spec.Placement.Preferences
	}

	tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)

	s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
	if len(taskGroup) != 0 {
		s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
	}
}

func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	if tree.next == nil {
		nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
		if len(nodes) == 0 {
			return 0
		}

		return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
	}

	// Walk the tree and figure out how the tasks should be split at each
	// level.
	tasksScheduled := 0
	tasksInUsableBranches := tree.tasks
	var noRoom map[*decisionTree]struct{}

	// Try to make branches even until either all branches are
	// full, or all tasks have been scheduled.
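	//
	// Worked example (added for illustration): with three usable branches
	// already holding 2, 2, and 1 tasks (tasksInUsableBranches = 5) and
	// n = 4 new tasks, desiredTasksPerBranch = (5+4)/3 = 3 with remainder 0,
	// so the first two branches each receive 1 task and the third receives 2,
	// evening the branches out at 3 tasks apiece.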
	for tasksScheduled != n && len(noRoom) != len(tree.next) {
		desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
		remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))

		for _, subtree := range tree.next {
			if noRoom != nil {
				if _, ok := noRoom[subtree]; ok {
					continue
				}
			}
			subtreeTasks := subtree.tasks
			if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
				tasksToAssign := desiredTasksPerBranch - subtreeTasks
				if remainder > 0 {
					tasksToAssign++
				}
				res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
				if res < tasksToAssign {
					if noRoom == nil {
						noRoom = make(map[*decisionTree]struct{})
					}
					noRoom[subtree] = struct{}{}
					tasksInUsableBranches -= subtreeTasks
				} else if remainder > 0 {
					remainder--
				}
				tasksScheduled += res
			}
		}
	}

	return tasksScheduled
}

func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	tasksScheduled := 0
	failedConstraints := make(map[int]bool) // key is index in nodes slice
	nodeIter := 0
	nodeCount := len(nodes)
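
	// Note (added for clarity): tasks are assigned round-robin over the
	// ordered node list; a node that stops satisfying the pipeline
	// constraints is recorded in failedConstraints and skipped on later
	// iterations.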
	for taskID, t := range taskGroup {
		// Skip tasks which were already scheduled because they ended
		// up in two groups at once.
		if _, exists := schedulingDecisions[taskID]; exists {
			continue
		}

		node := &nodes[nodeIter%nodeCount]

		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
		newT := *t
		newT.NodeID = node.ID
		newT.Status = api.TaskStatus{
			State:     api.TaskStateAssigned,
			Timestamp: ptypes.MustTimestampProto(time.Now()),
			Message:   "scheduler assigned task to node",
		}
		s.allTasks[t.ID] = &newT

		nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
		if err == nil && nodeInfo.addTask(&newT) {
			s.nodeSet.updateNode(nodeInfo)
			nodes[nodeIter%nodeCount] = nodeInfo
		}

		schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
		delete(taskGroup, taskID)
		tasksScheduled++
		if tasksScheduled == n {
			return tasksScheduled
		}

		if nodeIter+1 < nodeCount {
			// First pass fills the nodes until they have the same
			// number of tasks from this service.
			nextNode := nodes[(nodeIter+1)%nodeCount]
			if nodeLess(&nextNode, &nodeInfo) {
				nodeIter++
			}
		} else {
			// In later passes, we just assign one task at a time
			// to each node that still meets the constraints.
			nodeIter++
		}

		origNodeIter := nodeIter
		for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
			failedConstraints[nodeIter%nodeCount] = true
			nodeIter++
			if nodeIter-origNodeIter == nodeCount {
				// None of the nodes meet the constraints anymore.
				return tasksScheduled
			}
		}
	}

	return tasksScheduled
}

func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	explanation := s.pipeline.Explain()
	for _, t := range taskGroup {
		log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

		newT := *t
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		if explanation != "" {
			newT.Status.Message = "no suitable node (" + explanation + ")"
		} else {
			newT.Status.Message = "no suitable node"
		}
		s.allTasks[t.ID] = &newT
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}

		s.enqueue(&newT)
	}
}

func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[string]*api.Task) error {
	nodes, err := store.FindNodes(tx, store.All)
	if err != nil {
		return err
	}

	s.nodeSet.alloc(len(nodes))

	for _, n := range nodes {
		var resources api.Resources
		if n.Description != nil && n.Description.Resources != nil {
			resources = *n.Description.Resources
		}
		s.nodeSet.addOrUpdateNode(newNodeInfo(n, tasksByNode[n.ID], resources))
	}

	return nil
}