iradix.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. package iradix
  2. import (
  3. "bytes"
  4. "strings"
  5. "github.com/hashicorp/golang-lru/simplelru"
  6. )
  7. const (
  8. // defaultModifiedCache is the default size of the modified node
  9. // cache used per transaction. This is used to cache the updates
  10. // to the nodes near the root, while the leaves do not need to be
  11. // cached. This is important for very large transactions to prevent
  12. // the modified cache from growing to be enormous. This is also used
  13. // to set the max size of the mutation notify maps since those should
  14. // also be bounded in a similar way.
  15. defaultModifiedCache = 8192
  16. )
  17. // Tree implements an immutable radix tree. This can be treated as a
  18. // Dictionary abstract data type. The main advantage over a standard
  19. // hash map is prefix-based lookups and ordered iteration. The immutability
  20. // means that it is safe to concurrently read from a Tree without any
  21. // coordination.
  22. type Tree struct {
  23. root *Node
  24. size int
  25. }
  26. // New returns an empty Tree
  27. func New() *Tree {
  28. t := &Tree{
  29. root: &Node{
  30. mutateCh: make(chan struct{}),
  31. },
  32. }
  33. return t
  34. }
  35. // Len is used to return the number of elements in the tree
  36. func (t *Tree) Len() int {
  37. return t.size
  38. }
  39. // Txn is a transaction on the tree. This transaction is applied
  40. // atomically and returns a new tree when committed. A transaction
  41. // is not thread safe, and should only be used by a single goroutine.
  42. type Txn struct {
  43. // root is the modified root for the transaction.
  44. root *Node
  45. // snap is a snapshot of the root node for use if we have to run the
  46. // slow notify algorithm.
  47. snap *Node
  48. // size tracks the size of the tree as it is modified during the
  49. // transaction.
  50. size int
  51. // writable is a cache of writable nodes that have been created during
  52. // the course of the transaction. This allows us to re-use the same
  53. // nodes for further writes and avoid unnecessary copies of nodes that
  54. // have never been exposed outside the transaction. This will only hold
  55. // up to defaultModifiedCache number of entries.
  56. writable *simplelru.LRU
  57. // trackChannels is used to hold channels that need to be notified to
  58. // signal mutation of the tree. This will only hold up to
  59. // defaultModifiedCache number of entries, after which we will set the
  60. // trackOverflow flag, which will cause us to use a more expensive
  61. // algorithm to perform the notifications. Mutation tracking is only
  62. // performed if trackMutate is true.
  63. trackChannels map[chan struct{}]struct{}
  64. trackOverflow bool
  65. trackMutate bool
  66. }
  67. // Txn starts a new transaction that can be used to mutate the tree
  68. func (t *Tree) Txn() *Txn {
  69. txn := &Txn{
  70. root: t.root,
  71. snap: t.root,
  72. size: t.size,
  73. }
  74. return txn
  75. }
  76. // Clone makes an independent copy of the transaction. The new transaction
  77. // does not track any nodes and has TrackMutate turned off. The cloned transaction will contain any uncommitted writes in the original transaction but further mutations to either will be independent and result in different radix trees on Commit. A cloned transaction may be passed to another goroutine and mutated there independently however each transaction may only be mutated in a single thread.
  78. func (t *Txn) Clone() *Txn {
  79. // reset the writable node cache to avoid leaking future writes into the clone
  80. t.writable = nil
  81. txn := &Txn{
  82. root: t.root,
  83. snap: t.snap,
  84. size: t.size,
  85. }
  86. return txn
  87. }
  88. // TrackMutate can be used to toggle if mutations are tracked. If this is enabled
  89. // then notifications will be issued for affected internal nodes and leaves when
  90. // the transaction is committed.
  91. func (t *Txn) TrackMutate(track bool) {
  92. t.trackMutate = track
  93. }
  94. // trackChannel safely attempts to track the given mutation channel, setting the
  95. // overflow flag if we can no longer track any more. This limits the amount of
  96. // state that will accumulate during a transaction and we have a slower algorithm
  97. // to switch to if we overflow.
  98. func (t *Txn) trackChannel(ch chan struct{}) {
  99. // In overflow, make sure we don't store any more objects.
  100. if t.trackOverflow {
  101. return
  102. }
  103. // If this would overflow the state we reject it and set the flag (since
  104. // we aren't tracking everything that's required any longer).
  105. if len(t.trackChannels) >= defaultModifiedCache {
  106. // Mark that we are in the overflow state
  107. t.trackOverflow = true
  108. // Clear the map so that the channels can be garbage collected. It is
  109. // safe to do this since we have already overflowed and will be using
  110. // the slow notify algorithm.
  111. t.trackChannels = nil
  112. return
  113. }
  114. // Create the map on the fly when we need it.
  115. if t.trackChannels == nil {
  116. t.trackChannels = make(map[chan struct{}]struct{})
  117. }
  118. // Otherwise we are good to track it.
  119. t.trackChannels[ch] = struct{}{}
  120. }
  121. // writeNode returns a node to be modified, if the current node has already been
  122. // modified during the course of the transaction, it is used in-place. Set
  123. // forLeafUpdate to true if you are getting a write node to update the leaf,
  124. // which will set leaf mutation tracking appropriately as well.
  125. func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
  126. // Ensure the writable set exists.
  127. if t.writable == nil {
  128. lru, err := simplelru.NewLRU(defaultModifiedCache, nil)
  129. if err != nil {
  130. panic(err)
  131. }
  132. t.writable = lru
  133. }
  134. // If this node has already been modified, we can continue to use it
  135. // during this transaction. We know that we don't need to track it for
  136. // a node update since the node is writable, but if this is for a leaf
  137. // update we track it, in case the initial write to this node didn't
  138. // update the leaf.
  139. if _, ok := t.writable.Get(n); ok {
  140. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  141. t.trackChannel(n.leaf.mutateCh)
  142. }
  143. return n
  144. }
  145. // Mark this node as being mutated.
  146. if t.trackMutate {
  147. t.trackChannel(n.mutateCh)
  148. }
  149. // Mark its leaf as being mutated, if appropriate.
  150. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  151. t.trackChannel(n.leaf.mutateCh)
  152. }
  153. // Copy the existing node. If you have set forLeafUpdate it will be
  154. // safe to replace this leaf with another after you get your node for
  155. // writing. You MUST replace it, because the channel associated with
  156. // this leaf will be closed when this transaction is committed.
  157. nc := &Node{
  158. mutateCh: make(chan struct{}),
  159. leaf: n.leaf,
  160. }
  161. if n.prefix != nil {
  162. nc.prefix = make([]byte, len(n.prefix))
  163. copy(nc.prefix, n.prefix)
  164. }
  165. if len(n.edges) != 0 {
  166. nc.edges = make([]edge, len(n.edges))
  167. copy(nc.edges, n.edges)
  168. }
  169. // Mark this node as writable.
  170. t.writable.Add(nc, nil)
  171. return nc
  172. }
  173. // Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
  174. // Returns the size of the subtree visited
  175. func (t *Txn) trackChannelsAndCount(n *Node) int {
  176. // Count only leaf nodes
  177. leaves := 0
  178. if n.leaf != nil {
  179. leaves = 1
  180. }
  181. // Mark this node as being mutated.
  182. if t.trackMutate {
  183. t.trackChannel(n.mutateCh)
  184. }
  185. // Mark its leaf as being mutated, if appropriate.
  186. if t.trackMutate && n.leaf != nil {
  187. t.trackChannel(n.leaf.mutateCh)
  188. }
  189. // Recurse on the children
  190. for _, e := range n.edges {
  191. leaves += t.trackChannelsAndCount(e.node)
  192. }
  193. return leaves
  194. }
  195. // mergeChild is called to collapse the given node with its child. This is only
  196. // called when the given node is not a leaf and has a single edge.
  197. func (t *Txn) mergeChild(n *Node) {
  198. // Mark the child node as being mutated since we are about to abandon
  199. // it. We don't need to mark the leaf since we are retaining it if it
  200. // is there.
  201. e := n.edges[0]
  202. child := e.node
  203. if t.trackMutate {
  204. t.trackChannel(child.mutateCh)
  205. }
  206. // Merge the nodes.
  207. n.prefix = concat(n.prefix, child.prefix)
  208. n.leaf = child.leaf
  209. if len(child.edges) != 0 {
  210. n.edges = make([]edge, len(child.edges))
  211. copy(n.edges, child.edges)
  212. } else {
  213. n.edges = nil
  214. }
  215. }
// insert recursively inserts key k with value v into the subtree rooted at n.
// search is the part of k not yet consumed by the path from the root to n.
// It returns the node that should replace n in its parent, the previous value
// stored under k (nil if none), and whether k already had a value.
//
// NOTE(review): new nodes use sub-slices of the caller's k as their prefix, and
// leaves store k itself, without copying; callers presumably must not modify
// k after inserting it — confirm this contract.
func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
	// Handle key exhaustion: the key ends exactly at n, so n's leaf is set or
	// replaced.
	if len(search) == 0 {
		var oldVal interface{}
		didUpdate := false
		if n.isLeaf() {
			oldVal = n.leaf.val
			didUpdate = true
		}

		// forLeafUpdate=true so the old leaf's channel is tracked; a new leaf
		// is always installed, as writeNode requires.
		nc := t.writeNode(n, true)
		nc.leaf = &leafNode{
			mutateCh: make(chan struct{}),
			key:      k,
			val:      v,
		}
		return nc, oldVal, didUpdate
	}

	// Look for the edge
	idx, child := n.getEdge(search[0])

	// No edge, create one whose node holds the rest of the key as its prefix.
	if child == nil {
		e := edge{
			label: search[0],
			node: &Node{
				mutateCh: make(chan struct{}),
				leaf: &leafNode{
					mutateCh: make(chan struct{}),
					key:      k,
					val:      v,
				},
				prefix: search,
			},
		}
		nc := t.writeNode(n, false)
		nc.addEdge(e)
		return nc, nil, false
	}

	// Determine longest prefix of the search key on match
	commonPrefix := longestPrefix(search, child.prefix)
	if commonPrefix == len(child.prefix) {
		// The child's whole prefix matches: descend into it and re-link the
		// returned replacement node.
		search = search[commonPrefix:]
		newChild, oldVal, didUpdate := t.insert(child, k, search, v)
		if newChild != nil {
			nc := t.writeNode(n, false)
			nc.edges[idx].node = newChild
			return nc, oldVal, didUpdate
		}
		return nil, oldVal, didUpdate
	}

	// Split the node: only part of the child's prefix matches, so a new
	// intermediate node holding the shared part goes between n and child.
	nc := t.writeNode(n, false)
	splitNode := &Node{
		mutateCh: make(chan struct{}),
		prefix:   search[:commonPrefix],
	}
	nc.replaceEdge(edge{
		label: search[0],
		node:  splitNode,
	})

	// Restore the existing child node under the split node, trimming the
	// shared part off its (writable copy's) prefix.
	modChild := t.writeNode(child, false)
	splitNode.addEdge(edge{
		label: modChild.prefix[commonPrefix],
		node:  modChild,
	})
	modChild.prefix = modChild.prefix[commonPrefix:]

	// Create a new leaf node
	leaf := &leafNode{
		mutateCh: make(chan struct{}),
		key:      k,
		val:      v,
	}

	// If the new key ends at the split point, store the leaf on the split
	// node itself.
	search = search[commonPrefix:]
	if len(search) == 0 {
		splitNode.leaf = leaf
		return nc, nil, false
	}

	// Otherwise hang the leaf off a new edge holding the rest of the key.
	splitNode.addEdge(edge{
		label: search[0],
		node: &Node{
			mutateCh: make(chan struct{}),
			leaf:     leaf,
			prefix:   search,
		},
	})
	return nc, nil, false
}
// delete recursively removes the key whose remaining path below n is search.
// It returns the node that should replace n in its parent and the leaf that
// was removed. Both are nil when the key is not present, in which case n is
// left untouched. parent is currently unused.
func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
	// Check for key exhaustion
	if len(search) == 0 {
		if !n.isLeaf() {
			return nil, nil
		}
		// Copy the pointer in case we are in a transaction that already
		// modified this node since the node will be reused. Any changes
		// made to the node will not affect returning the original leaf
		// value.
		oldLeaf := n.leaf

		// Remove the leaf node
		nc := t.writeNode(n, true)
		nc.leaf = nil

		// Check if this node should be merged: a non-root node that is now
		// leafless with a single edge is collapsed into its child.
		if n != t.root && len(nc.edges) == 1 {
			t.mergeChild(nc)
		}
		return nc, oldLeaf
	}

	// Look for an edge whose prefix is fully contained in search.
	label := search[0]
	idx, child := n.getEdge(label)
	if child == nil || !bytes.HasPrefix(search, child.prefix) {
		return nil, nil
	}

	// Consume the search prefix
	search = search[len(child.prefix):]
	newChild, leaf := t.delete(n, child, search)
	if newChild == nil {
		return nil, nil
	}

	// Copy this node. WATCH OUT - it's safe to pass "false" here because we
	// will only ADD a leaf via t.mergeChild() if there isn't one due to
	// the !nc.isLeaf() check in the logic just below. This is pretty subtle,
	// so be careful if you change any of the logic here.
	nc := t.writeNode(n, false)

	// Delete the edge if the child is now empty (no leaf and no edges), then
	// collapse this node into its remaining child if it is a non-root,
	// leafless node left with exactly one edge.
	if newChild.leaf == nil && len(newChild.edges) == 0 {
		nc.delEdge(label)
		if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
			t.mergeChild(nc)
		}
	} else {
		nc.edges[idx].node = newChild
	}
	return nc, leaf
}
  355. // delete does a recursive deletion
  356. func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
  357. // Check for key exhaustion
  358. if len(search) == 0 {
  359. nc := t.writeNode(n, true)
  360. if n.isLeaf() {
  361. nc.leaf = nil
  362. }
  363. nc.edges = nil
  364. return nc, t.trackChannelsAndCount(n)
  365. }
  366. // Look for an edge
  367. label := search[0]
  368. idx, child := n.getEdge(label)
  369. // We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
  370. // Need to do both so that we can delete prefixes that don't correspond to any node in the tree
  371. if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
  372. return nil, 0
  373. }
  374. // Consume the search prefix
  375. if len(child.prefix) > len(search) {
  376. search = []byte("")
  377. } else {
  378. search = search[len(child.prefix):]
  379. }
  380. newChild, numDeletions := t.deletePrefix(n, child, search)
  381. if newChild == nil {
  382. return nil, 0
  383. }
  384. // Copy this node. WATCH OUT - it's safe to pass "false" here because we
  385. // will only ADD a leaf via nc.mergeChild() if there isn't one due to
  386. // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
  387. // so be careful if you change any of the logic here.
  388. nc := t.writeNode(n, false)
  389. // Delete the edge if the node has no edges
  390. if newChild.leaf == nil && len(newChild.edges) == 0 {
  391. nc.delEdge(label)
  392. if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
  393. t.mergeChild(nc)
  394. }
  395. } else {
  396. nc.edges[idx].node = newChild
  397. }
  398. return nc, numDeletions
  399. }
  400. // Insert is used to add or update a given key. The return provides
  401. // the previous value and a bool indicating if any was set.
  402. func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
  403. newRoot, oldVal, didUpdate := t.insert(t.root, k, k, v)
  404. if newRoot != nil {
  405. t.root = newRoot
  406. }
  407. if !didUpdate {
  408. t.size++
  409. }
  410. return oldVal, didUpdate
  411. }
  412. // Delete is used to delete a given key. Returns the old value if any,
  413. // and a bool indicating if the key was set.
  414. func (t *Txn) Delete(k []byte) (interface{}, bool) {
  415. newRoot, leaf := t.delete(nil, t.root, k)
  416. if newRoot != nil {
  417. t.root = newRoot
  418. }
  419. if leaf != nil {
  420. t.size--
  421. return leaf.val, true
  422. }
  423. return nil, false
  424. }
  425. // DeletePrefix is used to delete an entire subtree that matches the prefix
  426. // This will delete all nodes under that prefix
  427. func (t *Txn) DeletePrefix(prefix []byte) bool {
  428. newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
  429. if newRoot != nil {
  430. t.root = newRoot
  431. t.size = t.size - numDeletions
  432. return true
  433. }
  434. return false
  435. }
  436. // Root returns the current root of the radix tree within this
  437. // transaction. The root is not safe across insert and delete operations,
  438. // but can be used to read the current state during a transaction.
  439. func (t *Txn) Root() *Node {
  440. return t.root
  441. }
  442. // Get is used to lookup a specific key, returning
  443. // the value and if it was found
  444. func (t *Txn) Get(k []byte) (interface{}, bool) {
  445. return t.root.Get(k)
  446. }
  447. // GetWatch is used to lookup a specific key, returning
  448. // the watch channel, value and if it was found
  449. func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
  450. return t.root.GetWatch(k)
  451. }
  452. // Commit is used to finalize the transaction and return a new tree. If mutation
  453. // tracking is turned on then notifications will also be issued.
  454. func (t *Txn) Commit() *Tree {
  455. nt := t.CommitOnly()
  456. if t.trackMutate {
  457. t.Notify()
  458. }
  459. return nt
  460. }
  461. // CommitOnly is used to finalize the transaction and return a new tree, but
  462. // does not issue any notifications until Notify is called.
  463. func (t *Txn) CommitOnly() *Tree {
  464. nt := &Tree{t.root, t.size}
  465. t.writable = nil
  466. return nt
  467. }
// slowNotify closes mutation channels by walking the pre-transaction snapshot
// (t.snap) and the current root (t.root) side by side and comparing node
// paths. It needs no tracked state, which is why Notify falls back to it once
// channel tracking has overflowed, but it visits the nodes of both trees and
// is therefore expensive.
//
// For each snapshot node:
//   - if the new tree has no node at the same path, the node's channel and its
//     leaf's channel (if any) are closed;
//   - if the new tree has a different *Node at the same path, the node's
//     channel is closed, and so is its leaf's channel when the leaf differs.
//
// Nodes that exist only in the new tree are skipped.
//
// NOTE(review): the merge assumes rawIterator yields nodes in ascending Path()
// order for both trees — confirm against the iterator implementation.
func (t *Txn) slowNotify() {
	snapIter := t.snap.rawIterator()
	rootIter := t.root.rawIterator()
	for snapIter.Front() != nil || rootIter.Front() != nil {
		// If we've exhausted the nodes in the old snapshot, we know
		// there's nothing remaining to notify.
		if snapIter.Front() == nil {
			return
		}
		snapElem := snapIter.Front()

		// If we've exhausted the nodes in the new root, we know we need
		// to invalidate everything that remains in the old snapshot. We
		// know from the loop condition there's something in the old
		// snapshot.
		if rootIter.Front() == nil {
			close(snapElem.mutateCh)
			if snapElem.isLeaf() {
				close(snapElem.leaf.mutateCh)
			}
			snapIter.Next()
			continue
		}

		// Do one string compare so we can check the various conditions
		// below without repeating the compare.
		cmp := strings.Compare(snapIter.Path(), rootIter.Path())

		// If the snapshot is behind the root, then we must have deleted
		// this node during the transaction.
		if cmp < 0 {
			close(snapElem.mutateCh)
			if snapElem.isLeaf() {
				close(snapElem.leaf.mutateCh)
			}
			snapIter.Next()
			continue
		}

		// If the snapshot is ahead of the root, then we must have added
		// this node during the transaction.
		if cmp > 0 {
			rootIter.Next()
			continue
		}

		// If we have the same path, then we need to see if we mutated a
		// node and possibly the leaf.
		rootElem := rootIter.Front()
		if snapElem != rootElem {
			close(snapElem.mutateCh)
			if snapElem.leaf != nil && (snapElem.leaf != rootElem.leaf) {
				close(snapElem.leaf.mutateCh)
			}
		}
		snapIter.Next()
		rootIter.Next()
	}
}
  525. // Notify is used along with TrackMutate to trigger notifications. This must
  526. // only be done once a transaction is committed via CommitOnly, and it is called
  527. // automatically by Commit.
  528. func (t *Txn) Notify() {
  529. if !t.trackMutate {
  530. return
  531. }
  532. // If we've overflowed the tracking state we can't use it in any way and
  533. // need to do a full tree compare.
  534. if t.trackOverflow {
  535. t.slowNotify()
  536. } else {
  537. for ch := range t.trackChannels {
  538. close(ch)
  539. }
  540. }
  541. // Clean up the tracking state so that a re-notify is safe (will trigger
  542. // the else clause above which will be a no-op).
  543. t.trackChannels = nil
  544. t.trackOverflow = false
  545. }
  546. // Insert is used to add or update a given key. The return provides
  547. // the new tree, previous value and a bool indicating if any was set.
  548. func (t *Tree) Insert(k []byte, v interface{}) (*Tree, interface{}, bool) {
  549. txn := t.Txn()
  550. old, ok := txn.Insert(k, v)
  551. return txn.Commit(), old, ok
  552. }
  553. // Delete is used to delete a given key. Returns the new tree,
  554. // old value if any, and a bool indicating if the key was set.
  555. func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
  556. txn := t.Txn()
  557. old, ok := txn.Delete(k)
  558. return txn.Commit(), old, ok
  559. }
  560. // DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
  561. // and a bool indicating if the prefix matched any nodes
  562. func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
  563. txn := t.Txn()
  564. ok := txn.DeletePrefix(k)
  565. return txn.Commit(), ok
  566. }
  567. // Root returns the root node of the tree which can be used for richer
  568. // query operations.
  569. func (t *Tree) Root() *Node {
  570. return t.root
  571. }
  572. // Get is used to lookup a specific key, returning
  573. // the value and if it was found
  574. func (t *Tree) Get(k []byte) (interface{}, bool) {
  575. return t.root.Get(k)
  576. }
  577. // longestPrefix finds the length of the shared prefix
  578. // of two strings
  579. func longestPrefix(k1, k2 []byte) int {
  580. max := len(k1)
  581. if l := len(k2); l < max {
  582. max = l
  583. }
  584. var i int
  585. for i = 0; i < max; i++ {
  586. if k1[i] != k2[i] {
  587. break
  588. }
  589. }
  590. return i
  591. }
  592. // concat two byte slices, returning a third new copy
  593. func concat(a, b []byte) []byte {
  594. c := make([]byte, len(a)+len(b))
  595. copy(c, a)
  596. copy(c[len(a):], b)
  597. return c
  598. }