12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107 |
- package allocator
- import (
- "fmt"
- "time"
- "github.com/docker/go-events"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/log"
- "github.com/docker/swarmkit/manager/allocator/networkallocator"
- "github.com/docker/swarmkit/manager/state"
- "github.com/docker/swarmkit/manager/state/store"
- "github.com/docker/swarmkit/protobuf/ptypes"
- "github.com/pkg/errors"
- "golang.org/x/net/context"
- )
const (
	// networkVoter is the voter ID under which the network allocator casts
	// its task allocation vote.
	networkVoter = "network"

	// allocatedStatusMessage is the status message attached to a task when
	// the network allocator moves it to the PENDING state.
	allocatedStatusMessage = "pending task scheduling"
)

var (
	// ErrNoIngress is returned when no ingress network is found in store.
	ErrNoIngress = errors.New("no ingress network found")

	// errNoChanges signals that allocating a task left it unmodified, so
	// there is nothing to commit to the store.
	errNoChanges = errors.New("task unchanged")

	// retryInterval is the minimum time between two passes over the
	// unallocated networks, services and tasks.
	retryInterval = 5 * time.Minute
)
- // Network context information which is used throughout the network allocation code.
- type networkContext struct {
- ingressNetwork *api.Network
- // Instance of the low-level network allocator which performs
- // the actual network allocation.
- nwkAllocator *networkallocator.NetworkAllocator
- // A set of tasks which are ready to be allocated as a batch. This is
- // distinct from "unallocatedTasks" which are tasks that failed to
- // allocate on the first try, being held for a future retry.
- pendingTasks map[string]*api.Task
- // A set of unallocated tasks which will be revisited if any thing
- // changes in system state that might help task allocation.
- unallocatedTasks map[string]*api.Task
- // A set of unallocated services which will be revisited if
- // any thing changes in system state that might help service
- // allocation.
- unallocatedServices map[string]*api.Service
- // A set of unallocated networks which will be revisited if
- // any thing changes in system state that might help network
- // allocation.
- unallocatedNetworks map[string]*api.Network
- // lastRetry is the last timestamp when unallocated
- // tasks/services/networks were retried.
- lastRetry time.Time
- }
- func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
- na, err := networkallocator.New(a.pluginGetter)
- if err != nil {
- return err
- }
- nc := &networkContext{
- nwkAllocator: na,
- pendingTasks: make(map[string]*api.Task),
- unallocatedTasks: make(map[string]*api.Task),
- unallocatedServices: make(map[string]*api.Service),
- unallocatedNetworks: make(map[string]*api.Network),
- lastRetry: time.Now(),
- }
- a.netCtx = nc
- defer func() {
- // Clear a.netCtx if initialization was unsuccessful.
- if err != nil {
- a.netCtx = nil
- }
- }()
- // Ingress network is now created at cluster's first time creation.
- // Check if we have the ingress network. If found, make sure it is
- // allocated, before reading all network objects for allocation.
- // If not found, it means it was removed by user, nothing to do here.
- ingressNetwork, err := GetIngressNetwork(a.store)
- switch err {
- case nil:
- // Try to complete ingress network allocation before anything else so
- // that the we can get the preferred subnet for ingress network.
- nc.ingressNetwork = ingressNetwork
- if !na.IsAllocated(nc.ingressNetwork) {
- if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
- log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
- } else if err := a.store.Batch(func(batch *store.Batch) error {
- if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
- log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
- }
- return nil
- }); err != nil {
- log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
- }
- }
- case ErrNoIngress:
- // Ingress network is not present in store, It means user removed it
- // and did not create a new one.
- default:
- return errors.Wrap(err, "failure while looking for ingress network during init")
- }
- // Allocate networks in the store so far before we started
- // watching.
- var networks []*api.Network
- a.store.View(func(tx store.ReadTx) {
- networks, err = store.FindNetworks(tx, store.All)
- })
- if err != nil {
- return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
- }
- var allocatedNetworks []*api.Network
- for _, n := range networks {
- if na.IsAllocated(n) {
- continue
- }
- if err := a.allocateNetwork(ctx, n); err != nil {
- log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
- continue
- }
- allocatedNetworks = append(allocatedNetworks, n)
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- for _, n := range allocatedNetworks {
- if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
- log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
- }
- }
- return nil
- }); err != nil {
- log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
- }
- // Allocate nodes in the store so far before we process watched events,
- // if the ingress network is present.
- if nc.ingressNetwork != nil {
- if err := a.allocateNodes(ctx); err != nil {
- return err
- }
- }
- // Allocate services in the store so far before we process watched events.
- var services []*api.Service
- a.store.View(func(tx store.ReadTx) {
- services, err = store.FindServices(tx, store.All)
- })
- if err != nil {
- return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
- }
- var allocatedServices []*api.Service
- for _, s := range services {
- if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) {
- continue
- }
- if err := a.allocateService(ctx, s); err != nil {
- log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
- continue
- }
- allocatedServices = append(allocatedServices, s)
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- for _, s := range allocatedServices {
- if err := a.commitAllocatedService(ctx, batch, s); err != nil {
- log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
- }
- }
- return nil
- }); err != nil {
- log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
- }
- // Allocate tasks in the store so far before we started watching.
- var (
- tasks []*api.Task
- allocatedTasks []*api.Task
- )
- a.store.View(func(tx store.ReadTx) {
- tasks, err = store.FindTasks(tx, store.All)
- })
- if err != nil {
- return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
- }
- for _, t := range tasks {
- if t.Status.State > api.TaskStateRunning {
- continue
- }
- var s *api.Service
- if t.ServiceID != "" {
- a.store.View(func(tx store.ReadTx) {
- s = store.GetService(tx, t.ServiceID)
- })
- }
- // Populate network attachments in the task
- // based on service spec.
- a.taskCreateNetworkAttachments(t, s)
- if taskReadyForNetworkVote(t, s, nc) {
- if t.Status.State >= api.TaskStatePending {
- continue
- }
- if a.taskAllocateVote(networkVoter, t.ID) {
- // If the task is not attached to any network, network
- // allocators job is done. Immediately cast a vote so
- // that the task can be moved to the PENDING state as
- // soon as possible.
- updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
- allocatedTasks = append(allocatedTasks, t)
- }
- continue
- }
- err := a.allocateTask(ctx, t)
- if err == nil {
- allocatedTasks = append(allocatedTasks, t)
- } else if err != errNoChanges {
- log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
- nc.unallocatedTasks[t.ID] = t
- }
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- for _, t := range allocatedTasks {
- if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
- log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
- }
- }
- return nil
- }); err != nil {
- log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
- }
- return nil
- }
- func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
- nc := a.netCtx
- switch v := ev.(type) {
- case api.EventCreateNetwork:
- n := v.Network.Copy()
- if nc.nwkAllocator.IsAllocated(n) {
- break
- }
- if IsIngressNetwork(n) && nc.ingressNetwork != nil {
- log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
- n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
- break
- }
- if err := a.allocateNetwork(ctx, n); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
- break
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- return a.commitAllocatedNetwork(ctx, batch, n)
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
- }
- if IsIngressNetwork(n) {
- nc.ingressNetwork = n
- err := a.allocateNodes(ctx)
- if err != nil {
- log.G(ctx).WithError(err).Error(err)
- }
- }
- case api.EventDeleteNetwork:
- n := v.Network.Copy()
- if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
- nc.ingressNetwork = nil
- if err := a.deallocateNodes(ctx); err != nil {
- log.G(ctx).WithError(err).Error(err)
- }
- }
- // The assumption here is that all dependent objects
- // have been cleaned up when we are here so the only
- // thing that needs to happen is free the network
- // resources.
- if err := nc.nwkAllocator.Deallocate(n); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
- }
- delete(nc.unallocatedNetworks, n.ID)
- case api.EventCreateService:
- var s *api.Service
- a.store.View(func(tx store.ReadTx) {
- s = store.GetService(tx, v.Service.ID)
- })
- if s == nil {
- break
- }
- if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
- break
- }
- if err := a.allocateService(ctx, s); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
- break
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- return a.commitAllocatedService(ctx, batch, s)
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
- }
- case api.EventUpdateService:
- // We may have already allocated this service. If a create or
- // update event is older than the current version in the store,
- // we run the risk of allocating the service a second time.
- // Only operate on the latest version of the service.
- var s *api.Service
- a.store.View(func(tx store.ReadTx) {
- s = store.GetService(tx, v.Service.ID)
- })
- if s == nil {
- break
- }
- if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
- if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) {
- break
- }
- updatePortsInHostPublishMode(s)
- } else {
- if err := a.allocateService(ctx, s); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
- break
- }
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- return a.commitAllocatedService(ctx, batch, s)
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
- nc.unallocatedServices[s.ID] = s
- } else {
- delete(nc.unallocatedServices, s.ID)
- }
- case api.EventDeleteService:
- s := v.Service.Copy()
- if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
- }
- // Remove it from unallocatedServices just in case
- // it's still there.
- delete(nc.unallocatedServices, s.ID)
- case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
- a.doNodeAlloc(ctx, ev)
- case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
- a.doTaskAlloc(ctx, ev)
- case state.EventCommit:
- a.procTasksNetwork(ctx, false)
- if time.Since(nc.lastRetry) > retryInterval {
- a.procUnallocatedNetworks(ctx)
- a.procUnallocatedServices(ctx)
- a.procTasksNetwork(ctx, true)
- nc.lastRetry = time.Now()
- }
- // Any left over tasks are moved to the unallocated set
- for _, t := range nc.pendingTasks {
- nc.unallocatedTasks[t.ID] = t
- }
- nc.pendingTasks = make(map[string]*api.Task)
- }
- }
- func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
- var (
- isDelete bool
- node *api.Node
- )
- // We may have already allocated this node. If a create or update
- // event is older than the current version in the store, we run the
- // risk of allocating the node a second time. Only operate on the
- // latest version of the node.
- switch v := ev.(type) {
- case api.EventCreateNode:
- a.store.View(func(tx store.ReadTx) {
- node = store.GetNode(tx, v.Node.ID)
- })
- case api.EventUpdateNode:
- a.store.View(func(tx store.ReadTx) {
- node = store.GetNode(tx, v.Node.ID)
- })
- case api.EventDeleteNode:
- isDelete = true
- node = v.Node.Copy()
- }
- if node == nil {
- return
- }
- nc := a.netCtx
- if isDelete {
- if nc.nwkAllocator.IsNodeAllocated(node) {
- if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
- }
- }
- return
- }
- if !nc.nwkAllocator.IsNodeAllocated(node) && nc.ingressNetwork != nil {
- if node.Attachment == nil {
- node.Attachment = &api.NetworkAttachment{}
- }
- node.Attachment.Network = nc.ingressNetwork.Copy()
- if err := a.allocateNode(ctx, node); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
- return
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- return a.commitAllocatedNode(ctx, batch, node)
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
- }
- }
- }
- func (a *Allocator) allocateNodes(ctx context.Context) error {
- // Allocate nodes in the store so far before we process watched events.
- var (
- allocatedNodes []*api.Node
- nodes []*api.Node
- err error
- nc = a.netCtx
- )
- a.store.View(func(tx store.ReadTx) {
- nodes, err = store.FindNodes(tx, store.All)
- })
- if err != nil {
- return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
- }
- for _, node := range nodes {
- if nc.nwkAllocator.IsNodeAllocated(node) {
- continue
- }
- if node.Attachment == nil {
- node.Attachment = &api.NetworkAttachment{}
- }
- node.Attachment.Network = nc.ingressNetwork.Copy()
- if err := a.allocateNode(ctx, node); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
- continue
- }
- allocatedNodes = append(allocatedNodes, node)
- }
- if err := a.store.Batch(func(batch *store.Batch) error {
- for _, node := range allocatedNodes {
- if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
- }
- }
- return nil
- }); err != nil {
- log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
- }
- return nil
- }
- func (a *Allocator) deallocateNodes(ctx context.Context) error {
- var (
- nodes []*api.Node
- nc = a.netCtx
- err error
- )
- a.store.View(func(tx store.ReadTx) {
- nodes, err = store.FindNodes(tx, store.All)
- })
- if err != nil {
- return fmt.Errorf("error listing all nodes in store while trying to free network resources")
- }
- for _, node := range nodes {
- if nc.nwkAllocator.IsNodeAllocated(node) {
- if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
- }
- node.Attachment = nil
- if err := a.store.Batch(func(batch *store.Batch) error {
- return a.commitAllocatedNode(ctx, batch, node)
- }); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
- }
- }
- }
- return nil
- }
- // taskReadyForNetworkVote checks if the task is ready for a network
- // vote to move it to PENDING state.
- func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
- // Task is ready for vote if the following is true:
- //
- // Task has no network attached or networks attached but all
- // of them allocated AND Task's service has no endpoint or
- // network configured or service endpoints have been
- // allocated.
- return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
- (s == nil || !nc.nwkAllocator.ServiceNeedsAllocation(s))
- }
- func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
- networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
- for _, n := range networks {
- networksCopy = append(networksCopy, n.Copy())
- }
- t.Networks = networksCopy
- }
- func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
- t.Endpoint = endpoint.Copy()
- }
- // IsIngressNetworkNeeded checks whether the service requires the routing-mesh
- func IsIngressNetworkNeeded(s *api.Service) bool {
- return networkallocator.IsIngressNetworkNeeded(s)
- }
- func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
- // If task network attachments have already been filled in no
- // need to do anything else.
- if len(t.Networks) != 0 {
- return
- }
- var networks []*api.NetworkAttachment
- if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
- networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
- }
- a.store.View(func(tx store.ReadTx) {
- // Always prefer NetworkAttachmentConfig in the TaskSpec
- specNetworks := t.Spec.Networks
- if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
- specNetworks = s.Spec.Networks
- }
- for _, na := range specNetworks {
- n := store.GetNetwork(tx, na.Target)
- if n == nil {
- continue
- }
- attachment := api.NetworkAttachment{Network: n}
- attachment.Aliases = append(attachment.Aliases, na.Aliases...)
- attachment.Addresses = append(attachment.Addresses, na.Addresses...)
- attachment.DriverAttachmentOpts = na.DriverAttachmentOpts
- networks = append(networks, &attachment)
- }
- })
- taskUpdateNetworks(t, networks)
- }
- func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
- var (
- isDelete bool
- t *api.Task
- )
- // We may have already allocated this task. If a create or update
- // event is older than the current version in the store, we run the
- // risk of allocating the task a second time. Only operate on the
- // latest version of the task.
- switch v := ev.(type) {
- case api.EventCreateTask:
- a.store.View(func(tx store.ReadTx) {
- t = store.GetTask(tx, v.Task.ID)
- })
- case api.EventUpdateTask:
- a.store.View(func(tx store.ReadTx) {
- t = store.GetTask(tx, v.Task.ID)
- })
- case api.EventDeleteTask:
- isDelete = true
- t = v.Task.Copy()
- }
- if t == nil {
- return
- }
- nc := a.netCtx
- // If the task has stopped running then we should free the network
- // resources associated with the task right away.
- if t.Status.State > api.TaskStateRunning || isDelete {
- if nc.nwkAllocator.IsTaskAllocated(t) {
- if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
- log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
- }
- }
- // Cleanup any task references that might exist
- delete(nc.pendingTasks, t.ID)
- delete(nc.unallocatedTasks, t.ID)
- return
- }
- // If we are already in allocated state, there is
- // absolutely nothing else to do.
- if t.Status.State >= api.TaskStatePending {
- delete(nc.pendingTasks, t.ID)
- delete(nc.unallocatedTasks, t.ID)
- return
- }
- var s *api.Service
- if t.ServiceID != "" {
- a.store.View(func(tx store.ReadTx) {
- s = store.GetService(tx, t.ServiceID)
- })
- if s == nil {
- // If the task is running it is not normal to
- // not be able to find the associated
- // service. If the task is not running (task
- // is either dead or the desired state is set
- // to dead) then the service may not be
- // available in store. But we still need to
- // cleanup network resources associated with
- // the task.
- if t.Status.State <= api.TaskStateRunning && !isDelete {
- log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
- return
- }
- }
- }
- // Populate network attachments in the task
- // based on service spec.
- a.taskCreateNetworkAttachments(t, s)
- nc.pendingTasks[t.ID] = t
- }
- func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
- return a.netCtx.nwkAllocator.AllocateNode(node)
- }
- func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
- if err := batch.Update(func(tx store.Tx) error {
- err := store.UpdateNode(tx, node)
- if err == store.ErrSequenceConflict {
- storeNode := store.GetNode(tx, node.ID)
- storeNode.Attachment = node.Attachment.Copy()
- err = store.UpdateNode(tx, storeNode)
- }
- return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
- }); err != nil {
- if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
- log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
- }
- return err
- }
- return nil
- }
- // This function prepares the service object for being updated when the change regards
- // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
- // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
- // so that the service allocation invoked on this new service object will trigger the deallocation
- // of any old publish mode port and allocation of any new one.
- func updatePortsInHostPublishMode(s *api.Service) {
- // First, remove all host-mode ports from s.Endpoint.Ports
- if s.Endpoint != nil {
- var portConfigs []*api.PortConfig
- for _, portConfig := range s.Endpoint.Ports {
- if portConfig.PublishMode != api.PublishModeHost {
- portConfigs = append(portConfigs, portConfig)
- }
- }
- s.Endpoint.Ports = portConfigs
- }
- // Add back all host-mode ports
- if s.Spec.Endpoint != nil {
- if s.Endpoint == nil {
- s.Endpoint = &api.Endpoint{}
- }
- for _, portConfig := range s.Spec.Endpoint.Ports {
- if portConfig.PublishMode == api.PublishModeHost {
- s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
- }
- }
- }
- s.Endpoint.Spec = s.Spec.Endpoint.Copy()
- }
- func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
- nc := a.netCtx
- if s.Spec.Endpoint != nil {
- // service has user-defined endpoint
- if s.Endpoint == nil {
- // service currently has no allocated endpoint, need allocated.
- s.Endpoint = &api.Endpoint{
- Spec: s.Spec.Endpoint.Copy(),
- }
- }
- // The service is trying to expose ports to the external
- // world. Automatically attach the service to the ingress
- // network only if it is not already done.
- if IsIngressNetworkNeeded(s) {
- if nc.ingressNetwork == nil {
- return fmt.Errorf("ingress network is missing")
- }
- var found bool
- for _, vip := range s.Endpoint.VirtualIPs {
- if vip.NetworkID == nc.ingressNetwork.ID {
- found = true
- break
- }
- }
- if !found {
- s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
- &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
- }
- }
- } else if s.Endpoint != nil {
- // service has no user-defined endpoints while has already allocated network resources,
- // need deallocated.
- if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
- return err
- }
- }
- if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
- nc.unallocatedServices[s.ID] = s
- return err
- }
- // If the service doesn't expose ports any more and if we have
- // any lingering virtual IP references for ingress network
- // clean them up here.
- if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil {
- if s.Endpoint != nil {
- for i, vip := range s.Endpoint.VirtualIPs {
- if vip.NetworkID == nc.ingressNetwork.ID {
- n := len(s.Endpoint.VirtualIPs)
- s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
- s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
- break
- }
- }
- }
- }
- return nil
- }
- func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
- if err := batch.Update(func(tx store.Tx) error {
- err := store.UpdateService(tx, s)
- if err == store.ErrSequenceConflict {
- storeService := store.GetService(tx, s.ID)
- storeService.Endpoint = s.Endpoint
- err = store.UpdateService(tx, storeService)
- }
- return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
- }); err != nil {
- if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
- log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
- }
- return err
- }
- return nil
- }
- func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
- nc := a.netCtx
- if err := nc.nwkAllocator.Allocate(n); err != nil {
- nc.unallocatedNetworks[n.ID] = n
- return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
- }
- return nil
- }
- func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
- if err := batch.Update(func(tx store.Tx) error {
- if err := store.UpdateNetwork(tx, n); err != nil {
- return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
- }
- return nil
- }); err != nil {
- if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
- log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
- }
- return err
- }
- return nil
- }
- func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
- taskUpdated := false
- nc := a.netCtx
- // We might be here even if a task allocation has already
- // happened but wasn't successfully committed to store. In such
- // cases skip allocation and go straight ahead to updating the
- // store.
- if !nc.nwkAllocator.IsTaskAllocated(t) {
- a.store.View(func(tx store.ReadTx) {
- if t.ServiceID != "" {
- s := store.GetService(tx, t.ServiceID)
- if s == nil {
- err = fmt.Errorf("could not find service %s", t.ServiceID)
- return
- }
- if nc.nwkAllocator.ServiceNeedsAllocation(s) {
- err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
- return
- }
- if s.Endpoint != nil {
- taskUpdateEndpoint(t, s.Endpoint)
- taskUpdated = true
- }
- }
- for _, na := range t.Networks {
- n := store.GetNetwork(tx, na.Network.ID)
- if n == nil {
- err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
- return
- }
- if !nc.nwkAllocator.IsAllocated(n) {
- err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
- return
- }
- na.Network = n
- }
- if err = nc.nwkAllocator.AllocateTask(t); err != nil {
- err = errors.Wrapf(err, "failed during network allocation for task %s", t.ID)
- return
- }
- if nc.nwkAllocator.IsTaskAllocated(t) {
- taskUpdated = true
- }
- })
- if err != nil {
- return err
- }
- }
- // Update the network allocations and moving to
- // PENDING state on top of the latest store state.
- if a.taskAllocateVote(networkVoter, t.ID) {
- if t.Status.State < api.TaskStatePending {
- updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
- taskUpdated = true
- }
- }
- if !taskUpdated {
- return errNoChanges
- }
- return nil
- }
- func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
- return batch.Update(func(tx store.Tx) error {
- err := store.UpdateTask(tx, t)
- if err == store.ErrSequenceConflict {
- storeTask := store.GetTask(tx, t.ID)
- taskUpdateNetworks(storeTask, t.Networks)
- taskUpdateEndpoint(storeTask, t.Endpoint)
- if storeTask.Status.State < api.TaskStatePending {
- storeTask.Status = t.Status
- }
- err = store.UpdateTask(tx, storeTask)
- }
- return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
- })
- }
- func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
- nc := a.netCtx
- var allocatedNetworks []*api.Network
- for _, n := range nc.unallocatedNetworks {
- if !nc.nwkAllocator.IsAllocated(n) {
- if err := a.allocateNetwork(ctx, n); err != nil {
- log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
- continue
- }
- allocatedNetworks = append(allocatedNetworks, n)
- }
- }
- if len(allocatedNetworks) == 0 {
- return
- }
- err := a.store.Batch(func(batch *store.Batch) error {
- for _, n := range allocatedNetworks {
- if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
- log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
- continue
- }
- delete(nc.unallocatedNetworks, n.ID)
- }
- return nil
- })
- if err != nil {
- log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
- // We optimistically removed these from nc.unallocatedNetworks
- // above in anticipation of successfully committing the batch,
- // but since the transaction has failed, we requeue them here.
- for _, n := range allocatedNetworks {
- nc.unallocatedNetworks[n.ID] = n
- }
- }
- }
- func (a *Allocator) procUnallocatedServices(ctx context.Context) {
- nc := a.netCtx
- var allocatedServices []*api.Service
- for _, s := range nc.unallocatedServices {
- if nc.nwkAllocator.ServiceNeedsAllocation(s) {
- if err := a.allocateService(ctx, s); err != nil {
- log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
- continue
- }
- allocatedServices = append(allocatedServices, s)
- }
- }
- if len(allocatedServices) == 0 {
- return
- }
- err := a.store.Batch(func(batch *store.Batch) error {
- for _, s := range allocatedServices {
- if err := a.commitAllocatedService(ctx, batch, s); err != nil {
- log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
- continue
- }
- delete(nc.unallocatedServices, s.ID)
- }
- return nil
- })
- if err != nil {
- log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
- // We optimistically removed these from nc.unallocatedServices
- // above in anticipation of successfully committing the batch,
- // but since the transaction has failed, we requeue them here.
- for _, s := range allocatedServices {
- nc.unallocatedServices[s.ID] = s
- }
- }
- }
- func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
- nc := a.netCtx
- quiet := false
- toAllocate := nc.pendingTasks
- if onRetry {
- toAllocate = nc.unallocatedTasks
- quiet = true
- }
- allocatedTasks := make([]*api.Task, 0, len(toAllocate))
- for _, t := range toAllocate {
- if err := a.allocateTask(ctx, t); err == nil {
- allocatedTasks = append(allocatedTasks, t)
- } else if err != errNoChanges {
- if quiet {
- log.G(ctx).WithError(err).Debug("task allocation failure")
- } else {
- log.G(ctx).WithError(err).Error("task allocation failure")
- }
- }
- }
- if len(allocatedTasks) == 0 {
- return
- }
- err := a.store.Batch(func(batch *store.Batch) error {
- for _, t := range allocatedTasks {
- err := a.commitAllocatedTask(ctx, batch, t)
- if err != nil {
- log.G(ctx).WithError(err).Error("task allocation commit failure")
- continue
- }
- delete(toAllocate, t.ID)
- }
- return nil
- })
- if err != nil {
- log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
- // We optimistically removed these from toAllocate above in
- // anticipation of successfully committing the batch, but since
- // the transaction has failed, we requeue them here.
- for _, t := range allocatedTasks {
- toAllocate[t.ID] = t
- }
- }
- }
- // updateTaskStatus sets TaskStatus and updates timestamp.
- func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
- t.Status.State = newStatus
- t.Status.Message = message
- t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
- }
- // IsIngressNetwork returns whether the passed network is an ingress network.
- func IsIngressNetwork(nw *api.Network) bool {
- return networkallocator.IsIngressNetwork(nw)
- }
- // GetIngressNetwork fetches the ingress network from store.
- // ErrNoIngress will be returned if the ingress network is not present,
- // nil otherwise. In case of any other failure in accessing the store,
- // the respective error will be reported as is.
- func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
- var (
- networks []*api.Network
- err error
- )
- s.View(func(tx store.ReadTx) {
- networks, err = store.FindNetworks(tx, store.All)
- })
- if err != nil {
- return nil, err
- }
- for _, n := range networks {
- if IsIngressNetwork(n) {
- return n, nil
- }
- }
- return nil, ErrNoIngress
- }
|