agent.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980
  1. package libnetwork
  2. //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "sort"
  9. "sync"
  10. "github.com/containerd/log"
  11. "github.com/docker/docker/libnetwork/cluster"
  12. "github.com/docker/docker/libnetwork/discoverapi"
  13. "github.com/docker/docker/libnetwork/driverapi"
  14. "github.com/docker/docker/libnetwork/networkdb"
  15. "github.com/docker/docker/libnetwork/scope"
  16. "github.com/docker/docker/libnetwork/types"
  17. "github.com/docker/go-events"
  18. "github.com/gogo/protobuf/proto"
  19. )
  20. const (
  21. subsysGossip = "networking:gossip"
  22. subsysIPSec = "networking:ipsec"
  23. keyringSize = 3
  24. )
  25. // ByTime implements sort.Interface for []*types.EncryptionKey based on
  26. // the LamportTime field.
  27. type ByTime []*types.EncryptionKey
  28. func (b ByTime) Len() int { return len(b) }
  29. func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  30. func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
  31. type nwAgent struct {
  32. networkDB *networkdb.NetworkDB
  33. bindAddr net.IP
  34. advertiseAddr string
  35. dataPathAddr string
  36. coreCancelFuncs []func()
  37. driverCancelFuncs map[string][]func()
  38. mu sync.Mutex
  39. }
  40. func (a *nwAgent) dataPathAddress() string {
  41. a.mu.Lock()
  42. defer a.mu.Unlock()
  43. if a.dataPathAddr != "" {
  44. return a.dataPathAddr
  45. }
  46. return a.advertiseAddr
  47. }
  48. const libnetworkEPTable = "endpoint_table"
  49. func getBindAddr(ifaceName string) (net.IP, error) {
  50. iface, err := net.InterfaceByName(ifaceName)
  51. if err != nil {
  52. return nil, fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  53. }
  54. addrs, err := iface.Addrs()
  55. if err != nil {
  56. return nil, fmt.Errorf("failed to get interface addresses: %v", err)
  57. }
  58. for _, a := range addrs {
  59. addr, ok := a.(*net.IPNet)
  60. if !ok {
  61. continue
  62. }
  63. addrIP := addr.IP
  64. if addrIP.IsLinkLocalUnicast() {
  65. continue
  66. }
  67. return addrIP, nil
  68. }
  69. return nil, fmt.Errorf("failed to get bind address")
  70. }
  71. // resolveAddr resolves the given address, which can be one of, and
  72. // parsed in the following order or priority:
  73. //
  74. // - a well-formed IP-address
  75. // - a hostname
  76. // - an interface-name
  77. func resolveAddr(addrOrInterface string) (net.IP, error) {
  78. // Try and see if this is a valid IP address
  79. if ip := net.ParseIP(addrOrInterface); ip != nil {
  80. return ip, nil
  81. }
  82. // If not a valid IP address, it could be a hostname.
  83. addr, err := net.ResolveIPAddr("ip", addrOrInterface)
  84. if err != nil {
  85. // If hostname lookup failed, try to look for an interface with the given name.
  86. return getBindAddr(addrOrInterface)
  87. }
  88. return addr.IP, nil
  89. }
  90. func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
  91. drvEnc := discoverapi.DriverEncryptionUpdate{}
  92. agent := c.getAgent()
  93. if agent == nil {
  94. log.G(context.TODO()).Debug("Skipping key change as agent is nil")
  95. return nil
  96. }
  97. // Find the deleted key. If the deleted key was the primary key,
  98. // a new primary key should be set before removing if from keyring.
  99. c.mu.Lock()
  100. added := []byte{}
  101. deleted := []byte{}
  102. j := len(c.keys)
  103. for i := 0; i < j; {
  104. same := false
  105. for _, key := range keys {
  106. if same = key.LamportTime == c.keys[i].LamportTime; same {
  107. break
  108. }
  109. }
  110. if !same {
  111. cKey := c.keys[i]
  112. if cKey.Subsystem == subsysGossip {
  113. deleted = cKey.Key
  114. }
  115. if cKey.Subsystem == subsysIPSec {
  116. drvEnc.Prune = cKey.Key
  117. drvEnc.PruneTag = cKey.LamportTime
  118. }
  119. c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
  120. c.keys[j-1] = nil
  121. j--
  122. }
  123. i++
  124. }
  125. c.keys = c.keys[:j]
  126. // Find the new key and add it to the key ring
  127. for _, key := range keys {
  128. same := false
  129. for _, cKey := range c.keys {
  130. if same = cKey.LamportTime == key.LamportTime; same {
  131. break
  132. }
  133. }
  134. if !same {
  135. c.keys = append(c.keys, key)
  136. if key.Subsystem == subsysGossip {
  137. added = key.Key
  138. }
  139. if key.Subsystem == subsysIPSec {
  140. drvEnc.Key = key.Key
  141. drvEnc.Tag = key.LamportTime
  142. }
  143. }
  144. }
  145. c.mu.Unlock()
  146. if len(added) > 0 {
  147. agent.networkDB.SetKey(added)
  148. }
  149. key, _, err := c.getPrimaryKeyTag(subsysGossip)
  150. if err != nil {
  151. return err
  152. }
  153. agent.networkDB.SetPrimaryKey(key)
  154. key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
  155. if err != nil {
  156. return err
  157. }
  158. drvEnc.Primary = key
  159. drvEnc.PrimaryTag = tag
  160. if len(deleted) > 0 {
  161. agent.networkDB.RemoveKey(deleted)
  162. }
  163. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  164. dr, ok := driver.(discoverapi.Discover)
  165. if !ok {
  166. return false
  167. }
  168. if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
  169. log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
  170. // Attempt to reconfigure keys in case of a update failure
  171. // which can arise due to a mismatch of keys
  172. // if worker nodes get temporarily disconnected
  173. log.G(context.TODO()).Warnf("Reconfiguring datapath keys for %s", name)
  174. drvCfgEnc := discoverapi.DriverEncryptionConfig{}
  175. drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
  176. err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
  177. if err != nil {
  178. log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
  179. }
  180. }
  181. return false
  182. })
  183. return nil
  184. }
  185. func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
  186. agent := c.getAgent()
  187. if agent != nil {
  188. // agent is already present, so there is no need initialize it again.
  189. return nil
  190. }
  191. bindAddr := clusterProvider.GetLocalAddress()
  192. advAddr := clusterProvider.GetAdvertiseAddress()
  193. dataAddr := clusterProvider.GetDataPathAddress()
  194. remoteList := clusterProvider.GetRemoteAddressList()
  195. remoteAddrList := make([]string, 0, len(remoteList))
  196. for _, remote := range remoteList {
  197. addr, _, _ := net.SplitHostPort(remote)
  198. remoteAddrList = append(remoteAddrList, addr)
  199. }
  200. listen := clusterProvider.GetListenAddress()
  201. listenAddr, _, _ := net.SplitHostPort(listen)
  202. log.G(context.TODO()).WithFields(log.Fields{
  203. "listen-addr": listenAddr,
  204. "local-addr": bindAddr,
  205. "advertise-addr": advAddr,
  206. "data-path-addr": dataAddr,
  207. "remote-addr-list": remoteAddrList,
  208. "network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
  209. }).Info("Initializing Libnetwork Agent")
  210. if advAddr != "" {
  211. if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
  212. log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
  213. return err
  214. }
  215. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  216. if capability.ConnectivityScope == scope.Global {
  217. if d, ok := driver.(discoverapi.Discover); ok {
  218. c.agentDriverNotify(d)
  219. }
  220. }
  221. return false
  222. })
  223. }
  224. if len(remoteAddrList) > 0 {
  225. if err := c.agentJoin(remoteAddrList); err != nil {
  226. log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
  227. }
  228. }
  229. return nil
  230. }
  231. // For a given subsystem getKeys sorts the keys by lamport time and returns
  232. // slice of keys and lamport time which can used as a unique tag for the keys
  233. func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) {
  234. c.mu.Lock()
  235. defer c.mu.Unlock()
  236. sort.Sort(ByTime(c.keys))
  237. keys = make([][]byte, 0, len(c.keys))
  238. tags = make([]uint64, 0, len(c.keys))
  239. for _, key := range c.keys {
  240. if key.Subsystem == subsystem {
  241. keys = append(keys, key.Key)
  242. tags = append(tags, key.LamportTime)
  243. }
  244. }
  245. if len(keys) > 1 {
  246. // TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc
  247. keys[0], keys[1] = keys[1], keys[0]
  248. tags[0], tags[1] = tags[1], tags[0]
  249. }
  250. return keys, tags
  251. }
  252. // getPrimaryKeyTag returns the primary key for a given subsystem from the
  253. // list of sorted key and the associated tag
  254. func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) {
  255. c.mu.Lock()
  256. defer c.mu.Unlock()
  257. sort.Sort(ByTime(c.keys))
  258. keys := make([]*types.EncryptionKey, 0, len(c.keys))
  259. for _, k := range c.keys {
  260. if k.Subsystem == subsystem {
  261. keys = append(keys, k)
  262. }
  263. }
  264. if len(keys) < 2 {
  265. return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, len(keys))
  266. }
  267. return keys[1].Key, keys[1].LamportTime, nil
  268. }
  269. func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
  270. bindAddr, err := resolveAddr(bindAddrOrInterface)
  271. if err != nil {
  272. return err
  273. }
  274. keys, _ := c.getKeys(subsysGossip)
  275. netDBConf := networkdb.DefaultConfig()
  276. netDBConf.BindAddr = listenAddr
  277. netDBConf.AdvertiseAddr = advertiseAddr
  278. netDBConf.Keys = keys
  279. if c.Config().NetworkControlPlaneMTU != 0 {
  280. // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
  281. // To be on the safe side let's cut 100 bytes
  282. netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100)
  283. log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
  284. c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
  285. }
  286. nDB, err := networkdb.New(netDBConf)
  287. if err != nil {
  288. return err
  289. }
  290. // Register the diagnostic handlers
  291. c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
  292. var cancelList []func()
  293. ch, cancel := nDB.Watch(libnetworkEPTable, "")
  294. cancelList = append(cancelList, cancel)
  295. nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
  296. cancelList = append(cancelList, cancel)
  297. c.mu.Lock()
  298. c.agent = &nwAgent{
  299. networkDB: nDB,
  300. bindAddr: bindAddr,
  301. advertiseAddr: advertiseAddr,
  302. dataPathAddr: dataPathAddr,
  303. coreCancelFuncs: cancelList,
  304. driverCancelFuncs: make(map[string][]func()),
  305. }
  306. c.mu.Unlock()
  307. go c.handleTableEvents(ch, c.handleEpTableEvent)
  308. go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
  309. keys, tags := c.getKeys(subsysIPSec)
  310. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  311. if dr, ok := driver.(discoverapi.Discover); ok {
  312. if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
  313. Keys: keys,
  314. Tags: tags,
  315. }); err != nil {
  316. log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  317. }
  318. }
  319. return false
  320. })
  321. c.WalkNetworks(joinCluster)
  322. return nil
  323. }
  324. func (c *Controller) agentJoin(remoteAddrList []string) error {
  325. agent := c.getAgent()
  326. if agent == nil {
  327. return nil
  328. }
  329. return agent.networkDB.Join(remoteAddrList)
  330. }
  331. func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
  332. agent := c.getAgent()
  333. if agent == nil {
  334. return
  335. }
  336. if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  337. Address: agent.dataPathAddress(),
  338. BindAddress: agent.bindAddr.String(),
  339. Self: true,
  340. }); err != nil {
  341. log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
  342. }
  343. keys, tags := c.getKeys(subsysIPSec)
  344. if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
  345. Keys: keys,
  346. Tags: tags,
  347. }); err != nil {
  348. log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
  349. }
  350. }
  351. func (c *Controller) agentClose() {
  352. // Acquire current agent instance and reset its pointer
  353. // then run closing functions
  354. c.mu.Lock()
  355. agent := c.agent
  356. c.agent = nil
  357. c.mu.Unlock()
  358. // when the agent is closed the cluster provider should be cleaned up
  359. c.SetClusterProvider(nil)
  360. if agent == nil {
  361. return
  362. }
  363. var cancelList []func()
  364. agent.mu.Lock()
  365. for _, cancelFuncs := range agent.driverCancelFuncs {
  366. cancelList = append(cancelList, cancelFuncs...)
  367. }
  368. // Add also the cancel functions for the network db
  369. cancelList = append(cancelList, agent.coreCancelFuncs...)
  370. agent.mu.Unlock()
  371. for _, cancel := range cancelList {
  372. cancel()
  373. }
  374. agent.networkDB.Close()
  375. }
  376. // Task has the backend container details
  377. type Task struct {
  378. Name string
  379. EndpointID string
  380. EndpointIP string
  381. Info map[string]string
  382. }
  383. // ServiceInfo has service specific details along with the list of backend tasks
  384. type ServiceInfo struct {
  385. VIP string
  386. LocalLBIndex int
  387. Tasks []Task
  388. Ports []string
  389. }
  390. type epRecord struct {
  391. ep EndpointRecord
  392. info map[string]string
  393. lbIndex int
  394. }
  395. // Services returns a map of services keyed by the service name with the details
  396. // of all the tasks that belong to the service. Applicable only in swarm mode.
  397. func (n *Network) Services() map[string]ServiceInfo {
  398. agent, ok := n.clusterAgent()
  399. if !ok {
  400. return nil
  401. }
  402. nwID := n.ID()
  403. d, err := n.driver(true)
  404. if err != nil {
  405. log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
  406. return nil
  407. }
  408. // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
  409. eps := make(map[string]epRecord)
  410. c := n.getController()
  411. for eid, value := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) {
  412. var epRec EndpointRecord
  413. if err := proto.Unmarshal(value.Value, &epRec); err != nil {
  414. log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
  415. continue
  416. }
  417. eps[eid] = epRecord{
  418. ep: epRec,
  419. lbIndex: c.getLBIndex(epRec.ServiceID, nwID, epRec.IngressPorts),
  420. }
  421. }
  422. // Walk through the driver's tables, have the driver decode the entries
  423. // and return the tuple {ep ID, value}. value is a string that coveys
  424. // relevant info about the endpoint.
  425. for _, table := range n.driverTables {
  426. if table.objType != driverapi.EndpointObject {
  427. continue
  428. }
  429. for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
  430. epID, info := d.DecodeTableEntry(table.name, key, value.Value)
  431. if ep, ok := eps[epID]; !ok {
  432. log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
  433. } else {
  434. ep.info = info
  435. eps[epID] = ep
  436. }
  437. }
  438. }
  439. // group the endpoints into a map keyed by the service name
  440. sinfo := make(map[string]ServiceInfo)
  441. for ep, epr := range eps {
  442. s, ok := sinfo[epr.ep.ServiceName]
  443. if !ok {
  444. s = ServiceInfo{
  445. VIP: epr.ep.VirtualIP,
  446. LocalLBIndex: epr.lbIndex,
  447. }
  448. }
  449. if s.Ports == nil {
  450. ports := make([]string, 0, len(epr.ep.IngressPorts))
  451. for _, port := range epr.ep.IngressPorts {
  452. ports = append(ports, fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort))
  453. }
  454. s.Ports = ports
  455. }
  456. s.Tasks = append(s.Tasks, Task{
  457. Name: epr.ep.Name,
  458. EndpointID: ep,
  459. EndpointIP: epr.ep.EndpointIP,
  460. Info: epr.info,
  461. })
  462. sinfo[epr.ep.ServiceName] = s
  463. }
  464. return sinfo
  465. }
  466. // clusterAgent returns the cluster agent if the network is a swarm-scoped,
  467. // multi-host network.
  468. func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
  469. if n.scope != scope.Swarm || !n.driverIsMultihost() {
  470. return nil, false
  471. }
  472. a := n.getController().getAgent()
  473. return a, a != nil
  474. }
  475. func (n *Network) joinCluster() error {
  476. agent, ok := n.clusterAgent()
  477. if !ok {
  478. return nil
  479. }
  480. return agent.networkDB.JoinNetwork(n.ID())
  481. }
  482. func (n *Network) leaveCluster() error {
  483. agent, ok := n.clusterAgent()
  484. if !ok {
  485. return nil
  486. }
  487. return agent.networkDB.LeaveNetwork(n.ID())
  488. }
  489. func (ep *Endpoint) addDriverInfoToCluster() error {
  490. if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
  491. return nil
  492. }
  493. n := ep.getNetwork()
  494. agent, ok := n.clusterAgent()
  495. if !ok {
  496. return nil
  497. }
  498. nwID := n.ID()
  499. for _, te := range ep.joinInfo.driverTableEntries {
  500. if err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value); err != nil {
  501. return err
  502. }
  503. }
  504. return nil
  505. }
  506. func (ep *Endpoint) deleteDriverInfoFromCluster() error {
  507. if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
  508. return nil
  509. }
  510. n := ep.getNetwork()
  511. agent, ok := n.clusterAgent()
  512. if !ok {
  513. return nil
  514. }
  515. nwID := n.ID()
  516. for _, te := range ep.joinInfo.driverTableEntries {
  517. if err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key); err != nil {
  518. return err
  519. }
  520. }
  521. return nil
  522. }
  523. func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
  524. if len(ep.myAliases) == 0 && ep.isAnonymous() || ep.Iface() == nil || ep.Iface().Address() == nil {
  525. return nil
  526. }
  527. n := ep.getNetwork()
  528. agent, ok := n.clusterAgent()
  529. if !ok {
  530. return nil
  531. }
  532. sb.service.Lock()
  533. defer sb.service.Unlock()
  534. log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
  535. // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
  536. // This is to handle a race between the EnableService and the sbLeave
  537. // It is possible that the EnableService starts, fetches the list of the endpoints and
  538. // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
  539. // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
  540. // This check under the Service lock of the sandbox ensure the correct behavior.
  541. // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
  542. // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
  543. // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
  544. // removed from the list, in this situation the delete will bail out not finding any data to cleanup
  545. // and the add will bail out not finding the endpoint on the sandbox.
  546. if err := sb.getEndpoint(ep.ID()); err == nil {
  547. log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  548. return nil
  549. }
  550. name := ep.Name()
  551. if ep.isAnonymous() {
  552. name = ep.MyAliases()[0]
  553. }
  554. var ingressPorts []*PortConfig
  555. if ep.svcID != "" {
  556. // This is a task part of a service
  557. // Gossip ingress ports only in ingress network.
  558. if n.ingress {
  559. ingressPorts = ep.ingressPorts
  560. }
  561. if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  562. return err
  563. }
  564. } else {
  565. // This is a container simply attached to an attachable network
  566. if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  567. return err
  568. }
  569. }
  570. buf, err := proto.Marshal(&EndpointRecord{
  571. Name: name,
  572. ServiceName: ep.svcName,
  573. ServiceID: ep.svcID,
  574. VirtualIP: ep.virtualIP.String(),
  575. IngressPorts: ingressPorts,
  576. Aliases: ep.svcAliases,
  577. TaskAliases: ep.myAliases,
  578. EndpointIP: ep.Iface().Address().IP.String(),
  579. ServiceDisabled: false,
  580. })
  581. if err != nil {
  582. return err
  583. }
  584. if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
  585. log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
  586. return err
  587. }
  588. log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
  589. return nil
  590. }
  591. func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
  592. if len(ep.myAliases) == 0 && ep.isAnonymous() {
  593. return nil
  594. }
  595. n := ep.getNetwork()
  596. agent, ok := n.clusterAgent()
  597. if !ok {
  598. return nil
  599. }
  600. sb.service.Lock()
  601. defer sb.service.Unlock()
  602. log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
  603. // Avoid a race w/ with a container that aborts preemptively. This would
  604. // get caught in disableServceInNetworkDB, but we check here to make the
  605. // nature of the condition more clear.
  606. // See comment in addServiceInfoToCluster()
  607. if err := sb.getEndpoint(ep.ID()); err == nil {
  608. log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  609. return nil
  610. }
  611. name := ep.Name()
  612. if ep.isAnonymous() {
  613. name = ep.MyAliases()[0]
  614. }
  615. // First update the networkDB then locally
  616. if fullRemove {
  617. if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
  618. log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
  619. }
  620. } else {
  621. disableServiceInNetworkDB(agent, n, ep)
  622. }
  623. if ep.Iface() != nil && ep.Iface().Address() != nil {
  624. if ep.svcID != "" {
  625. // This is a task part of a service
  626. var ingressPorts []*PortConfig
  627. if n.ingress {
  628. ingressPorts = ep.ingressPorts
  629. }
  630. if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
  631. return err
  632. }
  633. } else {
  634. // This is a container simply attached to an attachable network
  635. if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
  636. return err
  637. }
  638. }
  639. }
  640. log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
  641. return nil
  642. }
  643. func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
  644. var epRec EndpointRecord
  645. log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
  646. // Update existing record to indicate that the service is disabled
  647. inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
  648. if err != nil {
  649. log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
  650. return
  651. }
  652. // Should never fail
  653. if err := proto.Unmarshal(inBuf, &epRec); err != nil {
  654. log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
  655. return
  656. }
  657. epRec.ServiceDisabled = true
  658. // Should never fail
  659. outBuf, err := proto.Marshal(&epRec)
  660. if err != nil {
  661. log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
  662. return
  663. }
  664. // Send update to the whole cluster
  665. if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
  666. log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
  667. }
  668. }
  669. func (n *Network) addDriverWatches() {
  670. if len(n.driverTables) == 0 {
  671. return
  672. }
  673. agent, ok := n.clusterAgent()
  674. if !ok {
  675. return
  676. }
  677. c := n.getController()
  678. for _, table := range n.driverTables {
  679. ch, cancel := agent.networkDB.Watch(table.name, n.ID())
  680. agent.mu.Lock()
  681. agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
  682. agent.mu.Unlock()
  683. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  684. d, err := n.driver(false)
  685. if err != nil {
  686. log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  687. return
  688. }
  689. err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
  690. // skip the entries that are mark for deletion, this is safe because this function is
  691. // called at initialization time so there is no state to delete
  692. if nid == n.ID() && !deleted {
  693. d.EventNotify(driverapi.Create, nid, table.name, key, value)
  694. }
  695. return false
  696. })
  697. if err != nil {
  698. log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
  699. }
  700. }
  701. }
  702. func (n *Network) cancelDriverWatches() {
  703. agent, ok := n.clusterAgent()
  704. if !ok {
  705. return
  706. }
  707. agent.mu.Lock()
  708. cancelFuncs := agent.driverCancelFuncs[n.ID()]
  709. delete(agent.driverCancelFuncs, n.ID())
  710. agent.mu.Unlock()
  711. for _, cancel := range cancelFuncs {
  712. cancel()
  713. }
  714. }
  715. func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
  716. for {
  717. select {
  718. case ev := <-ch.C:
  719. fn(ev)
  720. case <-ch.Done():
  721. return
  722. }
  723. }
  724. }
  725. func (n *Network) handleDriverTableEvent(ev events.Event) {
  726. d, err := n.driver(false)
  727. if err != nil {
  728. log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  729. return
  730. }
  731. var (
  732. etype driverapi.EventType
  733. tname string
  734. key string
  735. value []byte
  736. )
  737. switch event := ev.(type) {
  738. case networkdb.CreateEvent:
  739. tname = event.Table
  740. key = event.Key
  741. value = event.Value
  742. etype = driverapi.Create
  743. case networkdb.DeleteEvent:
  744. tname = event.Table
  745. key = event.Key
  746. value = event.Value
  747. etype = driverapi.Delete
  748. case networkdb.UpdateEvent:
  749. tname = event.Table
  750. key = event.Key
  751. value = event.Value
  752. etype = driverapi.Delete
  753. }
  754. d.EventNotify(etype, n.ID(), tname, key, value)
  755. }
  756. func (c *Controller) handleNodeTableEvent(ev events.Event) {
  757. var (
  758. value []byte
  759. isAdd bool
  760. nodeAddr networkdb.NodeAddr
  761. )
  762. switch event := ev.(type) {
  763. case networkdb.CreateEvent:
  764. value = event.Value
  765. isAdd = true
  766. case networkdb.DeleteEvent:
  767. value = event.Value
  768. case networkdb.UpdateEvent:
  769. log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
  770. }
  771. err := json.Unmarshal(value, &nodeAddr)
  772. if err != nil {
  773. log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
  774. return
  775. }
  776. c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
  777. }
  778. func (c *Controller) handleEpTableEvent(ev events.Event) {
  779. var (
  780. nid string
  781. eid string
  782. value []byte
  783. epRec EndpointRecord
  784. )
  785. switch event := ev.(type) {
  786. case networkdb.CreateEvent:
  787. nid = event.NetworkID
  788. eid = event.Key
  789. value = event.Value
  790. case networkdb.DeleteEvent:
  791. nid = event.NetworkID
  792. eid = event.Key
  793. value = event.Value
  794. case networkdb.UpdateEvent:
  795. nid = event.NetworkID
  796. eid = event.Key
  797. value = event.Value
  798. default:
  799. log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
  800. return
  801. }
  802. err := proto.Unmarshal(value, &epRec)
  803. if err != nil {
  804. log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
  805. return
  806. }
  807. containerName := epRec.Name
  808. svcName := epRec.ServiceName
  809. svcID := epRec.ServiceID
  810. vip := net.ParseIP(epRec.VirtualIP)
  811. ip := net.ParseIP(epRec.EndpointIP)
  812. ingressPorts := epRec.IngressPorts
  813. serviceAliases := epRec.Aliases
  814. taskAliases := epRec.TaskAliases
  815. if containerName == "" || ip == nil {
  816. log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  817. return
  818. }
  819. switch ev.(type) {
  820. case networkdb.CreateEvent:
  821. log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
  822. if svcID != "" {
  823. // This is a remote task part of a service
  824. if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
  825. log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
  826. return
  827. }
  828. } else {
  829. // This is a remote container simply attached to an attachable network
  830. if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  831. log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  832. }
  833. }
  834. case networkdb.DeleteEvent:
  835. log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
  836. if svcID != "" {
  837. // This is a remote task part of a service
  838. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
  839. log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
  840. return
  841. }
  842. } else {
  843. // This is a remote container simply attached to an attachable network
  844. if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  845. log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  846. }
  847. }
  848. case networkdb.UpdateEvent:
  849. log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
  850. // We currently should only get these to inform us that an endpoint
  851. // is disabled. Report if otherwise.
  852. if svcID == "" || !epRec.ServiceDisabled {
  853. log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
  854. return
  855. }
  856. // This is a remote task that is part of a service that is now disabled
  857. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
  858. log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
  859. return
  860. }
  861. }
  862. }