delegate.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. package networkdb
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/gogo/protobuf/proto"
  8. )
// delegate adapts a NetworkDB to the gossip layer's callbacks. Its methods
// (NodeMeta, NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState) match
// the hashicorp/memberlist Delegate interface; presumably it is registered
// with memberlist elsewhere in the package — confirm.
type delegate struct {
	// nDB is the database whose state is gossiped and updated by the
	// callbacks.
	nDB *NetworkDB
}
  12. func (d *delegate) NodeMeta(limit int) []byte {
  13. return []byte{}
  14. }
  15. func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
  16. nDB.Lock()
  17. defer nDB.Unlock()
  18. for _, nodes := range []map[string]*node{
  19. nDB.failedNodes,
  20. nDB.leftNodes,
  21. nDB.nodes,
  22. } {
  23. if n, ok := nodes[nEvent.NodeName]; ok {
  24. if n.ltime >= nEvent.LTime {
  25. return nil
  26. }
  27. delete(nodes, n.Name)
  28. return n
  29. }
  30. }
  31. return nil
  32. }
  33. func (nDB *NetworkDB) purgeSameNode(n *node) {
  34. nDB.Lock()
  35. defer nDB.Unlock()
  36. prefix := strings.Split(n.Name, "-")[0]
  37. for _, nodes := range []map[string]*node{
  38. nDB.failedNodes,
  39. nDB.leftNodes,
  40. nDB.nodes,
  41. } {
  42. var nodeNames []string
  43. for name, node := range nodes {
  44. if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
  45. nodeNames = append(nodeNames, name)
  46. }
  47. }
  48. for _, name := range nodeNames {
  49. delete(nodes, name)
  50. }
  51. }
  52. }
  53. func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
  54. n := nDB.checkAndGetNode(nEvent)
  55. if n == nil {
  56. return false
  57. }
  58. nDB.purgeSameNode(n)
  59. n.ltime = nEvent.LTime
  60. switch nEvent.Type {
  61. case NodeEventTypeJoin:
  62. nDB.Lock()
  63. nDB.nodes[n.Name] = n
  64. nDB.Unlock()
  65. return true
  66. case NodeEventTypeLeave:
  67. nDB.Lock()
  68. nDB.leftNodes[n.Name] = n
  69. nDB.Unlock()
  70. return true
  71. }
  72. return false
  73. }
// handleNetworkEvent applies a remote node's network join/leave event to the
// per-node network map. It returns true when the event changed local state
// (the caller then rebroadcasts it) and false when it was ignored: the event
// is about this node, it is not newer than what we have, or it is a leave
// for a node/network we have no record of.
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
	// Set below when a known remote network transitions to leaving; acted on
	// by the deferred function after the lock is released.
	var flushEntries bool
	// Update our local clock if the received messages has newer
	// time.
	nDB.networkClock.Witness(nEvent.LTime)
	nDB.Lock()
	defer func() {
		nDB.Unlock()
		// When a node leaves a network on the last task removal cleanup the
		// local entries for this network & node combination. When the tasks
		// on a network are removed we could have missed the gossip updates.
		// Not doing this cleanup can leave stale entries because bulksyncs
		// from the node will no longer include this network state.
		//
		// deleteNodeNetworkEntries takes nDB lock.
		if flushEntries {
			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
		}
	}()
	// Events about this node's own networks are not applied here.
	if nEvent.NodeName == nDB.config.NodeName {
		return false
	}
	nodeNetworks, ok := nDB.networks[nEvent.NodeName]
	if !ok {
		// We haven't heard about this node at all. Ignore the leave
		if nEvent.Type == NetworkEventTypeLeave {
			return false
		}
		// First event seen for this node: start its network map.
		nodeNetworks = make(map[string]*network)
		nDB.networks[nEvent.NodeName] = nodeNetworks
	}
	if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
		// We have the latest state. Ignore the event
		// since it is stale.
		if n.ltime >= nEvent.LTime {
			return false
		}
		n.ltime = nEvent.LTime
		n.leaving = nEvent.Type == NetworkEventTypeLeave
		if n.leaving {
			n.reapTime = reapInterval
			flushEntries = true
		}
		// NOTE(review): addNetworkNode is invoked for leave events as well as
		// joins. Confirm that is intended; a leave presumably should not
		// register the node under this network.
		nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
		return true
	}
	// A leave for a network we never saw this node join: nothing to undo.
	if nEvent.Type == NetworkEventTypeLeave {
		return false
	}
	// This remote network join is being seen the first time.
	nodeNetworks[nEvent.NetworkID] = &network{
		id:    nEvent.NetworkID,
		ltime: nEvent.LTime,
	}
	nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
	return true
}
  131. func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
  132. // Update our local clock if the received messages has newer
  133. // time.
  134. nDB.tableClock.Witness(tEvent.LTime)
  135. // Ignore the table events for networks that are in the process of going away
  136. nDB.RLock()
  137. networks := nDB.networks[nDB.config.NodeName]
  138. network, ok := networks[tEvent.NetworkID]
  139. nDB.RUnlock()
  140. if !ok || network.leaving {
  141. return true
  142. }
  143. e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
  144. if err != nil && tEvent.Type == TableEventTypeDelete {
  145. // If it is a delete event and we don't have the entry here nothing to do.
  146. return false
  147. }
  148. if err == nil {
  149. // We have the latest state. Ignore the event
  150. // since it is stale.
  151. if e.ltime >= tEvent.LTime {
  152. return false
  153. }
  154. }
  155. e = &entry{
  156. ltime: tEvent.LTime,
  157. node: tEvent.NodeName,
  158. value: tEvent.Value,
  159. deleting: tEvent.Type == TableEventTypeDelete,
  160. }
  161. if e.deleting {
  162. e.reapTime = reapInterval
  163. }
  164. nDB.Lock()
  165. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
  166. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
  167. nDB.Unlock()
  168. var op opType
  169. switch tEvent.Type {
  170. case TableEventTypeCreate:
  171. op = opCreate
  172. case TableEventTypeUpdate:
  173. op = opUpdate
  174. case TableEventTypeDelete:
  175. op = opDelete
  176. }
  177. nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
  178. return true
  179. }
  180. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  181. // Decode the parts
  182. parts, err := decodeCompoundMessage(buf)
  183. if err != nil {
  184. logrus.Errorf("Failed to decode compound request: %v", err)
  185. return
  186. }
  187. // Handle each message
  188. for _, part := range parts {
  189. nDB.handleMessage(part, isBulkSync)
  190. }
  191. }
  192. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  193. var tEvent TableEvent
  194. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  195. logrus.Errorf("Error decoding table event message: %v", err)
  196. return
  197. }
  198. // Ignore messages that this node generated.
  199. if tEvent.NodeName == nDB.config.NodeName {
  200. return
  201. }
  202. // Do not rebroadcast a bulk sync
  203. if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
  204. var err error
  205. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  206. if err != nil {
  207. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  208. return
  209. }
  210. nDB.RLock()
  211. n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
  212. nDB.RUnlock()
  213. if !ok {
  214. return
  215. }
  216. broadcastQ := n.tableBroadcasts
  217. if broadcastQ == nil {
  218. return
  219. }
  220. broadcastQ.QueueBroadcast(&tableEventMessage{
  221. msg: buf,
  222. id: tEvent.NetworkID,
  223. tname: tEvent.TableName,
  224. key: tEvent.Key,
  225. node: nDB.config.NodeName,
  226. })
  227. }
  228. }
  229. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  230. var nEvent NodeEvent
  231. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  232. logrus.Errorf("Error decoding node event message: %v", err)
  233. return
  234. }
  235. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  236. var err error
  237. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  238. if err != nil {
  239. logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  240. return
  241. }
  242. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  243. msg: buf,
  244. })
  245. }
  246. }
  247. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  248. var nEvent NetworkEvent
  249. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  250. logrus.Errorf("Error decoding network event message: %v", err)
  251. return
  252. }
  253. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  254. var err error
  255. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  256. if err != nil {
  257. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  258. return
  259. }
  260. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  261. msg: buf,
  262. id: nEvent.NetworkID,
  263. node: nEvent.NodeName,
  264. })
  265. }
  266. }
  267. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  268. var bsm BulkSyncMessage
  269. if err := proto.Unmarshal(buf, &bsm); err != nil {
  270. logrus.Errorf("Error decoding bulk sync message: %v", err)
  271. return
  272. }
  273. if bsm.LTime > 0 {
  274. nDB.tableClock.Witness(bsm.LTime)
  275. }
  276. nDB.handleMessage(bsm.Payload, true)
  277. // Don't respond to a bulk sync which was not unsolicited
  278. if !bsm.Unsolicited {
  279. nDB.Lock()
  280. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  281. if ok {
  282. close(ch)
  283. delete(nDB.bulkSyncAckTbl, bsm.NodeName)
  284. }
  285. nDB.Unlock()
  286. return
  287. }
  288. var nodeAddr net.IP
  289. nDB.RLock()
  290. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  291. nodeAddr = node.Addr
  292. }
  293. nDB.RUnlock()
  294. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  295. logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  296. }
  297. }
  298. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  299. mType, data, err := decodeMessage(buf)
  300. if err != nil {
  301. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  302. return
  303. }
  304. switch mType {
  305. case MessageTypeNodeEvent:
  306. nDB.handleNodeMessage(data)
  307. case MessageTypeNetworkEvent:
  308. nDB.handleNetworkMessage(data)
  309. case MessageTypeTableEvent:
  310. nDB.handleTableMessage(data, isBulkSync)
  311. case MessageTypeBulkSync:
  312. nDB.handleBulkSync(data)
  313. case MessageTypeCompound:
  314. nDB.handleCompound(data, isBulkSync)
  315. default:
  316. logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
  317. }
  318. }
  319. func (d *delegate) NotifyMsg(buf []byte) {
  320. if len(buf) == 0 {
  321. return
  322. }
  323. d.nDB.handleMessage(buf, false)
  324. }
  325. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  326. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  327. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  328. return msgs
  329. }
  330. func (d *delegate) LocalState(join bool) []byte {
  331. if join {
  332. // Update all the local node/network state to a new time to
  333. // force update on the node we are trying to rejoin, just in
  334. // case that node has these in leaving state still. This is
  335. // facilitate fast convergence after recovering from a gossip
  336. // failure.
  337. d.nDB.updateLocalNetworkTime()
  338. }
  339. d.nDB.RLock()
  340. defer d.nDB.RUnlock()
  341. pp := NetworkPushPull{
  342. LTime: d.nDB.networkClock.Time(),
  343. NodeName: d.nDB.config.NodeName,
  344. }
  345. for name, nn := range d.nDB.networks {
  346. for _, n := range nn {
  347. pp.Networks = append(pp.Networks, &NetworkEntry{
  348. LTime: n.ltime,
  349. NetworkID: n.id,
  350. NodeName: name,
  351. Leaving: n.leaving,
  352. })
  353. }
  354. }
  355. buf, err := encodeMessage(MessageTypePushPull, &pp)
  356. if err != nil {
  357. logrus.Errorf("Failed to encode local network state: %v", err)
  358. return nil
  359. }
  360. return buf
  361. }
  362. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  363. if len(buf) == 0 {
  364. logrus.Error("zero byte remote network state received")
  365. return
  366. }
  367. var gMsg GossipMessage
  368. err := proto.Unmarshal(buf, &gMsg)
  369. if err != nil {
  370. logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
  371. return
  372. }
  373. if gMsg.Type != MessageTypePushPull {
  374. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  375. }
  376. pp := NetworkPushPull{}
  377. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  378. logrus.Errorf("Failed to decode remote network state: %v", err)
  379. return
  380. }
  381. nodeEvent := &NodeEvent{
  382. LTime: pp.LTime,
  383. NodeName: pp.NodeName,
  384. Type: NodeEventTypeJoin,
  385. }
  386. d.nDB.handleNodeEvent(nodeEvent)
  387. for _, n := range pp.Networks {
  388. nEvent := &NetworkEvent{
  389. LTime: n.LTime,
  390. NodeName: n.NodeName,
  391. NetworkID: n.NetworkID,
  392. Type: NetworkEventTypeJoin,
  393. }
  394. if n.Leaving {
  395. nEvent.Type = NetworkEventTypeLeave
  396. }
  397. d.nDB.handleNetworkEvent(nEvent)
  398. }
  399. }