agent.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873
  1. package libnetwork
  2. //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "os"
  8. "sort"
  9. "sync"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/docker/go-events"
  13. "github.com/docker/libnetwork/cluster"
  14. "github.com/docker/libnetwork/datastore"
  15. "github.com/docker/libnetwork/discoverapi"
  16. "github.com/docker/libnetwork/driverapi"
  17. "github.com/docker/libnetwork/networkdb"
  18. "github.com/docker/libnetwork/types"
  19. "github.com/gogo/protobuf/proto"
  20. )
  21. const (
  22. subsysGossip = "networking:gossip"
  23. subsysIPSec = "networking:ipsec"
  24. keyringSize = 3
  25. )
  26. // ByTime implements sort.Interface for []*types.EncryptionKey based on
  27. // the LamportTime field.
  28. type ByTime []*types.EncryptionKey
  29. func (b ByTime) Len() int { return len(b) }
  30. func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  31. func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
  32. type agent struct {
  33. networkDB *networkdb.NetworkDB
  34. bindAddr string
  35. advertiseAddr string
  36. dataPathAddr string
  37. coreCancelFuncs []func()
  38. driverCancelFuncs map[string][]func()
  39. sync.Mutex
  40. }
  41. func (a *agent) dataPathAddress() string {
  42. a.Lock()
  43. defer a.Unlock()
  44. if a.dataPathAddr != "" {
  45. return a.dataPathAddr
  46. }
  47. return a.advertiseAddr
  48. }
  49. const libnetworkEPTable = "endpoint_table"
  50. func getBindAddr(ifaceName string) (string, error) {
  51. iface, err := net.InterfaceByName(ifaceName)
  52. if err != nil {
  53. return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  54. }
  55. addrs, err := iface.Addrs()
  56. if err != nil {
  57. return "", fmt.Errorf("failed to get interface addresses: %v", err)
  58. }
  59. for _, a := range addrs {
  60. addr, ok := a.(*net.IPNet)
  61. if !ok {
  62. continue
  63. }
  64. addrIP := addr.IP
  65. if addrIP.IsLinkLocalUnicast() {
  66. continue
  67. }
  68. return addrIP.String(), nil
  69. }
  70. return "", fmt.Errorf("failed to get bind address")
  71. }
  72. func resolveAddr(addrOrInterface string) (string, error) {
  73. // Try and see if this is a valid IP address
  74. if net.ParseIP(addrOrInterface) != nil {
  75. return addrOrInterface, nil
  76. }
  77. addr, err := net.ResolveIPAddr("ip", addrOrInterface)
  78. if err != nil {
  79. // If not a valid IP address, it should be a valid interface
  80. return getBindAddr(addrOrInterface)
  81. }
  82. return addr.String(), nil
  83. }
  84. func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
  85. drvEnc := discoverapi.DriverEncryptionUpdate{}
  86. a := c.getAgent()
  87. if a == nil {
  88. logrus.Debug("Skipping key change as agent is nil")
  89. return nil
  90. }
  91. // Find the deleted key. If the deleted key was the primary key,
  92. // a new primary key should be set before removing if from keyring.
  93. c.Lock()
  94. added := []byte{}
  95. deleted := []byte{}
  96. j := len(c.keys)
  97. for i := 0; i < j; {
  98. same := false
  99. for _, key := range keys {
  100. if same = key.LamportTime == c.keys[i].LamportTime; same {
  101. break
  102. }
  103. }
  104. if !same {
  105. cKey := c.keys[i]
  106. if cKey.Subsystem == subsysGossip {
  107. deleted = cKey.Key
  108. }
  109. if cKey.Subsystem == subsysIPSec {
  110. drvEnc.Prune = cKey.Key
  111. drvEnc.PruneTag = cKey.LamportTime
  112. }
  113. c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
  114. c.keys[j-1] = nil
  115. j--
  116. }
  117. i++
  118. }
  119. c.keys = c.keys[:j]
  120. // Find the new key and add it to the key ring
  121. for _, key := range keys {
  122. same := false
  123. for _, cKey := range c.keys {
  124. if same = cKey.LamportTime == key.LamportTime; same {
  125. break
  126. }
  127. }
  128. if !same {
  129. c.keys = append(c.keys, key)
  130. if key.Subsystem == subsysGossip {
  131. added = key.Key
  132. }
  133. if key.Subsystem == subsysIPSec {
  134. drvEnc.Key = key.Key
  135. drvEnc.Tag = key.LamportTime
  136. }
  137. }
  138. }
  139. c.Unlock()
  140. if len(added) > 0 {
  141. a.networkDB.SetKey(added)
  142. }
  143. key, tag, err := c.getPrimaryKeyTag(subsysGossip)
  144. if err != nil {
  145. return err
  146. }
  147. a.networkDB.SetPrimaryKey(key)
  148. key, tag, err = c.getPrimaryKeyTag(subsysIPSec)
  149. if err != nil {
  150. return err
  151. }
  152. drvEnc.Primary = key
  153. drvEnc.PrimaryTag = tag
  154. if len(deleted) > 0 {
  155. a.networkDB.RemoveKey(deleted)
  156. }
  157. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  158. err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
  159. if err != nil {
  160. logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
  161. }
  162. return false
  163. })
  164. return nil
  165. }
  166. func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
  167. agent := c.getAgent()
  168. // If the agent is already present there is no need to try to initilize it again
  169. if agent != nil {
  170. return nil
  171. }
  172. bindAddr := clusterProvider.GetLocalAddress()
  173. advAddr := clusterProvider.GetAdvertiseAddress()
  174. dataAddr := clusterProvider.GetDataPathAddress()
  175. remoteList := clusterProvider.GetRemoteAddressList()
  176. remoteAddrList := make([]string, 0, len(remoteList))
  177. for _, remote := range remoteList {
  178. addr, _, _ := net.SplitHostPort(remote)
  179. remoteAddrList = append(remoteAddrList, addr)
  180. }
  181. listen := clusterProvider.GetListenAddress()
  182. listenAddr, _, _ := net.SplitHostPort(listen)
  183. logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
  184. listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
  185. if advAddr != "" && agent == nil {
  186. if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
  187. logrus.Errorf("error in agentInit: %v", err)
  188. return err
  189. }
  190. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  191. if capability.ConnectivityScope == datastore.GlobalScope {
  192. c.agentDriverNotify(driver)
  193. }
  194. return false
  195. })
  196. }
  197. if len(remoteAddrList) > 0 {
  198. if err := c.agentJoin(remoteAddrList); err != nil {
  199. logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
  200. }
  201. }
  202. return nil
  203. }
  204. // For a given subsystem getKeys sorts the keys by lamport time and returns
  205. // slice of keys and lamport time which can used as a unique tag for the keys
  206. func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
  207. c.Lock()
  208. defer c.Unlock()
  209. sort.Sort(ByTime(c.keys))
  210. keys := [][]byte{}
  211. tags := []uint64{}
  212. for _, key := range c.keys {
  213. if key.Subsystem == subsys {
  214. keys = append(keys, key.Key)
  215. tags = append(tags, key.LamportTime)
  216. }
  217. }
  218. keys[0], keys[1] = keys[1], keys[0]
  219. tags[0], tags[1] = tags[1], tags[0]
  220. return keys, tags
  221. }
  222. // getPrimaryKeyTag returns the primary key for a given subsystem from the
  223. // list of sorted key and the associated tag
  224. func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
  225. c.Lock()
  226. defer c.Unlock()
  227. sort.Sort(ByTime(c.keys))
  228. keys := []*types.EncryptionKey{}
  229. for _, key := range c.keys {
  230. if key.Subsystem == subsys {
  231. keys = append(keys, key)
  232. }
  233. }
  234. return keys[1].Key, keys[1].LamportTime, nil
  235. }
  236. func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
  237. bindAddr, err := resolveAddr(bindAddrOrInterface)
  238. if err != nil {
  239. return err
  240. }
  241. keys, _ := c.getKeys(subsysGossip)
  242. hostname, _ := os.Hostname()
  243. nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
  244. logrus.Info("Gossip cluster hostname ", nodeName)
  245. nDB, err := networkdb.New(&networkdb.Config{
  246. BindAddr: listenAddr,
  247. AdvertiseAddr: advertiseAddr,
  248. NodeName: nodeName,
  249. Keys: keys,
  250. })
  251. if err != nil {
  252. return err
  253. }
  254. var cancelList []func()
  255. ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
  256. cancelList = append(cancelList, cancel)
  257. nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
  258. cancelList = append(cancelList, cancel)
  259. c.Lock()
  260. c.agent = &agent{
  261. networkDB: nDB,
  262. bindAddr: bindAddr,
  263. advertiseAddr: advertiseAddr,
  264. dataPathAddr: dataPathAddr,
  265. coreCancelFuncs: cancelList,
  266. driverCancelFuncs: make(map[string][]func()),
  267. }
  268. c.Unlock()
  269. go c.handleTableEvents(ch, c.handleEpTableEvent)
  270. go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
  271. drvEnc := discoverapi.DriverEncryptionConfig{}
  272. keys, tags := c.getKeys(subsysIPSec)
  273. drvEnc.Keys = keys
  274. drvEnc.Tags = tags
  275. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  276. err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
  277. if err != nil {
  278. logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  279. }
  280. return false
  281. })
  282. c.WalkNetworks(joinCluster)
  283. return nil
  284. }
  285. func (c *controller) agentJoin(remoteAddrList []string) error {
  286. agent := c.getAgent()
  287. if agent == nil {
  288. return nil
  289. }
  290. return agent.networkDB.Join(remoteAddrList)
  291. }
  292. func (c *controller) agentDriverNotify(d driverapi.Driver) {
  293. agent := c.getAgent()
  294. if agent == nil {
  295. return
  296. }
  297. if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  298. Address: agent.dataPathAddress(),
  299. BindAddress: agent.bindAddr,
  300. Self: true,
  301. }); err != nil {
  302. logrus.Warnf("Failed the node discovery in driver: %v", err)
  303. }
  304. drvEnc := discoverapi.DriverEncryptionConfig{}
  305. keys, tags := c.getKeys(subsysIPSec)
  306. drvEnc.Keys = keys
  307. drvEnc.Tags = tags
  308. if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
  309. logrus.Warnf("Failed to set datapath keys in driver: %v", err)
  310. }
  311. }
  312. func (c *controller) agentClose() {
  313. // Acquire current agent instance and reset its pointer
  314. // then run closing functions
  315. c.Lock()
  316. agent := c.agent
  317. c.agent = nil
  318. c.Unlock()
  319. if agent == nil {
  320. return
  321. }
  322. var cancelList []func()
  323. agent.Lock()
  324. for _, cancelFuncs := range agent.driverCancelFuncs {
  325. for _, cancel := range cancelFuncs {
  326. cancelList = append(cancelList, cancel)
  327. }
  328. }
  329. // Add also the cancel functions for the network db
  330. for _, cancel := range agent.coreCancelFuncs {
  331. cancelList = append(cancelList, cancel)
  332. }
  333. agent.Unlock()
  334. for _, cancel := range cancelList {
  335. cancel()
  336. }
  337. agent.networkDB.Close()
  338. }
  339. // Task has the backend container details
  340. type Task struct {
  341. Name string
  342. EndpointID string
  343. EndpointIP string
  344. Info map[string]string
  345. }
  346. // ServiceInfo has service specific details along with the list of backend tasks
  347. type ServiceInfo struct {
  348. VIP string
  349. LocalLBIndex int
  350. Tasks []Task
  351. Ports []string
  352. }
  353. type epRecord struct {
  354. ep EndpointRecord
  355. info map[string]string
  356. lbIndex int
  357. }
  358. func (n *network) Services() map[string]ServiceInfo {
  359. eps := make(map[string]epRecord)
  360. if !n.isClusterEligible() {
  361. return nil
  362. }
  363. agent := n.getController().getAgent()
  364. if agent == nil {
  365. return nil
  366. }
  367. // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
  368. entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
  369. for eid, value := range entries {
  370. var epRec EndpointRecord
  371. nid := n.ID()
  372. if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
  373. logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
  374. continue
  375. }
  376. i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
  377. eps[eid] = epRecord{
  378. ep: epRec,
  379. lbIndex: i,
  380. }
  381. }
  382. // Walk through the driver's tables, have the driver decode the entries
  383. // and return the tuple {ep ID, value}. value is a string that coveys
  384. // relevant info about the endpoint.
  385. d, err := n.driver(true)
  386. if err != nil {
  387. logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
  388. return nil
  389. }
  390. for _, table := range n.driverTables {
  391. if table.objType != driverapi.EndpointObject {
  392. continue
  393. }
  394. entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
  395. for key, value := range entries {
  396. epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
  397. if ep, ok := eps[epID]; !ok {
  398. logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
  399. } else {
  400. ep.info = info
  401. eps[epID] = ep
  402. }
  403. }
  404. }
  405. // group the endpoints into a map keyed by the service name
  406. sinfo := make(map[string]ServiceInfo)
  407. for ep, epr := range eps {
  408. var (
  409. s ServiceInfo
  410. ok bool
  411. )
  412. if s, ok = sinfo[epr.ep.ServiceName]; !ok {
  413. s = ServiceInfo{
  414. VIP: epr.ep.VirtualIP,
  415. LocalLBIndex: epr.lbIndex,
  416. }
  417. }
  418. ports := []string{}
  419. if s.Ports == nil {
  420. for _, port := range epr.ep.IngressPorts {
  421. p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
  422. ports = append(ports, p)
  423. }
  424. s.Ports = ports
  425. }
  426. s.Tasks = append(s.Tasks, Task{
  427. Name: epr.ep.Name,
  428. EndpointID: ep,
  429. EndpointIP: epr.ep.EndpointIP,
  430. Info: epr.info,
  431. })
  432. sinfo[epr.ep.ServiceName] = s
  433. }
  434. return sinfo
  435. }
  436. func (n *network) isClusterEligible() bool {
  437. if n.scope != datastore.SwarmScope || !n.driverIsMultihost() {
  438. return false
  439. }
  440. return n.getController().getAgent() != nil
  441. }
  442. func (n *network) joinCluster() error {
  443. if !n.isClusterEligible() {
  444. return nil
  445. }
  446. agent := n.getController().getAgent()
  447. if agent == nil {
  448. return nil
  449. }
  450. return agent.networkDB.JoinNetwork(n.ID())
  451. }
  452. func (n *network) leaveCluster() error {
  453. if !n.isClusterEligible() {
  454. return nil
  455. }
  456. agent := n.getController().getAgent()
  457. if agent == nil {
  458. return nil
  459. }
  460. return agent.networkDB.LeaveNetwork(n.ID())
  461. }
  462. func (ep *endpoint) addDriverInfoToCluster() error {
  463. n := ep.getNetwork()
  464. if !n.isClusterEligible() {
  465. return nil
  466. }
  467. if ep.joinInfo == nil {
  468. return nil
  469. }
  470. agent := n.getController().getAgent()
  471. if agent == nil {
  472. return nil
  473. }
  474. for _, te := range ep.joinInfo.driverTableEntries {
  475. if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
  476. return err
  477. }
  478. }
  479. return nil
  480. }
  481. func (ep *endpoint) deleteDriverInfoFromCluster() error {
  482. n := ep.getNetwork()
  483. if !n.isClusterEligible() {
  484. return nil
  485. }
  486. if ep.joinInfo == nil {
  487. return nil
  488. }
  489. agent := n.getController().getAgent()
  490. if agent == nil {
  491. return nil
  492. }
  493. for _, te := range ep.joinInfo.driverTableEntries {
  494. if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
  495. return err
  496. }
  497. }
  498. return nil
  499. }
  500. func (ep *endpoint) addServiceInfoToCluster() error {
  501. if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
  502. return nil
  503. }
  504. n := ep.getNetwork()
  505. if !n.isClusterEligible() {
  506. return nil
  507. }
  508. c := n.getController()
  509. agent := c.getAgent()
  510. var ingressPorts []*PortConfig
  511. if ep.svcID != "" {
  512. // Gossip ingress ports only in ingress network.
  513. if n.ingress {
  514. ingressPorts = ep.ingressPorts
  515. }
  516. if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
  517. return err
  518. }
  519. }
  520. name := ep.Name()
  521. if ep.isAnonymous() {
  522. name = ep.MyAliases()[0]
  523. }
  524. buf, err := proto.Marshal(&EndpointRecord{
  525. Name: name,
  526. ServiceName: ep.svcName,
  527. ServiceID: ep.svcID,
  528. VirtualIP: ep.virtualIP.String(),
  529. IngressPorts: ingressPorts,
  530. Aliases: ep.svcAliases,
  531. TaskAliases: ep.myAliases,
  532. EndpointIP: ep.Iface().Address().IP.String(),
  533. })
  534. if err != nil {
  535. return err
  536. }
  537. if agent != nil {
  538. if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
  539. return err
  540. }
  541. }
  542. return nil
  543. }
  544. func (ep *endpoint) deleteServiceInfoFromCluster() error {
  545. if ep.isAnonymous() && len(ep.myAliases) == 0 {
  546. return nil
  547. }
  548. n := ep.getNetwork()
  549. if !n.isClusterEligible() {
  550. return nil
  551. }
  552. c := n.getController()
  553. agent := c.getAgent()
  554. if ep.svcID != "" && ep.Iface().Address() != nil {
  555. var ingressPorts []*PortConfig
  556. if n.ingress {
  557. ingressPorts = ep.ingressPorts
  558. }
  559. if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
  560. return err
  561. }
  562. }
  563. if agent != nil {
  564. if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
  565. return err
  566. }
  567. }
  568. return nil
  569. }
  570. func (n *network) addDriverWatches() {
  571. if !n.isClusterEligible() {
  572. return
  573. }
  574. c := n.getController()
  575. agent := c.getAgent()
  576. if agent == nil {
  577. return
  578. }
  579. for _, table := range n.driverTables {
  580. ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
  581. agent.Lock()
  582. agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
  583. agent.Unlock()
  584. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  585. d, err := n.driver(false)
  586. if err != nil {
  587. logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  588. return
  589. }
  590. agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
  591. if nid == n.ID() {
  592. d.EventNotify(driverapi.Create, nid, table.name, key, value)
  593. }
  594. return false
  595. })
  596. }
  597. }
  598. func (n *network) cancelDriverWatches() {
  599. if !n.isClusterEligible() {
  600. return
  601. }
  602. agent := n.getController().getAgent()
  603. if agent == nil {
  604. return
  605. }
  606. agent.Lock()
  607. cancelFuncs := agent.driverCancelFuncs[n.ID()]
  608. delete(agent.driverCancelFuncs, n.ID())
  609. agent.Unlock()
  610. for _, cancel := range cancelFuncs {
  611. cancel()
  612. }
  613. }
  614. func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
  615. for {
  616. select {
  617. case ev, ok := <-ch:
  618. if !ok {
  619. return
  620. }
  621. fn(ev)
  622. }
  623. }
  624. }
  625. func (n *network) handleDriverTableEvent(ev events.Event) {
  626. d, err := n.driver(false)
  627. if err != nil {
  628. logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  629. return
  630. }
  631. var (
  632. etype driverapi.EventType
  633. tname string
  634. key string
  635. value []byte
  636. )
  637. switch event := ev.(type) {
  638. case networkdb.CreateEvent:
  639. tname = event.Table
  640. key = event.Key
  641. value = event.Value
  642. etype = driverapi.Create
  643. case networkdb.DeleteEvent:
  644. tname = event.Table
  645. key = event.Key
  646. value = event.Value
  647. etype = driverapi.Delete
  648. case networkdb.UpdateEvent:
  649. tname = event.Table
  650. key = event.Key
  651. value = event.Value
  652. etype = driverapi.Delete
  653. }
  654. d.EventNotify(etype, n.ID(), tname, key, value)
  655. }
  656. func (c *controller) handleNodeTableEvent(ev events.Event) {
  657. var (
  658. value []byte
  659. isAdd bool
  660. nodeAddr networkdb.NodeAddr
  661. )
  662. switch event := ev.(type) {
  663. case networkdb.CreateEvent:
  664. value = event.Value
  665. isAdd = true
  666. case networkdb.DeleteEvent:
  667. value = event.Value
  668. case networkdb.UpdateEvent:
  669. logrus.Errorf("Unexpected update node table event = %#v", event)
  670. }
  671. err := json.Unmarshal(value, &nodeAddr)
  672. if err != nil {
  673. logrus.Errorf("Error unmarshalling node table event %v", err)
  674. return
  675. }
  676. c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
  677. }
  678. func (c *controller) handleEpTableEvent(ev events.Event) {
  679. var (
  680. nid string
  681. eid string
  682. value []byte
  683. isAdd bool
  684. epRec EndpointRecord
  685. )
  686. switch event := ev.(type) {
  687. case networkdb.CreateEvent:
  688. nid = event.NetworkID
  689. eid = event.Key
  690. value = event.Value
  691. isAdd = true
  692. case networkdb.DeleteEvent:
  693. nid = event.NetworkID
  694. eid = event.Key
  695. value = event.Value
  696. case networkdb.UpdateEvent:
  697. logrus.Errorf("Unexpected update service table event = %#v", event)
  698. }
  699. nw, err := c.NetworkByID(nid)
  700. if err != nil {
  701. logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
  702. return
  703. }
  704. n := nw.(*network)
  705. err = proto.Unmarshal(value, &epRec)
  706. if err != nil {
  707. logrus.Errorf("Failed to unmarshal service table value: %v", err)
  708. return
  709. }
  710. name := epRec.Name
  711. svcName := epRec.ServiceName
  712. svcID := epRec.ServiceID
  713. vip := net.ParseIP(epRec.VirtualIP)
  714. ip := net.ParseIP(epRec.EndpointIP)
  715. ingressPorts := epRec.IngressPorts
  716. aliases := epRec.Aliases
  717. taskaliases := epRec.TaskAliases
  718. if name == "" || ip == nil {
  719. logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  720. return
  721. }
  722. if isAdd {
  723. if svcID != "" {
  724. if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
  725. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  726. return
  727. }
  728. }
  729. n.addSvcRecords(name, ip, nil, true)
  730. for _, alias := range taskaliases {
  731. n.addSvcRecords(alias, ip, nil, true)
  732. }
  733. } else {
  734. if svcID != "" {
  735. if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
  736. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  737. return
  738. }
  739. }
  740. n.deleteSvcRecords(name, ip, nil, true)
  741. for _, alias := range taskaliases {
  742. n.deleteSvcRecords(alias, ip, nil, true)
  743. }
  744. }
  745. }