12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783 |
- package networkdb
- //go:generate protoc -I=. -I=../../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork/networkdb:. networkdb.proto
- import (
- "context"
- "fmt"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/containerd/log"
- "github.com/docker/docker/libnetwork/types"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/go-events"
- iradix "github.com/hashicorp/go-immutable-radix"
- "github.com/hashicorp/memberlist"
- "github.com/hashicorp/serf/serf"
- )
// Identifiers of the radix-tree indexes stored in NetworkDB.indexes.
const (
	// byTable selects the index whose keys are /table/network/key.
	byTable int = iota + 1
	// byNetwork selects the index whose keys are /network/table/key.
	byNetwork
)
- // NetworkDB instance drives the networkdb cluster and acts the broker
- // for cluster-scoped and network-scoped gossip and watches.
- type NetworkDB struct {
- // The clocks MUST be the first things
- // in this struct due to Golang issue #599.
- // Global lamport clock for node network attach events.
- networkClock serf.LamportClock
- // Global lamport clock for table events.
- tableClock serf.LamportClock
- sync.RWMutex
- // NetworkDB configuration.
- config *Config
- // All the tree index (byTable, byNetwork) that we maintain
- // the db.
- indexes map[int]*iradix.Tree
- // Memberlist we use to drive the cluster.
- memberlist *memberlist.Memberlist
- // List of all peer nodes in the cluster not-limited to any
- // network.
- nodes map[string]*node
- // List of all peer nodes which have failed
- failedNodes map[string]*node
- // List of all peer nodes which have left
- leftNodes map[string]*node
- // A multi-dimensional map of network/node attachments. The
- // first key is a node name and the second key is a network ID
- // for the network that node is participating in.
- networks map[string]map[string]*network
- // A map of nodes which are participating in a given
- // network. The key is a network ID.
- networkNodes map[string][]string
- // A table of ack channels for every node from which we are
- // waiting for an ack.
- bulkSyncAckTbl map[string]chan struct{}
- // Broadcast queue for network event gossip.
- networkBroadcasts *memberlist.TransmitLimitedQueue
- // Broadcast queue for node event gossip.
- nodeBroadcasts *memberlist.TransmitLimitedQueue
- // A central context to stop all go routines running on
- // behalf of the NetworkDB instance.
- ctx context.Context
- cancelCtx context.CancelFunc
- // A central broadcaster for all local watchers watching table
- // events.
- broadcaster *events.Broadcaster
- // List of all tickers which needed to be stopped when
- // cleaning up.
- tickers []*time.Ticker
- // Reference to the memberlist's keyring to add & remove keys
- keyring *memberlist.Keyring
- // bootStrapIP is the list of IPs that can be used to bootstrap
- // the gossip.
- bootStrapIP []string
- // lastStatsTimestamp is the last timestamp when the stats got printed
- lastStatsTimestamp time.Time
- // lastHealthTimestamp is the last timestamp when the health score got printed
- lastHealthTimestamp time.Time
- }
// PeerInfo describes one peer (gossip cluster node) of a network.
type PeerInfo struct {
	// Name is the peer's node name.
	Name string
	// IP is the textual form of the peer's address.
	IP string
}
- // PeerClusterInfo represents the peer (gossip cluster) nodes
- type PeerClusterInfo struct {
- PeerInfo
- }
- type node struct {
- memberlist.Node
- ltime serf.LamportTime
- // Number of hours left before the reaper removes the node
- reapTime time.Duration
- }
- // network describes the node/network attachment.
- type network struct {
- // Network ID
- id string
- // Lamport time for the latest state of the entry.
- ltime serf.LamportTime
- // Gets set to true after the first bulk sync happens
- inSync bool
- // Node leave is in progress.
- leaving bool
- // Number of seconds still left before a deleted network entry gets
- // removed from networkDB
- reapTime time.Duration
- // The broadcast queue for table event gossip. This is only
- // initialized for this node's network attachment entries.
- tableBroadcasts *memberlist.TransmitLimitedQueue
- // Number of gossip messages sent related to this network during the last stats collection period
- qMessagesSent atomic.Int64
- // Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
- // Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
- // interval
- entriesNumber atomic.Int64
- }
// Config carries the settings of a networkdb instance. Callers may
// build one themselves or start from DefaultConfig.
type Config struct {
	// NodeID uniquely identifies this node within the cluster.
	NodeID string

	// Hostname is the hostname of this node.
	Hostname string

	// BindAddr is the IP networkdb listens on. Use 0.0.0.0 to listen
	// on every address of the host.
	BindAddr string

	// AdvertiseAddr is the IP address of this node that is advertised
	// for cluster communication.
	AdvertiseAddr string

	// BindPort is the local port bound for cluster communication.
	BindPort int

	// Keys are added to the memberlist keyring; the key at index 0 is
	// the primary key.
	Keys [][]byte

	// PacketBufferSize is the maximum number of bytes memberlist puts
	// in a packet (UDP packets by default with a NetTransport). 1400
	// bytes, the default, is typically safe; a larger network MTU
	// (Maximum Transmission Unit) may allow a larger value so that
	// more content fits in each gossip packet.
	PacketBufferSize int

	// reapEntryInterval is how long a deleted entry lingers before it
	// is garbage collected.
	reapEntryInterval time.Duration

	// reapNetworkInterval is how long a deleted network lingers before
	// it is garbage collected.
	// NOTE: this MUST always be higher than reapEntryInterval.
	reapNetworkInterval time.Duration

	// rejoinClusterDuration is the retryJoin timeout used by
	// rejoinClusterBootStrap. Default is 10sec.
	rejoinClusterDuration time.Duration

	// rejoinClusterInterval is the interval at which
	// rejoinClusterBootStrap runs. Default is 60sec.
	rejoinClusterInterval time.Duration

	// StatsPrintPeriod is the period between queue stats printouts.
	// Default is 5min.
	StatsPrintPeriod time.Duration

	// HealthPrintPeriod is the period between health score printouts.
	// Default is 1min.
	HealthPrintPeriod time.Duration
}
- // entry defines a table entry
- type entry struct {
- // node from which this entry was learned.
- node string
- // Lamport time for the most recent update to the entry
- ltime serf.LamportTime
- // Opaque value store in the entry
- value []byte
- // Deleting the entry is in progress. All entries linger in
- // the cluster for certain amount of time after deletion.
- deleting bool
- // Number of seconds still left before a deleted table entry gets
- // removed from networkDB
- reapTime time.Duration
- }
- // DefaultConfig returns a NetworkDB config with default values
- func DefaultConfig() *Config {
- hostname, _ := os.Hostname()
- return &Config{
- NodeID: stringid.TruncateID(stringid.GenerateRandomID()),
- Hostname: hostname,
- BindAddr: "0.0.0.0",
- PacketBufferSize: 1400,
- StatsPrintPeriod: 5 * time.Minute,
- HealthPrintPeriod: 1 * time.Minute,
- reapEntryInterval: 30 * time.Minute,
- rejoinClusterDuration: 10 * time.Second,
- rejoinClusterInterval: 60 * time.Second,
- }
- }
- // New creates a new instance of NetworkDB using the Config passed by
- // the caller.
- func New(c *Config) (*NetworkDB, error) {
- // The garbage collection logic for entries leverage the presence of the network.
- // For this reason the expiration time of the network is put slightly higher than the entry expiration so that
- // there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
- c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
- nDB := &NetworkDB{
- config: c,
- indexes: make(map[int]*iradix.Tree),
- networks: make(map[string]map[string]*network),
- nodes: make(map[string]*node),
- failedNodes: make(map[string]*node),
- leftNodes: make(map[string]*node),
- networkNodes: make(map[string][]string),
- bulkSyncAckTbl: make(map[string]chan struct{}),
- broadcaster: events.NewBroadcaster(),
- }
- nDB.indexes[byTable] = iradix.New()
- nDB.indexes[byNetwork] = iradix.New()
- log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
- if err := nDB.clusterInit(); err != nil {
- return nil, err
- }
- return nDB, nil
- }
- // Join joins this NetworkDB instance with a list of peer NetworkDB
- // instances passed by the caller in the form of addr:port
- func (nDB *NetworkDB) Join(members []string) error {
- nDB.Lock()
- nDB.bootStrapIP = append([]string(nil), members...)
- log.G(context.TODO()).Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
- nDB.Unlock()
- return nDB.clusterJoin(members)
- }
- // Close destroys this NetworkDB instance by leave the cluster,
- // stopping timers, canceling goroutines etc.
- func (nDB *NetworkDB) Close() {
- if err := nDB.clusterLeave(); err != nil {
- log.G(context.TODO()).Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
- }
- // Avoid (*Broadcaster).run goroutine leak
- nDB.broadcaster.Close()
- }
- // ClusterPeers returns all the gossip cluster peers.
- func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
- nDB.RLock()
- defer nDB.RUnlock()
- peers := make([]PeerInfo, 0, len(nDB.nodes))
- for _, node := range nDB.nodes {
- peers = append(peers, PeerInfo{
- Name: node.Name,
- IP: node.Node.Addr.String(),
- })
- }
- return peers
- }
- // Peers returns the gossip peers for a given network.
- func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
- nDB.RLock()
- defer nDB.RUnlock()
- peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
- for _, nodeName := range nDB.networkNodes[nid] {
- if node, ok := nDB.nodes[nodeName]; ok {
- peers = append(peers, PeerInfo{
- Name: node.Name,
- IP: node.Addr.String(),
- })
- } else {
- // Added for testing purposes, this condition should never happen else mean that the network list
- // is out of sync with the node list
- peers = append(peers, PeerInfo{Name: nodeName, IP: "unknown"})
- }
- }
- return peers
- }
- // GetEntry retrieves the value of a table entry in a given (network,
- // table, key) tuple
- func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
- nDB.RLock()
- defer nDB.RUnlock()
- entry, err := nDB.getEntry(tname, nid, key)
- if err != nil {
- return nil, err
- }
- if entry != nil && entry.deleting {
- return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
- }
- return entry.value, nil
- }
- func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
- e, ok := nDB.indexes[byTable].Get([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)))
- if !ok {
- return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
- }
- return e.(*entry), nil
- }
- // CreateEntry creates a table entry in NetworkDB for given (network,
- // table, key) tuple and if the NetworkDB is part of the cluster
- // propagates this event to the cluster. It is an error to create an
- // entry for the same tuple for which there is already an existing
- // entry unless the current entry is deleting state.
- func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
- nDB.Lock()
- oldEntry, err := nDB.getEntry(tname, nid, key)
- if err == nil || (oldEntry != nil && !oldEntry.deleting) {
- nDB.Unlock()
- return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
- }
- entry := &entry{
- ltime: nDB.tableClock.Increment(),
- node: nDB.config.NodeID,
- value: value,
- }
- nDB.createOrUpdateEntry(nid, tname, key, entry)
- nDB.Unlock()
- if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
- return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
- }
- return nil
- }
- // UpdateEntry updates a table entry in NetworkDB for given (network,
- // table, key) tuple and if the NetworkDB is part of the cluster
- // propagates this event to the cluster. It is an error to update a
- // non-existent entry.
- func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
- nDB.Lock()
- if _, err := nDB.getEntry(tname, nid, key); err != nil {
- nDB.Unlock()
- return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
- }
- entry := &entry{
- ltime: nDB.tableClock.Increment(),
- node: nDB.config.NodeID,
- value: value,
- }
- nDB.createOrUpdateEntry(nid, tname, key, entry)
- nDB.Unlock()
- if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
- return fmt.Errorf("cannot send table update event: %v", err)
- }
- return nil
- }
// TableElem is one element of the map returned by GetTableByNetwork.
type TableElem struct {
	// Value is the opaque value stored in the table entry.
	Value []byte
	// owner is the node the entry was learned from.
	owner string
}
- // GetTableByNetwork walks the networkdb by the give table and network id and
- // returns a map of keys and values
- func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
- entries := make(map[string]*TableElem)
- nDB.indexes[byTable].Root().WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v interface{}) bool {
- entry := v.(*entry)
- if entry.deleting {
- return false
- }
- key := string(k)
- key = key[strings.LastIndex(key, "/")+1:]
- entries[key] = &TableElem{Value: entry.value, owner: entry.node}
- return false
- })
- return entries
- }
- // DeleteEntry deletes a table entry in NetworkDB for given (network,
- // table, key) tuple and if the NetworkDB is part of the cluster
- // propagates this event to the cluster.
- func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
- nDB.Lock()
- oldEntry, err := nDB.getEntry(tname, nid, key)
- if err != nil || oldEntry == nil || oldEntry.deleting {
- nDB.Unlock()
- return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
- "does not exist or is already being deleted", tname, nid, key)
- }
- entry := &entry{
- ltime: nDB.tableClock.Increment(),
- node: nDB.config.NodeID,
- value: oldEntry.value,
- deleting: true,
- reapTime: nDB.config.reapEntryInterval,
- }
- nDB.createOrUpdateEntry(nid, tname, key, entry)
- nDB.Unlock()
- if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
- return fmt.Errorf("cannot send table delete event: %v", err)
- }
- return nil
- }
- func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
- for nid, nodes := range nDB.networkNodes {
- updatedNodes := make([]string, 0, len(nodes))
- for _, node := range nodes {
- if node == deletedNode {
- continue
- }
- updatedNodes = append(updatedNodes, node)
- }
- nDB.networkNodes[nid] = updatedNodes
- }
- delete(nDB.networks, deletedNode)
- }
- // deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
- // 1) when a notification is coming of a node leaving the network
- // - Walk all the network entries and mark the leaving node's entries for deletion
- // These will be garbage collected when the reap timer will expire
- //
- // 2) when the local node is leaving the network
- // - Walk all the network entries:
- // A) if the entry is owned by the local node
- // then we will mark it for deletion. This will ensure that if a node did not
- // yet received the notification that the local node is leaving, will be aware
- // of the entries to be deleted.
- // B) if the entry is owned by a remote node, then we can safely delete it. This
- // ensures that if we join back this network as we receive the CREATE event for
- // entries owned by remote nodes, we will accept them and we notify the application
- func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
- // Indicates if the delete is triggered for the local node
- isNodeLocal := node == nDB.config.NodeID
- nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid),
- func(path []byte, v interface{}) bool {
- oldEntry := v.(*entry)
- params := strings.Split(string(path[1:]), "/")
- nid := params[0]
- tname := params[1]
- key := params[2]
- // If the entry is owned by a remote node and this node is not leaving the network
- if oldEntry.node != node && !isNodeLocal {
- // Don't do anything because the event is triggered for a node that does not own this entry
- return false
- }
- // If this entry is already marked for deletion and this node is not leaving the network
- if oldEntry.deleting && !isNodeLocal {
- // Don't do anything this entry will be already garbage collected using the old reapTime
- return false
- }
- entry := &entry{
- ltime: oldEntry.ltime,
- node: oldEntry.node,
- value: oldEntry.value,
- deleting: true,
- reapTime: nDB.config.reapEntryInterval,
- }
- // we arrived at this point in 2 cases:
- // 1) this entry is owned by the node that is leaving the network
- // 2) the local node is leaving the network
- if oldEntry.node == node {
- if isNodeLocal {
- // TODO fcrisciani: this can be removed if there is no way to leave the network
- // without doing a delete of all the objects
- entry.ltime++
- }
- if !oldEntry.deleting {
- nDB.createOrUpdateEntry(nid, tname, key, entry)
- }
- } else {
- // the local node is leaving the network, all the entries of remote nodes can be safely removed
- nDB.deleteEntry(nid, tname, key)
- }
- // Notify to the upper layer only entries not already marked for deletion
- if !oldEntry.deleting {
- nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
- }
- return false
- })
- }
- func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
- nDB.indexes[byTable].Root().Walk(func(path []byte, v interface{}) bool {
- oldEntry := v.(*entry)
- if oldEntry.node != node {
- return false
- }
- params := strings.Split(string(path[1:]), "/")
- tname := params[0]
- nid := params[1]
- key := params[2]
- nDB.deleteEntry(nid, tname, key)
- if !oldEntry.deleting {
- nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
- }
- return false
- })
- }
- // WalkTable walks a single table in NetworkDB and invokes the passed
- // function for each entry in the table passing the network, key,
- // value. The walk stops if the passed function returns a true.
- func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
- nDB.RLock()
- values := make(map[string]interface{})
- nDB.indexes[byTable].Root().WalkPrefix([]byte("/"+tname), func(path []byte, v interface{}) bool {
- values[string(path)] = v
- return false
- })
- nDB.RUnlock()
- for k, v := range values {
- params := strings.Split(k[1:], "/")
- nid := params[1]
- key := params[2]
- if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
- return nil
- }
- }
- return nil
- }
- // JoinNetwork joins this node to a given network and propagates this
- // event across the cluster. This triggers this node joining the
- // sub-cluster of this network and participates in the network-scoped
- // gossip and bulk sync for this network.
- func (nDB *NetworkDB) JoinNetwork(nid string) error {
- ltime := nDB.networkClock.Increment()
- nDB.Lock()
- nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
- if !ok {
- nodeNetworks = make(map[string]*network)
- nDB.networks[nDB.config.NodeID] = nodeNetworks
- }
- n, ok := nodeNetworks[nid]
- var entries int64
- if ok {
- entries = n.entriesNumber.Load()
- }
- nodeNetworks[nid] = &network{id: nid, ltime: ltime}
- nodeNetworks[nid].entriesNumber.Store(entries)
- nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
- NumNodes: func() int {
- // TODO fcrisciani this can be optimized maybe avoiding the lock?
- // this call is done each GetBroadcasts call to evaluate the number of
- // replicas for the message
- nDB.RLock()
- defer nDB.RUnlock()
- return len(nDB.networkNodes[nid])
- },
- RetransmitMult: 4,
- }
- nDB.addNetworkNode(nid, nDB.config.NodeID)
- networkNodes := nDB.networkNodes[nid]
- n = nodeNetworks[nid]
- nDB.Unlock()
- if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
- return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
- }
- log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
- if _, err := nDB.bulkSync(networkNodes, true); err != nil {
- log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err)
- }
- // Mark the network as being synced
- // note this is a best effort, we are not checking the result of the bulk sync
- nDB.Lock()
- n.inSync = true
- nDB.Unlock()
- return nil
- }
- // LeaveNetwork leaves this node from a given network and propagates
- // this event across the cluster. This triggers this node leaving the
- // sub-cluster of this network and as a result will no longer
- // participate in the network-scoped gossip and bulk sync for this
- // network. Also remove all the table entries for this network from
- // networkdb
- func (nDB *NetworkDB) LeaveNetwork(nid string) error {
- ltime := nDB.networkClock.Increment()
- if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
- return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
- }
- nDB.Lock()
- defer nDB.Unlock()
- // Remove myself from the list of the nodes participating to the network
- nDB.deleteNetworkNode(nid, nDB.config.NodeID)
- // Update all the local entries marking them for deletion and delete all the remote entries
- nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
- nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
- if !ok {
- return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
- }
- n, ok := nodeNetworks[nid]
- if !ok {
- return fmt.Errorf("could not find network %s while trying to leave", nid)
- }
- log.G(context.TODO()).Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
- n.ltime = ltime
- n.reapTime = nDB.config.reapNetworkInterval
- n.leaving = true
- return nil
- }
- // addNetworkNode adds the node to the list of nodes which participate
- // in the passed network only if it is not already present. Caller
- // should hold the NetworkDB lock while calling this
- func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
- nodes := nDB.networkNodes[nid]
- for _, node := range nodes {
- if node == nodeName {
- return
- }
- }
- nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
- }
- // Deletes the node from the list of nodes which participate in the
- // passed network. Caller should hold the NetworkDB lock while calling
- // this
- func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
- nodes, ok := nDB.networkNodes[nid]
- if !ok || len(nodes) == 0 {
- return
- }
- newNodes := make([]string, 0, len(nodes)-1)
- for _, name := range nodes {
- if name == nodeName {
- continue
- }
- newNodes = append(newNodes, name)
- }
- nDB.networkNodes[nid] = newNodes
- }
- // findCommonnetworks find the networks that both this node and the
- // passed node have joined.
- func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
- nDB.RLock()
- defer nDB.RUnlock()
- var networks []string
- for nid := range nDB.networks[nDB.config.NodeID] {
- if n, ok := nDB.networks[nodeName][nid]; ok {
- if !n.leaving {
- networks = append(networks, nid)
- }
- }
- }
- return networks
- }
- func (nDB *NetworkDB) updateLocalNetworkTime() {
- nDB.Lock()
- defer nDB.Unlock()
- ltime := nDB.networkClock.Increment()
- for _, n := range nDB.networks[nDB.config.NodeID] {
- n.ltime = ltime
- }
- }
- // createOrUpdateEntry this function handles the creation or update of entries into the local
- // tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated)
- func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (okTable bool, okNetwork bool) {
- nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Insert([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)), entry)
- nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), entry)
- if !okNetwork {
- // Add only if it is an insert not an update
- n, ok := nDB.networks[nDB.config.NodeID][nid]
- if ok {
- n.entriesNumber.Add(1)
- }
- }
- return okTable, okNetwork
- }
- // deleteEntry this function handles the deletion of entries into the local tree store.
- // It is also used to keep in sync the entries number of the network (all tables are aggregated)
- func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwork bool) {
- nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Delete([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)))
- nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)))
- if okNetwork {
- // Remove only if the delete is successful
- n, ok := nDB.networks[nDB.config.NodeID][nid]
- if ok {
- n.entriesNumber.Add(-1)
- }
- }
- return okTable, okNetwork
- }
|