cluster.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. package networkdb
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "encoding/hex"
  6. "fmt"
  7. "math/big"
  8. rnd "math/rand"
  9. "net"
  10. "strings"
  11. "time"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/hashicorp/memberlist"
  14. )
  15. const (
  16. reapInterval = 60 * time.Second
  17. reapPeriod = 5 * time.Second
  18. retryInterval = 1 * time.Second
  19. )
  20. type logWriter struct{}
  21. func (l *logWriter) Write(p []byte) (int, error) {
  22. str := string(p)
  23. switch {
  24. case strings.Contains(str, "[WARN]"):
  25. logrus.Warn(str)
  26. case strings.Contains(str, "[DEBUG]"):
  27. logrus.Debug(str)
  28. case strings.Contains(str, "[INFO]"):
  29. logrus.Info(str)
  30. case strings.Contains(str, "[ERR]"):
  31. logrus.Warn(str)
  32. }
  33. return len(p), nil
  34. }
  35. // SetKey adds a new key to the key ring
  36. func (nDB *NetworkDB) SetKey(key []byte) {
  37. logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5])
  38. for _, dbKey := range nDB.config.Keys {
  39. if bytes.Equal(key, dbKey) {
  40. return
  41. }
  42. }
  43. nDB.config.Keys = append(nDB.config.Keys, key)
  44. if nDB.keyring != nil {
  45. nDB.keyring.AddKey(key)
  46. }
  47. }
  48. // SetPrimaryKey sets the given key as the primary key. This should have
  49. // been added apriori through SetKey
  50. func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
  51. logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5])
  52. for _, dbKey := range nDB.config.Keys {
  53. if bytes.Equal(key, dbKey) {
  54. if nDB.keyring != nil {
  55. nDB.keyring.UseKey(dbKey)
  56. }
  57. break
  58. }
  59. }
  60. }
  61. // RemoveKey removes a key from the key ring. The key being removed
  62. // can't be the primary key
  63. func (nDB *NetworkDB) RemoveKey(key []byte) {
  64. logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5])
  65. for i, dbKey := range nDB.config.Keys {
  66. if bytes.Equal(key, dbKey) {
  67. nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...)
  68. if nDB.keyring != nil {
  69. nDB.keyring.RemoveKey(dbKey)
  70. }
  71. break
  72. }
  73. }
  74. }
  75. func (nDB *NetworkDB) clusterInit() error {
  76. config := memberlist.DefaultLANConfig()
  77. config.Name = nDB.config.NodeName
  78. config.BindAddr = nDB.config.BindAddr
  79. config.AdvertiseAddr = nDB.config.AdvertiseAddr
  80. if nDB.config.BindPort != 0 {
  81. config.BindPort = nDB.config.BindPort
  82. }
  83. config.ProtocolVersion = memberlist.ProtocolVersionMax
  84. config.Delegate = &delegate{nDB: nDB}
  85. config.Events = &eventDelegate{nDB: nDB}
  86. config.LogOutput = &logWriter{}
  87. var err error
  88. if len(nDB.config.Keys) > 0 {
  89. for i, key := range nDB.config.Keys {
  90. logrus.Debugf("Encryption key %d: %s", i+1, hex.EncodeToString(key)[0:5])
  91. }
  92. nDB.keyring, err = memberlist.NewKeyring(nDB.config.Keys, nDB.config.Keys[0])
  93. if err != nil {
  94. return err
  95. }
  96. config.Keyring = nDB.keyring
  97. }
  98. nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
  99. NumNodes: func() int {
  100. nDB.RLock()
  101. num := len(nDB.nodes)
  102. nDB.RUnlock()
  103. return num
  104. },
  105. RetransmitMult: config.RetransmitMult,
  106. }
  107. nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
  108. NumNodes: func() int {
  109. nDB.RLock()
  110. num := len(nDB.nodes)
  111. nDB.RUnlock()
  112. return num
  113. },
  114. RetransmitMult: config.RetransmitMult,
  115. }
  116. mlist, err := memberlist.Create(config)
  117. if err != nil {
  118. return fmt.Errorf("failed to create memberlist: %v", err)
  119. }
  120. nDB.stopCh = make(chan struct{})
  121. nDB.memberlist = mlist
  122. nDB.mConfig = config
  123. for _, trigger := range []struct {
  124. interval time.Duration
  125. fn func()
  126. }{
  127. {reapPeriod, nDB.reapState},
  128. {config.GossipInterval, nDB.gossip},
  129. {config.PushPullInterval, nDB.bulkSyncTables},
  130. {retryInterval, nDB.reconnectNode},
  131. } {
  132. t := time.NewTicker(trigger.interval)
  133. go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
  134. nDB.tickers = append(nDB.tickers, t)
  135. }
  136. return nil
  137. }
  138. func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
  139. t := time.NewTicker(retryInterval)
  140. defer t.Stop()
  141. for {
  142. select {
  143. case <-t.C:
  144. if _, err := nDB.memberlist.Join(members); err != nil {
  145. logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err)
  146. continue
  147. }
  148. if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
  149. logrus.Errorf("failed to send node join on retry: %v", err)
  150. continue
  151. }
  152. return
  153. case <-stop:
  154. return
  155. }
  156. }
  157. }
  158. func (nDB *NetworkDB) clusterJoin(members []string) error {
  159. mlist := nDB.memberlist
  160. if _, err := mlist.Join(members); err != nil {
  161. // Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
  162. go nDB.retryJoin(members, nDB.stopCh)
  163. return fmt.Errorf("could not join node to memberlist: %v", err)
  164. }
  165. if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
  166. return fmt.Errorf("failed to send node join: %v", err)
  167. }
  168. return nil
  169. }
  170. func (nDB *NetworkDB) clusterLeave() error {
  171. mlist := nDB.memberlist
  172. if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil {
  173. logrus.Errorf("failed to send node leave: %v", err)
  174. }
  175. if err := mlist.Leave(time.Second); err != nil {
  176. return err
  177. }
  178. close(nDB.stopCh)
  179. for _, t := range nDB.tickers {
  180. t.Stop()
  181. }
  182. return mlist.Shutdown()
  183. }
  184. func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
  185. // Use a random stagger to avoid syncronizing
  186. randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
  187. select {
  188. case <-time.After(randStagger):
  189. case <-stop:
  190. return
  191. }
  192. for {
  193. select {
  194. case <-C:
  195. f()
  196. case <-stop:
  197. return
  198. }
  199. }
  200. }
  201. func (nDB *NetworkDB) reconnectNode() {
  202. nDB.RLock()
  203. if len(nDB.failedNodes) == 0 {
  204. nDB.RUnlock()
  205. return
  206. }
  207. nodes := make([]*node, 0, len(nDB.failedNodes))
  208. for _, n := range nDB.failedNodes {
  209. nodes = append(nodes, n)
  210. }
  211. nDB.RUnlock()
  212. node := nodes[randomOffset(len(nodes))]
  213. addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
  214. if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
  215. return
  216. }
  217. if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
  218. logrus.Errorf("failed to send node join during reconnect: %v", err)
  219. return
  220. }
  221. // Update all the local table state to a new time to
  222. // force update on the node we are trying to rejoin, just in
  223. // case that node has these in deleting state still. This is
  224. // facilitate fast convergence after recovering from a gossip
  225. // failure.
  226. nDB.updateLocalTableTime()
  227. logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
  228. nDB.bulkSync([]string{node.Name}, true)
  229. }
  230. func (nDB *NetworkDB) reapState() {
  231. nDB.reapNetworks()
  232. nDB.reapTableEntries()
  233. }
  234. func (nDB *NetworkDB) reapNetworks() {
  235. now := time.Now()
  236. nDB.Lock()
  237. for name, nn := range nDB.networks {
  238. for id, n := range nn {
  239. if n.leaving && now.Sub(n.leaveTime) > reapInterval {
  240. delete(nn, id)
  241. nDB.deleteNetworkNode(id, name)
  242. }
  243. }
  244. }
  245. nDB.Unlock()
  246. }
  247. func (nDB *NetworkDB) reapTableEntries() {
  248. var paths []string
  249. now := time.Now()
  250. nDB.RLock()
  251. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  252. entry, ok := v.(*entry)
  253. if !ok {
  254. return false
  255. }
  256. if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval {
  257. return false
  258. }
  259. paths = append(paths, path)
  260. return false
  261. })
  262. nDB.RUnlock()
  263. nDB.Lock()
  264. for _, path := range paths {
  265. params := strings.Split(path[1:], "/")
  266. tname := params[0]
  267. nid := params[1]
  268. key := params[2]
  269. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  270. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  271. }
  272. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  273. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  274. }
  275. }
  276. nDB.Unlock()
  277. }
  278. func (nDB *NetworkDB) gossip() {
  279. networkNodes := make(map[string][]string)
  280. nDB.RLock()
  281. thisNodeNetworks := nDB.networks[nDB.config.NodeName]
  282. for nid := range thisNodeNetworks {
  283. networkNodes[nid] = nDB.networkNodes[nid]
  284. }
  285. nDB.RUnlock()
  286. for nid, nodes := range networkNodes {
  287. mNodes := nDB.mRandomNodes(3, nodes)
  288. bytesAvail := udpSendBuf - compoundHeaderOverhead
  289. nDB.RLock()
  290. network, ok := thisNodeNetworks[nid]
  291. nDB.RUnlock()
  292. if !ok || network == nil {
  293. // It is normal for the network to be removed
  294. // between the time we collect the network
  295. // attachments of this node and processing
  296. // them here.
  297. continue
  298. }
  299. broadcastQ := network.tableBroadcasts
  300. if broadcastQ == nil {
  301. logrus.Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
  302. continue
  303. }
  304. msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
  305. if len(msgs) == 0 {
  306. continue
  307. }
  308. // Create a compound message
  309. compound := makeCompoundMessage(msgs)
  310. for _, node := range mNodes {
  311. nDB.RLock()
  312. mnode := nDB.nodes[node]
  313. nDB.RUnlock()
  314. if mnode == nil {
  315. break
  316. }
  317. // Send the compound message
  318. if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
  319. logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
  320. }
  321. }
  322. }
  323. }
  324. func (nDB *NetworkDB) bulkSyncTables() {
  325. var networks []string
  326. nDB.RLock()
  327. for nid, network := range nDB.networks[nDB.config.NodeName] {
  328. if network.leaving {
  329. continue
  330. }
  331. networks = append(networks, nid)
  332. }
  333. nDB.RUnlock()
  334. for {
  335. if len(networks) == 0 {
  336. break
  337. }
  338. nid := networks[0]
  339. networks = networks[1:]
  340. nDB.RLock()
  341. nodes := nDB.networkNodes[nid]
  342. nDB.RUnlock()
  343. // No peer nodes on this network. Move on.
  344. if len(nodes) == 0 {
  345. continue
  346. }
  347. completed, err := nDB.bulkSync(nodes, false)
  348. if err != nil {
  349. logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
  350. continue
  351. }
  352. // Remove all the networks for which we have
  353. // successfully completed bulk sync in this iteration.
  354. updatedNetworks := make([]string, 0, len(networks))
  355. for _, nid := range networks {
  356. var found bool
  357. for _, completedNid := range completed {
  358. if nid == completedNid {
  359. found = true
  360. break
  361. }
  362. }
  363. if !found {
  364. updatedNetworks = append(updatedNetworks, nid)
  365. }
  366. }
  367. networks = updatedNetworks
  368. }
  369. }
  370. func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
  371. if !all {
  372. // If not all, then just pick one.
  373. nodes = nDB.mRandomNodes(1, nodes)
  374. }
  375. if len(nodes) == 0 {
  376. return nil, nil
  377. }
  378. logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
  379. var err error
  380. var networks []string
  381. for _, node := range nodes {
  382. if node == nDB.config.NodeName {
  383. continue
  384. }
  385. networks = nDB.findCommonNetworks(node)
  386. err = nDB.bulkSyncNode(networks, node, true)
  387. if err != nil {
  388. err = fmt.Errorf("bulk sync failed on node %s: %v", node, err)
  389. }
  390. }
  391. if err != nil {
  392. return nil, err
  393. }
  394. return networks, nil
  395. }
  396. // Bulk sync all the table entries belonging to a set of networks to a
  397. // single peer node. It can be unsolicited or can be in response to an
  398. // unsolicited bulk sync
  399. func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
  400. var msgs [][]byte
  401. var unsolMsg string
  402. if unsolicited {
  403. unsolMsg = "unsolicited"
  404. }
  405. logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)
  406. nDB.RLock()
  407. mnode := nDB.nodes[node]
  408. if mnode == nil {
  409. nDB.RUnlock()
  410. return nil
  411. }
  412. for _, nid := range networks {
  413. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
  414. entry, ok := v.(*entry)
  415. if !ok {
  416. return false
  417. }
  418. eType := TableEventTypeCreate
  419. if entry.deleting {
  420. eType = TableEventTypeDelete
  421. }
  422. params := strings.Split(path[1:], "/")
  423. tEvent := TableEvent{
  424. Type: eType,
  425. LTime: entry.ltime,
  426. NodeName: entry.node,
  427. NetworkID: nid,
  428. TableName: params[1],
  429. Key: params[2],
  430. Value: entry.value,
  431. }
  432. msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
  433. if err != nil {
  434. logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
  435. return false
  436. }
  437. msgs = append(msgs, msg)
  438. return false
  439. })
  440. }
  441. nDB.RUnlock()
  442. // Create a compound message
  443. compound := makeCompoundMessage(msgs)
  444. bsm := BulkSyncMessage{
  445. LTime: nDB.tableClock.Time(),
  446. Unsolicited: unsolicited,
  447. NodeName: nDB.config.NodeName,
  448. Networks: networks,
  449. Payload: compound,
  450. }
  451. buf, err := encodeMessage(MessageTypeBulkSync, &bsm)
  452. if err != nil {
  453. return fmt.Errorf("failed to encode bulk sync message: %v", err)
  454. }
  455. nDB.Lock()
  456. ch := make(chan struct{})
  457. nDB.bulkSyncAckTbl[node] = ch
  458. nDB.Unlock()
  459. err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
  460. if err != nil {
  461. nDB.Lock()
  462. delete(nDB.bulkSyncAckTbl, node)
  463. nDB.Unlock()
  464. return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
  465. }
  466. // Wait on a response only if it is unsolicited.
  467. if unsolicited {
  468. startTime := time.Now()
  469. t := time.NewTimer(30 * time.Second)
  470. select {
  471. case <-t.C:
  472. logrus.Errorf("Bulk sync to node %s timed out", node)
  473. case <-ch:
  474. logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
  475. }
  476. t.Stop()
  477. }
  478. return nil
  479. }
  480. // Returns a random offset between 0 and n
  481. func randomOffset(n int) int {
  482. if n == 0 {
  483. return 0
  484. }
  485. val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
  486. if err != nil {
  487. logrus.Errorf("Failed to get a random offset: %v", err)
  488. return 0
  489. }
  490. return int(val.Int64())
  491. }
  492. // mRandomNodes is used to select up to m random nodes. It is possible
  493. // that less than m nodes are returned.
  494. func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
  495. n := len(nodes)
  496. mNodes := make([]string, 0, m)
  497. OUTER:
  498. // Probe up to 3*n times, with large n this is not necessary
  499. // since k << n, but with small n we want search to be
  500. // exhaustive
  501. for i := 0; i < 3*n && len(mNodes) < m; i++ {
  502. // Get random node
  503. idx := randomOffset(n)
  504. node := nodes[idx]
  505. if node == nDB.config.NodeName {
  506. continue
  507. }
  508. // Check if we have this node already
  509. for j := 0; j < len(mNodes); j++ {
  510. if node == mNodes[j] {
  511. continue OUTER
  512. }
  513. }
  514. // Append the node
  515. mNodes = append(mNodes, node)
  516. }
  517. return mNodes
  518. }