agent.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. package libnetwork
  2. //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "os"
  8. "sort"
  9. "sync"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/docker/go-events"
  13. "github.com/docker/libnetwork/datastore"
  14. "github.com/docker/libnetwork/discoverapi"
  15. "github.com/docker/libnetwork/driverapi"
  16. "github.com/docker/libnetwork/networkdb"
  17. "github.com/docker/libnetwork/types"
  18. "github.com/gogo/protobuf/proto"
  19. )
  20. const (
  21. subsysGossip = "networking:gossip"
  22. subsysIPSec = "networking:ipsec"
  23. keyringSize = 3
  24. )
  25. // ByTime implements sort.Interface for []*types.EncryptionKey based on
  26. // the LamportTime field.
  27. type ByTime []*types.EncryptionKey
  28. func (b ByTime) Len() int { return len(b) }
  29. func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  30. func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
  31. type agent struct {
  32. networkDB *networkdb.NetworkDB
  33. bindAddr string
  34. advertiseAddr string
  35. epTblCancel func()
  36. driverCancelFuncs map[string][]func()
  37. sync.Mutex
  38. }
  39. const libnetworkEPTable = "endpoint_table"
  40. func getBindAddr(ifaceName string) (string, error) {
  41. iface, err := net.InterfaceByName(ifaceName)
  42. if err != nil {
  43. return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  44. }
  45. addrs, err := iface.Addrs()
  46. if err != nil {
  47. return "", fmt.Errorf("failed to get interface addresses: %v", err)
  48. }
  49. for _, a := range addrs {
  50. addr, ok := a.(*net.IPNet)
  51. if !ok {
  52. continue
  53. }
  54. addrIP := addr.IP
  55. if addrIP.IsLinkLocalUnicast() {
  56. continue
  57. }
  58. return addrIP.String(), nil
  59. }
  60. return "", fmt.Errorf("failed to get bind address")
  61. }
  62. func resolveAddr(addrOrInterface string) (string, error) {
  63. // Try and see if this is a valid IP address
  64. if net.ParseIP(addrOrInterface) != nil {
  65. return addrOrInterface, nil
  66. }
  67. addr, err := net.ResolveIPAddr("ip", addrOrInterface)
  68. if err != nil {
  69. // If not a valid IP address, it should be a valid interface
  70. return getBindAddr(addrOrInterface)
  71. }
  72. return addr.String(), nil
  73. }
  74. func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
  75. drvEnc := discoverapi.DriverEncryptionUpdate{}
  76. a := c.getAgent()
  77. if a == nil {
  78. logrus.Debug("Skipping key change as agent is nil")
  79. return nil
  80. }
  81. // Find the deleted key. If the deleted key was the primary key,
  82. // a new primary key should be set before removing if from keyring.
  83. c.Lock()
  84. added := []byte{}
  85. deleted := []byte{}
  86. j := len(c.keys)
  87. for i := 0; i < j; {
  88. same := false
  89. for _, key := range keys {
  90. if same = key.LamportTime == c.keys[i].LamportTime; same {
  91. break
  92. }
  93. }
  94. if !same {
  95. cKey := c.keys[i]
  96. if cKey.Subsystem == subsysGossip {
  97. deleted = cKey.Key
  98. }
  99. if cKey.Subsystem == subsysIPSec {
  100. drvEnc.Prune = cKey.Key
  101. drvEnc.PruneTag = cKey.LamportTime
  102. }
  103. c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
  104. c.keys[j-1] = nil
  105. j--
  106. }
  107. i++
  108. }
  109. c.keys = c.keys[:j]
  110. // Find the new key and add it to the key ring
  111. for _, key := range keys {
  112. same := false
  113. for _, cKey := range c.keys {
  114. if same = cKey.LamportTime == key.LamportTime; same {
  115. break
  116. }
  117. }
  118. if !same {
  119. c.keys = append(c.keys, key)
  120. if key.Subsystem == subsysGossip {
  121. added = key.Key
  122. }
  123. if key.Subsystem == subsysIPSec {
  124. drvEnc.Key = key.Key
  125. drvEnc.Tag = key.LamportTime
  126. }
  127. }
  128. }
  129. c.Unlock()
  130. if len(added) > 0 {
  131. a.networkDB.SetKey(added)
  132. }
  133. key, tag, err := c.getPrimaryKeyTag(subsysGossip)
  134. if err != nil {
  135. return err
  136. }
  137. a.networkDB.SetPrimaryKey(key)
  138. key, tag, err = c.getPrimaryKeyTag(subsysIPSec)
  139. if err != nil {
  140. return err
  141. }
  142. drvEnc.Primary = key
  143. drvEnc.PrimaryTag = tag
  144. if len(deleted) > 0 {
  145. a.networkDB.RemoveKey(deleted)
  146. }
  147. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  148. err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
  149. if err != nil {
  150. logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
  151. }
  152. return false
  153. })
  154. return nil
  155. }
  156. func (c *controller) agentSetup() error {
  157. c.Lock()
  158. clusterProvider := c.cfg.Daemon.ClusterProvider
  159. agent := c.agent
  160. c.Unlock()
  161. bindAddr := clusterProvider.GetLocalAddress()
  162. advAddr := clusterProvider.GetAdvertiseAddress()
  163. remote := clusterProvider.GetRemoteAddress()
  164. remoteAddr, _, _ := net.SplitHostPort(remote)
  165. listen := clusterProvider.GetListenAddress()
  166. listenAddr, _, _ := net.SplitHostPort(listen)
  167. logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr)
  168. if advAddr != "" && agent == nil {
  169. if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil {
  170. logrus.Errorf("Error in agentInit : %v", err)
  171. } else {
  172. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  173. if capability.DataScope == datastore.GlobalScope {
  174. c.agentDriverNotify(driver)
  175. }
  176. return false
  177. })
  178. }
  179. }
  180. if remoteAddr != "" {
  181. if err := c.agentJoin(remoteAddr); err != nil {
  182. logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
  183. }
  184. }
  185. c.Lock()
  186. if c.agent != nil && c.agentInitDone != nil {
  187. close(c.agentInitDone)
  188. c.agentInitDone = nil
  189. c.agentStopDone = make(chan struct{})
  190. }
  191. c.Unlock()
  192. return nil
  193. }
  194. // For a given subsystem getKeys sorts the keys by lamport time and returns
  195. // slice of keys and lamport time which can used as a unique tag for the keys
  196. func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
  197. c.Lock()
  198. defer c.Unlock()
  199. sort.Sort(ByTime(c.keys))
  200. keys := [][]byte{}
  201. tags := []uint64{}
  202. for _, key := range c.keys {
  203. if key.Subsystem == subsys {
  204. keys = append(keys, key.Key)
  205. tags = append(tags, key.LamportTime)
  206. }
  207. }
  208. keys[0], keys[1] = keys[1], keys[0]
  209. tags[0], tags[1] = tags[1], tags[0]
  210. return keys, tags
  211. }
  212. // getPrimaryKeyTag returns the primary key for a given subsystem from the
  213. // list of sorted key and the associated tag
  214. func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
  215. c.Lock()
  216. defer c.Unlock()
  217. sort.Sort(ByTime(c.keys))
  218. keys := []*types.EncryptionKey{}
  219. for _, key := range c.keys {
  220. if key.Subsystem == subsys {
  221. keys = append(keys, key)
  222. }
  223. }
  224. return keys[1].Key, keys[1].LamportTime, nil
  225. }
  226. func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error {
  227. if !c.isAgent() {
  228. return nil
  229. }
  230. bindAddr, err := resolveAddr(bindAddrOrInterface)
  231. if err != nil {
  232. return err
  233. }
  234. keys, tags := c.getKeys(subsysGossip)
  235. hostname, _ := os.Hostname()
  236. nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
  237. logrus.Info("Gossip cluster hostname ", nodeName)
  238. nDB, err := networkdb.New(&networkdb.Config{
  239. BindAddr: listenAddr,
  240. AdvertiseAddr: advertiseAddr,
  241. NodeName: nodeName,
  242. Keys: keys,
  243. })
  244. if err != nil {
  245. return err
  246. }
  247. ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
  248. nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
  249. c.Lock()
  250. c.agent = &agent{
  251. networkDB: nDB,
  252. bindAddr: bindAddr,
  253. advertiseAddr: advertiseAddr,
  254. epTblCancel: cancel,
  255. driverCancelFuncs: make(map[string][]func()),
  256. }
  257. c.Unlock()
  258. go c.handleTableEvents(ch, c.handleEpTableEvent)
  259. go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
  260. drvEnc := discoverapi.DriverEncryptionConfig{}
  261. keys, tags = c.getKeys(subsysIPSec)
  262. drvEnc.Keys = keys
  263. drvEnc.Tags = tags
  264. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  265. err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
  266. if err != nil {
  267. logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  268. }
  269. return false
  270. })
  271. c.WalkNetworks(joinCluster)
  272. return nil
  273. }
  274. func (c *controller) agentJoin(remote string) error {
  275. agent := c.getAgent()
  276. if agent == nil {
  277. return nil
  278. }
  279. return agent.networkDB.Join([]string{remote})
  280. }
  281. func (c *controller) agentDriverNotify(d driverapi.Driver) {
  282. agent := c.getAgent()
  283. if agent == nil {
  284. return
  285. }
  286. d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  287. Address: agent.advertiseAddr,
  288. BindAddress: agent.bindAddr,
  289. Self: true,
  290. })
  291. drvEnc := discoverapi.DriverEncryptionConfig{}
  292. keys, tags := c.getKeys(subsysIPSec)
  293. drvEnc.Keys = keys
  294. drvEnc.Tags = tags
  295. c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
  296. err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
  297. if err != nil {
  298. logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
  299. }
  300. return false
  301. })
  302. }
  303. func (c *controller) agentClose() {
  304. // Acquire current agent instance and reset its pointer
  305. // then run closing functions
  306. c.Lock()
  307. agent := c.agent
  308. c.agent = nil
  309. c.Unlock()
  310. if agent == nil {
  311. return
  312. }
  313. var cancelList []func()
  314. agent.Lock()
  315. for _, cancelFuncs := range agent.driverCancelFuncs {
  316. for _, cancel := range cancelFuncs {
  317. cancelList = append(cancelList, cancel)
  318. }
  319. }
  320. agent.Unlock()
  321. for _, cancel := range cancelList {
  322. cancel()
  323. }
  324. agent.epTblCancel()
  325. agent.networkDB.Close()
  326. }
  327. // Task has the backend container details
  328. type Task struct {
  329. Name string
  330. EndpointID string
  331. EndpointIP string
  332. Info map[string]string
  333. }
  334. // ServiceInfo has service specific details along with the list of backend tasks
  335. type ServiceInfo struct {
  336. VIP string
  337. LocalLBIndex int
  338. Tasks []Task
  339. Ports []string
  340. }
  341. type epRecord struct {
  342. ep EndpointRecord
  343. info map[string]string
  344. lbIndex int
  345. }
  346. func (n *network) Services() map[string]ServiceInfo {
  347. eps := make(map[string]epRecord)
  348. if !n.isClusterEligible() {
  349. return nil
  350. }
  351. agent := n.getController().getAgent()
  352. if agent == nil {
  353. return nil
  354. }
  355. // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
  356. entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
  357. for eid, value := range entries {
  358. var epRec EndpointRecord
  359. nid := n.ID()
  360. if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
  361. logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
  362. continue
  363. }
  364. i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
  365. eps[eid] = epRecord{
  366. ep: epRec,
  367. lbIndex: i,
  368. }
  369. }
  370. // Walk through the driver's tables, have the driver decode the entries
  371. // and return the tuple {ep ID, value}. value is a string that coveys
  372. // relevant info about the endpoint.
  373. d, err := n.driver(true)
  374. if err != nil {
  375. logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
  376. return nil
  377. }
  378. for _, table := range n.driverTables {
  379. if table.objType != driverapi.EndpointObject {
  380. continue
  381. }
  382. entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
  383. for key, value := range entries {
  384. epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
  385. if ep, ok := eps[epID]; !ok {
  386. logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
  387. } else {
  388. ep.info = info
  389. eps[epID] = ep
  390. }
  391. }
  392. }
  393. // group the endpoints into a map keyed by the service name
  394. sinfo := make(map[string]ServiceInfo)
  395. for ep, epr := range eps {
  396. var (
  397. s ServiceInfo
  398. ok bool
  399. )
  400. if s, ok = sinfo[epr.ep.ServiceName]; !ok {
  401. s = ServiceInfo{
  402. VIP: epr.ep.VirtualIP,
  403. LocalLBIndex: epr.lbIndex,
  404. }
  405. }
  406. ports := []string{}
  407. if s.Ports == nil {
  408. for _, port := range epr.ep.IngressPorts {
  409. p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
  410. ports = append(ports, p)
  411. }
  412. s.Ports = ports
  413. }
  414. s.Tasks = append(s.Tasks, Task{
  415. Name: epr.ep.Name,
  416. EndpointID: ep,
  417. EndpointIP: epr.ep.EndpointIP,
  418. Info: epr.info,
  419. })
  420. sinfo[epr.ep.ServiceName] = s
  421. }
  422. return sinfo
  423. }
  424. func (n *network) isClusterEligible() bool {
  425. if n.driverScope() != datastore.GlobalScope {
  426. return false
  427. }
  428. return n.getController().getAgent() != nil
  429. }
  430. func (n *network) joinCluster() error {
  431. if !n.isClusterEligible() {
  432. return nil
  433. }
  434. agent := n.getController().getAgent()
  435. if agent == nil {
  436. return nil
  437. }
  438. return agent.networkDB.JoinNetwork(n.ID())
  439. }
  440. func (n *network) leaveCluster() error {
  441. if !n.isClusterEligible() {
  442. return nil
  443. }
  444. agent := n.getController().getAgent()
  445. if agent == nil {
  446. return nil
  447. }
  448. return agent.networkDB.LeaveNetwork(n.ID())
  449. }
  450. func (ep *endpoint) addDriverInfoToCluster() error {
  451. n := ep.getNetwork()
  452. if !n.isClusterEligible() {
  453. return nil
  454. }
  455. if ep.joinInfo == nil {
  456. return nil
  457. }
  458. agent := n.getController().getAgent()
  459. if agent == nil {
  460. return nil
  461. }
  462. for _, te := range ep.joinInfo.driverTableEntries {
  463. if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
  464. return err
  465. }
  466. }
  467. return nil
  468. }
  469. func (ep *endpoint) deleteDriverInfoFromCluster() error {
  470. n := ep.getNetwork()
  471. if !n.isClusterEligible() {
  472. return nil
  473. }
  474. if ep.joinInfo == nil {
  475. return nil
  476. }
  477. agent := n.getController().getAgent()
  478. if agent == nil {
  479. return nil
  480. }
  481. for _, te := range ep.joinInfo.driverTableEntries {
  482. if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
  483. return err
  484. }
  485. }
  486. return nil
  487. }
  488. func (ep *endpoint) addServiceInfoToCluster() error {
  489. if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
  490. return nil
  491. }
  492. n := ep.getNetwork()
  493. if !n.isClusterEligible() {
  494. return nil
  495. }
  496. c := n.getController()
  497. agent := c.getAgent()
  498. var ingressPorts []*PortConfig
  499. if ep.svcID != "" {
  500. // Gossip ingress ports only in ingress network.
  501. if n.ingress {
  502. ingressPorts = ep.ingressPorts
  503. }
  504. if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
  505. return err
  506. }
  507. }
  508. name := ep.Name()
  509. if ep.isAnonymous() {
  510. name = ep.MyAliases()[0]
  511. }
  512. buf, err := proto.Marshal(&EndpointRecord{
  513. Name: name,
  514. ServiceName: ep.svcName,
  515. ServiceID: ep.svcID,
  516. VirtualIP: ep.virtualIP.String(),
  517. IngressPorts: ingressPorts,
  518. Aliases: ep.svcAliases,
  519. TaskAliases: ep.myAliases,
  520. EndpointIP: ep.Iface().Address().IP.String(),
  521. })
  522. if err != nil {
  523. return err
  524. }
  525. if agent != nil {
  526. if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
  527. return err
  528. }
  529. }
  530. return nil
  531. }
  532. func (ep *endpoint) deleteServiceInfoFromCluster() error {
  533. if ep.isAnonymous() && len(ep.myAliases) == 0 {
  534. return nil
  535. }
  536. n := ep.getNetwork()
  537. if !n.isClusterEligible() {
  538. return nil
  539. }
  540. c := n.getController()
  541. agent := c.getAgent()
  542. if ep.svcID != "" && ep.Iface().Address() != nil {
  543. var ingressPorts []*PortConfig
  544. if n.ingress {
  545. ingressPorts = ep.ingressPorts
  546. }
  547. if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
  548. return err
  549. }
  550. }
  551. if agent != nil {
  552. if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
  553. return err
  554. }
  555. }
  556. return nil
  557. }
  558. func (n *network) addDriverWatches() {
  559. if !n.isClusterEligible() {
  560. return
  561. }
  562. c := n.getController()
  563. agent := c.getAgent()
  564. if agent == nil {
  565. return
  566. }
  567. for _, table := range n.driverTables {
  568. ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
  569. agent.Lock()
  570. agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
  571. agent.Unlock()
  572. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  573. d, err := n.driver(false)
  574. if err != nil {
  575. logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  576. return
  577. }
  578. agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
  579. if nid == n.ID() {
  580. d.EventNotify(driverapi.Create, nid, table.name, key, value)
  581. }
  582. return false
  583. })
  584. }
  585. }
  586. func (n *network) cancelDriverWatches() {
  587. if !n.isClusterEligible() {
  588. return
  589. }
  590. agent := n.getController().getAgent()
  591. if agent == nil {
  592. return
  593. }
  594. agent.Lock()
  595. cancelFuncs := agent.driverCancelFuncs[n.ID()]
  596. delete(agent.driverCancelFuncs, n.ID())
  597. agent.Unlock()
  598. for _, cancel := range cancelFuncs {
  599. cancel()
  600. }
  601. }
  602. func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
  603. for {
  604. select {
  605. case ev, ok := <-ch:
  606. if !ok {
  607. return
  608. }
  609. fn(ev)
  610. }
  611. }
  612. }
  613. func (n *network) handleDriverTableEvent(ev events.Event) {
  614. d, err := n.driver(false)
  615. if err != nil {
  616. logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  617. return
  618. }
  619. var (
  620. etype driverapi.EventType
  621. tname string
  622. key string
  623. value []byte
  624. )
  625. switch event := ev.(type) {
  626. case networkdb.CreateEvent:
  627. tname = event.Table
  628. key = event.Key
  629. value = event.Value
  630. etype = driverapi.Create
  631. case networkdb.DeleteEvent:
  632. tname = event.Table
  633. key = event.Key
  634. value = event.Value
  635. etype = driverapi.Delete
  636. case networkdb.UpdateEvent:
  637. tname = event.Table
  638. key = event.Key
  639. value = event.Value
  640. etype = driverapi.Delete
  641. }
  642. d.EventNotify(etype, n.ID(), tname, key, value)
  643. }
  644. func (c *controller) handleNodeTableEvent(ev events.Event) {
  645. var (
  646. value []byte
  647. isAdd bool
  648. nodeAddr networkdb.NodeAddr
  649. )
  650. switch event := ev.(type) {
  651. case networkdb.CreateEvent:
  652. value = event.Value
  653. isAdd = true
  654. case networkdb.DeleteEvent:
  655. value = event.Value
  656. case networkdb.UpdateEvent:
  657. logrus.Errorf("Unexpected update node table event = %#v", event)
  658. }
  659. err := json.Unmarshal(value, &nodeAddr)
  660. if err != nil {
  661. logrus.Errorf("Error unmarshalling node table event %v", err)
  662. return
  663. }
  664. c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
  665. }
  666. func (c *controller) handleEpTableEvent(ev events.Event) {
  667. var (
  668. nid string
  669. eid string
  670. value []byte
  671. isAdd bool
  672. epRec EndpointRecord
  673. )
  674. switch event := ev.(type) {
  675. case networkdb.CreateEvent:
  676. nid = event.NetworkID
  677. eid = event.Key
  678. value = event.Value
  679. isAdd = true
  680. case networkdb.DeleteEvent:
  681. nid = event.NetworkID
  682. eid = event.Key
  683. value = event.Value
  684. case networkdb.UpdateEvent:
  685. logrus.Errorf("Unexpected update service table event = %#v", event)
  686. }
  687. nw, err := c.NetworkByID(nid)
  688. if err != nil {
  689. logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
  690. return
  691. }
  692. n := nw.(*network)
  693. err = proto.Unmarshal(value, &epRec)
  694. if err != nil {
  695. logrus.Errorf("Failed to unmarshal service table value: %v", err)
  696. return
  697. }
  698. name := epRec.Name
  699. svcName := epRec.ServiceName
  700. svcID := epRec.ServiceID
  701. vip := net.ParseIP(epRec.VirtualIP)
  702. ip := net.ParseIP(epRec.EndpointIP)
  703. ingressPorts := epRec.IngressPorts
  704. aliases := epRec.Aliases
  705. taskaliases := epRec.TaskAliases
  706. if name == "" || ip == nil {
  707. logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  708. return
  709. }
  710. if isAdd {
  711. if svcID != "" {
  712. if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
  713. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  714. return
  715. }
  716. }
  717. n.addSvcRecords(name, ip, nil, true)
  718. for _, alias := range taskaliases {
  719. n.addSvcRecords(alias, ip, nil, true)
  720. }
  721. } else {
  722. if svcID != "" {
  723. if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
  724. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  725. return
  726. }
  727. }
  728. n.deleteSvcRecords(name, ip, nil, true)
  729. for _, alias := range taskaliases {
  730. n.deleteSvcRecords(alias, ip, nil, true)
  731. }
  732. }
  733. }