network.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. package allocator
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/allocator/networkallocator"
  9. "github.com/docker/swarmkit/manager/state"
  10. "github.com/docker/swarmkit/manager/state/store"
  11. "github.com/docker/swarmkit/protobuf/ptypes"
  12. "github.com/pkg/errors"
  13. "golang.org/x/net/context"
  14. )
  15. const (
  16. // Network allocator Voter ID for task allocation vote.
  17. networkVoter = "network"
  18. allocatedStatusMessage = "pending task scheduling"
  19. )
  20. var (
  21. // ErrNoIngress is returned when no ingress network is found in store
  22. ErrNoIngress = errors.New("no ingress network found")
  23. errNoChanges = errors.New("task unchanged")
  24. retryInterval = 5 * time.Minute
  25. )
  26. // Network context information which is used throughout the network allocation code.
  27. type networkContext struct {
  28. ingressNetwork *api.Network
  29. // Instance of the low-level network allocator which performs
  30. // the actual network allocation.
  31. nwkAllocator *networkallocator.NetworkAllocator
  32. // A set of tasks which are ready to be allocated as a batch. This is
  33. // distinct from "unallocatedTasks" which are tasks that failed to
  34. // allocate on the first try, being held for a future retry.
  35. pendingTasks map[string]*api.Task
  36. // A set of unallocated tasks which will be revisited if any thing
  37. // changes in system state that might help task allocation.
  38. unallocatedTasks map[string]*api.Task
  39. // A set of unallocated services which will be revisited if
  40. // any thing changes in system state that might help service
  41. // allocation.
  42. unallocatedServices map[string]*api.Service
  43. // A set of unallocated networks which will be revisited if
  44. // any thing changes in system state that might help network
  45. // allocation.
  46. unallocatedNetworks map[string]*api.Network
  47. // lastRetry is the last timestamp when unallocated
  48. // tasks/services/networks were retried.
  49. lastRetry time.Time
  50. }
  51. func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
  52. na, err := networkallocator.New(a.pluginGetter)
  53. if err != nil {
  54. return err
  55. }
  56. nc := &networkContext{
  57. nwkAllocator: na,
  58. pendingTasks: make(map[string]*api.Task),
  59. unallocatedTasks: make(map[string]*api.Task),
  60. unallocatedServices: make(map[string]*api.Service),
  61. unallocatedNetworks: make(map[string]*api.Network),
  62. lastRetry: time.Now(),
  63. }
  64. a.netCtx = nc
  65. defer func() {
  66. // Clear a.netCtx if initialization was unsuccessful.
  67. if err != nil {
  68. a.netCtx = nil
  69. }
  70. }()
  71. // Ingress network is now created at cluster's first time creation.
  72. // Check if we have the ingress network. If found, make sure it is
  73. // allocated, before reading all network objects for allocation.
  74. // If not found, it means it was removed by user, nothing to do here.
  75. ingressNetwork, err := GetIngressNetwork(a.store)
  76. switch err {
  77. case nil:
  78. // Try to complete ingress network allocation before anything else so
  79. // that the we can get the preferred subnet for ingress network.
  80. nc.ingressNetwork = ingressNetwork
  81. if !na.IsAllocated(nc.ingressNetwork) {
  82. if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
  83. log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
  84. } else if _, err := a.store.Batch(func(batch *store.Batch) error {
  85. if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil {
  86. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  87. }
  88. return nil
  89. }); err != nil {
  90. log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init")
  91. }
  92. }
  93. case ErrNoIngress:
  94. // Ingress network is not present in store, It means user removed it
  95. // and did not create a new one.
  96. default:
  97. return errors.Wrap(err, "failure while looking for ingress network during init")
  98. }
  99. // Allocate networks in the store so far before we started
  100. // watching.
  101. var networks []*api.Network
  102. a.store.View(func(tx store.ReadTx) {
  103. networks, err = store.FindNetworks(tx, store.All)
  104. })
  105. if err != nil {
  106. return errors.Wrap(err, "error listing all networks in store while trying to allocate during init")
  107. }
  108. var allocatedNetworks []*api.Network
  109. for _, n := range networks {
  110. if na.IsAllocated(n) {
  111. continue
  112. }
  113. if err := a.allocateNetwork(ctx, n); err != nil {
  114. log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
  115. continue
  116. }
  117. allocatedNetworks = append(allocatedNetworks, n)
  118. }
  119. if _, err := a.store.Batch(func(batch *store.Batch) error {
  120. for _, n := range allocatedNetworks {
  121. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  122. log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID)
  123. }
  124. }
  125. return nil
  126. }); err != nil {
  127. log.G(ctx).WithError(err).Error("failed committing allocation of networks during init")
  128. }
  129. // Allocate nodes in the store so far before we process watched events,
  130. // if the ingress network is present.
  131. if nc.ingressNetwork != nil {
  132. if err := a.allocateNodes(ctx); err != nil {
  133. return err
  134. }
  135. }
  136. // Allocate services in the store so far before we process watched events.
  137. var services []*api.Service
  138. a.store.View(func(tx store.ReadTx) {
  139. services, err = store.FindServices(tx, store.All)
  140. })
  141. if err != nil {
  142. return errors.Wrap(err, "error listing all services in store while trying to allocate during init")
  143. }
  144. var allocatedServices []*api.Service
  145. for _, s := range services {
  146. if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) {
  147. continue
  148. }
  149. if err := a.allocateService(ctx, s); err != nil {
  150. log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
  151. continue
  152. }
  153. allocatedServices = append(allocatedServices, s)
  154. }
  155. if _, err := a.store.Batch(func(batch *store.Batch) error {
  156. for _, s := range allocatedServices {
  157. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  158. log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID)
  159. }
  160. }
  161. return nil
  162. }); err != nil {
  163. log.G(ctx).WithError(err).Error("failed committing allocation of services during init")
  164. }
  165. // Allocate tasks in the store so far before we started watching.
  166. var (
  167. tasks []*api.Task
  168. allocatedTasks []*api.Task
  169. )
  170. a.store.View(func(tx store.ReadTx) {
  171. tasks, err = store.FindTasks(tx, store.All)
  172. })
  173. if err != nil {
  174. return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init")
  175. }
  176. for _, t := range tasks {
  177. if t.Status.State > api.TaskStateRunning {
  178. continue
  179. }
  180. var s *api.Service
  181. if t.ServiceID != "" {
  182. a.store.View(func(tx store.ReadTx) {
  183. s = store.GetService(tx, t.ServiceID)
  184. })
  185. }
  186. // Populate network attachments in the task
  187. // based on service spec.
  188. a.taskCreateNetworkAttachments(t, s)
  189. if taskReadyForNetworkVote(t, s, nc) {
  190. if t.Status.State >= api.TaskStatePending {
  191. continue
  192. }
  193. if a.taskAllocateVote(networkVoter, t.ID) {
  194. // If the task is not attached to any network, network
  195. // allocators job is done. Immediately cast a vote so
  196. // that the task can be moved to the PENDING state as
  197. // soon as possible.
  198. updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
  199. allocatedTasks = append(allocatedTasks, t)
  200. }
  201. continue
  202. }
  203. err := a.allocateTask(ctx, t)
  204. if err == nil {
  205. allocatedTasks = append(allocatedTasks, t)
  206. } else if err != errNoChanges {
  207. log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID)
  208. nc.unallocatedTasks[t.ID] = t
  209. }
  210. }
  211. if _, err := a.store.Batch(func(batch *store.Batch) error {
  212. for _, t := range allocatedTasks {
  213. if err := a.commitAllocatedTask(ctx, batch, t); err != nil {
  214. log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID)
  215. }
  216. }
  217. return nil
  218. }); err != nil {
  219. log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init")
  220. }
  221. return nil
  222. }
  223. func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
  224. nc := a.netCtx
  225. switch v := ev.(type) {
  226. case state.EventCreateNetwork:
  227. n := v.Network.Copy()
  228. if nc.nwkAllocator.IsAllocated(n) {
  229. break
  230. }
  231. if IsIngressNetwork(n) && nc.ingressNetwork != nil {
  232. log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)",
  233. n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations)
  234. break
  235. }
  236. if err := a.allocateNetwork(ctx, n); err != nil {
  237. log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
  238. break
  239. }
  240. if _, err := a.store.Batch(func(batch *store.Batch) error {
  241. return a.commitAllocatedNetwork(ctx, batch, n)
  242. }); err != nil {
  243. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID)
  244. }
  245. if IsIngressNetwork(n) {
  246. nc.ingressNetwork = n
  247. err := a.allocateNodes(ctx)
  248. if err != nil {
  249. log.G(ctx).WithError(err).Error(err)
  250. }
  251. }
  252. case state.EventDeleteNetwork:
  253. n := v.Network.Copy()
  254. if IsIngressNetwork(n) && nc.ingressNetwork.ID == n.ID {
  255. nc.ingressNetwork = nil
  256. if err := a.deallocateNodes(ctx); err != nil {
  257. log.G(ctx).WithError(err).Error(err)
  258. }
  259. }
  260. // The assumption here is that all dependent objects
  261. // have been cleaned up when we are here so the only
  262. // thing that needs to happen is free the network
  263. // resources.
  264. if err := nc.nwkAllocator.Deallocate(n); err != nil {
  265. log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
  266. }
  267. delete(nc.unallocatedNetworks, n.ID)
  268. case state.EventCreateService:
  269. s := v.Service.Copy()
  270. if nc.nwkAllocator.IsServiceAllocated(s) {
  271. break
  272. }
  273. if err := a.allocateService(ctx, s); err != nil {
  274. log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
  275. break
  276. }
  277. if _, err := a.store.Batch(func(batch *store.Batch) error {
  278. return a.commitAllocatedService(ctx, batch, s)
  279. }); err != nil {
  280. log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID)
  281. }
  282. case state.EventUpdateService:
  283. s := v.Service.Copy()
  284. if nc.nwkAllocator.IsServiceAllocated(s) {
  285. if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) {
  286. break
  287. }
  288. updatePortsInHostPublishMode(s)
  289. } else {
  290. if err := a.allocateService(ctx, s); err != nil {
  291. log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
  292. break
  293. }
  294. }
  295. if _, err := a.store.Batch(func(batch *store.Batch) error {
  296. return a.commitAllocatedService(ctx, batch, s)
  297. }); err != nil {
  298. log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
  299. nc.unallocatedServices[s.ID] = s
  300. } else {
  301. delete(nc.unallocatedServices, s.ID)
  302. }
  303. case state.EventDeleteService:
  304. s := v.Service.Copy()
  305. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  306. log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
  307. }
  308. // Remove it from unallocatedServices just in case
  309. // it's still there.
  310. delete(nc.unallocatedServices, s.ID)
  311. case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
  312. a.doNodeAlloc(ctx, ev)
  313. case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
  314. a.doTaskAlloc(ctx, ev)
  315. case state.EventCommit:
  316. a.procTasksNetwork(ctx, false)
  317. if time.Since(nc.lastRetry) > retryInterval {
  318. a.procUnallocatedNetworks(ctx)
  319. a.procUnallocatedServices(ctx)
  320. a.procTasksNetwork(ctx, true)
  321. nc.lastRetry = time.Now()
  322. }
  323. // Any left over tasks are moved to the unallocated set
  324. for _, t := range nc.pendingTasks {
  325. nc.unallocatedTasks[t.ID] = t
  326. }
  327. nc.pendingTasks = make(map[string]*api.Task)
  328. }
  329. }
  330. func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
  331. var (
  332. isDelete bool
  333. node *api.Node
  334. )
  335. switch v := ev.(type) {
  336. case state.EventCreateNode:
  337. node = v.Node.Copy()
  338. case state.EventUpdateNode:
  339. node = v.Node.Copy()
  340. case state.EventDeleteNode:
  341. isDelete = true
  342. node = v.Node.Copy()
  343. }
  344. nc := a.netCtx
  345. if isDelete {
  346. if nc.nwkAllocator.IsNodeAllocated(node) {
  347. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  348. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  349. }
  350. }
  351. return
  352. }
  353. if !nc.nwkAllocator.IsNodeAllocated(node) && nc.ingressNetwork != nil {
  354. if node.Attachment == nil {
  355. node.Attachment = &api.NetworkAttachment{}
  356. }
  357. node.Attachment.Network = nc.ingressNetwork.Copy()
  358. if err := a.allocateNode(ctx, node); err != nil {
  359. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  360. return
  361. }
  362. if _, err := a.store.Batch(func(batch *store.Batch) error {
  363. return a.commitAllocatedNode(ctx, batch, node)
  364. }); err != nil {
  365. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  366. }
  367. }
  368. }
  369. func (a *Allocator) allocateNodes(ctx context.Context) error {
  370. // Allocate nodes in the store so far before we process watched events.
  371. var (
  372. allocatedNodes []*api.Node
  373. nodes []*api.Node
  374. err error
  375. nc = a.netCtx
  376. )
  377. a.store.View(func(tx store.ReadTx) {
  378. nodes, err = store.FindNodes(tx, store.All)
  379. })
  380. if err != nil {
  381. return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources")
  382. }
  383. for _, node := range nodes {
  384. if nc.nwkAllocator.IsNodeAllocated(node) {
  385. continue
  386. }
  387. if node.Attachment == nil {
  388. node.Attachment = &api.NetworkAttachment{}
  389. }
  390. node.Attachment.Network = nc.ingressNetwork.Copy()
  391. if err := a.allocateNode(ctx, node); err != nil {
  392. log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
  393. continue
  394. }
  395. allocatedNodes = append(allocatedNodes, node)
  396. }
  397. if _, err := a.store.Batch(func(batch *store.Batch) error {
  398. for _, node := range allocatedNodes {
  399. if err := a.commitAllocatedNode(ctx, batch, node); err != nil {
  400. log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID)
  401. }
  402. }
  403. return nil
  404. }); err != nil {
  405. log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes")
  406. }
  407. return nil
  408. }
  409. func (a *Allocator) deallocateNodes(ctx context.Context) error {
  410. var (
  411. nodes []*api.Node
  412. nc = a.netCtx
  413. err error
  414. )
  415. a.store.View(func(tx store.ReadTx) {
  416. nodes, err = store.FindNodes(tx, store.All)
  417. })
  418. if err != nil {
  419. return fmt.Errorf("error listing all nodes in store while trying to free network resources")
  420. }
  421. for _, node := range nodes {
  422. if nc.nwkAllocator.IsNodeAllocated(node) {
  423. if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
  424. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID)
  425. }
  426. node.Attachment = nil
  427. if _, err := a.store.Batch(func(batch *store.Batch) error {
  428. return a.commitAllocatedNode(ctx, batch, node)
  429. }); err != nil {
  430. log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID)
  431. }
  432. }
  433. }
  434. return nil
  435. }
  436. // taskReadyForNetworkVote checks if the task is ready for a network
  437. // vote to move it to PENDING state.
  438. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
  439. // Task is ready for vote if the following is true:
  440. //
  441. // Task has no network attached or networks attached but all
  442. // of them allocated AND Task's service has no endpoint or
  443. // network configured or service endpoints have been
  444. // allocated.
  445. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
  446. (s == nil || nc.nwkAllocator.IsServiceAllocated(s))
  447. }
  448. func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
  449. networksCopy := make([]*api.NetworkAttachment, 0, len(networks))
  450. for _, n := range networks {
  451. networksCopy = append(networksCopy, n.Copy())
  452. }
  453. t.Networks = networksCopy
  454. }
  455. func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) {
  456. t.Endpoint = endpoint.Copy()
  457. }
  458. func isIngressNetworkNeeded(s *api.Service) bool {
  459. if s == nil {
  460. return false
  461. }
  462. if s.Spec.Endpoint == nil {
  463. return false
  464. }
  465. for _, p := range s.Spec.Endpoint.Ports {
  466. // The service to which this task belongs is trying to
  467. // expose ports with PublishMode as Ingress to the
  468. // external world. Automatically attach the task to
  469. // the ingress network.
  470. if p.PublishMode == api.PublishModeIngress {
  471. return true
  472. }
  473. }
  474. return false
  475. }
  476. func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
  477. // If task network attachments have already been filled in no
  478. // need to do anything else.
  479. if len(t.Networks) != 0 {
  480. return
  481. }
  482. var networks []*api.NetworkAttachment
  483. if isIngressNetworkNeeded(s) {
  484. networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork})
  485. }
  486. a.store.View(func(tx store.ReadTx) {
  487. // Always prefer NetworkAttachmentConfig in the TaskSpec
  488. specNetworks := t.Spec.Networks
  489. if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 {
  490. specNetworks = s.Spec.Networks
  491. }
  492. for _, na := range specNetworks {
  493. n := store.GetNetwork(tx, na.Target)
  494. if n == nil {
  495. continue
  496. }
  497. attachment := api.NetworkAttachment{Network: n}
  498. attachment.Aliases = append(attachment.Aliases, na.Aliases...)
  499. attachment.Addresses = append(attachment.Addresses, na.Addresses...)
  500. networks = append(networks, &attachment)
  501. }
  502. })
  503. taskUpdateNetworks(t, networks)
  504. }
  505. func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
  506. var (
  507. isDelete bool
  508. t *api.Task
  509. )
  510. switch v := ev.(type) {
  511. case state.EventCreateTask:
  512. t = v.Task.Copy()
  513. case state.EventUpdateTask:
  514. t = v.Task.Copy()
  515. case state.EventDeleteTask:
  516. isDelete = true
  517. t = v.Task.Copy()
  518. }
  519. nc := a.netCtx
  520. // If the task has stopped running then we should free the network
  521. // resources associated with the task right away.
  522. if t.Status.State > api.TaskStateRunning || isDelete {
  523. if nc.nwkAllocator.IsTaskAllocated(t) {
  524. if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
  525. log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
  526. }
  527. }
  528. // Cleanup any task references that might exist
  529. delete(nc.pendingTasks, t.ID)
  530. delete(nc.unallocatedTasks, t.ID)
  531. return
  532. }
  533. // If we are already in allocated state, there is
  534. // absolutely nothing else to do.
  535. if t.Status.State >= api.TaskStatePending {
  536. delete(nc.pendingTasks, t.ID)
  537. delete(nc.unallocatedTasks, t.ID)
  538. return
  539. }
  540. var s *api.Service
  541. if t.ServiceID != "" {
  542. a.store.View(func(tx store.ReadTx) {
  543. s = store.GetService(tx, t.ServiceID)
  544. })
  545. if s == nil {
  546. // If the task is running it is not normal to
  547. // not be able to find the associated
  548. // service. If the task is not running (task
  549. // is either dead or the desired state is set
  550. // to dead) then the service may not be
  551. // available in store. But we still need to
  552. // cleanup network resources associated with
  553. // the task.
  554. if t.Status.State <= api.TaskStateRunning && !isDelete {
  555. log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
  556. return
  557. }
  558. }
  559. }
  560. // Populate network attachments in the task
  561. // based on service spec.
  562. a.taskCreateNetworkAttachments(t, s)
  563. nc.pendingTasks[t.ID] = t
  564. }
  565. func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
  566. return a.netCtx.nwkAllocator.AllocateNode(node)
  567. }
  568. func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error {
  569. if err := batch.Update(func(tx store.Tx) error {
  570. err := store.UpdateNode(tx, node)
  571. if err == store.ErrSequenceConflict {
  572. storeNode := store.GetNode(tx, node.ID)
  573. storeNode.Attachment = node.Attachment.Copy()
  574. err = store.UpdateNode(tx, storeNode)
  575. }
  576. return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID)
  577. }); err != nil {
  578. if err := a.netCtx.nwkAllocator.DeallocateNode(node); err != nil {
  579. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID)
  580. }
  581. return err
  582. }
  583. return nil
  584. }
  585. // This function prepares the service object for being updated when the change regards
  586. // the published ports in host mode: It resets the runtime state ports (s.Endpoint.Ports)
  587. // to the current ingress mode runtime state ports plus the newly configured publish mode ports,
  588. // so that the service allocation invoked on this new service object will trigger the deallocation
  589. // of any old publish mode port and allocation of any new one.
  590. func updatePortsInHostPublishMode(s *api.Service) {
  591. if s.Endpoint != nil {
  592. var portConfigs []*api.PortConfig
  593. for _, portConfig := range s.Endpoint.Ports {
  594. if portConfig.PublishMode == api.PublishModeIngress {
  595. portConfigs = append(portConfigs, portConfig)
  596. }
  597. }
  598. s.Endpoint.Ports = portConfigs
  599. }
  600. if s.Spec.Endpoint != nil {
  601. if s.Endpoint == nil {
  602. s.Endpoint = &api.Endpoint{}
  603. }
  604. for _, portConfig := range s.Spec.Endpoint.Ports {
  605. if portConfig.PublishMode == api.PublishModeIngress {
  606. continue
  607. }
  608. s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy())
  609. }
  610. s.Endpoint.Spec = s.Spec.Endpoint.Copy()
  611. }
  612. }
  613. func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
  614. nc := a.netCtx
  615. if s.Spec.Endpoint != nil {
  616. // service has user-defined endpoint
  617. if s.Endpoint == nil {
  618. // service currently has no allocated endpoint, need allocated.
  619. s.Endpoint = &api.Endpoint{
  620. Spec: s.Spec.Endpoint.Copy(),
  621. }
  622. }
  623. // The service is trying to expose ports to the external
  624. // world. Automatically attach the service to the ingress
  625. // network only if it is not already done.
  626. if isIngressNetworkNeeded(s) {
  627. if nc.ingressNetwork == nil {
  628. return fmt.Errorf("ingress network is missing")
  629. }
  630. var found bool
  631. for _, vip := range s.Endpoint.VirtualIPs {
  632. if vip.NetworkID == nc.ingressNetwork.ID {
  633. found = true
  634. break
  635. }
  636. }
  637. if !found {
  638. s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs,
  639. &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
  640. }
  641. }
  642. } else if s.Endpoint != nil {
  643. // service has no user-defined endpoints while has already allocated network resources,
  644. // need deallocated.
  645. if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
  646. return err
  647. }
  648. }
  649. if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
  650. nc.unallocatedServices[s.ID] = s
  651. return err
  652. }
  653. // If the service doesn't expose ports any more and if we have
  654. // any lingering virtual IP references for ingress network
  655. // clean them up here.
  656. if !isIngressNetworkNeeded(s) {
  657. if s.Endpoint != nil {
  658. for i, vip := range s.Endpoint.VirtualIPs {
  659. if vip.NetworkID == nc.ingressNetwork.ID {
  660. n := len(s.Endpoint.VirtualIPs)
  661. s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
  662. s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
  663. break
  664. }
  665. }
  666. }
  667. }
  668. return nil
  669. }
  670. func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error {
  671. if err := batch.Update(func(tx store.Tx) error {
  672. err := store.UpdateService(tx, s)
  673. if err == store.ErrSequenceConflict {
  674. storeService := store.GetService(tx, s.ID)
  675. storeService.Endpoint = s.Endpoint
  676. err = store.UpdateService(tx, storeService)
  677. }
  678. return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID)
  679. }); err != nil {
  680. if err := a.netCtx.nwkAllocator.ServiceDeallocate(s); err != nil {
  681. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
  682. }
  683. return err
  684. }
  685. return nil
  686. }
  687. func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
  688. nc := a.netCtx
  689. if err := nc.nwkAllocator.Allocate(n); err != nil {
  690. nc.unallocatedNetworks[n.ID] = n
  691. return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
  692. }
  693. return nil
  694. }
  695. func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error {
  696. if err := batch.Update(func(tx store.Tx) error {
  697. if err := store.UpdateNetwork(tx, n); err != nil {
  698. return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID)
  699. }
  700. return nil
  701. }); err != nil {
  702. if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil {
  703. log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID)
  704. }
  705. return err
  706. }
  707. return nil
  708. }
// allocateTask performs network allocation for task t, mutating t in place.
//
// Unless the network allocator already reports t as allocated, it:
//   - reads the latest service and attached networks from the store;
//   - checks that the owning service (if any) and every attached network are
//     already allocated;
//   - copies the service endpoint onto t;
//   - replaces each attachment's Network with the stored copy;
//   - asks the network allocator to allocate t.
//
// Afterwards, if taskAllocateVote(networkVoter, t.ID) returns true and t is
// still below PENDING, t is moved to PENDING with allocatedStatusMessage.
// NOTE(review): presumably the vote succeeds once all allocation voters have
// approved t — confirm against taskAllocateVote.
//
// Returns:
//   - nil when t was modified and should be committed to the store;
//   - errNoChanges when nothing on t changed;
//   - any other error when a precondition or the allocation itself failed.
//
// t may already be partially modified (endpoint, network pointers) when an
// error is returned.
func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
	// taskUpdated records whether t was modified and must be written back.
	taskUpdated := false
	nc := a.netCtx
	// We might be here even if a task allocation has already
	// happened but wasn't successfully committed to store. In such
	// cases skip allocation and go straight ahead to updating the
	// store.
	if !nc.nwkAllocator.IsTaskAllocated(t) {
		// The closure cannot return a value, so failures are reported by
		// assigning the named result err.
		a.store.View(func(tx store.ReadTx) {
			if t.ServiceID != "" {
				s := store.GetService(tx, t.ServiceID)
				if s == nil {
					err = fmt.Errorf("could not find service %s", t.ServiceID)
					return
				}
				// The task's networking depends on its service's allocation.
				if !nc.nwkAllocator.IsServiceAllocated(s) {
					err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
					return
				}
				if s.Endpoint != nil {
					taskUpdateEndpoint(t, s.Endpoint)
					taskUpdated = true
				}
			}
			for _, na := range t.Networks {
				n := store.GetNetwork(tx, na.Network.ID)
				if n == nil {
					err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID)
					return
				}
				if !nc.nwkAllocator.IsAllocated(n) {
					err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID)
					return
				}
				// Point the attachment at the latest stored network object.
				na.Network = n
			}
			if err = nc.nwkAllocator.AllocateTask(t); err != nil {
				err = errors.Wrapf(err, "failed during networktask allocation for task %s", t.ID)
				return
			}
			if nc.nwkAllocator.IsTaskAllocated(t) {
				taskUpdated = true
			}
		})
		if err != nil {
			return err
		}
	}
	// Update the network allocations and moving to
	// PENDING state on top of the latest store state.
	if a.taskAllocateVote(networkVoter, t.ID) {
		if t.Status.State < api.TaskStatePending {
			updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage)
			taskUpdated = true
		}
	}
	if !taskUpdated {
		return errNoChanges
	}
	return nil
}
  770. func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error {
  771. return batch.Update(func(tx store.Tx) error {
  772. err := store.UpdateTask(tx, t)
  773. if err == store.ErrSequenceConflict {
  774. storeTask := store.GetTask(tx, t.ID)
  775. taskUpdateNetworks(storeTask, t.Networks)
  776. taskUpdateEndpoint(storeTask, t.Endpoint)
  777. if storeTask.Status.State < api.TaskStatePending {
  778. storeTask.Status = t.Status
  779. }
  780. err = store.UpdateTask(tx, storeTask)
  781. }
  782. return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID)
  783. })
  784. }
  785. func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
  786. nc := a.netCtx
  787. var allocatedNetworks []*api.Network
  788. for _, n := range nc.unallocatedNetworks {
  789. if !nc.nwkAllocator.IsAllocated(n) {
  790. if err := a.allocateNetwork(ctx, n); err != nil {
  791. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID)
  792. continue
  793. }
  794. allocatedNetworks = append(allocatedNetworks, n)
  795. }
  796. }
  797. if len(allocatedNetworks) == 0 {
  798. return
  799. }
  800. committed, err := a.store.Batch(func(batch *store.Batch) error {
  801. for _, n := range allocatedNetworks {
  802. if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil {
  803. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID)
  804. continue
  805. }
  806. }
  807. return nil
  808. })
  809. if err != nil {
  810. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks")
  811. }
  812. for _, n := range allocatedNetworks[:committed] {
  813. delete(nc.unallocatedNetworks, n.ID)
  814. }
  815. }
  816. func (a *Allocator) procUnallocatedServices(ctx context.Context) {
  817. nc := a.netCtx
  818. var allocatedServices []*api.Service
  819. for _, s := range nc.unallocatedServices {
  820. if !nc.nwkAllocator.IsServiceAllocated(s) {
  821. if err := a.allocateService(ctx, s); err != nil {
  822. log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
  823. continue
  824. }
  825. allocatedServices = append(allocatedServices, s)
  826. }
  827. }
  828. if len(allocatedServices) == 0 {
  829. return
  830. }
  831. committed, err := a.store.Batch(func(batch *store.Batch) error {
  832. for _, s := range allocatedServices {
  833. if err := a.commitAllocatedService(ctx, batch, s); err != nil {
  834. log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID)
  835. continue
  836. }
  837. }
  838. return nil
  839. })
  840. if err != nil {
  841. log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services")
  842. }
  843. for _, s := range allocatedServices[:committed] {
  844. delete(nc.unallocatedServices, s.ID)
  845. }
  846. }
  847. func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
  848. nc := a.netCtx
  849. quiet := false
  850. toAllocate := nc.pendingTasks
  851. if onRetry {
  852. toAllocate = nc.unallocatedTasks
  853. quiet = true
  854. }
  855. allocatedTasks := make([]*api.Task, 0, len(toAllocate))
  856. for _, t := range toAllocate {
  857. if err := a.allocateTask(ctx, t); err == nil {
  858. allocatedTasks = append(allocatedTasks, t)
  859. } else if err != errNoChanges {
  860. if quiet {
  861. log.G(ctx).WithError(err).Debug("task allocation failure")
  862. } else {
  863. log.G(ctx).WithError(err).Error("task allocation failure")
  864. }
  865. }
  866. }
  867. if len(allocatedTasks) == 0 {
  868. return
  869. }
  870. committed, err := a.store.Batch(func(batch *store.Batch) error {
  871. for _, t := range allocatedTasks {
  872. err := a.commitAllocatedTask(ctx, batch, t)
  873. if err != nil {
  874. log.G(ctx).WithError(err).Error("task allocation commit failure")
  875. continue
  876. }
  877. }
  878. return nil
  879. })
  880. if err != nil {
  881. log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
  882. }
  883. for _, t := range allocatedTasks[:committed] {
  884. delete(toAllocate, t.ID)
  885. }
  886. }
  887. // updateTaskStatus sets TaskStatus and updates timestamp.
  888. func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) {
  889. t.Status.State = newStatus
  890. t.Status.Message = message
  891. t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
  892. }
  893. // IsIngressNetwork returns whether the passed network is an ingress network.
  894. func IsIngressNetwork(nw *api.Network) bool {
  895. if nw.Spec.Ingress {
  896. return true
  897. }
  898. // Check if legacy defined ingress network
  899. _, ok := nw.Spec.Annotations.Labels["com.docker.swarm.internal"]
  900. return ok && nw.Spec.Annotations.Name == "ingress"
  901. }
  902. // GetIngressNetwork fetches the ingress network from store.
  903. // ErrNoIngress will be returned if the ingress network is not present,
  904. // nil otherwise. In case of any other failure in accessing the store,
  905. // the respective error will be reported as is.
  906. func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) {
  907. var (
  908. networks []*api.Network
  909. err error
  910. )
  911. s.View(func(tx store.ReadTx) {
  912. networks, err = store.FindNetworks(tx, store.All)
  913. })
  914. if err != nil {
  915. return nil, err
  916. }
  917. for _, n := range networks {
  918. if IsIngressNetwork(n) {
  919. return n, nil
  920. }
  921. }
  922. return nil, ErrNoIngress
  923. }