delegate.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. package networkdb
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "github.com/gogo/protobuf/proto"
  7. "github.com/sirupsen/logrus"
  8. )
  9. type delegate struct {
  10. nDB *NetworkDB
  11. }
  12. func (d *delegate) NodeMeta(limit int) []byte {
  13. return []byte{}
  14. }
  15. func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
  16. nDB.Lock()
  17. defer nDB.Unlock()
  18. for _, nodes := range []map[string]*node{
  19. nDB.failedNodes,
  20. nDB.leftNodes,
  21. nDB.nodes,
  22. } {
  23. if n, ok := nodes[nEvent.NodeName]; ok {
  24. if n.ltime >= nEvent.LTime {
  25. return nil
  26. }
  27. return n
  28. }
  29. }
  30. return nil
  31. }
  32. func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
  33. nDB.Lock()
  34. defer nDB.Unlock()
  35. for _, nodes := range []map[string]*node{
  36. nDB.failedNodes,
  37. nDB.leftNodes,
  38. nDB.nodes,
  39. } {
  40. if n, ok := nodes[nEvent.NodeName]; ok {
  41. if n.ltime >= nEvent.LTime {
  42. return nil
  43. }
  44. delete(nodes, n.Name)
  45. return n
  46. }
  47. }
  48. return nil
  49. }
  50. func (nDB *NetworkDB) purgeSameNode(n *node) {
  51. nDB.Lock()
  52. defer nDB.Unlock()
  53. prefix := strings.Split(n.Name, "-")[0]
  54. for _, nodes := range []map[string]*node{
  55. nDB.failedNodes,
  56. nDB.leftNodes,
  57. nDB.nodes,
  58. } {
  59. var nodeNames []string
  60. for name, node := range nodes {
  61. if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
  62. nodeNames = append(nodeNames, name)
  63. }
  64. }
  65. for _, name := range nodeNames {
  66. delete(nodes, name)
  67. }
  68. }
  69. }
  70. func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
  71. // Update our local clock if the received messages has newer
  72. // time.
  73. nDB.networkClock.Witness(nEvent.LTime)
  74. n := nDB.getNode(nEvent)
  75. if n == nil {
  76. return false
  77. }
  78. // If its a node leave event for a manager and this is the only manager we
  79. // know of we want the reconnect logic to kick in. In a single manager
  80. // cluster manager's gossip can't be bootstrapped unless some other node
  81. // connects to it.
  82. if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
  83. for _, ip := range nDB.bootStrapIP {
  84. if ip.Equal(n.Addr) {
  85. n.ltime = nEvent.LTime
  86. return true
  87. }
  88. }
  89. }
  90. n = nDB.checkAndGetNode(nEvent)
  91. if n == nil {
  92. return false
  93. }
  94. nDB.purgeSameNode(n)
  95. n.ltime = nEvent.LTime
  96. switch nEvent.Type {
  97. case NodeEventTypeJoin:
  98. nDB.Lock()
  99. _, found := nDB.nodes[n.Name]
  100. nDB.nodes[n.Name] = n
  101. nDB.Unlock()
  102. if !found {
  103. logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
  104. }
  105. return true
  106. case NodeEventTypeLeave:
  107. nDB.Lock()
  108. nDB.leftNodes[n.Name] = n
  109. nDB.Unlock()
  110. logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
  111. return true
  112. }
  113. return false
  114. }
  115. func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
  116. // Update our local clock if the received messages has newer
  117. // time.
  118. nDB.networkClock.Witness(nEvent.LTime)
  119. nDB.Lock()
  120. defer nDB.Unlock()
  121. if nEvent.NodeName == nDB.config.NodeName {
  122. return false
  123. }
  124. nodeNetworks, ok := nDB.networks[nEvent.NodeName]
  125. if !ok {
  126. // We haven't heard about this node at all. Ignore the leave
  127. if nEvent.Type == NetworkEventTypeLeave {
  128. return false
  129. }
  130. nodeNetworks = make(map[string]*network)
  131. nDB.networks[nEvent.NodeName] = nodeNetworks
  132. }
  133. if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
  134. // We have the latest state. Ignore the event
  135. // since it is stale.
  136. if n.ltime >= nEvent.LTime {
  137. return false
  138. }
  139. n.ltime = nEvent.LTime
  140. n.leaving = nEvent.Type == NetworkEventTypeLeave
  141. if n.leaving {
  142. n.reapTime = reapInterval
  143. // The remote node is leaving the network, but not the gossip cluster.
  144. // Mark all its entries in deleted state, this will guarantee that
  145. // if some node bulk sync with us, the deleted state of
  146. // these entries will be propagated.
  147. nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
  148. }
  149. if nEvent.Type == NetworkEventTypeLeave {
  150. nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  151. } else {
  152. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  153. }
  154. return true
  155. }
  156. if nEvent.Type == NetworkEventTypeLeave {
  157. return false
  158. }
  159. // This remote network join is being seen the first time.
  160. nodeNetworks[nEvent.NetworkID] = &network{
  161. id: nEvent.NetworkID,
  162. ltime: nEvent.LTime,
  163. }
  164. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  165. return true
  166. }
  167. func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
  168. // Update our local clock if the received messages has newer
  169. // time.
  170. nDB.tableClock.Witness(tEvent.LTime)
  171. // Ignore the table events for networks that are in the process of going away
  172. nDB.RLock()
  173. networks := nDB.networks[nDB.config.NodeName]
  174. network, ok := networks[tEvent.NetworkID]
  175. // Check if the owner of the event is still part of the network
  176. nodes := nDB.networkNodes[tEvent.NetworkID]
  177. var nodePresent bool
  178. for _, node := range nodes {
  179. if node == tEvent.NodeName {
  180. nodePresent = true
  181. break
  182. }
  183. }
  184. nDB.RUnlock()
  185. if !ok || network.leaving || !nodePresent {
  186. // I'm out of the network OR the event owner is not anymore part of the network so do not propagate
  187. return false
  188. }
  189. e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
  190. if err == nil {
  191. // We have the latest state. Ignore the event
  192. // since it is stale.
  193. if e.ltime >= tEvent.LTime {
  194. return false
  195. }
  196. }
  197. e = &entry{
  198. ltime: tEvent.LTime,
  199. node: tEvent.NodeName,
  200. value: tEvent.Value,
  201. deleting: tEvent.Type == TableEventTypeDelete,
  202. }
  203. if e.deleting {
  204. e.reapTime = reapInterval
  205. }
  206. nDB.Lock()
  207. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
  208. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
  209. nDB.Unlock()
  210. if err != nil && tEvent.Type == TableEventTypeDelete {
  211. // If it is a delete event and we didn't have the entry here don't repropagate
  212. return true
  213. }
  214. var op opType
  215. switch tEvent.Type {
  216. case TableEventTypeCreate:
  217. op = opCreate
  218. case TableEventTypeUpdate:
  219. op = opUpdate
  220. case TableEventTypeDelete:
  221. op = opDelete
  222. }
  223. nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
  224. return true
  225. }
  226. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  227. // Decode the parts
  228. parts, err := decodeCompoundMessage(buf)
  229. if err != nil {
  230. logrus.Errorf("Failed to decode compound request: %v", err)
  231. return
  232. }
  233. // Handle each message
  234. for _, part := range parts {
  235. nDB.handleMessage(part, isBulkSync)
  236. }
  237. }
  238. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  239. var tEvent TableEvent
  240. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  241. logrus.Errorf("Error decoding table event message: %v", err)
  242. return
  243. }
  244. // Ignore messages that this node generated.
  245. if tEvent.NodeName == nDB.config.NodeName {
  246. return
  247. }
  248. if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
  249. var err error
  250. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  251. if err != nil {
  252. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  253. return
  254. }
  255. nDB.RLock()
  256. n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
  257. nDB.RUnlock()
  258. if !ok {
  259. return
  260. }
  261. broadcastQ := n.tableBroadcasts
  262. if broadcastQ == nil {
  263. return
  264. }
  265. broadcastQ.QueueBroadcast(&tableEventMessage{
  266. msg: buf,
  267. id: tEvent.NetworkID,
  268. tname: tEvent.TableName,
  269. key: tEvent.Key,
  270. node: nDB.config.NodeName,
  271. })
  272. }
  273. }
  274. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  275. var nEvent NodeEvent
  276. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  277. logrus.Errorf("Error decoding node event message: %v", err)
  278. return
  279. }
  280. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  281. var err error
  282. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  283. if err != nil {
  284. logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  285. return
  286. }
  287. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  288. msg: buf,
  289. })
  290. }
  291. }
  292. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  293. var nEvent NetworkEvent
  294. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  295. logrus.Errorf("Error decoding network event message: %v", err)
  296. return
  297. }
  298. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  299. var err error
  300. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  301. if err != nil {
  302. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  303. return
  304. }
  305. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  306. msg: buf,
  307. id: nEvent.NetworkID,
  308. node: nEvent.NodeName,
  309. })
  310. }
  311. }
  312. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  313. var bsm BulkSyncMessage
  314. if err := proto.Unmarshal(buf, &bsm); err != nil {
  315. logrus.Errorf("Error decoding bulk sync message: %v", err)
  316. return
  317. }
  318. if bsm.LTime > 0 {
  319. nDB.tableClock.Witness(bsm.LTime)
  320. }
  321. nDB.handleMessage(bsm.Payload, true)
  322. // Don't respond to a bulk sync which was not unsolicited
  323. if !bsm.Unsolicited {
  324. nDB.Lock()
  325. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  326. if ok {
  327. close(ch)
  328. delete(nDB.bulkSyncAckTbl, bsm.NodeName)
  329. }
  330. nDB.Unlock()
  331. return
  332. }
  333. var nodeAddr net.IP
  334. nDB.RLock()
  335. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  336. nodeAddr = node.Addr
  337. }
  338. nDB.RUnlock()
  339. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  340. logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  341. }
  342. }
  343. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  344. mType, data, err := decodeMessage(buf)
  345. if err != nil {
  346. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  347. return
  348. }
  349. switch mType {
  350. case MessageTypeNodeEvent:
  351. nDB.handleNodeMessage(data)
  352. case MessageTypeNetworkEvent:
  353. nDB.handleNetworkMessage(data)
  354. case MessageTypeTableEvent:
  355. nDB.handleTableMessage(data, isBulkSync)
  356. case MessageTypeBulkSync:
  357. nDB.handleBulkSync(data)
  358. case MessageTypeCompound:
  359. nDB.handleCompound(data, isBulkSync)
  360. default:
  361. logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
  362. }
  363. }
  364. func (d *delegate) NotifyMsg(buf []byte) {
  365. if len(buf) == 0 {
  366. return
  367. }
  368. d.nDB.handleMessage(buf, false)
  369. }
  370. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  371. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  372. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  373. return msgs
  374. }
  375. func (d *delegate) LocalState(join bool) []byte {
  376. if join {
  377. // Update all the local node/network state to a new time to
  378. // force update on the node we are trying to rejoin, just in
  379. // case that node has these in leaving state still. This is
  380. // facilitate fast convergence after recovering from a gossip
  381. // failure.
  382. d.nDB.updateLocalNetworkTime()
  383. }
  384. d.nDB.RLock()
  385. defer d.nDB.RUnlock()
  386. pp := NetworkPushPull{
  387. LTime: d.nDB.networkClock.Time(),
  388. NodeName: d.nDB.config.NodeName,
  389. }
  390. for name, nn := range d.nDB.networks {
  391. for _, n := range nn {
  392. pp.Networks = append(pp.Networks, &NetworkEntry{
  393. LTime: n.ltime,
  394. NetworkID: n.id,
  395. NodeName: name,
  396. Leaving: n.leaving,
  397. })
  398. }
  399. }
  400. buf, err := encodeMessage(MessageTypePushPull, &pp)
  401. if err != nil {
  402. logrus.Errorf("Failed to encode local network state: %v", err)
  403. return nil
  404. }
  405. return buf
  406. }
  407. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  408. if len(buf) == 0 {
  409. logrus.Error("zero byte remote network state received")
  410. return
  411. }
  412. var gMsg GossipMessage
  413. err := proto.Unmarshal(buf, &gMsg)
  414. if err != nil {
  415. logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
  416. return
  417. }
  418. if gMsg.Type != MessageTypePushPull {
  419. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  420. }
  421. pp := NetworkPushPull{}
  422. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  423. logrus.Errorf("Failed to decode remote network state: %v", err)
  424. return
  425. }
  426. nodeEvent := &NodeEvent{
  427. LTime: pp.LTime,
  428. NodeName: pp.NodeName,
  429. Type: NodeEventTypeJoin,
  430. }
  431. d.nDB.handleNodeEvent(nodeEvent)
  432. for _, n := range pp.Networks {
  433. nEvent := &NetworkEvent{
  434. LTime: n.LTime,
  435. NodeName: n.NodeName,
  436. NetworkID: n.NetworkID,
  437. Type: NetworkEventTypeJoin,
  438. }
  439. if n.Leaving {
  440. nEvent.Type = NetworkEventTypeLeave
  441. }
  442. d.nDB.handleNetworkEvent(nEvent)
  443. }
  444. }