networkdb.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. package networkdb
  2. //go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
  3. import (
  4. "context"
  5. "fmt"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/armon/go-radix"
  11. "github.com/docker/docker/libnetwork/types"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/hashicorp/serf/serf"
  16. "github.com/sirupsen/logrus"
  17. )
  // Identifiers of the radix tree indexes maintained by NetworkDB.
  const (
  	// byTable selects the index whose paths start with the table name.
  	byTable int = iota + 1
  	// byNetwork selects the index whose paths start with the network ID.
  	byNetwork
  )
  22. // NetworkDB instance drives the networkdb cluster and acts the broker
  23. // for cluster-scoped and network-scoped gossip and watches.
  24. type NetworkDB struct {
  25. // The clocks MUST be the first things
  26. // in this struct due to Golang issue #599.
  27. // Global lamport clock for node network attach events.
  28. networkClock serf.LamportClock
  29. // Global lamport clock for table events.
  30. tableClock serf.LamportClock
  31. sync.RWMutex
  32. // NetworkDB configuration.
  33. config *Config
  34. // All the tree index (byTable, byNetwork) that we maintain
  35. // the db.
  36. indexes map[int]*radix.Tree
  37. // Memberlist we use to drive the cluster.
  38. memberlist *memberlist.Memberlist
  39. // List of all peer nodes in the cluster not-limited to any
  40. // network.
  41. nodes map[string]*node
  42. // List of all peer nodes which have failed
  43. failedNodes map[string]*node
  44. // List of all peer nodes which have left
  45. leftNodes map[string]*node
  46. // A multi-dimensional map of network/node attachments. The
  47. // first key is a node name and the second key is a network ID
  48. // for the network that node is participating in.
  49. networks map[string]map[string]*network
  50. // A map of nodes which are participating in a given
  51. // network. The key is a network ID.
  52. networkNodes map[string][]string
  53. // A table of ack channels for every node from which we are
  54. // waiting for an ack.
  55. bulkSyncAckTbl map[string]chan struct{}
  56. // Broadcast queue for network event gossip.
  57. networkBroadcasts *memberlist.TransmitLimitedQueue
  58. // Broadcast queue for node event gossip.
  59. nodeBroadcasts *memberlist.TransmitLimitedQueue
  60. // A central context to stop all go routines running on
  61. // behalf of the NetworkDB instance.
  62. ctx context.Context
  63. cancelCtx context.CancelFunc
  64. // A central broadcaster for all local watchers watching table
  65. // events.
  66. broadcaster *events.Broadcaster
  67. // List of all tickers which needed to be stopped when
  68. // cleaning up.
  69. tickers []*time.Ticker
  70. // Reference to the memberlist's keyring to add & remove keys
  71. keyring *memberlist.Keyring
  72. // bootStrapIP is the list of IPs that can be used to bootstrap
  73. // the gossip.
  74. bootStrapIP []string
  75. // lastStatsTimestamp is the last timestamp when the stats got printed
  76. lastStatsTimestamp time.Time
  77. // lastHealthTimestamp is the last timestamp when the health score got printed
  78. lastHealthTimestamp time.Time
  79. }
  // PeerInfo describes a peer node of the gossip cluster by its name and
  // its IP address rendered as a string.
  type PeerInfo struct {
  	Name, IP string
  }
  85. // PeerClusterInfo represents the peer (gossip cluster) nodes
  86. type PeerClusterInfo struct {
  87. PeerInfo
  88. }
  89. type node struct {
  90. memberlist.Node
  91. ltime serf.LamportTime
  92. // Number of hours left before the reaper removes the node
  93. reapTime time.Duration
  94. }
  95. // network describes the node/network attachment.
  96. type network struct {
  97. // Network ID
  98. id string
  99. // Lamport time for the latest state of the entry.
  100. ltime serf.LamportTime
  101. // Gets set to true after the first bulk sync happens
  102. inSync bool
  103. // Node leave is in progress.
  104. leaving bool
  105. // Number of seconds still left before a deleted network entry gets
  106. // removed from networkDB
  107. reapTime time.Duration
  108. // The broadcast queue for table event gossip. This is only
  109. // initialized for this node's network attachment entries.
  110. tableBroadcasts *memberlist.TransmitLimitedQueue
  111. // Number of gossip messages sent related to this network during the last stats collection period
  112. qMessagesSent int
  113. // Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
  114. // Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
  115. // interval
  116. entriesNumber int
  117. }
  // Config is the configuration of a networkdb instance. It is built by
  // the caller and handed over when the instance is created.
  type Config struct {
  	// NodeID uniquely identifies the node while it is part of the
  	// cluster.
  	NodeID string

  	// Hostname is the hostname of the node.
  	Hostname string

  	// BindAddr is the IP networkdb listens on. Use 0.0.0.0 to listen on
  	// all addresses of the host.
  	BindAddr string

  	// AdvertiseAddr is the IP address of the node advertised for cluster
  	// communication.
  	AdvertiseAddr string

  	// BindPort is the local port bound for cluster communication.
  	BindPort int

  	// Keys are added to the memberlist Keyring. The key at index 0 is
  	// the primary key.
  	Keys [][]byte

  	// PacketBufferSize is the maximum number of bytes memberlist puts in
  	// a packet (UDP packets by default with a NetTransport). 1400 bytes,
  	// the default, is typically safe; depending on the MTU (Maximum
  	// Transmission Unit) of the network it may be raised to fit more
  	// content into each gossip packet.
  	PacketBufferSize int

  	// reapEntryInterval is how long a deleted entry is kept before being
  	// garbage collected.
  	reapEntryInterval time.Duration

  	// reapNetworkInterval is how long a deleted network is kept before
  	// being garbage collected.
  	// NOTE: this MUST always be higher than reapEntryInterval.
  	reapNetworkInterval time.Duration

  	// rejoinClusterDuration is the retryJoin timeout used by
  	// rejoinClusterBootStrap. Default is 10sec.
  	rejoinClusterDuration time.Duration

  	// rejoinClusterInterval is the interval on which
  	// rejoinClusterBootStrap runs. Default is 60sec.
  	rejoinClusterInterval time.Duration

  	// StatsPrintPeriod is the period used to print the queue stats.
  	// Default is 5min.
  	StatsPrintPeriod time.Duration

  	// HealthPrintPeriod is the period used to print the health score.
  	// Default is 1min.
  	HealthPrintPeriod time.Duration
  }
  161. // entry defines a table entry
  162. type entry struct {
  163. // node from which this entry was learned.
  164. node string
  165. // Lamport time for the most recent update to the entry
  166. ltime serf.LamportTime
  167. // Opaque value store in the entry
  168. value []byte
  169. // Deleting the entry is in progress. All entries linger in
  170. // the cluster for certain amount of time after deletion.
  171. deleting bool
  172. // Number of seconds still left before a deleted table entry gets
  173. // removed from networkDB
  174. reapTime time.Duration
  175. }
  176. // DefaultConfig returns a NetworkDB config with default values
  177. func DefaultConfig() *Config {
  178. hostname, _ := os.Hostname()
  179. return &Config{
  180. NodeID: stringid.TruncateID(stringid.GenerateRandomID()),
  181. Hostname: hostname,
  182. BindAddr: "0.0.0.0",
  183. PacketBufferSize: 1400,
  184. StatsPrintPeriod: 5 * time.Minute,
  185. HealthPrintPeriod: 1 * time.Minute,
  186. reapEntryInterval: 30 * time.Minute,
  187. rejoinClusterDuration: 10 * time.Second,
  188. rejoinClusterInterval: 60 * time.Second,
  189. }
  190. }
  191. // New creates a new instance of NetworkDB using the Config passed by
  192. // the caller.
  193. func New(c *Config) (*NetworkDB, error) {
  194. // The garbage collection logic for entries leverage the presence of the network.
  195. // For this reason the expiration time of the network is put slightly higher than the entry expiration so that
  196. // there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
  197. c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
  198. nDB := &NetworkDB{
  199. config: c,
  200. indexes: make(map[int]*radix.Tree),
  201. networks: make(map[string]map[string]*network),
  202. nodes: make(map[string]*node),
  203. failedNodes: make(map[string]*node),
  204. leftNodes: make(map[string]*node),
  205. networkNodes: make(map[string][]string),
  206. bulkSyncAckTbl: make(map[string]chan struct{}),
  207. broadcaster: events.NewBroadcaster(),
  208. }
  209. nDB.indexes[byTable] = radix.New()
  210. nDB.indexes[byNetwork] = radix.New()
  211. logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
  212. if err := nDB.clusterInit(); err != nil {
  213. return nil, err
  214. }
  215. return nDB, nil
  216. }
  217. // Join joins this NetworkDB instance with a list of peer NetworkDB
  218. // instances passed by the caller in the form of addr:port
  219. func (nDB *NetworkDB) Join(members []string) error {
  220. nDB.Lock()
  221. nDB.bootStrapIP = append([]string(nil), members...)
  222. logrus.Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
  223. nDB.Unlock()
  224. return nDB.clusterJoin(members)
  225. }
  226. // Close destroys this NetworkDB instance by leave the cluster,
  227. // stopping timers, canceling goroutines etc.
  228. func (nDB *NetworkDB) Close() {
  229. if err := nDB.clusterLeave(); err != nil {
  230. logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
  231. }
  232. //Avoid (*Broadcaster).run goroutine leak
  233. nDB.broadcaster.Close()
  234. }
  235. // ClusterPeers returns all the gossip cluster peers.
  236. func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
  237. nDB.RLock()
  238. defer nDB.RUnlock()
  239. peers := make([]PeerInfo, 0, len(nDB.nodes))
  240. for _, node := range nDB.nodes {
  241. peers = append(peers, PeerInfo{
  242. Name: node.Name,
  243. IP: node.Node.Addr.String(),
  244. })
  245. }
  246. return peers
  247. }
  248. // Peers returns the gossip peers for a given network.
  249. func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
  250. nDB.RLock()
  251. defer nDB.RUnlock()
  252. peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
  253. for _, nodeName := range nDB.networkNodes[nid] {
  254. if node, ok := nDB.nodes[nodeName]; ok {
  255. peers = append(peers, PeerInfo{
  256. Name: node.Name,
  257. IP: node.Addr.String(),
  258. })
  259. } else {
  260. // Added for testing purposes, this condition should never happen else mean that the network list
  261. // is out of sync with the node list
  262. peers = append(peers, PeerInfo{Name: nodeName, IP: "unknown"})
  263. }
  264. }
  265. return peers
  266. }
  267. // GetEntry retrieves the value of a table entry in a given (network,
  268. // table, key) tuple
  269. func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
  270. nDB.RLock()
  271. defer nDB.RUnlock()
  272. entry, err := nDB.getEntry(tname, nid, key)
  273. if err != nil {
  274. return nil, err
  275. }
  276. if entry != nil && entry.deleting {
  277. return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
  278. }
  279. return entry.value, nil
  280. }
  281. func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
  282. e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  283. if !ok {
  284. return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
  285. }
  286. return e.(*entry), nil
  287. }
  288. // CreateEntry creates a table entry in NetworkDB for given (network,
  289. // table, key) tuple and if the NetworkDB is part of the cluster
  290. // propagates this event to the cluster. It is an error to create an
  291. // entry for the same tuple for which there is already an existing
  292. // entry unless the current entry is deleting state.
  293. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
  294. nDB.Lock()
  295. oldEntry, err := nDB.getEntry(tname, nid, key)
  296. if err == nil || (oldEntry != nil && !oldEntry.deleting) {
  297. nDB.Unlock()
  298. return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
  299. }
  300. entry := &entry{
  301. ltime: nDB.tableClock.Increment(),
  302. node: nDB.config.NodeID,
  303. value: value,
  304. }
  305. nDB.createOrUpdateEntry(nid, tname, key, entry)
  306. nDB.Unlock()
  307. if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
  308. return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
  309. }
  310. return nil
  311. }
  312. // UpdateEntry updates a table entry in NetworkDB for given (network,
  313. // table, key) tuple and if the NetworkDB is part of the cluster
  314. // propagates this event to the cluster. It is an error to update a
  315. // non-existent entry.
  316. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
  317. nDB.Lock()
  318. if _, err := nDB.getEntry(tname, nid, key); err != nil {
  319. nDB.Unlock()
  320. return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  321. }
  322. entry := &entry{
  323. ltime: nDB.tableClock.Increment(),
  324. node: nDB.config.NodeID,
  325. value: value,
  326. }
  327. nDB.createOrUpdateEntry(nid, tname, key, entry)
  328. nDB.Unlock()
  329. if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
  330. return fmt.Errorf("cannot send table update event: %v", err)
  331. }
  332. return nil
  333. }
  // TableElem is one element of a table: the value of an entry together
  // with the node that owns the entry.
  type TableElem struct {
  	// Value is the value of the entry.
  	Value []byte
  	// owner is the node of the entry.
  	owner string
  }
  339. // GetTableByNetwork walks the networkdb by the give table and network id and
  340. // returns a map of keys and values
  341. func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
  342. entries := make(map[string]*TableElem)
  343. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
  344. entry := v.(*entry)
  345. if entry.deleting {
  346. return false
  347. }
  348. key := k[strings.LastIndex(k, "/")+1:]
  349. entries[key] = &TableElem{Value: entry.value, owner: entry.node}
  350. return false
  351. })
  352. return entries
  353. }
  354. // DeleteEntry deletes a table entry in NetworkDB for given (network,
  355. // table, key) tuple and if the NetworkDB is part of the cluster
  356. // propagates this event to the cluster.
  357. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
  358. nDB.Lock()
  359. oldEntry, err := nDB.getEntry(tname, nid, key)
  360. if err != nil || oldEntry == nil || oldEntry.deleting {
  361. nDB.Unlock()
  362. return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
  363. "does not exist or is already being deleted", tname, nid, key)
  364. }
  365. entry := &entry{
  366. ltime: nDB.tableClock.Increment(),
  367. node: nDB.config.NodeID,
  368. value: oldEntry.value,
  369. deleting: true,
  370. reapTime: nDB.config.reapEntryInterval,
  371. }
  372. nDB.createOrUpdateEntry(nid, tname, key, entry)
  373. nDB.Unlock()
  374. if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
  375. return fmt.Errorf("cannot send table delete event: %v", err)
  376. }
  377. return nil
  378. }
  379. func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
  380. for nid, nodes := range nDB.networkNodes {
  381. updatedNodes := make([]string, 0, len(nodes))
  382. for _, node := range nodes {
  383. if node == deletedNode {
  384. continue
  385. }
  386. updatedNodes = append(updatedNodes, node)
  387. }
  388. nDB.networkNodes[nid] = updatedNodes
  389. }
  390. delete(nDB.networks, deletedNode)
  391. }
  392. // deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
  393. // 1) when a notification is coming of a node leaving the network
  394. // - Walk all the network entries and mark the leaving node's entries for deletion
  395. // These will be garbage collected when the reap timer will expire
  396. // 2) when the local node is leaving the network
  397. // - Walk all the network entries:
  398. // A) if the entry is owned by the local node
  399. // then we will mark it for deletion. This will ensure that if a node did not
  400. // yet received the notification that the local node is leaving, will be aware
  401. // of the entries to be deleted.
  402. // B) if the entry is owned by a remote node, then we can safely delete it. This
  403. // ensures that if we join back this network as we receive the CREATE event for
  404. // entries owned by remote nodes, we will accept them and we notify the application
  405. func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
  406. // Indicates if the delete is triggered for the local node
  407. isNodeLocal := node == nDB.config.NodeID
  408. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
  409. func(path string, v interface{}) bool {
  410. oldEntry := v.(*entry)
  411. params := strings.Split(path[1:], "/")
  412. nid := params[0]
  413. tname := params[1]
  414. key := params[2]
  415. // If the entry is owned by a remote node and this node is not leaving the network
  416. if oldEntry.node != node && !isNodeLocal {
  417. // Don't do anything because the event is triggered for a node that does not own this entry
  418. return false
  419. }
  420. // If this entry is already marked for deletion and this node is not leaving the network
  421. if oldEntry.deleting && !isNodeLocal {
  422. // Don't do anything this entry will be already garbage collected using the old reapTime
  423. return false
  424. }
  425. entry := &entry{
  426. ltime: oldEntry.ltime,
  427. node: oldEntry.node,
  428. value: oldEntry.value,
  429. deleting: true,
  430. reapTime: nDB.config.reapEntryInterval,
  431. }
  432. // we arrived at this point in 2 cases:
  433. // 1) this entry is owned by the node that is leaving the network
  434. // 2) the local node is leaving the network
  435. if oldEntry.node == node {
  436. if isNodeLocal {
  437. // TODO fcrisciani: this can be removed if there is no way to leave the network
  438. // without doing a delete of all the objects
  439. entry.ltime++
  440. }
  441. if !oldEntry.deleting {
  442. nDB.createOrUpdateEntry(nid, tname, key, entry)
  443. }
  444. } else {
  445. // the local node is leaving the network, all the entries of remote nodes can be safely removed
  446. nDB.deleteEntry(nid, tname, key)
  447. }
  448. // Notify to the upper layer only entries not already marked for deletion
  449. if !oldEntry.deleting {
  450. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  451. }
  452. return false
  453. })
  454. }
  455. func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
  456. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  457. oldEntry := v.(*entry)
  458. if oldEntry.node != node {
  459. return false
  460. }
  461. params := strings.Split(path[1:], "/")
  462. tname := params[0]
  463. nid := params[1]
  464. key := params[2]
  465. nDB.deleteEntry(nid, tname, key)
  466. if !oldEntry.deleting {
  467. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
  468. }
  469. return false
  470. })
  471. }
  472. // WalkTable walks a single table in NetworkDB and invokes the passed
  473. // function for each entry in the table passing the network, key,
  474. // value. The walk stops if the passed function returns a true.
  475. func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
  476. nDB.RLock()
  477. values := make(map[string]interface{})
  478. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
  479. values[path] = v
  480. return false
  481. })
  482. nDB.RUnlock()
  483. for k, v := range values {
  484. params := strings.Split(k[1:], "/")
  485. nid := params[1]
  486. key := params[2]
  487. if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
  488. return nil
  489. }
  490. }
  491. return nil
  492. }
  493. // JoinNetwork joins this node to a given network and propagates this
  494. // event across the cluster. This triggers this node joining the
  495. // sub-cluster of this network and participates in the network-scoped
  496. // gossip and bulk sync for this network.
  497. func (nDB *NetworkDB) JoinNetwork(nid string) error {
  498. ltime := nDB.networkClock.Increment()
  499. nDB.Lock()
  500. nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
  501. if !ok {
  502. nodeNetworks = make(map[string]*network)
  503. nDB.networks[nDB.config.NodeID] = nodeNetworks
  504. }
  505. n, ok := nodeNetworks[nid]
  506. var entries int
  507. if ok {
  508. entries = n.entriesNumber
  509. }
  510. nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
  511. nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
  512. NumNodes: func() int {
  513. //TODO fcrisciani this can be optimized maybe avoiding the lock?
  514. // this call is done each GetBroadcasts call to evaluate the number of
  515. // replicas for the message
  516. nDB.RLock()
  517. defer nDB.RUnlock()
  518. return len(nDB.networkNodes[nid])
  519. },
  520. RetransmitMult: 4,
  521. }
  522. nDB.addNetworkNode(nid, nDB.config.NodeID)
  523. networkNodes := nDB.networkNodes[nid]
  524. n = nodeNetworks[nid]
  525. nDB.Unlock()
  526. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
  527. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  528. }
  529. logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
  530. if _, err := nDB.bulkSync(networkNodes, true); err != nil {
  531. logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
  532. }
  533. // Mark the network as being synced
  534. // note this is a best effort, we are not checking the result of the bulk sync
  535. nDB.Lock()
  536. n.inSync = true
  537. nDB.Unlock()
  538. return nil
  539. }
  540. // LeaveNetwork leaves this node from a given network and propagates
  541. // this event across the cluster. This triggers this node leaving the
  542. // sub-cluster of this network and as a result will no longer
  543. // participate in the network-scoped gossip and bulk sync for this
  544. // network. Also remove all the table entries for this network from
  545. // networkdb
  546. func (nDB *NetworkDB) LeaveNetwork(nid string) error {
  547. ltime := nDB.networkClock.Increment()
  548. if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
  549. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  550. }
  551. nDB.Lock()
  552. defer nDB.Unlock()
  553. // Remove myself from the list of the nodes participating to the network
  554. nDB.deleteNetworkNode(nid, nDB.config.NodeID)
  555. // Update all the local entries marking them for deletion and delete all the remote entries
  556. nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
  557. nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
  558. if !ok {
  559. return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
  560. }
  561. n, ok := nodeNetworks[nid]
  562. if !ok {
  563. return fmt.Errorf("could not find network %s while trying to leave", nid)
  564. }
  565. logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
  566. n.ltime = ltime
  567. n.reapTime = nDB.config.reapNetworkInterval
  568. n.leaving = true
  569. return nil
  570. }
  571. // addNetworkNode adds the node to the list of nodes which participate
  572. // in the passed network only if it is not already present. Caller
  573. // should hold the NetworkDB lock while calling this
  574. func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
  575. nodes := nDB.networkNodes[nid]
  576. for _, node := range nodes {
  577. if node == nodeName {
  578. return
  579. }
  580. }
  581. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
  582. }
  583. // Deletes the node from the list of nodes which participate in the
  584. // passed network. Caller should hold the NetworkDB lock while calling
  585. // this
  586. func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
  587. nodes, ok := nDB.networkNodes[nid]
  588. if !ok || len(nodes) == 0 {
  589. return
  590. }
  591. newNodes := make([]string, 0, len(nodes)-1)
  592. for _, name := range nodes {
  593. if name == nodeName {
  594. continue
  595. }
  596. newNodes = append(newNodes, name)
  597. }
  598. nDB.networkNodes[nid] = newNodes
  599. }
  600. // findCommonnetworks find the networks that both this node and the
  601. // passed node have joined.
  602. func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
  603. nDB.RLock()
  604. defer nDB.RUnlock()
  605. var networks []string
  606. for nid := range nDB.networks[nDB.config.NodeID] {
  607. if n, ok := nDB.networks[nodeName][nid]; ok {
  608. if !n.leaving {
  609. networks = append(networks, nid)
  610. }
  611. }
  612. }
  613. return networks
  614. }
  615. func (nDB *NetworkDB) updateLocalNetworkTime() {
  616. nDB.Lock()
  617. defer nDB.Unlock()
  618. ltime := nDB.networkClock.Increment()
  619. for _, n := range nDB.networks[nDB.config.NodeID] {
  620. n.ltime = ltime
  621. }
  622. }
  623. // createOrUpdateEntry this function handles the creation or update of entries into the local
  624. // tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated)
  625. func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
  626. _, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  627. _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  628. if !okNetwork {
  629. // Add only if it is an insert not an update
  630. n, ok := nDB.networks[nDB.config.NodeID][nid]
  631. if ok {
  632. n.entriesNumber++
  633. }
  634. }
  635. return okTable, okNetwork
  636. }
  637. // deleteEntry this function handles the deletion of entries into the local tree store.
  638. // It is also used to keep in sync the entries number of the network (all tables are aggregated)
  639. func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
  640. _, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  641. _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
  642. if okNetwork {
  643. // Remove only if the delete is successful
  644. n, ok := nDB.networks[nDB.config.NodeID][nid]
  645. if ok {
  646. n.entriesNumber--
  647. }
  648. }
  649. return okTable, okNetwork
  650. }