network.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/allocator/cnmallocator"
  9. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  10. "github.com/docker/swarmkit/manager/state"
  11. "github.com/docker/swarmkit/manager/state/store"
  12. "github.com/docker/swarmkit/protobuf/ptypes"
  13. "github.com/pkg/errors"
  14. "golang.org/x/net/context"
  15. )
  // Identifiers and messages used by the network allocator when it votes on
  // task allocation.
  const (
  	// networkVoter is the voter ID under which the network allocator
  	// casts its task allocation vote.
  	networkVoter = "network"

  	// allocatedStatusMessage is the status message recorded on a task
  	// when the allocator moves it to the PENDING state.
  	allocatedStatusMessage = "pending task scheduling"
  )
  var (
  	// ErrNoIngress is returned when no ingress network is found in store.
  	ErrNoIngress = errors.New("no ingress network found")

  	// errNoChanges is the sentinel that task allocation callers compare
  	// against to tell "nothing to commit" apart from a real failure.
  	errNoChanges = errors.New("task unchanged")

  	// retryInterval is the minimum time that must elapse since the last
  	// retry before unallocated networks, services and tasks are retried
  	// again on a commit event.
  	retryInterval = 5 * time.Minute
  )
  27. // Network context information which is used throughout the network allocation code.
  28. type networkContext struct {
  29. ingressNetwork *api.Network
  30. // Instance of the low-level network allocator which performs
  31. // the actual network allocation.
  32. nwkAllocator networkallocator.NetworkAllocator
  33. // A set of tasks which are ready to be allocated as a batch. This is
  34. // distinct from "unallocatedTasks" which are tasks that failed to
  35. // allocate on the first try, being held for a future retry.
  36. pendingTasks map[string]*api.Task
  37. // A set of unallocated tasks which will be revisited if any thing
  38. // changes in system state that might help task allocation.
  39. unallocatedTasks map[string]*api.Task
  40. // A set of unallocated services which will be revisited if
  41. // any thing changes in system state that might help service
  42. // allocation.
  43. unallocatedServices map[string]*api.Service
  44. // A set of unallocated networks which will be revisited if
  45. // any thing changes in system state that might help network
  46. // allocation.
  47. unallocatedNetworks map[string]*api.Network
  48. // lastRetry is the last timestamp when unallocated
  49. // tasks/services/networks were retried.
  50. lastRetry time.Time
  51. // somethingWasDeallocated indicates that we just deallocated at
  52. // least one service/task/network, so we should retry failed
  53. // allocations (in we are experiencing IP exhaustion and an IP was
  54. // released).
  55. somethingWasDeallocated bool
  56. }
  57. func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
  58. na, err := cnmallocator.New(a.pluginGetter)
  59. if err != nil {
  60. return err
  61. }
  62. nc := &networkContext{
  63. nwkAllocator: na,
  64. pendingTasks: make(map[string]*api.Task),
  65. unallocatedTasks: make(map[string]*api.Task),
  66. unallocatedServices: make(map[string]*api.Service),
  67. unallocatedNetworks: make(map[string]*api.Network),
  68. lastRetry: time.Now(),
  69. }
  70. a.netCtx = nc
  71. defer func() {
  72. // Clear a.netCtx if initialization was unsuccessful.
  73. if err != nil {
  74. a.netCtx = nil
  75. }
  76. }()
  77. // Ingress network is now created at cluster's first time creation.
  78. // Check if we have the ingress network. If found, make sure it is
  79. // allocated, before reading all network objects for allocation.
  80. // If not found, it means it was removed by user, nothing to do here.
  81. ingressNetwork, err := GetIngressNetwork(a.store)
  82. switch err {
  83. case nil:
  84. // Try to complete ingress network allocation before anything else so
  85. // that the we can get the preferred subnet for ingress network.
  86. nc.ingressNetwork = ingressNetwork
  87. if !na.IsAllocated(nc.ingressNetwork) {
  88. if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
  89. log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
  90. } else if err := a.store.Batch(func(batch *store.Batch) error {
  91. if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
  92. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  93. }
  94. return nil
  95. }); err != nil {
  96. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  97. }
  98. }
  99. case ErrNoIngress:
  100. // Ingress network is not present in store, It means user removed it
  101. // and did not create a new one.
  102. default:
  103. return errors.Wrap(err, "failure while looking for ingress network during init")
  104. }
  105. // Allocate networks in the store so far before we started
  106. // watching.
  107. var networks []*api.Network
  108. a.store.View(func(tx store.ReadTx) {
  109. networks, err = store.FindNetworks(tx, store.All)
  110. })
  111. if err != nil {
  112. return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  113. }
  114. var allocatedNetworks []*api.Network
  115. for _, n := range networks {
  116. if na.IsAllocated(n) {
  117. continue
  118. }
  119. if err := a.allocateNetwork(ctx, n); err != nil {
  120. log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
  121. continue
  122. }
  123. allocatedNetworks = append(allocatedNetworks, n)
  124. }
  125. if err := a.store.Batch(func(batch *store.Batch) error {
  126. for _, n := range allocatedNetworks {
  127. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  128. log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
  129. }
  130. }
  131. return nil
  132. }); err != nil {
  133. log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
  134. }
  135. // First, allocate objects that already have addresses associated with
  136. // them, to reserve these IP addresses in internal state.
  137. if err := a.allocateNodes(ctx, true); err != nil {
  138. return err
  139. }
  140. if err := a.allocateServices(ctx, true); err != nil {
  141. return err
  142. }
  143. if err := a.allocateTasks(ctx, true); err != nil {
  144. return err
  145. }
  146. if err := a.allocateNodes(ctx, false); err != nil {
  147. return err
  148. }
  149. if err := a.allocateServices(ctx, false); err != nil {
  150. return err
  151. }
  152. return a.allocateTasks(ctx, false)
  153. }
  154. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  155. nc := a.netCtx
  156. switch v := ev.(type) {
  157. case api.EventCreateNetwork:
  158. n := v.Network.Copy()
  159. if nc.nwkAllocator.IsAllocated(n) {
  160. break
  161. }
  162. if IsIngressNetwork(n) && nc.ingressNetwork != nil {
  163. log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
  164. n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
  165. break
  166. }
  167. if err := a.allocateNetwork(ctx, n); err != nil {
  168. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  169. break
  170. }
  171. if err := a.store.Batch(func(batch *store.Batch) error {
  172. return a.commitAllocatedNetwork(ctx, batch, n)
  173. }); err != nil {
  174. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  175. }
  176. if IsIngressNetwork(n) {
  177. nc.ingressNetwork = n
  178. }
  179. err := a.allocateNodes(ctx, false)
  180. if err != nil {
  181. log.G(ctx).WithError(err).Error(err)
  182. }
  183. case api.EventDeleteNetwork:
  184. n := v.Network.Copy()
  185. if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
  186. nc.ingressNetwork = nil
  187. }
  188. if err := a.deallocateNodeAttachments(ctx, n.ID); err != nil {
  189. log.G(ctx).WithError(err).Error(err)
  190. }
  191. // The assumption here is that all dependent objects
  192. // have been cleaned up when we are here so the only
  193. // thing that needs to happen is free the network
  194. // resources.
  195. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  196. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  197. } else {
  198. nc.somethingWasDeallocated = true
  199. }
  200. delete(nc.unallocatedNetworks, n.ID)
  201. case api.EventCreateService:
  202. var s *api.Service
  203. a.store.View(func(tx store.ReadTx) {
  204. s = store.GetService(tx, v.Service.ID)
  205. })
  206. if s == nil {
  207. break
  208. }
  209. if nc.nwkAllocator.IsServiceAllocated(s) {
  210. break
  211. }
  212. if err := a.allocateService(ctx, s); err != nil {
  213. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  214. break
  215. }
  216. if err := a.store.Batch(func(batch *store.Batch) error {
  217. return a.commitAllocatedService(ctx, batch, s)
  218. }); err != nil {
  219. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  220. }
  221. case api.EventUpdateService:
  222. // We may have already allocated this service. If a create or
  223. // update event is older than the current version in the store,
  224. // we run the risk of allocating the service a second time.
  225. // Only operate on the latest version of the service.
  226. var s *api.Service
  227. a.store.View(func(tx store.ReadTx) {
  228. s = store.GetService(tx, v.Service.ID)
  229. })
  230. if s == nil {
  231. break
  232. }
  233. if nc.nwkAllocator.IsServiceAllocated(s) {
  234. if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) {
  235. break
  236. }
  237. updatePortsInHostPublishMode(s)
  238. } else {
  239. if err := a.allocateService(ctx, s); err != nil {
  240. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  241. break
  242. }
  243. }
  244. if err := a.store.Batch(func(batch *store.Batch) error {
  245. return a.commitAllocatedService(ctx, batch, s)
  246. }); err != nil {
  247. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  248. nc.unallocatedServices[s.ID] = s
  249. } else {
  250. delete(nc.unallocatedServices, s.ID)
  251. }
  252. case api.EventDeleteService:
  253. s := v.Service.Copy()
  254. if err := nc.nwkAllocator.DeallocateService(s); err != nil {
  255. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  256. } else {
  257. nc.somethingWasDeallocated = true
  258. }
  259. // Remove it from unallocatedServices just in case
  260. // it's still there.
  261. delete(nc.unallocatedServices, s.ID)
  262. case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
  263. a.doNodeAlloc(ctx, ev)
  264. case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
  265. a.doTaskAlloc(ctx, ev)
  266. case state.EventCommit:
  267. a.procTasksNetwork(ctx, false)
  268. if time.Since(nc.lastRetry) > retryInterval || nc.somethingWasDeallocated {
  269. a.procUnallocatedNetworks(ctx)
  270. a.procUnallocatedServices(ctx)
  271. a.procTasksNetwork(ctx, true)
  272. nc.lastRetry = time.Now()
  273. nc.somethingWasDeallocated = false
  274. }
  275. // Any left over tasks are moved to the unallocated set
  276. for _, t := range nc.pendingTasks {
  277. nc.unallocatedTasks[t.ID] = t
  278. }
  279. nc.pendingTasks = make(map[string]*api.Task)
  280. }
  281. }
  282. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  283. var (
  284. isDelete bool
  285. node *api.Node
  286. )
  287. // We may have already allocated this node. If a create or update
  288. // event is older than the current version in the store, we run the
  289. // risk of allocating the node a second time. Only operate on the
  290. // latest version of the node.
  291. switch v := ev.(type) {
  292. case api.EventCreateNode:
  293. a.store.View(func(tx store.ReadTx) {
  294. node = store.GetNode(tx, v.Node.ID)
  295. })
  296. case api.EventUpdateNode:
  297. a.store.View(func(tx store.ReadTx) {
  298. node = store.GetNode(tx, v.Node.ID)
  299. })
  300. case api.EventDeleteNode:
  301. isDelete = true
  302. node = v.Node.Copy()
  303. }
  304. if node == nil {
  305. return
  306. }
  307. nc := a.netCtx
  308. if isDelete {
  309. if err := a.deallocateNode(node); err != nil {
  310. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  311. } else {
  312. nc.somethingWasDeallocated = true
  313. }
  314. } else {
  315. allocatedNetworks, err := a.getAllocatedNetworks()
  316. if err != nil {
  317. log.G(ctx).WithError(err).Errorf("Error listing allocated networks in network %s", node.ID)
  318. }
  319. isAllocated := a.allocateNode(ctx, node, false, allocatedNetworks)
  320. if isAllocated {
  321. if err := a.store.Batch(func(batch *store.Batch) error {
  322. return a.commitAllocatedNode(ctx, batch, node)
  323. }); err != nil {
  324. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  325. }
  326. }
  327. }
  328. }
  329. func isOverlayNetwork(n *api.Network) bool {
  330. if n.DriverState != nil && n.DriverState.Name == "overlay" {
  331. return true
  332. }
  333. if n.Spec.DriverConfig != nil && n.Spec.DriverConfig.Name == "overlay" {
  334. return true
  335. }
  336. return false
  337. }
  338. func (a *Allocator) getAllocatedNetworks() ([]*api.Network, error) {
  339. var (
  340. err error
  341. nc = a.netCtx
  342. na = nc.nwkAllocator
  343. allocatedNetworks []*api.Network
  344. )
  345. // Find allocated networks
  346. var networks []*api.Network
  347. a.store.View(func(tx store.ReadTx) {
  348. networks, err = store.FindNetworks(tx, store.All)
  349. })
  350. if err != nil {
  351. return nil, errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  352. }
  353. for _, n := range networks {
  354. if isOverlayNetwork(n) && na.IsAllocated(n) {
  355. allocatedNetworks = append(allocatedNetworks, n)
  356. }
  357. }
  358. return allocatedNetworks, nil
  359. }
  360. func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly bool) error {
  361. // Allocate nodes in the store so far before we process watched events.
  362. var (
  363. allocatedNodes []*api.Node
  364. nodes []*api.Node
  365. err error
  366. )
  367. a.store.View(func(tx store.ReadTx) {
  368. nodes, err = store.FindNodes(tx, store.All)
  369. })
  370. if err != nil {
  371. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  372. }
  373. allocatedNetworks, err := a.getAllocatedNetworks()
  374. if err != nil {
  375. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  376. }
  377. for _, node := range nodes {
  378. isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, allocatedNetworks)
  379. if isAllocated {
  380. allocatedNodes = append(allocatedNodes, node)
  381. }
  382. }
  383. if err := a.store.Batch(func(batch *store.Batch) error {
  384. for _, node := range allocatedNodes {
  385. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  386. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  387. }
  388. }
  389. return nil
  390. }); err != nil {
  391. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
  392. }
  393. return nil
  394. }
  395. func (a *Allocator) deallocateNodes(ctx context.Context) error {
  396. var (
  397. nodes []*api.Node
  398. nc = a.netCtx
  399. err error
  400. )
  401. a.store.View(func(tx store.ReadTx) {
  402. nodes, err = store.FindNodes(tx, store.All)
  403. })
  404. if err != nil {
  405. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  406. }
  407. for _, node := range nodes {
  408. if err := a.deallocateNode(node); err != nil {
  409. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  410. } else {
  411. nc.somethingWasDeallocated = true
  412. }
  413. if err := a.store.Batch(func(batch *store.Batch) error {
  414. return a.commitAllocatedNode(ctx, batch, node)
  415. }); err != nil {
  416. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  417. }
  418. }
  419. return nil
  420. }
  421. func (a *Allocator) deallocateNodeAttachments(ctx context.Context, nid string) error {
  422. var (
  423. nodes []*api.Node
  424. nc = a.netCtx
  425. err error
  426. )
  427. a.store.View(func(tx store.ReadTx) {
  428. nodes, err = store.FindNodes(tx, store.All)
  429. })
  430. if err != nil {
  431. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  432. }
  433. for _, node := range nodes {
  434. var networkAttachment *api.NetworkAttachment
  435. var naIndex int
  436. for index, na := range node.Attachments {
  437. if na.Network.ID == nid {
  438. networkAttachment = na
  439. naIndex = index
  440. break
  441. }
  442. }
  443. if networkAttachment == nil {
  444. log.G(ctx).Errorf("Failed to find network %s on node %s", nid, node.ID)
  445. continue
  446. }
  447. if nc.nwkAllocator.IsAttachmentAllocated(node, networkAttachment) {
  448. if err := nc.nwkAllocator.DeallocateAttachment(node, networkAttachment); err != nil {
  449. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  450. } else {
  451. // Delete the lbattachment
  452. node.Attachments[naIndex] = node.Attachments[len(node.Attachments)-1]
  453. node.Attachments[len(node.Attachments)-1] = nil
  454. node.Attachments = node.Attachments[:len(node.Attachments)-1]
  455. if err := a.store.Batch(func(batch *store.Batch) error {
  456. return a.commitAllocatedNode(ctx, batch, node)
  457. }); err != nil {
  458. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  459. }
  460. }
  461. }
  462. }
  463. return nil
  464. }
  465. func (a *Allocator) deallocateNode(node *api.Node) error {
  466. var (
  467. nc = a.netCtx
  468. )
  469. for _, na := range node.Attachments {
  470. if nc.nwkAllocator.IsAttachmentAllocated(node, na) {
  471. if err := nc.nwkAllocator.DeallocateAttachment(node, na); err != nil {
  472. return err
  473. }
  474. }
  475. }
  476. node.Attachments = nil
  477. return nil
  478. }
  479. // allocateServices allocates services in the store so far before we process
  480. // watched events.
  481. func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly bool) error {
  482. var (
  483. nc = a.netCtx
  484. services []*api.Service
  485. err error
  486. )
  487. a.store.View(func(tx store.ReadTx) {
  488. services, err = store.FindServices(tx, store.All)
  489. })
  490. if err != nil {
  491. return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
  492. }
  493. var allocatedServices []*api.Service
  494. for _, s := range services {
  495. if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) {
  496. continue
  497. }
  498. if existingAddressesOnly &&
  499. (s.Endpoint == nil ||
  500. len(s.Endpoint.VirtualIPs) == 0) {
  501. continue
  502. }
  503. if err := a.allocateService(ctx, s); err != nil {
  504. log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
  505. continue
  506. }
  507. allocatedServices = append(allocatedServices, s)
  508. }
  509. if err := a.store.Batch(func(batch *store.Batch) error {
  510. for _, s := range allocatedServices {
  511. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  512. log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
  513. }
  514. }
  515. return nil
  516. }); err != nil {
  517. log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
  518. }
  519. return nil
  520. }
  521. // allocateTasks allocates tasks in the store so far before we started watching.
  522. func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error {
  523. var (
  524. nc = a.netCtx
  525. tasks []*api.Task
  526. allocatedTasks []*api.Task
  527. err error
  528. )
  529. a.store.View(func(tx store.ReadTx) {
  530. tasks, err = store.FindTasks(tx, store.All)
  531. })
  532. if err != nil {
  533. return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
  534. }
  535. for _, t := range tasks {
  536. if t.Status.State > api.TaskStateRunning {
  537. continue
  538. }
  539. if existingAddressesOnly {
  540. hasAddresses := false
  541. for _, nAttach := range t.Networks {
  542. if len(nAttach.Addresses) != 0 {
  543. hasAddresses = true
  544. break
  545. }
  546. }
  547. if !hasAddresses {
  548. continue
  549. }
  550. }
  551. var s *api.Service
  552. if t.ServiceID != "" {
  553. a.store.View(func(tx store.ReadTx) {
  554. s = store.GetService(tx, t.ServiceID)
  555. })
  556. }
  557. // Populate network attachments in the task
  558. // based on service spec.
  559. a.taskCreateNetworkAttachments(t, s)
  560. if taskReadyForNetworkVote(t, s, nc) {
  561. if t.Status.State >= api.TaskStatePending {
  562. continue
  563. }
  564. if a.taskAllocateVote(networkVoter, t.ID) {
  565. // If the task is not attached to any network, network
  566. // allocators job is done. Immediately cast a vote so
  567. // that the task can be moved to the PENDING state as
  568. // soon as possible.
  569. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  570. allocatedTasks = append(allocatedTasks, t)
  571. }
  572. continue
  573. }
  574. err := a.allocateTask(ctx, t)
  575. if err == nil {
  576. allocatedTasks = append(allocatedTasks, t)
  577. } else if err != errNoChanges {
  578. log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
  579. nc.unallocatedTasks[t.ID] = t
  580. }
  581. }
  582. if err := a.store.Batch(func(batch *store.Batch) error {
  583. for _, t := range allocatedTasks {
  584. if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
  585. log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
  586. }
  587. }
  588. return nil
  589. }); err != nil {
  590. log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
  591. }
  592. return nil
  593. }
  594. // taskReadyForNetworkVote checks if the task is ready for a network
  595. // vote to move it to PENDING state.
  596. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  597. // Task is ready for vote if the following is true:
  598. //
  599. // Task has no network attached or networks attached but all
  600. // of them allocated AND Task's service has no endpoint or
  601. // network configured or service endpoints have been
  602. // allocated.
  603. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  604. (s == nil || nc.nwkAllocator.IsServiceAllocated(s))
  605. }
  606. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  607. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  608. for _, n := range networks {
  609. networksCopy = append(networksCopy, n.Copy())
  610. }
  611. t.Networks = networksCopy
  612. }
  613. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  614. t.Endpoint = endpoint.Copy()
  615. }
  616. // IsIngressNetworkNeeded checks whether the service requires the routing-mesh
  617. func IsIngressNetworkNeeded(s *api.Service) bool {
  618. return networkallocator.IsIngressNetworkNeeded(s)
  619. }
  620. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  621. // If task network attachments have already been filled in no
  622. // need to do anything else.
  623. if len(t.Networks) != 0 {
  624. return
  625. }
  626. var networks []*api.NetworkAttachment
  627. if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
  628. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  629. }
  630. a.store.View(func(tx store.ReadTx) {
  631. // Always prefer NetworkAttachmentConfig in the TaskSpec
  632. specNetworks := t.Spec.Networks
  633. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  634. specNetworks = s.Spec.Networks
  635. }
  636. for _, na := range specNetworks {
  637. n := store.GetNetwork(tx, na.Target)
  638. if n == nil {
  639. continue
  640. }
  641. attachment := api.NetworkAttachment{Network: n}
  642. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  643. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  644. attachment.DriverAttachmentOpts = na.DriverAttachmentOpts
  645. networks = append(networks, &attachment)
  646. }
  647. })
  648. taskUpdateNetworks(t, networks)
  649. }
  650. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  651. var (
  652. isDelete bool
  653. t *api.Task
  654. )
  655. // We may have already allocated this task. If a create or update
  656. // event is older than the current version in the store, we run the
  657. // risk of allocating the task a second time. Only operate on the
  658. // latest version of the task.
  659. switch v := ev.(type) {
  660. case api.EventCreateTask:
  661. a.store.View(func(tx store.ReadTx) {
  662. t = store.GetTask(tx, v.Task.ID)
  663. })
  664. case api.EventUpdateTask:
  665. a.store.View(func(tx store.ReadTx) {
  666. t = store.GetTask(tx, v.Task.ID)
  667. })
  668. case api.EventDeleteTask:
  669. isDelete = true
  670. t = v.Task.Copy()
  671. }
  672. if t == nil {
  673. return
  674. }
  675. nc := a.netCtx
  676. // If the task has stopped running then we should free the network
  677. // resources associated with the task right away.
  678. if t.Status.State > api.TaskStateRunning || isDelete {
  679. if nc.nwkAllocator.IsTaskAllocated(t) {
  680. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  681. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  682. } else {
  683. nc.somethingWasDeallocated = true
  684. }
  685. }
  686. // Cleanup any task references that might exist
  687. delete(nc.pendingTasks, t.ID)
  688. delete(nc.unallocatedTasks, t.ID)
  689. return
  690. }
  691. // If we are already in allocated state, there is
  692. // absolutely nothing else to do.
  693. if t.Status.State >= api.TaskStatePending {
  694. delete(nc.pendingTasks, t.ID)
  695. delete(nc.unallocatedTasks, t.ID)
  696. return
  697. }
  698. var s *api.Service
  699. if t.ServiceID != "" {
  700. a.store.View(func(tx store.ReadTx) {
  701. s = store.GetService(tx, t.ServiceID)
  702. })
  703. if s == nil {
  704. // If the task is running it is not normal to
  705. // not be able to find the associated
  706. // service. If the task is not running (task
  707. // is either dead or the desired state is set
  708. // to dead) then the service may not be
  709. // available in store. But we still need to
  710. // cleanup network resources associated with
  711. // the task.
  712. if t.Status.State <= api.TaskStateRunning && !isDelete {
  713. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  714. return
  715. }
  716. }
  717. }
  718. // Populate network attachments in the task
  719. // based on service spec.
  720. a.taskCreateNetworkAttachments(t, s)
  721. nc.pendingTasks[t.ID] = t
  722. }
  723. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAddressesOnly bool, networks []*api.Network) bool {
  724. var allocated bool
  725. nc := a.netCtx
  726. for _, network := range networks {
  727. var lbAttachment *api.NetworkAttachment
  728. for _, na := range node.Attachments {
  729. if na.Network != nil && na.Network.ID == network.ID {
  730. lbAttachment = na
  731. break
  732. }
  733. }
  734. if lbAttachment != nil {
  735. if nc.nwkAllocator.IsAttachmentAllocated(node, lbAttachment) {
  736. continue
  737. }
  738. }
  739. if lbAttachment == nil {
  740. lbAttachment = &api.NetworkAttachment{}
  741. node.Attachments = append(node.Attachments, lbAttachment)
  742. }
  743. if existingAddressesOnly && len(lbAttachment.Addresses) == 0 {
  744. continue
  745. }
  746. lbAttachment.Network = network.Copy()
  747. if err := a.netCtx.nwkAllocator.AllocateAttachment(node, lbAttachment); err != nil {
  748. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  749. // TODO: Should we add a unallocatedNode and retry allocating resources like we do for network, tasks, services?
  750. // right now, we will only retry allocating network resources for the node when the node is updated.
  751. continue
  752. }
  753. allocated = true
  754. }
  755. return allocated
  756. }
  757. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  758. if err := batch.Update(func(tx store.Tx) error {
  759. err := store.UpdateNode(tx, node)
  760. if err == store.ErrSequenceConflict {
  761. storeNode := store.GetNode(tx, node.ID)
  762. storeNode.Attachments = node.Attachments
  763. err = store.UpdateNode(tx, storeNode)
  764. }
  765. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  766. }); err != nil {
  767. if err := a.deallocateNode(node); err != nil {
  768. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  769. }
  770. return err
  771. }
  772. return nil
  773. }
  774. // This function prepares the service object for being updated when the change regards
  775. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  776. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  777. // so that the service allocation invoked on this new service object will trigger the deallocation
  778. // of any old publish mode port and allocation of any new one.
  779. func updatePortsInHostPublishMode(s *api.Service) {
  780. // First, remove all host-mode ports from s.Endpoint.Ports
  781. if s.Endpoint != nil {
  782. var portConfigs []*api.PortConfig
  783. for _, portConfig := range s.Endpoint.Ports {
  784. if portConfig.PublishMode != api.PublishModeHost {
  785. portConfigs = append(portConfigs, portConfig)
  786. }
  787. }
  788. s.Endpoint.Ports = portConfigs
  789. }
  790. // Add back all host-mode ports
  791. if s.Spec.Endpoint != nil {
  792. if s.Endpoint == nil {
  793. s.Endpoint = &api.Endpoint{}
  794. }
  795. for _, portConfig := range s.Spec.Endpoint.Ports {
  796. if portConfig.PublishMode == api.PublishModeHost {
  797. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  798. }
  799. }
  800. }
  801. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  802. }
  803. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  804. nc := a.netCtx
  805. if s.Spec.Endpoint != nil {
  806. // service has user-defined endpoint
  807. if s.Endpoint == nil {
  808. // service currently has no allocated endpoint, need allocated.
  809. s.Endpoint = &api.Endpoint{
  810. Spec: s.Spec.Endpoint.Copy(),
  811. }
  812. }
  813. // The service is trying to expose ports to the external
  814. // world. Automatically attach the service to the ingress
  815. // network only if it is not already done.
  816. if IsIngressNetworkNeeded(s) {
  817. if nc.ingressNetwork == nil {
  818. return fmt.Errorf("ingress network is missing")
  819. }
  820. var found bool
  821. for _, vip := range s.Endpoint.VirtualIPs {
  822. if vip.NetworkID == nc.ingressNetwork.ID {
  823. found = true
  824. break
  825. }
  826. }
  827. if !found {
  828. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  829. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  830. }
  831. }
  832. } else if s.Endpoint != nil {
  833. // service has no user-defined endpoints while has already allocated network resources,
  834. // need deallocated.
  835. if err := nc.nwkAllocator.DeallocateService(s); err != nil {
  836. return err
  837. }
  838. nc.somethingWasDeallocated = true
  839. }
  840. if err := nc.nwkAllocator.AllocateService(s); err != nil {
  841. nc.unallocatedServices[s.ID] = s
  842. return err
  843. }
  844. // If the service doesn't expose ports any more and if we have
  845. // any lingering virtual IP references for ingress network
  846. // clean them up here.
  847. if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil {
  848. if s.Endpoint != nil {
  849. for i, vip := range s.Endpoint.VirtualIPs {
  850. if vip.NetworkID == nc.ingressNetwork.ID {
  851. n := len(s.Endpoint.VirtualIPs)
  852. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  853. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  854. break
  855. }
  856. }
  857. }
  858. }
  859. return nil
  860. }
  861. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  862. if err := batch.Update(func(tx store.Tx) error {
  863. err := store.UpdateService(tx, s)
  864. if err == store.ErrSequenceConflict {
  865. storeService := store.GetService(tx, s.ID)
  866. storeService.Endpoint = s.Endpoint
  867. err = store.UpdateService(tx, storeService)
  868. }
  869. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  870. }); err != nil {
  871. if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil {
  872. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  873. }
  874. return err
  875. }
  876. return nil
  877. }
  878. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  879. nc := a.netCtx
  880. if err := nc.nwkAllocator.Allocate(n); err != nil {
  881. nc.unallocatedNetworks[n.ID] = n
  882. return err
  883. }
  884. return nil
  885. }
  886. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  887. if err := batch.Update(func(tx store.Tx) error {
  888. if err := store.UpdateNetwork(tx, n); err != nil {
  889. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  890. }
  891. return nil
  892. }); err != nil {
  893. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  894. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  895. }
  896. return err
  897. }
  898. return nil
  899. }
  900. func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
  901. taskUpdated := false
  902. nc := a.netCtx
  903. // We might be here even if a task allocation has already
  904. // happened but wasn't successfully committed to store. In such
  905. // cases skip allocation and go straight ahead to updating the
  906. // store.
  907. if !nc.nwkAllocator.IsTaskAllocated(t) {
  908. a.store.View(func(tx store.ReadTx) {
  909. if t.ServiceID != "" {
  910. s := store.GetService(tx, t.ServiceID)
  911. if s == nil {
  912. err = fmt.Errorf("could not find service %s", t.ServiceID)
  913. return
  914. }
  915. if !nc.nwkAllocator.IsServiceAllocated(s) {
  916. err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
  917. return
  918. }
  919. if s.Endpoint != nil {
  920. taskUpdateEndpoint(t, s.Endpoint)
  921. taskUpdated = true
  922. }
  923. }
  924. for _, na := range t.Networks {
  925. n := store.GetNetwork(tx, na.Network.ID)
  926. if n == nil {
  927. err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
  928. return
  929. }
  930. if !nc.nwkAllocator.IsAllocated(n) {
  931. err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
  932. return
  933. }
  934. na.Network = n
  935. }
  936. if err = nc.nwkAllocator.AllocateTask(t); err != nil {
  937. return
  938. }
  939. if nc.nwkAllocator.IsTaskAllocated(t) {
  940. taskUpdated = true
  941. }
  942. })
  943. if err != nil {
  944. return err
  945. }
  946. }
  947. // Update the network allocations and moving to
  948. // PENDING state on top of the latest store state.
  949. if a.taskAllocateVote(networkVoter, t.ID) {
  950. if t.Status.State < api.TaskStatePending {
  951. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  952. taskUpdated = true
  953. }
  954. }
  955. if !taskUpdated {
  956. return errNoChanges
  957. }
  958. return nil
  959. }
  960. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  961. return batch.Update(func(tx store.Tx) error {
  962. err := store.UpdateTask(tx, t)
  963. if err == store.ErrSequenceConflict {
  964. storeTask := store.GetTask(tx, t.ID)
  965. taskUpdateNetworks(storeTask, t.Networks)
  966. taskUpdateEndpoint(storeTask, t.Endpoint)
  967. if storeTask.Status.State < api.TaskStatePending {
  968. storeTask.Status = t.Status
  969. }
  970. err = store.UpdateTask(tx, storeTask)
  971. }
  972. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  973. })
  974. }
  975. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  976. nc := a.netCtx
  977. var allocatedNetworks []*api.Network
  978. for _, n := range nc.unallocatedNetworks {
  979. if !nc.nwkAllocator.IsAllocated(n) {
  980. if err := a.allocateNetwork(ctx, n); err != nil {
  981. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  982. continue
  983. }
  984. allocatedNetworks = append(allocatedNetworks, n)
  985. }
  986. }
  987. if len(allocatedNetworks) == 0 {
  988. return
  989. }
  990. err := a.store.Batch(func(batch *store.Batch) error {
  991. for _, n := range allocatedNetworks {
  992. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  993. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  994. continue
  995. }
  996. delete(nc.unallocatedNetworks, n.ID)
  997. }
  998. return nil
  999. })
  1000. if err != nil {
  1001. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  1002. // We optimistically removed these from nc.unallocatedNetworks
  1003. // above in anticipation of successfully committing the batch,
  1004. // but since the transaction has failed, we requeue them here.
  1005. for _, n := range allocatedNetworks {
  1006. nc.unallocatedNetworks[n.ID] = n
  1007. }
  1008. }
  1009. }
  1010. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  1011. nc := a.netCtx
  1012. var allocatedServices []*api.Service
  1013. for _, s := range nc.unallocatedServices {
  1014. if !nc.nwkAllocator.IsServiceAllocated(s) {
  1015. if err := a.allocateService(ctx, s); err != nil {
  1016. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  1017. continue
  1018. }
  1019. allocatedServices = append(allocatedServices, s)
  1020. }
  1021. }
  1022. if len(allocatedServices) == 0 {
  1023. return
  1024. }
  1025. err := a.store.Batch(func(batch *store.Batch) error {
  1026. for _, s := range allocatedServices {
  1027. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  1028. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  1029. continue
  1030. }
  1031. delete(nc.unallocatedServices, s.ID)
  1032. }
  1033. return nil
  1034. })
  1035. if err != nil {
  1036. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  1037. // We optimistically removed these from nc.unallocatedServices
  1038. // above in anticipation of successfully committing the batch,
  1039. // but since the transaction has failed, we requeue them here.
  1040. for _, s := range allocatedServices {
  1041. nc.unallocatedServices[s.ID] = s
  1042. }
  1043. }
  1044. }
  1045. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  1046. nc := a.netCtx
  1047. quiet := false
  1048. toAllocate := nc.pendingTasks
  1049. if onRetry {
  1050. toAllocate = nc.unallocatedTasks
  1051. quiet = true
  1052. }
  1053. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  1054. for _, t := range toAllocate {
  1055. if err := a.allocateTask(ctx, t); err == nil {
  1056. allocatedTasks = append(allocatedTasks, t)
  1057. } else if err != errNoChanges {
  1058. if quiet {
  1059. log.G(ctx).WithError(err).Debug("task allocation failure")
  1060. } else {
  1061. log.G(ctx).WithError(err).Error("task allocation failure")
  1062. }
  1063. }
  1064. }
  1065. if len(allocatedTasks) == 0 {
  1066. return
  1067. }
  1068. err := a.store.Batch(func(batch *store.Batch) error {
  1069. for _, t := range allocatedTasks {
  1070. err := a.commitAllocatedTask(ctx, batch, t)
  1071. if err != nil {
  1072. log.G(ctx).WithError(err).Error("task allocation commit failure")
  1073. continue
  1074. }
  1075. delete(toAllocate, t.ID)
  1076. }
  1077. return nil
  1078. })
  1079. if err != nil {
  1080. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  1081. // We optimistically removed these from toAllocate above in
  1082. // anticipation of successfully committing the batch, but since
  1083. // the transaction has failed, we requeue them here.
  1084. for _, t := range allocatedTasks {
  1085. toAllocate[t.ID] = t
  1086. }
  1087. }
  1088. }
  1089. // IsBuiltInNetworkDriver returns whether the passed driver is an internal network driver
  1090. func IsBuiltInNetworkDriver(name string) bool {
  1091. return cnmallocator.IsBuiltInDriver(name)
  1092. }
  1093. // PredefinedNetworks returns the list of predefined network structures for a given network model
  1094. func PredefinedNetworks() []networkallocator.PredefinedNetworkData {
  1095. return cnmallocator.PredefinedNetworks()
  1096. }
  1097. // updateTaskStatus sets TaskStatus and updates timestamp.
  1098. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  1099. t.Status = api.TaskStatus{
  1100. State: newStatus,
  1101. Message: message,
  1102. Timestamp: ptypes.MustTimestampProto(time.Now()),
  1103. }
  1104. }
  1105. // IsIngressNetwork returns whether the passed network is an ingress network.
  1106. func IsIngressNetwork(nw *api.Network) bool {
  1107. return networkallocator.IsIngressNetwork(nw)
  1108. }
  1109. // GetIngressNetwork fetches the ingress network from store.
  1110. // ErrNoIngress will be returned if the ingress network is not present,
  1111. // nil otherwise. In case of any other failure in accessing the store,
  1112. // the respective error will be reported as is.
  1113. func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
  1114. var (
  1115. networks []*api.Network
  1116. err error
  1117. )
  1118. s.View(func(tx store.ReadTx) {
  1119. networks, err = store.FindNetworks(tx, store.All)
  1120. })
  1121. if err != nil {
  1122. return nil, err
  1123. }
  1124. for _, n := range networks {
  1125. if IsIngressNetwork(n) {
  1126. return n, nil
  1127. }
  1128. }
  1129. return nil, ErrNoIngress
  1130. }