cluster.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. package networkdb
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "fmt"
  6. "math/big"
  7. rnd "math/rand"
  8. "strings"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/hashicorp/memberlist"
  12. )
// reapInterval is the period of the reapState ticker started in clusterInit.
// It is also the minimum time a network attachment must have been leaving,
// or a table entry must have been deleting, before the reaper removes it.
const reapInterval = 2 * time.Second
  14. type logWriter struct{}
  15. func (l *logWriter) Write(p []byte) (int, error) {
  16. str := string(p)
  17. switch {
  18. case strings.Contains(str, "[WARN]"):
  19. logrus.Warn(str)
  20. case strings.Contains(str, "[DEBUG]"):
  21. logrus.Debug(str)
  22. case strings.Contains(str, "[INFO]"):
  23. logrus.Info(str)
  24. case strings.Contains(str, "[ERR]"):
  25. logrus.Warn(str)
  26. }
  27. return len(p), nil
  28. }
  29. // SetKey adds a new key to the key ring
  30. func (nDB *NetworkDB) SetKey(key []byte) {
  31. for _, dbKey := range nDB.config.Keys {
  32. if bytes.Equal(key, dbKey) {
  33. return
  34. }
  35. }
  36. nDB.config.Keys = append(nDB.config.Keys, key)
  37. if nDB.keyring != nil {
  38. nDB.keyring.AddKey(key)
  39. }
  40. }
  41. // SetPrimaryKey sets the given key as the primary key. This should have
  42. // been added apriori through SetKey
  43. func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
  44. for _, dbKey := range nDB.config.Keys {
  45. if bytes.Equal(key, dbKey) {
  46. if nDB.keyring != nil {
  47. nDB.keyring.UseKey(dbKey)
  48. }
  49. break
  50. }
  51. }
  52. }
  53. // RemoveKey removes a key from the key ring. The key being removed
  54. // can't be the primary key
  55. func (nDB *NetworkDB) RemoveKey(key []byte) {
  56. for i, dbKey := range nDB.config.Keys {
  57. if bytes.Equal(key, dbKey) {
  58. nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...)
  59. if nDB.keyring != nil {
  60. nDB.keyring.RemoveKey(dbKey)
  61. }
  62. break
  63. }
  64. }
  65. }
// clusterInit builds the memberlist configuration from the NetworkDB
// configuration, creates the memberlist instance and starts the periodic
// background tasks (state reaping, gossip and bulk table sync).
//
// It returns an error if the keyring cannot be built from the configured
// keys or if the memberlist cannot be created. On success nDB.memberlist,
// nDB.mConfig, nDB.stopCh and nDB.tickers are populated.
func (nDB *NetworkDB) clusterInit() error {
	// Start from the memberlist LAN defaults and override identity and
	// bind settings from our own configuration.
	config := memberlist.DefaultLANConfig()
	config.Name = nDB.config.NodeName
	config.BindAddr = nDB.config.BindAddr

	// A zero BindPort keeps the memberlist default port.
	if nDB.config.BindPort != 0 {
		config.BindPort = nDB.config.BindPort
	}

	config.ProtocolVersion = memberlist.ProtocolVersionMax
	config.Delegate = &delegate{nDB: nDB}
	config.Events = &eventDelegate{nDB: nDB}
	config.LogOutput = &logWriter{}

	var err error
	if len(nDB.config.Keys) > 0 {
		// The first configured key is used as the primary key.
		nDB.keyring, err = memberlist.NewKeyring(nDB.config.Keys, nDB.config.Keys[0])
		if err != nil {
			return err
		}
		config.Keyring = nDB.keyring
	}

	nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
		// NOTE(review): nDB.nodes is read here without taking the
		// NetworkDB lock - confirm callers make this safe.
		NumNodes: func() int {
			return len(nDB.nodes)
		},
		RetransmitMult: config.RetransmitMult,
	}

	mlist, err := memberlist.Create(config)
	if err != nil {
		return fmt.Errorf("failed to create memberlist: %v", err)
	}

	nDB.stopCh = make(chan struct{})
	nDB.memberlist = mlist
	nDB.mConfig = config

	// Each periodic task gets its own ticker and goroutine. All of the
	// goroutines share nDB.stopCh, and the tickers are recorded in
	// nDB.tickers so that clusterLeave can stop them.
	for _, trigger := range []struct {
		interval time.Duration
		fn       func()
	}{
		{reapInterval, nDB.reapState},
		{config.GossipInterval, nDB.gossip},
		{config.PushPullInterval, nDB.bulkSyncTables},
	} {
		t := time.NewTicker(trigger.interval)
		go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
		nDB.tickers = append(nDB.tickers, t)
	}

	return nil
}
  112. func (nDB *NetworkDB) clusterJoin(members []string) error {
  113. mlist := nDB.memberlist
  114. if _, err := mlist.Join(members); err != nil {
  115. return fmt.Errorf("could not join node to memberlist: %v", err)
  116. }
  117. return nil
  118. }
  119. func (nDB *NetworkDB) clusterLeave() error {
  120. mlist := nDB.memberlist
  121. if err := mlist.Leave(time.Second); err != nil {
  122. return err
  123. }
  124. close(nDB.stopCh)
  125. for _, t := range nDB.tickers {
  126. t.Stop()
  127. }
  128. return mlist.Shutdown()
  129. }
  130. func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
  131. // Use a random stagger to avoid syncronizing
  132. randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
  133. select {
  134. case <-time.After(randStagger):
  135. case <-stop:
  136. return
  137. }
  138. for {
  139. select {
  140. case <-C:
  141. f()
  142. case <-stop:
  143. return
  144. }
  145. }
  146. }
// reapState is the periodic reaper entry point registered in clusterInit.
// It first reaps network attachments and then table entries.
func (nDB *NetworkDB) reapState() {
	nDB.reapNetworks()
	nDB.reapTableEntries()
}
  151. func (nDB *NetworkDB) reapNetworks() {
  152. now := time.Now()
  153. nDB.Lock()
  154. for name, nn := range nDB.networks {
  155. for id, n := range nn {
  156. if n.leaving && now.Sub(n.leaveTime) > reapInterval {
  157. delete(nn, id)
  158. nDB.deleteNetworkNode(id, name)
  159. }
  160. }
  161. }
  162. nDB.Unlock()
  163. }
  164. func (nDB *NetworkDB) reapTableEntries() {
  165. var (
  166. paths []string
  167. entries []*entry
  168. )
  169. now := time.Now()
  170. nDB.RLock()
  171. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  172. entry, ok := v.(*entry)
  173. if !ok {
  174. return false
  175. }
  176. if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval {
  177. return false
  178. }
  179. paths = append(paths, path)
  180. entries = append(entries, entry)
  181. return false
  182. })
  183. nDB.RUnlock()
  184. nDB.Lock()
  185. for i, path := range paths {
  186. entry := entries[i]
  187. params := strings.Split(path[1:], "/")
  188. tname := params[0]
  189. nid := params[1]
  190. key := params[2]
  191. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  192. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  193. }
  194. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  195. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  196. }
  197. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
  198. }
  199. nDB.Unlock()
  200. }
  201. func (nDB *NetworkDB) gossip() {
  202. networkNodes := make(map[string][]string)
  203. nDB.RLock()
  204. thisNodeNetworks := nDB.networks[nDB.config.NodeName]
  205. for nid := range thisNodeNetworks {
  206. networkNodes[nid] = nDB.networkNodes[nid]
  207. }
  208. nDB.RUnlock()
  209. for nid, nodes := range networkNodes {
  210. mNodes := nDB.mRandomNodes(3, nodes)
  211. bytesAvail := udpSendBuf - compoundHeaderOverhead
  212. nDB.RLock()
  213. network, ok := thisNodeNetworks[nid]
  214. nDB.RUnlock()
  215. if !ok || network == nil {
  216. // It is normal for the network to be removed
  217. // between the time we collect the network
  218. // attachments of this node and processing
  219. // them here.
  220. continue
  221. }
  222. broadcastQ := network.tableBroadcasts
  223. if broadcastQ == nil {
  224. logrus.Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
  225. continue
  226. }
  227. msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
  228. if len(msgs) == 0 {
  229. continue
  230. }
  231. // Create a compound message
  232. compound := makeCompoundMessage(msgs)
  233. for _, node := range mNodes {
  234. nDB.RLock()
  235. mnode := nDB.nodes[node]
  236. nDB.RUnlock()
  237. if mnode == nil {
  238. break
  239. }
  240. // Send the compound message
  241. if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
  242. logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
  243. }
  244. }
  245. }
  246. }
  247. func (nDB *NetworkDB) bulkSyncTables() {
  248. var networks []string
  249. nDB.RLock()
  250. for nid := range nDB.networks[nDB.config.NodeName] {
  251. networks = append(networks, nid)
  252. }
  253. nDB.RUnlock()
  254. for {
  255. if len(networks) == 0 {
  256. break
  257. }
  258. nid := networks[0]
  259. networks = networks[1:]
  260. completed, err := nDB.bulkSync(nid, false)
  261. if err != nil {
  262. logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
  263. continue
  264. }
  265. // Remove all the networks for which we have
  266. // successfully completed bulk sync in this iteration.
  267. updatedNetworks := make([]string, 0, len(networks))
  268. for _, nid := range networks {
  269. for _, completedNid := range completed {
  270. if nid == completedNid {
  271. continue
  272. }
  273. updatedNetworks = append(updatedNetworks, nid)
  274. }
  275. }
  276. networks = updatedNetworks
  277. }
  278. }
  279. func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) {
  280. nDB.RLock()
  281. nodes := nDB.networkNodes[nid]
  282. nDB.RUnlock()
  283. if !all {
  284. // If not all, then just pick one.
  285. nodes = nDB.mRandomNodes(1, nodes)
  286. }
  287. logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
  288. var err error
  289. var networks []string
  290. for _, node := range nodes {
  291. if node == nDB.config.NodeName {
  292. continue
  293. }
  294. networks = nDB.findCommonNetworks(node)
  295. err = nDB.bulkSyncNode(networks, node, true)
  296. if err != nil {
  297. err = fmt.Errorf("bulk sync failed on node %s: %v", node, err)
  298. }
  299. }
  300. if err != nil {
  301. return nil, err
  302. }
  303. return networks, nil
  304. }
  305. // Bulk sync all the table entries belonging to a set of networks to a
  306. // single peer node. It can be unsolicited or can be in response to an
  307. // unsolicited bulk sync
  308. func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
  309. var msgs [][]byte
  310. logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
  311. nDB.RLock()
  312. mnode := nDB.nodes[node]
  313. if mnode == nil {
  314. nDB.RUnlock()
  315. return nil
  316. }
  317. for _, nid := range networks {
  318. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
  319. entry, ok := v.(*entry)
  320. if !ok {
  321. return false
  322. }
  323. params := strings.Split(path[1:], "/")
  324. tEvent := TableEvent{
  325. Type: TableEventTypeCreate,
  326. LTime: entry.ltime,
  327. NodeName: entry.node,
  328. NetworkID: nid,
  329. TableName: params[1],
  330. Key: params[2],
  331. Value: entry.value,
  332. }
  333. msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
  334. if err != nil {
  335. logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
  336. return false
  337. }
  338. msgs = append(msgs, msg)
  339. return false
  340. })
  341. }
  342. nDB.RUnlock()
  343. // Create a compound message
  344. compound := makeCompoundMessage(msgs)
  345. bsm := BulkSyncMessage{
  346. LTime: nDB.tableClock.Time(),
  347. Unsolicited: unsolicited,
  348. NodeName: nDB.config.NodeName,
  349. Networks: networks,
  350. Payload: compound,
  351. }
  352. buf, err := encodeMessage(MessageTypeBulkSync, &bsm)
  353. if err != nil {
  354. return fmt.Errorf("failed to encode bulk sync message: %v", err)
  355. }
  356. nDB.Lock()
  357. ch := make(chan struct{})
  358. nDB.bulkSyncAckTbl[node] = ch
  359. nDB.Unlock()
  360. err = nDB.memberlist.SendToTCP(mnode, buf)
  361. if err != nil {
  362. nDB.Lock()
  363. delete(nDB.bulkSyncAckTbl, node)
  364. nDB.Unlock()
  365. return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
  366. }
  367. // Wait on a response only if it is unsolicited.
  368. if unsolicited {
  369. startTime := time.Now()
  370. select {
  371. case <-time.After(30 * time.Second):
  372. logrus.Errorf("Bulk sync to node %s timed out", node)
  373. case <-ch:
  374. nDB.Lock()
  375. delete(nDB.bulkSyncAckTbl, node)
  376. nDB.Unlock()
  377. logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
  378. }
  379. }
  380. return nil
  381. }
  382. // Returns a random offset between 0 and n
  383. func randomOffset(n int) int {
  384. if n == 0 {
  385. return 0
  386. }
  387. val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
  388. if err != nil {
  389. logrus.Errorf("Failed to get a random offset: %v", err)
  390. return 0
  391. }
  392. return int(val.Int64())
  393. }
  394. // mRandomNodes is used to select up to m random nodes. It is possible
  395. // that less than m nodes are returned.
  396. func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
  397. n := len(nodes)
  398. mNodes := make([]string, 0, m)
  399. OUTER:
  400. // Probe up to 3*n times, with large n this is not necessary
  401. // since k << n, but with small n we want search to be
  402. // exhaustive
  403. for i := 0; i < 3*n && len(mNodes) < m; i++ {
  404. // Get random node
  405. idx := randomOffset(n)
  406. node := nodes[idx]
  407. if node == nDB.config.NodeName {
  408. continue
  409. }
  410. // Check if we have this node already
  411. for j := 0; j < len(mNodes); j++ {
  412. if node == mNodes[j] {
  413. continue OUTER
  414. }
  415. }
  416. // Append the node
  417. mNodes = append(mNodes, node)
  418. }
  419. return mNodes
  420. }