// Package iradix implements an immutable radix tree.
  1. package iradix
  2. import (
  3. "bytes"
  4. "strings"
  5. "github.com/hashicorp/golang-lru/simplelru"
  6. )
  7. const (
  8. // defaultModifiedCache is the default size of the modified node
  9. // cache used per transaction. This is used to cache the updates
  10. // to the nodes near the root, while the leaves do not need to be
  11. // cached. This is important for very large transactions to prevent
  12. // the modified cache from growing to be enormous. This is also used
  13. // to set the max size of the mutation notify maps since those should
  14. // also be bounded in a similar way.
  15. defaultModifiedCache = 8192
  16. )
// Tree implements an immutable radix tree. This can be treated as a
// Dictionary abstract data type. The main advantage over a standard
// hash map is prefix-based lookups and ordered iteration. The immutability
// means that it is safe to concurrently read from a Tree without any
// coordination.
//
// An existing Tree is never modified. Changes go through a Txn (or the
// Tree.Insert/Delete/DeletePrefix helpers) and produce a new *Tree.
type Tree struct {
	// root is the root node. New creates it with only a mutation channel.
	root *Node
	// size is the number of key/value entries stored in the tree.
	// NOTE: keep the field order stable, because Tree values may be built
	// with positional literals (e.g. &Tree{root, size}).
	size int
}
  26. // New returns an empty Tree
  27. func New() *Tree {
  28. t := &Tree{
  29. root: &Node{
  30. mutateCh: make(chan struct{}),
  31. },
  32. }
  33. return t
  34. }
  35. // Len is used to return the number of elements in the tree
  36. func (t *Tree) Len() int {
  37. return t.size
  38. }
// Txn is a transaction on the tree. This transaction is applied
// atomically and returns a new tree when committed. A transaction
// is not thread safe, and should only be used by a single goroutine.
type Txn struct {
	// root is the modified root for the transaction.
	root *Node

	// snap is a snapshot of the root node for use if we have to run the
	// slow notify algorithm. Tree.Txn sets it to the root of the tree the
	// transaction started from.
	snap *Node

	// size tracks the size of the tree as it is modified during the
	// transaction.
	size int

	// writable is a cache of writable nodes that have been created during
	// the course of the transaction. This allows us to re-use the same
	// nodes for further writes and avoid unnecessary copies of nodes that
	// have never been exposed outside the transaction. This will only hold
	// up to defaultModifiedCache number of entries. writeNode creates it
	// lazily and CommitOnly clears it.
	writable *simplelru.LRU

	// trackChannels is used to hold channels that need to be notified to
	// signal mutation of the tree. This will only hold up to
	// defaultModifiedCache number of entries, after which we will set the
	// trackOverflow flag, which will cause us to use a more expensive
	// algorithm to perform the notifications. Mutation tracking is only
	// performed if trackMutate is true.
	trackChannels map[chan struct{}]struct{}

	// trackOverflow is set once trackChannels has hit its bound. Notify then
	// uses slowNotify instead of trackChannels.
	trackOverflow bool

	// trackMutate enables mutation tracking. TrackMutate toggles it.
	trackMutate bool
}
  67. // Txn starts a new transaction that can be used to mutate the tree
  68. func (t *Tree) Txn() *Txn {
  69. txn := &Txn{
  70. root: t.root,
  71. snap: t.root,
  72. size: t.size,
  73. }
  74. return txn
  75. }
  76. // TrackMutate can be used to toggle if mutations are tracked. If this is enabled
  77. // then notifications will be issued for affected internal nodes and leaves when
  78. // the transaction is committed.
  79. func (t *Txn) TrackMutate(track bool) {
  80. t.trackMutate = track
  81. }
  82. // trackChannel safely attempts to track the given mutation channel, setting the
  83. // overflow flag if we can no longer track any more. This limits the amount of
  84. // state that will accumulate during a transaction and we have a slower algorithm
  85. // to switch to if we overflow.
  86. func (t *Txn) trackChannel(ch chan struct{}) {
  87. // In overflow, make sure we don't store any more objects.
  88. if t.trackOverflow {
  89. return
  90. }
  91. // If this would overflow the state we reject it and set the flag (since
  92. // we aren't tracking everything that's required any longer).
  93. if len(t.trackChannels) >= defaultModifiedCache {
  94. // Mark that we are in the overflow state
  95. t.trackOverflow = true
  96. // Clear the map so that the channels can be garbage collected. It is
  97. // safe to do this since we have already overflowed and will be using
  98. // the slow notify algorithm.
  99. t.trackChannels = nil
  100. return
  101. }
  102. // Create the map on the fly when we need it.
  103. if t.trackChannels == nil {
  104. t.trackChannels = make(map[chan struct{}]struct{})
  105. }
  106. // Otherwise we are good to track it.
  107. t.trackChannels[ch] = struct{}{}
  108. }
  109. // writeNode returns a node to be modified, if the current node has already been
  110. // modified during the course of the transaction, it is used in-place. Set
  111. // forLeafUpdate to true if you are getting a write node to update the leaf,
  112. // which will set leaf mutation tracking appropriately as well.
  113. func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
  114. // Ensure the writable set exists.
  115. if t.writable == nil {
  116. lru, err := simplelru.NewLRU(defaultModifiedCache, nil)
  117. if err != nil {
  118. panic(err)
  119. }
  120. t.writable = lru
  121. }
  122. // If this node has already been modified, we can continue to use it
  123. // during this transaction. We know that we don't need to track it for
  124. // a node update since the node is writable, but if this is for a leaf
  125. // update we track it, in case the initial write to this node didn't
  126. // update the leaf.
  127. if _, ok := t.writable.Get(n); ok {
  128. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  129. t.trackChannel(n.leaf.mutateCh)
  130. }
  131. return n
  132. }
  133. // Mark this node as being mutated.
  134. if t.trackMutate {
  135. t.trackChannel(n.mutateCh)
  136. }
  137. // Mark its leaf as being mutated, if appropriate.
  138. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  139. t.trackChannel(n.leaf.mutateCh)
  140. }
  141. // Copy the existing node. If you have set forLeafUpdate it will be
  142. // safe to replace this leaf with another after you get your node for
  143. // writing. You MUST replace it, because the channel associated with
  144. // this leaf will be closed when this transaction is committed.
  145. nc := &Node{
  146. mutateCh: make(chan struct{}),
  147. leaf: n.leaf,
  148. }
  149. if n.prefix != nil {
  150. nc.prefix = make([]byte, len(n.prefix))
  151. copy(nc.prefix, n.prefix)
  152. }
  153. if len(n.edges) != 0 {
  154. nc.edges = make([]edge, len(n.edges))
  155. copy(nc.edges, n.edges)
  156. }
  157. // Mark this node as writable.
  158. t.writable.Add(nc, nil)
  159. return nc
  160. }
  161. // Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
  162. // Returns the size of the subtree visited
  163. func (t *Txn) trackChannelsAndCount(n *Node) int {
  164. // Count only leaf nodes
  165. leaves := 0
  166. if n.leaf != nil {
  167. leaves = 1
  168. }
  169. // Mark this node as being mutated.
  170. if t.trackMutate {
  171. t.trackChannel(n.mutateCh)
  172. }
  173. // Mark its leaf as being mutated, if appropriate.
  174. if t.trackMutate && n.leaf != nil {
  175. t.trackChannel(n.leaf.mutateCh)
  176. }
  177. // Recurse on the children
  178. for _, e := range n.edges {
  179. leaves += t.trackChannelsAndCount(e.node)
  180. }
  181. return leaves
  182. }
  183. // mergeChild is called to collapse the given node with its child. This is only
  184. // called when the given node is not a leaf and has a single edge.
  185. func (t *Txn) mergeChild(n *Node) {
  186. // Mark the child node as being mutated since we are about to abandon
  187. // it. We don't need to mark the leaf since we are retaining it if it
  188. // is there.
  189. e := n.edges[0]
  190. child := e.node
  191. if t.trackMutate {
  192. t.trackChannel(child.mutateCh)
  193. }
  194. // Merge the nodes.
  195. n.prefix = concat(n.prefix, child.prefix)
  196. n.leaf = child.leaf
  197. if len(child.edges) != 0 {
  198. n.edges = make([]edge, len(child.edges))
  199. copy(n.edges, child.edges)
  200. } else {
  201. n.edges = nil
  202. }
  203. }
// insert recursively inserts key k with value v below node n. search is the
// part of k that the path from the root to n has not yet consumed.
//
// It returns:
//   - the replacement for n: a writable copy, or n itself if n was already
//     writable in this transaction;
//   - the previous value stored under k;
//   - whether an existing leaf was replaced.
//
// Every terminal branch returns a non-nil node, so the nil return below is
// purely defensive.
//
// NOTE(review): new nodes use sub-slices of search as their prefix, and
// Txn.Insert passes k for both k and search. Leaves also store k itself.
// Callers should therefore not mutate k after inserting it.
func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
	// Handle key exhaustion: n itself is the node for k, so set (or replace)
	// its leaf.
	if len(search) == 0 {
		var oldVal interface{}
		didUpdate := false
		if n.isLeaf() {
			oldVal = n.leaf.val
			didUpdate = true
		}

		// forLeafUpdate=true makes writeNode track the old leaf's channel.
		// That leaf is replaced immediately below, as writeNode requires.
		nc := t.writeNode(n, true)
		nc.leaf = &leafNode{
			mutateCh: make(chan struct{}),
			key:      k,
			val:      v,
		}
		return nc, oldVal, didUpdate
	}

	// Look for the edge
	idx, child := n.getEdge(search[0])

	// No edge: create one whose node holds the rest of the key as its prefix.
	if child == nil {
		e := edge{
			label: search[0],
			node: &Node{
				mutateCh: make(chan struct{}),
				leaf: &leafNode{
					mutateCh: make(chan struct{}),
					key:      k,
					val:      v,
				},
				prefix: search,
			},
		}
		nc := t.writeNode(n, false)
		nc.addEdge(e)
		return nc, nil, false
	}

	// Determine longest prefix of the search key on match
	commonPrefix := longestPrefix(search, child.prefix)
	if commonPrefix == len(child.prefix) {
		// The child's whole prefix matches. Descend, then point this node's
		// edge at the child's replacement.
		search = search[commonPrefix:]
		newChild, oldVal, didUpdate := t.insert(child, k, search, v)
		if newChild != nil {
			nc := t.writeNode(n, false)
			nc.edges[idx].node = newChild
			return nc, oldVal, didUpdate
		}
		return nil, oldVal, didUpdate
	}

	// Split the node. The key diverges part-way through the child's prefix,
	// so insert an intermediate node that holds the shared part.
	nc := t.writeNode(n, false)
	splitNode := &Node{
		mutateCh: make(chan struct{}),
		prefix:   search[:commonPrefix],
	}
	nc.replaceEdge(edge{
		label: search[0],
		node:  splitNode,
	})

	// Re-attach the existing child below the split node, trimmed to the part
	// of its prefix after the shared portion.
	modChild := t.writeNode(child, false)
	splitNode.addEdge(edge{
		label: modChild.prefix[commonPrefix],
		node:  modChild,
	})
	modChild.prefix = modChild.prefix[commonPrefix:]

	// Create a new leaf node
	leaf := &leafNode{
		mutateCh: make(chan struct{}),
		key:      k,
		val:      v,
	}

	// If the new key ends exactly at the split point, the split node holds
	// the leaf itself.
	search = search[commonPrefix:]
	if len(search) == 0 {
		splitNode.leaf = leaf
		return nc, nil, false
	}

	// Otherwise hang the new leaf off the split node under its own edge.
	splitNode.addEdge(edge{
		label: search[0],
		node: &Node{
			mutateCh: make(chan struct{}),
			leaf:     leaf,
			prefix:   search,
		},
	})
	return nc, nil, false
}
  294. // delete does a recursive deletion
  295. func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
  296. // Check for key exhaustion
  297. if len(search) == 0 {
  298. if !n.isLeaf() {
  299. return nil, nil
  300. }
  301. // Remove the leaf node
  302. nc := t.writeNode(n, true)
  303. nc.leaf = nil
  304. // Check if this node should be merged
  305. if n != t.root && len(nc.edges) == 1 {
  306. t.mergeChild(nc)
  307. }
  308. return nc, n.leaf
  309. }
  310. // Look for an edge
  311. label := search[0]
  312. idx, child := n.getEdge(label)
  313. if child == nil || !bytes.HasPrefix(search, child.prefix) {
  314. return nil, nil
  315. }
  316. // Consume the search prefix
  317. search = search[len(child.prefix):]
  318. newChild, leaf := t.delete(n, child, search)
  319. if newChild == nil {
  320. return nil, nil
  321. }
  322. // Copy this node. WATCH OUT - it's safe to pass "false" here because we
  323. // will only ADD a leaf via nc.mergeChild() if there isn't one due to
  324. // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
  325. // so be careful if you change any of the logic here.
  326. nc := t.writeNode(n, false)
  327. // Delete the edge if the node has no edges
  328. if newChild.leaf == nil && len(newChild.edges) == 0 {
  329. nc.delEdge(label)
  330. if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
  331. t.mergeChild(nc)
  332. }
  333. } else {
  334. nc.edges[idx].node = newChild
  335. }
  336. return nc, leaf
  337. }
  338. // delete does a recursive deletion
  339. func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
  340. // Check for key exhaustion
  341. if len(search) == 0 {
  342. nc := t.writeNode(n, true)
  343. if n.isLeaf() {
  344. nc.leaf = nil
  345. }
  346. nc.edges = nil
  347. return nc, t.trackChannelsAndCount(n)
  348. }
  349. // Look for an edge
  350. label := search[0]
  351. idx, child := n.getEdge(label)
  352. // We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
  353. // Need to do both so that we can delete prefixes that don't correspond to any node in the tree
  354. if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
  355. return nil, 0
  356. }
  357. // Consume the search prefix
  358. if len(child.prefix) > len(search) {
  359. search = []byte("")
  360. } else {
  361. search = search[len(child.prefix):]
  362. }
  363. newChild, numDeletions := t.deletePrefix(n, child, search)
  364. if newChild == nil {
  365. return nil, 0
  366. }
  367. // Copy this node. WATCH OUT - it's safe to pass "false" here because we
  368. // will only ADD a leaf via nc.mergeChild() if there isn't one due to
  369. // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
  370. // so be careful if you change any of the logic here.
  371. nc := t.writeNode(n, false)
  372. // Delete the edge if the node has no edges
  373. if newChild.leaf == nil && len(newChild.edges) == 0 {
  374. nc.delEdge(label)
  375. if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
  376. t.mergeChild(nc)
  377. }
  378. } else {
  379. nc.edges[idx].node = newChild
  380. }
  381. return nc, numDeletions
  382. }
  383. // Insert is used to add or update a given key. The return provides
  384. // the previous value and a bool indicating if any was set.
  385. func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
  386. newRoot, oldVal, didUpdate := t.insert(t.root, k, k, v)
  387. if newRoot != nil {
  388. t.root = newRoot
  389. }
  390. if !didUpdate {
  391. t.size++
  392. }
  393. return oldVal, didUpdate
  394. }
  395. // Delete is used to delete a given key. Returns the old value if any,
  396. // and a bool indicating if the key was set.
  397. func (t *Txn) Delete(k []byte) (interface{}, bool) {
  398. newRoot, leaf := t.delete(nil, t.root, k)
  399. if newRoot != nil {
  400. t.root = newRoot
  401. }
  402. if leaf != nil {
  403. t.size--
  404. return leaf.val, true
  405. }
  406. return nil, false
  407. }
  408. // DeletePrefix is used to delete an entire subtree that matches the prefix
  409. // This will delete all nodes under that prefix
  410. func (t *Txn) DeletePrefix(prefix []byte) bool {
  411. newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
  412. if newRoot != nil {
  413. t.root = newRoot
  414. t.size = t.size - numDeletions
  415. return true
  416. }
  417. return false
  418. }
  419. // Root returns the current root of the radix tree within this
  420. // transaction. The root is not safe across insert and delete operations,
  421. // but can be used to read the current state during a transaction.
  422. func (t *Txn) Root() *Node {
  423. return t.root
  424. }
  425. // Get is used to lookup a specific key, returning
  426. // the value and if it was found
  427. func (t *Txn) Get(k []byte) (interface{}, bool) {
  428. return t.root.Get(k)
  429. }
  430. // GetWatch is used to lookup a specific key, returning
  431. // the watch channel, value and if it was found
  432. func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
  433. return t.root.GetWatch(k)
  434. }
  435. // Commit is used to finalize the transaction and return a new tree. If mutation
  436. // tracking is turned on then notifications will also be issued.
  437. func (t *Txn) Commit() *Tree {
  438. nt := t.CommitOnly()
  439. if t.trackMutate {
  440. t.Notify()
  441. }
  442. return nt
  443. }
  444. // CommitOnly is used to finalize the transaction and return a new tree, but
  445. // does not issue any notifications until Notify is called.
  446. func (t *Txn) CommitOnly() *Tree {
  447. nt := &Tree{t.root, t.size}
  448. t.writable = nil
  449. return nt
  450. }
  451. // slowNotify does a complete comparison of the before and after trees in order
  452. // to trigger notifications. This doesn't require any additional state but it
  453. // is very expensive to compute.
  454. func (t *Txn) slowNotify() {
  455. snapIter := t.snap.rawIterator()
  456. rootIter := t.root.rawIterator()
  457. for snapIter.Front() != nil || rootIter.Front() != nil {
  458. // If we've exhausted the nodes in the old snapshot, we know
  459. // there's nothing remaining to notify.
  460. if snapIter.Front() == nil {
  461. return
  462. }
  463. snapElem := snapIter.Front()
  464. // If we've exhausted the nodes in the new root, we know we need
  465. // to invalidate everything that remains in the old snapshot. We
  466. // know from the loop condition there's something in the old
  467. // snapshot.
  468. if rootIter.Front() == nil {
  469. close(snapElem.mutateCh)
  470. if snapElem.isLeaf() {
  471. close(snapElem.leaf.mutateCh)
  472. }
  473. snapIter.Next()
  474. continue
  475. }
  476. // Do one string compare so we can check the various conditions
  477. // below without repeating the compare.
  478. cmp := strings.Compare(snapIter.Path(), rootIter.Path())
  479. // If the snapshot is behind the root, then we must have deleted
  480. // this node during the transaction.
  481. if cmp < 0 {
  482. close(snapElem.mutateCh)
  483. if snapElem.isLeaf() {
  484. close(snapElem.leaf.mutateCh)
  485. }
  486. snapIter.Next()
  487. continue
  488. }
  489. // If the snapshot is ahead of the root, then we must have added
  490. // this node during the transaction.
  491. if cmp > 0 {
  492. rootIter.Next()
  493. continue
  494. }
  495. // If we have the same path, then we need to see if we mutated a
  496. // node and possibly the leaf.
  497. rootElem := rootIter.Front()
  498. if snapElem != rootElem {
  499. close(snapElem.mutateCh)
  500. if snapElem.leaf != nil && (snapElem.leaf != rootElem.leaf) {
  501. close(snapElem.leaf.mutateCh)
  502. }
  503. }
  504. snapIter.Next()
  505. rootIter.Next()
  506. }
  507. }
  508. // Notify is used along with TrackMutate to trigger notifications. This must
  509. // only be done once a transaction is committed via CommitOnly, and it is called
  510. // automatically by Commit.
  511. func (t *Txn) Notify() {
  512. if !t.trackMutate {
  513. return
  514. }
  515. // If we've overflowed the tracking state we can't use it in any way and
  516. // need to do a full tree compare.
  517. if t.trackOverflow {
  518. t.slowNotify()
  519. } else {
  520. for ch := range t.trackChannels {
  521. close(ch)
  522. }
  523. }
  524. // Clean up the tracking state so that a re-notify is safe (will trigger
  525. // the else clause above which will be a no-op).
  526. t.trackChannels = nil
  527. t.trackOverflow = false
  528. }
  529. // Insert is used to add or update a given key. The return provides
  530. // the new tree, previous value and a bool indicating if any was set.
  531. func (t *Tree) Insert(k []byte, v interface{}) (*Tree, interface{}, bool) {
  532. txn := t.Txn()
  533. old, ok := txn.Insert(k, v)
  534. return txn.Commit(), old, ok
  535. }
  536. // Delete is used to delete a given key. Returns the new tree,
  537. // old value if any, and a bool indicating if the key was set.
  538. func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
  539. txn := t.Txn()
  540. old, ok := txn.Delete(k)
  541. return txn.Commit(), old, ok
  542. }
  543. // DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
  544. // and a bool indicating if the prefix matched any nodes
  545. func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
  546. txn := t.Txn()
  547. ok := txn.DeletePrefix(k)
  548. return txn.Commit(), ok
  549. }
  550. // Root returns the root node of the tree which can be used for richer
  551. // query operations.
  552. func (t *Tree) Root() *Node {
  553. return t.root
  554. }
  555. // Get is used to lookup a specific key, returning
  556. // the value and if it was found
  557. func (t *Tree) Get(k []byte) (interface{}, bool) {
  558. return t.root.Get(k)
  559. }
  560. // longestPrefix finds the length of the shared prefix
  561. // of two strings
  562. func longestPrefix(k1, k2 []byte) int {
  563. max := len(k1)
  564. if l := len(k2); l < max {
  565. max = l
  566. }
  567. var i int
  568. for i = 0; i < max; i++ {
  569. if k1[i] != k2[i] {
  570. break
  571. }
  572. }
  573. return i
  574. }
  575. // concat two byte slices, returning a third new copy
  576. func concat(a, b []byte) []byte {
  577. c := make([]byte, len(a)+len(b))
  578. copy(c, a)
  579. copy(c[len(a):], b)
  580. return c
  581. }