delegate.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package networkdb
  2. import (
  3. "net"
  4. "time"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/sirupsen/logrus"
  7. )
// delegate bridges memberlist callbacks into a NetworkDB instance. It is
// the type registered with memberlist as its Delegate implementation.
type delegate struct {
	nDB *NetworkDB
}
  11. func (d *delegate) NodeMeta(limit int) []byte {
  12. return []byte{}
  13. }
// handleNodeEvent applies a remote node join/leave event to the local node
// tables. It returns true when the event changed local state and should
// therefore be rebroadcast to the rest of the cluster.
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
	// Update our local clock if the received messages has newer
	// time.
	nDB.networkClock.Witness(nEvent.LTime)

	nDB.Lock()
	defer nDB.Unlock()

	// check if the node exists
	n, _, _ := nDB.findNode(nEvent.NodeName)
	if n == nil {
		return false
	}

	// check if the event is fresh: Lamport times at or below what we
	// already recorded are stale and must not be applied.
	if n.ltime >= nEvent.LTime {
		return false
	}

	// If we are here means that the event is fresher and the node is known. Update the laport time
	n.ltime = nEvent.LTime

	// If the node is not known from memberlist we cannot process save any state of it else if it actually
	// dies we won't receive any notification and we will remain stuck with it
	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
		logrus.Errorf("node: %s is unknown to memberlist", nEvent.NodeName)
		return false
	}

	switch nEvent.Type {
	case NodeEventTypeJoin:
		// Move the node into the active set; moved is false when it was
		// already active, in which case there is nothing to propagate.
		moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
		if err != nil {
			logrus.WithError(err).Error("unable to find the node to move")
			return false
		}
		if moved {
			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
		}
		return moved
	case NodeEventTypeLeave:
		// Move the node into the left set; moved is false when it had
		// already left.
		moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
		if err != nil {
			logrus.WithError(err).Error("unable to find the node to move")
			return false
		}
		if moved {
			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
		}
		return moved
	}

	// Unknown event type: nothing to do, do not rebroadcast.
	return false
}
// handleNetworkEvent applies a remote node's network join/leave event to the
// local view of per-node network membership. It returns true when the event
// changed local state and should therefore be rebroadcast.
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
	// Update our local clock if the received messages has newer
	// time.
	nDB.networkClock.Witness(nEvent.LTime)

	nDB.Lock()
	defer nDB.Unlock()

	// Events about ourselves are ignored; local state is authoritative.
	if nEvent.NodeName == nDB.config.NodeID {
		return false
	}

	nodeNetworks, ok := nDB.networks[nEvent.NodeName]
	if !ok {
		// We haven't heard about this node at all. Ignore the leave
		if nEvent.Type == NetworkEventTypeLeave {
			return false
		}

		nodeNetworks = make(map[string]*network)
		nDB.networks[nEvent.NodeName] = nodeNetworks
	}

	if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
		// We have the latest state. Ignore the event
		// since it is stale.
		if n.ltime >= nEvent.LTime {
			return false
		}

		n.ltime = nEvent.LTime
		n.leaving = nEvent.Type == NetworkEventTypeLeave
		if n.leaving {
			n.reapTime = nDB.config.reapNetworkInterval

			// The remote node is leaving the network, but not the gossip cluster.
			// Mark all its entries in deleted state, this will guarantee that
			// if some node bulk sync with us, the deleted state of
			// these entries will be propagated.
			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
		}

		if nEvent.Type == NetworkEventTypeLeave {
			nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
		} else {
			nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
		}

		return true
	}

	// Unknown network for this node and the event is a leave: nothing to do.
	if nEvent.Type == NetworkEventTypeLeave {
		return false
	}

	// If the node is not known from memberlist we cannot process save any state of it else if it actually
	// dies we won't receive any notification and we will remain stuck with it
	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
		return false
	}

	// This remote network join is being seen the first time.
	nodeNetworks[nEvent.NetworkID] = &network{
		id:    nEvent.NetworkID,
		ltime: nEvent.LTime,
	}

	nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
	return true
}
// handleTableEvent applies a remote table-entry create/update/delete to the
// local table state. The returned bool tells the caller whether the raw
// message should be rebroadcast. isBulkSync is true when the event arrived
// over the TCP bulk-sync channel rather than async UDP gossip; bulk-sync
// deletes carry an up-to-date residual reap time and are treated as safe.
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
	// Update our local clock if the received messages has newer time.
	nDB.tableClock.Witness(tEvent.LTime)

	// Ignore the table events for networks that are in the process of going away
	nDB.RLock()
	networks := nDB.networks[nDB.config.NodeID]
	network, ok := networks[tEvent.NetworkID]
	// Check if the owner of the event is still part of the network
	nodes := nDB.networkNodes[tEvent.NetworkID]
	var nodePresent bool
	for _, node := range nodes {
		if node == tEvent.NodeName {
			nodePresent = true
			break
		}
	}
	nDB.RUnlock()

	if !ok || network.leaving || !nodePresent {
		// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
		return false
	}

	nDB.Lock()
	// err != nil below means the entry was not previously known locally.
	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
	if err == nil {
		// We have the latest state. Ignore the event
		// since it is stale.
		if e.ltime >= tEvent.LTime {
			nDB.Unlock()
			return false
		}
	} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
		nDB.Unlock()
		// We don't know the entry, the entry is being deleted and the message is an async message
		// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
		// exceed the garbage collection time (the residual reap time that is in the message is not being
		// updated, to avoid inserting too many messages in the queue).
		// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
		return false
	}

	// Build the new entry from the event; deletes are kept as tombstones
	// until their reapTime expires.
	e = &entry{
		ltime:    tEvent.LTime,
		node:     tEvent.NodeName,
		value:    tEvent.Value,
		deleting: tEvent.Type == TableEventTypeDelete,
		reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
	}

	// All the entries marked for deletion should have a reapTime set greater than 0
	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
	// field. If that is not the case, this can be a BUG
	if e.deleting && e.reapTime == 0 {
		logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
			nDB.config.Hostname, nDB.config.NodeID, tEvent)
		e.reapTime = nDB.config.reapEntryInterval
	}
	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
	nDB.Unlock()

	if err != nil && tEvent.Type == TableEventTypeDelete {
		// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
		// We had saved the state so to speed up convergence and be able to avoid accepting create events.
		// Now we will rebroadcast the message if 2 conditions are met:
		// 1) we had already synced this network (during the network join)
		// 2) the residual reapTime is higher than 1/6 of the total reapTime.
		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
		// most likely the cluster is already aware of it
		// This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around
		// forever
		//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
		return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
	}

	// Map the wire event type onto the local watch op for subscribers.
	var op opType
	switch tEvent.Type {
	case TableEventTypeCreate:
		op = opCreate
	case TableEventTypeUpdate:
		op = opUpdate
	case TableEventTypeDelete:
		op = opDelete
	}

	// Notify local table watchers of the change.
	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
	return network.inSync
}
  199. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  200. // Decode the parts
  201. parts, err := decodeCompoundMessage(buf)
  202. if err != nil {
  203. logrus.Errorf("Failed to decode compound request: %v", err)
  204. return
  205. }
  206. // Handle each message
  207. for _, part := range parts {
  208. nDB.handleMessage(part, isBulkSync)
  209. }
  210. }
// handleTableMessage decodes a gossip-encoded TableEvent, applies it via
// handleTableEvent, and — when the event changed local state — re-queues the
// raw message on the owning network's broadcast queue for propagation.
func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
	var tEvent TableEvent
	if err := proto.Unmarshal(buf, &tEvent); err != nil {
		logrus.Errorf("Error decoding table event message: %v", err)
		return
	}

	// Ignore messages that this node generated.
	if tEvent.NodeName == nDB.config.NodeID {
		return
	}

	if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
		var err error
		// Re-wrap the original payload so it can be queued as-is.
		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
		if err != nil {
			logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
			return
		}

		nDB.RLock()
		n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
		nDB.RUnlock()

		// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
		if !ok || n.leaving || n.tableBroadcasts == nil {
			return
		}

		// if the queue is over the threshold, avoid distributing information coming from TCP sync
		if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
			return
		}

		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
			msg:   buf,
			id:    tEvent.NetworkID,
			tname: tEvent.TableName,
			key:   tEvent.Key,
		})
	}
}
  247. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  248. var nEvent NodeEvent
  249. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  250. logrus.Errorf("Error decoding node event message: %v", err)
  251. return
  252. }
  253. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  254. var err error
  255. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  256. if err != nil {
  257. logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  258. return
  259. }
  260. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  261. msg: buf,
  262. })
  263. }
  264. }
  265. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  266. var nEvent NetworkEvent
  267. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  268. logrus.Errorf("Error decoding network event message: %v", err)
  269. return
  270. }
  271. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  272. var err error
  273. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  274. if err != nil {
  275. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  276. return
  277. }
  278. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  279. msg: buf,
  280. id: nEvent.NetworkID,
  281. node: nEvent.NodeName,
  282. })
  283. }
  284. }
// handleBulkSync processes a TCP bulk-sync message: it applies the embedded
// payload, then either signals the goroutine waiting for the reply to one of
// our own syncs, or — for an unsolicited sync — sends our state back.
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
	var bsm BulkSyncMessage
	if err := proto.Unmarshal(buf, &bsm); err != nil {
		logrus.Errorf("Error decoding bulk sync message: %v", err)
		return
	}

	if bsm.LTime > 0 {
		nDB.tableClock.Witness(bsm.LTime)
	}

	// Apply the embedded (compound) payload as bulk-sync traffic.
	nDB.handleMessage(bsm.Payload, true)

	// Don't respond to a bulk sync which was not unsolicited
	if !bsm.Unsolicited {
		nDB.Lock()
		ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
		if ok {
			// Closing the channel wakes the goroutine awaiting this ack.
			close(ch)
			delete(nDB.bulkSyncAckTbl, bsm.NodeName)
		}
		nDB.Unlock()

		return
	}

	// nodeAddr is only used for the error log below; best-effort lookup.
	var nodeAddr net.IP
	nDB.RLock()
	if node, ok := nDB.nodes[bsm.NodeName]; ok {
		nodeAddr = node.Addr
	}
	nDB.RUnlock()

	if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
		logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
	}
}
  316. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  317. mType, data, err := decodeMessage(buf)
  318. if err != nil {
  319. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  320. return
  321. }
  322. switch mType {
  323. case MessageTypeNodeEvent:
  324. nDB.handleNodeMessage(data)
  325. case MessageTypeNetworkEvent:
  326. nDB.handleNetworkMessage(data)
  327. case MessageTypeTableEvent:
  328. nDB.handleTableMessage(data, isBulkSync)
  329. case MessageTypeBulkSync:
  330. nDB.handleBulkSync(data)
  331. case MessageTypeCompound:
  332. nDB.handleCompound(data, isBulkSync)
  333. default:
  334. logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
  335. }
  336. }
  337. func (d *delegate) NotifyMsg(buf []byte) {
  338. if len(buf) == 0 {
  339. return
  340. }
  341. d.nDB.handleMessage(buf, false)
  342. }
  343. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  344. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  345. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  346. return msgs
  347. }
  348. func (d *delegate) LocalState(join bool) []byte {
  349. if join {
  350. // Update all the local node/network state to a new time to
  351. // force update on the node we are trying to rejoin, just in
  352. // case that node has these in leaving state still. This is
  353. // facilitate fast convergence after recovering from a gossip
  354. // failure.
  355. d.nDB.updateLocalNetworkTime()
  356. }
  357. d.nDB.RLock()
  358. defer d.nDB.RUnlock()
  359. pp := NetworkPushPull{
  360. LTime: d.nDB.networkClock.Time(),
  361. NodeName: d.nDB.config.NodeID,
  362. }
  363. for name, nn := range d.nDB.networks {
  364. for _, n := range nn {
  365. pp.Networks = append(pp.Networks, &NetworkEntry{
  366. LTime: n.ltime,
  367. NetworkID: n.id,
  368. NodeName: name,
  369. Leaving: n.leaving,
  370. })
  371. }
  372. }
  373. buf, err := encodeMessage(MessageTypePushPull, &pp)
  374. if err != nil {
  375. logrus.Errorf("Failed to encode local network state: %v", err)
  376. return nil
  377. }
  378. return buf
  379. }
  380. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  381. if len(buf) == 0 {
  382. logrus.Error("zero byte remote network state received")
  383. return
  384. }
  385. var gMsg GossipMessage
  386. err := proto.Unmarshal(buf, &gMsg)
  387. if err != nil {
  388. logrus.Errorf("Error unmarshalling push pull message: %v", err)
  389. return
  390. }
  391. if gMsg.Type != MessageTypePushPull {
  392. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  393. }
  394. pp := NetworkPushPull{}
  395. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  396. logrus.Errorf("Failed to decode remote network state: %v", err)
  397. return
  398. }
  399. nodeEvent := &NodeEvent{
  400. LTime: pp.LTime,
  401. NodeName: pp.NodeName,
  402. Type: NodeEventTypeJoin,
  403. }
  404. d.nDB.handleNodeEvent(nodeEvent)
  405. for _, n := range pp.Networks {
  406. nEvent := &NetworkEvent{
  407. LTime: n.LTime,
  408. NodeName: n.NodeName,
  409. NetworkID: n.NetworkID,
  410. Type: NetworkEventTypeJoin,
  411. }
  412. if n.Leaving {
  413. nEvent.Type = NetworkEventTypeLeave
  414. }
  415. d.nDB.handleNetworkEvent(nEvent)
  416. }
  417. }