package scheduler

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"golang.org/x/net/context"
)

const (
	// monitorFailures is the lookback period for counting failures of
	// a task to determine if a node is faulty for a particular service.
	monitorFailures = 5 * time.Minute

	// maxFailures is the number of failures within monitorFailures that
	// triggers downweighting of a node in the sorting function.
	maxFailures = 5
)

type schedulingDecision struct {
	old *api.Task
	new *api.Task
}

// Scheduler assigns tasks to nodes.
type Scheduler struct {
	store           *store.MemoryStore
	unassignedTasks map[string]*api.Task
	// preassignedTasks already have NodeID, need resource validation
	preassignedTasks map[string]*api.Task
	nodeSet          nodeSet
	allTasks         map[string]*api.Task
	pipeline         *Pipeline

	// stopChan signals to the state machine to stop running
	stopChan chan struct{}
	// doneChan is closed when the state machine terminates
	doneChan chan struct{}
}

// New creates a new scheduler.
func New(store *store.MemoryStore) *Scheduler {
	return &Scheduler{
		store:            store,
		unassignedTasks:  make(map[string]*api.Task),
		preassignedTasks: make(map[string]*api.Task),
		allTasks:         make(map[string]*api.Task),
		stopChan:         make(chan struct{}),
		doneChan:         make(chan struct{}),
		pipeline:         NewPipeline(),
	}
}
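
// setupTasksList populates the scheduler's in-memory state from the store:
// it tracks all resource-consuming tasks, queues unassigned ones, records
// preassigned tasks that still need resource validation, and builds the
// node set from the remaining assigned tasks.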
func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
	tasks, err := store.FindTasks(tx, store.All)
	if err != nil {
		return err
	}

	tasksByNode := make(map[string]map[string]*api.Task)
	for _, t := range tasks {
		// Ignore all tasks that have not reached PENDING
		// state and tasks that no longer consume resources.
		if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
			continue
		}

		s.allTasks[t.ID] = t
		if t.NodeID == "" {
			s.enqueue(t)
			continue
		}
		// Preassigned tasks need their resource requirements validated
		// against the node they were assigned to.
		if t.Status.State == api.TaskStatePending {
			s.preassignedTasks[t.ID] = t
			continue
		}

		if tasksByNode[t.NodeID] == nil {
			tasksByNode[t.NodeID] = make(map[string]*api.Task)
		}
		tasksByNode[t.NodeID][t.ID] = t
	}

	if err := s.buildNodeSet(tx, tasksByNode); err != nil {
		return err
	}

	return nil
}

// Run is the scheduler event loop.
func (s *Scheduler) Run(ctx context.Context) error {
	defer close(s.doneChan)

	updates, cancel, err := store.ViewAndWatch(s.store, s.setupTasksList)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("snapshot store update failed")
		return err
	}
	defer cancel()

	// Validate resources for preassigned tasks before any other tasks,
	// because preassigned tasks (e.g. tasks of global services) should
	// start before other tasks.
	s.processPreassignedTasks(ctx)

	// Queue all unassigned tasks before processing changes.
	s.tick(ctx)

	const (
		// commitDebounceGap is the amount of time to wait between
		// commit events to debounce them.
		commitDebounceGap = 50 * time.Millisecond
		// maxLatency is a time limit on the debouncing.
		maxLatency = time.Second
	)
	var (
		debouncingStarted     time.Time
		commitDebounceTimer   *time.Timer
		commitDebounceTimeout <-chan time.Time
	)

	tickRequired := false

	schedule := func() {
		if len(s.preassignedTasks) > 0 {
			s.processPreassignedTasks(ctx)
		}
		if tickRequired {
			s.tick(ctx)
			tickRequired = false
		}
	}

	// Watch for changes.
	for {
		select {
		case event := <-updates:
			switch v := event.(type) {
			case api.EventCreateTask:
				if s.createTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventUpdateTask:
				if s.updateTask(ctx, v.Task) {
					tickRequired = true
				}
			case api.EventDeleteTask:
				if s.deleteTask(ctx, v.Task) {
					// Deleting a task may free up node resources;
					// pending tasks should be re-evaluated.
					tickRequired = true
				}
			case api.EventCreateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventUpdateNode:
				s.createOrUpdateNode(v.Node)
				tickRequired = true
			case api.EventDeleteNode:
				s.nodeSet.remove(v.Node.ID)
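			// Debounce scheduling: wait for commit events to quiet
			// down for commitDebounceGap before running a pass, but
			// never delay a pass by more than maxLatency.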
			case state.EventCommit:
				if commitDebounceTimer != nil {
					if time.Since(debouncingStarted) > maxLatency {
						commitDebounceTimer.Stop()
						commitDebounceTimer = nil
						commitDebounceTimeout = nil
						schedule()
					} else {
						commitDebounceTimer.Reset(commitDebounceGap)
					}
				} else {
					commitDebounceTimer = time.NewTimer(commitDebounceGap)
					commitDebounceTimeout = commitDebounceTimer.C
					debouncingStarted = time.Now()
				}
			}
		case <-commitDebounceTimeout:
			schedule()
			commitDebounceTimer = nil
			commitDebounceTimeout = nil
		case <-s.stopChan:
			return nil
		}
	}
}

// Stop causes the scheduler event loop to stop running.
func (s *Scheduler) Stop() {
	close(s.stopChan)
	<-s.doneChan
}

// enqueue queues a task for scheduling.
func (s *Scheduler) enqueue(t *api.Task) {
	s.unassignedTasks[t.ID] = t
}
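
// createTask begins tracking a newly created task. It returns true if the
// task was queued for scheduling and therefore requires a scheduling tick.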
func (s *Scheduler) createTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state, and tasks that no longer consume resources.
	if t.Status.State < api.TaskStatePending || t.Status.State > api.TaskStateRunning {
		return false
	}

	s.allTasks[t.ID] = t
	if t.NodeID == "" {
		// unassigned task
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		s.preassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}
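
// updateTask reconciles the scheduler's state with an updated task and
// returns true if the change should trigger a scheduling tick.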
func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
	// Ignore all tasks that have not reached PENDING
	// state.
	if t.Status.State < api.TaskStatePending {
		return false
	}

	oldTask := s.allTasks[t.ID]

	// Tasks that have progressed past RUNNING no longer consume
	// resources; stop tracking them and record any failure.
	if t.Status.State > api.TaskStateRunning {
		if oldTask == nil {
			return false
		}
		s.deleteTask(ctx, oldTask)
		if t.Status.State != oldTask.Status.State &&
			(t.Status.State == api.TaskStateFailed || t.Status.State == api.TaskStateRejected) {
			nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
			if err == nil {
				nodeInfo.taskFailed(ctx, t.ServiceID)
				s.nodeSet.updateNode(nodeInfo)
			}
		}
		return true
	}

	if t.NodeID == "" {
		// unassigned task
		if oldTask != nil {
			s.deleteTask(ctx, oldTask)
		}
		s.allTasks[t.ID] = t
		s.enqueue(t)
		return true
	}

	if t.Status.State == api.TaskStatePending {
		if oldTask != nil {
			s.deleteTask(ctx, oldTask)
		}
		s.allTasks[t.ID] = t
		s.preassignedTasks[t.ID] = t
		// preassigned tasks do not contribute to running tasks count
		return false
	}

	s.allTasks[t.ID] = t
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.addTask(t) {
		s.nodeSet.updateNode(nodeInfo)
	}

	return false
}
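
// deleteTask stops tracking a task and releases its resources on the node it
// was assigned to. It returns true if node resources were freed.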
func (s *Scheduler) deleteTask(ctx context.Context, t *api.Task) bool {
	delete(s.allTasks, t.ID)
	delete(s.preassignedTasks, t.ID)
	nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
	if err == nil && nodeInfo.removeTask(t) {
		s.nodeSet.updateNode(nodeInfo)
		return true
	}
	return false
}
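
// createOrUpdateNode refreshes a node's entry in the node set, recomputing
// its available resources as the node description's resources minus the
// reservations of the tasks already assigned to it.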
func (s *Scheduler) createOrUpdateNode(n *api.Node) {
	nodeInfo, _ := s.nodeSet.nodeInfo(n.ID)
	var resources api.Resources
	if n.Description != nil && n.Description.Resources != nil {
		resources = *n.Description.Resources
		// Reconcile available resources by subtracting the
		// reservations of all tasks on this node.
		for _, task := range nodeInfo.Tasks {
			reservations := taskReservations(task.Spec)
			resources.MemoryBytes -= reservations.MemoryBytes
			resources.NanoCPUs -= reservations.NanoCPUs
		}
	}
	nodeInfo.Node = n
	nodeInfo.AvailableResources = resources
	s.nodeSet.addOrUpdateNode(nodeInfo)
}
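
// processPreassignedTasks validates that each preassigned task still fits on
// its node, commits the resulting scheduling decisions, and rolls back the
// resource accounting for any decisions that failed to commit.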
func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
	schedulingDecisions := make(map[string]schedulingDecision, len(s.preassignedTasks))
	for _, t := range s.preassignedTasks {
		newT := s.taskFitNode(ctx, t, t.NodeID)
		if newT == nil {
			continue
		}
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: newT}
	}

	successful, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
	for _, decision := range successful {
		if decision.new.Status.State == api.TaskStateAssigned {
			delete(s.preassignedTasks, decision.old.ID)
		}
	}
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old
		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}
	}
}

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
	type commonSpecKey struct {
		serviceID   string
		specVersion api.Version
	}
	tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
	var oneOffTasks []*api.Task
	schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

	for taskID, t := range s.unassignedTasks {
		if t == nil || t.NodeID != "" {
			// task deleted or already assigned
			delete(s.unassignedTasks, taskID)
			continue
		}

		// Group tasks with common specs
		if t.SpecVersion != nil {
			taskGroupKey := commonSpecKey{
				serviceID:   t.ServiceID,
				specVersion: *t.SpecVersion,
			}

			if tasksByCommonSpec[taskGroupKey] == nil {
				tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
			}
			tasksByCommonSpec[taskGroupKey][taskID] = t
		} else {
			// This task doesn't have a spec version. We have to
			// schedule it as a one-off.
			oneOffTasks = append(oneOffTasks, t)
		}
		delete(s.unassignedTasks, taskID)
	}

	for _, taskGroup := range tasksByCommonSpec {
		s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
	}
	for _, t := range oneOffTasks {
		s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
	}

	_, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
	for _, decision := range failed {
		s.allTasks[decision.old.ID] = decision.old

		nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
		if err == nil && nodeInfo.removeTask(decision.new) {
			s.nodeSet.updateNode(nodeInfo)
		}

		// enqueue task for next scheduling attempt
		s.enqueue(decision.old)
	}
}
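
// applySchedulingDecisions commits the given scheduling decisions to the
// store in a batch, updating one task per transaction, and returns the
// decisions that were applied and those that failed.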
func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
	if len(schedulingDecisions) == 0 {
		return
	}

	successful = make([]schedulingDecision, 0, len(schedulingDecisions))

	// Apply changes to master store
	err := s.store.Batch(func(batch *store.Batch) error {
		for len(schedulingDecisions) > 0 {
			err := batch.Update(func(tx store.Tx) error {
				// Update exactly one task inside this Update
				// callback.
				for taskID, decision := range schedulingDecisions {
					delete(schedulingDecisions, taskID)

					t := store.GetTask(tx, taskID)
					if t == nil {
						// Task no longer exists
						nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
						if err == nil && nodeInfo.removeTask(decision.new) {
							s.nodeSet.updateNode(nodeInfo)
						}
						delete(s.allTasks, decision.old.ID)

						continue
					}

					if t.Status.State == decision.new.Status.State && t.Status.Message == decision.new.Status.Message {
						// No changes, ignore
						continue
					}

					if t.Status.State >= api.TaskStateAssigned {
						nodeInfo, err := s.nodeSet.nodeInfo(decision.new.NodeID)
						if err != nil {
							failed = append(failed, decision)
							continue
						}
						node := store.GetNode(tx, decision.new.NodeID)
						if node == nil || node.Meta.Version != nodeInfo.Meta.Version {
							// node is out of date
							failed = append(failed, decision)
							continue
						}
					}

					if err := store.UpdateTask(tx, decision.new); err != nil {
						log.G(ctx).Debugf("scheduler failed to update task %s; will retry", taskID)
						failed = append(failed, decision)
						continue
					}
					successful = append(successful, decision)
					return nil
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})

	if err != nil {
		log.G(ctx).WithError(err).Error("scheduler tick transaction failed")
		failed = append(failed, successful...)
		successful = nil
	}
	return
}

// taskFitNode checks if a node has enough resources to accommodate a task.
func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
	nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
	if err != nil {
		// node does not exist in set (it may have been deleted)
		return nil
	}
	newT := *t
	s.pipeline.SetTask(t)
	if !s.pipeline.Process(&nodeInfo) {
		// this node cannot accommodate this task
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		newT.Status.Message = s.pipeline.Explain()
		s.allTasks[t.ID] = &newT

		return &newT
	}
	newT.Status = api.TaskStatus{
		State:     api.TaskStateAssigned,
		Timestamp: ptypes.MustTimestampProto(time.Now()),
		Message:   "scheduler confirmed task can run on preassigned node",
	}
	s.allTasks[t.ID] = &newT

	if nodeInfo.addTask(&newT) {
		s.nodeSet.updateNode(nodeInfo)
	}
	return &newT
}

// scheduleTaskGroup schedules a batch of tasks that are part of the same
// service and share the same version of the spec.
func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	// Pick a task at random from taskGroup to use for constraint
	// evaluation. It doesn't matter which one we pick because all the
	// tasks in the group are equal in terms of the fields the constraint
	// filters consider.
	var t *api.Task
	for _, t = range taskGroup {
		break
	}

	s.pipeline.SetTask(t)

	now := time.Now()
	nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
		// If either node has at least maxFailures recent failures,
		// that's the deciding factor.
		recentFailuresA := a.countRecentFailures(now, t.ServiceID)
		recentFailuresB := b.countRecentFailures(now, t.ServiceID)

		if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
			if recentFailuresA > recentFailuresB {
				return false
			}
			if recentFailuresB > recentFailuresA {
				return true
			}
		}

		tasksByServiceA := a.ActiveTasksCountByService[t.ServiceID]
		tasksByServiceB := b.ActiveTasksCountByService[t.ServiceID]

		if tasksByServiceA < tasksByServiceB {
			return true
		}
		if tasksByServiceA > tasksByServiceB {
			return false
		}

		// Total number of tasks breaks ties.
		return a.ActiveTasksCount < b.ActiveTasksCount
	}

	var prefs []*api.PlacementPreference
	if t.Spec.Placement != nil {
		prefs = t.Spec.Placement.Preferences
	}

	tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)

	s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
	if len(taskGroup) != 0 {
		s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
	}
}
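
// scheduleNTasksOnSubtree distributes up to n tasks from taskGroup across the
// branches of the decision tree, trying to keep branches evenly loaded, and
// returns how many tasks it was able to schedule. On leaf nodes it delegates
// to scheduleNTasksOnNodes.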
func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	if tree.next == nil {
		nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
		if len(nodes) == 0 {
			return 0
		}

		return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
	}

	// Walk the tree and figure out how the tasks should be split at each
	// level.
	tasksScheduled := 0
	tasksInUsableBranches := tree.tasks
	var noRoom map[*decisionTree]struct{}

	// Try to make branches even until either all branches are
	// full, or all tasks have been scheduled.
	for tasksScheduled != n && len(noRoom) != len(tree.next) {
		desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
		remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))

		for _, subtree := range tree.next {
			if noRoom != nil {
				if _, ok := noRoom[subtree]; ok {
					continue
				}
			}
			subtreeTasks := subtree.tasks
			if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
				tasksToAssign := desiredTasksPerBranch - subtreeTasks
				if remainder > 0 {
					tasksToAssign++
				}
				res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
				if res < tasksToAssign {
					if noRoom == nil {
						noRoom = make(map[*decisionTree]struct{})
					}
					noRoom[subtree] = struct{}{}
					tasksInUsableBranches -= subtreeTasks
				} else if remainder > 0 {
					remainder--
				}
				tasksScheduled += res
			}
		}
	}

	return tasksScheduled
}
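
// scheduleNTasksOnNodes assigns up to n tasks from taskGroup to the given
// ordered list of nodes, advancing through the nodes as they fill up and
// skipping nodes that no longer satisfy the pipeline constraints. It returns
// the number of tasks scheduled.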
func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
	tasksScheduled := 0
	failedConstraints := make(map[int]bool) // key is index in nodes slice
	nodeIter := 0
	nodeCount := len(nodes)
	for taskID, t := range taskGroup {
		// Skip tasks which were already scheduled because they ended
		// up in two groups at once.
		if _, exists := schedulingDecisions[taskID]; exists {
			continue
		}

		node := &nodes[nodeIter%nodeCount]

		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
		newT := *t
		newT.NodeID = node.ID
		newT.Status = api.TaskStatus{
			State:     api.TaskStateAssigned,
			Timestamp: ptypes.MustTimestampProto(time.Now()),
			Message:   "scheduler assigned task to node",
		}
		s.allTasks[t.ID] = &newT

		nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
		if err == nil && nodeInfo.addTask(&newT) {
			s.nodeSet.updateNode(nodeInfo)
			nodes[nodeIter%nodeCount] = nodeInfo
		}

		schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
		delete(taskGroup, taskID)
		tasksScheduled++
		if tasksScheduled == n {
			return tasksScheduled
		}

		if nodeIter+1 < nodeCount {
			// First pass fills the nodes until they have the same
			// number of tasks from this service.
			nextNode := nodes[(nodeIter+1)%nodeCount]
			if nodeLess(&nextNode, &nodeInfo) {
				nodeIter++
			}
		} else {
			// In later passes, we just assign one task at a time
			// to each node that still meets the constraints.
			nodeIter++
		}

		origNodeIter := nodeIter
		for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
			failedConstraints[nodeIter%nodeCount] = true
			nodeIter++
			if nodeIter-origNodeIter == nodeCount {
				// None of the nodes meet the constraints anymore.
				return tasksScheduled
			}
		}
	}

	return tasksScheduled
}
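
// noSuitableNode records that no node could be found for the tasks in
// taskGroup, updating their status messages with the pipeline's explanation
// and re-queueing them for a later scheduling attempt.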
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
	explanation := s.pipeline.Explain()
	for _, t := range taskGroup {
		log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")

		newT := *t
		newT.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
		if explanation != "" {
			newT.Status.Message = "no suitable node (" + explanation + ")"
		} else {
			newT.Status.Message = "no suitable node"
		}
		s.allTasks[t.ID] = &newT
		schedulingDecisions[t.ID] = schedulingDecision{old: t, new: &newT}

		s.enqueue(&newT)
	}
}
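
// buildNodeSet initializes the node set from all nodes in the store, seeding
// each entry with the tasks currently assigned to it and the resources from
// its node description.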
func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[string]*api.Task) error {
	nodes, err := store.FindNodes(tx, store.All)
	if err != nil {
		return err
	}

	s.nodeSet.alloc(len(nodes))

	for _, n := range nodes {
		var resources api.Resources
		if n.Description != nil && n.Description.Resources != nil {
			resources = *n.Description.Resources
		}
		s.nodeSet.addOrUpdateNode(newNodeInfo(n, tasksByNode[n.ID], resources))
	}

	return nil
}