agent.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991
  1. package libnetwork
  2. //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "sort"
  8. "sync"
  9. "github.com/docker/docker/libnetwork/cluster"
  10. "github.com/docker/docker/libnetwork/datastore"
  11. "github.com/docker/docker/libnetwork/discoverapi"
  12. "github.com/docker/docker/libnetwork/driverapi"
  13. "github.com/docker/docker/libnetwork/networkdb"
  14. "github.com/docker/docker/libnetwork/types"
  15. "github.com/docker/go-events"
  16. "github.com/gogo/protobuf/proto"
  17. "github.com/sirupsen/logrus"
  18. )
const (
	// subsysGossip is the Subsystem value of the encryption keys that are
	// handed to networkDB (SetKey / SetPrimaryKey / RemoveKey, and the
	// initial networkdb config).
	subsysGossip = "networking:gossip"
	// subsysIPSec is the Subsystem value of the encryption keys that are
	// pushed to the network drivers as datapath keys.
	subsysIPSec = "networking:ipsec"
	// keyringSize is not referenced in this part of the file; presumably the
	// number of keys kept per subsystem - TODO confirm against its users.
	keyringSize = 3
)
  24. // ByTime implements sort.Interface for []*types.EncryptionKey based on
  25. // the LamportTime field.
  26. type ByTime []*types.EncryptionKey
  27. func (b ByTime) Len() int { return len(b) }
  28. func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  29. func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
// agent bundles the networkDB instance of a controller together with the
// addresses it was initialized with and the cancel functions of the table
// watches that were registered on its behalf.
type agent struct {
	// networkDB is the database created by agentInit.
	networkDB *networkdb.NetworkDB
	// bindAddr is the address resolved from the bind address or interface
	// name passed to agentInit.
	bindAddr string
	// advertiseAddr is the advertise address passed to agentInit; it is also
	// the fallback value returned by dataPathAddress.
	advertiseAddr string
	// dataPathAddr is the explicit data path address; it may be empty.
	dataPathAddr string
	// coreCancelFuncs cancel the endpoint table and node table watches set
	// up in agentInit.
	coreCancelFuncs []func()
	// driverCancelFuncs holds, per network ID, the cancel functions of the
	// driver table watches registered by addDriverWatches.
	driverCancelFuncs map[string][]func()
	// Mutex is held by dataPathAddress and around every access to
	// driverCancelFuncs in this file.
	sync.Mutex
}
  39. func (a *agent) dataPathAddress() string {
  40. a.Lock()
  41. defer a.Unlock()
  42. if a.dataPathAddr != "" {
  43. return a.dataPathAddr
  44. }
  45. return a.advertiseAddr
  46. }
// libnetworkEPTable is the name of the networkDB table in which the
// marshalled EndpointRecord of every endpoint is stored, keyed by endpoint ID.
const libnetworkEPTable = "endpoint_table"
  48. func getBindAddr(ifaceName string) (string, error) {
  49. iface, err := net.InterfaceByName(ifaceName)
  50. if err != nil {
  51. return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  52. }
  53. addrs, err := iface.Addrs()
  54. if err != nil {
  55. return "", fmt.Errorf("failed to get interface addresses: %v", err)
  56. }
  57. for _, a := range addrs {
  58. addr, ok := a.(*net.IPNet)
  59. if !ok {
  60. continue
  61. }
  62. addrIP := addr.IP
  63. if addrIP.IsLinkLocalUnicast() {
  64. continue
  65. }
  66. return addrIP.String(), nil
  67. }
  68. return "", fmt.Errorf("failed to get bind address")
  69. }
  70. func resolveAddr(addrOrInterface string) (string, error) {
  71. // Try and see if this is a valid IP address
  72. if net.ParseIP(addrOrInterface) != nil {
  73. return addrOrInterface, nil
  74. }
  75. addr, err := net.ResolveIPAddr("ip", addrOrInterface)
  76. if err != nil {
  77. // If not a valid IP address, it should be a valid interface
  78. return getBindAddr(addrOrInterface)
  79. }
  80. return addr.String(), nil
  81. }
  82. func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
  83. drvEnc := discoverapi.DriverEncryptionUpdate{}
  84. a := c.getAgent()
  85. if a == nil {
  86. logrus.Debug("Skipping key change as agent is nil")
  87. return nil
  88. }
  89. // Find the deleted key. If the deleted key was the primary key,
  90. // a new primary key should be set before removing if from keyring.
  91. c.Lock()
  92. added := []byte{}
  93. deleted := []byte{}
  94. j := len(c.keys)
  95. for i := 0; i < j; {
  96. same := false
  97. for _, key := range keys {
  98. if same = key.LamportTime == c.keys[i].LamportTime; same {
  99. break
  100. }
  101. }
  102. if !same {
  103. cKey := c.keys[i]
  104. if cKey.Subsystem == subsysGossip {
  105. deleted = cKey.Key
  106. }
  107. if cKey.Subsystem == subsysIPSec {
  108. drvEnc.Prune = cKey.Key
  109. drvEnc.PruneTag = cKey.LamportTime
  110. }
  111. c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
  112. c.keys[j-1] = nil
  113. j--
  114. }
  115. i++
  116. }
  117. c.keys = c.keys[:j]
  118. // Find the new key and add it to the key ring
  119. for _, key := range keys {
  120. same := false
  121. for _, cKey := range c.keys {
  122. if same = cKey.LamportTime == key.LamportTime; same {
  123. break
  124. }
  125. }
  126. if !same {
  127. c.keys = append(c.keys, key)
  128. if key.Subsystem == subsysGossip {
  129. added = key.Key
  130. }
  131. if key.Subsystem == subsysIPSec {
  132. drvEnc.Key = key.Key
  133. drvEnc.Tag = key.LamportTime
  134. }
  135. }
  136. }
  137. c.Unlock()
  138. if len(added) > 0 {
  139. a.networkDB.SetKey(added)
  140. }
  141. key, _, err := c.getPrimaryKeyTag(subsysGossip)
  142. if err != nil {
  143. return err
  144. }
  145. a.networkDB.SetPrimaryKey(key)
  146. key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
  147. if err != nil {
  148. return err
  149. }
  150. drvEnc.Primary = key
  151. drvEnc.PrimaryTag = tag
  152. if len(deleted) > 0 {
  153. a.networkDB.RemoveKey(deleted)
  154. }
  155. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  156. err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
  157. if err != nil {
  158. logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
  159. // Attempt to reconfigure keys in case of a update failure
  160. // which can arise due to a mismatch of keys
  161. // if worker nodes get temporarily disconnected
  162. logrus.Warnf("Reconfiguring datapath keys for %s", name)
  163. drvCfgEnc := discoverapi.DriverEncryptionConfig{}
  164. drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
  165. err = driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
  166. if err != nil {
  167. logrus.Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
  168. }
  169. }
  170. return false
  171. })
  172. return nil
  173. }
  174. func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
  175. agent := c.getAgent()
  176. // If the agent is already present there is no need to try to initialize it again
  177. if agent != nil {
  178. return nil
  179. }
  180. bindAddr := clusterProvider.GetLocalAddress()
  181. advAddr := clusterProvider.GetAdvertiseAddress()
  182. dataAddr := clusterProvider.GetDataPathAddress()
  183. remoteList := clusterProvider.GetRemoteAddressList()
  184. remoteAddrList := make([]string, 0, len(remoteList))
  185. for _, remote := range remoteList {
  186. addr, _, _ := net.SplitHostPort(remote)
  187. remoteAddrList = append(remoteAddrList, addr)
  188. }
  189. listen := clusterProvider.GetListenAddress()
  190. listenAddr, _, _ := net.SplitHostPort(listen)
  191. logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
  192. listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
  193. if advAddr != "" && agent == nil {
  194. if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
  195. logrus.Errorf("error in agentInit: %v", err)
  196. return err
  197. }
  198. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  199. if capability.ConnectivityScope == datastore.GlobalScope {
  200. c.agentDriverNotify(driver)
  201. }
  202. return false
  203. })
  204. }
  205. if len(remoteAddrList) > 0 {
  206. if err := c.agentJoin(remoteAddrList); err != nil {
  207. logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
  208. }
  209. }
  210. return nil
  211. }
  212. // For a given subsystem getKeys sorts the keys by lamport time and returns
  213. // slice of keys and lamport time which can used as a unique tag for the keys
  214. func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
  215. c.Lock()
  216. defer c.Unlock()
  217. sort.Sort(ByTime(c.keys))
  218. keys := [][]byte{}
  219. tags := []uint64{}
  220. for _, key := range c.keys {
  221. if key.Subsystem == subsys {
  222. keys = append(keys, key.Key)
  223. tags = append(tags, key.LamportTime)
  224. }
  225. }
  226. keys[0], keys[1] = keys[1], keys[0]
  227. tags[0], tags[1] = tags[1], tags[0]
  228. return keys, tags
  229. }
  230. // getPrimaryKeyTag returns the primary key for a given subsystem from the
  231. // list of sorted key and the associated tag
  232. func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
  233. c.Lock()
  234. defer c.Unlock()
  235. sort.Sort(ByTime(c.keys))
  236. keys := []*types.EncryptionKey{}
  237. for _, key := range c.keys {
  238. if key.Subsystem == subsys {
  239. keys = append(keys, key)
  240. }
  241. }
  242. return keys[1].Key, keys[1].LamportTime, nil
  243. }
  244. func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
  245. bindAddr, err := resolveAddr(bindAddrOrInterface)
  246. if err != nil {
  247. return err
  248. }
  249. keys, _ := c.getKeys(subsysGossip)
  250. netDBConf := networkdb.DefaultConfig()
  251. netDBConf.BindAddr = listenAddr
  252. netDBConf.AdvertiseAddr = advertiseAddr
  253. netDBConf.Keys = keys
  254. if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
  255. // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
  256. // To be on the safe side let's cut 100 bytes
  257. netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
  258. logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
  259. c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
  260. }
  261. nDB, err := networkdb.New(netDBConf)
  262. if err != nil {
  263. return err
  264. }
  265. // Register the diagnostic handlers
  266. c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
  267. var cancelList []func()
  268. ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
  269. cancelList = append(cancelList, cancel)
  270. nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
  271. cancelList = append(cancelList, cancel)
  272. c.Lock()
  273. c.agent = &agent{
  274. networkDB: nDB,
  275. bindAddr: bindAddr,
  276. advertiseAddr: advertiseAddr,
  277. dataPathAddr: dataPathAddr,
  278. coreCancelFuncs: cancelList,
  279. driverCancelFuncs: make(map[string][]func()),
  280. }
  281. c.Unlock()
  282. go c.handleTableEvents(ch, c.handleEpTableEvent)
  283. go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
  284. drvEnc := discoverapi.DriverEncryptionConfig{}
  285. keys, tags := c.getKeys(subsysIPSec)
  286. drvEnc.Keys = keys
  287. drvEnc.Tags = tags
  288. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  289. err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
  290. if err != nil {
  291. logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  292. }
  293. return false
  294. })
  295. c.WalkNetworks(joinCluster)
  296. return nil
  297. }
  298. func (c *controller) agentJoin(remoteAddrList []string) error {
  299. agent := c.getAgent()
  300. if agent == nil {
  301. return nil
  302. }
  303. return agent.networkDB.Join(remoteAddrList)
  304. }
  305. func (c *controller) agentDriverNotify(d driverapi.Driver) {
  306. agent := c.getAgent()
  307. if agent == nil {
  308. return
  309. }
  310. if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  311. Address: agent.dataPathAddress(),
  312. BindAddress: agent.bindAddr,
  313. Self: true,
  314. }); err != nil {
  315. logrus.Warnf("Failed the node discovery in driver: %v", err)
  316. }
  317. drvEnc := discoverapi.DriverEncryptionConfig{}
  318. keys, tags := c.getKeys(subsysIPSec)
  319. drvEnc.Keys = keys
  320. drvEnc.Tags = tags
  321. if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
  322. logrus.Warnf("Failed to set datapath keys in driver: %v", err)
  323. }
  324. }
  325. func (c *controller) agentClose() {
  326. // Acquire current agent instance and reset its pointer
  327. // then run closing functions
  328. c.Lock()
  329. agent := c.agent
  330. c.agent = nil
  331. c.Unlock()
  332. // when the agent is closed the cluster provider should be cleaned up
  333. c.SetClusterProvider(nil)
  334. if agent == nil {
  335. return
  336. }
  337. var cancelList []func()
  338. agent.Lock()
  339. for _, cancelFuncs := range agent.driverCancelFuncs {
  340. cancelList = append(cancelList, cancelFuncs...)
  341. }
  342. // Add also the cancel functions for the network db
  343. cancelList = append(cancelList, agent.coreCancelFuncs...)
  344. agent.Unlock()
  345. for _, cancel := range cancelList {
  346. cancel()
  347. }
  348. agent.networkDB.Close()
  349. }
// Task has the backend container details
type Task struct {
	// Name is the name stored in the task's endpoint record.
	Name string
	// EndpointID is the key of the endpoint in the endpoint table.
	EndpointID string
	// EndpointIP is the endpoint IP stored in the endpoint record.
	EndpointIP string
	// Info is the driver specific information decoded by the network driver
	// from its own table entries; it may be nil.
	Info map[string]string
}
// ServiceInfo has service specific details along with the list of backend tasks
type ServiceInfo struct {
	// VIP is the virtual IP taken from the endpoint record that first
	// introduced the service.
	VIP string
	// LocalLBIndex is the load balancer index the controller reported for
	// that endpoint record.
	LocalLBIndex int
	// Tasks lists the backend tasks grouped under the service.
	Tasks []Task
	// Ports lists the ingress ports, each formatted as
	// "Target: <port>, Publish: <port>".
	Ports []string
}
// epRecord is the per-endpoint working record built by Services.
type epRecord struct {
	// ep is the record unmarshalled from the endpoint table.
	ep EndpointRecord
	// info is the driver specific information decoded from the driver
	// tables; it stays nil when the driver has no entry for the endpoint.
	info map[string]string
	// lbIndex is the load balancer index returned by the controller.
	lbIndex int
}
  369. func (n *network) Services() map[string]ServiceInfo {
  370. eps := make(map[string]epRecord)
  371. if !n.isClusterEligible() {
  372. return nil
  373. }
  374. agent := n.getController().getAgent()
  375. if agent == nil {
  376. return nil
  377. }
  378. // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
  379. entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
  380. for eid, value := range entries {
  381. var epRec EndpointRecord
  382. nid := n.ID()
  383. if err := proto.Unmarshal(value.Value, &epRec); err != nil {
  384. logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
  385. continue
  386. }
  387. i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
  388. eps[eid] = epRecord{
  389. ep: epRec,
  390. lbIndex: i,
  391. }
  392. }
  393. // Walk through the driver's tables, have the driver decode the entries
  394. // and return the tuple {ep ID, value}. value is a string that coveys
  395. // relevant info about the endpoint.
  396. d, err := n.driver(true)
  397. if err != nil {
  398. logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
  399. return nil
  400. }
  401. for _, table := range n.driverTables {
  402. if table.objType != driverapi.EndpointObject {
  403. continue
  404. }
  405. entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
  406. for key, value := range entries {
  407. epID, info := d.DecodeTableEntry(table.name, key, value.Value)
  408. if ep, ok := eps[epID]; !ok {
  409. logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
  410. } else {
  411. ep.info = info
  412. eps[epID] = ep
  413. }
  414. }
  415. }
  416. // group the endpoints into a map keyed by the service name
  417. sinfo := make(map[string]ServiceInfo)
  418. for ep, epr := range eps {
  419. var (
  420. s ServiceInfo
  421. ok bool
  422. )
  423. if s, ok = sinfo[epr.ep.ServiceName]; !ok {
  424. s = ServiceInfo{
  425. VIP: epr.ep.VirtualIP,
  426. LocalLBIndex: epr.lbIndex,
  427. }
  428. }
  429. ports := []string{}
  430. if s.Ports == nil {
  431. for _, port := range epr.ep.IngressPorts {
  432. p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
  433. ports = append(ports, p)
  434. }
  435. s.Ports = ports
  436. }
  437. s.Tasks = append(s.Tasks, Task{
  438. Name: epr.ep.Name,
  439. EndpointID: ep,
  440. EndpointIP: epr.ep.EndpointIP,
  441. Info: epr.info,
  442. })
  443. sinfo[epr.ep.ServiceName] = s
  444. }
  445. return sinfo
  446. }
  447. func (n *network) isClusterEligible() bool {
  448. if n.scope != datastore.SwarmScope || !n.driverIsMultihost() {
  449. return false
  450. }
  451. return n.getController().getAgent() != nil
  452. }
  453. func (n *network) joinCluster() error {
  454. if !n.isClusterEligible() {
  455. return nil
  456. }
  457. agent := n.getController().getAgent()
  458. if agent == nil {
  459. return nil
  460. }
  461. return agent.networkDB.JoinNetwork(n.ID())
  462. }
  463. func (n *network) leaveCluster() error {
  464. if !n.isClusterEligible() {
  465. return nil
  466. }
  467. agent := n.getController().getAgent()
  468. if agent == nil {
  469. return nil
  470. }
  471. return agent.networkDB.LeaveNetwork(n.ID())
  472. }
  473. func (ep *endpoint) addDriverInfoToCluster() error {
  474. n := ep.getNetwork()
  475. if !n.isClusterEligible() {
  476. return nil
  477. }
  478. if ep.joinInfo == nil {
  479. return nil
  480. }
  481. agent := n.getController().getAgent()
  482. if agent == nil {
  483. return nil
  484. }
  485. for _, te := range ep.joinInfo.driverTableEntries {
  486. if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
  487. return err
  488. }
  489. }
  490. return nil
  491. }
  492. func (ep *endpoint) deleteDriverInfoFromCluster() error {
  493. n := ep.getNetwork()
  494. if !n.isClusterEligible() {
  495. return nil
  496. }
  497. if ep.joinInfo == nil {
  498. return nil
  499. }
  500. agent := n.getController().getAgent()
  501. if agent == nil {
  502. return nil
  503. }
  504. for _, te := range ep.joinInfo.driverTableEntries {
  505. if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
  506. return err
  507. }
  508. }
  509. return nil
  510. }
  511. func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
  512. if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface() == nil || ep.Iface().Address() == nil {
  513. return nil
  514. }
  515. n := ep.getNetwork()
  516. if !n.isClusterEligible() {
  517. return nil
  518. }
  519. sb.Service.Lock()
  520. defer sb.Service.Unlock()
  521. logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
  522. // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
  523. // This is to handle a race between the EnableService and the sbLeave
  524. // It is possible that the EnableService starts, fetches the list of the endpoints and
  525. // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
  526. // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
  527. // This check under the Service lock of the sandbox ensure the correct behavior.
  528. // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
  529. // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
  530. // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
  531. // removed from the list, in this situation the delete will bail out not finding any data to cleanup
  532. // and the add will bail out not finding the endpoint on the sandbox.
  533. if e := sb.getEndpoint(ep.ID()); e == nil {
  534. logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  535. return nil
  536. }
  537. c := n.getController()
  538. agent := c.getAgent()
  539. name := ep.Name()
  540. if ep.isAnonymous() {
  541. name = ep.MyAliases()[0]
  542. }
  543. var ingressPorts []*PortConfig
  544. if ep.svcID != "" {
  545. // This is a task part of a service
  546. // Gossip ingress ports only in ingress network.
  547. if n.ingress {
  548. ingressPorts = ep.ingressPorts
  549. }
  550. if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  551. return err
  552. }
  553. } else {
  554. // This is a container simply attached to an attachable network
  555. if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
  556. return err
  557. }
  558. }
  559. buf, err := proto.Marshal(&EndpointRecord{
  560. Name: name,
  561. ServiceName: ep.svcName,
  562. ServiceID: ep.svcID,
  563. VirtualIP: ep.virtualIP.String(),
  564. IngressPorts: ingressPorts,
  565. Aliases: ep.svcAliases,
  566. TaskAliases: ep.myAliases,
  567. EndpointIP: ep.Iface().Address().IP.String(),
  568. ServiceDisabled: false,
  569. })
  570. if err != nil {
  571. return err
  572. }
  573. if agent != nil {
  574. if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
  575. logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
  576. return err
  577. }
  578. }
  579. logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
  580. return nil
  581. }
  582. func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, fullRemove bool, method string) error {
  583. if ep.isAnonymous() && len(ep.myAliases) == 0 {
  584. return nil
  585. }
  586. n := ep.getNetwork()
  587. if !n.isClusterEligible() {
  588. return nil
  589. }
  590. sb.Service.Lock()
  591. defer sb.Service.Unlock()
  592. logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
  593. // Avoid a race w/ with a container that aborts preemptively. This would
  594. // get caught in disableServceInNetworkDB, but we check here to make the
  595. // nature of the condition more clear.
  596. // See comment in addServiceInfoToCluster()
  597. if e := sb.getEndpoint(ep.ID()); e == nil {
  598. logrus.Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
  599. return nil
  600. }
  601. c := n.getController()
  602. agent := c.getAgent()
  603. name := ep.Name()
  604. if ep.isAnonymous() {
  605. name = ep.MyAliases()[0]
  606. }
  607. if agent != nil {
  608. // First update the networkDB then locally
  609. if fullRemove {
  610. if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
  611. logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
  612. }
  613. } else {
  614. disableServiceInNetworkDB(agent, n, ep)
  615. }
  616. }
  617. if ep.Iface() != nil && ep.Iface().Address() != nil {
  618. if ep.svcID != "" {
  619. // This is a task part of a service
  620. var ingressPorts []*PortConfig
  621. if n.ingress {
  622. ingressPorts = ep.ingressPorts
  623. }
  624. if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
  625. return err
  626. }
  627. } else {
  628. // This is a container simply attached to an attachable network
  629. if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
  630. return err
  631. }
  632. }
  633. }
  634. logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
  635. return nil
  636. }
  637. func disableServiceInNetworkDB(a *agent, n *network, ep *endpoint) {
  638. var epRec EndpointRecord
  639. logrus.Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
  640. // Update existing record to indicate that the service is disabled
  641. inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
  642. if err != nil {
  643. logrus.Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
  644. return
  645. }
  646. // Should never fail
  647. if err := proto.Unmarshal(inBuf, &epRec); err != nil {
  648. logrus.Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
  649. return
  650. }
  651. epRec.ServiceDisabled = true
  652. // Should never fail
  653. outBuf, err := proto.Marshal(&epRec)
  654. if err != nil {
  655. logrus.Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
  656. return
  657. }
  658. // Send update to the whole cluster
  659. if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
  660. logrus.Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
  661. }
  662. }
  663. func (n *network) addDriverWatches() {
  664. if !n.isClusterEligible() {
  665. return
  666. }
  667. c := n.getController()
  668. agent := c.getAgent()
  669. if agent == nil {
  670. return
  671. }
  672. for _, table := range n.driverTables {
  673. ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
  674. agent.Lock()
  675. agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
  676. agent.Unlock()
  677. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  678. d, err := n.driver(false)
  679. if err != nil {
  680. logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  681. return
  682. }
  683. err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
  684. // skip the entries that are mark for deletion, this is safe because this function is
  685. // called at initialization time so there is no state to delete
  686. if nid == n.ID() && !deleted {
  687. d.EventNotify(driverapi.Create, nid, table.name, key, value)
  688. }
  689. return false
  690. })
  691. if err != nil {
  692. logrus.WithError(err).Warn("Error while walking networkdb")
  693. }
  694. }
  695. }
  696. func (n *network) cancelDriverWatches() {
  697. if !n.isClusterEligible() {
  698. return
  699. }
  700. agent := n.getController().getAgent()
  701. if agent == nil {
  702. return
  703. }
  704. agent.Lock()
  705. cancelFuncs := agent.driverCancelFuncs[n.ID()]
  706. delete(agent.driverCancelFuncs, n.ID())
  707. agent.Unlock()
  708. for _, cancel := range cancelFuncs {
  709. cancel()
  710. }
  711. }
  712. func (c *controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
  713. for {
  714. select {
  715. case ev := <-ch.C:
  716. fn(ev)
  717. case <-ch.Done():
  718. return
  719. }
  720. }
  721. }
  722. func (n *network) handleDriverTableEvent(ev events.Event) {
  723. d, err := n.driver(false)
  724. if err != nil {
  725. logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  726. return
  727. }
  728. var (
  729. etype driverapi.EventType
  730. tname string
  731. key string
  732. value []byte
  733. )
  734. switch event := ev.(type) {
  735. case networkdb.CreateEvent:
  736. tname = event.Table
  737. key = event.Key
  738. value = event.Value
  739. etype = driverapi.Create
  740. case networkdb.DeleteEvent:
  741. tname = event.Table
  742. key = event.Key
  743. value = event.Value
  744. etype = driverapi.Delete
  745. case networkdb.UpdateEvent:
  746. tname = event.Table
  747. key = event.Key
  748. value = event.Value
  749. etype = driverapi.Delete
  750. }
  751. d.EventNotify(etype, n.ID(), tname, key, value)
  752. }
  753. func (c *controller) handleNodeTableEvent(ev events.Event) {
  754. var (
  755. value []byte
  756. isAdd bool
  757. nodeAddr networkdb.NodeAddr
  758. )
  759. switch event := ev.(type) {
  760. case networkdb.CreateEvent:
  761. value = event.Value
  762. isAdd = true
  763. case networkdb.DeleteEvent:
  764. value = event.Value
  765. case networkdb.UpdateEvent:
  766. logrus.Errorf("Unexpected update node table event = %#v", event)
  767. }
  768. err := json.Unmarshal(value, &nodeAddr)
  769. if err != nil {
  770. logrus.Errorf("Error unmarshalling node table event %v", err)
  771. return
  772. }
  773. c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
  774. }
  775. func (c *controller) handleEpTableEvent(ev events.Event) {
  776. var (
  777. nid string
  778. eid string
  779. value []byte
  780. epRec EndpointRecord
  781. )
  782. switch event := ev.(type) {
  783. case networkdb.CreateEvent:
  784. nid = event.NetworkID
  785. eid = event.Key
  786. value = event.Value
  787. case networkdb.DeleteEvent:
  788. nid = event.NetworkID
  789. eid = event.Key
  790. value = event.Value
  791. case networkdb.UpdateEvent:
  792. nid = event.NetworkID
  793. eid = event.Key
  794. value = event.Value
  795. default:
  796. logrus.Errorf("Unexpected update service table event = %#v", event)
  797. return
  798. }
  799. err := proto.Unmarshal(value, &epRec)
  800. if err != nil {
  801. logrus.Errorf("Failed to unmarshal service table value: %v", err)
  802. return
  803. }
  804. containerName := epRec.Name
  805. svcName := epRec.ServiceName
  806. svcID := epRec.ServiceID
  807. vip := net.ParseIP(epRec.VirtualIP)
  808. ip := net.ParseIP(epRec.EndpointIP)
  809. ingressPorts := epRec.IngressPorts
  810. serviceAliases := epRec.Aliases
  811. taskAliases := epRec.TaskAliases
  812. if containerName == "" || ip == nil {
  813. logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  814. return
  815. }
  816. switch ev.(type) {
  817. case networkdb.CreateEvent:
  818. logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
  819. if svcID != "" {
  820. // This is a remote task part of a service
  821. if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
  822. logrus.Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
  823. return
  824. }
  825. } else {
  826. // This is a remote container simply attached to an attachable network
  827. if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  828. logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  829. }
  830. }
  831. case networkdb.DeleteEvent:
  832. logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
  833. if svcID != "" {
  834. // This is a remote task part of a service
  835. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
  836. logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
  837. return
  838. }
  839. } else {
  840. // This is a remote container simply attached to an attachable network
  841. if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
  842. logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
  843. }
  844. }
  845. case networkdb.UpdateEvent:
  846. logrus.Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
  847. // We currently should only get these to inform us that an endpoint
  848. // is disabled. Report if otherwise.
  849. if svcID == "" || !epRec.ServiceDisabled {
  850. logrus.Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
  851. return
  852. }
  853. // This is a remote task that is part of a service that is now disabled
  854. if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
  855. logrus.Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
  856. return
  857. }
  858. }
  859. }