delegate.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. package networkdb
  2. import (
  3. "net"
  4. "time"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/sirupsen/logrus"
  7. )
// delegate wires a NetworkDB instance into memberlist's Delegate
// interface so that gossip callbacks (messages, broadcasts, push/pull
// state exchanges) are routed to the database.
type delegate struct {
	nDB *NetworkDB
}
  11. func (d *delegate) NodeMeta(limit int) []byte {
  12. return []byte{}
  13. }
// handleNodeEvent applies a remote node join/leave gossip event to the
// local node tables. It returns true when the event changed local state
// and therefore should be rebroadcast to other peers.
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
	// Update our local clock if the received messages has newer
	// time.
	nDB.networkClock.Witness(nEvent.LTime)

	// NOTE(review): only the read lock is held here, yet n.ltime is
	// written and changeNodeState is invoked below — confirm those are
	// safe under RLock (or rely on their own synchronization).
	nDB.RLock()
	defer nDB.RUnlock()

	// check if the node exists
	n, _, _ := nDB.findNode(nEvent.NodeName)
	if n == nil {
		return false
	}

	// check if the event is fresh
	if n.ltime >= nEvent.LTime {
		return false
	}

	// If we are here means that the event is fresher and the node is known. Update the Lamport time
	n.ltime = nEvent.LTime

	// If it is a node leave event for a manager and this is the only manager we
	// know of we want the reconnect logic to kick in. In a single manager
	// cluster manager's gossip can't be bootstrapped unless some other node
	// connects to it.
	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
		for _, ip := range nDB.bootStrapIP {
			if ip.Equal(n.Addr) {
				// Returning true triggers a rebroadcast, which feeds the
				// reconnect logic for the lone bootstrap manager.
				return true
			}
		}
	}

	switch nEvent.Type {
	case NodeEventTypeJoin:
		moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
		if err != nil {
			logrus.WithError(err).Error("unable to find the node to move")
			return false
		}
		if moved {
			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
		}
		// Rebroadcast only if the node actually changed state.
		return moved
	case NodeEventTypeLeave:
		moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
		if err != nil {
			logrus.WithError(err).Error("unable to find the node to move")
			return false
		}
		if moved {
			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
		}
		return moved
	}
	return false
}
  66. func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
  67. // Update our local clock if the received messages has newer
  68. // time.
  69. nDB.networkClock.Witness(nEvent.LTime)
  70. nDB.Lock()
  71. defer nDB.Unlock()
  72. if nEvent.NodeName == nDB.config.NodeID {
  73. return false
  74. }
  75. nodeNetworks, ok := nDB.networks[nEvent.NodeName]
  76. if !ok {
  77. // We haven't heard about this node at all. Ignore the leave
  78. if nEvent.Type == NetworkEventTypeLeave {
  79. return false
  80. }
  81. nodeNetworks = make(map[string]*network)
  82. nDB.networks[nEvent.NodeName] = nodeNetworks
  83. }
  84. if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
  85. // We have the latest state. Ignore the event
  86. // since it is stale.
  87. if n.ltime >= nEvent.LTime {
  88. return false
  89. }
  90. n.ltime = nEvent.LTime
  91. n.leaving = nEvent.Type == NetworkEventTypeLeave
  92. if n.leaving {
  93. n.reapTime = nDB.config.reapNetworkInterval
  94. // The remote node is leaving the network, but not the gossip cluster.
  95. // Mark all its entries in deleted state, this will guarantee that
  96. // if some node bulk sync with us, the deleted state of
  97. // these entries will be propagated.
  98. nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
  99. }
  100. if nEvent.Type == NetworkEventTypeLeave {
  101. nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  102. } else {
  103. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  104. }
  105. return true
  106. }
  107. if nEvent.Type == NetworkEventTypeLeave {
  108. return false
  109. }
  110. // If the node is not known from memberlist we cannot process save any state of it else if it actually
  111. // dies we won't receive any notification and we will remain stuck with it
  112. if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
  113. return false
  114. }
  115. // This remote network join is being seen the first time.
  116. nodeNetworks[nEvent.NetworkID] = &network{
  117. id: nEvent.NetworkID,
  118. ltime: nEvent.LTime,
  119. }
  120. nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
  121. return true
  122. }
// handleTableEvent applies a remote table CRUD event to the local entry
// store and reports whether it should be rebroadcast to other peers. The
// event is dropped when this node is not (or no longer) part of the
// network, when the event's owner has left the network, or when the local
// entry is already at least as recent.
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
	// Update our local clock if the received messages has newer time.
	nDB.tableClock.Witness(tEvent.LTime)

	// Ignore the table events for networks that are in the process of going away
	nDB.RLock()
	networks := nDB.networks[nDB.config.NodeID]
	network, ok := networks[tEvent.NetworkID]
	// Check if the owner of the event is still part of the network
	nodes := nDB.networkNodes[tEvent.NetworkID]
	var nodePresent bool
	for _, node := range nodes {
		if node == tEvent.NodeName {
			nodePresent = true
			break
		}
	}
	nDB.RUnlock()

	if !ok || network.leaving || !nodePresent {
		// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
		return false
	}

	nDB.Lock()
	// err != nil below means "no existing entry"; the value of err is also
	// reused after the store update to detect that case.
	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
	if err == nil {
		// We have the latest state. Ignore the event
		// since it is stale.
		if e.ltime >= tEvent.LTime {
			nDB.Unlock()
			return false
		}
	}

	e = &entry{
		ltime:    tEvent.LTime,
		node:     tEvent.NodeName,
		value:    tEvent.Value,
		deleting: tEvent.Type == TableEventTypeDelete,
		reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
	}

	// All the entries marked for deletion should have a reapTime set greater than 0
	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
	// field. If that is not the case, this can be a BUG
	if e.deleting && e.reapTime == 0 {
		logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
			nDB.config.Hostname, nDB.config.NodeID, tEvent)
		e.reapTime = nDB.config.reapEntryInterval
	}
	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
	nDB.Unlock()

	if err != nil && tEvent.Type == TableEventTypeDelete {
		// If it is a delete event and we did not have a state for it, don't propagate to the application
		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
		// This also avoids that deletion of entries close to their garbage collection ends up circling around forever
		return e.reapTime > nDB.config.reapEntryInterval/6
	}

	// Map the wire event type onto the watch-API operation and notify
	// local watchers before asking the caller to rebroadcast.
	var op opType
	switch tEvent.Type {
	case TableEventTypeCreate:
		op = opCreate
	case TableEventTypeUpdate:
		op = opUpdate
	case TableEventTypeDelete:
		op = opDelete
	}

	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
	return true
}
  190. func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
  191. // Decode the parts
  192. parts, err := decodeCompoundMessage(buf)
  193. if err != nil {
  194. logrus.Errorf("Failed to decode compound request: %v", err)
  195. return
  196. }
  197. // Handle each message
  198. for _, part := range parts {
  199. nDB.handleMessage(part, isBulkSync)
  200. }
  201. }
  202. func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
  203. var tEvent TableEvent
  204. if err := proto.Unmarshal(buf, &tEvent); err != nil {
  205. logrus.Errorf("Error decoding table event message: %v", err)
  206. return
  207. }
  208. // Ignore messages that this node generated.
  209. if tEvent.NodeName == nDB.config.NodeID {
  210. return
  211. }
  212. if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
  213. var err error
  214. buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
  215. if err != nil {
  216. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  217. return
  218. }
  219. nDB.RLock()
  220. n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
  221. nDB.RUnlock()
  222. // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
  223. if !ok || n.leaving || n.tableBroadcasts == nil {
  224. return
  225. }
  226. n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
  227. msg: buf,
  228. id: tEvent.NetworkID,
  229. tname: tEvent.TableName,
  230. key: tEvent.Key,
  231. node: tEvent.NodeName,
  232. })
  233. }
  234. }
  235. func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
  236. var nEvent NodeEvent
  237. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  238. logrus.Errorf("Error decoding node event message: %v", err)
  239. return
  240. }
  241. if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
  242. var err error
  243. buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
  244. if err != nil {
  245. logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
  246. return
  247. }
  248. nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
  249. msg: buf,
  250. })
  251. }
  252. }
  253. func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
  254. var nEvent NetworkEvent
  255. if err := proto.Unmarshal(buf, &nEvent); err != nil {
  256. logrus.Errorf("Error decoding network event message: %v", err)
  257. return
  258. }
  259. if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
  260. var err error
  261. buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
  262. if err != nil {
  263. logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
  264. return
  265. }
  266. nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
  267. msg: buf,
  268. id: nEvent.NetworkID,
  269. node: nEvent.NodeName,
  270. })
  271. }
  272. }
  273. func (nDB *NetworkDB) handleBulkSync(buf []byte) {
  274. var bsm BulkSyncMessage
  275. if err := proto.Unmarshal(buf, &bsm); err != nil {
  276. logrus.Errorf("Error decoding bulk sync message: %v", err)
  277. return
  278. }
  279. if bsm.LTime > 0 {
  280. nDB.tableClock.Witness(bsm.LTime)
  281. }
  282. nDB.handleMessage(bsm.Payload, true)
  283. // Don't respond to a bulk sync which was not unsolicited
  284. if !bsm.Unsolicited {
  285. nDB.Lock()
  286. ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
  287. if ok {
  288. close(ch)
  289. delete(nDB.bulkSyncAckTbl, bsm.NodeName)
  290. }
  291. nDB.Unlock()
  292. return
  293. }
  294. var nodeAddr net.IP
  295. nDB.RLock()
  296. if node, ok := nDB.nodes[bsm.NodeName]; ok {
  297. nodeAddr = node.Addr
  298. }
  299. nDB.RUnlock()
  300. if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
  301. logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
  302. }
  303. }
  304. func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
  305. mType, data, err := decodeMessage(buf)
  306. if err != nil {
  307. logrus.Errorf("Error decoding gossip message to get message type: %v", err)
  308. return
  309. }
  310. switch mType {
  311. case MessageTypeNodeEvent:
  312. nDB.handleNodeMessage(data)
  313. case MessageTypeNetworkEvent:
  314. nDB.handleNetworkMessage(data)
  315. case MessageTypeTableEvent:
  316. nDB.handleTableMessage(data, isBulkSync)
  317. case MessageTypeBulkSync:
  318. nDB.handleBulkSync(data)
  319. case MessageTypeCompound:
  320. nDB.handleCompound(data, isBulkSync)
  321. default:
  322. logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
  323. }
  324. }
  325. func (d *delegate) NotifyMsg(buf []byte) {
  326. if len(buf) == 0 {
  327. return
  328. }
  329. d.nDB.handleMessage(buf, false)
  330. }
  331. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  332. msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
  333. msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
  334. return msgs
  335. }
  336. func (d *delegate) LocalState(join bool) []byte {
  337. if join {
  338. // Update all the local node/network state to a new time to
  339. // force update on the node we are trying to rejoin, just in
  340. // case that node has these in leaving state still. This is
  341. // facilitate fast convergence after recovering from a gossip
  342. // failure.
  343. d.nDB.updateLocalNetworkTime()
  344. }
  345. d.nDB.RLock()
  346. defer d.nDB.RUnlock()
  347. pp := NetworkPushPull{
  348. LTime: d.nDB.networkClock.Time(),
  349. NodeName: d.nDB.config.NodeID,
  350. }
  351. for name, nn := range d.nDB.networks {
  352. for _, n := range nn {
  353. pp.Networks = append(pp.Networks, &NetworkEntry{
  354. LTime: n.ltime,
  355. NetworkID: n.id,
  356. NodeName: name,
  357. Leaving: n.leaving,
  358. })
  359. }
  360. }
  361. buf, err := encodeMessage(MessageTypePushPull, &pp)
  362. if err != nil {
  363. logrus.Errorf("Failed to encode local network state: %v", err)
  364. return nil
  365. }
  366. return buf
  367. }
  368. func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
  369. if len(buf) == 0 {
  370. logrus.Error("zero byte remote network state received")
  371. return
  372. }
  373. var gMsg GossipMessage
  374. err := proto.Unmarshal(buf, &gMsg)
  375. if err != nil {
  376. logrus.Errorf("Error unmarshalling push pull message: %v", err)
  377. return
  378. }
  379. if gMsg.Type != MessageTypePushPull {
  380. logrus.Errorf("Invalid message type %v received from remote", buf[0])
  381. }
  382. pp := NetworkPushPull{}
  383. if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
  384. logrus.Errorf("Failed to decode remote network state: %v", err)
  385. return
  386. }
  387. nodeEvent := &NodeEvent{
  388. LTime: pp.LTime,
  389. NodeName: pp.NodeName,
  390. Type: NodeEventTypeJoin,
  391. }
  392. d.nDB.handleNodeEvent(nodeEvent)
  393. for _, n := range pp.Networks {
  394. nEvent := &NetworkEvent{
  395. LTime: n.LTime,
  396. NodeName: n.NodeName,
  397. NetworkID: n.NetworkID,
  398. Type: NetworkEventTypeJoin,
  399. }
  400. if n.Leaving {
  401. nEvent.Type = NetworkEventTypeLeave
  402. }
  403. d.nDB.handleNetworkEvent(nEvent)
  404. }
  405. }