network.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  9. "github.com/docker/swarmkit/manager/state"
  10. "github.com/docker/swarmkit/manager/state/store"
  11. "github.com/docker/swarmkit/protobuf/ptypes"
  12. "github.com/pkg/errors"
  13. "golang.org/x/net/context"
  14. )
const (
	// Network allocator Voter ID for task allocation vote.
	networkVoter = "network"

	// allocatedStatusMessage is the status message written on a task when
	// the network allocator moves it to the PENDING state.
	allocatedStatusMessage = "pending task scheduling"
)

var (
	// ErrNoIngress is returned when no ingress network is found in store
	ErrNoIngress = errors.New("no ingress network found")

	// errNoChanges is a sentinel compared against the result of
	// allocateTask: a task that yields it is neither committed nor queued
	// for a later retry.
	errNoChanges = errors.New("task unchanged")

	// retryInterval is the minimum time that must elapse since the last
	// retry before unallocated networks, services and tasks are retried
	// on a commit event.
	retryInterval = 5 * time.Minute
)
// Network context information which is used throughout the network allocation code.
type networkContext struct {
	// ingressNetwork is the ingress network currently known to the
	// allocator, or nil when none is present.
	ingressNetwork *api.Network

	// Instance of the low-level network allocator which performs
	// the actual network allocation.
	nwkAllocator *networkallocator.NetworkAllocator

	// A set of tasks which are ready to be allocated as a batch. This is
	// distinct from "unallocatedTasks" which are tasks that failed to
	// allocate on the first try, being held for a future retry.
	// Keyed by task ID.
	pendingTasks map[string]*api.Task

	// A set of unallocated tasks which will be revisited if anything
	// changes in system state that might help task allocation.
	// Keyed by task ID.
	unallocatedTasks map[string]*api.Task

	// A set of unallocated services which will be revisited if
	// anything changes in system state that might help service
	// allocation. Keyed by service ID.
	unallocatedServices map[string]*api.Service

	// A set of unallocated networks which will be revisited if
	// anything changes in system state that might help network
	// allocation. Keyed by network ID.
	unallocatedNetworks map[string]*api.Network

	// lastRetry is the last timestamp when unallocated
	// tasks/services/networks were retried.
	lastRetry time.Time
}
// doNetworkInit builds the allocator's network context and brings the
// objects already present in the store up to date before watched events
// are processed. In order, it allocates and commits: the ingress network
// (if one exists), all remaining networks, all nodes (only when an ingress
// network is present), all services, and finally all tasks that have not
// progressed past the RUNNING state.
//
// Individual allocation or commit failures are logged and skipped; only
// failures to create the low-level allocator, to look up the ingress
// network (other than ErrNoIngress), to list objects from the store, or to
// allocate nodes are returned. When an error is returned, a.netCtx is reset
// to nil by the deferred cleanup.
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
	na, err := networkallocator.New(a.pluginGetter)
	if err != nil {
		return err
	}

	nc := &networkContext{
		nwkAllocator:        na,
		pendingTasks:        make(map[string]*api.Task),
		unallocatedTasks:    make(map[string]*api.Task),
		unallocatedServices: make(map[string]*api.Service),
		unallocatedNetworks: make(map[string]*api.Network),
		lastRetry:           time.Now(),
	}
	// The context is published early because the allocate* helpers called
	// below read it through a.netCtx.
	a.netCtx = nc
	defer func() {
		// Clear a.netCtx if initialization was unsuccessful.
		// This relies on the named result err being set by every
		// failing return statement in this function.
		if err != nil {
			a.netCtx = nil
		}
	}()

	// Ingress network is now created at cluster's first time creation.
	// Check if we have the ingress network. If found, make sure it is
	// allocated, before reading all network objects for allocation.
	// If not found, it means it was removed by user, nothing to do here.
	ingressNetwork, err := GetIngressNetwork(a.store)
	switch err {
	case nil:
		// Try to complete ingress network allocation before anything else so
		// that we can get the preferred subnet for ingress network.
		nc.ingressNetwork = ingressNetwork
		if !na.IsAllocated(nc.ingressNetwork) {
			// The err variables declared in this branch shadow the
			// named result, so failures here are only logged.
			if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
				log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
			} else if err := a.store.Batch(func(batch *store.Batch) error {
				if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
					log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
				}
				return nil
			}); err != nil {
				log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
			}
		}
	case ErrNoIngress:
		// Ingress network is not present in store, It means user removed it
		// and did not create a new one.
		// err still holds ErrNoIngress here; it is overwritten by the
		// FindNetworks call below before it is inspected again.
	default:
		return errors.Wrap(err, "failure while looking for ingress network during init")
	}

	// Allocate networks in the store so far before we started
	// watching.
	var networks []*api.Network
	a.store.View(func(tx store.ReadTx) {
		networks, err = store.FindNetworks(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
	}

	// Networks are allocated first and committed afterwards in a single
	// batch; a network that fails to allocate is logged and left out.
	var allocatedNetworks []*api.Network
	for _, n := range networks {
		if na.IsAllocated(n) {
			continue
		}

		if err := a.allocateNetwork(ctx, n); err != nil {
			log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
			continue
		}
		allocatedNetworks = append(allocatedNetworks, n)
	}

	if err := a.store.Batch(func(batch *store.Batch) error {
		for _, n := range allocatedNetworks {
			if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
			}
		}
		// Per-network commit failures are logged above and do not
		// abort the batch.
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
	}

	// Allocate nodes in the store so far before we process watched events,
	// if the ingress network is present.
	if nc.ingressNetwork != nil {
		if err := a.allocateNodes(ctx); err != nil {
			return err
		}
	}

	// Allocate services in the store so far before we process watched events.
	var services []*api.Service
	a.store.View(func(tx store.ReadTx) {
		services, err = store.FindServices(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
	}

	var allocatedServices []*api.Service
	for _, s := range services {
		// The OnInit flag is passed only on this init path; the event
		// handlers call ServiceNeedsAllocation without it.
		if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) {
			continue
		}

		if err := a.allocateService(ctx, s); err != nil {
			log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
			continue
		}
		allocatedServices = append(allocatedServices, s)
	}

	if err := a.store.Batch(func(batch *store.Batch) error {
		for _, s := range allocatedServices {
			if err := a.commitAllocatedService(ctx, batch, s); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
			}
		}
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
	}

	// Allocate tasks in the store so far before we started watching.
	var (
		tasks          []*api.Task
		allocatedTasks []*api.Task
	)
	a.store.View(func(tx store.ReadTx) {
		tasks, err = store.FindTasks(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
	}

	for _, t := range tasks {
		// Tasks that have progressed past RUNNING are skipped.
		if t.Status.State > api.TaskStateRunning {
			continue
		}

		var s *api.Service
		if t.ServiceID != "" {
			a.store.View(func(tx store.ReadTx) {
				s = store.GetService(tx, t.ServiceID)
			})
		}

		// Populate network attachments in the task
		// based on service spec.
		a.taskCreateNetworkAttachments(t, s)

		if taskReadyForNetworkVote(t, s, nc) {
			// Already at or beyond PENDING: no vote is needed.
			if t.Status.State >= api.TaskStatePending {
				continue
			}

			if a.taskAllocateVote(networkVoter, t.ID) {
				// If the task is not attached to any network, network
				// allocators job is done. Immediately cast a vote so
				// that the task can be moved to the PENDING state as
				// soon as possible.
				updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
				allocatedTasks = append(allocatedTasks, t)
			}
			continue
		}

		// errNoChanges means there is nothing to commit and nothing to
		// retry; any other error parks the task for a later retry.
		err := a.allocateTask(ctx, t)
		if err == nil {
			allocatedTasks = append(allocatedTasks, t)
		} else if err != errNoChanges {
			log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
			nc.unallocatedTasks[t.ID] = t
		}
	}

	if err := a.store.Batch(func(batch *store.Batch) error {
		for _, t := range allocatedTasks {
			if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
			}
		}

		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
	}

	return nil
}
  223. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  224. nc := a.netCtx
  225. switch v := ev.(type) {
  226. case api.EventCreateNetwork:
  227. n := v.Network.Copy()
  228. if nc.nwkAllocator.IsAllocated(n) {
  229. break
  230. }
  231. if IsIngressNetwork(n) && nc.ingressNetwork != nil {
  232. log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
  233. n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
  234. break
  235. }
  236. if err := a.allocateNetwork(ctx, n); err != nil {
  237. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  238. break
  239. }
  240. if err := a.store.Batch(func(batch *store.Batch) error {
  241. return a.commitAllocatedNetwork(ctx, batch, n)
  242. }); err != nil {
  243. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  244. }
  245. if IsIngressNetwork(n) {
  246. nc.ingressNetwork = n
  247. err := a.allocateNodes(ctx)
  248. if err != nil {
  249. log.G(ctx).WithError(err).Error(err)
  250. }
  251. }
  252. case api.EventDeleteNetwork:
  253. n := v.Network.Copy()
  254. if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID {
  255. nc.ingressNetwork = nil
  256. if err := a.deallocateNodes(ctx); err != nil {
  257. log.G(ctx).WithError(err).Error(err)
  258. }
  259. }
  260. // The assumption here is that all dependent objects
  261. // have been cleaned up when we are here so the only
  262. // thing that needs to happen is free the network
  263. // resources.
  264. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  265. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  266. }
  267. delete(nc.unallocatedNetworks, n.ID)
  268. case api.EventCreateService:
  269. var s *api.Service
  270. a.store.View(func(tx store.ReadTx) {
  271. s = store.GetService(tx, v.Service.ID)
  272. })
  273. if s == nil {
  274. break
  275. }
  276. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  277. break
  278. }
  279. if err := a.allocateService(ctx, s); err != nil {
  280. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  281. break
  282. }
  283. if err := a.store.Batch(func(batch *store.Batch) error {
  284. return a.commitAllocatedService(ctx, batch, s)
  285. }); err != nil {
  286. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  287. }
  288. case api.EventUpdateService:
  289. // We may have already allocated this service. If a create or
  290. // update event is older than the current version in the store,
  291. // we run the risk of allocating the service a second time.
  292. // Only operate on the latest version of the service.
  293. var s *api.Service
  294. a.store.View(func(tx store.ReadTx) {
  295. s = store.GetService(tx, v.Service.ID)
  296. })
  297. if s == nil {
  298. break
  299. }
  300. if !nc.nwkAllocator.ServiceNeedsAllocation(s) {
  301. if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) {
  302. break
  303. }
  304. updatePortsInHostPublishMode(s)
  305. } else {
  306. if err := a.allocateService(ctx, s); err != nil {
  307. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  308. break
  309. }
  310. }
  311. if err := a.store.Batch(func(batch *store.Batch) error {
  312. return a.commitAllocatedService(ctx, batch, s)
  313. }); err != nil {
  314. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  315. nc.unallocatedServices[s.ID] = s
  316. } else {
  317. delete(nc.unallocatedServices, s.ID)
  318. }
  319. case api.EventDeleteService:
  320. s := v.Service.Copy()
  321. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  322. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  323. }
  324. // Remove it from unallocatedServices just in case
  325. // it's still there.
  326. delete(nc.unallocatedServices, s.ID)
  327. case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode:
  328. a.doNodeAlloc(ctx, ev)
  329. case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask:
  330. a.doTaskAlloc(ctx, ev)
  331. case state.EventCommit:
  332. a.procTasksNetwork(ctx, false)
  333. if time.Since(nc.lastRetry) > retryInterval {
  334. a.procUnallocatedNetworks(ctx)
  335. a.procUnallocatedServices(ctx)
  336. a.procTasksNetwork(ctx, true)
  337. nc.lastRetry = time.Now()
  338. }
  339. // Any left over tasks are moved to the unallocated set
  340. for _, t := range nc.pendingTasks {
  341. nc.unallocatedTasks[t.ID] = t
  342. }
  343. nc.pendingTasks = make(map[string]*api.Task)
  344. }
  345. }
  346. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  347. var (
  348. isDelete bool
  349. node *api.Node
  350. )
  351. // We may have already allocated this node. If a create or update
  352. // event is older than the current version in the store, we run the
  353. // risk of allocating the node a second time. Only operate on the
  354. // latest version of the node.
  355. switch v := ev.(type) {
  356. case api.EventCreateNode:
  357. a.store.View(func(tx store.ReadTx) {
  358. node = store.GetNode(tx, v.Node.ID)
  359. })
  360. case api.EventUpdateNode:
  361. a.store.View(func(tx store.ReadTx) {
  362. node = store.GetNode(tx, v.Node.ID)
  363. })
  364. case api.EventDeleteNode:
  365. isDelete = true
  366. node = v.Node.Copy()
  367. }
  368. if node == nil {
  369. return
  370. }
  371. nc := a.netCtx
  372. if isDelete {
  373. if nc.nwkAllocator.IsNodeAllocated(node) {
  374. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  375. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  376. }
  377. }
  378. return
  379. }
  380. if !nc.nwkAllocator.IsNodeAllocated(node) && nc.ingressNetwork != nil {
  381. if node.Attachment == nil {
  382. node.Attachment = &api.NetworkAttachment{}
  383. }
  384. node.Attachment.Network = nc.ingressNetwork.Copy()
  385. if err := a.allocateNode(ctx, node); err != nil {
  386. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  387. return
  388. }
  389. if err := a.store.Batch(func(batch *store.Batch) error {
  390. return a.commitAllocatedNode(ctx, batch, node)
  391. }); err != nil {
  392. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  393. }
  394. }
  395. }
  396. func (a *Allocator) allocateNodes(ctx context.Context) error {
  397. // Allocate nodes in the store so far before we process watched events.
  398. var (
  399. allocatedNodes []*api.Node
  400. nodes []*api.Node
  401. err error
  402. nc = a.netCtx
  403. )
  404. a.store.View(func(tx store.ReadTx) {
  405. nodes, err = store.FindNodes(tx, store.All)
  406. })
  407. if err != nil {
  408. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  409. }
  410. for _, node := range nodes {
  411. if nc.nwkAllocator.IsNodeAllocated(node) {
  412. continue
  413. }
  414. if node.Attachment == nil {
  415. node.Attachment = &api.NetworkAttachment{}
  416. }
  417. node.Attachment.Network = nc.ingressNetwork.Copy()
  418. if err := a.allocateNode(ctx, node); err != nil {
  419. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  420. continue
  421. }
  422. allocatedNodes = append(allocatedNodes, node)
  423. }
  424. if err := a.store.Batch(func(batch *store.Batch) error {
  425. for _, node := range allocatedNodes {
  426. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  427. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  428. }
  429. }
  430. return nil
  431. }); err != nil {
  432. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
  433. }
  434. return nil
  435. }
  436. func (a *Allocator) deallocateNodes(ctx context.Context) error {
  437. var (
  438. nodes []*api.Node
  439. nc = a.netCtx
  440. err error
  441. )
  442. a.store.View(func(tx store.ReadTx) {
  443. nodes, err = store.FindNodes(tx, store.All)
  444. })
  445. if err != nil {
  446. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  447. }
  448. for _, node := range nodes {
  449. if nc.nwkAllocator.IsNodeAllocated(node) {
  450. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  451. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  452. }
  453. node.Attachment = nil
  454. if err := a.store.Batch(func(batch *store.Batch) error {
  455. return a.commitAllocatedNode(ctx, batch, node)
  456. }); err != nil {
  457. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  458. }
  459. }
  460. }
  461. return nil
  462. }
  463. // taskReadyForNetworkVote checks if the task is ready for a network
  464. // vote to move it to PENDING state.
  465. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  466. // Task is ready for vote if the following is true:
  467. //
  468. // Task has no network attached or networks attached but all
  469. // of them allocated AND Task's service has no endpoint or
  470. // network configured or service endpoints have been
  471. // allocated.
  472. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  473. (s == nil || !nc.nwkAllocator.ServiceNeedsAllocation(s))
  474. }
  475. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  476. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  477. for _, n := range networks {
  478. networksCopy = append(networksCopy, n.Copy())
  479. }
  480. t.Networks = networksCopy
  481. }
  482. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  483. t.Endpoint = endpoint.Copy()
  484. }
  485. // IsIngressNetworkNeeded checks whether the service requires the routing-mesh
  486. func IsIngressNetworkNeeded(s *api.Service) bool {
  487. return networkallocator.IsIngressNetworkNeeded(s)
  488. }
  489. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  490. // If task network attachments have already been filled in no
  491. // need to do anything else.
  492. if len(t.Networks) != 0 {
  493. return
  494. }
  495. var networks []*api.NetworkAttachment
  496. if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil {
  497. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  498. }
  499. a.store.View(func(tx store.ReadTx) {
  500. // Always prefer NetworkAttachmentConfig in the TaskSpec
  501. specNetworks := t.Spec.Networks
  502. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  503. specNetworks = s.Spec.Networks
  504. }
  505. for _, na := range specNetworks {
  506. n := store.GetNetwork(tx, na.Target)
  507. if n == nil {
  508. continue
  509. }
  510. attachment := api.NetworkAttachment{Network: n}
  511. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  512. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  513. attachment.DriverAttachmentOpts = na.DriverAttachmentOpts
  514. networks = append(networks, &attachment)
  515. }
  516. })
  517. taskUpdateNetworks(t, networks)
  518. }
  519. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  520. var (
  521. isDelete bool
  522. t *api.Task
  523. )
  524. // We may have already allocated this task. If a create or update
  525. // event is older than the current version in the store, we run the
  526. // risk of allocating the task a second time. Only operate on the
  527. // latest version of the task.
  528. switch v := ev.(type) {
  529. case api.EventCreateTask:
  530. a.store.View(func(tx store.ReadTx) {
  531. t = store.GetTask(tx, v.Task.ID)
  532. })
  533. case api.EventUpdateTask:
  534. a.store.View(func(tx store.ReadTx) {
  535. t = store.GetTask(tx, v.Task.ID)
  536. })
  537. case api.EventDeleteTask:
  538. isDelete = true
  539. t = v.Task.Copy()
  540. }
  541. if t == nil {
  542. return
  543. }
  544. nc := a.netCtx
  545. // If the task has stopped running then we should free the network
  546. // resources associated with the task right away.
  547. if t.Status.State > api.TaskStateRunning || isDelete {
  548. if nc.nwkAllocator.IsTaskAllocated(t) {
  549. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  550. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  551. }
  552. }
  553. // Cleanup any task references that might exist
  554. delete(nc.pendingTasks, t.ID)
  555. delete(nc.unallocatedTasks, t.ID)
  556. return
  557. }
  558. // If we are already in allocated state, there is
  559. // absolutely nothing else to do.
  560. if t.Status.State >= api.TaskStatePending {
  561. delete(nc.pendingTasks, t.ID)
  562. delete(nc.unallocatedTasks, t.ID)
  563. return
  564. }
  565. var s *api.Service
  566. if t.ServiceID != "" {
  567. a.store.View(func(tx store.ReadTx) {
  568. s = store.GetService(tx, t.ServiceID)
  569. })
  570. if s == nil {
  571. // If the task is running it is not normal to
  572. // not be able to find the associated
  573. // service. If the task is not running (task
  574. // is either dead or the desired state is set
  575. // to dead) then the service may not be
  576. // available in store. But we still need to
  577. // cleanup network resources associated with
  578. // the task.
  579. if t.Status.State <= api.TaskStateRunning && !isDelete {
  580. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  581. return
  582. }
  583. }
  584. }
  585. // Populate network attachments in the task
  586. // based on service spec.
  587. a.taskCreateNetworkAttachments(t, s)
  588. nc.pendingTasks[t.ID] = t
  589. }
  590. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  591. return a.netCtx.nwkAllocator.AllocateNode(node)
  592. }
  593. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  594. if err := batch.Update(func(tx store.Tx) error {
  595. err := store.UpdateNode(tx, node)
  596. if err == store.ErrSequenceConflict {
  597. storeNode := store.GetNode(tx, node.ID)
  598. storeNode.Attachment = node.Attachment.Copy()
  599. err = store.UpdateNode(tx, storeNode)
  600. }
  601. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  602. }); err != nil {
  603. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  604. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  605. }
  606. return err
  607. }
  608. return nil
  609. }
  610. // This function prepares the service object for being updated when the change regards
  611. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  612. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  613. // so that the service allocation invoked on this new service object will trigger the deallocation
  614. // of any old publish mode port and allocation of any new one.
  615. func updatePortsInHostPublishMode(s *api.Service) {
  616. // First, remove all host-mode ports from s.Endpoint.Ports
  617. if s.Endpoint != nil {
  618. var portConfigs []*api.PortConfig
  619. for _, portConfig := range s.Endpoint.Ports {
  620. if portConfig.PublishMode != api.PublishModeHost {
  621. portConfigs = append(portConfigs, portConfig)
  622. }
  623. }
  624. s.Endpoint.Ports = portConfigs
  625. }
  626. // Add back all host-mode ports
  627. if s.Spec.Endpoint != nil {
  628. if s.Endpoint == nil {
  629. s.Endpoint = &api.Endpoint{}
  630. }
  631. for _, portConfig := range s.Spec.Endpoint.Ports {
  632. if portConfig.PublishMode == api.PublishModeHost {
  633. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  634. }
  635. }
  636. }
  637. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  638. }
  639. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  640. nc := a.netCtx
  641. if s.Spec.Endpoint != nil {
  642. // service has user-defined endpoint
  643. if s.Endpoint == nil {
  644. // service currently has no allocated endpoint, need allocated.
  645. s.Endpoint = &api.Endpoint{
  646. Spec: s.Spec.Endpoint.Copy(),
  647. }
  648. }
  649. // The service is trying to expose ports to the external
  650. // world. Automatically attach the service to the ingress
  651. // network only if it is not already done.
  652. if IsIngressNetworkNeeded(s) {
  653. if nc.ingressNetwork == nil {
  654. return fmt.Errorf("ingress network is missing")
  655. }
  656. var found bool
  657. for _, vip := range s.Endpoint.VirtualIPs {
  658. if vip.NetworkID == nc.ingressNetwork.ID {
  659. found = true
  660. break
  661. }
  662. }
  663. if !found {
  664. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  665. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  666. }
  667. }
  668. } else if s.Endpoint != nil {
  669. // service has no user-defined endpoints while has already allocated network resources,
  670. // need deallocated.
  671. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  672. return err
  673. }
  674. }
  675. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  676. nc.unallocatedServices[s.ID] = s
  677. return err
  678. }
  679. // If the service doesn't expose ports any more and if we have
  680. // any lingering virtual IP references for ingress network
  681. // clean them up here.
  682. if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil {
  683. if s.Endpoint != nil {
  684. for i, vip := range s.Endpoint.VirtualIPs {
  685. if vip.NetworkID == nc.ingressNetwork.ID {
  686. n := len(s.Endpoint.VirtualIPs)
  687. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  688. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  689. break
  690. }
  691. }
  692. }
  693. }
  694. return nil
  695. }
  696. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  697. if err := batch.Update(func(tx store.Tx) error {
  698. err := store.UpdateService(tx, s)
  699. if err == store.ErrSequenceConflict {
  700. storeService := store.GetService(tx, s.ID)
  701. storeService.Endpoint = s.Endpoint
  702. err = store.UpdateService(tx, storeService)
  703. }
  704. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  705. }); err != nil {
  706. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  707. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  708. }
  709. return err
  710. }
  711. return nil
  712. }
  713. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  714. nc := a.netCtx
  715. if err := nc.nwkAllocator.Allocate(n); err != nil {
  716. nc.unallocatedNetworks[n.ID] = n
  717. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  718. }
  719. return nil
  720. }
  721. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  722. if err := batch.Update(func(tx store.Tx) error {
  723. if err := store.UpdateNetwork(tx, n); err != nil {
  724. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  725. }
  726. return nil
  727. }); err != nil {
  728. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  729. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  730. }
  731. return err
  732. }
  733. return nil
  734. }
  735. func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
  736. taskUpdated := false
  737. nc := a.netCtx
  738. // We might be here even if a task allocation has already
  739. // happened but wasn't successfully committed to store. In such
  740. // cases skip allocation and go straight ahead to updating the
  741. // store.
  742. if !nc.nwkAllocator.IsTaskAllocated(t) {
  743. a.store.View(func(tx store.ReadTx) {
  744. if t.ServiceID != "" {
  745. s := store.GetService(tx, t.ServiceID)
  746. if s == nil {
  747. err = fmt.Errorf("could not find service %s", t.ServiceID)
  748. return
  749. }
  750. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  751. err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
  752. return
  753. }
  754. if s.Endpoint != nil {
  755. taskUpdateEndpoint(t, s.Endpoint)
  756. taskUpdated = true
  757. }
  758. }
  759. for _, na := range t.Networks {
  760. n := store.GetNetwork(tx, na.Network.ID)
  761. if n == nil {
  762. err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
  763. return
  764. }
  765. if !nc.nwkAllocator.IsAllocated(n) {
  766. err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
  767. return
  768. }
  769. na.Network = n
  770. }
  771. if err = nc.nwkAllocator.AllocateTask(t); err != nil {
  772. err = errors.Wrapf(err, "failed during network allocation for task %s", t.ID)
  773. return
  774. }
  775. if nc.nwkAllocator.IsTaskAllocated(t) {
  776. taskUpdated = true
  777. }
  778. })
  779. if err != nil {
  780. return err
  781. }
  782. }
  783. // Update the network allocations and moving to
  784. // PENDING state on top of the latest store state.
  785. if a.taskAllocateVote(networkVoter, t.ID) {
  786. if t.Status.State < api.TaskStatePending {
  787. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  788. taskUpdated = true
  789. }
  790. }
  791. if !taskUpdated {
  792. return errNoChanges
  793. }
  794. return nil
  795. }
  796. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  797. return batch.Update(func(tx store.Tx) error {
  798. err := store.UpdateTask(tx, t)
  799. if err == store.ErrSequenceConflict {
  800. storeTask := store.GetTask(tx, t.ID)
  801. taskUpdateNetworks(storeTask, t.Networks)
  802. taskUpdateEndpoint(storeTask, t.Endpoint)
  803. if storeTask.Status.State < api.TaskStatePending {
  804. storeTask.Status = t.Status
  805. }
  806. err = store.UpdateTask(tx, storeTask)
  807. }
  808. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  809. })
  810. }
  811. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  812. nc := a.netCtx
  813. var allocatedNetworks []*api.Network
  814. for _, n := range nc.unallocatedNetworks {
  815. if !nc.nwkAllocator.IsAllocated(n) {
  816. if err := a.allocateNetwork(ctx, n); err != nil {
  817. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  818. continue
  819. }
  820. allocatedNetworks = append(allocatedNetworks, n)
  821. }
  822. }
  823. if len(allocatedNetworks) == 0 {
  824. return
  825. }
  826. err := a.store.Batch(func(batch *store.Batch) error {
  827. for _, n := range allocatedNetworks {
  828. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  829. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  830. continue
  831. }
  832. delete(nc.unallocatedNetworks, n.ID)
  833. }
  834. return nil
  835. })
  836. if err != nil {
  837. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  838. // We optimistically removed these from nc.unallocatedNetworks
  839. // above in anticipation of successfully committing the batch,
  840. // but since the transaction has failed, we requeue them here.
  841. for _, n := range allocatedNetworks {
  842. nc.unallocatedNetworks[n.ID] = n
  843. }
  844. }
  845. }
  846. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  847. nc := a.netCtx
  848. var allocatedServices []*api.Service
  849. for _, s := range nc.unallocatedServices {
  850. if nc.nwkAllocator.ServiceNeedsAllocation(s) {
  851. if err := a.allocateService(ctx, s); err != nil {
  852. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  853. continue
  854. }
  855. allocatedServices = append(allocatedServices, s)
  856. }
  857. }
  858. if len(allocatedServices) == 0 {
  859. return
  860. }
  861. err := a.store.Batch(func(batch *store.Batch) error {
  862. for _, s := range allocatedServices {
  863. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  864. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  865. continue
  866. }
  867. delete(nc.unallocatedServices, s.ID)
  868. }
  869. return nil
  870. })
  871. if err != nil {
  872. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  873. // We optimistically removed these from nc.unallocatedServices
  874. // above in anticipation of successfully committing the batch,
  875. // but since the transaction has failed, we requeue them here.
  876. for _, s := range allocatedServices {
  877. nc.unallocatedServices[s.ID] = s
  878. }
  879. }
  880. }
  881. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  882. nc := a.netCtx
  883. quiet := false
  884. toAllocate := nc.pendingTasks
  885. if onRetry {
  886. toAllocate = nc.unallocatedTasks
  887. quiet = true
  888. }
  889. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  890. for _, t := range toAllocate {
  891. if err := a.allocateTask(ctx, t); err == nil {
  892. allocatedTasks = append(allocatedTasks, t)
  893. } else if err != errNoChanges {
  894. if quiet {
  895. log.G(ctx).WithError(err).Debug("task allocation failure")
  896. } else {
  897. log.G(ctx).WithError(err).Error("task allocation failure")
  898. }
  899. }
  900. }
  901. if len(allocatedTasks) == 0 {
  902. return
  903. }
  904. err := a.store.Batch(func(batch *store.Batch) error {
  905. for _, t := range allocatedTasks {
  906. err := a.commitAllocatedTask(ctx, batch, t)
  907. if err != nil {
  908. log.G(ctx).WithError(err).Error("task allocation commit failure")
  909. continue
  910. }
  911. delete(toAllocate, t.ID)
  912. }
  913. return nil
  914. })
  915. if err != nil {
  916. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  917. // We optimistically removed these from toAllocate above in
  918. // anticipation of successfully committing the batch, but since
  919. // the transaction has failed, we requeue them here.
  920. for _, t := range allocatedTasks {
  921. toAllocate[t.ID] = t
  922. }
  923. }
  924. }
  925. // updateTaskStatus sets TaskStatus and updates timestamp.
  926. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  927. t.Status.State = newStatus
  928. t.Status.Message = message
  929. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  930. }
  931. // IsIngressNetwork returns whether the passed network is an ingress network.
  932. func IsIngressNetwork(nw *api.Network) bool {
  933. return networkallocator.IsIngressNetwork(nw)
  934. }
  935. // GetIngressNetwork fetches the ingress network from store.
  936. // ErrNoIngress will be returned if the ingress network is not present,
  937. // nil otherwise. In case of any other failure in accessing the store,
  938. // the respective error will be reported as is.
  939. func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
  940. var (
  941. networks []*api.Network
  942. err error
  943. )
  944. s.View(func(tx store.ReadTx) {
  945. networks, err = store.FindNetworks(tx, store.All)
  946. })
  947. if err != nil {
  948. return nil, err
  949. }
  950. for _, n := range networks {
  951. if IsIngressNetwork(n) {
  952. return n, nil
  953. }
  954. }
  955. return nil, ErrNoIngress
  956. }