cluster.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package networkdb
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "math/big"
  6. rnd "math/rand"
  7. "strings"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/hashicorp/memberlist"
  11. "github.com/hashicorp/serf/serf"
  12. )
// reapInterval is the period of the reapState ticker started by
// clusterInit, and also the minimum time a leaving network attachment or
// a deleting table entry is retained before it is reaped.
const reapInterval = 2 * time.Second
  14. type logWriter struct{}
  15. func (l *logWriter) Write(p []byte) (int, error) {
  16. str := string(p)
  17. switch {
  18. case strings.Contains(str, "[WARN]"):
  19. logrus.Warn(str)
  20. case strings.Contains(str, "[DEBUG]"):
  21. logrus.Debug(str)
  22. case strings.Contains(str, "[INFO]"):
  23. logrus.Info(str)
  24. case strings.Contains(str, "[ERR]"):
  25. logrus.Warn(str)
  26. }
  27. return len(p), nil
  28. }
  29. func (nDB *NetworkDB) clusterInit() error {
  30. config := memberlist.DefaultLANConfig()
  31. config.Name = nDB.config.NodeName
  32. config.BindAddr = nDB.config.BindAddr
  33. if nDB.config.BindPort != 0 {
  34. config.BindPort = nDB.config.BindPort
  35. }
  36. config.ProtocolVersion = memberlist.ProtocolVersionMax
  37. config.Delegate = &delegate{nDB: nDB}
  38. config.Events = &eventDelegate{nDB: nDB}
  39. config.LogOutput = &logWriter{}
  40. nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
  41. NumNodes: func() int {
  42. return len(nDB.nodes)
  43. },
  44. RetransmitMult: config.RetransmitMult,
  45. }
  46. mlist, err := memberlist.Create(config)
  47. if err != nil {
  48. return fmt.Errorf("failed to create memberlist: %v", err)
  49. }
  50. nDB.stopCh = make(chan struct{})
  51. nDB.memberlist = mlist
  52. nDB.mConfig = config
  53. for _, trigger := range []struct {
  54. interval time.Duration
  55. fn func()
  56. }{
  57. {reapInterval, nDB.reapState},
  58. {config.GossipInterval, nDB.gossip},
  59. {config.PushPullInterval, nDB.bulkSyncTables},
  60. } {
  61. t := time.NewTicker(trigger.interval)
  62. go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
  63. nDB.tickers = append(nDB.tickers, t)
  64. }
  65. return nil
  66. }
  67. func (nDB *NetworkDB) clusterJoin(members []string) error {
  68. mlist := nDB.memberlist
  69. if _, err := mlist.Join(members); err != nil {
  70. return fmt.Errorf("could not join node to memberlist: %v", err)
  71. }
  72. return nil
  73. }
  74. func (nDB *NetworkDB) clusterLeave() error {
  75. mlist := nDB.memberlist
  76. if err := mlist.Leave(time.Second); err != nil {
  77. return err
  78. }
  79. close(nDB.stopCh)
  80. for _, t := range nDB.tickers {
  81. t.Stop()
  82. }
  83. return mlist.Shutdown()
  84. }
  85. func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
  86. // Use a random stagger to avoid syncronizing
  87. randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
  88. select {
  89. case <-time.After(randStagger):
  90. case <-stop:
  91. return
  92. }
  93. for {
  94. select {
  95. case <-C:
  96. f()
  97. case <-stop:
  98. return
  99. }
  100. }
  101. }
  102. func (nDB *NetworkDB) reapState() {
  103. nDB.reapNetworks()
  104. nDB.reapTableEntries()
  105. }
  106. func (nDB *NetworkDB) reapNetworks() {
  107. now := time.Now()
  108. nDB.Lock()
  109. for name, nn := range nDB.networks {
  110. for id, n := range nn {
  111. if n.leaving && now.Sub(n.leaveTime) > reapInterval {
  112. delete(nn, id)
  113. nDB.deleteNetworkNode(id, name)
  114. }
  115. }
  116. }
  117. nDB.Unlock()
  118. }
  119. func (nDB *NetworkDB) reapTableEntries() {
  120. var paths []string
  121. now := time.Now()
  122. nDB.RLock()
  123. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  124. entry, ok := v.(*entry)
  125. if !ok {
  126. return false
  127. }
  128. if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval {
  129. return false
  130. }
  131. paths = append(paths, path)
  132. return false
  133. })
  134. nDB.RUnlock()
  135. nDB.Lock()
  136. for _, path := range paths {
  137. params := strings.Split(path[1:], "/")
  138. tname := params[0]
  139. nid := params[1]
  140. key := params[2]
  141. if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
  142. logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
  143. }
  144. if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
  145. logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
  146. }
  147. }
  148. nDB.Unlock()
  149. }
  150. func (nDB *NetworkDB) gossip() {
  151. networkNodes := make(map[string][]string)
  152. nDB.RLock()
  153. for nid := range nDB.networks[nDB.config.NodeName] {
  154. networkNodes[nid] = nDB.networkNodes[nid]
  155. }
  156. nDB.RUnlock()
  157. for nid, nodes := range networkNodes {
  158. mNodes := nDB.mRandomNodes(3, nodes)
  159. bytesAvail := udpSendBuf - compoundHeaderOverhead
  160. nDB.RLock()
  161. broadcastQ := nDB.networks[nDB.config.NodeName][nid].tableBroadcasts
  162. nDB.RUnlock()
  163. msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
  164. if len(msgs) == 0 {
  165. break
  166. }
  167. // Create a compound message
  168. compound := makeCompoundMessage(msgs)
  169. for _, node := range mNodes {
  170. nDB.RLock()
  171. mnode := nDB.nodes[node]
  172. nDB.RUnlock()
  173. if mnode == nil {
  174. break
  175. }
  176. // Send the compound message
  177. if err := nDB.memberlist.SendToUDP(mnode, compound.Bytes()); err != nil {
  178. logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
  179. }
  180. }
  181. }
  182. }
// bulkSyncMessage is the message bulkSyncNode builds to carry the table
// entries of a set of networks to a peer node.
type bulkSyncMessage struct {
	// LTime is the sender's table clock time when the message was built.
	LTime serf.LamportTime
	// Unsolicited is copied from bulkSyncNode's unsolicited argument.
	Unsolicited bool
	// NodeName is the name of the node that built the message.
	NodeName string
	// Networks lists the networks whose entries were collected into Payload.
	Networks []string
	// Payload holds the bytes of the compound message made of the encoded
	// table events.
	Payload []byte
}
  190. func (nDB *NetworkDB) bulkSyncTables() {
  191. var networks []string
  192. nDB.RLock()
  193. for nid := range nDB.networks[nDB.config.NodeName] {
  194. networks = append(networks, nid)
  195. }
  196. nDB.RUnlock()
  197. for {
  198. if len(networks) == 0 {
  199. break
  200. }
  201. nid := networks[0]
  202. networks = networks[1:]
  203. completed, err := nDB.bulkSync(nid, false)
  204. if err != nil {
  205. logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
  206. continue
  207. }
  208. // Remove all the networks for which we have
  209. // successfully completed bulk sync in this iteration.
  210. updatedNetworks := make([]string, 0, len(networks))
  211. for _, nid := range networks {
  212. for _, completedNid := range completed {
  213. if nid == completedNid {
  214. continue
  215. }
  216. updatedNetworks = append(updatedNetworks, nid)
  217. }
  218. }
  219. networks = updatedNetworks
  220. }
  221. }
  222. func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) {
  223. nDB.RLock()
  224. nodes := nDB.networkNodes[nid]
  225. nDB.RUnlock()
  226. if !all {
  227. // If not all, then just pick one.
  228. nodes = nDB.mRandomNodes(1, nodes)
  229. }
  230. logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
  231. var err error
  232. var networks []string
  233. for _, node := range nodes {
  234. if node == nDB.config.NodeName {
  235. continue
  236. }
  237. networks = nDB.findCommonNetworks(node)
  238. err = nDB.bulkSyncNode(networks, node, true)
  239. if err != nil {
  240. err = fmt.Errorf("bulk sync failed on node %s: %v", node, err)
  241. }
  242. }
  243. if err != nil {
  244. return nil, err
  245. }
  246. return networks, nil
  247. }
  248. // Bulk sync all the table entries belonging to a set of networks to a
  249. // single peer node. It can be unsolicited or can be in response to an
  250. // unsolicited bulk sync
  251. func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
  252. var msgs [][]byte
  253. logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
  254. nDB.RLock()
  255. mnode := nDB.nodes[node]
  256. if mnode == nil {
  257. nDB.RUnlock()
  258. return nil
  259. }
  260. for _, nid := range networks {
  261. nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
  262. entry, ok := v.(*entry)
  263. if !ok {
  264. return false
  265. }
  266. params := strings.Split(path[1:], "/")
  267. tEvent := tableEventData{
  268. Event: tableEntryCreate,
  269. LTime: entry.ltime,
  270. NodeName: entry.node,
  271. NetworkID: nid,
  272. TableName: params[1],
  273. Key: params[2],
  274. Value: entry.value,
  275. }
  276. msg, err := encodeMessage(tableEventMsg, &tEvent)
  277. if err != nil {
  278. logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
  279. return false
  280. }
  281. msgs = append(msgs, msg)
  282. return false
  283. })
  284. }
  285. nDB.RUnlock()
  286. // Create a compound message
  287. compound := makeCompoundMessage(msgs)
  288. bsm := bulkSyncMessage{
  289. LTime: nDB.tableClock.Time(),
  290. Unsolicited: unsolicited,
  291. NodeName: nDB.config.NodeName,
  292. Networks: networks,
  293. Payload: compound.Bytes(),
  294. }
  295. buf, err := encodeMessage(bulkSyncMsg, &bsm)
  296. if err != nil {
  297. return fmt.Errorf("failed to encode bulk sync message: %v", err)
  298. }
  299. nDB.Lock()
  300. ch := make(chan struct{})
  301. nDB.bulkSyncAckTbl[node] = ch
  302. nDB.Unlock()
  303. err = nDB.memberlist.SendToTCP(mnode, buf)
  304. if err != nil {
  305. nDB.Lock()
  306. delete(nDB.bulkSyncAckTbl, node)
  307. nDB.Unlock()
  308. return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
  309. }
  310. startTime := time.Now()
  311. select {
  312. case <-time.After(30 * time.Second):
  313. logrus.Errorf("Bulk sync to node %s timed out", node)
  314. case <-ch:
  315. nDB.Lock()
  316. delete(nDB.bulkSyncAckTbl, node)
  317. nDB.Unlock()
  318. logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
  319. }
  320. return nil
  321. }
  322. // Returns a random offset between 0 and n
  323. func randomOffset(n int) int {
  324. if n == 0 {
  325. return 0
  326. }
  327. val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
  328. if err != nil {
  329. logrus.Errorf("Failed to get a random offset: %v", err)
  330. return 0
  331. }
  332. return int(val.Int64())
  333. }
  334. // mRandomNodes is used to select up to m random nodes. It is possible
  335. // that less than m nodes are returned.
  336. func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
  337. n := len(nodes)
  338. mNodes := make([]string, 0, m)
  339. OUTER:
  340. // Probe up to 3*n times, with large n this is not necessary
  341. // since k << n, but with small n we want search to be
  342. // exhaustive
  343. for i := 0; i < 3*n && len(mNodes) < m; i++ {
  344. // Get random node
  345. idx := randomOffset(n)
  346. node := nodes[idx]
  347. if node == nDB.config.NodeName {
  348. continue
  349. }
  350. // Check if we have this node already
  351. for j := 0; j < len(mNodes); j++ {
  352. if node == mNodes[j] {
  353. continue OUTER
  354. }
  355. }
  356. // Append the node
  357. mNodes = append(mNodes, node)
  358. }
  359. return mNodes
  360. }