55e4cc7262
Added some optimizations to reduce the number of messages in the queue: 1) On network join, the node executes a TCP sync with all the nodes it knows are part of that network. Previously, during this time the node redistributed all the entries, so if the network had 10K entries the queue of the joining node would jump to 10K. The fix adds a flag on the network that prevents any entry from being inserted in the queue until the sync happens. Note that right now the flag is set in a best-effort way: there is no real check that the sync succeeded with at least one of the nodes. 2) Limit the number of messages redistributed after a TCP sync. A threshold on the broadcast queue length was introduced: once the queue exceeds it, entries coming from a TCP sync are no longer propagated, which keeps the queue from growing further under heavy load. Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
494 lines
13 KiB
Go
494 lines
13 KiB
Go
package networkdb
|
|
|
|
import (
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type delegate struct {
|
|
nDB *NetworkDB
|
|
}
|
|
|
|
func (d *delegate) NodeMeta(limit int) []byte {
|
|
return []byte{}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
|
// Update our local clock if the received messages has newer
|
|
// time.
|
|
nDB.networkClock.Witness(nEvent.LTime)
|
|
|
|
nDB.Lock()
|
|
defer nDB.Unlock()
|
|
|
|
// check if the node exists
|
|
n, _, _ := nDB.findNode(nEvent.NodeName)
|
|
if n == nil {
|
|
return false
|
|
}
|
|
|
|
// check if the event is fresh
|
|
if n.ltime >= nEvent.LTime {
|
|
return false
|
|
}
|
|
|
|
// If we are here means that the event is fresher and the node is known. Update the laport time
|
|
n.ltime = nEvent.LTime
|
|
|
|
// If the node is not known from memberlist we cannot process save any state of it else if it actually
|
|
// dies we won't receive any notification and we will remain stuck with it
|
|
if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
|
|
logrus.Errorf("node: %s is unknown to memberlist", nEvent.NodeName)
|
|
return false
|
|
}
|
|
|
|
switch nEvent.Type {
|
|
case NodeEventTypeJoin:
|
|
moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("unable to find the node to move")
|
|
return false
|
|
}
|
|
if moved {
|
|
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
|
}
|
|
return moved
|
|
case NodeEventTypeLeave:
|
|
moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("unable to find the node to move")
|
|
return false
|
|
}
|
|
if moved {
|
|
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
|
}
|
|
return moved
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
|
|
// Update our local clock if the received messages has newer
|
|
// time.
|
|
nDB.networkClock.Witness(nEvent.LTime)
|
|
|
|
nDB.Lock()
|
|
defer nDB.Unlock()
|
|
|
|
if nEvent.NodeName == nDB.config.NodeID {
|
|
return false
|
|
}
|
|
|
|
nodeNetworks, ok := nDB.networks[nEvent.NodeName]
|
|
if !ok {
|
|
// We haven't heard about this node at all. Ignore the leave
|
|
if nEvent.Type == NetworkEventTypeLeave {
|
|
return false
|
|
}
|
|
|
|
nodeNetworks = make(map[string]*network)
|
|
nDB.networks[nEvent.NodeName] = nodeNetworks
|
|
}
|
|
|
|
if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
|
|
// We have the latest state. Ignore the event
|
|
// since it is stale.
|
|
if n.ltime >= nEvent.LTime {
|
|
return false
|
|
}
|
|
|
|
n.ltime = nEvent.LTime
|
|
n.leaving = nEvent.Type == NetworkEventTypeLeave
|
|
if n.leaving {
|
|
n.reapTime = nDB.config.reapNetworkInterval
|
|
|
|
// The remote node is leaving the network, but not the gossip cluster.
|
|
// Mark all its entries in deleted state, this will guarantee that
|
|
// if some node bulk sync with us, the deleted state of
|
|
// these entries will be propagated.
|
|
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
|
|
}
|
|
|
|
if nEvent.Type == NetworkEventTypeLeave {
|
|
nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
|
|
} else {
|
|
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
if nEvent.Type == NetworkEventTypeLeave {
|
|
return false
|
|
}
|
|
|
|
// If the node is not known from memberlist we cannot process save any state of it else if it actually
|
|
// dies we won't receive any notification and we will remain stuck with it
|
|
if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
|
|
return false
|
|
}
|
|
|
|
// This remote network join is being seen the first time.
|
|
nodeNetworks[nEvent.NetworkID] = &network{
|
|
id: nEvent.NetworkID,
|
|
ltime: nEvent.LTime,
|
|
}
|
|
|
|
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
|
|
return true
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
|
|
// Update our local clock if the received messages has newer time.
|
|
nDB.tableClock.Witness(tEvent.LTime)
|
|
|
|
// Ignore the table events for networks that are in the process of going away
|
|
nDB.RLock()
|
|
networks := nDB.networks[nDB.config.NodeID]
|
|
network, ok := networks[tEvent.NetworkID]
|
|
// Check if the owner of the event is still part of the network
|
|
nodes := nDB.networkNodes[tEvent.NetworkID]
|
|
var nodePresent bool
|
|
for _, node := range nodes {
|
|
if node == tEvent.NodeName {
|
|
nodePresent = true
|
|
break
|
|
}
|
|
}
|
|
nDB.RUnlock()
|
|
|
|
if !ok || network.leaving || !nodePresent {
|
|
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
|
|
return false
|
|
}
|
|
|
|
nDB.Lock()
|
|
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
|
|
if err == nil {
|
|
// We have the latest state. Ignore the event
|
|
// since it is stale.
|
|
if e.ltime >= tEvent.LTime {
|
|
nDB.Unlock()
|
|
return false
|
|
}
|
|
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
|
|
nDB.Unlock()
|
|
// We don't know the entry, the entry is being deleted and the message is an async message
|
|
// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
|
|
// exceed the garbage collection time (the residual reap time that is in the message is not being
|
|
// updated, to avoid inserting too many messages in the queue).
|
|
// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
|
|
return false
|
|
}
|
|
|
|
e = &entry{
|
|
ltime: tEvent.LTime,
|
|
node: tEvent.NodeName,
|
|
value: tEvent.Value,
|
|
deleting: tEvent.Type == TableEventTypeDelete,
|
|
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
|
|
}
|
|
|
|
// All the entries marked for deletion should have a reapTime set greater than 0
|
|
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
|
|
// field. If that is not the case, this can be a BUG
|
|
if e.deleting && e.reapTime == 0 {
|
|
logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
|
|
nDB.config.Hostname, nDB.config.NodeID, tEvent)
|
|
e.reapTime = nDB.config.reapEntryInterval
|
|
}
|
|
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
|
|
nDB.Unlock()
|
|
|
|
if err != nil && tEvent.Type == TableEventTypeDelete {
|
|
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
|
|
// We had saved the state so to speed up convergence and be able to avoid accepting create events.
|
|
// Now we will rebroadcast the message if 2 conditions are met:
|
|
// 1) we had already synced this network (during the network join)
|
|
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
|
|
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
|
|
// most likely the cluster is already aware of it
|
|
// This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around
|
|
// forever
|
|
//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
|
|
return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
|
|
}
|
|
|
|
var op opType
|
|
switch tEvent.Type {
|
|
case TableEventTypeCreate:
|
|
op = opCreate
|
|
case TableEventTypeUpdate:
|
|
op = opUpdate
|
|
case TableEventTypeDelete:
|
|
op = opDelete
|
|
}
|
|
|
|
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
|
|
return network.inSync
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
|
|
// Decode the parts
|
|
parts, err := decodeCompoundMessage(buf)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to decode compound request: %v", err)
|
|
return
|
|
}
|
|
|
|
// Handle each message
|
|
for _, part := range parts {
|
|
nDB.handleMessage(part, isBulkSync)
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
|
|
var tEvent TableEvent
|
|
if err := proto.Unmarshal(buf, &tEvent); err != nil {
|
|
logrus.Errorf("Error decoding table event message: %v", err)
|
|
return
|
|
}
|
|
|
|
// Ignore messages that this node generated.
|
|
if tEvent.NodeName == nDB.config.NodeID {
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.RLock()
|
|
n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
|
|
nDB.RUnlock()
|
|
|
|
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
|
|
if !ok || n.leaving || n.tableBroadcasts == nil {
|
|
return
|
|
}
|
|
|
|
// if the queue is over the threshold, avoid distributing information coming from TCP sync
|
|
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
|
|
return
|
|
}
|
|
|
|
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
|
|
msg: buf,
|
|
id: tEvent.NetworkID,
|
|
tname: tEvent.TableName,
|
|
key: tEvent.Key,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
|
|
var nEvent NodeEvent
|
|
if err := proto.Unmarshal(buf, &nEvent); err != nil {
|
|
logrus.Errorf("Error decoding node event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
|
|
msg: buf,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
|
|
var nEvent NetworkEvent
|
|
if err := proto.Unmarshal(buf, &nEvent); err != nil {
|
|
logrus.Errorf("Error decoding network event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
|
|
msg: buf,
|
|
id: nEvent.NetworkID,
|
|
node: nEvent.NodeName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
|
|
var bsm BulkSyncMessage
|
|
if err := proto.Unmarshal(buf, &bsm); err != nil {
|
|
logrus.Errorf("Error decoding bulk sync message: %v", err)
|
|
return
|
|
}
|
|
|
|
if bsm.LTime > 0 {
|
|
nDB.tableClock.Witness(bsm.LTime)
|
|
}
|
|
|
|
nDB.handleMessage(bsm.Payload, true)
|
|
|
|
// Don't respond to a bulk sync which was not unsolicited
|
|
if !bsm.Unsolicited {
|
|
nDB.Lock()
|
|
ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
|
|
if ok {
|
|
close(ch)
|
|
delete(nDB.bulkSyncAckTbl, bsm.NodeName)
|
|
}
|
|
nDB.Unlock()
|
|
|
|
return
|
|
}
|
|
|
|
var nodeAddr net.IP
|
|
nDB.RLock()
|
|
if node, ok := nDB.nodes[bsm.NodeName]; ok {
|
|
nodeAddr = node.Addr
|
|
}
|
|
nDB.RUnlock()
|
|
|
|
if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
|
|
logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
|
|
mType, data, err := decodeMessage(buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error decoding gossip message to get message type: %v", err)
|
|
return
|
|
}
|
|
|
|
switch mType {
|
|
case MessageTypeNodeEvent:
|
|
nDB.handleNodeMessage(data)
|
|
case MessageTypeNetworkEvent:
|
|
nDB.handleNetworkMessage(data)
|
|
case MessageTypeTableEvent:
|
|
nDB.handleTableMessage(data, isBulkSync)
|
|
case MessageTypeBulkSync:
|
|
nDB.handleBulkSync(data)
|
|
case MessageTypeCompound:
|
|
nDB.handleCompound(data, isBulkSync)
|
|
default:
|
|
logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
|
|
}
|
|
}
|
|
|
|
func (d *delegate) NotifyMsg(buf []byte) {
|
|
if len(buf) == 0 {
|
|
return
|
|
}
|
|
|
|
d.nDB.handleMessage(buf, false)
|
|
}
|
|
|
|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
|
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
|
|
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
|
|
return msgs
|
|
}
|
|
|
|
func (d *delegate) LocalState(join bool) []byte {
|
|
if join {
|
|
// Update all the local node/network state to a new time to
|
|
// force update on the node we are trying to rejoin, just in
|
|
// case that node has these in leaving state still. This is
|
|
// facilitate fast convergence after recovering from a gossip
|
|
// failure.
|
|
d.nDB.updateLocalNetworkTime()
|
|
}
|
|
|
|
d.nDB.RLock()
|
|
defer d.nDB.RUnlock()
|
|
|
|
pp := NetworkPushPull{
|
|
LTime: d.nDB.networkClock.Time(),
|
|
NodeName: d.nDB.config.NodeID,
|
|
}
|
|
|
|
for name, nn := range d.nDB.networks {
|
|
for _, n := range nn {
|
|
pp.Networks = append(pp.Networks, &NetworkEntry{
|
|
LTime: n.ltime,
|
|
NetworkID: n.id,
|
|
NodeName: name,
|
|
Leaving: n.leaving,
|
|
})
|
|
}
|
|
}
|
|
|
|
buf, err := encodeMessage(MessageTypePushPull, &pp)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to encode local network state: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
|
if len(buf) == 0 {
|
|
logrus.Error("zero byte remote network state received")
|
|
return
|
|
}
|
|
|
|
var gMsg GossipMessage
|
|
err := proto.Unmarshal(buf, &gMsg)
|
|
if err != nil {
|
|
logrus.Errorf("Error unmarshalling push pull message: %v", err)
|
|
return
|
|
}
|
|
|
|
if gMsg.Type != MessageTypePushPull {
|
|
logrus.Errorf("Invalid message type %v received from remote", buf[0])
|
|
}
|
|
|
|
pp := NetworkPushPull{}
|
|
if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
|
|
logrus.Errorf("Failed to decode remote network state: %v", err)
|
|
return
|
|
}
|
|
|
|
nodeEvent := &NodeEvent{
|
|
LTime: pp.LTime,
|
|
NodeName: pp.NodeName,
|
|
Type: NodeEventTypeJoin,
|
|
}
|
|
d.nDB.handleNodeEvent(nodeEvent)
|
|
|
|
for _, n := range pp.Networks {
|
|
nEvent := &NetworkEvent{
|
|
LTime: n.LTime,
|
|
NodeName: n.NodeName,
|
|
NetworkID: n.NetworkID,
|
|
Type: NetworkEventTypeJoin,
|
|
}
|
|
|
|
if n.Leaving {
|
|
nEvent.Type = NetworkEventTypeLeave
|
|
}
|
|
|
|
d.nDB.handleNetworkEvent(nEvent)
|
|
}
|
|
|
|
}
|