networkdb.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. package networkdb
  2. //go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
  3. import (
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/armon/go-radix"
  10. "github.com/docker/go-events"
  11. "github.com/docker/libnetwork/types"
  12. "github.com/hashicorp/memberlist"
  13. "github.com/hashicorp/serf/serf"
  14. )
const (
	// byTable selects the index whose paths are built as
	// /<table name>/<network id>/<key>.
	byTable int = 1 + iota
	// byNetwork selects the index whose paths are built as
	// /<network id>/<table name>/<key>.
	byNetwork
)
// NetworkDB instance drives the networkdb cluster and acts as the
// broker for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// The clocks MUST be the first things
	// in this struct due to Golang issue #599.

	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock

	// Global lamport clock for table events.
	tableClock serf.LamportClock

	// Lock taken by the methods of this type around accesses to the
	// indexes and the node/network maps below.
	sync.RWMutex

	// NetworkDB configuration.
	config *Config

	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db.
	indexes map[int]*radix.Tree

	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist

	// List of all peer nodes in the cluster, not limited to any
	// network.
	nodes map[string]*node

	// List of all peer nodes which have failed.
	failedNodes map[string]*node

	// List of all peer nodes which have left.
	leftNodes map[string]*node

	// A multi-dimensional map of network/node attachments. The
	// first key is a node name and the second key is a network ID
	// for the network that node is participating in.
	networks map[string]map[string]*network

	// A map of nodes which are participating in a given
	// network. The key is a network ID.
	networkNodes map[string][]string

	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}

	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue

	// Broadcast queue for node event gossip.
	nodeBroadcasts *memberlist.TransmitLimitedQueue

	// A central stop channel to stop all go routines running on
	// behalf of the NetworkDB instance.
	stopCh chan struct{}

	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster

	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker

	// Reference to the memberlist's keyring to add & remove keys.
	keyring *memberlist.Keyring
}
// PeerInfo represents the peer (gossip cluster) nodes of a network.
type PeerInfo struct {
	// Name is the peer's node name.
	Name string
	// IP is the string form of the peer's address.
	IP string
}
// node is a cluster peer as tracked by NetworkDB: the memberlist node
// plus NetworkDB's own bookkeeping.
type node struct {
	memberlist.Node
	// Lamport time recorded for this node.
	// NOTE(review): presumably the time of the latest node event seen
	// for it — confirm against the event handlers.
	ltime serf.LamportTime
	// Time left before the reaper removes the node.
	// NOTE(review): the previous comment described this as a number of
	// hours; the field is a time.Duration — confirm the intended unit.
	reapTime time.Duration
}
// network describes the node/network attachment.
type network struct {
	// Network ID.
	id string

	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime

	// Node leave is in progress.
	leaving bool

	// Time still left before a deleted network entry gets
	// removed from networkDB.
	reapTime time.Duration

	// The broadcast queue for table event gossip. This is only
	// initialized for this node's network attachment entries.
	tableBroadcasts *memberlist.TransmitLimitedQueue
}
// Config represents the configuration of the networkdb instance and
// can be passed by the caller.
type Config struct {
	// NodeName is the cluster wide unique name for this node.
	NodeName string

	// BindAddr is the IP on which networkdb listens. It can be
	// 0.0.0.0 to listen on all addresses on the host.
	BindAddr string

	// AdvertiseAddr is the node's IP address that we advertise for
	// cluster communication.
	AdvertiseAddr string

	// BindPort is the local node's port to which we bind for
	// cluster communication.
	BindPort int

	// Keys to be added to the Keyring of the memberlist. Key at index
	// 0 is the primary key.
	Keys [][]byte
}
// entry defines a table entry.
type entry struct {
	// Node from which this entry was learned.
	node string

	// Lamport time for the most recent update to the entry.
	ltime serf.LamportTime

	// Opaque value stored in the entry.
	value []byte

	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool

	// Time still left before a deleted table entry gets
	// removed from networkDB.
	reapTime time.Duration
}
  128. // New creates a new instance of NetworkDB using the Config passed by
  129. // the caller.
  130. func New(c *Config) (*NetworkDB, error) {
  131. nDB := &NetworkDB{
  132. config: c,
  133. indexes: make(map[int]*radix.Tree),
  134. networks: make(map[string]map[string]*network),
  135. nodes: make(map[string]*node),
  136. failedNodes: make(map[string]*node),
  137. leftNodes: make(map[string]*node),
  138. networkNodes: make(map[string][]string),
  139. bulkSyncAckTbl: make(map[string]chan struct{}),
  140. broadcaster: events.NewBroadcaster(),
  141. }
  142. nDB.indexes[byTable] = radix.New()
  143. nDB.indexes[byNetwork] = radix.New()
  144. if err := nDB.clusterInit(); err != nil {
  145. return nil, err
  146. }
  147. return nDB, nil
  148. }
  149. // Join joins this NetworkDB instance with a list of peer NetworkDB
  150. // instances passed by the caller in the form of addr:port
  151. func (nDB *NetworkDB) Join(members []string) error {
  152. return nDB.clusterJoin(members)
  153. }
  154. // Close destroys this NetworkDB instance by leave the cluster,
  155. // stopping timers, canceling goroutines etc.
  156. func (nDB *NetworkDB) Close() {
  157. if err := nDB.clusterLeave(); err != nil {
  158. logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
  159. }
  160. }
  161. // Peers returns the gossip peers for a given network.
  162. func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
  163. nDB.RLock()
  164. defer nDB.RUnlock()
  165. peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
  166. for _, nodeName := range nDB.networkNodes[nid] {
  167. peers = append(peers, PeerInfo{
  168. Name: nDB.nodes[nodeName].Name,
  169. IP: nDB.nodes[nodeName].Addr.String(),
  170. })
  171. }
  172. return peers
  173. }
  174. // GetEntry retrieves the value of a table entry in a given (network,
  175. // table, key) tuple
  176. func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
  177. entry, err := nDB.getEntry(tname, nid, key)
  178. if err != nil {
  179. return nil, err
  180. }
  181. return entry.value, nil
  182. }
  183. func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
  184. nDB.RLock()
  185. defer nDB.RUnlock()
  186. e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  187. if !ok {
  188. return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
  189. }
  190. return e.(*entry), nil
  191. }
  192. // CreateEntry creates a table entry in NetworkDB for given (network,
  193. // table, key) tuple and if the NetworkDB is part of the cluster
  194. // propogates this event to the cluster. It is an error to create an
  195. // entry for the same tuple for which there is already an existing
  196. // entry unless the current entry is deleting state.
  197. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
  198. oldEntry, err := nDB.getEntry(tname, nid, key)
  199. if err != nil {
  200. if _, ok := err.(types.NotFoundError); !ok {
  201. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
  202. }
  203. }
  204. if oldEntry != nil && !oldEntry.deleting {
  205. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
  206. }
  207. entry := &entry{
  208. ltime: nDB.tableClock.Increment(),
  209. node: nDB.config.NodeName,
  210. value: value,
  211. }
  212. if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
  213. return fmt.Errorf("cannot send table create event: %v", err)
  214. }
  215. nDB.Lock()
  216. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  217. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  218. nDB.Unlock()
  219. nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
  220. return nil
  221. }
  222. // UpdateEntry updates a table entry in NetworkDB for given (network,
  223. // table, key) tuple and if the NetworkDB is part of the cluster
  224. // propogates this event to the cluster. It is an error to update a
  225. // non-existent entry.
  226. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
  227. if _, err := nDB.GetEntry(tname, nid, key); err != nil {
  228. return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  229. }
  230. entry := &entry{
  231. ltime: nDB.tableClock.Increment(),
  232. node: nDB.config.NodeName,
  233. value: value,
  234. }
  235. if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
  236. return fmt.Errorf("cannot send table update event: %v", err)
  237. }
  238. nDB.Lock()
  239. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  240. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  241. nDB.Unlock()
  242. nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
  243. return nil
  244. }
  245. // DeleteEntry deletes a table entry in NetworkDB for given (network,
  246. // table, key) tuple and if the NetworkDB is part of the cluster
  247. // propogates this event to the cluster.
  248. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
  249. value, err := nDB.GetEntry(tname, nid, key)
  250. if err != nil {
  251. return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  252. }
  253. entry := &entry{
  254. ltime: nDB.tableClock.Increment(),
  255. node: nDB.config.NodeName,
  256. value: value,
  257. deleting: true,
  258. reapTime: reapInterval,
  259. }
  260. if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
  261. return fmt.Errorf("cannot send table delete event: %v", err)
  262. }
  263. nDB.Lock()
  264. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  265. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  266. nDB.Unlock()
  267. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
  268. return nil
  269. }
  270. func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
  271. nDB.Lock()
  272. for nid, nodes := range nDB.networkNodes {
  273. updatedNodes := make([]string, 0, len(nodes))
  274. for _, node := range nodes {
  275. if node == deletedNode {
  276. continue
  277. }
  278. updatedNodes = append(updatedNodes, node)
  279. }
  280. nDB.networkNodes[nid] = updatedNodes
  281. }
  282. delete(nDB.networks, deletedNode)
  283. nDB.Unlock()
  284. }
  285. func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
  286. nDB.Lock()
  287. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  288. oldEntry := v.(*entry)
  289. if oldEntry.node != node {
  290. return false
  291. }
  292. params := strings.Split(path[1:], "/")
  293. tname := params[0]
  294. nid := params[1]
  295. key := params[2]
  296. entry := &entry{
  297. ltime: oldEntry.ltime,
  298. node: node,
  299. value: oldEntry.value,
  300. deleting: true,
  301. reapTime: reapInterval,
  302. }
  303. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  304. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  305. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  306. return false
  307. })
  308. nDB.Unlock()
  309. }
  310. // WalkTable walks a single table in NetworkDB and invokes the passed
  311. // function for each entry in the table passing the network, key,
  312. // value. The walk stops if the passed function returns a true.
  313. func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
  314. nDB.RLock()
  315. values := make(map[string]interface{})
  316. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
  317. values[path] = v
  318. return false
  319. })
  320. nDB.RUnlock()
  321. for k, v := range values {
  322. params := strings.Split(k[1:], "/")
  323. nid := params[1]
  324. key := params[2]
  325. if fn(nid, key, v.(*entry).value) {
  326. return nil
  327. }
  328. }
  329. return nil
  330. }
  331. // JoinNetwork joins this node to a given network and propogates this
  332. // event across the cluster. This triggers this node joining the
  333. // sub-cluster of this network and participates in the network-scoped
  334. // gossip and bulk sync for this network.
  335. func (nDB *NetworkDB) JoinNetwork(nid string) error {
  336. ltime := nDB.networkClock.Increment()
  337. nDB.Lock()
  338. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  339. if !ok {
  340. nodeNetworks = make(map[string]*network)
  341. nDB.networks[nDB.config.NodeName] = nodeNetworks
  342. }
  343. nodeNetworks[nid] = &network{id: nid, ltime: ltime}
  344. nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
  345. NumNodes: func() int {
  346. nDB.RLock()
  347. num := len(nDB.networkNodes[nid])
  348. nDB.RUnlock()
  349. return num
  350. },
  351. RetransmitMult: 4,
  352. }
  353. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
  354. networkNodes := nDB.networkNodes[nid]
  355. nDB.Unlock()
  356. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
  357. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  358. }
  359. logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
  360. if _, err := nDB.bulkSync(networkNodes, true); err != nil {
  361. logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
  362. }
  363. return nil
  364. }
  365. // LeaveNetwork leaves this node from a given network and propogates
  366. // this event across the cluster. This triggers this node leaving the
  367. // sub-cluster of this network and as a result will no longer
  368. // participate in the network-scoped gossip and bulk sync for this
  369. // network. Also remove all the table entries for this network from
  370. // networkdb
  371. func (nDB *NetworkDB) LeaveNetwork(nid string) error {
  372. ltime := nDB.networkClock.Increment()
  373. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
  374. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  375. }
  376. nDB.Lock()
  377. defer nDB.Unlock()
  378. var (
  379. paths []string
  380. entries []*entry
  381. )
  382. nwWalker := func(path string, v interface{}) bool {
  383. entry, ok := v.(*entry)
  384. if !ok {
  385. return false
  386. }
  387. paths = append(paths, path)
  388. entries = append(entries, entry)
  389. return false
  390. }
  391. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
  392. for _, path := range paths {
  393. params := strings.Split(path[1:], "/")
  394. tname := params[1]
  395. key := params[2]
  396. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  397. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  398. }
  399. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  400. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  401. }
  402. }
  403. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  404. if !ok {
  405. return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
  406. }
  407. n, ok := nodeNetworks[nid]
  408. if !ok {
  409. return fmt.Errorf("could not find network %s while trying to leave", nid)
  410. }
  411. n.ltime = ltime
  412. n.leaving = true
  413. return nil
  414. }
  415. // addNetworkNode adds the node to the list of nodes which participate
  416. // in the passed network only if it is not already present. Caller
  417. // should hold the NetworkDB lock while calling this
  418. func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
  419. nodes := nDB.networkNodes[nid]
  420. for _, node := range nodes {
  421. if node == nodeName {
  422. return
  423. }
  424. }
  425. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
  426. }
  427. // Deletes the node from the list of nodes which participate in the
  428. // passed network. Caller should hold the NetworkDB lock while calling
  429. // this
  430. func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
  431. nodes := nDB.networkNodes[nid]
  432. newNodes := make([]string, 0, len(nodes)-1)
  433. for _, name := range nodes {
  434. if name == nodeName {
  435. continue
  436. }
  437. newNodes = append(newNodes, name)
  438. }
  439. nDB.networkNodes[nid] = newNodes
  440. }
  441. // findCommonnetworks find the networks that both this node and the
  442. // passed node have joined.
  443. func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
  444. nDB.RLock()
  445. defer nDB.RUnlock()
  446. var networks []string
  447. for nid := range nDB.networks[nDB.config.NodeName] {
  448. if n, ok := nDB.networks[nodeName][nid]; ok {
  449. if !n.leaving {
  450. networks = append(networks, nid)
  451. }
  452. }
  453. }
  454. return networks
  455. }
  456. func (nDB *NetworkDB) updateLocalNetworkTime() {
  457. nDB.Lock()
  458. defer nDB.Unlock()
  459. ltime := nDB.networkClock.Increment()
  460. for _, n := range nDB.networks[nDB.config.NodeName] {
  461. n.ltime = ltime
  462. }
  463. }
  464. func (nDB *NetworkDB) updateLocalTableTime() {
  465. nDB.Lock()
  466. defer nDB.Unlock()
  467. ltime := nDB.tableClock.Increment()
  468. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  469. entry := v.(*entry)
  470. if entry.node != nDB.config.NodeName {
  471. return false
  472. }
  473. params := strings.Split(path[1:], "/")
  474. tname := params[0]
  475. nid := params[1]
  476. key := params[2]
  477. entry.ltime = ltime
  478. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  479. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  480. return false
  481. })
  482. }