network.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  9. "github.com/docker/swarmkit/manager/state"
  10. "github.com/docker/swarmkit/manager/state/store"
  11. "github.com/docker/swarmkit/protobuf/ptypes"
  12. "github.com/pkg/errors"
  13. "golang.org/x/net/context"
  14. )
  // networkVoter is the voter ID under which the network allocator casts
  // its vote for task allocation.
  const networkVoter = "network"

  // allocatedStatusMessage is the status message recorded on a task when
  // the allocator moves it to the PENDING state.
  const allocatedStatusMessage = "pending task scheduling"

  // ErrNoIngress is returned when no ingress network is found in store
  var ErrNoIngress = errors.New("no ingress network found")

  var (
  	// errNoChanges is the sentinel compared against the result of
  	// allocateTask; a task failing with it is neither committed nor
  	// queued for retry.
  	errNoChanges = errors.New("task unchanged")

  	// retryInterval is the minimum time that must elapse since the last
  	// retry before unallocated networks, services and tasks are
  	// revisited on a commit event.
  	retryInterval = 5 * time.Minute
  )
  26. // Network context information which is used throughout the network allocation code.
  27. type networkContext struct {
  28. ingressNetwork *api.Network
  29. // Instance of the low-level network allocator which performs
  30. // the actual network allocation.
  31. nwkAllocator *networkallocator.NetworkAllocator
  32. // A set of tasks which are ready to be allocated as a batch. This is
  33. // distinct from "unallocatedTasks" which are tasks that failed to
  34. // allocate on the first try, being held for a future retry.
  35. pendingTasks map[string]*api.Task
  36. // A set of unallocated tasks which will be revisited if any thing
  37. // changes in system state that might help task allocation.
  38. unallocatedTasks map[string]*api.Task
  39. // A set of unallocated services which will be revisited if
  40. // any thing changes in system state that might help service
  41. // allocation.
  42. unallocatedServices map[string]*api.Service
  43. // A set of unallocated networks which will be revisited if
  44. // any thing changes in system state that might help network
  45. // allocation.
  46. unallocatedNetworks map[string]*api.Network
  47. // lastRetry is the last timestamp when unallocated
  48. // tasks/services/networks were retried.
  49. lastRetry time.Time
  50. }
  51. func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
  52. na, err := networkallocator.New(a.pluginGetter)
  53. if err != nil {
  54. return err
  55. }
  56. nc := &networkContext{
  57. nwkAllocator: na,
  58. pendingTasks: make(map[string]*api.Task),
  59. unallocatedTasks: make(map[string]*api.Task),
  60. unallocatedServices: make(map[string]*api.Service),
  61. unallocatedNetworks: make(map[string]*api.Network),
  62. lastRetry: time.Now(),
  63. }
  64. a.netCtx = nc
  65. defer func() {
  66. // Clear a.netCtx if initialization was unsuccessful.
  67. if err != nil {
  68. a.netCtx = nil
  69. }
  70. }()
  71. // Ingress network is now created at cluster's first time creation.
  72. // Check if we have the ingress network. If found, make sure it is
  73. // allocated, before reading all network objects for allocation.
  74. // If not found, it means it was removed by user, nothing to do here.
  75. ingressNetwork, err := GetIngressNetwork(a.store)
  76. switch err {
  77. case nil:
  78. // Try to complete ingress network allocation before anything else so
  79. // that the we can get the preferred subnet for ingress network.
  80. nc.ingressNetwork = ingressNetwork
  81. if !na.IsAllocated(nc.ingressNetwork) {
  82. if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
  83. log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
  84. } else if err := a.store.Batch(func(batch *store.Batch) error {
  85. if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
  86. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  87. }
  88. return nil
  89. }); err != nil {
  90. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  91. }
  92. }
  93. case ErrNoIngress:
  94. // Ingress network is not present in store, It means user removed it
  95. // and did not create a new one.
  96. default:
  97. return errors.Wrap(err, "failure while looking for ingress network during init")
  98. }
  99. // Allocate networks in the store so far before we started
  100. // watching.
  101. var networks []*api.Network
  102. a.store.View(func(tx store.ReadTx) {
  103. networks, err = store.FindNetworks(tx, store.All)
  104. })
  105. if err != nil {
  106. return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  107. }
  108. var allocatedNetworks []*api.Network
  109. for _, n := range networks {
  110. if na.IsAllocated(n) {
  111. continue
  112. }
  113. if err := a.allocateNetwork(ctx, n); err != nil {
  114. log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
  115. continue
  116. }
  117. allocatedNetworks = append(allocatedNetworks, n)
  118. }
  119. if err := a.store.Batch(func(batch *store.Batch) error {
  120. for _, n := range allocatedNetworks {
  121. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  122. log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
  123. }
  124. }
  125. return nil
  126. }); err != nil {
  127. log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
  128. }
  129. // Allocate nodes in the store so far before we process watched events,
  130. // if the ingress network is present.
  131. if nc.ingressNetwork != nil {
  132. if err := a.allocateNodes(ctx); err != nil {
  133. return err
  134. }
  135. }
  136. // Allocate services in the store so far before we process watched events.
  137. var services []*api.Service
  138. a.store.View(func(tx store.ReadTx) {
  139. services, err = store.FindServices(tx, store.All)
  140. })
  141. if err != nil {
  142. return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
  143. }
  144. var allocatedServices []*api.Service
  145. for _, s := range services {
  146. if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) {
  147. continue
  148. }
  149. if err := a.allocateService(ctx, s); err != nil {
  150. log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
  151. continue
  152. }
  153. allocatedServices = append(allocatedServices, s)
  154. }
  155. if err := a.store.Batch(func(batch *store.Batch) error {
  156. for _, s := range allocatedServices {
  157. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  158. log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
  159. }
  160. }
  161. return nil
  162. }); err != nil {
  163. log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
  164. }
  165. // Allocate tasks in the store so far before we started watching.
  166. var (
  167. tasks []*api.Task
  168. allocatedTasks []*api.Task
  169. )
  170. a.store.View(func(tx store.ReadTx) {
  171. tasks, err = store.FindTasks(tx, store.All)
  172. })
  173. if err != nil {
  174. return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
  175. }
  176. for _, t := range tasks {
  177. if t.Status.State > api.TaskStateRunning {
  178. continue
  179. }
  180. var s *api.Service
  181. if t.ServiceID != "" {
  182. a.store.View(func(tx store.ReadTx) {
  183. s = store.GetService(tx, t.ServiceID)
  184. })
  185. }
  186. // Populate network attachments in the task
  187. // based on service spec.
  188. a.taskCreateNetworkAttachments(t, s)
  189. if taskReadyForNetworkVote(t, s, nc) {
  190. if t.Status.State >= api.TaskStatePending {
  191. continue
  192. }
  193. if a.taskAllocateVote(networkVoter, t.ID) {
  194. // If the task is not attached to any network, network
  195. // allocators job is done. Immediately cast a vote so
  196. // that the task can be moved to the PENDING state as
  197. // soon as possible.
  198. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  199. allocatedTasks = append(allocatedTasks, t)
  200. }
  201. continue
  202. }
  203. err := a.allocateTask(ctx, t)
  204. if err == nil {
  205. allocatedTasks = append(allocatedTasks, t)
  206. } else if err != errNoChanges {
  207. log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
  208. nc.unallocatedTasks[t.ID] = t
  209. }
  210. }
  211. if err := a.store.Batch(func(batch *store.Batch) error {
  212. for _, t := range allocatedTasks {
  213. if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
  214. log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
  215. }
  216. }
  217. return nil
  218. }); err != nil {
  219. log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
  220. }
  221. return nil
  222. }
  223. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  224. nc := a.netCtx
  225. switch v := ev.(type) {
  226. case api.EventCreateNetwork:
  227. n := v.Network.Copy()
  228. if nc.nwkAllocator.IsAllocated(n) {
  229. break
  230. }
  231. if IsIngressNetwork(n) && nc.ingressNetwork != nil {
  232. log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
  233. n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
  234. break
  235. }
  236. if err := a.allocateNetwork(ctx, n); err != nil {
  237. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  238. break
  239. }
  240. if err := a.store.Batch(func(batch *store.Batch) error {
  241. return a.commitAllocatedNetwork(ctx, batch, n)
  242. }); err != nil {
  243. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  244. }
  245. if IsIngressNetwork(n) {
  246. nc.ingressNetwork = n
  247. err := a.allocateNodes(ctx)
  248. if err != nil {
  249. log.G(ctx).WithError(err).Error(err)
  250. }
  251. }
  252. case api.EventDeleteNetwork:
  253. n := v.Network.Copy()
  254. if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
  255. nc.ingressNetwork = nil
  256. if err := a.deallocateNodes(ctx); err != nil {
  257. log.G(ctx).WithError(err).Error(err)
  258. }
  259. }
  260. // The assumption here is that all dependent objects
  261. // have been cleaned up when we are here so the only
  262. // thing that needs to happen is free the network
  263. // resources.
  264. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  265. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  266. }
  267. delete(nc.unallocatedNetworks, n.ID)
  268. case api.EventCreateService:
  269. var s *api.Service
  270. a.store.View(func(tx store.ReadTx) {
  271. s = store.GetService(tx, v.Service.ID)
  272. })
  273. if s == nil {
  274. break
  275. }
  276. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  277. break
  278. }
  279. if err := a.allocateService(ctx, s); err != nil {
  280. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  281. break
  282. }
  283. if err := a.store.Batch(func(batch *store.Batch) error {
  284. return a.commitAllocatedService(ctx, batch, s)
  285. }); err != nil {
  286. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  287. }
  288. case api.EventUpdateService:
  289. // We may have already allocated this service. If a create or
  290. // update event is older than the current version in the store,
  291. // we run the risk of allocating the service a second time.
  292. // Only operate on the latest version of the service.
  293. var s *api.Service
  294. a.store.View(func(tx store.ReadTx) {
  295. s = store.GetService(tx, v.Service.ID)
  296. })
  297. if s == nil {
  298. break
  299. }
  300. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  301. if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) {
  302. break
  303. }
  304. updatePortsInHostPublishMode(s)
  305. } else {
  306. if err := a.allocateService(ctx, s); err != nil {
  307. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  308. break
  309. }
  310. }
  311. if err := a.store.Batch(func(batch *store.Batch) error {
  312. return a.commitAllocatedService(ctx, batch, s)
  313. }); err != nil {
  314. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  315. nc.unallocatedServices[s.ID] = s
  316. } else {
  317. delete(nc.unallocatedServices, s.ID)
  318. }
  319. case api.EventDeleteService:
  320. s := v.Service.Copy()
  321. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  322. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  323. }
  324. // Remove it from unallocatedServices just in case
  325. // it's still there.
  326. delete(nc.unallocatedServices, s.ID)
  327. case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
  328. a.doNodeAlloc(ctx, ev)
  329. case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
  330. a.doTaskAlloc(ctx, ev)
  331. case state.EventCommit:
  332. a.procTasksNetwork(ctx, false)
  333. if time.Since(nc.lastRetry) > retryInterval {
  334. a.procUnallocatedNetworks(ctx)
  335. a.procUnallocatedServices(ctx)
  336. a.procTasksNetwork(ctx, true)
  337. nc.lastRetry = time.Now()
  338. }
  339. // Any left over tasks are moved to the unallocated set
  340. for _, t := range nc.pendingTasks {
  341. nc.unallocatedTasks[t.ID] = t
  342. }
  343. nc.pendingTasks = make(map[string]*api.Task)
  344. }
  345. }
  346. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  347. var (
  348. isDelete bool
  349. node *api.Node
  350. )
  351. // We may have already allocated this node. If a create or update
  352. // event is older than the current version in the store, we run the
  353. // risk of allocating the node a second time. Only operate on the
  354. // latest version of the node.
  355. switch v := ev.(type) {
  356. case api.EventCreateNode:
  357. a.store.View(func(tx store.ReadTx) {
  358. node = store.GetNode(tx, v.Node.ID)
  359. })
  360. case api.EventUpdateNode:
  361. a.store.View(func(tx store.ReadTx) {
  362. node = store.GetNode(tx, v.Node.ID)
  363. })
  364. case api.EventDeleteNode:
  365. isDelete = true
  366. node = v.Node.Copy()
  367. }
  368. if node == nil {
  369. return
  370. }
  371. nc := a.netCtx
  372. if isDelete {
  373. if nc.nwkAllocator.IsNodeAllocated(node) {
  374. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  375. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  376. }
  377. }
  378. return
  379. }
  380. if !nc.nwkAllocator.IsNodeAllocated(node) && nc.ingressNetwork != nil {
  381. if node.Attachment == nil {
  382. node.Attachment = &api.NetworkAttachment{}
  383. }
  384. node.Attachment.Network = nc.ingressNetwork.Copy()
  385. if err := a.allocateNode(ctx, node); err != nil {
  386. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  387. return
  388. }
  389. if err := a.store.Batch(func(batch *store.Batch) error {
  390. return a.commitAllocatedNode(ctx, batch, node)
  391. }); err != nil {
  392. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  393. }
  394. }
  395. }
  396. func (a *Allocator) allocateNodes(ctx context.Context) error {
  397. // Allocate nodes in the store so far before we process watched events.
  398. var (
  399. allocatedNodes []*api.Node
  400. nodes []*api.Node
  401. err error
  402. nc = a.netCtx
  403. )
  404. a.store.View(func(tx store.ReadTx) {
  405. nodes, err = store.FindNodes(tx, store.All)
  406. })
  407. if err != nil {
  408. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  409. }
  410. for _, node := range nodes {
  411. if nc.nwkAllocator.IsNodeAllocated(node) {
  412. continue
  413. }
  414. if node.Attachment == nil {
  415. node.Attachment = &api.NetworkAttachment{}
  416. }
  417. node.Attachment.Network = nc.ingressNetwork.Copy()
  418. if err := a.allocateNode(ctx, node); err != nil {
  419. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  420. continue
  421. }
  422. allocatedNodes = append(allocatedNodes, node)
  423. }
  424. if err := a.store.Batch(func(batch *store.Batch) error {
  425. for _, node := range allocatedNodes {
  426. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  427. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  428. }
  429. }
  430. return nil
  431. }); err != nil {
  432. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
  433. }
  434. return nil
  435. }
  436. func (a *Allocator) deallocateNodes(ctx context.Context) error {
  437. var (
  438. nodes []*api.Node
  439. nc = a.netCtx
  440. err error
  441. )
  442. a.store.View(func(tx store.ReadTx) {
  443. nodes, err = store.FindNodes(tx, store.All)
  444. })
  445. if err != nil {
  446. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  447. }
  448. for _, node := range nodes {
  449. if nc.nwkAllocator.IsNodeAllocated(node) {
  450. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  451. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  452. }
  453. node.Attachment = nil
  454. if err := a.store.Batch(func(batch *store.Batch) error {
  455. return a.commitAllocatedNode(ctx, batch, node)
  456. }); err != nil {
  457. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  458. }
  459. }
  460. }
  461. return nil
  462. }
  463. // taskReadyForNetworkVote checks if the task is ready for a network
  464. // vote to move it to PENDING state.
  465. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  466. // Task is ready for vote if the following is true:
  467. //
  468. // Task has no network attached or networks attached but all
  469. // of them allocated AND Task's service has no endpoint or
  470. // network configured or service endpoints have been
  471. // allocated.
  472. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  473. (s == nil || !nc.nwkAllocator.ServiceNeedsAllocation(s))
  474. }
  475. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  476. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  477. for _, n := range networks {
  478. networksCopy = append(networksCopy, n.Copy())
  479. }
  480. t.Networks = networksCopy
  481. }
  482. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  483. t.Endpoint = endpoint.Copy()
  484. }
  485. // IsIngressNetworkNeeded checks whether the service requires the routing-mesh
  486. func IsIngressNetworkNeeded(s *api.Service) bool {
  487. if s == nil {
  488. return false
  489. }
  490. if s.Spec.Endpoint == nil {
  491. return false
  492. }
  493. for _, p := range s.Spec.Endpoint.Ports {
  494. // The service to which this task belongs is trying to
  495. // expose ports with PublishMode as Ingress to the
  496. // external world. Automatically attach the task to
  497. // the ingress network.
  498. if p.PublishMode == api.PublishModeIngress {
  499. return true
  500. }
  501. }
  502. return false
  503. }
  504. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  505. // If task network attachments have already been filled in no
  506. // need to do anything else.
  507. if len(t.Networks) != 0 {
  508. return
  509. }
  510. var networks []*api.NetworkAttachment
  511. if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
  512. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  513. }
  514. a.store.View(func(tx store.ReadTx) {
  515. // Always prefer NetworkAttachmentConfig in the TaskSpec
  516. specNetworks := t.Spec.Networks
  517. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  518. specNetworks = s.Spec.Networks
  519. }
  520. for _, na := range specNetworks {
  521. n := store.GetNetwork(tx, na.Target)
  522. if n == nil {
  523. continue
  524. }
  525. attachment := api.NetworkAttachment{Network: n}
  526. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  527. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  528. networks = append(networks, &attachment)
  529. }
  530. })
  531. taskUpdateNetworks(t, networks)
  532. }
  533. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  534. var (
  535. isDelete bool
  536. t *api.Task
  537. )
  538. // We may have already allocated this task. If a create or update
  539. // event is older than the current version in the store, we run the
  540. // risk of allocating the task a second time. Only operate on the
  541. // latest version of the task.
  542. switch v := ev.(type) {
  543. case api.EventCreateTask:
  544. a.store.View(func(tx store.ReadTx) {
  545. t = store.GetTask(tx, v.Task.ID)
  546. })
  547. case api.EventUpdateTask:
  548. a.store.View(func(tx store.ReadTx) {
  549. t = store.GetTask(tx, v.Task.ID)
  550. })
  551. case api.EventDeleteTask:
  552. isDelete = true
  553. t = v.Task.Copy()
  554. }
  555. if t == nil {
  556. return
  557. }
  558. nc := a.netCtx
  559. // If the task has stopped running then we should free the network
  560. // resources associated with the task right away.
  561. if t.Status.State > api.TaskStateRunning || isDelete {
  562. if nc.nwkAllocator.IsTaskAllocated(t) {
  563. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  564. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  565. }
  566. }
  567. // Cleanup any task references that might exist
  568. delete(nc.pendingTasks, t.ID)
  569. delete(nc.unallocatedTasks, t.ID)
  570. return
  571. }
  572. // If we are already in allocated state, there is
  573. // absolutely nothing else to do.
  574. if t.Status.State >= api.TaskStatePending {
  575. delete(nc.pendingTasks, t.ID)
  576. delete(nc.unallocatedTasks, t.ID)
  577. return
  578. }
  579. var s *api.Service
  580. if t.ServiceID != "" {
  581. a.store.View(func(tx store.ReadTx) {
  582. s = store.GetService(tx, t.ServiceID)
  583. })
  584. if s == nil {
  585. // If the task is running it is not normal to
  586. // not be able to find the associated
  587. // service. If the task is not running (task
  588. // is either dead or the desired state is set
  589. // to dead) then the service may not be
  590. // available in store. But we still need to
  591. // cleanup network resources associated with
  592. // the task.
  593. if t.Status.State <= api.TaskStateRunning && !isDelete {
  594. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  595. return
  596. }
  597. }
  598. }
  599. // Populate network attachments in the task
  600. // based on service spec.
  601. a.taskCreateNetworkAttachments(t, s)
  602. nc.pendingTasks[t.ID] = t
  603. }
  604. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  605. return a.netCtx.nwkAllocator.AllocateNode(node)
  606. }
  607. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  608. if err := batch.Update(func(tx store.Tx) error {
  609. err := store.UpdateNode(tx, node)
  610. if err == store.ErrSequenceConflict {
  611. storeNode := store.GetNode(tx, node.ID)
  612. storeNode.Attachment = node.Attachment.Copy()
  613. err = store.UpdateNode(tx, storeNode)
  614. }
  615. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  616. }); err != nil {
  617. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  618. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  619. }
  620. return err
  621. }
  622. return nil
  623. }
  624. // This function prepares the service object for being updated when the change regards
  625. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  626. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  627. // so that the service allocation invoked on this new service object will trigger the deallocation
  628. // of any old publish mode port and allocation of any new one.
  629. func updatePortsInHostPublishMode(s *api.Service) {
  630. // First, remove all host-mode ports from s.Endpoint.Ports
  631. if s.Endpoint != nil {
  632. var portConfigs []*api.PortConfig
  633. for _, portConfig := range s.Endpoint.Ports {
  634. if portConfig.PublishMode != api.PublishModeHost {
  635. portConfigs = append(portConfigs, portConfig)
  636. }
  637. }
  638. s.Endpoint.Ports = portConfigs
  639. }
  640. // Add back all host-mode ports
  641. if s.Spec.Endpoint != nil {
  642. if s.Endpoint == nil {
  643. s.Endpoint = &api.Endpoint{}
  644. }
  645. for _, portConfig := range s.Spec.Endpoint.Ports {
  646. if portConfig.PublishMode == api.PublishModeHost {
  647. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  648. }
  649. }
  650. }
  651. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  652. }
  653. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  654. nc := a.netCtx
  655. if s.Spec.Endpoint != nil {
  656. // service has user-defined endpoint
  657. if s.Endpoint == nil {
  658. // service currently has no allocated endpoint, need allocated.
  659. s.Endpoint = &api.Endpoint{
  660. Spec: s.Spec.Endpoint.Copy(),
  661. }
  662. }
  663. // The service is trying to expose ports to the external
  664. // world. Automatically attach the service to the ingress
  665. // network only if it is not already done.
  666. if IsIngressNetworkNeeded(s) {
  667. if nc.ingressNetwork == nil {
  668. return fmt.Errorf("ingress network is missing")
  669. }
  670. var found bool
  671. for _, vip := range s.Endpoint.VirtualIPs {
  672. if vip.NetworkID == nc.ingressNetwork.ID {
  673. found = true
  674. break
  675. }
  676. }
  677. if !found {
  678. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  679. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  680. }
  681. }
  682. } else if s.Endpoint != nil {
  683. // service has no user-defined endpoints while has already allocated network resources,
  684. // need deallocated.
  685. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  686. return err
  687. }
  688. }
  689. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  690. nc.unallocatedServices[s.ID] = s
  691. return err
  692. }
  693. // If the service doesn't expose ports any more and if we have
  694. // any lingering virtual IP references for ingress network
  695. // clean them up here.
  696. if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil {
  697. if s.Endpoint != nil {
  698. for i, vip := range s.Endpoint.VirtualIPs {
  699. if vip.NetworkID == nc.ingressNetwork.ID {
  700. n := len(s.Endpoint.VirtualIPs)
  701. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  702. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  703. break
  704. }
  705. }
  706. }
  707. }
  708. return nil
  709. }
  710. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  711. if err := batch.Update(func(tx store.Tx) error {
  712. err := store.UpdateService(tx, s)
  713. if err == store.ErrSequenceConflict {
  714. storeService := store.GetService(tx, s.ID)
  715. storeService.Endpoint = s.Endpoint
  716. err = store.UpdateService(tx, storeService)
  717. }
  718. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  719. }); err != nil {
  720. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  721. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  722. }
  723. return err
  724. }
  725. return nil
  726. }
  727. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  728. nc := a.netCtx
  729. if err := nc.nwkAllocator.Allocate(n); err != nil {
  730. nc.unallocatedNetworks[n.ID] = n
  731. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  732. }
  733. return nil
  734. }
  735. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  736. if err := batch.Update(func(tx store.Tx) error {
  737. if err := store.UpdateNetwork(tx, n); err != nil {
  738. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  739. }
  740. return nil
  741. }); err != nil {
  742. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  743. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  744. }
  745. return err
  746. }
  747. return nil
  748. }
  749. func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
  750. taskUpdated := false
  751. nc := a.netCtx
  752. // We might be here even if a task allocation has already
  753. // happened but wasn't successfully committed to store. In such
  754. // cases skip allocation and go straight ahead to updating the
  755. // store.
  756. if !nc.nwkAllocator.IsTaskAllocated(t) {
  757. a.store.View(func(tx store.ReadTx) {
  758. if t.ServiceID != "" {
  759. s := store.GetService(tx, t.ServiceID)
  760. if s == nil {
  761. err = fmt.Errorf("could not find service %s", t.ServiceID)
  762. return
  763. }
  764. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  765. err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
  766. return
  767. }
  768. if s.Endpoint != nil {
  769. taskUpdateEndpoint(t, s.Endpoint)
  770. taskUpdated = true
  771. }
  772. }
  773. for _, na := range t.Networks {
  774. n := store.GetNetwork(tx, na.Network.ID)
  775. if n == nil {
  776. err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
  777. return
  778. }
  779. if !nc.nwkAllocator.IsAllocated(n) {
  780. err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
  781. return
  782. }
  783. na.Network = n
  784. }
  785. if err = nc.nwkAllocator.AllocateTask(t); err != nil {
  786. err = errors.Wrapf(err, "failed during network allocation for task %s", t.ID)
  787. return
  788. }
  789. if nc.nwkAllocator.IsTaskAllocated(t) {
  790. taskUpdated = true
  791. }
  792. })
  793. if err != nil {
  794. return err
  795. }
  796. }
  797. // Update the network allocations and moving to
  798. // PENDING state on top of the latest store state.
  799. if a.taskAllocateVote(networkVoter, t.ID) {
  800. if t.Status.State < api.TaskStatePending {
  801. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  802. taskUpdated = true
  803. }
  804. }
  805. if !taskUpdated {
  806. return errNoChanges
  807. }
  808. return nil
  809. }
  810. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  811. return batch.Update(func(tx store.Tx) error {
  812. err := store.UpdateTask(tx, t)
  813. if err == store.ErrSequenceConflict {
  814. storeTask := store.GetTask(tx, t.ID)
  815. taskUpdateNetworks(storeTask, t.Networks)
  816. taskUpdateEndpoint(storeTask, t.Endpoint)
  817. if storeTask.Status.State < api.TaskStatePending {
  818. storeTask.Status = t.Status
  819. }
  820. err = store.UpdateTask(tx, storeTask)
  821. }
  822. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  823. })
  824. }
  825. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  826. nc := a.netCtx
  827. var allocatedNetworks []*api.Network
  828. for _, n := range nc.unallocatedNetworks {
  829. if !nc.nwkAllocator.IsAllocated(n) {
  830. if err := a.allocateNetwork(ctx, n); err != nil {
  831. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  832. continue
  833. }
  834. allocatedNetworks = append(allocatedNetworks, n)
  835. }
  836. }
  837. if len(allocatedNetworks) == 0 {
  838. return
  839. }
  840. err := a.store.Batch(func(batch *store.Batch) error {
  841. for _, n := range allocatedNetworks {
  842. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  843. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  844. continue
  845. }
  846. delete(nc.unallocatedNetworks, n.ID)
  847. }
  848. return nil
  849. })
  850. if err != nil {
  851. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  852. // We optimistically removed these from nc.unallocatedNetworks
  853. // above in anticipation of successfully committing the batch,
  854. // but since the transaction has failed, we requeue them here.
  855. for _, n := range allocatedNetworks {
  856. nc.unallocatedNetworks[n.ID] = n
  857. }
  858. }
  859. }
  860. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  861. nc := a.netCtx
  862. var allocatedServices []*api.Service
  863. for _, s := range nc.unallocatedServices {
  864. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  865. if err := a.allocateService(ctx, s); err != nil {
  866. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  867. continue
  868. }
  869. allocatedServices = append(allocatedServices, s)
  870. }
  871. }
  872. if len(allocatedServices) == 0 {
  873. return
  874. }
  875. err := a.store.Batch(func(batch *store.Batch) error {
  876. for _, s := range allocatedServices {
  877. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  878. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  879. continue
  880. }
  881. delete(nc.unallocatedServices, s.ID)
  882. }
  883. return nil
  884. })
  885. if err != nil {
  886. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  887. // We optimistically removed these from nc.unallocatedServices
  888. // above in anticipation of successfully committing the batch,
  889. // but since the transaction has failed, we requeue them here.
  890. for _, s := range allocatedServices {
  891. nc.unallocatedServices[s.ID] = s
  892. }
  893. }
  894. }
  895. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  896. nc := a.netCtx
  897. quiet := false
  898. toAllocate := nc.pendingTasks
  899. if onRetry {
  900. toAllocate = nc.unallocatedTasks
  901. quiet = true
  902. }
  903. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  904. for _, t := range toAllocate {
  905. if err := a.allocateTask(ctx, t); err == nil {
  906. allocatedTasks = append(allocatedTasks, t)
  907. } else if err != errNoChanges {
  908. if quiet {
  909. log.G(ctx).WithError(err).Debug("task allocation failure")
  910. } else {
  911. log.G(ctx).WithError(err).Error("task allocation failure")
  912. }
  913. }
  914. }
  915. if len(allocatedTasks) == 0 {
  916. return
  917. }
  918. err := a.store.Batch(func(batch *store.Batch) error {
  919. for _, t := range allocatedTasks {
  920. err := a.commitAllocatedTask(ctx, batch, t)
  921. if err != nil {
  922. log.G(ctx).WithError(err).Error("task allocation commit failure")
  923. continue
  924. }
  925. delete(toAllocate, t.ID)
  926. }
  927. return nil
  928. })
  929. if err != nil {
  930. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  931. // We optimistically removed these from toAllocate above in
  932. // anticipation of successfully committing the batch, but since
  933. // the transaction has failed, we requeue them here.
  934. for _, t := range allocatedTasks {
  935. toAllocate[t.ID] = t
  936. }
  937. }
  938. }
  939. // updateTaskStatus sets TaskStatus and updates timestamp.
  940. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  941. t.Status.State = newStatus
  942. t.Status.Message = message
  943. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  944. }
  945. // IsIngressNetwork returns whether the passed network is an ingress network.
  946. func IsIngressNetwork(nw *api.Network) bool {
  947. return networkallocator.IsIngressNetwork(nw)
  948. }
  949. // GetIngressNetwork fetches the ingress network from store.
  950. // ErrNoIngress will be returned if the ingress network is not present,
  951. // nil otherwise. In case of any other failure in accessing the store,
  952. // the respective error will be reported as is.
  953. func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
  954. var (
  955. networks []*api.Network
  956. err error
  957. )
  958. s.View(func(tx store.ReadTx) {
  959. networks, err = store.FindNetworks(tx, store.All)
  960. })
  961. if err != nil {
  962. return nil, err
  963. }
  964. for _, n := range networks {
  965. if IsIngressNetwork(n) {
  966. return n, nil
  967. }
  968. }
  969. return nil, ErrNoIngress
  970. }