networkdb.go

package networkdb

//go:generate protoc -I=. -I=../../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork/networkdb:. networkdb.proto

import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/libnetwork/types"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/go-events"
	iradix "github.com/hashicorp/go-immutable-radix"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
)

const (
	byTable int = 1 + iota
	byNetwork
)

// NetworkDB instance drives the networkdb cluster and acts as a broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// The clocks MUST be the first things
	// in this struct due to Golang issue #599.

	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock

	// Global lamport clock for table events.
	tableClock serf.LamportClock

	sync.RWMutex

	// NetworkDB configuration.
	config *Config

	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db.
	indexes map[int]*iradix.Tree

	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist

	// List of all peer nodes in the cluster, not limited to any
	// network.
	nodes map[string]*node

	// List of all peer nodes which have failed.
	failedNodes map[string]*node

	// List of all peer nodes which have left.
	leftNodes map[string]*node

	// A multi-dimensional map of network/node attachments. The
	// first key is a node name and the second key is a network ID
	// for the network that node is participating in.
	networks map[string]map[string]*network

	// A map of nodes which are participating in a given
	// network. The key is a network ID.
	networkNodes map[string][]string

	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}

	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue

	// Broadcast queue for node event gossip.
	nodeBroadcasts *memberlist.TransmitLimitedQueue

	// A central context to stop all goroutines running on
	// behalf of the NetworkDB instance.
	ctx       context.Context
	cancelCtx context.CancelFunc

	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster

	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker

	// Reference to the memberlist's keyring to add & remove keys.
	keyring *memberlist.Keyring

	// bootStrapIP is the list of IPs that can be used to bootstrap
	// the gossip.
	bootStrapIP []string

	// lastStatsTimestamp is the last timestamp when the stats got printed.
	lastStatsTimestamp time.Time

	// lastHealthTimestamp is the last timestamp when the health score got printed.
	lastHealthTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network.
type PeerInfo struct {
	Name string
	IP   string
}

// PeerClusterInfo represents the peer (gossip cluster) nodes.
type PeerClusterInfo struct {
	PeerInfo
}

type node struct {
	memberlist.Node
	ltime serf.LamportTime
	// Number of hours left before the reaper removes the node.
	reapTime time.Duration
}

// network describes the node/network attachment.
type network struct {
	// Network ID
	id string

	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime

	// Gets set to true after the first bulk sync happens.
	inSync bool

	// Node leave is in progress.
	leaving bool

	// Number of seconds still left before a deleted network entry gets
	// removed from networkDB.
	reapTime time.Duration

	// The broadcast queue for table event gossip. This is only
	// initialized for this node's network attachment entries.
	tableBroadcasts *memberlist.TransmitLimitedQueue

	// Number of gossip messages sent related to this network during the last stats collection period.
	qMessagesSent atomic.Int64

	// Number of entries in the network. This value is the sum of all the entries of all the tables of a specific network.
	// It is used for statistics purposes: it keeps track of the database size and is printed per network every
	// StatsPrintPeriod interval.
	entriesNumber atomic.Int64
}

// Config represents the configuration of the networkdb instance and
// can be passed by the caller.
type Config struct {
	// NodeID is the unique identifier of the node when it is part of the cluster.
	NodeID string

	// Hostname is the node hostname.
	Hostname string

	// BindAddr is the IP on which networkdb listens. It can be
	// 0.0.0.0 to listen on all addresses on the host.
	BindAddr string

	// AdvertiseAddr is the node's IP address that we advertise for
	// cluster communication.
	AdvertiseAddr string

	// BindPort is the local node's port to which we bind to for
	// cluster communication.
	BindPort int

	// Keys to be added to the Keyring of the memberlist. Key at index
	// 0 is the primary key.
	Keys [][]byte

	// PacketBufferSize is the maximum number of bytes that memberlist will
	// put in a packet (this will be for UDP packets by default with a NetTransport).
	// A safe value for this is typically 1400 bytes (which is the default). However,
	// depending on your network's MTU (Maximum Transmission Unit) you may
	// be able to increase this to get more content into each gossip packet.
	PacketBufferSize int

	// reapEntryInterval is the duration a deleted entry lingers before being garbage collected.
	reapEntryInterval time.Duration

	// reapNetworkInterval is the duration a deleted network lingers before being garbage collected.
	// NOTE this MUST always be higher than reapEntryInterval.
	reapNetworkInterval time.Duration

	// rejoinClusterDuration represents the retryJoin timeout used by rejoinClusterBootStrap.
	// Default is 10sec.
	rejoinClusterDuration time.Duration

	// rejoinClusterInterval represents the interval on which rejoinClusterBootStrap runs.
	// Default is 60sec.
	rejoinClusterInterval time.Duration

	// StatsPrintPeriod is the period used to print queue stats.
	// Default is 5min.
	StatsPrintPeriod time.Duration

	// HealthPrintPeriod is the period used to print the health score.
	// Default is 1min.
	HealthPrintPeriod time.Duration
}

// entry defines a table entry.
type entry struct {
	// node from which this entry was learned.
	node string

	// Lamport time for the most recent update to the entry.
	ltime serf.LamportTime

	// Opaque value stored in the entry.
	value []byte

	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool

	// Number of seconds still left before a deleted table entry gets
	// removed from networkDB.
	reapTime time.Duration
}

// DefaultConfig returns a NetworkDB config with default values.
func DefaultConfig() *Config {
	hostname, _ := os.Hostname()
	return &Config{
		NodeID:                stringid.TruncateID(stringid.GenerateRandomID()),
		Hostname:              hostname,
		BindAddr:              "0.0.0.0",
		PacketBufferSize:      1400,
		StatsPrintPeriod:      5 * time.Minute,
		HealthPrintPeriod:     1 * time.Minute,
		reapEntryInterval:     30 * time.Minute,
		rejoinClusterDuration: 10 * time.Second,
		rejoinClusterInterval: 60 * time.Second,
	}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
	// The garbage collection logic for entries leverages the presence of the network.
	// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
	// there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
	c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod

	nDB := &NetworkDB{
		config:         c,
		indexes:        make(map[int]*iradix.Tree),
		networks:       make(map[string]map[string]*network),
		nodes:          make(map[string]*node),
		failedNodes:    make(map[string]*node),
		leftNodes:      make(map[string]*node),
		networkNodes:   make(map[string][]string),
		bulkSyncAckTbl: make(map[string]chan struct{}),
		broadcaster:    events.NewBroadcaster(),
	}

	nDB.indexes[byTable] = iradix.New()
	nDB.indexes[byNetwork] = iradix.New()

	log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
	if err := nDB.clusterInit(); err != nil {
		return nil, err
	}

	return nDB, nil
}

// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port.
func (nDB *NetworkDB) Join(members []string) error {
	nDB.Lock()
	nDB.bootStrapIP = append([]string(nil), members...)
	log.G(context.TODO()).Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
	nDB.Unlock()
	return nDB.clusterJoin(members)
}
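
// Usage sketch (illustrative only, not part of this file's API surface): the
// typical bootstrap sequence wires DefaultConfig, New, Join and Close
// together. The port and peer address below are hypothetical.
//
//	cfg := DefaultConfig()
//	cfg.BindPort = 7946 // hypothetical port choice
//	db, err := New(cfg)
//	if err != nil {
//		return err
//	}
//	defer db.Close()
//	if err := db.Join([]string{"192.0.2.10:7946"}); err != nil { // hypothetical peer
//		return err
//	}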
// Close destroys this NetworkDB instance by leaving the cluster,
// stopping timers, canceling goroutines, etc.
func (nDB *NetworkDB) Close() {
	if err := nDB.clusterLeave(); err != nil {
		log.G(context.TODO()).Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
	}

	// Avoid (*Broadcaster).run goroutine leak
	nDB.broadcaster.Close()
}

// ClusterPeers returns all the gossip cluster peers.
func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.nodes))
	for _, node := range nDB.nodes {
		peers = append(peers, PeerInfo{
			Name: node.Name,
			IP:   node.Node.Addr.String(),
		})
	}
	return peers
}

// Peers returns the gossip peers for a given network.
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
	for _, nodeName := range nDB.networkNodes[nid] {
		if node, ok := nDB.nodes[nodeName]; ok {
			peers = append(peers, PeerInfo{
				Name: node.Name,
				IP:   node.Addr.String(),
			})
		} else {
			// Added for testing purposes; this condition should never happen,
			// otherwise it means that the network list is out of sync with the
			// node list.
			peers = append(peers, PeerInfo{Name: nodeName, IP: "unknown"})
		}
	}
	return peers
}

// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple.
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
	nDB.RLock()
	defer nDB.RUnlock()
	entry, err := nDB.getEntry(tname, nid, key)
	if err != nil {
		return nil, err
	}
	if entry != nil && entry.deleting {
		return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
	}

	return entry.value, nil
}

func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
	e, ok := nDB.indexes[byTable].Get([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)))
	if !ok {
		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
	}

	return e.(*entry), nil
}

// CreateEntry creates a table entry in NetworkDB for the given (network,
// table, key) tuple and, if the NetworkDB is part of the cluster,
// propagates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry, unless the current entry is in deleting state.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err == nil || (oldEntry != nil && !oldEntry.deleting) {
		nDB.Unlock()
		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
	}

	return nil
}

// UpdateEntry updates a table entry in NetworkDB for the given (network,
// table, key) tuple and, if the NetworkDB is part of the cluster,
// propagates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	if _, err := nDB.getEntry(tname, nid, key); err != nil {
		nDB.Unlock()
		return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table update event: %v", err)
	}

	return nil
}

// TableElem is a single element (value and owner) of a table as returned
// by GetTableByNetwork.
type TableElem struct {
	Value []byte
	owner string
}
// GetTableByNetwork walks the networkdb for the given table and network id
// and returns a map of keys and values.
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
	// Hold the read lock while reading the index map; the tree itself is
	// immutable, so the walk is safe once we have the current root.
	nDB.RLock()
	defer nDB.RUnlock()
	entries := make(map[string]*TableElem)
	nDB.indexes[byTable].Root().WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v interface{}) bool {
		entry := v.(*entry)
		if entry.deleting {
			return false
		}
		key := string(k)
		key = key[strings.LastIndex(key, "/")+1:]
		entries[key] = &TableElem{Value: entry.value, owner: entry.node}
		return false
	})
	return entries
}
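
// Read-path sketch (table, network and key names are hypothetical):
// GetEntry fetches a single value, while GetTableByNetwork snapshots a
// whole table for one network.
//
//	v, err := db.GetEntry("endpoint_table", "net1", "ep1")
//	if err != nil {
//		// entry missing, or deleted and pending garbage collection
//	}
//	_ = v
//	for key, elem := range db.GetTableByNetwork("endpoint_table", "net1") {
//		fmt.Println(key, len(elem.Value))
//	}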
// DeleteEntry deletes a table entry in NetworkDB for the given (network,
// table, key) tuple and, if the NetworkDB is part of the cluster,
// propagates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err != nil || oldEntry == nil || oldEntry.deleting {
		nDB.Unlock()
		return fmt.Errorf("cannot delete entry in table %s with network id %s and key %s as it "+
			"does not exist or is already being deleted", tname, nid, key)
	}

	entry := &entry{
		ltime:    nDB.tableClock.Increment(),
		node:     nDB.config.NodeID,
		value:    oldEntry.value,
		deleting: true,
		reapTime: nDB.config.reapEntryInterval,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table delete event: %v", err)
	}

	return nil
}
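
// Write-path sketch (names hypothetical). Note that DeleteEntry only marks
// the entry as deleting; it lingers for reapEntryInterval before the reaper
// garbage collects it, so a re-create within that window is allowed.
//
//	if err := db.CreateEntry("endpoint_table", "net1", "ep1", []byte("v1")); err != nil {
//		return err
//	}
//	if err := db.UpdateEntry("endpoint_table", "net1", "ep1", []byte("v2")); err != nil {
//		return err
//	}
//	if err := db.DeleteEntry("endpoint_table", "net1", "ep1"); err != nil {
//		return err
//	}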
func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
	for nid, nodes := range nDB.networkNodes {
		updatedNodes := make([]string, 0, len(nodes))
		for _, node := range nodes {
			if node == deletedNode {
				continue
			}
			updatedNodes = append(updatedNodes, node)
		}
		nDB.networkNodes[nid] = updatedNodes
	}

	delete(nDB.networks, deletedNode)
}

// deleteNodeNetworkEntries is called under 2 conditions with 2 different outcomes:
// 1) when a notification arrives that a node is leaving the network
//   - Walk all the network entries and mark the leaving node's entries for deletion.
//     These will be garbage collected when the reap timer expires.
//
// 2) when the local node is leaving the network
//   - Walk all the network entries:
//     A) if the entry is owned by the local node,
//     then we will mark it for deletion. This ensures that a node which has not
//     yet received the notification that the local node is leaving will still be
//     aware of the entries to be deleted.
//     B) if the entry is owned by a remote node, then we can safely delete it. This
//     ensures that if we join this network back, as we receive the CREATE event for
//     entries owned by remote nodes, we will accept them and notify the application.
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
	// Indicates if the delete is triggered for the local node
	isNodeLocal := node == nDB.config.NodeID

	nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid),
		func(path []byte, v interface{}) bool {
			oldEntry := v.(*entry)
			params := strings.Split(string(path[1:]), "/")
			nid := params[0]
			tname := params[1]
			key := params[2]

			// If the entry is owned by a remote node and this node is not leaving the network
			if oldEntry.node != node && !isNodeLocal {
				// Don't do anything because the event is triggered for a node that does not own this entry
				return false
			}

			// If this entry is already marked for deletion and this node is not leaving the network
			if oldEntry.deleting && !isNodeLocal {
				// Don't do anything; this entry will be garbage collected using the old reapTime
				return false
			}

			entry := &entry{
				ltime:    oldEntry.ltime,
				node:     oldEntry.node,
				value:    oldEntry.value,
				deleting: true,
				reapTime: nDB.config.reapEntryInterval,
			}

			// We arrive at this point in 2 cases:
			// 1) this entry is owned by the node that is leaving the network
			// 2) the local node is leaving the network
			if oldEntry.node == node {
				if isNodeLocal {
					// TODO fcrisciani: this can be removed if there is no way to leave the network
					// without doing a delete of all the objects
					entry.ltime++
				}

				if !oldEntry.deleting {
					nDB.createOrUpdateEntry(nid, tname, key, entry)
				}
			} else {
				// The local node is leaving the network; all the entries of remote nodes can be safely removed
				nDB.deleteEntry(nid, tname, key)
			}

			// Notify the upper layer only of entries not already marked for deletion
			if !oldEntry.deleting {
				nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
			}
			return false
		})
}

func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
	nDB.indexes[byTable].Root().Walk(func(path []byte, v interface{}) bool {
		oldEntry := v.(*entry)
		if oldEntry.node != node {
			return false
		}

		params := strings.Split(string(path[1:]), "/")
		tname := params[0]
		nid := params[1]
		key := params[2]

		nDB.deleteEntry(nid, tname, key)

		if !oldEntry.deleting {
			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
		}
		return false
	})
}

// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table, passing the network, key and
// value. The walk stops if the passed function returns true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
	nDB.RLock()
	values := make(map[string]interface{})
	nDB.indexes[byTable].Root().WalkPrefix([]byte("/"+tname), func(path []byte, v interface{}) bool {
		values[string(path)] = v
		return false
	})
	nDB.RUnlock()

	for k, v := range values {
		params := strings.Split(k[1:], "/")
		nid := params[1]
		key := params[2]
		if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
			return nil
		}
	}

	return nil
}
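
// WalkTable sketch (table name hypothetical): collect the keys of live
// entries across all networks; returning true from the callback stops
// the walk early.
//
//	var keys []string
//	_ = db.WalkTable("endpoint_table", func(nid, key string, value []byte, deleting bool) bool {
//		if !deleting {
//			keys = append(keys, nid+"/"+key)
//		}
//		return false // keep walking
//	})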
// JoinNetwork joins this node to a given network and propagates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participating in the network-scoped
// gossip and bulk sync for this network.
func (nDB *NetworkDB) JoinNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()

	nDB.Lock()
	nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
	if !ok {
		nodeNetworks = make(map[string]*network)
		nDB.networks[nDB.config.NodeID] = nodeNetworks
	}
	n, ok := nodeNetworks[nid]
	var entries int64
	if ok {
		entries = n.entriesNumber.Load()
	}
	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
	nodeNetworks[nid].entriesNumber.Store(entries)
	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			// TODO fcrisciani: this can be optimized, maybe avoiding the lock?
			// This call is done on each GetBroadcasts call to evaluate the number of
			// replicas for the message.
			nDB.RLock()
			defer nDB.RUnlock()
			return len(nDB.networkNodes[nid])
		},
		RetransmitMult: 4,
	}
	nDB.addNetworkNode(nid, nDB.config.NodeID)
	networkNodes := nDB.networkNodes[nid]
	n = nodeNetworks[nid]
	nDB.Unlock()

	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
		return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
	}

	log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	if _, err := nDB.bulkSync(networkNodes, true); err != nil {
		log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err)
	}

	// Mark the network as being synced.
	// Note this is best effort; we are not checking the result of the bulk sync.
	nDB.Lock()
	n.inSync = true
	nDB.Unlock()

	return nil
}
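
// Network-scope sketch (network ID hypothetical): a node joins a network
// before writing entries scoped to it, and leaves when done; LeaveNetwork
// marks local state for cleanup rather than dropping it immediately.
//
//	if err := db.JoinNetwork("net1"); err != nil {
//		return err
//	}
//	// ... CreateEntry/UpdateEntry/DeleteEntry scoped to "net1" ...
//	if err := db.LeaveNetwork("net1"); err != nil {
//		return err
//	}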
// LeaveNetwork leaves this node from a given network and propagates
// this event across the cluster. This triggers this node leaving the
// sub-cluster of this network and, as a result, it will no longer
// participate in the network-scoped gossip and bulk sync for this
// network. It also removes all the table entries for this network from
// networkdb.
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()
	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
		return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
	}

	nDB.Lock()
	defer nDB.Unlock()

	// Remove myself from the list of the nodes participating in the network
	nDB.deleteNetworkNode(nid, nDB.config.NodeID)

	// Update all the local entries marking them for deletion and delete all the remote entries
	nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)

	nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
	if !ok {
		return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
	}

	n, ok := nodeNetworks[nid]
	if !ok {
		return fmt.Errorf("could not find network %s while trying to leave", nid)
	}

	log.G(context.TODO()).Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	n.ltime = ltime
	n.reapTime = nDB.config.reapNetworkInterval
	n.leaving = true
	return nil
}

// addNetworkNode adds the node to the list of nodes which participate
// in the passed network only if it is not already present. Caller
// should hold the NetworkDB lock while calling this.
func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
	nodes := nDB.networkNodes[nid]
	for _, node := range nodes {
		if node == nodeName {
			return
		}
	}

	nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
}

// deleteNetworkNode deletes the node from the list of nodes which
// participate in the passed network. Caller should hold the NetworkDB
// lock while calling this.
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
	nodes, ok := nDB.networkNodes[nid]
	if !ok || len(nodes) == 0 {
		return
	}
	newNodes := make([]string, 0, len(nodes)-1)
	for _, name := range nodes {
		if name == nodeName {
			continue
		}
		newNodes = append(newNodes, name)
	}
	nDB.networkNodes[nid] = newNodes
}

// findCommonNetworks finds the networks that both this node and the
// passed node have joined.
func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
	nDB.RLock()
	defer nDB.RUnlock()

	var networks []string
	for nid := range nDB.networks[nDB.config.NodeID] {
		if n, ok := nDB.networks[nodeName][nid]; ok {
			if !n.leaving {
				networks = append(networks, nid)
			}
		}
	}

	return networks
}

func (nDB *NetworkDB) updateLocalNetworkTime() {
	nDB.Lock()
	defer nDB.Unlock()

	ltime := nDB.networkClock.Increment()
	for _, n := range nDB.networks[nDB.config.NodeID] {
		n.ltime = ltime
	}
}

// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It also keeps the network's entry count in sync (all tables are
// aggregated).
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (okTable bool, okNetwork bool) {
	nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Insert([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)), entry)
	nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), entry)
	if !okNetwork {
		// Add only if it is an insert, not an update
		n, ok := nDB.networks[nDB.config.NodeID][nid]
		if ok {
			n.entriesNumber.Add(1)
		}
	}
	return okTable, okNetwork
}
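
// For reference, both indexes store the same *entry under two key layouts;
// for an illustrative table "t", network "n" and key "k" the keys are:
//
//	byTable:   /t/n/k
//	byNetwork: /n/t/k
//
// The byTable layout serves table-scoped walks (WalkTable, GetTableByNetwork),
// while the byNetwork layout serves network-scoped walks such as
// deleteNodeNetworkEntries.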
// deleteEntry handles the deletion of entries from the local tree store.
// It also keeps the network's entry count in sync (all tables are
// aggregated).
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwork bool) {
	nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Delete([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)))
	nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)))
	if okNetwork {
		// Decrement only if the delete was successful
		n, ok := nDB.networks[nDB.config.NodeID][nid]
		if ok {
			n.entriesNumber.Add(-1)
		}
	}
	return okTable, okNetwork
}