|
@@ -1,7 +1,6 @@
|
|
|
package scheduler
|
|
|
|
|
|
import (
|
|
|
- "container/list"
|
|
|
"time"
|
|
|
|
|
|
"github.com/docker/swarmkit/api"
|
|
@@ -30,7 +29,7 @@ type schedulingDecision struct {
|
|
|
// Scheduler assigns tasks to nodes.
|
|
|
type Scheduler struct {
|
|
|
store *store.MemoryStore
|
|
|
- unassignedTasks *list.List
|
|
|
+ unassignedTasks map[string]*api.Task
|
|
|
// preassignedTasks already have NodeID, need resource validation
|
|
|
preassignedTasks map[string]*api.Task
|
|
|
nodeSet nodeSet
|
|
@@ -47,7 +46,7 @@ type Scheduler struct {
|
|
|
func New(store *store.MemoryStore) *Scheduler {
|
|
|
return &Scheduler{
|
|
|
store: store,
|
|
|
- unassignedTasks: list.New(),
|
|
|
+ unassignedTasks: make(map[string]*api.Task),
|
|
|
preassignedTasks: make(map[string]*api.Task),
|
|
|
allTasks: make(map[string]*api.Task),
|
|
|
stopChan: make(chan struct{}),
|
|
@@ -191,7 +190,7 @@ func (s *Scheduler) Stop() {
|
|
|
|
|
|
// enqueue queues a task for scheduling.
|
|
|
func (s *Scheduler) enqueue(t *api.Task) {
|
|
|
- s.unassignedTasks.PushBack(t)
|
|
|
+ s.unassignedTasks[t.ID] = t
|
|
|
}
|
|
|
|
|
|
func (s *Scheduler) createTask(ctx context.Context, t *api.Task) int {
|
|
@@ -333,15 +332,12 @@ func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
|
|
|
// tick attempts to schedule the queue.
|
|
|
func (s *Scheduler) tick(ctx context.Context) {
|
|
|
tasksByCommonSpec := make(map[string]map[string]*api.Task)
|
|
|
- schedulingDecisions := make(map[string]schedulingDecision, s.unassignedTasks.Len())
|
|
|
+ schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))
|
|
|
|
|
|
- var next *list.Element
|
|
|
- for e := s.unassignedTasks.Front(); e != nil; e = next {
|
|
|
- next = e.Next()
|
|
|
- t := s.allTasks[e.Value.(*api.Task).ID]
|
|
|
+ for taskID, t := range s.unassignedTasks {
|
|
|
if t == nil || t.NodeID != "" {
|
|
|
// task deleted or already assigned
|
|
|
- s.unassignedTasks.Remove(e)
|
|
|
+ delete(s.unassignedTasks, taskID)
|
|
|
continue
|
|
|
}
|
|
|
|
|
@@ -362,8 +358,8 @@ func (s *Scheduler) tick(ctx context.Context) {
|
|
|
if tasksByCommonSpec[taskGroupKey] == nil {
|
|
|
tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
|
|
|
}
|
|
|
- tasksByCommonSpec[taskGroupKey][t.ID] = t
|
|
|
- s.unassignedTasks.Remove(e)
|
|
|
+ tasksByCommonSpec[taskGroupKey][taskID] = t
|
|
|
+ delete(s.unassignedTasks, taskID)
|
|
|
}
|
|
|
|
|
|
for _, taskGroup := range tasksByCommonSpec {
|
|
@@ -540,6 +536,12 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
|
|
|
failedConstraints := make(map[int]bool) // key is index in nodes slice
|
|
|
nodeIter := 0
|
|
|
for taskID, t := range taskGroup {
|
|
|
+ // Skip tasks which were already scheduled because they ended
|
|
|
+ // up in two groups at once.
|
|
|
+ if _, exists := schedulingDecisions[taskID]; exists {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
n := &nodes[nodeIter%len(nodes)]
|
|
|
|
|
|
log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", n.ID)
|