network.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/identity"
  8. "github.com/docker/swarmkit/log"
  9. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  10. "github.com/docker/swarmkit/manager/state"
  11. "github.com/docker/swarmkit/manager/state/store"
  12. "github.com/docker/swarmkit/protobuf/ptypes"
  13. "github.com/pkg/errors"
  14. "golang.org/x/net/context"
  15. )
  16. const (
  17. // Network allocator Voter ID for task allocation vote.
  18. networkVoter = "network"
  19. ingressNetworkName = "ingress"
  20. ingressSubnet = "10.255.0.0/16"
  21. allocatedStatusMessage = "pending task scheduling"
  22. )
  23. var (
  24. errNoChanges = errors.New("task unchanged")
  25. retryInterval = 5 * time.Minute
  26. )
  27. func newIngressNetwork() *api.Network {
  28. return &api.Network{
  29. Spec: api.NetworkSpec{
  30. Annotations: api.Annotations{
  31. Name: ingressNetworkName,
  32. Labels: map[string]string{
  33. "com.docker.swarm.internal": "true",
  34. },
  35. },
  36. DriverConfig: &api.Driver{},
  37. IPAM: &api.IPAMOptions{
  38. Driver: &api.Driver{},
  39. Configs: []*api.IPAMConfig{
  40. {
  41. Subnet: ingressSubnet,
  42. },
  43. },
  44. },
  45. },
  46. }
  47. }
// networkContext holds the state shared by the network allocation code:
// the ingress network, the low-level allocator, and the sets of objects
// that are waiting for (or have previously failed) allocation.
type networkContext struct {
	// ingressNetwork is the ingress network object. It starts out as the
	// predefined template and is replaced with the object found in the
	// store during initialization.
	ingressNetwork *api.Network

	// Instance of the low-level network allocator which performs
	// the actual network allocation.
	nwkAllocator *networkallocator.NetworkAllocator

	// A set of tasks which are ready to be allocated as a batch. This is
	// distinct from "unallocatedTasks" which are tasks that failed to
	// allocate on the first try, being held for a future retry.
	// Keyed by task ID.
	pendingTasks map[string]*api.Task

	// A set of unallocated tasks which will be revisited if anything
	// changes in system state that might help task allocation.
	// Keyed by task ID.
	unallocatedTasks map[string]*api.Task

	// A set of unallocated services which will be revisited if
	// anything changes in system state that might help service
	// allocation. Keyed by service ID.
	unallocatedServices map[string]*api.Service

	// A set of unallocated networks which will be revisited if
	// anything changes in system state that might help network
	// allocation. Keyed by network ID.
	unallocatedNetworks map[string]*api.Network

	// lastRetry is the last timestamp when unallocated
	// tasks/services/networks were retried.
	lastRetry time.Time
}
// doNetworkInit creates the network allocation context and brings the
// allocator in sync with what is already in the store, in this order:
// ingress network, all networks, nodes, services, and finally tasks.
//
// It returns an error only when the low-level allocator cannot be created,
// the ingress network cannot be found or created, or one of the store
// listings fails; in that case a.netCtx is reset to nil by the deferred
// function. Failures to allocate or commit an individual object are logged
// and do not abort initialization.
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
	na, err := networkallocator.New(a.pluginGetter)
	if err != nil {
		return err
	}
	nc := &networkContext{
		nwkAllocator:        na,
		pendingTasks:        make(map[string]*api.Task),
		unallocatedTasks:    make(map[string]*api.Task),
		unallocatedServices: make(map[string]*api.Service),
		unallocatedNetworks: make(map[string]*api.Network),
		ingressNetwork:      newIngressNetwork(),
		lastRetry:           time.Now(),
	}
	a.netCtx = nc
	// The deferred check reads the named return value, so it sees the
	// error of whichever return statement ends the function.
	defer func() {
		// Clear a.netCtx if initialization was unsuccessful.
		if err != nil {
			a.netCtx = nil
		}
	}()
	// Check if we have the ingress network. If not found create
	// it before reading all network objects for allocation.
	var networks []*api.Network
	a.store.View(func(tx store.ReadTx) {
		networks, err = store.FindNetworks(tx, store.ByName(ingressNetworkName))
		if len(networks) > 0 {
			nc.ingressNetwork = networks[0]
		}
	})
	if err != nil {
		return errors.Wrap(err, "failed to find ingress network during init")
	}
	// If ingress network is not found, create one right away
	// using the predefined template.
	if len(networks) == 0 {
		if err := a.store.Update(func(tx store.Tx) error {
			nc.ingressNetwork.ID = identity.NewID()
			if err := store.CreateNetwork(tx, nc.ingressNetwork); err != nil {
				return err
			}
			return nil
		}); err != nil {
			return errors.Wrap(err, "failed to create ingress network")
		}
		// Read the freshly created network back so that nc.ingressNetwork
		// refers to the object as it exists in the store.
		a.store.View(func(tx store.ReadTx) {
			networks, err = store.FindNetworks(tx, store.ByName(ingressNetworkName))
			if len(networks) > 0 {
				nc.ingressNetwork = networks[0]
			}
		})
		if err != nil {
			return errors.Wrap(err, "failed to find ingress network after creating it")
		}
	}
	// Try to complete ingress network allocation before anything else so
	// that we can get the preferred subnet for the ingress
	// network.
	if !na.IsAllocated(nc.ingressNetwork) {
		if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
			log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
		} else if _, err := a.store.Batch(func(batch *store.Batch) error {
			if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
				log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
			}
			return nil
		}); err != nil {
			log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
		}
	}
	// Allocate networks in the store so far before we started
	// watching.
	a.store.View(func(tx store.ReadTx) {
		networks, err = store.FindNetworks(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
	}
	var allocatedNetworks []*api.Network
	for _, n := range networks {
		if na.IsAllocated(n) {
			continue
		}
		if err := a.allocateNetwork(ctx, n); err != nil {
			log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
			continue
		}
		allocatedNetworks = append(allocatedNetworks, n)
	}
	// Commit every newly allocated network in a single batch; individual
	// commit failures are logged and do not stop the batch.
	if _, err := a.store.Batch(func(batch *store.Batch) error {
		for _, n := range allocatedNetworks {
			if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
			}
		}
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
	}
	// Allocate nodes in the store so far before we process watched events.
	var nodes []*api.Node
	a.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all nodes in store while trying to allocate during init")
	}
	var allocatedNodes []*api.Node
	for _, node := range nodes {
		if na.IsNodeAllocated(node) {
			continue
		}
		// Every unallocated node gets an attachment to a copy of the
		// ingress network before allocation.
		if node.Attachment == nil {
			node.Attachment = &api.NetworkAttachment{}
		}
		node.Attachment.Network = nc.ingressNetwork.Copy()
		if err := a.allocateNode(ctx, node); err != nil {
			log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
			continue
		}
		allocatedNodes = append(allocatedNodes, node)
	}
	if _, err := a.store.Batch(func(batch *store.Batch) error {
		for _, node := range allocatedNodes {
			if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
				log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s during init", node.ID)
			}
		}
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes during init")
	}
	// Allocate services in the store so far before we process watched events.
	var services []*api.Service
	a.store.View(func(tx store.ReadTx) {
		services, err = store.FindServices(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
	}
	var allocatedServices []*api.Service
	for _, s := range services {
		if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) {
			continue
		}
		if err := a.allocateService(ctx, s); err != nil {
			log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
			continue
		}
		allocatedServices = append(allocatedServices, s)
	}
	if _, err := a.store.Batch(func(batch *store.Batch) error {
		for _, s := range allocatedServices {
			if err := a.commitAllocatedService(ctx, batch, s); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
			}
		}
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
	}
	// Allocate tasks in the store so far before we started watching.
	var (
		tasks          []*api.Task
		allocatedTasks []*api.Task
	)
	a.store.View(func(tx store.ReadTx) {
		tasks, err = store.FindTasks(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
	}
	for _, t := range tasks {
		// Tasks that are past the RUNNING state are not allocated.
		if t.Status.State > api.TaskStateRunning {
			continue
		}
		var s *api.Service
		if t.ServiceID != "" {
			a.store.View(func(tx store.ReadTx) {
				s = store.GetService(tx, t.ServiceID)
			})
		}
		// Populate network attachments in the task
		// based on service spec.
		a.taskCreateNetworkAttachments(t, s)
		if taskReadyForNetworkVote(t, s, nc) {
			// Tasks already at PENDING or beyond need no vote.
			if t.Status.State >= api.TaskStatePending {
				continue
			}
			if a.taskAllocateVote(networkVoter, t.ID) {
				// If the task is not attached to any network, network
				// allocators job is done. Immediately cast a vote so
				// that the task can be moved to the PENDING state as
				// soon as possible.
				updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
				allocatedTasks = append(allocatedTasks, t)
			}
			continue
		}
		// This err is local to the loop body; it deliberately shadows the
		// named return so that a task failure does not fail initialization.
		err := a.allocateTask(ctx, t)
		if err == nil {
			allocatedTasks = append(allocatedTasks, t)
		} else if err != errNoChanges {
			// Keep the task around so that it can be retried later.
			log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
			nc.unallocatedTasks[t.ID] = t
		}
	}
	if _, err := a.store.Batch(func(batch *store.Batch) error {
		for _, t := range allocatedTasks {
			if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
				log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
			}
		}
		return nil
	}); err != nil {
		log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
	}
	return nil
}
  292. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  293. nc := a.netCtx
  294. switch v := ev.(type) {
  295. case state.EventCreateNetwork:
  296. n := v.Network.Copy()
  297. if nc.nwkAllocator.IsAllocated(n) {
  298. break
  299. }
  300. if err := a.allocateNetwork(ctx, n); err != nil {
  301. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  302. break
  303. }
  304. if _, err := a.store.Batch(func(batch *store.Batch) error {
  305. return a.commitAllocatedNetwork(ctx, batch, n)
  306. }); err != nil {
  307. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  308. }
  309. case state.EventDeleteNetwork:
  310. n := v.Network.Copy()
  311. // The assumption here is that all dependent objects
  312. // have been cleaned up when we are here so the only
  313. // thing that needs to happen is free the network
  314. // resources.
  315. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  316. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  317. }
  318. delete(nc.unallocatedNetworks, n.ID)
  319. case state.EventCreateService:
  320. s := v.Service.Copy()
  321. if nc.nwkAllocator.IsServiceAllocated(s) {
  322. break
  323. }
  324. if err := a.allocateService(ctx, s); err != nil {
  325. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  326. break
  327. }
  328. if _, err := a.store.Batch(func(batch *store.Batch) error {
  329. return a.commitAllocatedService(ctx, batch, s)
  330. }); err != nil {
  331. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  332. }
  333. case state.EventUpdateService:
  334. s := v.Service.Copy()
  335. if nc.nwkAllocator.IsServiceAllocated(s) {
  336. if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) {
  337. break
  338. }
  339. updatePortsInHostPublishMode(s)
  340. } else {
  341. if err := a.allocateService(ctx, s); err != nil {
  342. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  343. break
  344. }
  345. }
  346. if _, err := a.store.Batch(func(batch *store.Batch) error {
  347. return a.commitAllocatedService(ctx, batch, s)
  348. }); err != nil {
  349. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  350. nc.unallocatedServices[s.ID] = s
  351. } else {
  352. delete(nc.unallocatedServices, s.ID)
  353. }
  354. case state.EventDeleteService:
  355. s := v.Service.Copy()
  356. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  357. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  358. }
  359. // Remove it from unallocatedServices just in case
  360. // it's still there.
  361. delete(nc.unallocatedServices, s.ID)
  362. case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
  363. a.doNodeAlloc(ctx, ev)
  364. case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
  365. a.doTaskAlloc(ctx, ev)
  366. case state.EventCommit:
  367. a.procTasksNetwork(ctx, false)
  368. if time.Since(nc.lastRetry) > retryInterval {
  369. a.procUnallocatedNetworks(ctx)
  370. a.procUnallocatedServices(ctx)
  371. a.procTasksNetwork(ctx, true)
  372. nc.lastRetry = time.Now()
  373. }
  374. // Any left over tasks are moved to the unallocated set
  375. for _, t := range nc.pendingTasks {
  376. nc.unallocatedTasks[t.ID] = t
  377. }
  378. nc.pendingTasks = make(map[string]*api.Task)
  379. }
  380. }
  381. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  382. var (
  383. isDelete bool
  384. node *api.Node
  385. )
  386. switch v := ev.(type) {
  387. case state.EventCreateNode:
  388. node = v.Node.Copy()
  389. case state.EventUpdateNode:
  390. node = v.Node.Copy()
  391. case state.EventDeleteNode:
  392. isDelete = true
  393. node = v.Node.Copy()
  394. }
  395. nc := a.netCtx
  396. if isDelete {
  397. if nc.nwkAllocator.IsNodeAllocated(node) {
  398. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  399. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  400. }
  401. }
  402. return
  403. }
  404. if !nc.nwkAllocator.IsNodeAllocated(node) {
  405. if node.Attachment == nil {
  406. node.Attachment = &api.NetworkAttachment{}
  407. }
  408. node.Attachment.Network = nc.ingressNetwork.Copy()
  409. if err := a.allocateNode(ctx, node); err != nil {
  410. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  411. return
  412. }
  413. if _, err := a.store.Batch(func(batch *store.Batch) error {
  414. return a.commitAllocatedNode(ctx, batch, node)
  415. }); err != nil {
  416. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  417. }
  418. }
  419. }
  420. // taskReadyForNetworkVote checks if the task is ready for a network
  421. // vote to move it to PENDING state.
  422. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  423. // Task is ready for vote if the following is true:
  424. //
  425. // Task has no network attached or networks attached but all
  426. // of them allocated AND Task's service has no endpoint or
  427. // network configured or service endpoints have been
  428. // allocated.
  429. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  430. (s == nil || nc.nwkAllocator.IsServiceAllocated(s))
  431. }
  432. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  433. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  434. for _, n := range networks {
  435. networksCopy = append(networksCopy, n.Copy())
  436. }
  437. t.Networks = networksCopy
  438. }
  439. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  440. t.Endpoint = endpoint.Copy()
  441. }
  442. func isIngressNetworkNeeded(s *api.Service) bool {
  443. if s == nil {
  444. return false
  445. }
  446. if s.Spec.Endpoint == nil {
  447. return false
  448. }
  449. for _, p := range s.Spec.Endpoint.Ports {
  450. // The service to which this task belongs is trying to
  451. // expose ports with PublishMode as Ingress to the
  452. // external world. Automatically attach the task to
  453. // the ingress network.
  454. if p.PublishMode == api.PublishModeIngress {
  455. return true
  456. }
  457. }
  458. return false
  459. }
  460. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  461. // If task network attachments have already been filled in no
  462. // need to do anything else.
  463. if len(t.Networks) != 0 {
  464. return
  465. }
  466. var networks []*api.NetworkAttachment
  467. if isIngressNetworkNeeded(s) {
  468. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  469. }
  470. a.store.View(func(tx store.ReadTx) {
  471. // Always prefer NetworkAttachmentConfig in the TaskSpec
  472. specNetworks := t.Spec.Networks
  473. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  474. specNetworks = s.Spec.Networks
  475. }
  476. for _, na := range specNetworks {
  477. n := store.GetNetwork(tx, na.Target)
  478. if n == nil {
  479. continue
  480. }
  481. attachment := api.NetworkAttachment{Network: n}
  482. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  483. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  484. networks = append(networks, &attachment)
  485. }
  486. })
  487. taskUpdateNetworks(t, networks)
  488. }
  489. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  490. var (
  491. isDelete bool
  492. t *api.Task
  493. )
  494. switch v := ev.(type) {
  495. case state.EventCreateTask:
  496. t = v.Task.Copy()
  497. case state.EventUpdateTask:
  498. t = v.Task.Copy()
  499. case state.EventDeleteTask:
  500. isDelete = true
  501. t = v.Task.Copy()
  502. }
  503. nc := a.netCtx
  504. // If the task has stopped running then we should free the network
  505. // resources associated with the task right away.
  506. if t.Status.State > api.TaskStateRunning || isDelete {
  507. if nc.nwkAllocator.IsTaskAllocated(t) {
  508. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  509. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  510. }
  511. }
  512. // Cleanup any task references that might exist
  513. delete(nc.pendingTasks, t.ID)
  514. delete(nc.unallocatedTasks, t.ID)
  515. return
  516. }
  517. // If we are already in allocated state, there is
  518. // absolutely nothing else to do.
  519. if t.Status.State >= api.TaskStatePending {
  520. delete(nc.pendingTasks, t.ID)
  521. delete(nc.unallocatedTasks, t.ID)
  522. return
  523. }
  524. var s *api.Service
  525. if t.ServiceID != "" {
  526. a.store.View(func(tx store.ReadTx) {
  527. s = store.GetService(tx, t.ServiceID)
  528. })
  529. if s == nil {
  530. // If the task is running it is not normal to
  531. // not be able to find the associated
  532. // service. If the task is not running (task
  533. // is either dead or the desired state is set
  534. // to dead) then the service may not be
  535. // available in store. But we still need to
  536. // cleanup network resources associated with
  537. // the task.
  538. if t.Status.State <= api.TaskStateRunning && !isDelete {
  539. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  540. return
  541. }
  542. }
  543. }
  544. // Populate network attachments in the task
  545. // based on service spec.
  546. a.taskCreateNetworkAttachments(t, s)
  547. nc.pendingTasks[t.ID] = t
  548. }
  549. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  550. return a.netCtx.nwkAllocator.AllocateNode(node)
  551. }
  552. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  553. if err := batch.Update(func(tx store.Tx) error {
  554. err := store.UpdateNode(tx, node)
  555. if err == store.ErrSequenceConflict {
  556. storeNode := store.GetNode(tx, node.ID)
  557. storeNode.Attachment = node.Attachment.Copy()
  558. err = store.UpdateNode(tx, storeNode)
  559. }
  560. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  561. }); err != nil {
  562. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  563. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  564. }
  565. return err
  566. }
  567. return nil
  568. }
  569. // This function prepares the service object for being updated when the change regards
  570. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  571. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  572. // so that the service allocation invoked on this new service object will trigger the deallocation
  573. // of any old publish mode port and allocation of any new one.
  574. func updatePortsInHostPublishMode(s *api.Service) {
  575. if s.Endpoint != nil {
  576. var portConfigs []*api.PortConfig
  577. for _, portConfig := range s.Endpoint.Ports {
  578. if portConfig.PublishMode == api.PublishModeIngress {
  579. portConfigs = append(portConfigs, portConfig)
  580. }
  581. }
  582. s.Endpoint.Ports = portConfigs
  583. }
  584. if s.Spec.Endpoint != nil {
  585. if s.Endpoint == nil {
  586. s.Endpoint = &api.Endpoint{}
  587. }
  588. for _, portConfig := range s.Spec.Endpoint.Ports {
  589. if portConfig.PublishMode == api.PublishModeIngress {
  590. continue
  591. }
  592. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  593. }
  594. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  595. }
  596. }
  597. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  598. nc := a.netCtx
  599. if s.Spec.Endpoint != nil {
  600. // service has user-defined endpoint
  601. if s.Endpoint == nil {
  602. // service currently has no allocated endpoint, need allocated.
  603. s.Endpoint = &api.Endpoint{
  604. Spec: s.Spec.Endpoint.Copy(),
  605. }
  606. }
  607. // The service is trying to expose ports to the external
  608. // world. Automatically attach the service to the ingress
  609. // network only if it is not already done.
  610. if isIngressNetworkNeeded(s) {
  611. var found bool
  612. for _, vip := range s.Endpoint.VirtualIPs {
  613. if vip.NetworkID == nc.ingressNetwork.ID {
  614. found = true
  615. break
  616. }
  617. }
  618. if !found {
  619. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  620. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  621. }
  622. }
  623. } else if s.Endpoint != nil {
  624. // service has no user-defined endpoints while has already allocated network resources,
  625. // need deallocated.
  626. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  627. return err
  628. }
  629. }
  630. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  631. nc.unallocatedServices[s.ID] = s
  632. return err
  633. }
  634. // If the service doesn't expose ports any more and if we have
  635. // any lingering virtual IP references for ingress network
  636. // clean them up here.
  637. if !isIngressNetworkNeeded(s) {
  638. if s.Endpoint != nil {
  639. for i, vip := range s.Endpoint.VirtualIPs {
  640. if vip.NetworkID == nc.ingressNetwork.ID {
  641. n := len(s.Endpoint.VirtualIPs)
  642. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  643. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  644. break
  645. }
  646. }
  647. }
  648. }
  649. return nil
  650. }
  651. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  652. if err := batch.Update(func(tx store.Tx) error {
  653. err := store.UpdateService(tx, s)
  654. if err == store.ErrSequenceConflict {
  655. storeService := store.GetService(tx, s.ID)
  656. storeService.Endpoint = s.Endpoint
  657. err = store.UpdateService(tx, storeService)
  658. }
  659. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  660. }); err != nil {
  661. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  662. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  663. }
  664. return err
  665. }
  666. return nil
  667. }
  668. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  669. nc := a.netCtx
  670. if err := nc.nwkAllocator.Allocate(n); err != nil {
  671. nc.unallocatedNetworks[n.ID] = n
  672. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  673. }
  674. return nil
  675. }
  676. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  677. if err := batch.Update(func(tx store.Tx) error {
  678. if err := store.UpdateNetwork(tx, n); err != nil {
  679. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  680. }
  681. return nil
  682. }); err != nil {
  683. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  684. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  685. }
  686. return err
  687. }
  688. return nil
  689. }
  690. func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
  691. taskUpdated := false
  692. nc := a.netCtx
  693. // We might be here even if a task allocation has already
  694. // happened but wasn't successfully committed to store. In such
  695. // cases skip allocation and go straight ahead to updating the
  696. // store.
  697. if !nc.nwkAllocator.IsTaskAllocated(t) {
  698. a.store.View(func(tx store.ReadTx) {
  699. if t.ServiceID != "" {
  700. s := store.GetService(tx, t.ServiceID)
  701. if s == nil {
  702. err = fmt.Errorf("could not find service %s", t.ServiceID)
  703. return
  704. }
  705. if !nc.nwkAllocator.IsServiceAllocated(s) {
  706. err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
  707. return
  708. }
  709. if s.Endpoint != nil {
  710. taskUpdateEndpoint(t, s.Endpoint)
  711. taskUpdated = true
  712. }
  713. }
  714. for _, na := range t.Networks {
  715. n := store.GetNetwork(tx, na.Network.ID)
  716. if n == nil {
  717. err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
  718. return
  719. }
  720. if !nc.nwkAllocator.IsAllocated(n) {
  721. err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
  722. return
  723. }
  724. na.Network = n
  725. }
  726. if err = nc.nwkAllocator.AllocateTask(t); err != nil {
  727. err = errors.Wrapf(err, "failed during networktask allocation for task %s", t.ID)
  728. return
  729. }
  730. if nc.nwkAllocator.IsTaskAllocated(t) {
  731. taskUpdated = true
  732. }
  733. })
  734. if err != nil {
  735. return err
  736. }
  737. }
  738. // Update the network allocations and moving to
  739. // PENDING state on top of the latest store state.
  740. if a.taskAllocateVote(networkVoter, t.ID) {
  741. if t.Status.State < api.TaskStatePending {
  742. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  743. taskUpdated = true
  744. }
  745. }
  746. if !taskUpdated {
  747. return errNoChanges
  748. }
  749. return nil
  750. }
  751. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  752. return batch.Update(func(tx store.Tx) error {
  753. err := store.UpdateTask(tx, t)
  754. if err == store.ErrSequenceConflict {
  755. storeTask := store.GetTask(tx, t.ID)
  756. taskUpdateNetworks(storeTask, t.Networks)
  757. taskUpdateEndpoint(storeTask, t.Endpoint)
  758. if storeTask.Status.State < api.TaskStatePending {
  759. storeTask.Status = t.Status
  760. }
  761. err = store.UpdateTask(tx, storeTask)
  762. }
  763. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  764. })
  765. }
  766. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  767. nc := a.netCtx
  768. var allocatedNetworks []*api.Network
  769. for _, n := range nc.unallocatedNetworks {
  770. if !nc.nwkAllocator.IsAllocated(n) {
  771. if err := a.allocateNetwork(ctx, n); err != nil {
  772. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  773. continue
  774. }
  775. allocatedNetworks = append(allocatedNetworks, n)
  776. }
  777. }
  778. if len(allocatedNetworks) == 0 {
  779. return
  780. }
  781. committed, err := a.store.Batch(func(batch *store.Batch) error {
  782. for _, n := range allocatedNetworks {
  783. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  784. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  785. continue
  786. }
  787. }
  788. return nil
  789. })
  790. if err != nil {
  791. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  792. }
  793. for _, n := range allocatedNetworks[:committed] {
  794. delete(nc.unallocatedNetworks, n.ID)
  795. }
  796. }
  797. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  798. nc := a.netCtx
  799. var allocatedServices []*api.Service
  800. for _, s := range nc.unallocatedServices {
  801. if !nc.nwkAllocator.IsServiceAllocated(s) {
  802. if err := a.allocateService(ctx, s); err != nil {
  803. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  804. continue
  805. }
  806. allocatedServices = append(allocatedServices, s)
  807. }
  808. }
  809. if len(allocatedServices) == 0 {
  810. return
  811. }
  812. committed, err := a.store.Batch(func(batch *store.Batch) error {
  813. for _, s := range allocatedServices {
  814. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  815. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  816. continue
  817. }
  818. }
  819. return nil
  820. })
  821. if err != nil {
  822. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  823. }
  824. for _, s := range allocatedServices[:committed] {
  825. delete(nc.unallocatedServices, s.ID)
  826. }
  827. }
  828. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  829. nc := a.netCtx
  830. quiet := false
  831. toAllocate := nc.pendingTasks
  832. if onRetry {
  833. toAllocate = nc.unallocatedTasks
  834. quiet = true
  835. }
  836. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  837. for _, t := range toAllocate {
  838. if err := a.allocateTask(ctx, t); err == nil {
  839. allocatedTasks = append(allocatedTasks, t)
  840. } else if err != errNoChanges {
  841. if quiet {
  842. log.G(ctx).WithError(err).Debug("task allocation failure")
  843. } else {
  844. log.G(ctx).WithError(err).Error("task allocation failure")
  845. }
  846. }
  847. }
  848. if len(allocatedTasks) == 0 {
  849. return
  850. }
  851. committed, err := a.store.Batch(func(batch *store.Batch) error {
  852. for _, t := range allocatedTasks {
  853. err := a.commitAllocatedTask(ctx, batch, t)
  854. if err != nil {
  855. log.G(ctx).WithError(err).Error("task allocation commit failure")
  856. continue
  857. }
  858. }
  859. return nil
  860. })
  861. if err != nil {
  862. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  863. }
  864. for _, t := range allocatedTasks[:committed] {
  865. delete(toAllocate, t.ID)
  866. }
  867. }
  868. // updateTaskStatus sets TaskStatus and updates timestamp.
  869. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  870. t.Status.State = newStatus
  871. t.Status.Message = message
  872. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  873. }