cluster.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package networkdb
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "fmt"
  6. "math/big"
  7. rnd "math/rand"
  8. "strings"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/hashicorp/memberlist"
  12. )
  // reapInterval is the period of the reaper ticker started in clusterInit and
// also the age a leaving network attachment or a deleted table entry must
// exceed before the reaper purges it.
const reapInterval time.Duration = 2 * time.Second
  14. type logWriter struct{}
  15. func (l *logWriter) Write(p []byte) (int, error) {
  16. str := string(p)
  17. switch {
  18. case strings.Contains(str, "[WARN]"):
  19. logrus.Warn(str)
  20. case strings.Contains(str, "[DEBUG]"):
  21. logrus.Debug(str)
  22. case strings.Contains(str, "[INFO]"):
  23. logrus.Info(str)
  24. case strings.Contains(str, "[ERR]"):
  25. logrus.Warn(str)
  26. }
  27. return len(p), nil
  28. }
  // SetKey adds key to the set of encryption keys known to this NetworkDB.
// Adding a key that is already in config.Keys is a no-op.
//
// If the memberlist keyring already exists (clusterInit creates it when keys
// are configured), the key is installed there first. If the keyring rejects
// it, the error is logged and the key is NOT recorded in config.Keys, so the
// configuration and the keyring stay consistent.
func (nDB *NetworkDB) SetKey(key []byte) {
	for _, dbKey := range nDB.config.Keys {
		if bytes.Equal(key, dbKey) {
			return
		}
	}

	if nDB.keyring != nil {
		// Never log the key material itself.
		if err := nDB.keyring.AddKey(key); err != nil {
			logrus.Errorf("Failed to add key to the keyring: %v", err)
			return
		}
	}
	nDB.config.Keys = append(nDB.config.Keys, key)
}
  // SetPrimaryKey makes key the primary key of the memberlist keyring (via
// keyring.UseKey). The key must already have been added through SetKey; a
// key that is not in config.Keys is ignored. If the keyring has not been
// created yet, this call has no effect. Errors reported by the keyring are
// logged instead of being silently dropped.
func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
	for _, dbKey := range nDB.config.Keys {
		if !bytes.Equal(key, dbKey) {
			continue
		}
		if nDB.keyring != nil {
			if err := nDB.keyring.UseKey(dbKey); err != nil {
				logrus.Errorf("Failed to set primary key: %v", err)
			}
		}
		return
	}
}
  // RemoveKey removes key from the set of encryption keys. The key being
// removed must not be the primary key.
//
// When a keyring exists, it is updated first. If the keyring refuses the
// removal (for example because key is the primary key), the error is logged
// and config.Keys is left untouched, so it keeps matching the keyring.
// Removing a key that is not present is a no-op.
func (nDB *NetworkDB) RemoveKey(key []byte) {
	for i, dbKey := range nDB.config.Keys {
		if !bytes.Equal(key, dbKey) {
			continue
		}
		if nDB.keyring != nil {
			if err := nDB.keyring.RemoveKey(dbKey); err != nil {
				logrus.Errorf("Failed to remove key from the keyring: %v", err)
				return
			}
		}
		nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...)
		return
	}
}
  // clusterInit builds a memberlist configuration from nDB.config, creates the
// memberlist instance and starts the periodic background tasks:
//   - state reaping every reapInterval,
//   - gossip every config.GossipInterval,
//   - table bulk sync every config.PushPullInterval.
//
// Each task runs in its own goroutine and exits once nDB.stopCh is closed
// (clusterLeave does this). If keys are configured, a keyring is built with
// the first key as the primary key and passed to memberlist via
// config.Keyring.
func (nDB *NetworkDB) clusterInit() error {
	config := memberlist.DefaultLANConfig()
	config.Name = nDB.config.NodeName
	config.BindAddr = nDB.config.BindAddr
	if nDB.config.BindPort != 0 {
		config.BindPort = nDB.config.BindPort
	}

	config.ProtocolVersion = memberlist.ProtocolVersionMax
	config.Delegate = &delegate{nDB: nDB}
	config.Events = &eventDelegate{nDB: nDB}
	config.LogOutput = &logWriter{}

	var err error
	if len(nDB.config.Keys) > 0 {
		nDB.keyring, err = memberlist.NewKeyring(nDB.config.Keys, nDB.config.Keys[0])
		if err != nil {
			return err
		}
		config.Keyring = nDB.keyring
	}

	nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			// nDB.nodes is read under the read lock everywhere else in this
			// file; do the same here to avoid a data race with its writers.
			// NOTE(review): assumes the queue is never drained by a goroutine
			// that already holds nDB's lock — confirm.
			nDB.RLock()
			num := len(nDB.nodes)
			nDB.RUnlock()
			return num
		},
		RetransmitMult: config.RetransmitMult,
	}

	mlist, err := memberlist.Create(config)
	if err != nil {
		return fmt.Errorf("failed to create memberlist: %v", err)
	}

	nDB.stopCh = make(chan struct{})
	nDB.memberlist = mlist
	nDB.mConfig = config

	for _, trigger := range []struct {
		interval time.Duration
		fn       func()
	}{
		{reapInterval, nDB.reapState},
		{config.GossipInterval, nDB.gossip},
		{config.PushPullInterval, nDB.bulkSyncTables},
	} {
		t := time.NewTicker(trigger.interval)
		go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
		nDB.tickers = append(nDB.tickers, t)
	}

	return nil
}
  // clusterJoin asks the local memberlist instance to join the cluster through
// the given member addresses. Join's first return value is discarded; only a
// failure is reported, wrapped with context.
func (nDB *NetworkDB) clusterJoin(members []string) error {
	_, err := nDB.memberlist.Join(members)
	if err == nil {
		return nil
	}
	return fmt.Errorf("could not join node to memberlist: %v", err)
}
  // clusterLeave leaves the cluster and tears down local state, in this order:
//  1. Ask memberlist to leave, with a one-second timeout.
//  2. Stop the background tasks started by clusterInit.
//  3. Shut memberlist down.
//
// Local teardown happens even if the leave itself fails, so the ticker
// goroutines are never leaked. In that case the leave error is returned, and
// any shutdown error is logged. Otherwise the shutdown error (or nil) is
// returned.
func (nDB *NetworkDB) clusterLeave() error {
	mlist := nDB.memberlist
	leaveErr := mlist.Leave(time.Second)

	close(nDB.stopCh)
	for _, t := range nDB.tickers {
		t.Stop()
	}

	shutdownErr := mlist.Shutdown()
	if leaveErr != nil {
		if shutdownErr != nil {
			logrus.Errorf("Failed to shut down memberlist after leave failure: %v", shutdownErr)
		}
		return leaveErr
	}
	return shutdownErr
}
  // triggerFunc invokes f each time C delivers a tick, until stop is closed.
// Before handling the first tick it waits a random delay in [0, stagger) so
// that the periodic tasks do not run in a synchronized fashion.
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
	// Random initial delay to avoid synchronizing.
	delay := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
	select {
	case <-stop:
		return
	case <-time.After(delay):
	}

	for {
		select {
		case <-stop:
			return
		case <-C:
			f()
		}
	}
}
  147. func (nDB *NetworkDB) reapState() {
  148. nDB.reapNetworks()
  149. nDB.reapTableEntries()
  150. }
  // reapNetworks removes every network attachment that has been marked as
// leaving for longer than reapInterval. It deletes the attachment from
// nDB.networks (keyed by node name, then network ID) and calls
// deleteNetworkNode for it. The whole pass runs under the write lock.
func (nDB *NetworkDB) reapNetworks() {
	now := time.Now()

	nDB.Lock()
	for nodeName, attachments := range nDB.networks {
		for nid, n := range attachments {
			if !n.leaving || now.Sub(n.leaveTime) <= reapInterval {
				continue
			}
			// Deleting map entries while ranging over the map is allowed.
			delete(attachments, nid)
			nDB.deleteNetworkNode(nid, nodeName)
		}
	}
	nDB.Unlock()
}
  // reapTableEntries purges table entries that were marked as deleting more
// than reapInterval ago. Each one is removed from both the byTable index
// (/<table>/<network id>/<key>) and the byNetwork index
// (/<network id>/<table>/<key>).
//
// Candidates are collected under the read lock and removed under the write
// lock. The lock is dropped in between, so each candidate is re-checked under
// the write lock and kept if it is no longer reapable (e.g. it was written
// again in that window).
func (nDB *NetworkDB) reapTableEntries() {
	now := time.Now()
	// reapable reports whether an index value is an entry whose deletion is
	// older than reapInterval.
	reapable := func(v interface{}) bool {
		e, ok := v.(*entry)
		return ok && e.deleting && now.Sub(e.deleteTime) > reapInterval
	}

	var paths []string
	nDB.RLock()
	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
		if reapable(v) {
			paths = append(paths, path)
		}
		return false
	})
	nDB.RUnlock()

	nDB.Lock()
	for _, path := range paths {
		if v, ok := nDB.indexes[byTable].Get(path); ok && !reapable(v) {
			// Changed since the scan; keep it.
			continue
		}

		// Split at most twice so that a key containing '/' stays intact.
		params := strings.SplitN(path[1:], "/", 3)
		if len(params) != 3 {
			logrus.Errorf("Skipping malformed table entry path %q while reaping", path)
			continue
		}
		tname, nid, key := params[0], params[1], params[2]

		if _, ok := nDB.indexes[byTable].Delete(path); !ok {
			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
		}
		if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
		}
	}
	nDB.Unlock()
}
  195. func (nDB *NetworkDB) gossip() {
  196. networkNodes := make(map[string][]string)
  197. nDB.RLock()
  198. thisNodeNetworks := nDB.networks[nDB.config.NodeName]
  199. for nid := range thisNodeNetworks {
  200. networkNodes[nid] = nDB.networkNodes[nid]
  201. }
  202. nDB.RUnlock()
  203. for nid, nodes := range networkNodes {
  204. mNodes := nDB.mRandomNodes(3, nodes)
  205. bytesAvail := udpSendBuf - compoundHeaderOverhead
  206. nDB.RLock()
  207. network, ok := thisNodeNetworks[nid]
  208. nDB.RUnlock()
  209. if !ok || network == nil {
  210. // It is normal for the network to be removed
  211. // between the time we collect the network
  212. // attachments of this node and processing
  213. // them here.
  214. continue
  215. }
  216. broadcastQ := network.tableBroadcasts
  217. if broadcastQ == nil {
  218. logrus.Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
  219. continue
  220. }
  221. msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
  222. if len(msgs) == 0 {
  223. continue
  224. }
  225. // Create a compound message
  226. compound := makeCompoundMessage(msgs)
  227. for _, node := range mNodes {
  228. nDB.RLock()
  229. mnode := nDB.nodes[node]
  230. nDB.RUnlock()
  231. if mnode == nil {
  232. break
  233. }
  234. // Send the compound message
  235. if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
  236. logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
  237. }
  238. }
  239. }
  240. }
  241. func (nDB *NetworkDB) bulkSyncTables() {
  242. var networks []string
  243. nDB.RLock()
  244. for nid := range nDB.networks[nDB.config.NodeName] {
  245. networks = append(networks, nid)
  246. }
  247. nDB.RUnlock()
  248. for {
  249. if len(networks) == 0 {
  250. break
  251. }
  252. nid := networks[0]
  253. networks = networks[1:]
  254. completed, err := nDB.bulkSync(nid, false)
  255. if err != nil {
  256. logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
  257. continue
  258. }
  259. // Remove all the networks for which we have
  260. // successfully completed bulk sync in this iteration.
  261. updatedNetworks := make([]string, 0, len(networks))
  262. for _, nid := range networks {
  263. for _, completedNid := range completed {
  264. if nid == completedNid {
  265. continue
  266. }
  267. updatedNetworks = append(updatedNetworks, nid)
  268. }
  269. }
  270. networks = updatedNetworks
  271. }
  272. }
  // bulkSync starts an unsolicited bulk sync for network nid. If all is true,
// every peer attached to nid is synced; otherwise one random peer is chosen.
// Each sync covers every network this node shares with that peer.
//
// On success it returns the common networks of the last peer synced. If any
// peer fails, it returns an error describing every failed peer (separated by
// "; "). A later successful peer therefore no longer hides an earlier failure.
func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) {
	nDB.RLock()
	nodes := nDB.networkNodes[nid]
	nDB.RUnlock()

	if !all {
		// If not all, then just pick one.
		nodes = nDB.mRandomNodes(1, nodes)
	}

	logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
	var (
		failures []string
		networks []string
	)
	for _, node := range nodes {
		if node == nDB.config.NodeName {
			continue
		}
		networks = nDB.findCommonNetworks(node)
		if err := nDB.bulkSyncNode(networks, node, true); err != nil {
			failures = append(failures, fmt.Sprintf("bulk sync failed on node %s: %v", node, err))
		}
	}

	if len(failures) > 0 {
		return nil, fmt.Errorf("%s", strings.Join(failures, "; "))
	}
	return networks, nil
}
  // bulkSyncNode sends every table entry of the given networks to a single
// peer node as one bulk sync message over TCP. The sync can be unsolicited,
// or a response to an unsolicited bulk sync received from that peer. A peer
// that is not in nDB.nodes is silently skipped.
//
// Only an unsolicited sync registers an ack channel in bulkSyncAckTbl and
// waits (up to 30 seconds) for it. A response does not register one, so it
// can never overwrite the channel of an unsolicited sync to the same peer
// that is still in flight. The registration is removed on every exit path,
// but only if it still belongs to this call.
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
	var msgs [][]byte

	logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)

	nDB.RLock()
	mnode := nDB.nodes[node]
	if mnode == nil {
		nDB.RUnlock()
		return nil
	}

	for _, nid := range networks {
		// byNetwork paths look like /<network id>/<table>/<key>. The trailing
		// '/' keeps this walk from matching networks whose ID merely starts
		// with nid.
		nDB.indexes[byNetwork].WalkPrefix("/"+nid+"/", func(path string, v interface{}) bool {
			e, ok := v.(*entry)
			if !ok {
				return false
			}

			// Split at most twice so that a key containing '/' stays intact.
			params := strings.SplitN(path[1:], "/", 3)
			if len(params) != 3 {
				return false
			}
			tEvent := TableEvent{
				Type:      TableEventTypeCreate,
				LTime:     e.ltime,
				NodeName:  e.node,
				NetworkID: nid,
				TableName: params[1],
				Key:       params[2],
				Value:     e.value,
			}

			msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
			if err != nil {
				logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
				return false
			}

			msgs = append(msgs, msg)
			return false
		})
	}
	nDB.RUnlock()

	// Create a compound message
	compound := makeCompoundMessage(msgs)

	bsm := BulkSyncMessage{
		LTime:       nDB.tableClock.Time(),
		Unsolicited: unsolicited,
		NodeName:    nDB.config.NodeName,
		Networks:    networks,
		Payload:     compound,
	}

	buf, err := encodeMessage(MessageTypeBulkSync, &bsm)
	if err != nil {
		return fmt.Errorf("failed to encode bulk sync message: %v", err)
	}

	var ch chan struct{}
	if unsolicited {
		ch = make(chan struct{})
		nDB.Lock()
		nDB.bulkSyncAckTbl[node] = ch
		nDB.Unlock()
	}
	// releaseAck drops this call's ack registration, unless another bulk
	// sync to the same node has replaced it in the meantime.
	releaseAck := func() {
		if ch == nil {
			return
		}
		nDB.Lock()
		if nDB.bulkSyncAckTbl[node] == ch {
			delete(nDB.bulkSyncAckTbl, node)
		}
		nDB.Unlock()
	}

	err = nDB.memberlist.SendToTCP(mnode, buf)
	if err != nil {
		releaseAck()
		return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
	}

	// Wait on a response only if it is unsolicited.
	if !unsolicited {
		return nil
	}

	startTime := time.Now()
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()
	select {
	case <-timer.C:
		logrus.Errorf("Bulk sync to node %s timed out", node)
	case <-ch:
		logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
	}
	releaseAck()
	return nil
}
  // randomOffset returns a uniformly distributed random integer in [0, n),
// drawn from crypto/rand. It returns 0 in two cases:
//   - n <= 0, where no valid offset exists and rand.Int would panic;
//   - the random source fails, in which case the failure is logged.
func randomOffset(n int) int {
	if n <= 0 {
		return 0
	}

	val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
	if err != nil {
		logrus.Errorf("Failed to get a random offset: %v", err)
		return 0
	}

	return int(val.Int64())
}
  388. // mRandomNodes is used to select up to m random nodes. It is possible
  389. // that less than m nodes are returned.
  390. func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
  391. n := len(nodes)
  392. mNodes := make([]string, 0, m)
  393. OUTER:
  394. // Probe up to 3*n times, with large n this is not necessary
  395. // since k << n, but with small n we want search to be
  396. // exhaustive
  397. for i := 0; i < 3*n && len(mNodes) < m; i++ {
  398. // Get random node
  399. idx := randomOffset(n)
  400. node := nodes[idx]
  401. if node == nDB.config.NodeName {
  402. continue
  403. }
  404. // Check if we have this node already
  405. for j := 0; j < len(mNodes); j++ {
  406. if node == mNodes[j] {
  407. continue OUTER
  408. }
  409. }
  410. // Append the node
  411. mNodes = append(mNodes, node)
  412. }
  413. return mNodes
  414. }