|
@@ -26,7 +26,11 @@ const (
|
|
|
allocatedStatusMessage = "pending task scheduling"
|
|
|
)
|
|
|
|
|
|
-var errNoChanges = errors.New("task unchanged")
|
|
|
+var (
|
|
|
+ errNoChanges = errors.New("task unchanged")
|
|
|
+
|
|
|
+ retryInterval = 5 * time.Minute
|
|
|
+)
|
|
|
|
|
|
func newIngressNetwork() *api.Network {
|
|
|
return &api.Network{
|
|
@@ -57,19 +61,28 @@ type networkContext struct {
|
|
|
// the actual network allocation.
|
|
|
nwkAllocator *networkallocator.NetworkAllocator
|
|
|
|
|
|
- // A table of unallocated tasks which will be revisited if any thing
|
|
|
+ // A set of tasks which are ready to be allocated as a batch. This is
|
|
|
+ // distinct from "unallocatedTasks" which are tasks that failed to
|
|
|
+ // allocate on the first try, being held for a future retry.
|
|
|
+ pendingTasks map[string]*api.Task
|
|
|
+
|
|
|
+ // A set of unallocated tasks which will be revisited if any thing
|
|
|
// changes in system state that might help task allocation.
|
|
|
unallocatedTasks map[string]*api.Task
|
|
|
|
|
|
- // A table of unallocated services which will be revisited if
|
|
|
+ // A set of unallocated services which will be revisited if
|
|
|
// any thing changes in system state that might help service
|
|
|
// allocation.
|
|
|
unallocatedServices map[string]*api.Service
|
|
|
|
|
|
- // A table of unallocated networks which will be revisited if
|
|
|
+ // A set of unallocated networks which will be revisited if
|
|
|
// any thing changes in system state that might help network
|
|
|
// allocation.
|
|
|
unallocatedNetworks map[string]*api.Network
|
|
|
+
|
|
|
+ // lastRetry is the last timestamp when unallocated
|
|
|
+ // tasks/services/networks were retried.
|
|
|
+ lastRetry time.Time
|
|
|
}
|
|
|
|
|
|
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|
@@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|
|
|
|
|
nc := &networkContext{
|
|
|
nwkAllocator: na,
|
|
|
+ pendingTasks: make(map[string]*api.Task),
|
|
|
unallocatedTasks: make(map[string]*api.Task),
|
|
|
unallocatedServices: make(map[string]*api.Service),
|
|
|
unallocatedNetworks: make(map[string]*api.Network),
|
|
|
ingressNetwork: newIngressNetwork(),
|
|
|
+ lastRetry: time.Now(),
|
|
|
}
|
|
|
a.netCtx = nc
|
|
|
defer func() {
|
|
@@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|
|
}
|
|
|
|
|
|
for _, t := range tasks {
|
|
|
- if taskDead(t) {
|
|
|
+ if t.Status.State > api.TaskStateRunning {
|
|
|
continue
|
|
|
}
|
|
|
|
|
@@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
if err := nc.nwkAllocator.Deallocate(n); err != nil {
|
|
|
log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
|
|
|
}
|
|
|
+
|
|
|
+ delete(nc.unallocatedNetworks, n.ID)
|
|
|
case state.EventCreateService:
|
|
|
s := v.Service.Copy()
|
|
|
|
|
@@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
return a.commitAllocatedService(ctx, batch, s)
|
|
|
}); err != nil {
|
|
|
log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
|
|
|
+ nc.unallocatedServices[s.ID] = s
|
|
|
+ } else {
|
|
|
+ delete(nc.unallocatedServices, s.ID)
|
|
|
}
|
|
|
case state.EventDeleteService:
|
|
|
s := v.Service.Copy()
|
|
@@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
|
|
|
a.doTaskAlloc(ctx, ev)
|
|
|
case state.EventCommit:
|
|
|
- a.procUnallocatedNetworks(ctx)
|
|
|
- a.procUnallocatedServices(ctx)
|
|
|
- a.procUnallocatedTasksNetwork(ctx)
|
|
|
- return
|
|
|
+ a.procTasksNetwork(ctx, false)
|
|
|
+
|
|
|
+ if time.Since(nc.lastRetry) > retryInterval {
|
|
|
+ a.procUnallocatedNetworks(ctx)
|
|
|
+ a.procUnallocatedServices(ctx)
|
|
|
+ a.procTasksNetwork(ctx, true)
|
|
|
+ nc.lastRetry = time.Now()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Any left over tasks are moved to the unallocated set
|
|
|
+ for _, t := range nc.pendingTasks {
|
|
|
+ nc.unallocatedTasks[t.ID] = t
|
|
|
+ }
|
|
|
+ nc.pendingTasks = make(map[string]*api.Task)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// taskRunning checks whether a task is either actively running, or in the
|
|
|
-// process of starting up.
|
|
|
-func taskRunning(t *api.Task) bool {
|
|
|
- return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
|
|
|
-}
|
|
|
-
|
|
|
-// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
|
|
|
-func taskDead(t *api.Task) bool {
|
|
|
- return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
|
|
|
-}
|
|
|
-
|
|
|
// taskReadyForNetworkVote checks if the task is ready for a network
|
|
|
// vote to move it to PENDING state.
|
|
|
func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
|
|
@@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|
|
|
|
|
nc := a.netCtx
|
|
|
|
|
|
- // If the task has stopped running or it's being deleted then
|
|
|
- // we should free the network resources associated with the
|
|
|
- // task right away.
|
|
|
- if taskDead(t) || isDelete {
|
|
|
+ // If the task has stopped running then we should free the network
|
|
|
+ // resources associated with the task right away.
|
|
|
+ if t.Status.State > api.TaskStateRunning || isDelete {
|
|
|
if nc.nwkAllocator.IsTaskAllocated(t) {
|
|
|
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
|
|
|
log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Cleanup any task references that might exist in unallocatedTasks
|
|
|
+ // Cleanup any task references that might exist
|
|
|
+ delete(nc.pendingTasks, t.ID)
|
|
|
delete(nc.unallocatedTasks, t.ID)
|
|
|
return
|
|
|
}
|
|
@@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|
|
// If we are already in allocated state, there is
|
|
|
// absolutely nothing else to do.
|
|
|
if t.Status.State >= api.TaskStatePending {
|
|
|
+ delete(nc.pendingTasks, t.ID)
|
|
|
delete(nc.unallocatedTasks, t.ID)
|
|
|
return
|
|
|
}
|
|
@@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|
|
// available in store. But we still need to
|
|
|
// cleanup network resources associated with
|
|
|
// the task.
|
|
|
- if taskRunning(t) && !isDelete {
|
|
|
+ if t.Status.State <= api.TaskStateRunning && !isDelete {
|
|
|
log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
|
|
|
return
|
|
|
}
|
|
@@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|
|
// based on service spec.
|
|
|
a.taskCreateNetworkAttachments(t, s)
|
|
|
|
|
|
- nc.unallocatedTasks[t.ID] = t
|
|
|
+ nc.pendingTasks[t.ID] = t
|
|
|
}
|
|
|
|
|
|
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
|
|
@@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
|
|
|
+func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
|
|
|
nc := a.netCtx
|
|
|
- allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
|
|
|
+ quiet := false
|
|
|
+ toAllocate := nc.pendingTasks
|
|
|
+ if onRetry {
|
|
|
+ toAllocate = nc.unallocatedTasks
|
|
|
+ quiet = true
|
|
|
+ }
|
|
|
+ allocatedTasks := make([]*api.Task, 0, len(toAllocate))
|
|
|
|
|
|
- for _, t := range nc.unallocatedTasks {
|
|
|
+ for _, t := range toAllocate {
|
|
|
if err := a.allocateTask(ctx, t); err == nil {
|
|
|
allocatedTasks = append(allocatedTasks, t)
|
|
|
} else if err != errNoChanges {
|
|
|
- log.G(ctx).WithError(err).Error("task allocation failure")
|
|
|
+ if quiet {
|
|
|
+ log.G(ctx).WithError(err).Debug("task allocation failure")
|
|
|
+ } else {
|
|
|
+ log.G(ctx).WithError(err).Error("task allocation failure")
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
- log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
|
|
|
+ log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
|
|
|
}
|
|
|
|
|
|
for _, t := range allocatedTasks[:committed] {
|
|
|
- delete(nc.unallocatedTasks, t.ID)
|
|
|
+ delete(toAllocate, t.ID)
|
|
|
}
|
|
|
}
|
|
|
|