scheduler.go

package scheduler

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/genericresource"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"golang.org/x/net/context"
)

const (
	// monitorFailures is the lookback period for counting failures of
	// a task to determine if a node is faulty for a particular service.
	monitorFailures = 5 * time.Minute

	// maxFailures is the number of failures within monitorFailures that
	// triggers downweighting of a node in the sorting function.
	maxFailures = 5
)
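
// schedulingDecision carries the old version of a task and the proposed
// new version produced by a scheduling pass.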
type schedulingDecision struct {
	old *api.Task
	new *api.Task
}

// Scheduler assigns tasks to nodes.
type Scheduler struct {
	store           *store.MemoryStore
	unassignedTasks map[string]*api.Task
	// pendingPreassignedTasks already have NodeID, need resource validation
	pendingPreassignedTasks map[string]*api.Task
	// preassignedTasks tracks tasks that were preassigned, including those
	// past the pending state.
	preassignedTasks map[string]struct{}
	nodeSet          nodeSet
	allTasks         map[string]*api.Task
	pipeline         *Pipeline

	// stopChan signals to the state machine to stop running
	stopChan chan struct{}
	// doneChan is closed when the state machine terminates
	doneChan chan struct{}
}

// New creates a new scheduler.
func New(store *store.MemoryStore) *Scheduler {
	return &Scheduler{
		store:                   store,
		unassignedTasks:         make(map[string]*api.Task),
		pendingPreassignedTasks: make(map[string]*api.Task),
		preassignedTasks:        make(map[string]struct{}),
		allTasks:                make(map[string]*api.Task),
		stopChan:                make(chan struct{}),
		doneChan:                make(chan struct{}),
		pipeline:                NewPipeline(),
	}
}
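
// setupTasksList populates the scheduler's bookkeeping from the current
// contents of the store: tasks that consume resources are recorded, tasks
// without a node are queued for scheduling, preassigned tasks still in
// PENDING are set aside for resource validation, and the node set is built
// from the rest.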
func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
	tasks, err := store.FindTasks(tx, store.All)
	if err != nil {
		return err
	}

	tasksByNode := make(map[string]map[string]*api.Task)
	for _, t := range tasks {
		// Ignore all tasks that have not reached PENDING
		// state and tasks that no longer consume resources.
		if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
			continue
		}

		s.allTasks[t.ID] = t
		if t.NodeID == "" {
			s.enqueue(t)
			continue
		}
		// Preassigned tasks need their resource requirements validated
		// on the corresponding node.
		if t.Status.State == api.TaskStatePending {
			s.preassignedTasks[t.ID] = struct{}{}
			s.pendingPreassignedTasks[t.ID] = t
			continue
		}

		if tasksByNode[t.NodeID] == nil {
			tasksByNode[t.NodeID] = make(map[string]*api.Task)
		}
		tasksByNode[t.NodeID][t.ID] = t
	}

	return s.buildNodeSet(tx, tasksByNode)
}

// Run is the scheduler event loop.
func (s *Scheduler) Run(ctx context.Context) error {
	defer close(s.doneChan)

	updates, cancel, err := store.ViewAndWatch(s.store, s.setupTasksList)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("snapshot store update failed")
		return err
	}
	defer cancel()

	// Validate resources for preassigned tasks before processing anything
	// else, because preassigned tasks (such as those belonging to global
	// services) should start before other tasks.
	s.processPreassignedTasks(ctx)

	// Queue all unassigned tasks before processing changes.
	s.tick(ctx)

	const (
		// commitDebounceGap is the amount of time to wait between
		// commit events to debounce them.
		commitDebounceGap = 50 * time.Millisecond
		// maxLatency is a time limit on the debouncing.
		maxLatency = time.Second
	)
	var (
		debouncingStarted     time.Time
		commitDebounceTimer   *time.Timer
		commitDebounceTimeout <-chan time.Time
	)

	tickRequired := false
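
	// schedule processes any pending preassigned tasks and, if an event
	// since the last run requires it, performs a full scheduling tick.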
	schedule := func() {
		if len(s.pendingPreassignedTasks) > 0 {
			s.processPreassignedTasks(ctx)
		}
		if tickRequired {
			s.tick(ctx)
			tickRequired = false
		}
	}

	// Watch for changes.
	for {
		select {
		case event := <-updates:
			switch v := event.(type) {
			case api.EventCreateTask:
				if s.createTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventUpdateTask:
				if s.updateTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventDeleteTask:
				if s.deleteTask(v.Task) {
					// Deleting a task may free up node resources;
					// pending tasks should be re-evaluated.
					tickRequired = true
				}
			case api.EventCreateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventUpdateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventDeleteNode:
				s.nodeSet.remove(v.Node.ID)
			case state.EventCommit:
				if commitDebounceTimer != nil {
					if time.Since(debouncingStarted) > maxLatency {
						commitDebounceTimer.Stop()
						commitDebounceTimer = nil
						commitDebounceTimeout = nil
						schedule()
					} else {
						commitDebounceTimer.Reset(commitDebounceGap)
					}
				} else {
					commitDebounceTimer = time.NewTimer(commitDebounceGap)
					commitDebounceTimeout = commitDebounceTimer.C
					debouncingStarted = time.Now()
				}
			}
		case <-commitDebounceTimeout:
			schedule()
			commitDebounceTimer = nil
			commitDebounceTimeout = nil
		case <-s.stopChan:
			return nil
		}
	}
}

// Stop causes the scheduler event loop to stop running.
func (s *Scheduler) Stop() {
	close(s.stopChan)
	<-s.doneChan
}

// enqueue queues a task for scheduling.
func (s *Scheduler) enqueue(t *api.Task) {
	s.unassignedTasks[t.ID] = t
}
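
// createTask adds a newly created task to the scheduler's state. It returns
// true if the change calls for a scheduling tick (i.e. the task still needs
// a node assignment).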
func (s *Scheduler) createTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state, and tasks that no longer consume resources.
	if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
		return false
	}

	s.allTasks[t.ID] = t
	if t.NodeID == "" {
		// unassigned task
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		s.preassignedTasks[t.ID] = struct{}{}
		s.pendingPreassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}
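
// updateTask reconciles the scheduler's state with an updated task. It
// returns true if the change requires a scheduling tick (the task terminated
// and released resources, or it still needs a node assignment).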
func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state.
	if t.Status.State < api.TaskStatePending {
		return false
	}

	oldTask := s.allTasks[t.ID]

	// Tasks past the RUNNING state no longer consume resources.
	if t.Status.State > api.TaskStateRunning {
		if oldTask == nil {
			return false
		}

		if t.Status.State != oldTask.Status.State &&
			(t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
			// Keep track of task failures, so other nodes can be preferred
			// for scheduling this service if it looks like the service is
			// failing in a loop on this node. However, skip this for
			// preassigned tasks, because the scheduler does not choose
			// which nodes those run on.
			if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
				nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
				if err == nil {
					nodeInfo.taskFailed(ctx, t)
					s.nodeSet.updateNode(nodeInfo)
				}
			}
		}

		s.deleteTask(oldTask)
		return true
	}

	if t.NodeID == "" {
		// unassigned task
		if oldTask != nil {
			s.deleteTask(oldTask)
		}
		s.allTasks[t.ID] = t
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		if oldTask != nil {
			s.deleteTask(oldTask)
		}
		s.preassignedTasks[t.ID] = struct{}{}
		s.allTasks[t.ID] = t
		s.pendingPreassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	s.allTasks[t.ID] = t
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}
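
// deleteTask removes a task from the scheduler's state and releases its
// resources on its node. It returns true if the node's available resources
// changed as a result.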
func (s *Scheduler) deleteTask(t *api.Task) bool {
	delete(s.allTasks, t.ID)
	delete(s.preassignedTasks, t.ID)
	delete(s.pendingPreassignedTasks, t.ID)
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.removeTask(t) {
		s.nodeSet.updateNode(nodeInfo)
		return true
	}
	return false
}
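
// createOrUpdateNode refreshes the node's entry in the node set, recomputing
// its available resources from its description minus the reservations of the
// tasks already assigned to it.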
func (s *Scheduler) createOrUpdateNode(n *api.Node) {
	nodeInfo, nodeInfoErr := s.nodeSet.nodeInfo(n.ID)
	var resources *api.Resources
	if n.Description != nil && n.Description.Resources != nil {
		resources = n.Description.Resources.Copy()
		// reconcile resources by looping over all tasks on this node
		if nodeInfoErr == nil {
			for _, task := range nodeInfo.Tasks {
				reservations := taskReservations(task.Spec)

				resources.MemoryBytes -= reservations.MemoryBytes
				resources.NanoCPUs -= reservations.NanoCPUs

				genericresource.ConsumeNodeResources(&resources.Generic,
					task.AssignedGenericResources)
			}
		}
	} else {
		resources = &api.Resources{}
	}

	if nodeInfoErr != nil {
		nodeInfo = newNodeInfo(n, nil, *resources)
	} else {
		nodeInfo.Node = n
		nodeInfo.AvailableResources = resources
	}
	s.nodeSet.addOrUpdateNode(nodeInfo)
}
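
// processPreassignedTasks validates that each pending preassigned task fits
// on the node it was preassigned to, commits the resulting decisions to the
// store, and rolls back node state for any decision that failed to commit.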
func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
	schedulingDecisions := make(map[string]schedulingDecision, len(s.pendingPreassignedTasks))

	for _, t := range s.pendingPreassignedTasks {
		newT := s.taskFitNode(ctx, t, t.NodeID)
		if newT == nil {
			continue
		}
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: newT}
	}

	successful, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)

	for _, decision := range successful {
		if decision.new.Status.State == api.TaskStateAssigned {
			delete(s.pendingPreassignedTasks, decision.old.ID)
		}
	}
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old
		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}
	}
}

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
	type commonSpecKey struct {
		serviceID   string
		specVersion api.Version
	}
	tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
	var oneOffTasks []*api.Task
	schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

	for taskID, t := range s.unassignedTasks {
		if t == nil || t.NodeID != "" {
			// task deleted or already assigned
			delete(s.unassignedTasks, taskID)
			continue
		}

		// Group tasks with common specs
		if t.SpecVersion != nil {
			taskGroupKey := commonSpecKey{
				serviceID:   t.ServiceID,
				specVersion: *t.SpecVersion,
			}

			if tasksByCommonSpec[taskGroupKey] == nil {
				tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
			}
			tasksByCommonSpec[taskGroupKey][taskID] = t
		} else {
			// This task doesn't have a spec version. We have to
			// schedule it as a one-off.
			oneOffTasks = append(oneOffTasks, t)
		}
		delete(s.unassignedTasks, taskID)
	}

	for _, taskGroup := range tasksByCommonSpec {
		s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
	}
	for _, t := range oneOffTasks {
		s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
	}

	_, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old

		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}

		// enqueue task for next scheduling attempt
		s.enqueue(decision.old)
	}
}
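
// applySchedulingDecisions commits a batch of scheduling decisions to the
// store. It returns the decisions that were written successfully and the
// ones that must be retried (for example, because the chosen node changed
// underneath the scheduler).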
func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
	if len(schedulingDecisions) == 0 {
		return
	}

	successful = make([]schedulingDecision, 0, len(schedulingDecisions))

	// Apply changes to master store
	err := s.store.Batch(func(batch *store.Batch) error {
		for len(schedulingDecisions) > 0 {
			err := batch.Update(func(tx store.Tx) error {
				// Update exactly one task inside this Update
				// callback.
				for taskID, decision := range schedulingDecisions {
					delete(schedulingDecisions, taskID)

					t := store.GetTask(tx, taskID)
					if t == nil {
						// Task no longer exists
						s.deleteTask(decision.new)
						continue
					}

					if t.Status.State == decision.new.Status.State &&
						t.Status.Message == decision.new.Status.Message &&
						t.Status.Err == decision.new.Status.Err {
						// No changes, ignore
						continue
					}

					if t.Status.State >= api.TaskStateAssigned {
						nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
						if err != nil {
							failed = append(failed, decision)
							continue
						}
						node := store.GetNode(tx, decision.new.NodeID)
						if node == nil || node.Meta.Version != nodeInfo.Meta.Version {
							// node is out of date
							failed = append(failed, decision)
							continue
						}
					}

					if err := store.UpdateTask(tx, decision.new); err != nil {
						log.G(ctx).Debugf("scheduler failed to update task %s; will retry", taskID)
						failed = append(failed, decision)
						continue
					}
					successful = append(successful, decision)
					return nil
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Error("scheduler tick transaction failed")
		failed = append(failed, successful...)
		successful = nil
	}
	return
}

// taskFitNode checks if a node has enough resources to accommodate a task.
func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
	nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
	if err != nil {
		// node does not exist in set (it may have been deleted)
		return nil
	}
	newT := *t
	s.pipeline.SetTask(t)
	if !s.pipeline.Process(&nodeInfo) {
		// this node cannot accommodate this task
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		newT.Status.Err = s.pipeline.Explain()
		s.allTasks[t.ID] = &newT

		return &newT
	}
	newT.Status = api.TaskStatus{
		State:     api.TaskStateAssigned,
		Timestamp: ptypes.MustTimestampProto(time.Now()),
		Message:   "scheduler confirmed task can run on preassigned node",
	}
	s.allTasks[t.ID] = &newT

	if nodeInfo.addTask(&newT) {
		s.nodeSet.updateNode(nodeInfo)
	}
	return &newT
}

// scheduleTaskGroup schedules a batch of tasks that are part of the same
// service and share the same version of the spec.
func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	// Pick a task at random from taskGroup to use for constraint
	// evaluation. It doesn't matter which one we pick because all the
	// tasks in the group are equal in terms of the fields the constraint
	// filters consider.
	var t *api.Task
	for _, t = range taskGroup {
		break
	}

	s.pipeline.SetTask(t)

	now := time.Now()
	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
		// If either node has at least maxFailures recent failures,
		// that's the deciding factor.
		recentFailuresA := a.countRecentFailures(now, t)
		recentFailuresB := b.countRecentFailures(now, t)

		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
			if recentFailuresA > recentFailuresB {
				return false
			}
			if recentFailuresB > recentFailuresA {
				return true
			}
		}

		tasksByServiceA := a.ActiveTasksCountByService[t.ServiceID]
		tasksByServiceB := b.ActiveTasksCountByService[t.ServiceID]

		if tasksByServiceA < tasksByServiceB {
			return true
		}
		if tasksByServiceA > tasksByServiceB {
			return false
		}

		// Total number of tasks breaks ties.
		return a.ActiveTasksCount < b.ActiveTasksCount
	}

	var prefs []*api.PlacementPreference
	if t.Spec.Placement != nil {
		prefs = t.Spec.Placement.Preferences
	}

	tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)

	s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
	if len(taskGroup) != 0 {
		s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
	}
}
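
// scheduleNTasksOnSubtree schedules up to n tasks from taskGroup within the
// given decision (sub)tree, trying to keep the number of tasks per branch
// even. It returns how many tasks it was able to schedule.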
func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	if tree.next == nil {
		nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
		if len(nodes) == 0 {
			return 0
		}

		return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
	}

	// Walk the tree and figure out how the tasks should be split at each
	// level.
	tasksScheduled := 0
	tasksInUsableBranches := tree.tasks
	var noRoom map[*decisionTree]struct{}

	// Try to make branches even until either all branches are
	// full, or all tasks have been scheduled.
	for tasksScheduled != n && len(noRoom) != len(tree.next) {
		desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
		remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))

		for _, subtree := range tree.next {
			if noRoom != nil {
				if _, ok := noRoom[subtree]; ok {
					continue
				}
			}
			subtreeTasks := subtree.tasks
			if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
				tasksToAssign := desiredTasksPerBranch - subtreeTasks
				if remainder > 0 {
					tasksToAssign++
				}
				res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
				if res < tasksToAssign {
					if noRoom == nil {
						noRoom = make(map[*decisionTree]struct{})
					}
					noRoom[subtree] = struct{}{}
					tasksInUsableBranches -= subtreeTasks
				} else if remainder > 0 {
					remainder--
				}
				tasksScheduled += res
			}
		}
	}

	return tasksScheduled
}
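
// scheduleNTasksOnNodes assigns up to n tasks from taskGroup to the given
// nodes, advancing round-robin through the node list and skipping nodes that
// no longer pass the filter pipeline. It returns the number of tasks it
// scheduled.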
func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	tasksScheduled := 0
	failedConstraints := make(map[int]bool) // key is index in nodes slice
	nodeIter := 0
	nodeCount := len(nodes)
	for taskID, t := range taskGroup {
		// Skip tasks which were already scheduled because they ended
		// up in two groups at once.
		if _, exists := schedulingDecisions[taskID]; exists {
			continue
		}

		node := &nodes[nodeIter%nodeCount]

		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
		newT := *t
		newT.NodeID = node.ID
		newT.Status = api.TaskStatus{
			State:     api.TaskStateAssigned,
			Timestamp: ptypes.MustTimestampProto(time.Now()),
			Message:   "scheduler assigned task to node",
		}
		s.allTasks[t.ID] = &newT

		nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
		if err == nil && nodeInfo.addTask(&newT) {
			s.nodeSet.updateNode(nodeInfo)
			nodes[nodeIter%nodeCount] = nodeInfo
		}

		schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
		delete(taskGroup, taskID)
		tasksScheduled++
		if tasksScheduled == n {
			return tasksScheduled
		}

		if nodeIter+1 < nodeCount {
			// First pass fills the nodes until they have the same
			// number of tasks from this service.
			nextNode := nodes[(nodeIter+1)%nodeCount]
			if nodeLess(&nextNode, &nodeInfo) {
				nodeIter++
			}
		} else {
			// In later passes, we just assign one task at a time
			// to each node that still meets the constraints.
			nodeIter++
		}

		origNodeIter := nodeIter
		for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
			failedConstraints[nodeIter%nodeCount] = true
			nodeIter++
			if nodeIter-origNodeIter == nodeCount {
				// None of the nodes meet the constraints anymore.
				return tasksScheduled
			}
		}
	}

	return tasksScheduled
}
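
// noSuitableNode marks the remaining tasks in taskGroup as unschedulable,
// recording the pipeline's explanation in the task status, and re-queues
// them for a later attempt.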
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	explanation := s.pipeline.Explain()
	for _, t := range taskGroup {
		log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

		newT := *t
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		if explanation != "" {
			newT.Status.Err = "no suitable node (" + explanation + ")"
		} else {
			newT.Status.Err = "no suitable node"
		}
		s.allTasks[t.ID] = &newT
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}

		s.enqueue(&newT)
	}
}
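
// buildNodeSet initializes the node set from the nodes in the store, seeding
// each entry with the tasks already assigned to that node.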
func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[string]*api.Task) error {
	nodes, err := store.FindNodes(tx, store.All)
	if err != nil {
		return err
	}

	s.nodeSet.alloc(len(nodes))

	for _, n := range nodes {
		var resources api.Resources
		if n.Description != nil && n.Description.Resources != nil {
			resources = *n.Description.Resources
		}
		s.nodeSet.addOrUpdateNode(newNodeInfo(n, tasksByNode[n.ID], resources))
	}

	return nil
}