@@ -24,6 +24,9 @@ const (
 	retryInterval = 1 * time.Second
 	nodeReapInterval = 24 * time.Hour
 	nodeReapPeriod = 2 * time.Hour
+	// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
+	// the following is roughly 1 minute
+	maxQueueLenBroadcastOnSync = 500
 )

 type logWriter struct{}
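A hedged reading of the sizing comment above (the patch itself doesn't show the derivation): memberlist's TransmitLimitedQueue retransmits each broadcast about retransmitMult * ceil(log10(n+1)) times, and assuming the default multiplier of 4, draining a full queue of 500 messages at 100 msg/s takes on the order of a minute:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		queueLen       = 500 // maxQueueLenBroadcastOnSync
		drainRate      = 100 // msg/s, from the comment above
		retransmitMult = 4   // memberlist default (assumption)
		nodes          = 21  // "> 20 nodes"
	)
	// memberlist sends each queued broadcast retransmitMult*ceil(log10(n+1)) times.
	transmits := retransmitMult * int(math.Ceil(math.Log10(nodes+1)))
	fmt.Printf("%d transmits/msg, ~%ds to drain\n",
		transmits, queueLen*transmits/drainRate) // 8 transmits/msg, ~40s to drain
}
```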
@@ -52,7 +55,7 @@ func (l *logWriter) Write(p []byte) (int, error) {

 // SetKey adds a new key to the key ring
 func (nDB *NetworkDB) SetKey(key []byte) {
-	logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5])
+	logrus.Debugf("Adding key %.5s", hex.EncodeToString(key))
 	nDB.Lock()
 	defer nDB.Unlock()
 	for _, dbKey := range nDB.config.Keys {
@@ -69,7 +72,7 @@ func (nDB *NetworkDB) SetKey(key []byte) {
 // SetPrimaryKey sets the given key as the primary key. This should have
 // been added apriori through SetKey
 func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
-	logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5])
+	logrus.Debugf("Primary Key %.5s", hex.EncodeToString(key))
 	nDB.RLock()
 	defer nDB.RUnlock()
 	for _, dbKey := range nDB.config.Keys {
@@ -85,7 +88,7 @@ func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
 // RemoveKey removes a key from the key ring. The key being removed
 // can't be the primary key
 func (nDB *NetworkDB) RemoveKey(key []byte) {
-	logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5])
+	logrus.Debugf("Remove Key %.5s", hex.EncodeToString(key))
 	nDB.Lock()
 	defer nDB.Unlock()
 	for i, dbKey := range nDB.config.Keys {
@@ -123,7 +126,7 @@ func (nDB *NetworkDB) clusterInit() error {
 	var err error
 	if len(nDB.config.Keys) > 0 {
 		for i, key := range nDB.config.Keys {
-			logrus.Debugf("Encryption key %d: %s", i+1, hex.EncodeToString(key)[0:5])
+			logrus.Debugf("Encryption key %d: %.5s", i+1, hex.EncodeToString(key))
 		}
 		nDB.keyring, err = memberlist.NewKeyring(nDB.config.Keys, nDB.config.Keys[0])
 		if err != nil {
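Why `%.5s` instead of slicing at all four call sites: `hex.EncodeToString(key)[0:5]` panics when the key encodes to fewer than 5 hex characters, while the precision verb simply truncates. A minimal standalone illustration (not part of the patch):

```go
package main

import (
	"encoding/hex"
	"fmt"
)

func main() {
	short := []byte{0xab} // encodes to "ab", only 2 chars
	// hex.EncodeToString(short)[0:5] would panic here:
	// "slice bounds out of range [:5] with length 2".
	// %.5s caps the output at 5 characters and never panics:
	fmt.Printf("Adding key %.5s\n", hex.EncodeToString(short)) // Adding key ab

	long := []byte{0xde, 0xad, 0xbe, 0xef}
	fmt.Printf("Adding key %.5s\n", hex.EncodeToString(long)) // Adding key deadb
}
```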
@@ -285,18 +288,35 @@ func (nDB *NetworkDB) rejoinClusterBootStrap() {
 		return
 	}

+	myself, _ := nDB.nodes[nDB.config.NodeID]
 	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
 	for _, bootIP := range nDB.bootStrapIP {
-		for _, node := range nDB.nodes {
-			if node.Addr.Equal(bootIP) {
-				// One of the bootstrap nodes is part of the cluster, return
-				nDB.RUnlock()
-				return
+		// bootstrap IPs are usually IP:port from the Join
+		var bootstrapIP net.IP
+		ipStr, _, err := net.SplitHostPort(bootIP)
+		if err != nil {
+			// try to parse it as a plain IP without a port
+			// Note this seems to be the case for swarm, which does not specify any port
+			ipStr = bootIP
+		}
+		bootstrapIP = net.ParseIP(ipStr)
+		if bootstrapIP != nil {
+			for _, node := range nDB.nodes {
+				if node.Addr.Equal(bootstrapIP) && !node.Addr.Equal(myself.Addr) {
+					// One of the bootstrap nodes (and not myself) is part of the cluster, return
+					nDB.RUnlock()
+					return
+				}
 			}
+			bootStrapIPs = append(bootStrapIPs, bootIP)
 		}
-		bootStrapIPs = append(bootStrapIPs, bootIP.String())
 	}
 	nDB.RUnlock()
+	if len(bootStrapIPs) == 0 {
+		// this also avoids calling Join with an empty list, which would erase the current bootstrap IP list
+		logrus.Debug("rejoinClusterBootStrap did not find any valid IP")
+		return
+	}
 	// None of the bootStrap nodes are in the cluster, call memberlist join
 	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
 	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
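The parsing fallback this hunk adds, as a standalone sketch; `parseBootstrapIP` is a hypothetical helper name, not part of the patch:

```go
package main

import (
	"fmt"
	"net"
)

// parseBootstrapIP mirrors the new loop body: Join usually records
// bootstrap peers as "IP:port", but bare IPs also occur (e.g. from
// swarm), so fall back to parsing the whole string when SplitHostPort
// fails. A nil result means the entry is skipped by the caller.
func parseBootstrapIP(addr string) net.IP {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		host = addr
	}
	return net.ParseIP(host)
}

func main() {
	fmt.Println(parseBootstrapIP("10.0.0.5:7946")) // 10.0.0.5
	fmt.Println(parseBootstrapIP("10.0.0.5"))      // 10.0.0.5
	fmt.Println(parseBootstrapIP("bogus"))         // <nil>
}
```

One caveat worth noting: the new loop dereferences `myself.Addr`, which appears to assume the local node is always present in `nDB.nodes`.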
@@ -555,6 +575,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {

 	var err error
 	var networks []string
+	var success bool
 	for _, node := range nodes {
 		if node == nDB.config.NodeID {
 			continue
@@ -562,21 +583,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
 		logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
 		networks = nDB.findCommonNetworks(node)
 		err = nDB.bulkSyncNode(networks, node, true)
-		// if its periodic bulksync stop after the first successful sync
-		if !all && err == nil {
-			break
-		}
 		if err != nil {
 			err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
 			logrus.Warn(err.Error())
+		} else {
+			// bulk sync succeeded
+			success = true
+			// if it's a periodic bulksync, stop after the first successful sync
+			if !all {
+				break
+			}
 		}
 	}

-	if err != nil {
-		return nil, err
+	if success {
+		// at least one node sync succeeded
+		return networks, nil
 	}

-	return networks, nil
+	return nil, err
 }

 // Bulk sync all the table entries belonging to a set of networks to a
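The reworked return logic in a nutshell: keep the last error, remember whether any peer synced, and fail only when every peer failed (previously a failure on the last peer could mask earlier successes). A condensed sketch, with a hypothetical `syncOne` standing in for `bulkSyncNode`:

```go
package main

import (
	"errors"
	"fmt"
)

func trySync(peers []string, periodic bool, syncOne func(string) error) error {
	var err error
	var success bool
	for _, p := range peers {
		if e := syncOne(p); e != nil {
			err = fmt.Errorf("bulk sync to node %s failed: %v", p, e)
			continue
		}
		success = true
		if periodic { // periodic runs stop after the first success
			break
		}
	}
	if success {
		return nil // at least one peer synced
	}
	return err
}

func main() {
	// One failure followed by a success is still an overall success.
	err := trySync([]string{"a", "b"}, true, func(p string) error {
		if p == "a" {
			return errors.New("timeout")
		}
		return nil
	})
	fmt.Println(err) // <nil>
}
```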