agent.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. package libnetwork
  2. //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "sort"
  9. "sync"
  10. "github.com/containerd/containerd/log"
  11. "github.com/docker/docker/libnetwork/cluster"
  12. "github.com/docker/docker/libnetwork/discoverapi"
  13. "github.com/docker/docker/libnetwork/driverapi"
  14. "github.com/docker/docker/libnetwork/networkdb"
  15. "github.com/docker/docker/libnetwork/scope"
  16. "github.com/docker/docker/libnetwork/types"
  17. "github.com/docker/go-events"
  18. "github.com/gogo/protobuf/proto"
  19. )
const (
	// subsysGossip is the Subsystem value of the encryption keys that are
	// handed to NetworkDB (control plane gossip).
	subsysGossip = "networking:gossip"
	// subsysIPSec is the Subsystem value of the encryption keys that are
	// pushed to the network drivers (data path).
	subsysIPSec = "networking:ipsec"
	// keyringSize is not referenced in this part of the file; presumably the
	// number of keys kept per subsystem — verify against its users.
	keyringSize = 3
)
  25. // ByTime implements sort.Interface for []*types.EncryptionKey based on
  26. // the LamportTime field.
  27. type ByTime []*types.EncryptionKey
  28. func (b ByTime) Len() int { return len(b) }
  29. func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  30. func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
// nwAgent is the per-node cluster agent state: the NetworkDB instance, the
// addresses the agent was initialized with, and the cancel functions of the
// NetworkDB watches created on its behalf. The embedded mutex guards the
// fields that are read or modified after creation (dataPathAddress and the
// cancel-function collections take it).
type nwAgent struct {
	networkDB     *networkdb.NetworkDB
	bindAddr      string
	advertiseAddr string
	// dataPathAddr may be empty, in which case dataPathAddress falls back to
	// advertiseAddr.
	dataPathAddr string
	// coreCancelFuncs cancels the endpoint-table and node-table watches
	// created in agentInit.
	coreCancelFuncs []func()
	// driverCancelFuncs holds the cancel functions of the driver table
	// watches, keyed by network ID.
	driverCancelFuncs map[string][]func()
	sync.Mutex
}
  40. func (a *nwAgent) dataPathAddress() string {
  41. a.Lock()
  42. defer a.Unlock()
  43. if a.dataPathAddr != "" {
  44. return a.dataPathAddr
  45. }
  46. return a.advertiseAddr
  47. }
// libnetworkEPTable is the name of the NetworkDB table in which the
// marshalled EndpointRecord of every endpoint is stored, keyed by endpoint ID.
const libnetworkEPTable = "endpoint_table"
  49. func getBindAddr(ifaceName string) (string, error) {
  50. iface, err := net.InterfaceByName(ifaceName)
  51. if err != nil {
  52. return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  53. }
  54. addrs, err := iface.Addrs()
  55. if err != nil {
  56. return "", fmt.Errorf("failed to get interface addresses: %v", err)
  57. }
  58. for _, a := range addrs {
  59. addr, ok := a.(*net.IPNet)
  60. if !ok {
  61. continue
  62. }
  63. addrIP := addr.IP
  64. if addrIP.IsLinkLocalUnicast() {
  65. continue
  66. }
  67. return addrIP.String(), nil
  68. }
  69. return "", fmt.Errorf("failed to get bind address")
  70. }
  71. func resolveAddr(addrOrInterface string) (string, error) {
  72. // Try and see if this is a valid IP address
  73. if net.ParseIP(addrOrInterface) != nil {
  74. return addrOrInterface, nil
  75. }
  76. addr, err := net.ResolveIPAddr("ip", addrOrInterface)
  77. if err != nil {
  78. // If not a valid IP address, it should be a valid interface
  79. return getBindAddr(addrOrInterface)
  80. }
  81. return addr.String(), nil
  82. }
  83. func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
  84. drvEnc := discoverapi.DriverEncryptionUpdate{}
  85. a := c.getAgent()
  86. if a == nil {
  87. log.G(context.TODO()).Debug("Skipping key change as agent is nil")
  88. return nil
  89. }
  90. // Find the deleted key. If the deleted key was the primary key,
  91. // a new primary key should be set before removing if from keyring.
  92. c.mu.Lock()
  93. added := []byte{}
  94. deleted := []byte{}
  95. j := len(c.keys)
  96. for i := 0; i < j; {
  97. same := false
  98. for _, key := range keys {
  99. if same = key.LamportTime == c.keys[i].LamportTime; same {
  100. break
  101. }
  102. }
  103. if !same {
  104. cKey := c.keys[i]
  105. if cKey.Subsystem == subsysGossip {
  106. deleted = cKey.Key
  107. }
  108. if cKey.Subsystem == subsysIPSec {
  109. drvEnc.Prune = cKey.Key
  110. drvEnc.PruneTag = cKey.LamportTime
  111. }
  112. c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
  113. c.keys[j-1] = nil
  114. j--
  115. }
  116. i++
  117. }
  118. c.keys = c.keys[:j]
  119. // Find the new key and add it to the key ring
  120. for _, key := range keys {
  121. same := false
  122. for _, cKey := range c.keys {
  123. if same = cKey.LamportTime == key.LamportTime; same {
  124. break
  125. }
  126. }
  127. if !same {
  128. c.keys = append(c.keys, key)
  129. if key.Subsystem == subsysGossip {
  130. added = key.Key
  131. }
  132. if key.Subsystem == subsysIPSec {
  133. drvEnc.Key = key.Key
  134. drvEnc.Tag = key.LamportTime
  135. }
  136. }
  137. }
  138. c.mu.Unlock()
  139. if len(added) > 0 {
  140. a.networkDB.SetKey(added)
  141. }
  142. key, _, err := c.getPrimaryKeyTag(subsysGossip)
  143. if err != nil {
  144. return err
  145. }
  146. a.networkDB.SetPrimaryKey(key)
  147. key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
  148. if err != nil {
  149. return err
  150. }
  151. drvEnc.Primary = key
  152. drvEnc.PrimaryTag = tag
  153. if len(deleted) > 0 {
  154. a.networkDB.RemoveKey(deleted)
  155. }
  156. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  157. dr, ok := driver.(discoverapi.Discover)
  158. if !ok {
  159. return false
  160. }
  161. if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
  162. log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
  163. // Attempt to reconfigure keys in case of a update failure
  164. // which can arise due to a mismatch of keys
  165. // if worker nodes get temporarily disconnected
  166. log.G(context.TODO()).Warnf("Reconfiguring datapath keys for %s", name)
  167. drvCfgEnc := discoverapi.DriverEncryptionConfig{}
  168. drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
  169. err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
  170. if err != nil {
  171. log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
  172. }
  173. }
  174. return false
  175. })
  176. return nil
  177. }
  178. func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
  179. agent := c.getAgent()
  180. // If the agent is already present there is no need to try to initialize it again
  181. if agent != nil {
  182. return nil
  183. }
  184. bindAddr := clusterProvider.GetLocalAddress()
  185. advAddr := clusterProvider.GetAdvertiseAddress()
  186. dataAddr := clusterProvider.GetDataPathAddress()
  187. remoteList := clusterProvider.GetRemoteAddressList()
  188. remoteAddrList := make([]string, 0, len(remoteList))
  189. for _, remote := range remoteList {
  190. addr, _, _ := net.SplitHostPort(remote)
  191. remoteAddrList = append(remoteAddrList, addr)
  192. }
  193. listen := clusterProvider.GetListenAddress()
  194. listenAddr, _, _ := net.SplitHostPort(listen)
  195. log.G(context.TODO()).Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
  196. listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().NetworkControlPlaneMTU)
  197. if advAddr != "" && agent == nil {
  198. if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
  199. log.G(context.TODO()).Errorf("error in agentInit: %v", err)
  200. return err
  201. }
  202. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  203. if capability.ConnectivityScope == scope.Global {
  204. if d, ok := driver.(discoverapi.Discover); ok {
  205. c.agentDriverNotify(d)
  206. }
  207. }
  208. return false
  209. })
  210. }
  211. if len(remoteAddrList) > 0 {
  212. if err := c.agentJoin(remoteAddrList); err != nil {
  213. log.G(context.TODO()).Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
  214. }
  215. }
  216. return nil
  217. }
  218. // For a given subsystem getKeys sorts the keys by lamport time and returns
  219. // slice of keys and lamport time which can used as a unique tag for the keys
  220. func (c *Controller) getKeys(subsys string) ([][]byte, []uint64) {
  221. c.mu.Lock()
  222. defer c.mu.Unlock()
  223. sort.Sort(ByTime(c.keys))
  224. keys := [][]byte{}
  225. tags := []uint64{}
  226. for _, key := range c.keys {
  227. if key.Subsystem == subsys {
  228. keys = append(keys, key.Key)
  229. tags = append(tags, key.LamportTime)
  230. }
  231. }
  232. keys[0], keys[1] = keys[1], keys[0]
  233. tags[0], tags[1] = tags[1], tags[0]
  234. return keys, tags
  235. }
  236. // getPrimaryKeyTag returns the primary key for a given subsystem from the
  237. // list of sorted key and the associated tag
  238. func (c *Controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
  239. c.mu.Lock()
  240. defer c.mu.Unlock()
  241. sort.Sort(ByTime(c.keys))
  242. keys := []*types.EncryptionKey{}
  243. for _, key := range c.keys {
  244. if key.Subsystem == subsys {
  245. keys = append(keys, key)
  246. }
  247. }
  248. return keys[1].Key, keys[1].LamportTime, nil
  249. }
  250. func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
  251. bindAddr, err := resolveAddr(bindAddrOrInterface)
  252. if err != nil {
  253. return err
  254. }
  255. keys, _ := c.getKeys(subsysGossip)
  256. netDBConf := networkdb.DefaultConfig()
  257. netDBConf.BindAddr = listenAddr
  258. netDBConf.AdvertiseAddr = advertiseAddr
  259. netDBConf.Keys = keys
  260. if c.Config().NetworkControlPlaneMTU != 0 {
  261. // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
  262. // To be on the safe side let's cut 100 bytes
  263. netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100)
  264. log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
  265. c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
  266. }
  267. nDB, err := networkdb.New(netDBConf)
  268. if err != nil {
  269. return err
  270. }
  271. // Register the diagnostic handlers
  272. c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
  273. var cancelList []func()
  274. ch, cancel := nDB.Watch(libnetworkEPTable, "")
  275. cancelList = append(cancelList, cancel)
  276. nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
  277. cancelList = append(cancelList, cancel)
  278. c.mu.Lock()
  279. c.agent = &nwAgent{
  280. networkDB: nDB,
  281. bindAddr: bindAddr,
  282. advertiseAddr: advertiseAddr,
  283. dataPathAddr: dataPathAddr,
  284. coreCancelFuncs: cancelList,
  285. driverCancelFuncs: make(map[string][]func()),
  286. }
  287. c.mu.Unlock()
  288. go c.handleTableEvents(ch, c.handleEpTableEvent)
  289. go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
  290. drvEnc := discoverapi.DriverEncryptionConfig{}
  291. keys, tags := c.getKeys(subsysIPSec)
  292. drvEnc.Keys = keys
  293. drvEnc.Tags = tags
  294. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  295. if dr, ok := driver.(discoverapi.Discover); ok {
  296. if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
  297. log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  298. }
  299. }
  300. return false
  301. })
  302. c.WalkNetworks(joinCluster)
  303. return nil
  304. }
  305. func (c *Controller) agentJoin(remoteAddrList []string) error {
  306. agent := c.getAgent()
  307. if agent == nil {
  308. return nil
  309. }
  310. return agent.networkDB.Join(remoteAddrList)
  311. }
  312. func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
  313. agent := c.getAgent()
  314. if agent == nil {
  315. return
  316. }
  317. if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  318. Address: agent.dataPathAddress(),
  319. BindAddress: agent.bindAddr,
  320. Self: true,
  321. }); err != nil {
  322. log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
  323. }
  324. drvEnc := discoverapi.DriverEncryptionConfig{}
  325. keys, tags := c.getKeys(subsysIPSec)
  326. drvEnc.Keys = keys
  327. drvEnc.Tags = tags
  328. if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
  329. log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
  330. }
  331. }
  332. func (c *Controller) agentClose() {
  333. // Acquire current agent instance and reset its pointer
  334. // then run closing functions
  335. c.mu.Lock()
  336. agent := c.agent
  337. c.agent = nil
  338. c.mu.Unlock()
  339. // when the agent is closed the cluster provider should be cleaned up
  340. c.SetClusterProvider(nil)
  341. if agent == nil {
  342. return
  343. }
  344. var cancelList []func()
  345. agent.Lock()
  346. for _, cancelFuncs := range agent.driverCancelFuncs {
  347. cancelList = append(cancelList, cancelFuncs...)
  348. }
  349. // Add also the cancel functions for the network db
  350. cancelList = append(cancelList, agent.coreCancelFuncs...)
  351. agent.Unlock()
  352. for _, cancel := range cancelList {
  353. cancel()
  354. }
  355. agent.networkDB.Close()
  356. }
// Task has the backend container details of one endpoint, as reported by
// Network.Services.
type Task struct {
	// Name is the name stored in the endpoint's EndpointRecord.
	Name string
	// EndpointID is the key of the endpoint in the endpoint table.
	EndpointID string
	// EndpointIP is the endpoint's IP address in string form.
	EndpointIP string
	// Info is the driver-specific information decoded from the driver's
	// tables; it is nil when no driver table entry matched the endpoint.
	Info map[string]string
}
// ServiceInfo has service specific details along with the list of backend tasks
type ServiceInfo struct {
	// VIP is the virtual IP of the service, in string form.
	VIP string
	// LocalLBIndex is the value returned by the controller's getLBIndex for
	// the service on this network.
	LocalLBIndex int
	// Tasks lists the endpoints that belong to the service.
	Tasks []Task
	// Ports describes the ingress ports, one "Target: N, Publish: M" string
	// per port.
	Ports []string
}
// epRecord is the intermediate per-endpoint state assembled by
// Network.Services before the endpoints are grouped by service.
type epRecord struct {
	ep      EndpointRecord    // record decoded from the endpoint table
	info    map[string]string // driver-specific info decoded from the driver tables
	lbIndex int               // load balancer index obtained from getLBIndex
}
  376. // Services returns a map of services keyed by the service name with the details
  377. // of all the tasks that belong to the service. Applicable only in swarm mode.
  378. func (n *Network) Services() map[string]ServiceInfo {
  379. if !n.isClusterEligible() {
  380. return nil
  381. }
  382. agent := n.getController().getAgent()
  383. if agent == nil {
  384. return nil
  385. }
  386. nwID := n.ID()
  387. d, err := n.driver(true)
  388. if err != nil {
  389. log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
  390. return nil
  391. }
  392. // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
  393. entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID)
  394. eps := make(map[string]epRecord)
  395. c := n.getController()
  396. for eid, value := range entries {
  397. var epRec EndpointRecord
  398. if err := proto.Unmarshal(value.Value, &epRec); err != nil {
  399. log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
  400. continue
  401. }
  402. i := c.getLBIndex(epRec.ServiceID, nwID, epRec.IngressPorts)
  403. eps[eid] = epRecord{
  404. ep: epRec,
  405. lbIndex: i,
  406. }
  407. }
  408. // Walk through the driver's tables, have the driver decode the entries
  409. // and return the tuple {ep ID, value}. value is a string that coveys
  410. // relevant info about the endpoint.
  411. for _, table := range n.driverTables {
  412. if table.objType != driverapi.EndpointObject {
  413. continue
  414. }
  415. entries := agent.networkDB.GetTableByNetwork(table.name, nwID)
  416. for key, value := range entries {
  417. epID, info := d.DecodeTableEntry(table.name, key, value.Value)
  418. if ep, ok := eps[epID]; !ok {
  419. log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
  420. } else {
  421. ep.info = info
  422. eps[epID] = ep
  423. }
  424. }
  425. }
  426. // group the endpoints into a map keyed by the service name
  427. sinfo := make(map[string]ServiceInfo)
  428. for ep, epr := range eps {
  429. var (
  430. s ServiceInfo
  431. ok bool
  432. )
  433. if s, ok = sinfo[epr.ep.ServiceName]; !ok {
  434. s = ServiceInfo{
  435. VIP: epr.ep.VirtualIP,
  436. LocalLBIndex: epr.lbIndex,
  437. }
  438. }
  439. ports := []string{}
  440. if s.Ports == nil {
  441. for _, port := range epr.ep.IngressPorts {
  442. p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
  443. ports = append(ports, p)
  444. }
  445. s.Ports = ports
  446. }
  447. s.Tasks = append(s.Tasks, Task{
  448. Name: epr.ep.Name,
  449. EndpointID: ep,
  450. EndpointIP: epr.ep.EndpointIP,
  451. Info: epr.info,
  452. })
  453. sinfo[epr.ep.ServiceName] = s
  454. }
  455. return sinfo
  456. }
  457. func (n *Network) isClusterEligible() bool {
  458. if n.scope != scope.Swarm || !n.driverIsMultihost() {
  459. return false
  460. }
  461. return n.getController().getAgent() != nil
  462. }
  463. func (n *Network) joinCluster() error {
  464. if !n.isClusterEligible() {
  465. return nil
  466. }
  467. agent := n.getController().getAgent()
  468. if agent == nil {
  469. return nil
  470. }
  471. return agent.networkDB.JoinNetwork(n.ID())
  472. }
  473. func (n *Network) leaveCluster() error {
  474. if !n.isClusterEligible() {
  475. return nil
  476. }
  477. agent := n.getController().getAgent()
  478. if agent == nil {
  479. return nil
  480. }
  481. return agent.networkDB.LeaveNetwork(n.ID())
  482. }
  483. func (ep *Endpoint) addDriverInfoToCluster() error {
  484. n := ep.getNetwork()
  485. if !n.isClusterEligible() {
  486. return nil
  487. }
  488. if ep.joinInfo == nil {
  489. return nil
  490. }
  491. agent := n.getController().getAgent()
  492. if agent == nil {
  493. return nil
  494. }
  495. for _, te := range ep.joinInfo.driverTableEntries {
  496. if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
  497. return err
  498. }
  499. }
  500. return nil
  501. }
  502. func (ep *Endpoint) deleteDriverInfoFromCluster() error {
  503. n := ep.getNetwork()
  504. if !n.isClusterEligible() {
  505. return nil
  506. }
  507. if ep.joinInfo == nil {
  508. return nil
  509. }
  510. agent := n.getController().getAgent()
  511. if agent == nil {
  512. return nil
  513. }
  514. for _, te := range ep.joinInfo.driverTableEntries {
  515. if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
  516. return err
  517. }
  518. }
  519. return nil
  520. }
  521. func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
  522. if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface() == nil || ep.Iface().Address() == nil {
  523. return nil
  524. }
  525. n := ep.getNetwork()
  526. if !n.isClusterEligible() {
  527. return nil
  528. }
  529. sb.service.Lock()
  530. defer sb.service.Unlock()
  531. log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
  532. // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
  533. // This is to handle a race between the EnableService and the sbLeave
  534. // It is possible that the EnableService starts, fetches the list of the endpoints and
  535. // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
  536. // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
  537. // This check under the Service lock of the sandbox ensure the correct behavior.
  538. // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
  539. // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
  540. // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
  541. // removed from the list, in this situation the delete will bail out not finding any data to cleanup
  542. // and the add will bail out not finding the endpoint on the sandbox.
  543. if e := sb.getEndpoint(ep.ID()); e == nil {
  544. log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  545. return nil
  546. }
  547. c := n.getController()
  548. agent := c.getAgent()
  549. name := ep.Name()
  550. if ep.isAnonymous() {
  551. name = ep.MyAliases()[0]
  552. }
  553. var ingressPorts []*PortConfig
  554. if ep.svcID != "" {
  555. // This is a task part of a service
  556. // Gossip ingress ports only in ingress network.
  557. if n.ingress {
  558. ingressPorts = ep.ingressPorts
  559. }
  560. if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  561. return err
  562. }
  563. } else {
  564. // This is a container simply attached to an attachable network
  565. if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  566. return err
  567. }
  568. }
  569. buf, err := proto.Marshal(&EndpointRecord{
  570. Name: name,
  571. ServiceName: ep.svcName,
  572. ServiceID: ep.svcID,
  573. VirtualIP: ep.virtualIP.String(),
  574. IngressPorts: ingressPorts,
  575. Aliases: ep.svcAliases,
  576. TaskAliases: ep.myAliases,
  577. EndpointIP: ep.Iface().Address().IP.String(),
  578. ServiceDisabled: false,
  579. })
  580. if err != nil {
  581. return err
  582. }
  583. if agent != nil {
  584. if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
  585. log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
  586. return err
  587. }
  588. }
  589. log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
  590. return nil
  591. }
  592. func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
  593. if ep.isAnonymous() && len(ep.myAliases) == 0 {
  594. return nil
  595. }
  596. n := ep.getNetwork()
  597. if !n.isClusterEligible() {
  598. return nil
  599. }
  600. sb.service.Lock()
  601. defer sb.service.Unlock()
  602. log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
  603. // Avoid a race w/ with a container that aborts preemptively. This would
  604. // get caught in disableServceInNetworkDB, but we check here to make the
  605. // nature of the condition more clear.
  606. // See comment in addServiceInfoToCluster()
  607. if e := sb.getEndpoint(ep.ID()); e == nil {
  608. log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  609. return nil
  610. }
  611. c := n.getController()
  612. agent := c.getAgent()
  613. name := ep.Name()
  614. if ep.isAnonymous() {
  615. name = ep.MyAliases()[0]
  616. }
  617. if agent != nil {
  618. // First update the networkDB then locally
  619. if fullRemove {
  620. if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
  621. log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
  622. }
  623. } else {
  624. disableServiceInNetworkDB(agent, n, ep)
  625. }
  626. }
  627. if ep.Iface() != nil && ep.Iface().Address() != nil {
  628. if ep.svcID != "" {
  629. // This is a task part of a service
  630. var ingressPorts []*PortConfig
  631. if n.ingress {
  632. ingressPorts = ep.ingressPorts
  633. }
  634. if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
  635. return err
  636. }
  637. } else {
  638. // This is a container simply attached to an attachable network
  639. if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
  640. return err
  641. }
  642. }
  643. }
  644. log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
  645. return nil
  646. }
  647. func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
  648. var epRec EndpointRecord
  649. log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
  650. // Update existing record to indicate that the service is disabled
  651. inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
  652. if err != nil {
  653. log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
  654. return
  655. }
  656. // Should never fail
  657. if err := proto.Unmarshal(inBuf, &epRec); err != nil {
  658. log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
  659. return
  660. }
  661. epRec.ServiceDisabled = true
  662. // Should never fail
  663. outBuf, err := proto.Marshal(&epRec)
  664. if err != nil {
  665. log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
  666. return
  667. }
  668. // Send update to the whole cluster
  669. if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
  670. log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
  671. }
  672. }
  673. func (n *Network) addDriverWatches() {
  674. if !n.isClusterEligible() {
  675. return
  676. }
  677. c := n.getController()
  678. agent := c.getAgent()
  679. if agent == nil {
  680. return
  681. }
  682. for _, table := range n.driverTables {
  683. ch, cancel := agent.networkDB.Watch(table.name, n.ID())
  684. agent.Lock()
  685. agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
  686. agent.Unlock()
  687. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  688. d, err := n.driver(false)
  689. if err != nil {
  690. log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  691. return
  692. }
  693. err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
  694. // skip the entries that are mark for deletion, this is safe because this function is
  695. // called at initialization time so there is no state to delete
  696. if nid == n.ID() && !deleted {
  697. d.EventNotify(driverapi.Create, nid, table.name, key, value)
  698. }
  699. return false
  700. })
  701. if err != nil {
  702. log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
  703. }
  704. }
  705. }
  706. func (n *Network) cancelDriverWatches() {
  707. if !n.isClusterEligible() {
  708. return
  709. }
  710. agent := n.getController().getAgent()
  711. if agent == nil {
  712. return
  713. }
  714. agent.Lock()
  715. cancelFuncs := agent.driverCancelFuncs[n.ID()]
  716. delete(agent.driverCancelFuncs, n.ID())
  717. agent.Unlock()
  718. for _, cancel := range cancelFuncs {
  719. cancel()
  720. }
  721. }
  722. func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
  723. for {
  724. select {
  725. case ev := <-ch.C:
  726. fn(ev)
  727. case <-ch.Done():
  728. return
  729. }
  730. }
  731. }
  732. func (n *Network) handleDriverTableEvent(ev events.Event) {
  733. d, err := n.driver(false)
  734. if err != nil {
  735. log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  736. return
  737. }
  738. var (
  739. etype driverapi.EventType
  740. tname string
  741. key string
  742. value []byte
  743. )
  744. switch event := ev.(type) {
  745. case networkdb.CreateEvent:
  746. tname = event.Table
  747. key = event.Key
  748. value = event.Value
  749. etype = driverapi.Create
  750. case networkdb.DeleteEvent:
  751. tname = event.Table
  752. key = event.Key
  753. value = event.Value
  754. etype = driverapi.Delete
  755. case networkdb.UpdateEvent:
  756. tname = event.Table
  757. key = event.Key
  758. value = event.Value
  759. etype = driverapi.Delete
  760. }
  761. d.EventNotify(etype, n.ID(), tname, key, value)
  762. }
  763. func (c *Controller) handleNodeTableEvent(ev events.Event) {
  764. var (
  765. value []byte
  766. isAdd bool
  767. nodeAddr networkdb.NodeAddr
  768. )
  769. switch event := ev.(type) {
  770. case networkdb.CreateEvent:
  771. value = event.Value
  772. isAdd = true
  773. case networkdb.DeleteEvent:
  774. value = event.Value
  775. case networkdb.UpdateEvent:
  776. log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
  777. }
  778. err := json.Unmarshal(value, &nodeAddr)
  779. if err != nil {
  780. log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
  781. return
  782. }
  783. c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
  784. }
  785. func (c *Controller) handleEpTableEvent(ev events.Event) {
  786. var (
  787. nid string
  788. eid string
  789. value []byte
  790. epRec EndpointRecord
  791. )
  792. switch event := ev.(type) {
  793. case networkdb.CreateEvent:
  794. nid = event.NetworkID
  795. eid = event.Key
  796. value = event.Value
  797. case networkdb.DeleteEvent:
  798. nid = event.NetworkID
  799. eid = event.Key
  800. value = event.Value
  801. case networkdb.UpdateEvent:
  802. nid = event.NetworkID
  803. eid = event.Key
  804. value = event.Value
  805. default:
  806. log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
  807. return
  808. }
  809. err := proto.Unmarshal(value, &epRec)
  810. if err != nil {
  811. log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
  812. return
  813. }
  814. containerName := epRec.Name
  815. svcName := epRec.ServiceName
  816. svcID := epRec.ServiceID
  817. vip := net.ParseIP(epRec.VirtualIP)
  818. ip := net.ParseIP(epRec.EndpointIP)
  819. ingressPorts := epRec.IngressPorts
  820. serviceAliases := epRec.Aliases
  821. taskAliases := epRec.TaskAliases
  822. if containerName == "" || ip == nil {
  823. log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  824. return
  825. }
  826. switch ev.(type) {
  827. case networkdb.CreateEvent:
  828. log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
  829. if svcID != "" {
  830. // This is a remote task part of a service
  831. if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
  832. log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
  833. return
  834. }
  835. } else {
  836. // This is a remote container simply attached to an attachable network
  837. if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  838. log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  839. }
  840. }
  841. case networkdb.DeleteEvent:
  842. log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
  843. if svcID != "" {
  844. // This is a remote task part of a service
  845. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
  846. log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
  847. return
  848. }
  849. } else {
  850. // This is a remote container simply attached to an attachable network
  851. if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  852. log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  853. }
  854. }
  855. case networkdb.UpdateEvent:
  856. log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
  857. // We currently should only get these to inform us that an endpoint
  858. // is disabled. Report if otherwise.
  859. if svcID == "" || !epRec.ServiceDisabled {
  860. log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
  861. return
  862. }
  863. // This is a remote task that is part of a service that is now disabled
  864. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
  865. log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
  866. return
  867. }
  868. }
  869. }