service_common.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. //go:build linux || windows
  2. package libnetwork
  3. import (
  4. "context"
  5. "net"
  6. "github.com/containerd/log"
  7. )
  // maxSetStringLen is the maximum number of bytes of the IP-to-endpoint set
  // string that addServiceBinding and rmServiceBinding include in their
  // "possible transient state" warnings; longer strings are truncated to this
  // length before being logged.
  const maxSetStringLen = 350
  9. func (c *Controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error {
  10. n, err := c.NetworkByID(nID)
  11. if err != nil {
  12. return err
  13. }
  14. log.G(context.TODO()).Debugf("addEndpointNameResolution %s %s add_service:%t sAliases:%v tAliases:%v", eID, svcName, addService, serviceAliases, taskAliases)
  15. // Add container resolution mappings
  16. if err := c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method); err != nil {
  17. return err
  18. }
  19. serviceID := svcID
  20. if serviceID == "" {
  21. // This is the case of a normal container not part of a service
  22. serviceID = eID
  23. }
  24. // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
  25. n.addSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
  26. for _, alias := range serviceAliases {
  27. n.addSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
  28. }
  29. // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
  30. if len(vip) == 0 {
  31. n.addSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
  32. for _, alias := range serviceAliases {
  33. n.addSvcRecords(eID, alias, serviceID, ip, nil, false, method)
  34. }
  35. }
  36. if addService && len(vip) != 0 {
  37. n.addSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
  38. for _, alias := range serviceAliases {
  39. n.addSvcRecords(eID, alias, serviceID, vip, nil, false, method)
  40. }
  41. }
  42. return nil
  43. }
  44. func (c *Controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
  45. n, err := c.NetworkByID(nID)
  46. if err != nil {
  47. return err
  48. }
  49. log.G(context.TODO()).Debugf("addContainerNameResolution %s %s", eID, containerName)
  50. // Add resolution for container name
  51. n.addSvcRecords(eID, containerName, eID, ip, nil, true, method)
  52. // Add resolution for taskaliases
  53. for _, alias := range taskAliases {
  54. n.addSvcRecords(eID, alias, eID, ip, nil, false, method)
  55. }
  56. return nil
  57. }
  58. func (c *Controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error {
  59. n, err := c.NetworkByID(nID)
  60. if err != nil {
  61. return err
  62. }
  63. log.G(context.TODO()).Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t sAliases:%v tAliases:%v", eID, svcName, rmService, multipleEntries, serviceAliases, taskAliases)
  64. // Delete container resolution mappings
  65. if err := c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method); err != nil {
  66. log.G(context.TODO()).WithError(err).Warn("Error delting container from resolver")
  67. }
  68. serviceID := svcID
  69. if serviceID == "" {
  70. // This is the case of a normal container not part of a service
  71. serviceID = eID
  72. }
  73. // Delete the special "tasks.svc_name" backend record.
  74. if !multipleEntries {
  75. n.deleteSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
  76. for _, alias := range serviceAliases {
  77. n.deleteSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
  78. }
  79. }
  80. // If we are doing DNS RR delete the endpoint IP from DNS record right away.
  81. if !multipleEntries && len(vip) == 0 {
  82. n.deleteSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
  83. for _, alias := range serviceAliases {
  84. n.deleteSvcRecords(eID, alias, serviceID, ip, nil, false, method)
  85. }
  86. }
  87. // Remove the DNS record for VIP only if we are removing the service
  88. if rmService && len(vip) != 0 && !multipleEntries {
  89. n.deleteSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
  90. for _, alias := range serviceAliases {
  91. n.deleteSvcRecords(eID, alias, serviceID, vip, nil, false, method)
  92. }
  93. }
  94. return nil
  95. }
  96. func (c *Controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
  97. n, err := c.NetworkByID(nID)
  98. if err != nil {
  99. return err
  100. }
  101. log.G(context.TODO()).Debugf("delContainerNameResolution %s %s", eID, containerName)
  102. // Delete resolution for container name
  103. n.deleteSvcRecords(eID, containerName, eID, ip, nil, true, method)
  104. // Delete resolution for taskaliases
  105. for _, alias := range taskAliases {
  106. n.deleteSvcRecords(eID, alias, eID, ip, nil, true, method)
  107. }
  108. return nil
  109. }
  110. func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service {
  111. return &service{
  112. name: name,
  113. id: id,
  114. ingressPorts: ingressPorts,
  115. loadBalancers: make(map[string]*loadBalancer),
  116. aliases: serviceAliases,
  117. }
  118. }
  119. func (c *Controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int {
  120. skey := serviceKey{
  121. id: sid,
  122. ports: portConfigs(ingressPorts).String(),
  123. }
  124. c.mu.Lock()
  125. s, ok := c.serviceBindings[skey]
  126. c.mu.Unlock()
  127. if !ok {
  128. return 0
  129. }
  130. s.Lock()
  131. lb := s.loadBalancers[nid]
  132. s.Unlock()
  133. return int(lb.fwMark)
  134. }
  135. // cleanupServiceDiscovery when the network is being deleted, erase all the associated service discovery records
  136. func (c *Controller) cleanupServiceDiscovery(cleanupNID string) {
  137. c.mu.Lock()
  138. defer c.mu.Unlock()
  139. if cleanupNID == "" {
  140. log.G(context.TODO()).Debugf("cleanupServiceDiscovery for all networks")
  141. c.svcRecords = make(map[string]*svcInfo)
  142. return
  143. }
  144. log.G(context.TODO()).Debugf("cleanupServiceDiscovery for network:%s", cleanupNID)
  145. delete(c.svcRecords, cleanupNID)
  146. }
  147. func (c *Controller) cleanupServiceBindings(cleanupNID string) {
  148. var cleanupFuncs []func()
  149. log.G(context.TODO()).Debugf("cleanupServiceBindings for %s", cleanupNID)
  150. c.mu.Lock()
  151. services := make([]*service, 0, len(c.serviceBindings))
  152. for _, s := range c.serviceBindings {
  153. services = append(services, s)
  154. }
  155. c.mu.Unlock()
  156. for _, s := range services {
  157. s.Lock()
  158. // Skip the serviceBindings that got deleted
  159. if s.deleted {
  160. s.Unlock()
  161. continue
  162. }
  163. for nid, lb := range s.loadBalancers {
  164. if cleanupNID != "" && nid != cleanupNID {
  165. continue
  166. }
  167. for eid, be := range lb.backEnds {
  168. cleanupFuncs = append(cleanupFuncs, makeServiceCleanupFunc(c, s, nid, eid, lb.vip, be.ip))
  169. }
  170. }
  171. s.Unlock()
  172. }
  173. for _, f := range cleanupFuncs {
  174. f()
  175. }
  176. }
  177. func makeServiceCleanupFunc(c *Controller, s *service, nID, eID string, vip net.IP, ip net.IP) func() {
  178. // ContainerName and taskAliases are not available here, this is still fine because the Service discovery
  179. // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load
  180. // Balancer bookeeping, is to keep consistent the mapping of endpoint to IP.
  181. return func() {
  182. if err := c.rmServiceBinding(s.name, s.id, nID, eID, "", vip, s.ingressPorts, s.aliases, []string{}, ip, "cleanupServiceBindings", false, true); err != nil {
  183. log.G(context.TODO()).Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", s.id, nID, eID, err)
  184. }
  185. }
  186. }
  187. func (c *Controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
  188. var addService bool
  189. // Failure to lock the network ID on add can result in racing
  190. // against network deletion resulting in inconsistent state
  191. // in the c.serviceBindings map and it's sub-maps. Also,
  192. // always lock network ID before services to avoid deadlock.
  193. c.networkLocker.Lock(nID)
  194. defer c.networkLocker.Unlock(nID) //nolint:errcheck
  195. n, err := c.NetworkByID(nID)
  196. if err != nil {
  197. return err
  198. }
  199. skey := serviceKey{
  200. id: svcID,
  201. ports: portConfigs(ingressPorts).String(),
  202. }
  203. var s *service
  204. for {
  205. c.mu.Lock()
  206. var ok bool
  207. s, ok = c.serviceBindings[skey]
  208. if !ok {
  209. // Create a new service if we are seeing this service
  210. // for the first time.
  211. s = newService(svcName, svcID, ingressPorts, serviceAliases)
  212. c.serviceBindings[skey] = s
  213. }
  214. c.mu.Unlock()
  215. s.Lock()
  216. if !s.deleted {
  217. // ok the object is good to be used
  218. break
  219. }
  220. s.Unlock()
  221. }
  222. log.G(context.TODO()).Debugf("addServiceBinding from %s START for %s %s p:%p nid:%s skey:%v", method, svcName, eID, s, nID, skey)
  223. defer s.Unlock()
  224. lb, ok := s.loadBalancers[nID]
  225. if !ok {
  226. // Create a new load balancer if we are seeing this
  227. // network attachment on the service for the first
  228. // time.
  229. fwMarkCtrMu.Lock()
  230. lb = &loadBalancer{
  231. vip: vip,
  232. fwMark: fwMarkCtr,
  233. backEnds: make(map[string]*lbBackend),
  234. service: s,
  235. }
  236. fwMarkCtr++
  237. fwMarkCtrMu.Unlock()
  238. s.loadBalancers[nID] = lb
  239. addService = true
  240. }
  241. lb.backEnds[eID] = &lbBackend{ip, false}
  242. ok, entries := s.assignIPToEndpoint(ip.String(), eID)
  243. if !ok || entries > 1 {
  244. setStr, b := s.printIPToEndpoint(ip.String())
  245. if len(setStr) > maxSetStringLen {
  246. setStr = setStr[:maxSetStringLen]
  247. }
  248. log.G(context.TODO()).Warnf("addServiceBinding %s possible transient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
  249. }
  250. // Add loadbalancer service and backend to the network
  251. n.addLBBackend(ip, lb)
  252. // Add the appropriate name resolutions
  253. if err := c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding"); err != nil {
  254. return err
  255. }
  256. log.G(context.TODO()).Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID)
  257. return nil
  258. }
  259. func (c *Controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool, fullRemove bool) error {
  260. var rmService bool
  261. skey := serviceKey{
  262. id: svcID,
  263. ports: portConfigs(ingressPorts).String(),
  264. }
  265. c.mu.Lock()
  266. s, ok := c.serviceBindings[skey]
  267. c.mu.Unlock()
  268. if !ok {
  269. log.G(context.TODO()).Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
  270. return nil
  271. }
  272. s.Lock()
  273. defer s.Unlock()
  274. log.G(context.TODO()).Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v deleteSvc:%t", method, svcName, eID, s, nID, skey, deleteSvcRecords)
  275. lb, ok := s.loadBalancers[nID]
  276. if !ok {
  277. log.G(context.TODO()).Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
  278. return nil
  279. }
  280. be, ok := lb.backEnds[eID]
  281. if !ok {
  282. log.G(context.TODO()).Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] && lb.disabled[eid] !ok", method, svcName, eID)
  283. return nil
  284. }
  285. if fullRemove {
  286. // delete regardless
  287. delete(lb.backEnds, eID)
  288. } else {
  289. be.disabled = true
  290. }
  291. if len(lb.backEnds) == 0 {
  292. // All the backends for this service have been
  293. // removed. Time to remove the load balancer and also
  294. // remove the service entry in IPVS.
  295. rmService = true
  296. delete(s.loadBalancers, nID)
  297. log.G(context.TODO()).Debugf("rmServiceBinding %s delete %s, p:%p in loadbalancers len:%d", eID, nID, lb, len(s.loadBalancers))
  298. }
  299. ok, entries := s.removeIPToEndpoint(ip.String(), eID)
  300. if !ok || entries > 0 {
  301. setStr, b := s.printIPToEndpoint(ip.String())
  302. if len(setStr) > maxSetStringLen {
  303. setStr = setStr[:maxSetStringLen]
  304. }
  305. log.G(context.TODO()).Warnf("rmServiceBinding %s possible transient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
  306. }
  307. // Remove loadbalancer service(if needed) and backend in all
  308. // sandboxes in the network only if the vip is valid.
  309. if entries == 0 {
  310. // The network may well have been deleted from the store (and
  311. // dataplane) before the last of the service bindings. On Linux that's
  312. // ok because removing the network sandbox from the dataplane
  313. // implicitly cleans up all related dataplane state.
  314. // On the Windows dataplane, VFP policylists must be removed
  315. // independently of the network, and they must be removed before the HNS
  316. // network. Otherwise, policylist removal fails with "network not
  317. // found." On Windows cleanupServiceBindings must be called prior to
  318. // removing the network from the store or dataplane.
  319. n, err := c.NetworkByID(nID)
  320. if err == nil {
  321. n.rmLBBackend(ip, lb, rmService, fullRemove)
  322. }
  323. }
  324. // Delete the name resolutions
  325. if deleteSvcRecords {
  326. if err := c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding"); err != nil {
  327. return err
  328. }
  329. }
  330. if len(s.loadBalancers) == 0 {
  331. // All loadbalancers for the service removed. Time to
  332. // remove the service itself.
  333. c.mu.Lock()
  334. // Mark the object as deleted so that the add won't use it wrongly
  335. s.deleted = true
  336. // NOTE The delete from the serviceBindings map has to be the last operation else we are allowing a race between this service
  337. // that is getting deleted and a new service that will be created if the entry is not anymore there
  338. delete(c.serviceBindings, skey)
  339. c.mu.Unlock()
  340. }
  341. log.G(context.TODO()).Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
  342. return nil
  343. }