|
@@ -3,19 +3,27 @@ package orchestrator
|
|
import (
|
|
import (
|
|
"github.com/docker/swarmkit/api"
|
|
"github.com/docker/swarmkit/api"
|
|
"github.com/docker/swarmkit/log"
|
|
"github.com/docker/swarmkit/log"
|
|
|
|
+ "github.com/docker/swarmkit/manager/constraint"
|
|
"github.com/docker/swarmkit/manager/state"
|
|
"github.com/docker/swarmkit/manager/state"
|
|
"github.com/docker/swarmkit/manager/state/store"
|
|
"github.com/docker/swarmkit/manager/state/store"
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+type globalService struct {
|
|
|
|
+ *api.Service
|
|
|
|
+
|
|
|
|
+ // Compiled constraints
|
|
|
|
+ constraints []constraint.Constraint
|
|
|
|
+}
|
|
|
|
+
|
|
// GlobalOrchestrator runs a reconciliation loop to create and destroy
|
|
// GlobalOrchestrator runs a reconciliation loop to create and destroy
|
|
// tasks as necessary for global services.
|
|
// tasks as necessary for global services.
|
|
type GlobalOrchestrator struct {
|
|
type GlobalOrchestrator struct {
|
|
store *store.MemoryStore
|
|
store *store.MemoryStore
|
|
- // nodes contains nodeID of all valid nodes in the cluster
|
|
|
|
- nodes map[string]struct{}
|
|
|
|
- // globalServices have all the global services in the cluster, indexed by ServiceID
|
|
|
|
- globalServices map[string]*api.Service
|
|
|
|
|
|
+ // nodes is the set of non-drained nodes in the cluster, indexed by node ID
|
|
|
|
+ nodes map[string]*api.Node
|
|
|
|
+ // globalServices has all the global services in the cluster, indexed by ServiceID
|
|
|
|
+ globalServices map[string]globalService
|
|
|
|
|
|
// stopChan signals to the state machine to stop running.
|
|
// stopChan signals to the state machine to stop running.
|
|
stopChan chan struct{}
|
|
stopChan chan struct{}
|
|
@@ -34,8 +42,8 @@ func NewGlobalOrchestrator(store *store.MemoryStore) *GlobalOrchestrator {
|
|
updater := NewUpdateSupervisor(store, restartSupervisor)
|
|
updater := NewUpdateSupervisor(store, restartSupervisor)
|
|
return &GlobalOrchestrator{
|
|
return &GlobalOrchestrator{
|
|
store: store,
|
|
store: store,
|
|
- nodes: make(map[string]struct{}),
|
|
|
|
- globalServices: make(map[string]*api.Service),
|
|
|
|
|
|
+ nodes: make(map[string]*api.Node),
|
|
|
|
+ globalServices: make(map[string]globalService),
|
|
stopChan: make(chan struct{}),
|
|
stopChan: make(chan struct{}),
|
|
doneChan: make(chan struct{}),
|
|
doneChan: make(chan struct{}),
|
|
updater: updater,
|
|
updater: updater,
|
|
@@ -76,10 +84,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
for _, n := range nodes {
|
|
for _, n := range nodes {
|
|
- // if a node is in drain state, do not add it
|
|
|
|
- if isValidNode(n) {
|
|
|
|
- g.nodes[n.ID] = struct{}{}
|
|
|
|
- }
|
|
|
|
|
|
+ g.updateNode(n)
|
|
}
|
|
}
|
|
|
|
|
|
// Lookup global services
|
|
// Lookup global services
|
|
@@ -90,12 +95,15 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ var reconcileServiceIDs []string
|
|
for _, s := range existingServices {
|
|
for _, s := range existingServices {
|
|
if isGlobalService(s) {
|
|
if isGlobalService(s) {
|
|
- g.globalServices[s.ID] = s
|
|
|
|
- g.reconcileOneService(ctx, s)
|
|
|
|
|
|
+ g.updateService(s)
|
|
|
|
+ reconcileServiceIDs = append(reconcileServiceIDs, s.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ g.reconcileServices(ctx, reconcileServiceIDs)
|
|
|
|
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
@@ -108,14 +116,14 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
|
|
if !isGlobalService(v.Service) {
|
|
if !isGlobalService(v.Service) {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- g.globalServices[v.Service.ID] = v.Service
|
|
|
|
- g.reconcileOneService(ctx, v.Service)
|
|
|
|
|
|
+ g.updateService(v.Service)
|
|
|
|
+ g.reconcileServices(ctx, []string{v.Service.ID})
|
|
case state.EventUpdateService:
|
|
case state.EventUpdateService:
|
|
if !isGlobalService(v.Service) {
|
|
if !isGlobalService(v.Service) {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- g.globalServices[v.Service.ID] = v.Service
|
|
|
|
- g.reconcileOneService(ctx, v.Service)
|
|
|
|
|
|
+ g.updateService(v.Service)
|
|
|
|
+ g.reconcileServices(ctx, []string{v.Service.ID})
|
|
case state.EventDeleteService:
|
|
case state.EventDeleteService:
|
|
if !isGlobalService(v.Service) {
|
|
if !isGlobalService(v.Service) {
|
|
continue
|
|
continue
|
|
@@ -125,8 +133,10 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
|
|
delete(g.globalServices, v.Service.ID)
|
|
delete(g.globalServices, v.Service.ID)
|
|
g.restarts.ClearServiceHistory(v.Service.ID)
|
|
g.restarts.ClearServiceHistory(v.Service.ID)
|
|
case state.EventCreateNode:
|
|
case state.EventCreateNode:
|
|
|
|
+ g.updateNode(v.Node)
|
|
g.reconcileOneNode(ctx, v.Node)
|
|
g.reconcileOneNode(ctx, v.Node)
|
|
case state.EventUpdateNode:
|
|
case state.EventUpdateNode:
|
|
|
|
+ g.updateNode(v.Node)
|
|
switch v.Node.Status.State {
|
|
switch v.Node.Status.State {
|
|
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
|
|
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
|
|
case api.NodeStatus_DOWN:
|
|
case api.NodeStatus_DOWN:
|
|
@@ -153,7 +163,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
|
|
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
|
|
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- g.reconcileServiceOneNode(ctx, v.Task.ServiceID, v.Task.NodeID)
|
|
|
|
|
|
+ g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
|
|
}
|
|
}
|
|
case <-g.stopChan:
|
|
case <-g.stopChan:
|
|
return nil
|
|
return nil
|
|
@@ -196,138 +206,225 @@ func (g *GlobalOrchestrator) removeTasksFromNode(ctx context.Context, node *api.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (g *GlobalOrchestrator) reconcileOneService(ctx context.Context, service *api.Service) {
|
|
|
|
- var (
|
|
|
|
- tasks []*api.Task
|
|
|
|
- err error
|
|
|
|
- )
|
|
|
|
|
|
+func (g *GlobalOrchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
|
|
|
|
+ nodeCompleted := make(map[string]map[string]struct{})
|
|
|
|
+ nodeTasks := make(map[string]map[string][]*api.Task)
|
|
|
|
+
|
|
g.store.View(func(tx store.ReadTx) {
|
|
g.store.View(func(tx store.ReadTx) {
|
|
- tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
|
|
|
|
- })
|
|
|
|
- if err != nil {
|
|
|
|
- log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService failed finding tasks")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // a node may have completed this service
|
|
|
|
- nodeCompleted := make(map[string]struct{})
|
|
|
|
- // nodeID -> task list
|
|
|
|
- nodeTasks := make(map[string][]*api.Task)
|
|
|
|
|
|
+ for _, serviceID := range serviceIDs {
|
|
|
|
+ tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
|
|
- for _, t := range tasks {
|
|
|
|
- if isTaskRunning(t) {
|
|
|
|
- // Collect all running instances of this service
|
|
|
|
- nodeTasks[t.NodeID] = append(nodeTasks[t.NodeID], t)
|
|
|
|
- } else {
|
|
|
|
- // for finished tasks, check restartPolicy
|
|
|
|
- if isTaskCompleted(t, restartCondition(t)) {
|
|
|
|
- nodeCompleted[t.NodeID] = struct{}{}
|
|
|
|
|
|
+ // a node may have completed this service
|
|
|
|
+ nodeCompleted[serviceID] = make(map[string]struct{})
|
|
|
|
+ // nodeID -> task list
|
|
|
|
+ nodeTasks[serviceID] = make(map[string][]*api.Task)
|
|
|
|
+
|
|
|
|
+ for _, t := range tasks {
|
|
|
|
+ if isTaskRunning(t) {
|
|
|
|
+ // Collect all running instances of this service
|
|
|
|
+ nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
|
|
|
|
+ } else {
|
|
|
|
+ // for finished tasks, check restartPolicy
|
|
|
|
+ if isTaskCompleted(t, restartCondition(t)) {
|
|
|
|
+ nodeCompleted[serviceID][t.NodeID] = struct{}{}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ })
|
|
|
|
|
|
- _, err = g.store.Batch(func(batch *store.Batch) error {
|
|
|
|
|
|
+ _, err := g.store.Batch(func(batch *store.Batch) error {
|
|
var updateTasks []slot
|
|
var updateTasks []slot
|
|
- for nodeID := range g.nodes {
|
|
|
|
- ntasks := nodeTasks[nodeID]
|
|
|
|
- // if restart policy considers this node has finished its task
|
|
|
|
- // it should remove all running tasks
|
|
|
|
- if _, exists := nodeCompleted[nodeID]; exists {
|
|
|
|
- g.removeTasks(ctx, batch, service, ntasks)
|
|
|
|
- return nil
|
|
|
|
|
|
+ for _, serviceID := range serviceIDs {
|
|
|
|
+ if _, exists := nodeTasks[serviceID]; !exists {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
- // this node needs to run 1 copy of the task
|
|
|
|
- if len(ntasks) == 0 {
|
|
|
|
- g.addTask(ctx, batch, service, nodeID)
|
|
|
|
- } else {
|
|
|
|
- updateTasks = append(updateTasks, ntasks)
|
|
|
|
|
|
+
|
|
|
|
+ service := g.globalServices[serviceID]
|
|
|
|
+
|
|
|
|
+ for nodeID, node := range g.nodes {
|
|
|
|
+ meetsConstraints := constraint.NodeMatches(service.constraints, node)
|
|
|
|
+ ntasks := nodeTasks[serviceID][nodeID]
|
|
|
|
+ delete(nodeTasks[serviceID], nodeID)
|
|
|
|
+
|
|
|
|
+ // if restart policy considers this node has finished its task
|
|
|
|
+ // it should remove all running tasks
|
|
|
|
+ if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
|
|
|
|
+ g.removeTasks(ctx, batch, ntasks)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if node.Spec.Availability == api.NodeAvailabilityPause {
|
|
|
|
+ // the node is paused, so we won't add or update
|
|
|
|
+ // any tasks
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // this node needs to run 1 copy of the task
|
|
|
|
+ if len(ntasks) == 0 {
|
|
|
|
+ g.addTask(ctx, batch, service.Service, nodeID)
|
|
|
|
+ } else {
|
|
|
|
+ updateTasks = append(updateTasks, ntasks)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if len(updateTasks) > 0 {
|
|
|
|
+ g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Remove any tasks assigned to nodes not found in g.nodes.
|
|
|
|
+ // These must be associated with nodes that are drained, or
|
|
|
|
+ // nodes that no longer exist.
|
|
|
|
+ for _, ntasks := range nodeTasks[serviceID] {
|
|
|
|
+ g.removeTasks(ctx, batch, ntasks)
|
|
}
|
|
}
|
|
- }
|
|
|
|
- if len(updateTasks) > 0 {
|
|
|
|
- g.updater.Update(ctx, g.cluster, service, updateTasks)
|
|
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
- log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService transaction failed")
|
|
|
|
|
|
+ log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// updateNode updates g.nodes based on the current node value
|
|
|
|
+func (g *GlobalOrchestrator) updateNode(node *api.Node) {
|
|
|
|
+ if node.Spec.Availability == api.NodeAvailabilityDrain {
|
|
|
|
+ delete(g.nodes, node.ID)
|
|
|
|
+ } else {
|
|
|
|
+ g.nodes[node.ID] = node
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// updateService updates g.globalServices based on the current service value
|
|
|
|
+func (g *GlobalOrchestrator) updateService(service *api.Service) {
|
|
|
|
+ var constraints []constraint.Constraint
|
|
|
|
+
|
|
|
|
+ if service.Spec.Task.Placement != nil && len(service.Spec.Task.Placement.Constraints) != 0 {
|
|
|
|
+ constraints, _ = constraint.Parse(service.Spec.Task.Placement.Constraints)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ g.globalServices[service.ID] = globalService{
|
|
|
|
+ Service: service,
|
|
|
|
+ constraints: constraints,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// reconcileOneNode checks all global services on one node
|
|
// reconcileOneNode checks all global services on one node
|
|
func (g *GlobalOrchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
|
|
func (g *GlobalOrchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
|
|
- switch node.Spec.Availability {
|
|
|
|
- case api.NodeAvailabilityDrain:
|
|
|
|
|
|
+ if node.Spec.Availability == api.NodeAvailabilityDrain {
|
|
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
|
|
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
|
|
g.removeTasksFromNode(ctx, node)
|
|
g.removeTasksFromNode(ctx, node)
|
|
- delete(g.nodes, node.ID)
|
|
|
|
- return
|
|
|
|
- case api.NodeAvailabilityActive:
|
|
|
|
- if _, exists := g.nodes[node.ID]; !exists {
|
|
|
|
- log.G(ctx).Debugf("global orchestrator: node %s not in current node list, adding it", node.ID)
|
|
|
|
- g.nodes[node.ID] = struct{}{}
|
|
|
|
- }
|
|
|
|
- default:
|
|
|
|
- log.G(ctx).Debugf("global orchestrator: node %s in %s state, doing nothing", node.ID, node.Spec.Availability.String())
|
|
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- // typically there are only a few global services on a node
|
|
|
|
- // iterate through all of them one by one. If raft store visits become a concern,
|
|
|
|
- // it can be optimized.
|
|
|
|
- for _, service := range g.globalServices {
|
|
|
|
- g.reconcileServiceOneNode(ctx, service.ID, node.ID)
|
|
|
|
|
|
+
|
|
|
|
+ var serviceIDs []string
|
|
|
|
+ for id := range g.globalServices {
|
|
|
|
+ serviceIDs = append(serviceIDs, id)
|
|
}
|
|
}
|
|
|
|
+ g.reconcileServicesOneNode(ctx, serviceIDs, node.ID)
|
|
}
|
|
}
|
|
|
|
|
|
-// reconcileServiceOneNode checks one service on one node
|
|
|
|
-func (g *GlobalOrchestrator) reconcileServiceOneNode(ctx context.Context, serviceID string, nodeID string) {
|
|
|
|
- _, exists := g.nodes[nodeID]
|
|
|
|
- if !exists {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- service, exists := g.globalServices[serviceID]
|
|
|
|
|
|
+// reconcileServicesOneNode checks the specified services on one node
|
|
|
|
+func (g *GlobalOrchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs []string, nodeID string) {
|
|
|
|
+ node, exists := g.nodes[nodeID]
|
|
if !exists {
|
|
if !exists {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- // the node has completed this servie
|
|
|
|
- completed := false
|
|
|
|
- // tasks for this node and service
|
|
|
|
|
|
+
|
|
|
|
+ // whether each service has completed on the node
|
|
|
|
+ completed := make(map[string]bool)
|
|
|
|
+ // tasks by service
|
|
|
|
+ tasks := make(map[string][]*api.Task)
|
|
|
|
+
|
|
var (
|
|
var (
|
|
- tasks []*api.Task
|
|
|
|
- err error
|
|
|
|
|
|
+ tasksOnNode []*api.Task
|
|
|
|
+ err error
|
|
)
|
|
)
|
|
|
|
+
|
|
g.store.View(func(tx store.ReadTx) {
|
|
g.store.View(func(tx store.ReadTx) {
|
|
- var tasksOnNode []*api.Task
|
|
|
|
tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(nodeID))
|
|
tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(nodeID))
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ })
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", nodeID)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, serviceID := range serviceIDs {
|
|
for _, t := range tasksOnNode {
|
|
for _, t := range tasksOnNode {
|
|
- // only interested in one service
|
|
|
|
if t.ServiceID != serviceID {
|
|
if t.ServiceID != serviceID {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
if isTaskRunning(t) {
|
|
if isTaskRunning(t) {
|
|
- tasks = append(tasks, t)
|
|
|
|
|
|
+ tasks[serviceID] = append(tasks[serviceID], t)
|
|
} else {
|
|
} else {
|
|
if isTaskCompleted(t, restartCondition(t)) {
|
|
if isTaskCompleted(t, restartCondition(t)) {
|
|
- completed = true
|
|
|
|
|
|
+ completed[serviceID] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- })
|
|
|
|
- if err != nil {
|
|
|
|
- log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks")
|
|
|
|
- return
|
|
|
|
}
|
|
}
|
|
|
|
|
|
_, err = g.store.Batch(func(batch *store.Batch) error {
|
|
_, err = g.store.Batch(func(batch *store.Batch) error {
|
|
- // if restart policy considers this node has finished its task
|
|
|
|
- // it should remove all running tasks
|
|
|
|
- if completed {
|
|
|
|
- g.removeTasks(ctx, batch, service, tasks)
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
- if len(tasks) == 0 {
|
|
|
|
- g.addTask(ctx, batch, service, nodeID)
|
|
|
|
|
|
+ for _, serviceID := range serviceIDs {
|
|
|
|
+ service, exists := g.globalServices[serviceID]
|
|
|
|
+ if !exists {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ meetsConstraints := constraint.NodeMatches(service.constraints, node)
|
|
|
|
+
|
|
|
|
+ // if restart policy considers this node has finished its task
|
|
|
|
+ // it should remove all running tasks
|
|
|
|
+ if completed[serviceID] || !meetsConstraints {
|
|
|
|
+ g.removeTasks(ctx, batch, tasks[serviceID])
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if node.Spec.Availability == api.NodeAvailabilityPause {
|
|
|
|
+ // the node is paused, so we won't add or update tasks
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(tasks) == 0 {
|
|
|
|
+ g.addTask(ctx, batch, service.Service, nodeID)
|
|
|
|
+ } else {
|
|
|
|
+ // If task is out of date, update it. This can happen
|
|
|
|
+ // on node reconciliation if, for example, we pause a
|
|
|
|
+ // node, update the service, and then activate the node
|
|
|
|
+ // later.
|
|
|
|
+
|
|
|
|
+ // We don't use g.updater here for two reasons:
|
|
|
|
+ // - This is not a rolling update. Since it was not
|
|
|
|
+ // triggered directly by updating the service, it
|
|
|
|
+ // should not observe the rolling update parameters
|
|
|
|
+ // or show status in UpdateStatus.
|
|
|
|
+ // - Calling Update cancels any current rolling updates
|
|
|
|
+ // for the service, such as one triggered by service
|
|
|
|
+ // reconciliation.
|
|
|
|
+
|
|
|
|
+ var (
|
|
|
|
+ dirtyTasks []*api.Task
|
|
|
|
+ cleanTasks []*api.Task
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ for _, t := range tasks[serviceID] {
|
|
|
|
+ if isTaskDirty(service.Service, t) {
|
|
|
|
+ dirtyTasks = append(dirtyTasks, t)
|
|
|
|
+ } else {
|
|
|
|
+ cleanTasks = append(cleanTasks, t)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(cleanTasks) == 0 {
|
|
|
|
+ g.addTask(ctx, batch, service.Service, nodeID)
|
|
|
|
+ } else {
|
|
|
|
+ dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
|
|
|
|
+ }
|
|
|
|
+ g.removeTasks(ctx, batch, dirtyTasks)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
})
|
|
})
|
|
@@ -383,7 +480,7 @@ func (g *GlobalOrchestrator) addTask(ctx context.Context, batch *store.Batch, se
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, service *api.Service, tasks []*api.Task) {
|
|
|
|
|
|
+func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
|
|
for _, t := range tasks {
|
|
for _, t := range tasks {
|
|
g.removeTask(ctx, batch, t)
|
|
g.removeTask(ctx, batch, t)
|
|
}
|
|
}
|
|
@@ -393,11 +490,6 @@ func isTaskRunning(t *api.Task) bool {
|
|
return t != nil && t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
|
|
return t != nil && t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
|
|
}
|
|
}
|
|
|
|
|
|
-func isValidNode(n *api.Node) bool {
|
|
|
|
- // current simulation spec could be nil
|
|
|
|
- return n != nil && n.Spec.Availability != api.NodeAvailabilityDrain
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
|
|
func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
|
|
if t == nil || isTaskRunning(t) {
|
|
if t == nil || isTaskRunning(t) {
|
|
return false
|
|
return false
|