delegate.go 12 KB


  1. package networkdb
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/gogo/protobuf/proto"
  8. )
  9. type delegate struct {
  10. nDB *NetworkDB
  11. }
  12. func (d *delegate) NodeMeta(limit int) []byte {
  13. return []byte{}
  14. }
  15. func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
  16. nDB.Lock()
  17. defer nDB.Unlock()
  18. for _, nodes := range []map[string]*node{
  19. nDB.failedNodes,
  20. nDB.leftNodes,
  21. nDB.nodes,
  22. } {
  23. if n, ok := nodes[nEvent.NodeName]; ok {
  24. if n.ltime >= nEvent.LTime {
  25. return nil
  26. }
  27. return n
  28. }
  29. }
  30. return nil
  31. }
  32. func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
  33. nDB.Lock()
  34. defer nDB.Unlock()
  35. for _, nodes := range []map[string]*node{
  36. nDB.failedNodes,
  37. nDB.leftNodes,
  38. nDB.nodes,
  39. } {
  40. if n, ok := nodes[nEvent.NodeName]; ok {
  41. if n.ltime >= nEvent.LTime {
  42. return nil
  43. }
  44. delete(nodes, n.Name)
  45. return n
  46. }
  47. }
  48. return nil
  49. }
  50. func (nDB *NetworkDB) purgeSameNode(n *node) {
  51. nDB.Lock()
  52. defer nDB.Unlock()
  53. prefix := strings.Split(n.Name, "-")[0]
  54. for _, nodes := range []map[string]*node{
  55. nDB.failedNodes,
  56. nDB.leftNodes,
  57. nDB.nodes,
  58. } {
  59. var nodeNames []string
  60. for name, node := range nodes {
  61. if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
  62. nodeNames = append(nodeNames, name)
  63. }
  64. }
  65. for _, name := range nodeNames {
  66. delete(nodes, name)
  67. }
  68. }
  69. }
  70. func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
  71. // Update our local clock if the received messages has newer
  72. // time.
  73. nDB.networkClock.Witness(nEvent.LTime)
  74. n := nDB.getNode(nEvent)
  75. if n == nil {
  76. return false
  77. }
  78. // If its a node leave event for a manager and this is the only manager we
  79. // know of we want the reconnect logic to kick in. In a single manager
  80. // cluster manager's gossip can't be bootstrapped unless some other node
  81. // connects to it.
  82. if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
  83. for _, ip := range nDB.bootStrapIP {
  84. if ip.Equal(n.Addr) {
  85. n.ltime = nEvent.LTime
  86. return true
  87. }
  88. }
  89. }
  90. n = nDB.checkAndGetNode(nEvent)
  91. nDB.purgeSameNode(n)
  92. n.ltime = nEvent.LTime
  93. switch nEvent.Type {
  94. case NodeEventTypeJoin:
  95. nDB.Lock()
  96. _, found := nDB.nodes[n.Name]
  97. nDB.nodes[n.Name] = n
  98. nDB.Unlock()
  99. if !found {
  100. logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
  101. }
  102. return true
  103. case NodeEventTypeLeave:
  104. nDB.Lock()
  105. nDB.leftNodes[n.Name] = n
  106. nDB.Unlock()
  107. logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
  108. return true
  109. }
  110. return false
  111. }
  112. func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
  113. var flushEntries bool
  114. // Update our local clock if the received messages has newer
  115. // time.
  116. nDB.networkClock.Witness(nEvent.LTime)
  117. nDB.Lock()
  118. defer func() {
  119. nDB.Unlock()
  120. // When a node leaves a network on the last task removal cleanup the
  121. // local entries for this network & node combination. When the tasks
  122. // on a network are removed we could have missed the gossip updates.
  123. // Not doing this cleanup can leave stale entries because bulksyncs
  124. // from the node will no longer include this network state.
  125. //
  126. // deleteNodeNetworkEntries takes nDB lock.
  127. if flushEntries {
  128. nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
  129. }
  130. }()
  131. if nEvent.NodeName == nDB.config.NodeName {
  132. return false
  133. }
  134. nodeNetworks, ok := nDB.networks[nEvent.NodeName]
  135. if !ok {
  136. // We haven't heard about this node at all. Ignore the leave
  137. if nEvent.Type == NetworkEventTypeLeave {
  138. return false
  139. }
  140. nodeNetworks = make(map[string]*network)
  141. nDB.networks[nEvent.NodeName] = nodeNetworks
  142. }
  143. if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
  144. // We have the latest state. Ignore the event
  145. // since it is stale.
  146. if n.ltime >= nEvent.LTime {
  147. return false
  148. }
  149. n.ltime = nEvent.LTime
  150. n.leaving = nEvent.Type == NetworkEventTypeLeave
  151. if n.leaving {
  152. n.reapTime = reapInterval
  153. flushEntries = true
  154. }
  155. if nEvent.Type == NetworkEventTypeLeave {
  156. nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  157. } else {
  158. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  159. }
  160. return true
  161. }
  162. if nEvent.Type == NetworkEventTypeLeave {
  163. return false
  164. }
  165. // This remote network join is being seen the first time.
  166. nodeNetworks[nEvent.NetworkID] = &network{
  167. id: nEvent.NetworkID,
  168. ltime: nEvent.LTime,
  169. }
  170. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  171. return true
  172. }
  173. func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
  174. // Update our local clock if the received messages has newer
  175. // time.
  176. nDB.tableClock.Witness(tEvent.LTime)
  177. // Ignore the table events for networks that are in the process of going away
  178. nDB.RLock()
  179. networks := nDB.networks[nDB.config.NodeName]
  180. network, ok := networks[tEvent.NetworkID]
  181. nDB.RUnlock()
  182. if !ok || network.leaving {
  183. return true
  184. }
  185. e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
  186. if err != nil && tEvent.Type == TableEventTypeDelete {
  187. // If it is a delete event and we don't have the entry here nothing to do.
  188. return false
  189. }
  190. if err == nil {
  191. // We have the latest state. Ignore the event
  192. // since it is stale.
  193. if e.ltime >= tEvent.LTime {
  194. return false
  195. }
  196. }
  197. e = &entry{
  198. ltime: tEvent.LTime,
  199. node: tEvent.NodeName,
  200. value: tEvent.Value,
  201. deleting: tEvent.Type == TableEventTypeDelete,
  202. }
  203. if e.deleting {
  204. e.reapTime = reapInterval
  205. }
  206. nDB.Lock()
  207. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
  208. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
  209. nDB.Unlock()
  210. var op opType
  211. switch tEvent.Type {
  212. case TableEventTypeCreate:
  213. op = opCreate
  214. case TableEventTypeUpdate:
  215. op = opUpdate
  216. case TableEventTypeDelete:
  217. op = opDelete
  218. }
  219. nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
  220. return true
  221. }
  222. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  223. // Decode the parts
  224. parts, err := decodeCompoundMessage(buf)
  225. if err != nil {
  226. logrus.Errorf("Failed to decode compound request: %v", err)
  227. return
  228. }
  229. // Handle each message
  230. for _, part := range parts {
  231. nDB.handleMessage(part, isBulkSync)
  232. }
  233. }
  234. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  235. var tEvent TableEvent
  236. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  237. logrus.Errorf("Error decoding table event message: %v", err)
  238. return
  239. }
  240. // Ignore messages that this node generated.
  241. if tEvent.NodeName == nDB.config.NodeName {
  242. return
  243. }
  244. // Do not rebroadcast a bulk sync
  245. if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
  246. var err error
  247. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  248. if err != nil {
  249. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  250. return
  251. }
  252. nDB.RLock()
  253. n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
  254. nDB.RUnlock()
  255. if !ok {
  256. return
  257. }
  258. broadcastQ := n.tableBroadcasts
  259. if broadcastQ == nil {
  260. return
  261. }
  262. broadcastQ.QueueBroadcast(&tableEventMessage{
  263. msg: buf,
  264. id: tEvent.NetworkID,
  265. tname: tEvent.TableName,
  266. key: tEvent.Key,
  267. node: nDB.config.NodeName,
  268. })
  269. }
  270. }
  271. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  272. var nEvent NodeEvent
  273. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  274. logrus.Errorf("Error decoding node event message: %v", err)
  275. return
  276. }
  277. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  278. var err error
  279. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  280. if err != nil {
  281. logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  282. return
  283. }
  284. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  285. msg: buf,
  286. })
  287. }
  288. }
  289. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  290. var nEvent NetworkEvent
  291. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  292. logrus.Errorf("Error decoding network event message: %v", err)
  293. return
  294. }
  295. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  296. var err error
  297. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  298. if err != nil {
  299. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  300. return
  301. }
  302. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  303. msg: buf,
  304. id: nEvent.NetworkID,
  305. node: nEvent.NodeName,
  306. })
  307. }
  308. }
  309. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  310. var bsm BulkSyncMessage
  311. if err := proto.Unmarshal(buf, &bsm); err != nil {
  312. logrus.Errorf("Error decoding bulk sync message: %v", err)
  313. return
  314. }
  315. if bsm.LTime > 0 {
  316. nDB.tableClock.Witness(bsm.LTime)
  317. }
  318. nDB.handleMessage(bsm.Payload, true)
  319. // Don't respond to a bulk sync which was not unsolicited
  320. if !bsm.Unsolicited {
  321. nDB.Lock()
  322. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  323. if ok {
  324. close(ch)
  325. delete(nDB.bulkSyncAckTbl, bsm.NodeName)
  326. }
  327. nDB.Unlock()
  328. return
  329. }
  330. var nodeAddr net.IP
  331. nDB.RLock()
  332. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  333. nodeAddr = node.Addr
  334. }
  335. nDB.RUnlock()
  336. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  337. logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  338. }
  339. }
  340. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  341. mType, data, err := decodeMessage(buf)
  342. if err != nil {
  343. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  344. return
  345. }
  346. switch mType {
  347. case MessageTypeNodeEvent:
  348. nDB.handleNodeMessage(data)
  349. case MessageTypeNetworkEvent:
  350. nDB.handleNetworkMessage(data)
  351. case MessageTypeTableEvent:
  352. nDB.handleTableMessage(data, isBulkSync)
  353. case MessageTypeBulkSync:
  354. nDB.handleBulkSync(data)
  355. case MessageTypeCompound:
  356. nDB.handleCompound(data, isBulkSync)
  357. default:
  358. logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
  359. }
  360. }
  361. func (d *delegate) NotifyMsg(buf []byte) {
  362. if len(buf) == 0 {
  363. return
  364. }
  365. d.nDB.handleMessage(buf, false)
  366. }
  367. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  368. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  369. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  370. return msgs
  371. }
  372. func (d *delegate) LocalState(join bool) []byte {
  373. if join {
  374. // Update all the local node/network state to a new time to
  375. // force update on the node we are trying to rejoin, just in
  376. // case that node has these in leaving state still. This is
  377. // facilitate fast convergence after recovering from a gossip
  378. // failure.
  379. d.nDB.updateLocalNetworkTime()
  380. }
  381. d.nDB.RLock()
  382. defer d.nDB.RUnlock()
  383. pp := NetworkPushPull{
  384. LTime: d.nDB.networkClock.Time(),
  385. NodeName: d.nDB.config.NodeName,
  386. }
  387. for name, nn := range d.nDB.networks {
  388. for _, n := range nn {
  389. pp.Networks = append(pp.Networks, &NetworkEntry{
  390. LTime: n.ltime,
  391. NetworkID: n.id,
  392. NodeName: name,
  393. Leaving: n.leaving,
  394. })
  395. }
  396. }
  397. buf, err := encodeMessage(MessageTypePushPull, &pp)
  398. if err != nil {
  399. logrus.Errorf("Failed to encode local network state: %v", err)
  400. return nil
  401. }
  402. return buf
  403. }
  404. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  405. if len(buf) == 0 {
  406. logrus.Error("zero byte remote network state received")
  407. return
  408. }
  409. var gMsg GossipMessage
  410. err := proto.Unmarshal(buf, &gMsg)
  411. if err != nil {
  412. logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
  413. return
  414. }
  415. if gMsg.Type != MessageTypePushPull {
  416. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  417. }
  418. pp := NetworkPushPull{}
  419. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  420. logrus.Errorf("Failed to decode remote network state: %v", err)
  421. return
  422. }
  423. nodeEvent := &NodeEvent{
  424. LTime: pp.LTime,
  425. NodeName: pp.NodeName,
  426. Type: NodeEventTypeJoin,
  427. }
  428. d.nDB.handleNodeEvent(nodeEvent)
  429. for _, n := range pp.Networks {
  430. nEvent := &NetworkEvent{
  431. LTime: n.LTime,
  432. NodeName: n.NodeName,
  433. NetworkID: n.NetworkID,
  434. Type: NetworkEventTypeJoin,
  435. }
  436. if n.Leaving {
  437. nEvent.Type = NetworkEventTypeLeave
  438. }
  439. d.nDB.handleNetworkEvent(nEvent)
  440. }
  441. }