networkdb.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. package networkdb
  2. //go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
  3. import (
  4. "fmt"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/armon/go-radix"
  11. "github.com/docker/go-events"
  12. "github.com/docker/libnetwork/types"
  13. "github.com/hashicorp/memberlist"
  14. "github.com/hashicorp/serf/serf"
  15. )
  16. const (
  17. byTable int = 1 + iota
  18. byNetwork
  19. )
// NetworkDB instance drives the networkdb cluster and acts as the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// The clocks MUST be the first things
	// in this struct due to Golang issue #599.
	// Do not reorder or insert fields above them.

	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock

	// Global lamport clock for table events.
	tableClock serf.LamportClock

	// Embedded lock. The methods in this file take it around accesses
	// to the indexes and maps declared below.
	sync.RWMutex

	// NetworkDB configuration.
	config *Config

	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db. Every table entry is inserted into both trees.
	indexes map[int]*radix.Tree

	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist

	// List of all peer nodes in the cluster, not limited to any
	// network. The key is a node name.
	nodes map[string]*node

	// List of all peer nodes which have failed.
	failedNodes map[string]*node

	// List of all peer nodes which have left.
	leftNodes map[string]*node

	// A multi-dimensional map of network/node attachments. The
	// first key is a node name and the second key is a network ID
	// for the network that node is participating in.
	networks map[string]map[string]*network

	// A map of nodes which are participating in a given
	// network. The key is a network ID and the value is a list of
	// node names.
	networkNodes map[string][]string

	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}

	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue

	// Broadcast queue for node event gossip.
	nodeBroadcasts *memberlist.TransmitLimitedQueue

	// A central stop channel to stop all go routines running on
	// behalf of the NetworkDB instance.
	stopCh chan struct{}

	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster

	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker

	// Reference to the memberlist's keyring to add & remove keys.
	keyring *memberlist.Keyring

	// bootStrapIP is the list of IPs that can be used to bootstrap
	// the gossip. It is filled in by Join.
	bootStrapIP []net.IP
}
// PeerInfo represents the peer (gossip cluster) nodes of a network.
// Values of this type are returned by NetworkDB.Peers.
type PeerInfo struct {
	// Name is the peer's node name.
	Name string
	// IP is the string form of the peer's address.
	IP string
}
// node is the NetworkDB view of one cluster member: the memberlist
// node description plus bookkeeping kept alongside it.
type node struct {
	memberlist.Node
	// Lamport time associated with this node record.
	ltime serf.LamportTime
	// Time left before the reaper removes the node.
	// NOTE(review): the previous comment described this as a number of
	// hours; the field is a time.Duration — confirm the unit against
	// the reaper code.
	reapTime time.Duration
}
// network describes the node/network attachment.
type network struct {
	// Network ID
	id string

	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime

	// Node leave is in progress. Set by LeaveNetwork for the local
	// node's attachment.
	leaving bool

	// Time still left before a deleted network entry gets
	// removed from networkDB.
	// NOTE(review): the previous comment described this as a number of
	// seconds; the field is a time.Duration — confirm against the
	// reaper code.
	reapTime time.Duration

	// The broadcast queue for table event gossip. This is only
	// initialized for this node's network attachment entries
	// (see JoinNetwork).
	tableBroadcasts *memberlist.TransmitLimitedQueue
}
// Config represents the configuration of the networkdb instance and
// can be passed by the caller.
type Config struct {
	// NodeName is the cluster wide unique name for this node.
	NodeName string

	// BindAddr is the IP on which networkdb listens. It can be
	// 0.0.0.0 to listen on all addresses on the host.
	BindAddr string

	// AdvertiseAddr is the node's IP address that we advertise for
	// cluster communication.
	AdvertiseAddr string

	// BindPort is the local node's port to which we bind for
	// cluster communication.
	BindPort int

	// Keys to be added to the Keyring of the memberlist. The key at
	// index 0 is the primary key.
	Keys [][]byte
}
// entry defines a table entry. The same *entry value is stored in both
// the byTable and the byNetwork index.
type entry struct {
	// node from which this entry was learned. Entries created through
	// CreateEntry/UpdateEntry/DeleteEntry carry the local node name.
	node string

	// Lamport time for the most recent update to the entry.
	ltime serf.LamportTime

	// Opaque value stored in the entry.
	value []byte

	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool

	// Time still left before a deleted table entry gets
	// removed from networkDB.
	// NOTE(review): the previous comment described this as a number of
	// seconds; the field is a time.Duration — confirm against the
	// reaper code.
	reapTime time.Duration
}
  132. // New creates a new instance of NetworkDB using the Config passed by
  133. // the caller.
  134. func New(c *Config) (*NetworkDB, error) {
  135. nDB := &NetworkDB{
  136. config: c,
  137. indexes: make(map[int]*radix.Tree),
  138. networks: make(map[string]map[string]*network),
  139. nodes: make(map[string]*node),
  140. failedNodes: make(map[string]*node),
  141. leftNodes: make(map[string]*node),
  142. networkNodes: make(map[string][]string),
  143. bulkSyncAckTbl: make(map[string]chan struct{}),
  144. broadcaster: events.NewBroadcaster(),
  145. }
  146. nDB.indexes[byTable] = radix.New()
  147. nDB.indexes[byNetwork] = radix.New()
  148. if err := nDB.clusterInit(); err != nil {
  149. return nil, err
  150. }
  151. return nDB, nil
  152. }
  153. // Join joins this NetworkDB instance with a list of peer NetworkDB
  154. // instances passed by the caller in the form of addr:port
  155. func (nDB *NetworkDB) Join(members []string) error {
  156. nDB.Lock()
  157. for _, m := range members {
  158. nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
  159. }
  160. nDB.Unlock()
  161. return nDB.clusterJoin(members)
  162. }
  163. // Close destroys this NetworkDB instance by leave the cluster,
  164. // stopping timers, canceling goroutines etc.
  165. func (nDB *NetworkDB) Close() {
  166. if err := nDB.clusterLeave(); err != nil {
  167. logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
  168. }
  169. }
  170. // Peers returns the gossip peers for a given network.
  171. func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
  172. nDB.RLock()
  173. defer nDB.RUnlock()
  174. peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
  175. for _, nodeName := range nDB.networkNodes[nid] {
  176. if node, ok := nDB.nodes[nodeName]; ok {
  177. peers = append(peers, PeerInfo{
  178. Name: node.Name,
  179. IP: node.Addr.String(),
  180. })
  181. }
  182. }
  183. return peers
  184. }
  185. // GetEntry retrieves the value of a table entry in a given (network,
  186. // table, key) tuple
  187. func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
  188. entry, err := nDB.getEntry(tname, nid, key)
  189. if err != nil {
  190. return nil, err
  191. }
  192. return entry.value, nil
  193. }
  194. func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
  195. nDB.RLock()
  196. defer nDB.RUnlock()
  197. e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  198. if !ok {
  199. return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
  200. }
  201. return e.(*entry), nil
  202. }
  203. // CreateEntry creates a table entry in NetworkDB for given (network,
  204. // table, key) tuple and if the NetworkDB is part of the cluster
  205. // propagates this event to the cluster. It is an error to create an
  206. // entry for the same tuple for which there is already an existing
  207. // entry unless the current entry is deleting state.
  208. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
  209. oldEntry, err := nDB.getEntry(tname, nid, key)
  210. if err != nil {
  211. if _, ok := err.(types.NotFoundError); !ok {
  212. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
  213. }
  214. }
  215. if oldEntry != nil && !oldEntry.deleting {
  216. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
  217. }
  218. entry := &entry{
  219. ltime: nDB.tableClock.Increment(),
  220. node: nDB.config.NodeName,
  221. value: value,
  222. }
  223. if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
  224. return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
  225. }
  226. nDB.Lock()
  227. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  228. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  229. nDB.Unlock()
  230. return nil
  231. }
  232. // UpdateEntry updates a table entry in NetworkDB for given (network,
  233. // table, key) tuple and if the NetworkDB is part of the cluster
  234. // propagates this event to the cluster. It is an error to update a
  235. // non-existent entry.
  236. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
  237. if _, err := nDB.GetEntry(tname, nid, key); err != nil {
  238. return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  239. }
  240. entry := &entry{
  241. ltime: nDB.tableClock.Increment(),
  242. node: nDB.config.NodeName,
  243. value: value,
  244. }
  245. if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
  246. return fmt.Errorf("cannot send table update event: %v", err)
  247. }
  248. nDB.Lock()
  249. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  250. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  251. nDB.Unlock()
  252. return nil
  253. }
  254. // GetTableByNetwork walks the networkdb by the give table and network id and
  255. // returns a map of keys and values
  256. func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
  257. entries := make(map[string]interface{})
  258. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
  259. entry := v.(*entry)
  260. if entry.deleting {
  261. return false
  262. }
  263. key := k[strings.LastIndex(k, "/")+1:]
  264. entries[key] = entry.value
  265. return false
  266. })
  267. return entries
  268. }
  269. // DeleteEntry deletes a table entry in NetworkDB for given (network,
  270. // table, key) tuple and if the NetworkDB is part of the cluster
  271. // propagates this event to the cluster.
  272. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
  273. value, err := nDB.GetEntry(tname, nid, key)
  274. if err != nil {
  275. return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  276. }
  277. entry := &entry{
  278. ltime: nDB.tableClock.Increment(),
  279. node: nDB.config.NodeName,
  280. value: value,
  281. deleting: true,
  282. reapTime: reapInterval,
  283. }
  284. if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
  285. return fmt.Errorf("cannot send table delete event: %v", err)
  286. }
  287. nDB.Lock()
  288. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  289. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  290. nDB.Unlock()
  291. return nil
  292. }
  293. func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
  294. nDB.Lock()
  295. for nid, nodes := range nDB.networkNodes {
  296. updatedNodes := make([]string, 0, len(nodes))
  297. for _, node := range nodes {
  298. if node == deletedNode {
  299. continue
  300. }
  301. updatedNodes = append(updatedNodes, node)
  302. }
  303. nDB.networkNodes[nid] = updatedNodes
  304. }
  305. delete(nDB.networks, deletedNode)
  306. nDB.Unlock()
  307. }
  308. func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
  309. nDB.Lock()
  310. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
  311. func(path string, v interface{}) bool {
  312. oldEntry := v.(*entry)
  313. params := strings.Split(path[1:], "/")
  314. nid := params[0]
  315. tname := params[1]
  316. key := params[2]
  317. if oldEntry.node != node {
  318. return false
  319. }
  320. entry := &entry{
  321. ltime: oldEntry.ltime,
  322. node: node,
  323. value: oldEntry.value,
  324. deleting: true,
  325. reapTime: reapInterval,
  326. }
  327. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  328. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  329. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  330. return false
  331. })
  332. nDB.Unlock()
  333. }
  334. func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
  335. nDB.Lock()
  336. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  337. oldEntry := v.(*entry)
  338. if oldEntry.node != node {
  339. return false
  340. }
  341. params := strings.Split(path[1:], "/")
  342. tname := params[0]
  343. nid := params[1]
  344. key := params[2]
  345. entry := &entry{
  346. ltime: oldEntry.ltime,
  347. node: node,
  348. value: oldEntry.value,
  349. deleting: true,
  350. reapTime: reapInterval,
  351. }
  352. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  353. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  354. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  355. return false
  356. })
  357. nDB.Unlock()
  358. }
  359. // WalkTable walks a single table in NetworkDB and invokes the passed
  360. // function for each entry in the table passing the network, key,
  361. // value. The walk stops if the passed function returns a true.
  362. func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
  363. nDB.RLock()
  364. values := make(map[string]interface{})
  365. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
  366. values[path] = v
  367. return false
  368. })
  369. nDB.RUnlock()
  370. for k, v := range values {
  371. params := strings.Split(k[1:], "/")
  372. nid := params[1]
  373. key := params[2]
  374. if fn(nid, key, v.(*entry).value) {
  375. return nil
  376. }
  377. }
  378. return nil
  379. }
  380. // JoinNetwork joins this node to a given network and propagates this
  381. // event across the cluster. This triggers this node joining the
  382. // sub-cluster of this network and participates in the network-scoped
  383. // gossip and bulk sync for this network.
  384. func (nDB *NetworkDB) JoinNetwork(nid string) error {
  385. ltime := nDB.networkClock.Increment()
  386. nDB.Lock()
  387. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  388. if !ok {
  389. nodeNetworks = make(map[string]*network)
  390. nDB.networks[nDB.config.NodeName] = nodeNetworks
  391. }
  392. nodeNetworks[nid] = &network{id: nid, ltime: ltime}
  393. nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
  394. NumNodes: func() int {
  395. nDB.RLock()
  396. num := len(nDB.networkNodes[nid])
  397. nDB.RUnlock()
  398. return num
  399. },
  400. RetransmitMult: 4,
  401. }
  402. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
  403. networkNodes := nDB.networkNodes[nid]
  404. nDB.Unlock()
  405. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
  406. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  407. }
  408. logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
  409. if _, err := nDB.bulkSync(networkNodes, true); err != nil {
  410. logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
  411. }
  412. return nil
  413. }
  414. // LeaveNetwork leaves this node from a given network and propagates
  415. // this event across the cluster. This triggers this node leaving the
  416. // sub-cluster of this network and as a result will no longer
  417. // participate in the network-scoped gossip and bulk sync for this
  418. // network. Also remove all the table entries for this network from
  419. // networkdb
  420. func (nDB *NetworkDB) LeaveNetwork(nid string) error {
  421. ltime := nDB.networkClock.Increment()
  422. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
  423. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  424. }
  425. nDB.Lock()
  426. defer nDB.Unlock()
  427. var (
  428. paths []string
  429. entries []*entry
  430. )
  431. nwWalker := func(path string, v interface{}) bool {
  432. entry, ok := v.(*entry)
  433. if !ok {
  434. return false
  435. }
  436. paths = append(paths, path)
  437. entries = append(entries, entry)
  438. return false
  439. }
  440. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
  441. for _, path := range paths {
  442. params := strings.Split(path[1:], "/")
  443. tname := params[1]
  444. key := params[2]
  445. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  446. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  447. }
  448. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  449. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  450. }
  451. }
  452. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  453. if !ok {
  454. return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
  455. }
  456. n, ok := nodeNetworks[nid]
  457. if !ok {
  458. return fmt.Errorf("could not find network %s while trying to leave", nid)
  459. }
  460. n.ltime = ltime
  461. n.leaving = true
  462. return nil
  463. }
  464. // addNetworkNode adds the node to the list of nodes which participate
  465. // in the passed network only if it is not already present. Caller
  466. // should hold the NetworkDB lock while calling this
  467. func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
  468. nodes := nDB.networkNodes[nid]
  469. for _, node := range nodes {
  470. if node == nodeName {
  471. return
  472. }
  473. }
  474. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
  475. }
  476. // Deletes the node from the list of nodes which participate in the
  477. // passed network. Caller should hold the NetworkDB lock while calling
  478. // this
  479. func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
  480. nodes := nDB.networkNodes[nid]
  481. newNodes := make([]string, 0, len(nodes)-1)
  482. for _, name := range nodes {
  483. if name == nodeName {
  484. continue
  485. }
  486. newNodes = append(newNodes, name)
  487. }
  488. nDB.networkNodes[nid] = newNodes
  489. }
  490. // findCommonnetworks find the networks that both this node and the
  491. // passed node have joined.
  492. func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
  493. nDB.RLock()
  494. defer nDB.RUnlock()
  495. var networks []string
  496. for nid := range nDB.networks[nDB.config.NodeName] {
  497. if n, ok := nDB.networks[nodeName][nid]; ok {
  498. if !n.leaving {
  499. networks = append(networks, nid)
  500. }
  501. }
  502. }
  503. return networks
  504. }
  505. func (nDB *NetworkDB) updateLocalNetworkTime() {
  506. nDB.Lock()
  507. defer nDB.Unlock()
  508. ltime := nDB.networkClock.Increment()
  509. for _, n := range nDB.networks[nDB.config.NodeName] {
  510. n.ltime = ltime
  511. }
  512. }
  513. func (nDB *NetworkDB) updateLocalTableTime() {
  514. nDB.Lock()
  515. defer nDB.Unlock()
  516. ltime := nDB.tableClock.Increment()
  517. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  518. entry := v.(*entry)
  519. if entry.node != nDB.config.NodeName {
  520. return false
  521. }
  522. params := strings.Split(path[1:], "/")
  523. tname := params[0]
  524. nid := params[1]
  525. key := params[2]
  526. entry.ltime = ltime
  527. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  528. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  529. return false
  530. })
  531. }