123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package networkdb
- import (
- "errors"
- "time"
- "github.com/hashicorp/memberlist"
- "github.com/hashicorp/serf/serf"
- )
// broadcastTimeout bounds how long sendNodeEvent waits for a queued node
// event to be reported as finished before giving up with an error.
const broadcastTimeout time.Duration = 5 * time.Second
// networkEventMessage is the broadcast-queue item that carries an encoded
// network event.
//
// id holds the network ID and node holds the name of the node that queued
// the event; Invalidates compares both. msg is the encoded payload handed
// back by Message.
type networkEventMessage struct {
	id, node string
	msg      []byte
}
- func (m *networkEventMessage) Invalidates(other memberlist.Broadcast) bool {
- otherm := other.(*networkEventMessage)
- return m.id == otherm.id && m.node == otherm.node
- }
- func (m *networkEventMessage) Message() []byte {
- return m.msg
- }
- func (m *networkEventMessage) Finished() {
- }
- func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltime serf.LamportTime) error {
- nEvent := NetworkEvent{
- Type: event,
- LTime: ltime,
- NodeName: nDB.config.NodeID,
- NetworkID: nid,
- }
- raw, err := encodeMessage(MessageTypeNetworkEvent, &nEvent)
- if err != nil {
- return err
- }
- nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
- msg: raw,
- id: nid,
- node: nDB.config.NodeID,
- })
- return nil
- }
// nodeEventMessage is the broadcast-queue item that carries an encoded node
// event.
type nodeEventMessage struct {
	// msg is the encoded payload handed back by Message.
	msg []byte
	// notify, when non-nil, is closed by Finished so that a waiter (see
	// sendNodeEvent) learns the queue is done with this message.
	notify chan<- struct{}
}
- func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
- return false
- }
- func (m *nodeEventMessage) Message() []byte {
- return m.msg
- }
- func (m *nodeEventMessage) Finished() {
- if m.notify != nil {
- close(m.notify)
- }
- }
- func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
- nEvent := NodeEvent{
- Type: event,
- LTime: nDB.networkClock.Increment(),
- NodeName: nDB.config.NodeID,
- }
- raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
- if err != nil {
- return err
- }
- notifyCh := make(chan struct{})
- nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
- msg: raw,
- notify: notifyCh,
- })
- nDB.RLock()
- noPeers := len(nDB.nodes) <= 1
- nDB.RUnlock()
- // Message enqueued, do not wait for a send if no peer is present
- if noPeers {
- return nil
- }
- // Wait for the broadcast
- select {
- case <-notifyCh:
- case <-time.After(broadcastTimeout):
- return errors.New("timed out broadcasting node event")
- }
- return nil
- }
// tableEventMessage is the broadcast-queue item that carries an encoded
// table event.
//
// id holds the network ID, tname the table name and key the entry key;
// Invalidates compares all three. msg is the encoded payload handed back by
// Message.
type tableEventMessage struct {
	id, tname, key string
	msg            []byte
}
- func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
- otherm := other.(*tableEventMessage)
- return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
- }
- func (m *tableEventMessage) Message() []byte {
- return m.msg
- }
- func (m *tableEventMessage) Finished() {
- }
- func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname string, key string, entry *entry) error {
- tEvent := TableEvent{
- Type: event,
- LTime: entry.ltime,
- NodeName: nDB.config.NodeID,
- NetworkID: nid,
- TableName: tname,
- Key: key,
- Value: entry.value,
- // The duration in second is a float that below would be truncated
- ResidualReapTime: int32(entry.reapTime.Seconds()),
- }
- raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
- if err != nil {
- return err
- }
- var broadcastQ *memberlist.TransmitLimitedQueue
- nDB.RLock()
- thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
- if ok {
- // The network may have been removed
- network, networkOk := thisNodeNetworks[nid]
- if !networkOk {
- nDB.RUnlock()
- return nil
- }
- broadcastQ = network.tableBroadcasts
- }
- nDB.RUnlock()
- // The network may have been removed
- if broadcastQ == nil {
- return nil
- }
- broadcastQ.QueueBroadcast(&tableEventMessage{
- msg: raw,
- id: nid,
- tname: tname,
- key: key,
- })
- return nil
- }
|