network.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/identity"
  8. "github.com/docker/swarmkit/log"
  9. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  10. "github.com/docker/swarmkit/manager/state"
  11. "github.com/docker/swarmkit/manager/state/store"
  12. "github.com/docker/swarmkit/protobuf/ptypes"
  13. "github.com/pkg/errors"
  14. "golang.org/x/net/context"
  15. )
  16. const (
  17. // Network allocator Voter ID for task allocation vote.
  18. networkVoter = "network"
  19. ingressNetworkName = "ingress"
  20. ingressSubnet = "10.255.0.0/16"
  21. allocatedStatusMessage = "pending task scheduling"
  22. )
  23. var errNoChanges = errors.New("task unchanged")
  24. func newIngressNetwork() *api.Network {
  25. return &api.Network{
  26. Spec: api.NetworkSpec{
  27. Annotations: api.Annotations{
  28. Name: ingressNetworkName,
  29. Labels: map[string]string{
  30. "com.docker.swarm.internal": "true",
  31. },
  32. },
  33. DriverConfig: &api.Driver{},
  34. IPAM: &api.IPAMOptions{
  35. Driver: &api.Driver{},
  36. Configs: []*api.IPAMConfig{
  37. {
  38. Subnet: ingressSubnet,
  39. },
  40. },
  41. },
  42. },
  43. }
  44. }
// networkContext holds the state shared by the network allocation code: the
// ingress network, the low-level allocator, and tables of objects that are
// still awaiting allocation. Those tables are retried by the
// procUnallocated* functions on every store commit event.
type networkContext struct {
	// The ingress network. doNetworkInit starts from newIngressNetwork and
	// replaces it with the store's copy once the network is found or created.
	ingressNetwork *api.Network
	// Instance of the low-level network allocator which performs
	// the actual network allocation.
	nwkAllocator *networkallocator.NetworkAllocator
	// A table of unallocated tasks, keyed by task ID, which will be revisited
	// if anything changes in system state that might help task allocation.
	unallocatedTasks map[string]*api.Task
	// A table of unallocated services, keyed by service ID, which will be
	// revisited if anything changes in system state that might help service
	// allocation.
	unallocatedServices map[string]*api.Service
	// A table of unallocated networks, keyed by network ID, which will be
	// revisited if anything changes in system state that might help network
	// allocation.
	unallocatedNetworks map[string]*api.Network
}
  63. func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
  64. na, err := networkallocator.New(a.pluginGetter)
  65. if err != nil {
  66. return err
  67. }
  68. nc := &networkContext{
  69. nwkAllocator: na,
  70. unallocatedTasks: make(map[string]*api.Task),
  71. unallocatedServices: make(map[string]*api.Service),
  72. unallocatedNetworks: make(map[string]*api.Network),
  73. ingressNetwork: newIngressNetwork(),
  74. }
  75. a.netCtx = nc
  76. defer func() {
  77. // Clear a.netCtx if initialization was unsuccessful.
  78. if err != nil {
  79. a.netCtx = nil
  80. }
  81. }()
  82. // Check if we have the ingress network. If not found create
  83. // it before reading all network objects for allocation.
  84. var networks []*api.Network
  85. a.store.View(func(tx store.ReadTx) {
  86. networks, err = store.FindNetworks(tx, store.ByName(ingressNetworkName))
  87. if len(networks) > 0 {
  88. nc.ingressNetwork = networks[0]
  89. }
  90. })
  91. if err != nil {
  92. return errors.Wrap(err, "failed to find ingress network during init")
  93. }
  94. // If ingress network is not found, create one right away
  95. // using the predefined template.
  96. if len(networks) == 0 {
  97. if err := a.store.Update(func(tx store.Tx) error {
  98. nc.ingressNetwork.ID = identity.NewID()
  99. if err := store.CreateNetwork(tx, nc.ingressNetwork); err != nil {
  100. return err
  101. }
  102. return nil
  103. }); err != nil {
  104. return errors.Wrap(err, "failed to create ingress network")
  105. }
  106. a.store.View(func(tx store.ReadTx) {
  107. networks, err = store.FindNetworks(tx, store.ByName(ingressNetworkName))
  108. if len(networks) > 0 {
  109. nc.ingressNetwork = networks[0]
  110. }
  111. })
  112. if err != nil {
  113. return errors.Wrap(err, "failed to find ingress network after creating it")
  114. }
  115. }
  116. // Try to complete ingress network allocation before anything else so
  117. // that the we can get the preferred subnet for ingress
  118. // network.
  119. if !na.IsAllocated(nc.ingressNetwork) {
  120. if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
  121. log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
  122. } else if _, err := a.store.Batch(func(batch *store.Batch) error {
  123. if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
  124. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  125. }
  126. return nil
  127. }); err != nil {
  128. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  129. }
  130. }
  131. // Allocate networks in the store so far before we started
  132. // watching.
  133. a.store.View(func(tx store.ReadTx) {
  134. networks, err = store.FindNetworks(tx, store.All)
  135. })
  136. if err != nil {
  137. return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  138. }
  139. var allocatedNetworks []*api.Network
  140. for _, n := range networks {
  141. if na.IsAllocated(n) {
  142. continue
  143. }
  144. if err := a.allocateNetwork(ctx, n); err != nil {
  145. log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
  146. continue
  147. }
  148. allocatedNetworks = append(allocatedNetworks, n)
  149. }
  150. if _, err := a.store.Batch(func(batch *store.Batch) error {
  151. for _, n := range allocatedNetworks {
  152. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  153. log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
  154. }
  155. }
  156. return nil
  157. }); err != nil {
  158. log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
  159. }
  160. // Allocate nodes in the store so far before we process watched events.
  161. var nodes []*api.Node
  162. a.store.View(func(tx store.ReadTx) {
  163. nodes, err = store.FindNodes(tx, store.All)
  164. })
  165. if err != nil {
  166. return errors.Wrap(err, "error listing all nodes in store while trying to allocate during init")
  167. }
  168. var allocatedNodes []*api.Node
  169. for _, node := range nodes {
  170. if na.IsNodeAllocated(node) {
  171. continue
  172. }
  173. if node.Attachment == nil {
  174. node.Attachment = &api.NetworkAttachment{}
  175. }
  176. node.Attachment.Network = nc.ingressNetwork.Copy()
  177. if err := a.allocateNode(ctx, node); err != nil {
  178. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
  179. continue
  180. }
  181. allocatedNodes = append(allocatedNodes, node)
  182. }
  183. if _, err := a.store.Batch(func(batch *store.Batch) error {
  184. for _, node := range allocatedNodes {
  185. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  186. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s during init", node.ID)
  187. }
  188. }
  189. return nil
  190. }); err != nil {
  191. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes during init")
  192. }
  193. // Allocate services in the store so far before we process watched events.
  194. var services []*api.Service
  195. a.store.View(func(tx store.ReadTx) {
  196. services, err = store.FindServices(tx, store.All)
  197. })
  198. if err != nil {
  199. return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
  200. }
  201. var allocatedServices []*api.Service
  202. for _, s := range services {
  203. if nc.nwkAllocator.IsServiceAllocated(s) {
  204. continue
  205. }
  206. if err := a.allocateService(ctx, s); err != nil {
  207. log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
  208. continue
  209. }
  210. allocatedServices = append(allocatedServices, s)
  211. }
  212. if _, err := a.store.Batch(func(batch *store.Batch) error {
  213. for _, s := range allocatedServices {
  214. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  215. log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
  216. }
  217. }
  218. return nil
  219. }); err != nil {
  220. log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
  221. }
  222. // Allocate tasks in the store so far before we started watching.
  223. var (
  224. tasks []*api.Task
  225. allocatedTasks []*api.Task
  226. )
  227. a.store.View(func(tx store.ReadTx) {
  228. tasks, err = store.FindTasks(tx, store.All)
  229. })
  230. if err != nil {
  231. return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
  232. }
  233. for _, t := range tasks {
  234. if taskDead(t) {
  235. continue
  236. }
  237. var s *api.Service
  238. if t.ServiceID != "" {
  239. a.store.View(func(tx store.ReadTx) {
  240. s = store.GetService(tx, t.ServiceID)
  241. })
  242. }
  243. // Populate network attachments in the task
  244. // based on service spec.
  245. a.taskCreateNetworkAttachments(t, s)
  246. if taskReadyForNetworkVote(t, s, nc) {
  247. if t.Status.State >= api.TaskStatePending {
  248. continue
  249. }
  250. if a.taskAllocateVote(networkVoter, t.ID) {
  251. // If the task is not attached to any network, network
  252. // allocators job is done. Immediately cast a vote so
  253. // that the task can be moved to ALLOCATED state as
  254. // soon as possible.
  255. allocatedTasks = append(allocatedTasks, t)
  256. }
  257. continue
  258. }
  259. err := a.allocateTask(ctx, t)
  260. if err == nil {
  261. allocatedTasks = append(allocatedTasks, t)
  262. } else if err != errNoChanges {
  263. log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
  264. nc.unallocatedTasks[t.ID] = t
  265. }
  266. }
  267. if _, err := a.store.Batch(func(batch *store.Batch) error {
  268. for _, t := range allocatedTasks {
  269. if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
  270. log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
  271. }
  272. }
  273. return nil
  274. }); err != nil {
  275. log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
  276. }
  277. return nil
  278. }
  279. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  280. nc := a.netCtx
  281. switch v := ev.(type) {
  282. case state.EventCreateNetwork:
  283. n := v.Network.Copy()
  284. if nc.nwkAllocator.IsAllocated(n) {
  285. break
  286. }
  287. if err := a.allocateNetwork(ctx, n); err != nil {
  288. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  289. break
  290. }
  291. if _, err := a.store.Batch(func(batch *store.Batch) error {
  292. return a.commitAllocatedNetwork(ctx, batch, n)
  293. }); err != nil {
  294. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  295. }
  296. case state.EventDeleteNetwork:
  297. n := v.Network.Copy()
  298. // The assumption here is that all dependent objects
  299. // have been cleaned up when we are here so the only
  300. // thing that needs to happen is free the network
  301. // resources.
  302. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  303. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  304. }
  305. case state.EventCreateService:
  306. s := v.Service.Copy()
  307. if nc.nwkAllocator.IsServiceAllocated(s) {
  308. break
  309. }
  310. if err := a.allocateService(ctx, s); err != nil {
  311. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  312. break
  313. }
  314. if _, err := a.store.Batch(func(batch *store.Batch) error {
  315. return a.commitAllocatedService(ctx, batch, s)
  316. }); err != nil {
  317. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  318. }
  319. case state.EventUpdateService:
  320. s := v.Service.Copy()
  321. if nc.nwkAllocator.IsServiceAllocated(s) {
  322. if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) {
  323. break
  324. }
  325. updatePortsInHostPublishMode(s)
  326. } else {
  327. if err := a.allocateService(ctx, s); err != nil {
  328. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  329. break
  330. }
  331. }
  332. if _, err := a.store.Batch(func(batch *store.Batch) error {
  333. return a.commitAllocatedService(ctx, batch, s)
  334. }); err != nil {
  335. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  336. }
  337. case state.EventDeleteService:
  338. s := v.Service.Copy()
  339. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  340. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  341. }
  342. // Remove it from unallocatedServices just in case
  343. // it's still there.
  344. delete(nc.unallocatedServices, s.ID)
  345. case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
  346. a.doNodeAlloc(ctx, ev)
  347. case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
  348. a.doTaskAlloc(ctx, ev)
  349. case state.EventCommit:
  350. a.procUnallocatedNetworks(ctx)
  351. a.procUnallocatedServices(ctx)
  352. a.procUnallocatedTasksNetwork(ctx)
  353. return
  354. }
  355. }
  356. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  357. var (
  358. isDelete bool
  359. node *api.Node
  360. )
  361. switch v := ev.(type) {
  362. case state.EventCreateNode:
  363. node = v.Node.Copy()
  364. case state.EventUpdateNode:
  365. node = v.Node.Copy()
  366. case state.EventDeleteNode:
  367. isDelete = true
  368. node = v.Node.Copy()
  369. }
  370. nc := a.netCtx
  371. if isDelete {
  372. if nc.nwkAllocator.IsNodeAllocated(node) {
  373. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  374. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  375. }
  376. }
  377. return
  378. }
  379. if !nc.nwkAllocator.IsNodeAllocated(node) {
  380. if node.Attachment == nil {
  381. node.Attachment = &api.NetworkAttachment{}
  382. }
  383. node.Attachment.Network = nc.ingressNetwork.Copy()
  384. if err := a.allocateNode(ctx, node); err != nil {
  385. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  386. return
  387. }
  388. if _, err := a.store.Batch(func(batch *store.Batch) error {
  389. return a.commitAllocatedNode(ctx, batch, node)
  390. }); err != nil {
  391. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  392. }
  393. }
  394. }
  395. // taskRunning checks whether a task is either actively running, or in the
  396. // process of starting up.
  397. func taskRunning(t *api.Task) bool {
  398. return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
  399. }
  400. // taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
  401. func taskDead(t *api.Task) bool {
  402. return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
  403. }
  404. // taskReadyForNetworkVote checks if the task is ready for a network
  405. // vote to move it to ALLOCATED state.
  406. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  407. // Task is ready for vote if the following is true:
  408. //
  409. // Task has no network attached or networks attached but all
  410. // of them allocated AND Task's service has no endpoint or
  411. // network configured or service endpoints have been
  412. // allocated.
  413. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  414. (s == nil || nc.nwkAllocator.IsServiceAllocated(s))
  415. }
  416. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  417. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  418. for _, n := range networks {
  419. networksCopy = append(networksCopy, n.Copy())
  420. }
  421. t.Networks = networksCopy
  422. }
  423. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  424. t.Endpoint = endpoint.Copy()
  425. }
  426. func isIngressNetworkNeeded(s *api.Service) bool {
  427. if s == nil {
  428. return false
  429. }
  430. if s.Spec.Endpoint == nil {
  431. return false
  432. }
  433. for _, p := range s.Spec.Endpoint.Ports {
  434. // The service to which this task belongs is trying to
  435. // expose ports with PublishMode as Ingress to the
  436. // external world. Automatically attach the task to
  437. // the ingress network.
  438. if p.PublishMode == api.PublishModeIngress {
  439. return true
  440. }
  441. }
  442. return false
  443. }
  444. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  445. // If task network attachments have already been filled in no
  446. // need to do anything else.
  447. if len(t.Networks) != 0 {
  448. return
  449. }
  450. var networks []*api.NetworkAttachment
  451. if isIngressNetworkNeeded(s) {
  452. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  453. }
  454. a.store.View(func(tx store.ReadTx) {
  455. // Always prefer NetworkAttachmentConfig in the TaskSpec
  456. specNetworks := t.Spec.Networks
  457. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  458. specNetworks = s.Spec.Networks
  459. }
  460. for _, na := range specNetworks {
  461. n := store.GetNetwork(tx, na.Target)
  462. if n == nil {
  463. continue
  464. }
  465. attachment := api.NetworkAttachment{Network: n}
  466. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  467. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  468. networks = append(networks, &attachment)
  469. }
  470. })
  471. taskUpdateNetworks(t, networks)
  472. }
  473. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  474. var (
  475. isDelete bool
  476. t *api.Task
  477. )
  478. switch v := ev.(type) {
  479. case state.EventCreateTask:
  480. t = v.Task.Copy()
  481. case state.EventUpdateTask:
  482. t = v.Task.Copy()
  483. case state.EventDeleteTask:
  484. isDelete = true
  485. t = v.Task.Copy()
  486. }
  487. nc := a.netCtx
  488. // If the task has stopped running or it's being deleted then
  489. // we should free the network resources associated with the
  490. // task right away.
  491. if taskDead(t) || isDelete {
  492. if nc.nwkAllocator.IsTaskAllocated(t) {
  493. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  494. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  495. }
  496. }
  497. // Cleanup any task references that might exist in unallocatedTasks
  498. delete(nc.unallocatedTasks, t.ID)
  499. return
  500. }
  501. // If we are already in allocated state, there is
  502. // absolutely nothing else to do.
  503. if t.Status.State >= api.TaskStatePending {
  504. delete(nc.unallocatedTasks, t.ID)
  505. return
  506. }
  507. var s *api.Service
  508. if t.ServiceID != "" {
  509. a.store.View(func(tx store.ReadTx) {
  510. s = store.GetService(tx, t.ServiceID)
  511. })
  512. if s == nil {
  513. // If the task is running it is not normal to
  514. // not be able to find the associated
  515. // service. If the task is not running (task
  516. // is either dead or the desired state is set
  517. // to dead) then the service may not be
  518. // available in store. But we still need to
  519. // cleanup network resources associated with
  520. // the task.
  521. if taskRunning(t) && !isDelete {
  522. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  523. return
  524. }
  525. }
  526. }
  527. // Populate network attachments in the task
  528. // based on service spec.
  529. a.taskCreateNetworkAttachments(t, s)
  530. nc.unallocatedTasks[t.ID] = t
  531. }
  532. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  533. return a.netCtx.nwkAllocator.AllocateNode(node)
  534. }
  535. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  536. if err := batch.Update(func(tx store.Tx) error {
  537. err := store.UpdateNode(tx, node)
  538. if err == store.ErrSequenceConflict {
  539. storeNode := store.GetNode(tx, node.ID)
  540. storeNode.Attachment = node.Attachment.Copy()
  541. err = store.UpdateNode(tx, storeNode)
  542. }
  543. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  544. }); err != nil {
  545. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  546. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  547. }
  548. return err
  549. }
  550. return nil
  551. }
  552. // This function prepares the service object for being updated when the change regards
  553. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  554. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  555. // so that the service allocation invoked on this new service object will trigger the deallocation
  556. // of any old publish mode port and allocation of any new one.
  557. func updatePortsInHostPublishMode(s *api.Service) {
  558. if s.Endpoint != nil {
  559. var portConfigs []*api.PortConfig
  560. for _, portConfig := range s.Endpoint.Ports {
  561. if portConfig.PublishMode == api.PublishModeIngress {
  562. portConfigs = append(portConfigs, portConfig)
  563. }
  564. }
  565. s.Endpoint.Ports = portConfigs
  566. }
  567. if s.Spec.Endpoint != nil {
  568. if s.Endpoint == nil {
  569. s.Endpoint = &api.Endpoint{}
  570. }
  571. for _, portConfig := range s.Spec.Endpoint.Ports {
  572. if portConfig.PublishMode == api.PublishModeIngress {
  573. continue
  574. }
  575. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  576. }
  577. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  578. }
  579. }
  580. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  581. nc := a.netCtx
  582. if s.Spec.Endpoint != nil {
  583. // service has user-defined endpoint
  584. if s.Endpoint == nil {
  585. // service currently has no allocated endpoint, need allocated.
  586. s.Endpoint = &api.Endpoint{
  587. Spec: s.Spec.Endpoint.Copy(),
  588. }
  589. }
  590. // The service is trying to expose ports to the external
  591. // world. Automatically attach the service to the ingress
  592. // network only if it is not already done.
  593. if isIngressNetworkNeeded(s) {
  594. var found bool
  595. for _, vip := range s.Endpoint.VirtualIPs {
  596. if vip.NetworkID == nc.ingressNetwork.ID {
  597. found = true
  598. break
  599. }
  600. }
  601. if !found {
  602. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  603. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  604. }
  605. }
  606. } else if s.Endpoint != nil {
  607. // service has no user-defined endpoints while has already allocated network resources,
  608. // need deallocated.
  609. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  610. return err
  611. }
  612. }
  613. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  614. nc.unallocatedServices[s.ID] = s
  615. return err
  616. }
  617. // If the service doesn't expose ports any more and if we have
  618. // any lingering virtual IP references for ingress network
  619. // clean them up here.
  620. if !isIngressNetworkNeeded(s) {
  621. if s.Endpoint != nil {
  622. for i, vip := range s.Endpoint.VirtualIPs {
  623. if vip.NetworkID == nc.ingressNetwork.ID {
  624. n := len(s.Endpoint.VirtualIPs)
  625. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  626. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  627. break
  628. }
  629. }
  630. }
  631. }
  632. return nil
  633. }
  634. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  635. if err := batch.Update(func(tx store.Tx) error {
  636. err := store.UpdateService(tx, s)
  637. if err == store.ErrSequenceConflict {
  638. storeService := store.GetService(tx, s.ID)
  639. storeService.Endpoint = s.Endpoint
  640. err = store.UpdateService(tx, storeService)
  641. }
  642. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  643. }); err != nil {
  644. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  645. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  646. }
  647. return err
  648. }
  649. return nil
  650. }
  651. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  652. nc := a.netCtx
  653. if err := nc.nwkAllocator.Allocate(n); err != nil {
  654. nc.unallocatedNetworks[n.ID] = n
  655. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  656. }
  657. return nil
  658. }
  659. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  660. if err := batch.Update(func(tx store.Tx) error {
  661. if err := store.UpdateNetwork(tx, n); err != nil {
  662. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  663. }
  664. return nil
  665. }); err != nil {
  666. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  667. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  668. }
  669. return err
  670. }
  671. return nil
  672. }
// allocateTask allocates network resources for task t. When
// taskAllocateVote(networkVoter, t.ID) returns true and t is not yet PENDING,
// it also moves t to PENDING.
//
// Allocation is skipped when the network allocator already reports t as
// allocated. Otherwise, inside one read transaction, it:
//   - looks up t's service (if t has one), failing if the service is missing
//     or not yet allocated, and copies the service endpoint onto t;
//   - looks up each attached network, failing if one is missing or not yet
//     allocated, and points the attachment at the store's network object;
//   - calls the low-level AllocateTask.
//
// It returns errNoChanges when t was not modified, nil when t was modified
// and should be committed, or an error describing why allocation failed.
func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
	// Set whenever t is modified and therefore needs to be committed.
	taskUpdated := false
	nc := a.netCtx

	// We might be here even if a task allocation has already
	// happened but wasn't successfully committed to store. In such
	// cases skip allocation and go straight ahead to updating the
	// store.
	if !nc.nwkAllocator.IsTaskAllocated(t) {
		// The closure reports failures through the named result err;
		// each early return leaves the view with err set.
		a.store.View(func(tx store.ReadTx) {
			if t.ServiceID != "" {
				s := store.GetService(tx, t.ServiceID)
				if s == nil {
					err = fmt.Errorf("could not find service %s", t.ServiceID)
					return
				}

				// The task's endpoint is taken from the service, so the
				// service must be allocated first.
				if !nc.nwkAllocator.IsServiceAllocated(s) {
					err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
					return
				}

				if s.Endpoint != nil {
					taskUpdateEndpoint(t, s.Endpoint)
					taskUpdated = true
				}
			}

			for _, na := range t.Networks {
				n := store.GetNetwork(tx, na.Network.ID)
				if n == nil {
					err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
					return
				}

				if !nc.nwkAllocator.IsAllocated(n) {
					err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
					return
				}

				// Point the attachment at the store's current network object.
				na.Network = n
			}

			if err = nc.nwkAllocator.AllocateTask(t); err != nil {
				err = errors.Wrapf(err, "failed during networktask allocation for task %s", t.ID)
				return
			}
			if nc.nwkAllocator.IsTaskAllocated(t) {
				taskUpdated = true
			}
		})

		if err != nil {
			return err
		}
	}

	// Cast the network allocator's vote. If it returns true and the task
	// has not reached PENDING yet, move it to PENDING on top of the
	// network allocations made above.
	if a.taskAllocateVote(networkVoter, t.ID) {
		if t.Status.State < api.TaskStatePending {
			updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
			taskUpdated = true
		}
	}

	if !taskUpdated {
		return errNoChanges
	}

	return nil
}
  734. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  735. return batch.Update(func(tx store.Tx) error {
  736. err := store.UpdateTask(tx, t)
  737. if err == store.ErrSequenceConflict {
  738. storeTask := store.GetTask(tx, t.ID)
  739. taskUpdateNetworks(storeTask, t.Networks)
  740. taskUpdateEndpoint(storeTask, t.Endpoint)
  741. if storeTask.Status.State < api.TaskStatePending {
  742. storeTask.Status = t.Status
  743. }
  744. err = store.UpdateTask(tx, storeTask)
  745. }
  746. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  747. })
  748. }
  749. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  750. nc := a.netCtx
  751. var allocatedNetworks []*api.Network
  752. for _, n := range nc.unallocatedNetworks {
  753. if !nc.nwkAllocator.IsAllocated(n) {
  754. if err := a.allocateNetwork(ctx, n); err != nil {
  755. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  756. continue
  757. }
  758. allocatedNetworks = append(allocatedNetworks, n)
  759. }
  760. }
  761. if len(allocatedNetworks) == 0 {
  762. return
  763. }
  764. committed, err := a.store.Batch(func(batch *store.Batch) error {
  765. for _, n := range allocatedNetworks {
  766. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  767. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  768. continue
  769. }
  770. }
  771. return nil
  772. })
  773. if err != nil {
  774. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  775. }
  776. for _, n := range allocatedNetworks[:committed] {
  777. delete(nc.unallocatedNetworks, n.ID)
  778. }
  779. }
  780. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  781. nc := a.netCtx
  782. var allocatedServices []*api.Service
  783. for _, s := range nc.unallocatedServices {
  784. if !nc.nwkAllocator.IsServiceAllocated(s) {
  785. if err := a.allocateService(ctx, s); err != nil {
  786. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  787. continue
  788. }
  789. allocatedServices = append(allocatedServices, s)
  790. }
  791. }
  792. if len(allocatedServices) == 0 {
  793. return
  794. }
  795. committed, err := a.store.Batch(func(batch *store.Batch) error {
  796. for _, s := range allocatedServices {
  797. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  798. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  799. continue
  800. }
  801. }
  802. return nil
  803. })
  804. if err != nil {
  805. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  806. }
  807. for _, s := range allocatedServices[:committed] {
  808. delete(nc.unallocatedServices, s.ID)
  809. }
  810. }
  811. func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
  812. nc := a.netCtx
  813. allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
  814. for _, t := range nc.unallocatedTasks {
  815. if err := a.allocateTask(ctx, t); err == nil {
  816. allocatedTasks = append(allocatedTasks, t)
  817. } else if err != errNoChanges {
  818. log.G(ctx).WithError(err).Error("task allocation failure")
  819. }
  820. }
  821. if len(allocatedTasks) == 0 {
  822. return
  823. }
  824. committed, err := a.store.Batch(func(batch *store.Batch) error {
  825. for _, t := range allocatedTasks {
  826. err := a.commitAllocatedTask(ctx, batch, t)
  827. if err != nil {
  828. log.G(ctx).WithError(err).Error("task allocation commit failure")
  829. continue
  830. }
  831. }
  832. return nil
  833. })
  834. if err != nil {
  835. log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
  836. }
  837. for _, t := range allocatedTasks[:committed] {
  838. delete(nc.unallocatedTasks, t.ID)
  839. }
  840. }
  841. // updateTaskStatus sets TaskStatus and updates timestamp.
  842. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  843. t.Status.State = newStatus
  844. t.Status.Message = message
  845. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  846. }