network.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  9. "github.com/docker/swarmkit/manager/state"
  10. "github.com/docker/swarmkit/manager/state/store"
  11. "github.com/docker/swarmkit/protobuf/ptypes"
  12. "github.com/pkg/errors"
  13. "golang.org/x/net/context"
  14. )
  // networkVoter is the voter ID under which the network allocator casts
  // its vote for task allocation.
  const networkVoter = "network"

  // allocatedStatusMessage is the status message recorded on a task when
  // the network allocator moves it to the PENDING state.
  const allocatedStatusMessage = "pending task scheduling"

  // ErrNoIngress is returned when no ingress network is found in store.
  var ErrNoIngress = errors.New("no ingress network found")

  // errNoChanges is returned when allocating a task left it unchanged;
  // callers treat it as "nothing to commit" rather than as a failure.
  var errNoChanges = errors.New("task unchanged")

  // retryInterval is the minimum time that must pass between two retries
  // of the unallocated networks, services and tasks.
  var retryInterval = 5 * time.Minute
  26. // Network context information which is used throughout the network allocation code.
  27. type networkContext struct {
  28. ingressNetwork *api.Network
  29. // Instance of the low-level network allocator which performs
  30. // the actual network allocation.
  31. nwkAllocator *networkallocator.NetworkAllocator
  32. // A set of tasks which are ready to be allocated as a batch. This is
  33. // distinct from "unallocatedTasks" which are tasks that failed to
  34. // allocate on the first try, being held for a future retry.
  35. pendingTasks map[string]*api.Task
  36. // A set of unallocated tasks which will be revisited if any thing
  37. // changes in system state that might help task allocation.
  38. unallocatedTasks map[string]*api.Task
  39. // A set of unallocated services which will be revisited if
  40. // any thing changes in system state that might help service
  41. // allocation.
  42. unallocatedServices map[string]*api.Service
  43. // A set of unallocated networks which will be revisited if
  44. // any thing changes in system state that might help network
  45. // allocation.
  46. unallocatedNetworks map[string]*api.Network
  47. // lastRetry is the last timestamp when unallocated
  48. // tasks/services/networks were retried.
  49. lastRetry time.Time
  50. }
  51. func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
  52. na, err := networkallocator.New(a.pluginGetter)
  53. if err != nil {
  54. return err
  55. }
  56. nc := &networkContext{
  57. nwkAllocator: na,
  58. pendingTasks: make(map[string]*api.Task),
  59. unallocatedTasks: make(map[string]*api.Task),
  60. unallocatedServices: make(map[string]*api.Service),
  61. unallocatedNetworks: make(map[string]*api.Network),
  62. lastRetry: time.Now(),
  63. }
  64. a.netCtx = nc
  65. defer func() {
  66. // Clear a.netCtx if initialization was unsuccessful.
  67. if err != nil {
  68. a.netCtx = nil
  69. }
  70. }()
  71. // Ingress network is now created at cluster's first time creation.
  72. // Check if we have the ingress network. If found, make sure it is
  73. // allocated, before reading all network objects for allocation.
  74. // If not found, it means it was removed by user, nothing to do here.
  75. ingressNetwork, err := GetIngressNetwork(a.store)
  76. switch err {
  77. case nil:
  78. // Try to complete ingress network allocation before anything else so
  79. // that the we can get the preferred subnet for ingress network.
  80. nc.ingressNetwork = ingressNetwork
  81. if !na.IsAllocated(nc.ingressNetwork) {
  82. if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
  83. log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
  84. } else if _, err := a.store.Batch(func(batch *store.Batch) error {
  85. if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
  86. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  87. }
  88. return nil
  89. }); err != nil {
  90. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  91. }
  92. }
  93. case ErrNoIngress:
  94. // Ingress network is not present in store, It means user removed it
  95. // and did not create a new one.
  96. default:
  97. return errors.Wrap(err, "failure while looking for ingress network during init")
  98. }
  99. // Allocate networks in the store so far before we started
  100. // watching.
  101. var networks []*api.Network
  102. a.store.View(func(tx store.ReadTx) {
  103. networks, err = store.FindNetworks(tx, store.All)
  104. })
  105. if err != nil {
  106. return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  107. }
  108. var allocatedNetworks []*api.Network
  109. for _, n := range networks {
  110. if na.IsAllocated(n) {
  111. continue
  112. }
  113. if err := a.allocateNetwork(ctx, n); err != nil {
  114. log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
  115. continue
  116. }
  117. allocatedNetworks = append(allocatedNetworks, n)
  118. }
  119. if _, err := a.store.Batch(func(batch *store.Batch) error {
  120. for _, n := range allocatedNetworks {
  121. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  122. log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
  123. }
  124. }
  125. return nil
  126. }); err != nil {
  127. log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
  128. }
  129. // Allocate nodes in the store so far before we process watched events,
  130. // if the ingress network is present.
  131. if nc.ingressNetwork != nil {
  132. if err := a.allocateNodes(ctx); err != nil {
  133. return err
  134. }
  135. }
  136. // Allocate services in the store so far before we process watched events.
  137. var services []*api.Service
  138. a.store.View(func(tx store.ReadTx) {
  139. services, err = store.FindServices(tx, store.All)
  140. })
  141. if err != nil {
  142. return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
  143. }
  144. var allocatedServices []*api.Service
  145. for _, s := range services {
  146. if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) {
  147. continue
  148. }
  149. if err := a.allocateService(ctx, s); err != nil {
  150. log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
  151. continue
  152. }
  153. allocatedServices = append(allocatedServices, s)
  154. }
  155. if _, err := a.store.Batch(func(batch *store.Batch) error {
  156. for _, s := range allocatedServices {
  157. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  158. log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
  159. }
  160. }
  161. return nil
  162. }); err != nil {
  163. log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
  164. }
  165. // Allocate tasks in the store so far before we started watching.
  166. var (
  167. tasks []*api.Task
  168. allocatedTasks []*api.Task
  169. )
  170. a.store.View(func(tx store.ReadTx) {
  171. tasks, err = store.FindTasks(tx, store.All)
  172. })
  173. if err != nil {
  174. return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
  175. }
  176. for _, t := range tasks {
  177. if t.Status.State > api.TaskStateRunning {
  178. continue
  179. }
  180. var s *api.Service
  181. if t.ServiceID != "" {
  182. a.store.View(func(tx store.ReadTx) {
  183. s = store.GetService(tx, t.ServiceID)
  184. })
  185. }
  186. // Populate network attachments in the task
  187. // based on service spec.
  188. a.taskCreateNetworkAttachments(t, s)
  189. if taskReadyForNetworkVote(t, s, nc) {
  190. if t.Status.State >= api.TaskStatePending {
  191. continue
  192. }
  193. if a.taskAllocateVote(networkVoter, t.ID) {
  194. // If the task is not attached to any network, network
  195. // allocators job is done. Immediately cast a vote so
  196. // that the task can be moved to the PENDING state as
  197. // soon as possible.
  198. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  199. allocatedTasks = append(allocatedTasks, t)
  200. }
  201. continue
  202. }
  203. err := a.allocateTask(ctx, t)
  204. if err == nil {
  205. allocatedTasks = append(allocatedTasks, t)
  206. } else if err != errNoChanges {
  207. log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
  208. nc.unallocatedTasks[t.ID] = t
  209. }
  210. }
  211. if _, err := a.store.Batch(func(batch *store.Batch) error {
  212. for _, t := range allocatedTasks {
  213. if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
  214. log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
  215. }
  216. }
  217. return nil
  218. }); err != nil {
  219. log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
  220. }
  221. return nil
  222. }
  223. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  224. nc := a.netCtx
  225. switch v := ev.(type) {
  226. case api.EventCreateNetwork:
  227. n := v.Network.Copy()
  228. if nc.nwkAllocator.IsAllocated(n) {
  229. break
  230. }
  231. if IsIngressNetwork(n) && nc.ingressNetwork != nil {
  232. log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
  233. n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
  234. break
  235. }
  236. if err := a.allocateNetwork(ctx, n); err != nil {
  237. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  238. break
  239. }
  240. if _, err := a.store.Batch(func(batch *store.Batch) error {
  241. return a.commitAllocatedNetwork(ctx, batch, n)
  242. }); err != nil {
  243. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  244. }
  245. if IsIngressNetwork(n) {
  246. nc.ingressNetwork = n
  247. err := a.allocateNodes(ctx)
  248. if err != nil {
  249. log.G(ctx).WithError(err).Error(err)
  250. }
  251. }
  252. case api.EventDeleteNetwork:
  253. n := v.Network.Copy()
  254. if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
  255. nc.ingressNetwork = nil
  256. if err := a.deallocateNodes(ctx); err != nil {
  257. log.G(ctx).WithError(err).Error(err)
  258. }
  259. }
  260. // The assumption here is that all dependent objects
  261. // have been cleaned up when we are here so the only
  262. // thing that needs to happen is free the network
  263. // resources.
  264. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  265. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  266. }
  267. delete(nc.unallocatedNetworks, n.ID)
  268. case api.EventCreateService:
  269. var s *api.Service
  270. a.store.View(func(tx store.ReadTx) {
  271. s = store.GetService(tx, v.Service.ID)
  272. })
  273. if s == nil {
  274. break
  275. }
  276. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  277. break
  278. }
  279. if err := a.allocateService(ctx, s); err != nil {
  280. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  281. break
  282. }
  283. if _, err := a.store.Batch(func(batch *store.Batch) error {
  284. return a.commitAllocatedService(ctx, batch, s)
  285. }); err != nil {
  286. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  287. }
  288. case api.EventUpdateService:
  289. // We may have already allocated this service. If a create or
  290. // update event is older than the current version in the store,
  291. // we run the risk of allocating the service a second time.
  292. // Only operate on the latest version of the service.
  293. var s *api.Service
  294. a.store.View(func(tx store.ReadTx) {
  295. s = store.GetService(tx, v.Service.ID)
  296. })
  297. if s == nil {
  298. break
  299. }
  300. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  301. if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) {
  302. break
  303. }
  304. updatePortsInHostPublishMode(s)
  305. } else {
  306. if err := a.allocateService(ctx, s); err != nil {
  307. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  308. break
  309. }
  310. }
  311. if _, err := a.store.Batch(func(batch *store.Batch) error {
  312. return a.commitAllocatedService(ctx, batch, s)
  313. }); err != nil {
  314. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  315. nc.unallocatedServices[s.ID] = s
  316. } else {
  317. delete(nc.unallocatedServices, s.ID)
  318. }
  319. case api.EventDeleteService:
  320. s := v.Service.Copy()
  321. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  322. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  323. }
  324. // Remove it from unallocatedServices just in case
  325. // it's still there.
  326. delete(nc.unallocatedServices, s.ID)
  327. case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
  328. a.doNodeAlloc(ctx, ev)
  329. case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
  330. a.doTaskAlloc(ctx, ev)
  331. case state.EventCommit:
  332. a.procTasksNetwork(ctx, false)
  333. if time.Since(nc.lastRetry) > retryInterval {
  334. a.procUnallocatedNetworks(ctx)
  335. a.procUnallocatedServices(ctx)
  336. a.procTasksNetwork(ctx, true)
  337. nc.lastRetry = time.Now()
  338. }
  339. // Any left over tasks are moved to the unallocated set
  340. for _, t := range nc.pendingTasks {
  341. nc.unallocatedTasks[t.ID] = t
  342. }
  343. nc.pendingTasks = make(map[string]*api.Task)
  344. }
  345. }
  346. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  347. var (
  348. isDelete bool
  349. node *api.Node
  350. )
  351. // We may have already allocated this node. If a create or update
  352. // event is older than the current version in the store, we run the
  353. // risk of allocating the node a second time. Only operate on the
  354. // latest version of the node.
  355. switch v := ev.(type) {
  356. case api.EventCreateNode:
  357. a.store.View(func(tx store.ReadTx) {
  358. node = store.GetNode(tx, v.Node.ID)
  359. })
  360. case api.EventUpdateNode:
  361. a.store.View(func(tx store.ReadTx) {
  362. node = store.GetNode(tx, v.Node.ID)
  363. })
  364. case api.EventDeleteNode:
  365. isDelete = true
  366. node = v.Node.Copy()
  367. }
  368. if node == nil {
  369. return
  370. }
  371. nc := a.netCtx
  372. if isDelete {
  373. if nc.nwkAllocator.IsNodeAllocated(node) {
  374. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  375. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  376. }
  377. }
  378. return
  379. }
  380. if !nc.nwkAllocator.IsNodeAllocated(node) && nc.ingressNetwork != nil {
  381. if node.Attachment == nil {
  382. node.Attachment = &api.NetworkAttachment{}
  383. }
  384. node.Attachment.Network = nc.ingressNetwork.Copy()
  385. if err := a.allocateNode(ctx, node); err != nil {
  386. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  387. return
  388. }
  389. if _, err := a.store.Batch(func(batch *store.Batch) error {
  390. return a.commitAllocatedNode(ctx, batch, node)
  391. }); err != nil {
  392. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  393. }
  394. }
  395. }
  396. func (a *Allocator) allocateNodes(ctx context.Context) error {
  397. // Allocate nodes in the store so far before we process watched events.
  398. var (
  399. allocatedNodes []*api.Node
  400. nodes []*api.Node
  401. err error
  402. nc = a.netCtx
  403. )
  404. a.store.View(func(tx store.ReadTx) {
  405. nodes, err = store.FindNodes(tx, store.All)
  406. })
  407. if err != nil {
  408. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  409. }
  410. for _, node := range nodes {
  411. if nc.nwkAllocator.IsNodeAllocated(node) {
  412. continue
  413. }
  414. if node.Attachment == nil {
  415. node.Attachment = &api.NetworkAttachment{}
  416. }
  417. node.Attachment.Network = nc.ingressNetwork.Copy()
  418. if err := a.allocateNode(ctx, node); err != nil {
  419. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  420. continue
  421. }
  422. allocatedNodes = append(allocatedNodes, node)
  423. }
  424. if _, err := a.store.Batch(func(batch *store.Batch) error {
  425. for _, node := range allocatedNodes {
  426. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  427. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  428. }
  429. }
  430. return nil
  431. }); err != nil {
  432. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
  433. }
  434. return nil
  435. }
  436. func (a *Allocator) deallocateNodes(ctx context.Context) error {
  437. var (
  438. nodes []*api.Node
  439. nc = a.netCtx
  440. err error
  441. )
  442. a.store.View(func(tx store.ReadTx) {
  443. nodes, err = store.FindNodes(tx, store.All)
  444. })
  445. if err != nil {
  446. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  447. }
  448. for _, node := range nodes {
  449. if nc.nwkAllocator.IsNodeAllocated(node) {
  450. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  451. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  452. }
  453. node.Attachment = nil
  454. if _, err := a.store.Batch(func(batch *store.Batch) error {
  455. return a.commitAllocatedNode(ctx, batch, node)
  456. }); err != nil {
  457. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  458. }
  459. }
  460. }
  461. return nil
  462. }
  463. // taskReadyForNetworkVote checks if the task is ready for a network
  464. // vote to move it to PENDING state.
  465. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  466. // Task is ready for vote if the following is true:
  467. //
  468. // Task has no network attached or networks attached but all
  469. // of them allocated AND Task's service has no endpoint or
  470. // network configured or service endpoints have been
  471. // allocated.
  472. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  473. (s == nil || !nc.nwkAllocator.ServiceNeedsAllocation(s))
  474. }
  475. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  476. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  477. for _, n := range networks {
  478. networksCopy = append(networksCopy, n.Copy())
  479. }
  480. t.Networks = networksCopy
  481. }
  482. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  483. t.Endpoint = endpoint.Copy()
  484. }
  485. // IsIngressNetworkNeeded checks whether the service requires the routing-mesh
  486. func IsIngressNetworkNeeded(s *api.Service) bool {
  487. if s == nil {
  488. return false
  489. }
  490. if s.Spec.Endpoint == nil {
  491. return false
  492. }
  493. for _, p := range s.Spec.Endpoint.Ports {
  494. // The service to which this task belongs is trying to
  495. // expose ports with PublishMode as Ingress to the
  496. // external world. Automatically attach the task to
  497. // the ingress network.
  498. if p.PublishMode == api.PublishModeIngress {
  499. return true
  500. }
  501. }
  502. return false
  503. }
  504. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  505. // If task network attachments have already been filled in no
  506. // need to do anything else.
  507. if len(t.Networks) != 0 {
  508. return
  509. }
  510. var networks []*api.NetworkAttachment
  511. if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
  512. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  513. }
  514. a.store.View(func(tx store.ReadTx) {
  515. // Always prefer NetworkAttachmentConfig in the TaskSpec
  516. specNetworks := t.Spec.Networks
  517. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  518. specNetworks = s.Spec.Networks
  519. }
  520. for _, na := range specNetworks {
  521. n := store.GetNetwork(tx, na.Target)
  522. if n == nil {
  523. continue
  524. }
  525. attachment := api.NetworkAttachment{Network: n}
  526. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  527. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  528. networks = append(networks, &attachment)
  529. }
  530. })
  531. taskUpdateNetworks(t, networks)
  532. }
  533. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  534. var (
  535. isDelete bool
  536. t *api.Task
  537. )
  538. // We may have already allocated this task. If a create or update
  539. // event is older than the current version in the store, we run the
  540. // risk of allocating the task a second time. Only operate on the
  541. // latest version of the task.
  542. switch v := ev.(type) {
  543. case api.EventCreateTask:
  544. a.store.View(func(tx store.ReadTx) {
  545. t = store.GetTask(tx, v.Task.ID)
  546. })
  547. case api.EventUpdateTask:
  548. a.store.View(func(tx store.ReadTx) {
  549. t = store.GetTask(tx, v.Task.ID)
  550. })
  551. case api.EventDeleteTask:
  552. isDelete = true
  553. t = v.Task.Copy()
  554. }
  555. if t == nil {
  556. return
  557. }
  558. nc := a.netCtx
  559. // If the task has stopped running then we should free the network
  560. // resources associated with the task right away.
  561. if t.Status.State > api.TaskStateRunning || isDelete {
  562. if nc.nwkAllocator.IsTaskAllocated(t) {
  563. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  564. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  565. }
  566. }
  567. // Cleanup any task references that might exist
  568. delete(nc.pendingTasks, t.ID)
  569. delete(nc.unallocatedTasks, t.ID)
  570. return
  571. }
  572. // If we are already in allocated state, there is
  573. // absolutely nothing else to do.
  574. if t.Status.State >= api.TaskStatePending {
  575. delete(nc.pendingTasks, t.ID)
  576. delete(nc.unallocatedTasks, t.ID)
  577. return
  578. }
  579. var s *api.Service
  580. if t.ServiceID != "" {
  581. a.store.View(func(tx store.ReadTx) {
  582. s = store.GetService(tx, t.ServiceID)
  583. })
  584. if s == nil {
  585. // If the task is running it is not normal to
  586. // not be able to find the associated
  587. // service. If the task is not running (task
  588. // is either dead or the desired state is set
  589. // to dead) then the service may not be
  590. // available in store. But we still need to
  591. // cleanup network resources associated with
  592. // the task.
  593. if t.Status.State <= api.TaskStateRunning && !isDelete {
  594. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  595. return
  596. }
  597. }
  598. }
  599. // Populate network attachments in the task
  600. // based on service spec.
  601. a.taskCreateNetworkAttachments(t, s)
  602. nc.pendingTasks[t.ID] = t
  603. }
  604. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  605. return a.netCtx.nwkAllocator.AllocateNode(node)
  606. }
  607. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  608. if err := batch.Update(func(tx store.Tx) error {
  609. err := store.UpdateNode(tx, node)
  610. if err == store.ErrSequenceConflict {
  611. storeNode := store.GetNode(tx, node.ID)
  612. storeNode.Attachment = node.Attachment.Copy()
  613. err = store.UpdateNode(tx, storeNode)
  614. }
  615. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  616. }); err != nil {
  617. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  618. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  619. }
  620. return err
  621. }
  622. return nil
  623. }
  624. // This function prepares the service object for being updated when the change regards
  625. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  626. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  627. // so that the service allocation invoked on this new service object will trigger the deallocation
  628. // of any old publish mode port and allocation of any new one.
  629. func updatePortsInHostPublishMode(s *api.Service) {
  630. if s.Endpoint != nil {
  631. var portConfigs []*api.PortConfig
  632. for _, portConfig := range s.Endpoint.Ports {
  633. if portConfig.PublishMode == api.PublishModeIngress {
  634. portConfigs = append(portConfigs, portConfig)
  635. }
  636. }
  637. s.Endpoint.Ports = portConfigs
  638. }
  639. if s.Spec.Endpoint != nil {
  640. if s.Endpoint == nil {
  641. s.Endpoint = &api.Endpoint{}
  642. }
  643. for _, portConfig := range s.Spec.Endpoint.Ports {
  644. if portConfig.PublishMode == api.PublishModeIngress {
  645. continue
  646. }
  647. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  648. }
  649. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  650. }
  651. }
  652. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  653. nc := a.netCtx
  654. if s.Spec.Endpoint != nil {
  655. // service has user-defined endpoint
  656. if s.Endpoint == nil {
  657. // service currently has no allocated endpoint, need allocated.
  658. s.Endpoint = &api.Endpoint{
  659. Spec: s.Spec.Endpoint.Copy(),
  660. }
  661. }
  662. // The service is trying to expose ports to the external
  663. // world. Automatically attach the service to the ingress
  664. // network only if it is not already done.
  665. if IsIngressNetworkNeeded(s) {
  666. if nc.ingressNetwork == nil {
  667. return fmt.Errorf("ingress network is missing")
  668. }
  669. var found bool
  670. for _, vip := range s.Endpoint.VirtualIPs {
  671. if vip.NetworkID == nc.ingressNetwork.ID {
  672. found = true
  673. break
  674. }
  675. }
  676. if !found {
  677. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  678. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  679. }
  680. }
  681. } else if s.Endpoint != nil {
  682. // service has no user-defined endpoints while has already allocated network resources,
  683. // need deallocated.
  684. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  685. return err
  686. }
  687. }
  688. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  689. nc.unallocatedServices[s.ID] = s
  690. return err
  691. }
  692. // If the service doesn't expose ports any more and if we have
  693. // any lingering virtual IP references for ingress network
  694. // clean them up here.
  695. if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil {
  696. if s.Endpoint != nil {
  697. for i, vip := range s.Endpoint.VirtualIPs {
  698. if vip.NetworkID == nc.ingressNetwork.ID {
  699. n := len(s.Endpoint.VirtualIPs)
  700. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  701. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  702. break
  703. }
  704. }
  705. }
  706. }
  707. return nil
  708. }
  709. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  710. if err := batch.Update(func(tx store.Tx) error {
  711. err := store.UpdateService(tx, s)
  712. if err == store.ErrSequenceConflict {
  713. storeService := store.GetService(tx, s.ID)
  714. storeService.Endpoint = s.Endpoint
  715. err = store.UpdateService(tx, storeService)
  716. }
  717. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  718. }); err != nil {
  719. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  720. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  721. }
  722. return err
  723. }
  724. return nil
  725. }
  726. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  727. nc := a.netCtx
  728. if err := nc.nwkAllocator.Allocate(n); err != nil {
  729. nc.unallocatedNetworks[n.ID] = n
  730. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  731. }
  732. return nil
  733. }
  734. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  735. if err := batch.Update(func(tx store.Tx) error {
  736. if err := store.UpdateNetwork(tx, n); err != nil {
  737. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  738. }
  739. return nil
  740. }); err != nil {
  741. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  742. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  743. }
  744. return err
  745. }
  746. return nil
  747. }
  748. func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
  749. taskUpdated := false
  750. nc := a.netCtx
  751. // We might be here even if a task allocation has already
  752. // happened but wasn't successfully committed to store. In such
  753. // cases skip allocation and go straight ahead to updating the
  754. // store.
  755. if !nc.nwkAllocator.IsTaskAllocated(t) {
  756. a.store.View(func(tx store.ReadTx) {
  757. if t.ServiceID != "" {
  758. s := store.GetService(tx, t.ServiceID)
  759. if s == nil {
  760. err = fmt.Errorf("could not find service %s", t.ServiceID)
  761. return
  762. }
  763. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  764. err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
  765. return
  766. }
  767. if s.Endpoint != nil {
  768. taskUpdateEndpoint(t, s.Endpoint)
  769. taskUpdated = true
  770. }
  771. }
  772. for _, na := range t.Networks {
  773. n := store.GetNetwork(tx, na.Network.ID)
  774. if n == nil {
  775. err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
  776. return
  777. }
  778. if !nc.nwkAllocator.IsAllocated(n) {
  779. err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
  780. return
  781. }
  782. na.Network = n
  783. }
  784. if err = nc.nwkAllocator.AllocateTask(t); err != nil {
  785. err = errors.Wrapf(err, "failed during network allocation for task %s", t.ID)
  786. return
  787. }
  788. if nc.nwkAllocator.IsTaskAllocated(t) {
  789. taskUpdated = true
  790. }
  791. })
  792. if err != nil {
  793. return err
  794. }
  795. }
  796. // Update the network allocations and moving to
  797. // PENDING state on top of the latest store state.
  798. if a.taskAllocateVote(networkVoter, t.ID) {
  799. if t.Status.State < api.TaskStatePending {
  800. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  801. taskUpdated = true
  802. }
  803. }
  804. if !taskUpdated {
  805. return errNoChanges
  806. }
  807. return nil
  808. }
  809. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  810. return batch.Update(func(tx store.Tx) error {
  811. err := store.UpdateTask(tx, t)
  812. if err == store.ErrSequenceConflict {
  813. storeTask := store.GetTask(tx, t.ID)
  814. taskUpdateNetworks(storeTask, t.Networks)
  815. taskUpdateEndpoint(storeTask, t.Endpoint)
  816. if storeTask.Status.State < api.TaskStatePending {
  817. storeTask.Status = t.Status
  818. }
  819. err = store.UpdateTask(tx, storeTask)
  820. }
  821. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  822. })
  823. }
  824. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  825. nc := a.netCtx
  826. var allocatedNetworks []*api.Network
  827. for _, n := range nc.unallocatedNetworks {
  828. if !nc.nwkAllocator.IsAllocated(n) {
  829. if err := a.allocateNetwork(ctx, n); err != nil {
  830. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  831. continue
  832. }
  833. allocatedNetworks = append(allocatedNetworks, n)
  834. }
  835. }
  836. if len(allocatedNetworks) == 0 {
  837. return
  838. }
  839. committed, err := a.store.Batch(func(batch *store.Batch) error {
  840. for _, n := range allocatedNetworks {
  841. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  842. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  843. continue
  844. }
  845. }
  846. return nil
  847. })
  848. if err != nil {
  849. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  850. }
  851. for _, n := range allocatedNetworks[:committed] {
  852. delete(nc.unallocatedNetworks, n.ID)
  853. }
  854. }
  855. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  856. nc := a.netCtx
  857. var allocatedServices []*api.Service
  858. for _, s := range nc.unallocatedServices {
  859. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  860. if err := a.allocateService(ctx, s); err != nil {
  861. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  862. continue
  863. }
  864. allocatedServices = append(allocatedServices, s)
  865. }
  866. }
  867. if len(allocatedServices) == 0 {
  868. return
  869. }
  870. committed, err := a.store.Batch(func(batch *store.Batch) error {
  871. for _, s := range allocatedServices {
  872. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  873. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  874. continue
  875. }
  876. }
  877. return nil
  878. })
  879. if err != nil {
  880. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  881. }
  882. for _, s := range allocatedServices[:committed] {
  883. delete(nc.unallocatedServices, s.ID)
  884. }
  885. }
  886. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  887. nc := a.netCtx
  888. quiet := false
  889. toAllocate := nc.pendingTasks
  890. if onRetry {
  891. toAllocate = nc.unallocatedTasks
  892. quiet = true
  893. }
  894. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  895. for _, t := range toAllocate {
  896. if err := a.allocateTask(ctx, t); err == nil {
  897. allocatedTasks = append(allocatedTasks, t)
  898. } else if err != errNoChanges {
  899. if quiet {
  900. log.G(ctx).WithError(err).Debug("task allocation failure")
  901. } else {
  902. log.G(ctx).WithError(err).Error("task allocation failure")
  903. }
  904. }
  905. }
  906. if len(allocatedTasks) == 0 {
  907. return
  908. }
  909. committed, err := a.store.Batch(func(batch *store.Batch) error {
  910. for _, t := range allocatedTasks {
  911. err := a.commitAllocatedTask(ctx, batch, t)
  912. if err != nil {
  913. log.G(ctx).WithError(err).Error("task allocation commit failure")
  914. continue
  915. }
  916. }
  917. return nil
  918. })
  919. if err != nil {
  920. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  921. }
  922. for _, t := range allocatedTasks[:committed] {
  923. delete(toAllocate, t.ID)
  924. }
  925. }
  926. // updateTaskStatus sets TaskStatus and updates timestamp.
  927. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  928. t.Status.State = newStatus
  929. t.Status.Message = message
  930. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  931. }
  932. // IsIngressNetwork returns whether the passed network is an ingress network.
  933. func IsIngressNetwork(nw *api.Network) bool {
  934. return networkallocator.IsIngressNetwork(nw)
  935. }
  936. // GetIngressNetwork fetches the ingress network from store.
  937. // ErrNoIngress will be returned if the ingress network is not present,
  938. // nil otherwise. In case of any other failure in accessing the store,
  939. // the respective error will be reported as is.
  940. func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
  941. var (
  942. networks []*api.Network
  943. err error
  944. )
  945. s.View(func(tx store.ReadTx) {
  946. networks, err = store.FindNetworks(tx, store.All)
  947. })
  948. if err != nil {
  949. return nil, err
  950. }
  951. for _, n := range networks {
  952. if IsIngressNetwork(n) {
  953. return n, nil
  954. }
  955. }
  956. return nil, ErrNoIngress
  957. }