networkdb.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package networkdb
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/armon/go-radix"
  9. "github.com/docker/go-events"
  10. "github.com/hashicorp/memberlist"
  11. "github.com/hashicorp/serf/serf"
  12. )
  13. const (
  14. byTable int = 1 + iota
  15. byNetwork
  16. )
// NetworkDB instance drives the networkdb cluster and acts as the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// Embedded lock. The methods in this file take it around accesses
	// to indexes, networks and networkNodes.
	sync.RWMutex
	// NetworkDB configuration.
	config *Config
	// Local copy of the memberlist config that we use to drive
	// network-scoped gossip and bulk sync.
	mConfig *memberlist.Config
	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db.
	indexes map[int]*radix.Tree
	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist
	// List of all peer nodes in the cluster, not limited to any
	// network.
	nodes map[string]*memberlist.Node
	// A multi-dimensional map of network/node attachments. The
	// first key is a node name and the second key is a network ID
	// for the network that node is participating in.
	networks map[string]map[string]*network
	// A map of nodes which are participating in a given
	// network. The key is a network ID.
	networkNodes map[string][]string
	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}
	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock
	// Global lamport clock for table events.
	tableClock serf.LamportClock
	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue
	// A central stop channel to stop all goroutines running on
	// behalf of the NetworkDB instance.
	stopCh chan struct{}
	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster
	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker
}
// network describes the attachment of one node to one network.
type network struct {
	// Network ID.
	id string
	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime
	// Set when the node's leave from this network is in progress.
	leaving bool
	// The time at which this node learned about the node's network leave.
	leaveTime time.Time
	// The broadcast queue for table event gossip. This is only
	// initialized for this node's own network attachment entries.
	tableBroadcasts *memberlist.TransmitLimitedQueue
}
// Config represents the configuration of the NetworkDB instance and
// can be passed by the caller.
type Config struct {
	// NodeName is the cluster-wide unique name for this node.
	NodeName string
	// BindAddr is the local node's IP address that we bind to for
	// cluster communication.
	BindAddr string
	// BindPort is the local node's port that we bind to for
	// cluster communication.
	BindPort int
}
// entry defines a table entry. The same *entry is stored in both the
// byTable and the byNetwork index.
type entry struct {
	// Node from which this entry was learned.
	node string
	// Lamport time for the most recent update to the entry.
	ltime serf.LamportTime
	// Opaque value stored in the entry.
	value []byte
	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool
	// The wall clock time when this node learned about this deletion.
	deleteTime time.Time
}
  100. // New creates a new instance of NetworkDB using the Config passed by
  101. // the caller.
  102. func New(c *Config) (*NetworkDB, error) {
  103. nDB := &NetworkDB{
  104. config: c,
  105. indexes: make(map[int]*radix.Tree),
  106. networks: make(map[string]map[string]*network),
  107. nodes: make(map[string]*memberlist.Node),
  108. networkNodes: make(map[string][]string),
  109. bulkSyncAckTbl: make(map[string]chan struct{}),
  110. broadcaster: events.NewBroadcaster(),
  111. }
  112. nDB.indexes[byTable] = radix.New()
  113. nDB.indexes[byNetwork] = radix.New()
  114. if err := nDB.clusterInit(); err != nil {
  115. return nil, err
  116. }
  117. return nDB, nil
  118. }
// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port. The work is
// delegated to clusterJoin, whose error (if any) is returned unchanged.
func (nDB *NetworkDB) Join(members []string) error {
	return nDB.clusterJoin(members)
}
  124. // Close destroys this NetworkDB instance by leave the cluster,
  125. // stopping timers, canceling goroutines etc.
  126. func (nDB *NetworkDB) Close() {
  127. if err := nDB.clusterLeave(); err != nil {
  128. logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
  129. }
  130. }
  131. // GetEntry retrieves the value of a table entry in a given (network,
  132. // table, key) tuple
  133. func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
  134. entry, err := nDB.getEntry(tname, nid, key)
  135. if err != nil {
  136. return nil, err
  137. }
  138. return entry.value, nil
  139. }
  140. func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
  141. nDB.RLock()
  142. defer nDB.RUnlock()
  143. e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
  144. if !ok {
  145. return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
  146. }
  147. return e.(*entry), nil
  148. }
  149. // CreateEntry creates a table entry in NetworkDB for given (network,
  150. // table, key) tuple and if the NetworkDB is part of the cluster
  151. // propogates this event to the cluster. It is an error to create an
  152. // entry for the same tuple for which there is already an existing
  153. // entry.
  154. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
  155. if _, err := nDB.GetEntry(tname, nid, key); err == nil {
  156. return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key)
  157. }
  158. entry := &entry{
  159. ltime: nDB.tableClock.Increment(),
  160. node: nDB.config.NodeName,
  161. value: value,
  162. }
  163. if err := nDB.sendTableEvent(tableEntryCreate, nid, tname, key, entry); err != nil {
  164. return fmt.Errorf("cannot send table create event: %v", err)
  165. }
  166. nDB.Lock()
  167. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  168. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  169. nDB.Unlock()
  170. nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
  171. return nil
  172. }
  173. // UpdateEntry updates a table entry in NetworkDB for given (network,
  174. // table, key) tuple and if the NetworkDB is part of the cluster
  175. // propogates this event to the cluster. It is an error to update a
  176. // non-existent entry.
  177. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
  178. if _, err := nDB.GetEntry(tname, nid, key); err != nil {
  179. return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  180. }
  181. entry := &entry{
  182. ltime: nDB.tableClock.Increment(),
  183. node: nDB.config.NodeName,
  184. value: value,
  185. }
  186. if err := nDB.sendTableEvent(tableEntryUpdate, nid, tname, key, entry); err != nil {
  187. return fmt.Errorf("cannot send table update event: %v", err)
  188. }
  189. nDB.Lock()
  190. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  191. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  192. nDB.Unlock()
  193. nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
  194. return nil
  195. }
  196. // DeleteEntry deletes a table entry in NetworkDB for given (network,
  197. // table, key) tuple and if the NetworkDB is part of the cluster
  198. // propogates this event to the cluster.
  199. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
  200. value, err := nDB.GetEntry(tname, nid, key)
  201. if err != nil {
  202. return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
  203. }
  204. entry := &entry{
  205. ltime: nDB.tableClock.Increment(),
  206. node: nDB.config.NodeName,
  207. value: value,
  208. deleting: true,
  209. deleteTime: time.Now(),
  210. }
  211. if err := nDB.sendTableEvent(tableEntryDelete, nid, tname, key, entry); err != nil {
  212. return fmt.Errorf("cannot send table delete event: %v", err)
  213. }
  214. nDB.Lock()
  215. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  216. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  217. nDB.Unlock()
  218. nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
  219. return nil
  220. }
  221. func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
  222. nDB.Lock()
  223. nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
  224. oldEntry := v.(*entry)
  225. if oldEntry.node != node {
  226. return false
  227. }
  228. params := strings.Split(path[1:], "/")
  229. tname := params[0]
  230. nid := params[1]
  231. key := params[2]
  232. entry := &entry{
  233. ltime: oldEntry.ltime,
  234. node: node,
  235. value: oldEntry.value,
  236. deleting: true,
  237. deleteTime: time.Now(),
  238. }
  239. nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
  240. nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
  241. return false
  242. })
  243. nDB.Unlock()
  244. }
  245. // WalkTable walks a single table in NetworkDB and invokes the passed
  246. // function for each entry in the table passing the network, key,
  247. // value. The walk stops if the passed function returns a true.
  248. func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
  249. nDB.RLock()
  250. values := make(map[string]interface{})
  251. nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
  252. values[path] = v
  253. return false
  254. })
  255. nDB.RUnlock()
  256. for k, v := range values {
  257. params := strings.Split(k[1:], "/")
  258. nid := params[1]
  259. key := params[2]
  260. if fn(nid, key, v.(*entry).value) {
  261. return nil
  262. }
  263. }
  264. return nil
  265. }
  266. // JoinNetwork joins this node to a given network and propogates this
  267. // event across the cluster. This triggers this node joining the
  268. // sub-cluster of this network and participates in the network-scoped
  269. // gossip and bulk sync for this network.
  270. func (nDB *NetworkDB) JoinNetwork(nid string) error {
  271. ltime := nDB.networkClock.Increment()
  272. if err := nDB.sendNetworkEvent(nid, networkJoin, ltime); err != nil {
  273. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  274. }
  275. nDB.Lock()
  276. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  277. if !ok {
  278. nodeNetworks = make(map[string]*network)
  279. nDB.networks[nDB.config.NodeName] = nodeNetworks
  280. }
  281. nodeNetworks[nid] = &network{id: nid, ltime: ltime}
  282. nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
  283. NumNodes: func() int {
  284. return len(nDB.networkNodes[nid])
  285. },
  286. RetransmitMult: 4,
  287. }
  288. nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
  289. nDB.Unlock()
  290. logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
  291. if _, err := nDB.bulkSync(nid, true); err != nil {
  292. logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
  293. }
  294. return nil
  295. }
  296. // LeaveNetwork leaves this node from a given network and propogates
  297. // this event across the cluster. This triggers this node leaving the
  298. // sub-cluster of this network and as a result will no longer
  299. // participate in the network-scoped gossip and bulk sync for this
  300. // network.
  301. func (nDB *NetworkDB) LeaveNetwork(nid string) error {
  302. ltime := nDB.networkClock.Increment()
  303. if err := nDB.sendNetworkEvent(nid, networkLeave, ltime); err != nil {
  304. return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
  305. }
  306. nDB.Lock()
  307. defer nDB.Unlock()
  308. nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
  309. if !ok {
  310. return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
  311. }
  312. n, ok := nodeNetworks[nid]
  313. if !ok {
  314. return fmt.Errorf("could not find network %s while trying to leave", nid)
  315. }
  316. n.ltime = ltime
  317. n.leaving = true
  318. return nil
  319. }
  320. // Deletes the node from the list of nodes which participate in the
  321. // passed network. Caller should hold the NetworkDB lock while calling
  322. // this
  323. func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
  324. nodes := nDB.networkNodes[nid]
  325. for i, name := range nodes {
  326. if name == nodeName {
  327. nodes[i] = nodes[len(nodes)-1]
  328. nodes = nodes[:len(nodes)-1]
  329. break
  330. }
  331. }
  332. nDB.networkNodes[nid] = nodes
  333. }
  334. // findCommonnetworks find the networks that both this node and the
  335. // passed node have joined.
  336. func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
  337. nDB.RLock()
  338. defer nDB.RUnlock()
  339. var networks []string
  340. for nid := range nDB.networks[nDB.config.NodeName] {
  341. if _, ok := nDB.networks[nodeName][nid]; ok {
  342. networks = append(networks, nid)
  343. }
  344. }
  345. return networks
  346. }