delegate.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package networkdb
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/gogo/protobuf/proto"
  8. )
  9. type delegate struct {
  10. nDB *NetworkDB
  11. }
  12. func (d *delegate) NodeMeta(limit int) []byte {
  13. return []byte{}
  14. }
  15. func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
  16. // Update our local clock if the received messages has newer
  17. // time.
  18. nDB.networkClock.Witness(nEvent.LTime)
  19. nDB.Lock()
  20. defer nDB.Unlock()
  21. if nEvent.NodeName == nDB.config.NodeName {
  22. return false
  23. }
  24. nodeNetworks, ok := nDB.networks[nEvent.NodeName]
  25. if !ok {
  26. // We haven't heard about this node at all. Ignore the leave
  27. if nEvent.Type == NetworkEventTypeLeave {
  28. return false
  29. }
  30. nodeNetworks = make(map[string]*network)
  31. nDB.networks[nEvent.NodeName] = nodeNetworks
  32. }
  33. if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
  34. // We have the latest state. Ignore the event
  35. // since it is stale.
  36. if n.ltime >= nEvent.LTime {
  37. return false
  38. }
  39. n.ltime = nEvent.LTime
  40. n.leaving = nEvent.Type == NetworkEventTypeLeave
  41. if n.leaving {
  42. n.leaveTime = time.Now()
  43. }
  44. return true
  45. }
  46. if nEvent.Type == NetworkEventTypeLeave {
  47. return false
  48. }
  49. // This remote network join is being seen the first time.
  50. nodeNetworks[nEvent.NetworkID] = &network{
  51. id: nEvent.NetworkID,
  52. ltime: nEvent.LTime,
  53. }
  54. nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName)
  55. return true
  56. }
  57. func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
  58. // Update our local clock if the received messages has newer
  59. // time.
  60. nDB.tableClock.Witness(tEvent.LTime)
  61. // Ignore the table events for networks that are in the process of going away
  62. nDB.RLock()
  63. networks := nDB.networks[nDB.config.NodeName]
  64. network, ok := networks[tEvent.NetworkID]
  65. nDB.RUnlock()
  66. if !ok || network.leaving {
  67. return true
  68. }
  69. e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
  70. if err != nil && tEvent.Type == TableEventTypeDelete {
  71. // If it is a delete event and we don't have the entry here nothing to do.
  72. return false
  73. }
  74. if err == nil {
  75. // We have the latest state. Ignore the event
  76. // since it is stale.
  77. if e.ltime >= tEvent.LTime {
  78. return false
  79. }
  80. }
  81. e = &entry{
  82. ltime: tEvent.LTime,
  83. node: tEvent.NodeName,
  84. value: tEvent.Value,
  85. deleting: tEvent.Type == TableEventTypeDelete,
  86. }
  87. if e.deleting {
  88. e.deleteTime = time.Now()
  89. }
  90. nDB.Lock()
  91. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
  92. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
  93. nDB.Unlock()
  94. var op opType
  95. switch tEvent.Type {
  96. case TableEventTypeCreate:
  97. op = opCreate
  98. case TableEventTypeUpdate:
  99. op = opUpdate
  100. case TableEventTypeDelete:
  101. op = opDelete
  102. }
  103. nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
  104. return true
  105. }
  106. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  107. // Decode the parts
  108. parts, err := decodeCompoundMessage(buf)
  109. if err != nil {
  110. logrus.Errorf("Failed to decode compound request: %v", err)
  111. return
  112. }
  113. // Handle each message
  114. for _, part := range parts {
  115. nDB.handleMessage(part, isBulkSync)
  116. }
  117. }
  118. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  119. var tEvent TableEvent
  120. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  121. logrus.Errorf("Error decoding table event message: %v", err)
  122. return
  123. }
  124. // Ignore messages that this node generated.
  125. if tEvent.NodeName == nDB.config.NodeName {
  126. return
  127. }
  128. // Do not rebroadcast a bulk sync
  129. if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
  130. var err error
  131. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  132. if err != nil {
  133. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  134. return
  135. }
  136. nDB.RLock()
  137. n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
  138. nDB.RUnlock()
  139. if !ok {
  140. return
  141. }
  142. broadcastQ := n.tableBroadcasts
  143. if broadcastQ == nil {
  144. return
  145. }
  146. broadcastQ.QueueBroadcast(&tableEventMessage{
  147. msg: buf,
  148. id: tEvent.NetworkID,
  149. tname: tEvent.TableName,
  150. key: tEvent.Key,
  151. node: nDB.config.NodeName,
  152. })
  153. }
  154. }
  155. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  156. var nEvent NetworkEvent
  157. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  158. logrus.Errorf("Error decoding network event message: %v", err)
  159. return
  160. }
  161. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  162. var err error
  163. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  164. if err != nil {
  165. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  166. return
  167. }
  168. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  169. msg: buf,
  170. id: nEvent.NetworkID,
  171. node: nEvent.NodeName,
  172. })
  173. }
  174. }
  175. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  176. var bsm BulkSyncMessage
  177. if err := proto.Unmarshal(buf, &bsm); err != nil {
  178. logrus.Errorf("Error decoding bulk sync message: %v", err)
  179. return
  180. }
  181. if bsm.LTime > 0 {
  182. nDB.tableClock.Witness(bsm.LTime)
  183. }
  184. nDB.handleMessage(bsm.Payload, true)
  185. // Don't respond to a bulk sync which was not unsolicited
  186. if !bsm.Unsolicited {
  187. nDB.RLock()
  188. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  189. nDB.RUnlock()
  190. if ok {
  191. close(ch)
  192. }
  193. return
  194. }
  195. var nodeAddr net.IP
  196. nDB.RLock()
  197. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  198. nodeAddr = node.Addr
  199. }
  200. nDB.RUnlock()
  201. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  202. logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  203. }
  204. }
  205. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  206. mType, data, err := decodeMessage(buf)
  207. if err != nil {
  208. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  209. return
  210. }
  211. switch mType {
  212. case MessageTypeNetworkEvent:
  213. nDB.handleNetworkMessage(data)
  214. case MessageTypeTableEvent:
  215. nDB.handleTableMessage(data, isBulkSync)
  216. case MessageTypeBulkSync:
  217. nDB.handleBulkSync(data)
  218. case MessageTypeCompound:
  219. nDB.handleCompound(data, isBulkSync)
  220. default:
  221. logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
  222. }
  223. }
  224. func (d *delegate) NotifyMsg(buf []byte) {
  225. if len(buf) == 0 {
  226. return
  227. }
  228. d.nDB.handleMessage(buf, false)
  229. }
  230. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  231. return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  232. }
  233. func (d *delegate) LocalState(join bool) []byte {
  234. d.nDB.RLock()
  235. defer d.nDB.RUnlock()
  236. pp := NetworkPushPull{
  237. LTime: d.nDB.networkClock.Time(),
  238. }
  239. for name, nn := range d.nDB.networks {
  240. for _, n := range nn {
  241. pp.Networks = append(pp.Networks, &NetworkEntry{
  242. LTime: n.ltime,
  243. NetworkID: n.id,
  244. NodeName: name,
  245. Leaving: n.leaving,
  246. })
  247. }
  248. }
  249. buf, err := encodeMessage(MessageTypePushPull, &pp)
  250. if err != nil {
  251. logrus.Errorf("Failed to encode local network state: %v", err)
  252. return nil
  253. }
  254. return buf
  255. }
  256. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  257. if len(buf) == 0 {
  258. logrus.Error("zero byte remote network state received")
  259. return
  260. }
  261. var gMsg GossipMessage
  262. err := proto.Unmarshal(buf, &gMsg)
  263. if err != nil {
  264. logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
  265. return
  266. }
  267. if gMsg.Type != MessageTypePushPull {
  268. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  269. }
  270. pp := NetworkPushPull{}
  271. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  272. logrus.Errorf("Failed to decode remote network state: %v", err)
  273. return
  274. }
  275. if pp.LTime > 0 {
  276. d.nDB.networkClock.Witness(pp.LTime)
  277. }
  278. for _, n := range pp.Networks {
  279. nEvent := &NetworkEvent{
  280. LTime: n.LTime,
  281. NodeName: n.NodeName,
  282. NetworkID: n.NetworkID,
  283. Type: NetworkEventTypeJoin,
  284. }
  285. if n.Leaving {
  286. nEvent.Type = NetworkEventTypeLeave
  287. }
  288. d.nDB.handleNetworkEvent(nEvent)
  289. }
  290. }