networkdb.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. package networkdb
  2. //go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
  3. import (
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/armon/go-radix"
  10. "github.com/docker/go-events"
  11. "github.com/docker/libnetwork/types"
  12. "github.com/hashicorp/memberlist"
  13. "github.com/hashicorp/serf/serf"
  14. )
  15. const (
  16. byTable int = 1 + iota
  17. byNetwork
  18. )
  19. // NetworkDB instance drives the networkdb cluster and acts the broker
  20. // for cluster-scoped and network-scoped gossip and watches.
  21. type NetworkDB struct {
  22. // The clocks MUST be the first things
  23. // in this struct due to Golang issue #599.
  24. // Global lamport clock for node network attach events.
  25. networkClock serf.LamportClock
  26. // Global lamport clock for table events.
  27. tableClock serf.LamportClock
  28. sync.RWMutex
  29. // NetworkDB configuration.
  30. config *Config
  31. // All the tree index (byTable, byNetwork) that we maintain
  32. // the db.
  33. indexes map[int]*radix.Tree
  34. // Memberlist we use to drive the cluster.
  35. memberlist *memberlist.Memberlist
  36. // List of all peer nodes in the cluster not-limited to any
  37. // network.
  38. nodes map[string]*node
  39. // List of all peer nodes which have failed
  40. failedNodes map[string]*node
  41. // List of all peer nodes which have left
  42. leftNodes map[string]*node
  43. // A multi-dimensional map of network/node attachmemts. The
  44. // first key is a node name and the second key is a network ID
  45. // for the network that node is participating in.
  46. networks map[string]map[string]*network
  47. // A map of nodes which are participating in a given
  48. // network. The key is a network ID.
  49. networkNodes map[string][]string
  50. // A table of ack channels for every node from which we are
  51. // waiting for an ack.
  52. bulkSyncAckTbl map[string]chan struct{}
  53. // Broadcast queue for network event gossip.
  54. networkBroadcasts *memberlist.TransmitLimitedQueue
  55. // Broadcast queue for node event gossip.
  56. nodeBroadcasts *memberlist.TransmitLimitedQueue
  57. // A central stop channel to stop all go routines running on
  58. // behalf of the NetworkDB instance.
  59. stopCh chan struct{}
  60. // A central broadcaster for all local watchers watching table
  61. // events.
  62. broadcaster *events.Broadcaster
  63. // List of all tickers which needed to be stopped when
  64. // cleaning up.
  65. tickers []*time.Ticker
  66. // Reference to the memberlist's keyring to add & remove keys
  67. keyring *memberlist.Keyring
  68. }
  69. // PeerInfo represents the peer (gossip cluster) nodes of a network
  70. type PeerInfo struct {
  71. Name string
  72. IP string
  73. }
  74. type node struct {
  75. memberlist.Node
  76. ltime serf.LamportTime
  77. // Number of hours left before the reaper removes the node
  78. reapTime time.Duration
  79. }
  80. // network describes the node/network attachment.
  81. type network struct {
  82. // Network ID
  83. id string
  84. // Lamport time for the latest state of the entry.
  85. ltime serf.LamportTime
  86. // Node leave is in progress.
  87. leaving bool
  88. // Number of seconds still left before a deleted network entry gets
  89. // removed from networkDB
  90. reapTime time.Duration
  91. // The broadcast queue for table event gossip. This is only
  92. // initialized for this node's network attachment entries.
  93. tableBroadcasts *memberlist.TransmitLimitedQueue
  94. }
  95. // Config represents the configuration of the networdb instance and
  96. // can be passed by the caller.
  97. type Config struct {
  98. // NodeName is the cluster wide unique name for this node.
  99. NodeName string
  100. // BindAddr is the IP on which networkdb listens. It can be
  101. // 0.0.0.0 to listen on all addresses on the host.
  102. BindAddr string
  103. // AdvertiseAddr is the node's IP address that we advertise for
  104. // cluster communication.
  105. AdvertiseAddr string
  106. // BindPort is the local node's port to which we bind to for
  107. // cluster communication.
  108. BindPort int
  109. // Keys to be added to the Keyring of the memberlist. Key at index
  110. // 0 is the primary key
  111. Keys [][]byte
  112. }
  113. // entry defines a table entry
  114. type entry struct {
  115. // node from which this entry was learned.
  116. node string
  117. // Lamport time for the most recent update to the entry
  118. ltime serf.LamportTime
  119. // Opaque value store in the entry
  120. value []byte
  121. // Deleting the entry is in progress. All entries linger in
  122. // the cluster for certain amount of time after deletion.
  123. deleting bool
  124. // Number of seconds still left before a deleted table entry gets
  125. // removed from networkDB
  126. reapTime time.Duration
  127. }
  128. // New creates a new instance of NetworkDB using the Config passed by
  129. // the caller.
  130. func New(c *Config) (*NetworkDB, error) {
  131. nDB := &NetworkDB{
  132. config: c,
  133. indexes: make(map[int]*radix.Tree),
  134. networks: make(map[string]map[string]*network),
  135. nodes: make(map[string]*node),
  136. failedNodes: make(map[string]*node),
  137. leftNodes: make(map[string]*node),
  138. networkNodes: make(map[string][]string),
  139. bulkSyncAckTbl: make(map[string]chan struct{}),
  140. broadcaster: events.NewBroadcaster(),
  141. }
  142. nDB.indexes[byTable] = radix.New()
  143. nDB.indexes[byNetwork] = radix.New()
  144. if err := nDB.clusterInit(); err != nil {
  145. return nil, err
  146. }
  147. return nDB, nil
  148. }
  149. // Join joins this NetworkDB instance with a list of peer NetworkDB
  150. // instances passed by the caller in the form of addr:port
  151. func (nDB *NetworkDB) Join(members []string) error {
  152. return nDB.clusterJoin(members)
  153. }
  154. // Close destroys this NetworkDB instance by leave the cluster,
  155. // stopping timers, canceling goroutines etc.
  156. func (nDB *NetworkDB) Close() {
  157. if err := nDB.clusterLeave(); err != nil {
  158. logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
  159. }
  160. }
  161. // Peers returns the gossip peers for a given network.
  162. func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
  163. nDB.RLock()
  164. defer nDB.RUnlock()
  165. peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
  166. for _, nodeName := range nDB.networkNodes[nid] {
  167. if node, ok := nDB.nodes[nodeName]; ok {
  168. peers = append(peers, PeerInfo{
  169. Name: node.Name,
  170. IP: node.Addr.String(),
  171. })
  172. }
  173. }
  174. return peers
  175. }
  176. // GetEntry retrieves the value of a table entry in a given (network,
  177. // table, key) tuple
  178. func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
  179. entry, err := nDB.getEntry(tname, nid, key)
  180. if err != nil {
  181. return nil, err
  182. }
  183. return entry.value, nil
  184. }
  185. func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
  186. nDB.RLock()
  187. defer nDB.RUnlock()
  188. e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  189. if !ok {
  190. return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
  191. }
  192. return e.(*entry), nil
  193. }
  194. // CreateEntry creates a table entry in NetworkDB for given (network,
  195. // table, key) tuple and if the NetworkDB is part of the cluster
  196. // propagates this event to the cluster. It is an error to create an
  197. // entry for the same tuple for which there is already an existing
  198. // entry unless the current entry is deleting state.
  199. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
  200. oldEntry, err := nDB.getEntry(tname, nid, key)
  201. if err != nil {
  202. if _, ok := err.(types.NotFoundError); !ok {
  203. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
  204. }
  205. }
  206. if oldEntry != nil && !oldEntry.deleting {
  207. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
  208. }
  209. entry := &entry{
  210. ltime: nDB.tableClock.Increment(),
  211. node: nDB.config.NodeName,
  212. value: value,
  213. }
  214. if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
  215. return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
  216. }
  217. nDB.Lock()
  218. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  219. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  220. nDB.Unlock()
  221. nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
  222. return nil
  223. }
  224. // UpdateEntry updates a table entry in NetworkDB for given (network,
  225. // table, key) tuple and if the NetworkDB is part of the cluster
  226. // propagates this event to the cluster. It is an error to update a
  227. // non-existent entry.
  228. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
  229. if _, err := nDB.GetEntry(tname, nid, key); err != nil {
  230. return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  231. }
  232. entry := &entry{
  233. ltime: nDB.tableClock.Increment(),
  234. node: nDB.config.NodeName,
  235. value: value,
  236. }
  237. if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
  238. return fmt.Errorf("cannot send table update event: %v", err)
  239. }
  240. nDB.Lock()
  241. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  242. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  243. nDB.Unlock()
  244. nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
  245. return nil
  246. }
  247. // GetTableByNetwork walks the networkdb by the give table and network id and
  248. // returns a map of keys and values
  249. func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
  250. entries := make(map[string]interface{})
  251. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
  252. entry := v.(*entry)
  253. if entry.deleting {
  254. return false
  255. }
  256. key := k[strings.LastIndex(k, "/")+1:]
  257. entries[key] = entry.value
  258. return false
  259. })
  260. return entries
  261. }
  262. // DeleteEntry deletes a table entry in NetworkDB for given (network,
  263. // table, key) tuple and if the NetworkDB is part of the cluster
  264. // propagates this event to the cluster.
  265. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
  266. value, err := nDB.GetEntry(tname, nid, key)
  267. if err != nil {
  268. return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  269. }
  270. entry := &entry{
  271. ltime: nDB.tableClock.Increment(),
  272. node: nDB.config.NodeName,
  273. value: value,
  274. deleting: true,
  275. reapTime: reapInterval,
  276. }
  277. if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
  278. return fmt.Errorf("cannot send table delete event: %v", err)
  279. }
  280. nDB.Lock()
  281. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  282. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  283. nDB.Unlock()
  284. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
  285. return nil
  286. }
  287. func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
  288. nDB.Lock()
  289. for nid, nodes := range nDB.networkNodes {
  290. updatedNodes := make([]string, 0, len(nodes))
  291. for _, node := range nodes {
  292. if node == deletedNode {
  293. continue
  294. }
  295. updatedNodes = append(updatedNodes, node)
  296. }
  297. nDB.networkNodes[nid] = updatedNodes
  298. }
  299. delete(nDB.networks, deletedNode)
  300. nDB.Unlock()
  301. }
  302. func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
  303. nDB.Lock()
  304. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
  305. func(path string, v interface{}) bool {
  306. oldEntry := v.(*entry)
  307. params := strings.Split(path[1:], "/")
  308. nid := params[0]
  309. tname := params[1]
  310. key := params[2]
  311. if oldEntry.node != node {
  312. return false
  313. }
  314. entry := &entry{
  315. ltime: oldEntry.ltime,
  316. node: node,
  317. value: oldEntry.value,
  318. deleting: true,
  319. reapTime: reapInterval,
  320. }
  321. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  322. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  323. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  324. return false
  325. })
  326. nDB.Unlock()
  327. }
  328. func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
  329. nDB.Lock()
  330. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  331. oldEntry := v.(*entry)
  332. if oldEntry.node != node {
  333. return false
  334. }
  335. params := strings.Split(path[1:], "/")
  336. tname := params[0]
  337. nid := params[1]
  338. key := params[2]
  339. entry := &entry{
  340. ltime: oldEntry.ltime,
  341. node: node,
  342. value: oldEntry.value,
  343. deleting: true,
  344. reapTime: reapInterval,
  345. }
  346. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  347. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  348. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  349. return false
  350. })
  351. nDB.Unlock()
  352. }
  353. // WalkTable walks a single table in NetworkDB and invokes the passed
  354. // function for each entry in the table passing the network, key,
  355. // value. The walk stops if the passed function returns a true.
  356. func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
  357. nDB.RLock()
  358. values := make(map[string]interface{})
  359. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
  360. values[path] = v
  361. return false
  362. })
  363. nDB.RUnlock()
  364. for k, v := range values {
  365. params := strings.Split(k[1:], "/")
  366. nid := params[1]
  367. key := params[2]
  368. if fn(nid, key, v.(*entry).value) {
  369. return nil
  370. }
  371. }
  372. return nil
  373. }
  374. // JoinNetwork joins this node to a given network and propagates this
  375. // event across the cluster. This triggers this node joining the
  376. // sub-cluster of this network and participates in the network-scoped
  377. // gossip and bulk sync for this network.
  378. func (nDB *NetworkDB) JoinNetwork(nid string) error {
  379. ltime := nDB.networkClock.Increment()
  380. nDB.Lock()
  381. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  382. if !ok {
  383. nodeNetworks = make(map[string]*network)
  384. nDB.networks[nDB.config.NodeName] = nodeNetworks
  385. }
  386. nodeNetworks[nid] = &network{id: nid, ltime: ltime}
  387. nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
  388. NumNodes: func() int {
  389. nDB.RLock()
  390. num := len(nDB.networkNodes[nid])
  391. nDB.RUnlock()
  392. return num
  393. },
  394. RetransmitMult: 4,
  395. }
  396. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
  397. networkNodes := nDB.networkNodes[nid]
  398. nDB.Unlock()
  399. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
  400. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  401. }
  402. logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
  403. if _, err := nDB.bulkSync(networkNodes, true); err != nil {
  404. logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
  405. }
  406. return nil
  407. }
  408. // LeaveNetwork leaves this node from a given network and propagates
  409. // this event across the cluster. This triggers this node leaving the
  410. // sub-cluster of this network and as a result will no longer
  411. // participate in the network-scoped gossip and bulk sync for this
  412. // network. Also remove all the table entries for this network from
  413. // networkdb
  414. func (nDB *NetworkDB) LeaveNetwork(nid string) error {
  415. ltime := nDB.networkClock.Increment()
  416. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
  417. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  418. }
  419. nDB.Lock()
  420. defer nDB.Unlock()
  421. var (
  422. paths []string
  423. entries []*entry
  424. )
  425. nwWalker := func(path string, v interface{}) bool {
  426. entry, ok := v.(*entry)
  427. if !ok {
  428. return false
  429. }
  430. paths = append(paths, path)
  431. entries = append(entries, entry)
  432. return false
  433. }
  434. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
  435. for _, path := range paths {
  436. params := strings.Split(path[1:], "/")
  437. tname := params[1]
  438. key := params[2]
  439. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  440. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  441. }
  442. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  443. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  444. }
  445. }
  446. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  447. if !ok {
  448. return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
  449. }
  450. n, ok := nodeNetworks[nid]
  451. if !ok {
  452. return fmt.Errorf("could not find network %s while trying to leave", nid)
  453. }
  454. n.ltime = ltime
  455. n.leaving = true
  456. return nil
  457. }
  458. // addNetworkNode adds the node to the list of nodes which participate
  459. // in the passed network only if it is not already present. Caller
  460. // should hold the NetworkDB lock while calling this
  461. func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
  462. nodes := nDB.networkNodes[nid]
  463. for _, node := range nodes {
  464. if node == nodeName {
  465. return
  466. }
  467. }
  468. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
  469. }
  470. // Deletes the node from the list of nodes which participate in the
  471. // passed network. Caller should hold the NetworkDB lock while calling
  472. // this
  473. func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
  474. nodes := nDB.networkNodes[nid]
  475. newNodes := make([]string, 0, len(nodes)-1)
  476. for _, name := range nodes {
  477. if name == nodeName {
  478. continue
  479. }
  480. newNodes = append(newNodes, name)
  481. }
  482. nDB.networkNodes[nid] = newNodes
  483. }
  484. // findCommonnetworks find the networks that both this node and the
  485. // passed node have joined.
  486. func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
  487. nDB.RLock()
  488. defer nDB.RUnlock()
  489. var networks []string
  490. for nid := range nDB.networks[nDB.config.NodeName] {
  491. if n, ok := nDB.networks[nodeName][nid]; ok {
  492. if !n.leaving {
  493. networks = append(networks, nid)
  494. }
  495. }
  496. }
  497. return networks
  498. }
  499. func (nDB *NetworkDB) updateLocalNetworkTime() {
  500. nDB.Lock()
  501. defer nDB.Unlock()
  502. ltime := nDB.networkClock.Increment()
  503. for _, n := range nDB.networks[nDB.config.NodeName] {
  504. n.ltime = ltime
  505. }
  506. }
  507. func (nDB *NetworkDB) updateLocalTableTime() {
  508. nDB.Lock()
  509. defer nDB.Unlock()
  510. ltime := nDB.tableClock.Increment()
  511. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  512. entry := v.(*entry)
  513. if entry.node != nDB.config.NodeName {
  514. return false
  515. }
  516. params := strings.Split(path[1:], "/")
  517. tname := params[0]
  518. nid := params[1]
  519. key := params[2]
  520. entry.ltime = ltime
  521. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  522. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  523. return false
  524. })
  525. }