|
@@ -216,15 +216,11 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
|
|
|
network, ok := networks[tEvent.NetworkID]
|
|
|
nDB.RUnlock()
|
|
|
if !ok || network.leaving {
|
|
|
- return true
|
|
|
- }
|
|
|
-
|
|
|
- e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
|
|
|
- if err != nil && tEvent.Type == TableEventTypeDelete {
|
|
|
- // If it is a delete event and we don't have the entry here nothing to do.
|
|
|
+ // I'm out of the network so do not propagate
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+ e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
|
|
|
if err == nil {
|
|
|
// We have the latest state. Ignore the event
|
|
|
// since it is stale.
|
|
@@ -249,6 +245,11 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
|
|
|
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
|
|
|
nDB.Unlock()
|
|
|
|
|
|
+ if err != nil && tEvent.Type == TableEventTypeDelete {
|
|
|
+ // If it is a delete event and we didn't have the entry here don't repropagate
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
var op opType
|
|
|
switch tEvent.Type {
|
|
|
case TableEventTypeCreate:
|
|
@@ -289,8 +290,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- // Do not rebroadcast a bulk sync
|
|
|
- if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
|
|
|
+ if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
|
|
|
var err error
|
|
|
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
|
|
|
if err != nil {
|