|
@@ -67,7 +67,7 @@ type networkContext struct {
|
|
|
unallocatedNetworks map[string]*api.Network
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
+func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|
|
na, err := networkallocator.New()
|
|
|
if err != nil {
|
|
|
return err
|
|
@@ -80,6 +80,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
unallocatedNetworks: make(map[string]*api.Network),
|
|
|
ingressNetwork: newIngressNetwork(),
|
|
|
}
|
|
|
+ a.netCtx = nc
|
|
|
+ defer func() {
|
|
|
+ // Clear a.netCtx if initialization was unsuccessful.
|
|
|
+ if err != nil {
|
|
|
+ a.netCtx = nil
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
// Check if we have the ingress network. If not found create
|
|
|
// it before reading all network objects for allocation.
|
|
@@ -124,7 +131,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
// that the we can get the preferred subnet for ingress
|
|
|
// network.
|
|
|
if !na.IsAllocated(nc.ingressNetwork) {
|
|
|
- if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil {
|
|
|
+ if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
|
|
|
log.G(ctx).Errorf("failed allocating ingress network during init: %v", err)
|
|
|
}
|
|
|
|
|
@@ -154,7 +161,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- if err := a.allocateNetwork(ctx, nc, n); err != nil {
|
|
|
+ if err := a.allocateNetwork(ctx, n); err != nil {
|
|
|
log.G(ctx).Errorf("failed allocating network %s during init: %v", n.ID, err)
|
|
|
}
|
|
|
}
|
|
@@ -178,7 +185,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
}
|
|
|
|
|
|
node.Attachment.Network = nc.ingressNetwork.Copy()
|
|
|
- if err := a.allocateNode(ctx, nc, node); err != nil {
|
|
|
+ if err := a.allocateNode(ctx, node); err != nil {
|
|
|
log.G(ctx).Errorf("Failed to allocate network resources for node %s during init: %v", node.ID, err)
|
|
|
}
|
|
|
}
|
|
@@ -197,7 +204,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- if err := a.allocateService(ctx, nc, s); err != nil {
|
|
|
+ if err := a.allocateService(ctx, s); err != nil {
|
|
|
log.G(ctx).Errorf("failed allocating service %s during init: %v", s.ID, err)
|
|
|
}
|
|
|
}
|
|
@@ -259,7 +266,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
}
|
|
|
|
|
|
err := batch.Update(func(tx store.Tx) error {
|
|
|
- _, err := a.allocateTask(ctx, nc, tx, t)
|
|
|
+ _, err := a.allocateTask(ctx, tx, t)
|
|
|
return err
|
|
|
})
|
|
|
if err != nil {
|
|
@@ -273,7 +280,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- a.netCtx = nc
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -287,7 +293,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- if err := a.allocateNetwork(ctx, nc, n); err != nil {
|
|
|
+ if err := a.allocateNetwork(ctx, n); err != nil {
|
|
|
log.G(ctx).Errorf("Failed allocation for network %s: %v", n.ID, err)
|
|
|
break
|
|
|
}
|
|
@@ -308,7 +314,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- if err := a.allocateService(ctx, nc, s); err != nil {
|
|
|
+ if err := a.allocateService(ctx, s); err != nil {
|
|
|
log.G(ctx).Errorf("Failed allocation for service %s: %v", s.ID, err)
|
|
|
break
|
|
|
}
|
|
@@ -319,7 +325,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- if err := a.allocateService(ctx, nc, s); err != nil {
|
|
|
+ if err := a.allocateService(ctx, s); err != nil {
|
|
|
log.G(ctx).Errorf("Failed allocation during update of service %s: %v", s.ID, err)
|
|
|
break
|
|
|
}
|
|
@@ -334,18 +340,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|
|
// it's still there.
|
|
|
delete(nc.unallocatedServices, s.ID)
|
|
|
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
|
|
|
- a.doNodeAlloc(ctx, nc, ev)
|
|
|
+ a.doNodeAlloc(ctx, ev)
|
|
|
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
|
|
|
- a.doTaskAlloc(ctx, nc, ev)
|
|
|
+ a.doTaskAlloc(ctx, ev)
|
|
|
case state.EventCommit:
|
|
|
- a.procUnallocatedNetworks(ctx, nc)
|
|
|
- a.procUnallocatedServices(ctx, nc)
|
|
|
- a.procUnallocatedTasksNetwork(ctx, nc)
|
|
|
+ a.procUnallocatedNetworks(ctx)
|
|
|
+ a.procUnallocatedServices(ctx)
|
|
|
+ a.procUnallocatedTasksNetwork(ctx)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
|
|
|
+func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
|
|
|
var (
|
|
|
isDelete bool
|
|
|
node *api.Node
|
|
@@ -361,6 +367,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
|
|
|
node = v.Node.Copy()
|
|
|
}
|
|
|
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
if isDelete {
|
|
|
if nc.nwkAllocator.IsNodeAllocated(node) {
|
|
|
if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
|
|
@@ -376,8 +384,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
|
|
|
}
|
|
|
|
|
|
node.Attachment.Network = nc.ingressNetwork.Copy()
|
|
|
- if err := a.allocateNode(ctx, nc, node); err != nil {
|
|
|
- log.G(ctx).Errorf("Fauled to allocate network resources for node %s: %v", node.ID, err)
|
|
|
+ if err := a.allocateNode(ctx, node); err != nil {
|
|
|
+ log.G(ctx).Errorf("Failed to allocate network resources for node %s: %v", node.ID, err)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -451,7 +459,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
|
|
|
taskUpdateNetworks(t, networks)
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
|
|
|
+func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|
|
var (
|
|
|
isDelete bool
|
|
|
t *api.Task
|
|
@@ -467,6 +475,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
|
|
|
t = v.Task.Copy()
|
|
|
}
|
|
|
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
// If the task has stopped running or it's being deleted then
|
|
|
// we should free the network resources associated with the
|
|
|
// task right away.
|
|
@@ -517,7 +527,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
|
|
|
nc.unallocatedTasks[t.ID] = t
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error {
|
|
|
+func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
if err := nc.nwkAllocator.AllocateNode(node); err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -550,7 +562,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
|
|
|
+func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
if s.Spec.Endpoint != nil {
|
|
|
// service has user-defined endpoint
|
|
|
if s.Endpoint == nil {
|
|
@@ -635,7 +649,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error {
|
|
|
+func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
if err := nc.nwkAllocator.Allocate(n); err != nil {
|
|
|
nc.unallocatedNetworks[n.ID] = n
|
|
|
return fmt.Errorf("failed during network allocation for network %s: %v", n.ID, err)
|
|
@@ -657,7 +673,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) {
|
|
|
+func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) {
|
|
|
taskUpdated := false
|
|
|
|
|
|
// Get the latest task state from the store before updating.
|
|
@@ -666,6 +682,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
|
|
|
return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID)
|
|
|
}
|
|
|
|
|
|
+ nc := a.netCtx
|
|
|
+
|
|
|
// We might be here even if a task allocation has already
|
|
|
// happened but wasn't successfully committed to store. In such
|
|
|
// cases skip allocation and go straight ahead to updating the
|
|
@@ -725,10 +743,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
|
|
|
return storeT, nil
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) {
|
|
|
+func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
|
|
|
+ nc := a.netCtx
|
|
|
for _, n := range nc.unallocatedNetworks {
|
|
|
if !nc.nwkAllocator.IsAllocated(n) {
|
|
|
- if err := a.allocateNetwork(ctx, nc, n); err != nil {
|
|
|
+ if err := a.allocateNetwork(ctx, n); err != nil {
|
|
|
log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err)
|
|
|
continue
|
|
|
}
|
|
@@ -738,10 +757,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
|
|
|
+func (a *Allocator) procUnallocatedServices(ctx context.Context) {
|
|
|
+ nc := a.netCtx
|
|
|
for _, s := range nc.unallocatedServices {
|
|
|
if !nc.nwkAllocator.IsServiceAllocated(s) {
|
|
|
- if err := a.allocateService(ctx, nc, s); err != nil {
|
|
|
+ if err := a.allocateService(ctx, s); err != nil {
|
|
|
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
|
|
|
continue
|
|
|
}
|
|
@@ -751,7 +771,8 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) {
|
|
|
+func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
|
|
|
+ nc := a.netCtx
|
|
|
tasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
|
|
|
|
|
|
committed, err := a.store.Batch(func(batch *store.Batch) error {
|
|
@@ -759,7 +780,7 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *network
|
|
|
var allocatedT *api.Task
|
|
|
err := batch.Update(func(tx store.Tx) error {
|
|
|
var err error
|
|
|
- allocatedT, err = a.allocateTask(ctx, nc, tx, t)
|
|
|
+ allocatedT, err = a.allocateTask(ctx, tx, t)
|
|
|
return err
|
|
|
})
|
|
|
|