delegate.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package networkdb
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "github.com/containerd/log"
  7. "github.com/gogo/protobuf/proto"
  8. )
// delegate exposes a NetworkDB through the gossip callbacks defined in this
// file (NodeMeta, NotifyMsg, GetBroadcasts, LocalState and MergeRemoteState).
// NOTE(review): presumably this satisfies memberlist's Delegate interface —
// confirm against where the delegate is registered.
type delegate struct {
	// nDB is the database every callback operates on.
	nDB *NetworkDB
}
  12. func (d *delegate) NodeMeta(limit int) []byte {
  13. return []byte{}
  14. }
  15. func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
  16. // Update our local clock if the received messages has newer
  17. // time.
  18. nDB.networkClock.Witness(nEvent.LTime)
  19. nDB.Lock()
  20. defer nDB.Unlock()
  21. // check if the node exists
  22. n, _, _ := nDB.findNode(nEvent.NodeName)
  23. if n == nil {
  24. return false
  25. }
  26. // check if the event is fresh
  27. if n.ltime >= nEvent.LTime {
  28. return false
  29. }
  30. // If we are here means that the event is fresher and the node is known. Update the laport time
  31. n.ltime = nEvent.LTime
  32. // If the node is not known from memberlist we cannot process save any state of it else if it actually
  33. // dies we won't receive any notification and we will remain stuck with it
  34. if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
  35. log.G(context.TODO()).Errorf("node: %s is unknown to memberlist", nEvent.NodeName)
  36. return false
  37. }
  38. switch nEvent.Type {
  39. case NodeEventTypeJoin:
  40. moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
  41. if err != nil {
  42. log.G(context.TODO()).WithError(err).Error("unable to find the node to move")
  43. return false
  44. }
  45. if moved {
  46. log.G(context.TODO()).Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
  47. }
  48. return moved
  49. case NodeEventTypeLeave:
  50. moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
  51. if err != nil {
  52. log.G(context.TODO()).WithError(err).Error("unable to find the node to move")
  53. return false
  54. }
  55. if moved {
  56. log.G(context.TODO()).Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
  57. }
  58. return moved
  59. }
  60. return false
  61. }
  62. func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
  63. // Update our local clock if the received messages has newer
  64. // time.
  65. nDB.networkClock.Witness(nEvent.LTime)
  66. nDB.Lock()
  67. defer nDB.Unlock()
  68. if nEvent.NodeName == nDB.config.NodeID {
  69. return false
  70. }
  71. nodeNetworks, ok := nDB.networks[nEvent.NodeName]
  72. if !ok {
  73. // We haven't heard about this node at all. Ignore the leave
  74. if nEvent.Type == NetworkEventTypeLeave {
  75. return false
  76. }
  77. nodeNetworks = make(map[string]*network)
  78. nDB.networks[nEvent.NodeName] = nodeNetworks
  79. }
  80. if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
  81. // We have the latest state. Ignore the event
  82. // since it is stale.
  83. if n.ltime >= nEvent.LTime {
  84. return false
  85. }
  86. n.ltime = nEvent.LTime
  87. n.leaving = nEvent.Type == NetworkEventTypeLeave
  88. if n.leaving {
  89. n.reapTime = nDB.config.reapNetworkInterval
  90. // The remote node is leaving the network, but not the gossip cluster.
  91. // Mark all its entries in deleted state, this will guarantee that
  92. // if some node bulk sync with us, the deleted state of
  93. // these entries will be propagated.
  94. nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
  95. }
  96. if nEvent.Type == NetworkEventTypeLeave {
  97. nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  98. } else {
  99. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  100. }
  101. return true
  102. }
  103. if nEvent.Type == NetworkEventTypeLeave {
  104. return false
  105. }
  106. // If the node is not known from memberlist we cannot process save any state of it else if it actually
  107. // dies we won't receive any notification and we will remain stuck with it
  108. if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
  109. return false
  110. }
  111. // This remote network join is being seen the first time.
  112. nodeNetworks[nEvent.NetworkID] = &network{
  113. id: nEvent.NetworkID,
  114. ltime: nEvent.LTime,
  115. }
  116. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  117. return true
  118. }
  119. func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
  120. // Update our local clock if the received messages has newer time.
  121. nDB.tableClock.Witness(tEvent.LTime)
  122. // Ignore the table events for networks that are in the process of going away
  123. nDB.RLock()
  124. networks := nDB.networks[nDB.config.NodeID]
  125. network, ok := networks[tEvent.NetworkID]
  126. // Check if the owner of the event is still part of the network
  127. nodes := nDB.networkNodes[tEvent.NetworkID]
  128. var nodePresent bool
  129. for _, node := range nodes {
  130. if node == tEvent.NodeName {
  131. nodePresent = true
  132. break
  133. }
  134. }
  135. nDB.RUnlock()
  136. if !ok || network.leaving || !nodePresent {
  137. // I'm out of the network OR the event owner is not anymore part of the network so do not propagate
  138. return false
  139. }
  140. nDB.Lock()
  141. e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
  142. if err == nil {
  143. // We have the latest state. Ignore the event
  144. // since it is stale.
  145. if e.ltime >= tEvent.LTime {
  146. nDB.Unlock()
  147. return false
  148. }
  149. } else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
  150. nDB.Unlock()
  151. // We don't know the entry, the entry is being deleted and the message is an async message
  152. // In this case the safest approach is to ignore it, it is possible that the queue grew so much to
  153. // exceed the garbage collection time (the residual reap time that is in the message is not being
  154. // updated, to avoid inserting too many messages in the queue).
  155. // Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
  156. return false
  157. }
  158. e = &entry{
  159. ltime: tEvent.LTime,
  160. node: tEvent.NodeName,
  161. value: tEvent.Value,
  162. deleting: tEvent.Type == TableEventTypeDelete,
  163. reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
  164. }
  165. // All the entries marked for deletion should have a reapTime set greater than 0
  166. // This case can happen if the cluster is running different versions of the engine where the old version does not have the
  167. // field. If that is not the case, this can be a BUG
  168. if e.deleting && e.reapTime == 0 {
  169. log.G(context.TODO()).Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
  170. nDB.config.Hostname, nDB.config.NodeID, tEvent)
  171. e.reapTime = nDB.config.reapEntryInterval
  172. }
  173. nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
  174. nDB.Unlock()
  175. if err != nil && tEvent.Type == TableEventTypeDelete {
  176. // Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
  177. // We had saved the state so to speed up convergence and be able to avoid accepting create events.
  178. // Now we will rebroadcast the message if 2 conditions are met:
  179. // 1) we had already synced this network (during the network join)
  180. // 2) the residual reapTime is higher than 1/6 of the total reapTime.
  181. // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
  182. // most likely the cluster is already aware of it
  183. // This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around
  184. // forever
  185. // log.G(ctx).Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
  186. return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
  187. }
  188. var op opType
  189. switch tEvent.Type {
  190. case TableEventTypeCreate:
  191. op = opCreate
  192. case TableEventTypeUpdate:
  193. op = opUpdate
  194. case TableEventTypeDelete:
  195. op = opDelete
  196. }
  197. nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
  198. return network.inSync
  199. }
  200. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  201. // Decode the parts
  202. parts, err := decodeCompoundMessage(buf)
  203. if err != nil {
  204. log.G(context.TODO()).Errorf("Failed to decode compound request: %v", err)
  205. return
  206. }
  207. // Handle each message
  208. for _, part := range parts {
  209. nDB.handleMessage(part, isBulkSync)
  210. }
  211. }
  212. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  213. var tEvent TableEvent
  214. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  215. log.G(context.TODO()).Errorf("Error decoding table event message: %v", err)
  216. return
  217. }
  218. // Ignore messages that this node generated.
  219. if tEvent.NodeName == nDB.config.NodeID {
  220. return
  221. }
  222. if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
  223. var err error
  224. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  225. if err != nil {
  226. log.G(context.TODO()).Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  227. return
  228. }
  229. nDB.RLock()
  230. n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
  231. nDB.RUnlock()
  232. // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
  233. if !ok || n.leaving || n.tableBroadcasts == nil {
  234. return
  235. }
  236. // if the queue is over the threshold, avoid distributing information coming from TCP sync
  237. if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
  238. return
  239. }
  240. n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
  241. msg: buf,
  242. id: tEvent.NetworkID,
  243. tname: tEvent.TableName,
  244. key: tEvent.Key,
  245. })
  246. }
  247. }
  248. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  249. var nEvent NodeEvent
  250. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  251. log.G(context.TODO()).Errorf("Error decoding node event message: %v", err)
  252. return
  253. }
  254. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  255. var err error
  256. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  257. if err != nil {
  258. log.G(context.TODO()).Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  259. return
  260. }
  261. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  262. msg: buf,
  263. })
  264. }
  265. }
  266. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  267. var nEvent NetworkEvent
  268. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  269. log.G(context.TODO()).Errorf("Error decoding network event message: %v", err)
  270. return
  271. }
  272. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  273. var err error
  274. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  275. if err != nil {
  276. log.G(context.TODO()).Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  277. return
  278. }
  279. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  280. msg: buf,
  281. id: nEvent.NetworkID,
  282. node: nEvent.NodeName,
  283. })
  284. }
  285. }
  286. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  287. var bsm BulkSyncMessage
  288. if err := proto.Unmarshal(buf, &bsm); err != nil {
  289. log.G(context.TODO()).Errorf("Error decoding bulk sync message: %v", err)
  290. return
  291. }
  292. if bsm.LTime > 0 {
  293. nDB.tableClock.Witness(bsm.LTime)
  294. }
  295. nDB.handleMessage(bsm.Payload, true)
  296. // Don't respond to a bulk sync which was not unsolicited
  297. if !bsm.Unsolicited {
  298. nDB.Lock()
  299. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  300. if ok {
  301. close(ch)
  302. delete(nDB.bulkSyncAckTbl, bsm.NodeName)
  303. }
  304. nDB.Unlock()
  305. return
  306. }
  307. var nodeAddr net.IP
  308. nDB.RLock()
  309. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  310. nodeAddr = node.Addr
  311. }
  312. nDB.RUnlock()
  313. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  314. log.G(context.TODO()).Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  315. }
  316. }
  317. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  318. mType, data, err := decodeMessage(buf)
  319. if err != nil {
  320. log.G(context.TODO()).Errorf("Error decoding gossip message to get message type: %v", err)
  321. return
  322. }
  323. switch mType {
  324. case MessageTypeNodeEvent:
  325. nDB.handleNodeMessage(data)
  326. case MessageTypeNetworkEvent:
  327. nDB.handleNetworkMessage(data)
  328. case MessageTypeTableEvent:
  329. nDB.handleTableMessage(data, isBulkSync)
  330. case MessageTypeBulkSync:
  331. nDB.handleBulkSync(data)
  332. case MessageTypeCompound:
  333. nDB.handleCompound(data, isBulkSync)
  334. default:
  335. log.G(context.TODO()).Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
  336. }
  337. }
  338. func (d *delegate) NotifyMsg(buf []byte) {
  339. if len(buf) == 0 {
  340. return
  341. }
  342. d.nDB.handleMessage(buf, false)
  343. }
  344. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  345. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  346. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  347. return msgs
  348. }
  349. func (d *delegate) LocalState(join bool) []byte {
  350. if join {
  351. // Update all the local node/network state to a new time to
  352. // force update on the node we are trying to rejoin, just in
  353. // case that node has these in leaving state still. This is
  354. // facilitate fast convergence after recovering from a gossip
  355. // failure.
  356. d.nDB.updateLocalNetworkTime()
  357. }
  358. d.nDB.RLock()
  359. defer d.nDB.RUnlock()
  360. pp := NetworkPushPull{
  361. LTime: d.nDB.networkClock.Time(),
  362. NodeName: d.nDB.config.NodeID,
  363. }
  364. for name, nn := range d.nDB.networks {
  365. for _, n := range nn {
  366. pp.Networks = append(pp.Networks, &NetworkEntry{
  367. LTime: n.ltime,
  368. NetworkID: n.id,
  369. NodeName: name,
  370. Leaving: n.leaving,
  371. })
  372. }
  373. }
  374. buf, err := encodeMessage(MessageTypePushPull, &pp)
  375. if err != nil {
  376. log.G(context.TODO()).Errorf("Failed to encode local network state: %v", err)
  377. return nil
  378. }
  379. return buf
  380. }
  381. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  382. if len(buf) == 0 {
  383. log.G(context.TODO()).Error("zero byte remote network state received")
  384. return
  385. }
  386. var gMsg GossipMessage
  387. err := proto.Unmarshal(buf, &gMsg)
  388. if err != nil {
  389. log.G(context.TODO()).Errorf("Error unmarshalling push pull message: %v", err)
  390. return
  391. }
  392. if gMsg.Type != MessageTypePushPull {
  393. log.G(context.TODO()).Errorf("Invalid message type %v received from remote", buf[0])
  394. }
  395. pp := NetworkPushPull{}
  396. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  397. log.G(context.TODO()).Errorf("Failed to decode remote network state: %v", err)
  398. return
  399. }
  400. nodeEvent := &NodeEvent{
  401. LTime: pp.LTime,
  402. NodeName: pp.NodeName,
  403. Type: NodeEventTypeJoin,
  404. }
  405. d.nDB.handleNodeEvent(nodeEvent)
  406. for _, n := range pp.Networks {
  407. nEvent := &NetworkEvent{
  408. LTime: n.LTime,
  409. NodeName: n.NodeName,
  410. NetworkID: n.NetworkID,
  411. Type: NetworkEventTypeJoin,
  412. }
  413. if n.Leaving {
  414. nEvent.Type = NetworkEventTypeLeave
  415. }
  416. d.nDB.handleNetworkEvent(nEvent)
  417. }
  418. }