txn.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981
  1. package memdb
  2. import (
  3. "bytes"
  4. "fmt"
  5. "strings"
  6. "sync/atomic"
  7. "unsafe"
  8. iradix "github.com/hashicorp/go-immutable-radix"
  9. )
  10. const (
  11. id = "id"
  12. )
  13. var (
  14. // ErrNotFound is returned when the requested item is not found
  15. ErrNotFound = fmt.Errorf("not found")
  16. )
  17. // tableIndex is a tuple of (Table, Index) used for lookups
  18. type tableIndex struct {
  19. Table string
  20. Index string
  21. }
  22. // Txn is a transaction against a MemDB.
  23. // This can be a read or write transaction.
  24. type Txn struct {
  25. db *MemDB
  26. write bool
  27. rootTxn *iradix.Txn
  28. after []func()
  29. // changes is used to track the changes performed during the transaction. If
  30. // it is nil at transaction start then changes are not tracked.
  31. changes Changes
  32. modified map[tableIndex]*iradix.Txn
  33. }
  34. // TrackChanges enables change tracking for the transaction. If called at any
  35. // point before commit, subsequent mutations will be recorded and can be
  36. // retrieved using ChangeSet. Once this has been called on a transaction it
  37. // can't be unset. As with other Txn methods it's not safe to call this from a
  38. // different goroutine than the one making mutations or committing the
  39. // transaction.
  40. func (txn *Txn) TrackChanges() {
  41. if txn.changes == nil {
  42. txn.changes = make(Changes, 0, 1)
  43. }
  44. }
  45. // readableIndex returns a transaction usable for reading the given index in a
  46. // table. If the transaction is a write transaction with modifications, a clone of the
  47. // modified index will be returned.
  48. func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
  49. // Look for existing transaction
  50. if txn.write && txn.modified != nil {
  51. key := tableIndex{table, index}
  52. exist, ok := txn.modified[key]
  53. if ok {
  54. return exist.Clone()
  55. }
  56. }
  57. // Create a read transaction
  58. path := indexPath(table, index)
  59. raw, _ := txn.rootTxn.Get(path)
  60. indexTxn := raw.(*iradix.Tree).Txn()
  61. return indexTxn
  62. }
  63. // writableIndex returns a transaction usable for modifying the
  64. // given index in a table.
  65. func (txn *Txn) writableIndex(table, index string) *iradix.Txn {
  66. if txn.modified == nil {
  67. txn.modified = make(map[tableIndex]*iradix.Txn)
  68. }
  69. // Look for existing transaction
  70. key := tableIndex{table, index}
  71. exist, ok := txn.modified[key]
  72. if ok {
  73. return exist
  74. }
  75. // Start a new transaction
  76. path := indexPath(table, index)
  77. raw, _ := txn.rootTxn.Get(path)
  78. indexTxn := raw.(*iradix.Tree).Txn()
  79. // If we are the primary DB, enable mutation tracking. Snapshots should
  80. // not notify, otherwise we will trigger watches on the primary DB when
  81. // the writes will not be visible.
  82. indexTxn.TrackMutate(txn.db.primary)
  83. // Keep this open for the duration of the txn
  84. txn.modified[key] = indexTxn
  85. return indexTxn
  86. }
  87. // Abort is used to cancel this transaction.
  88. // This is a noop for read transactions.
  89. func (txn *Txn) Abort() {
  90. // Noop for a read transaction
  91. if !txn.write {
  92. return
  93. }
  94. // Check if already aborted or committed
  95. if txn.rootTxn == nil {
  96. return
  97. }
  98. // Clear the txn
  99. txn.rootTxn = nil
  100. txn.modified = nil
  101. txn.changes = nil
  102. // Release the writer lock since this is invalid
  103. txn.db.writer.Unlock()
  104. }
  105. // Commit is used to finalize this transaction.
  106. // This is a noop for read transactions.
  107. func (txn *Txn) Commit() {
  108. // Noop for a read transaction
  109. if !txn.write {
  110. return
  111. }
  112. // Check if already aborted or committed
  113. if txn.rootTxn == nil {
  114. return
  115. }
  116. // Commit each sub-transaction scoped to (table, index)
  117. for key, subTxn := range txn.modified {
  118. path := indexPath(key.Table, key.Index)
  119. final := subTxn.CommitOnly()
  120. txn.rootTxn.Insert(path, final)
  121. }
  122. // Update the root of the DB
  123. newRoot := txn.rootTxn.CommitOnly()
  124. atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
  125. // Now issue all of the mutation updates (this is safe to call
  126. // even if mutation tracking isn't enabled); we do this after
  127. // the root pointer is swapped so that waking responders will
  128. // see the new state.
  129. for _, subTxn := range txn.modified {
  130. subTxn.Notify()
  131. }
  132. txn.rootTxn.Notify()
  133. // Clear the txn
  134. txn.rootTxn = nil
  135. txn.modified = nil
  136. // Release the writer lock since this is invalid
  137. txn.db.writer.Unlock()
  138. // Run the deferred functions, if any
  139. for i := len(txn.after); i > 0; i-- {
  140. fn := txn.after[i-1]
  141. fn()
  142. }
  143. }
  144. // Insert is used to add or update an object into the given table.
  145. //
  146. // When updating an object, the obj provided should be a copy rather
  147. // than a value updated in-place. Modifying values in-place that are already
  148. // inserted into MemDB is not supported behavior.
  149. func (txn *Txn) Insert(table string, obj interface{}) error {
  150. if !txn.write {
  151. return fmt.Errorf("cannot insert in read-only transaction")
  152. }
  153. // Get the table schema
  154. tableSchema, ok := txn.db.schema.Tables[table]
  155. if !ok {
  156. return fmt.Errorf("invalid table '%s'", table)
  157. }
  158. // Get the primary ID of the object
  159. idSchema := tableSchema.Indexes[id]
  160. idIndexer := idSchema.Indexer.(SingleIndexer)
  161. ok, idVal, err := idIndexer.FromObject(obj)
  162. if err != nil {
  163. return fmt.Errorf("failed to build primary index: %v", err)
  164. }
  165. if !ok {
  166. return fmt.Errorf("object missing primary index")
  167. }
  168. // Lookup the object by ID first, to see if this is an update
  169. idTxn := txn.writableIndex(table, id)
  170. existing, update := idTxn.Get(idVal)
  171. // On an update, there is an existing object with the given
  172. // primary ID. We do the update by deleting the current object
  173. // and inserting the new object.
  174. for name, indexSchema := range tableSchema.Indexes {
  175. indexTxn := txn.writableIndex(table, name)
  176. // Determine the new index value
  177. var (
  178. ok bool
  179. vals [][]byte
  180. err error
  181. )
  182. switch indexer := indexSchema.Indexer.(type) {
  183. case SingleIndexer:
  184. var val []byte
  185. ok, val, err = indexer.FromObject(obj)
  186. vals = [][]byte{val}
  187. case MultiIndexer:
  188. ok, vals, err = indexer.FromObject(obj)
  189. }
  190. if err != nil {
  191. return fmt.Errorf("failed to build index '%s': %v", name, err)
  192. }
  193. // Handle non-unique index by computing a unique index.
  194. // This is done by appending the primary key which must
  195. // be unique anyways.
  196. if ok && !indexSchema.Unique {
  197. for i := range vals {
  198. vals[i] = append(vals[i], idVal...)
  199. }
  200. }
  201. // Handle the update by deleting from the index first
  202. if update {
  203. var (
  204. okExist bool
  205. valsExist [][]byte
  206. err error
  207. )
  208. switch indexer := indexSchema.Indexer.(type) {
  209. case SingleIndexer:
  210. var valExist []byte
  211. okExist, valExist, err = indexer.FromObject(existing)
  212. valsExist = [][]byte{valExist}
  213. case MultiIndexer:
  214. okExist, valsExist, err = indexer.FromObject(existing)
  215. }
  216. if err != nil {
  217. return fmt.Errorf("failed to build index '%s': %v", name, err)
  218. }
  219. if okExist {
  220. for i, valExist := range valsExist {
  221. // Handle non-unique index by computing a unique index.
  222. // This is done by appending the primary key which must
  223. // be unique anyways.
  224. if !indexSchema.Unique {
  225. valExist = append(valExist, idVal...)
  226. }
  227. // If we are writing to the same index with the same value,
  228. // we can avoid the delete as the insert will overwrite the
  229. // value anyways.
  230. if i >= len(vals) || !bytes.Equal(valExist, vals[i]) {
  231. indexTxn.Delete(valExist)
  232. }
  233. }
  234. }
  235. }
  236. // If there is no index value, either this is an error or an expected
  237. // case and we can skip updating
  238. if !ok {
  239. if indexSchema.AllowMissing {
  240. continue
  241. } else {
  242. return fmt.Errorf("missing value for index '%s'", name)
  243. }
  244. }
  245. // Update the value of the index
  246. for _, val := range vals {
  247. indexTxn.Insert(val, obj)
  248. }
  249. }
  250. if txn.changes != nil {
  251. txn.changes = append(txn.changes, Change{
  252. Table: table,
  253. Before: existing, // might be nil on a create
  254. After: obj,
  255. primaryKey: idVal,
  256. })
  257. }
  258. return nil
  259. }
  260. // Delete is used to delete a single object from the given table.
  261. // This object must already exist in the table.
  262. func (txn *Txn) Delete(table string, obj interface{}) error {
  263. if !txn.write {
  264. return fmt.Errorf("cannot delete in read-only transaction")
  265. }
  266. // Get the table schema
  267. tableSchema, ok := txn.db.schema.Tables[table]
  268. if !ok {
  269. return fmt.Errorf("invalid table '%s'", table)
  270. }
  271. // Get the primary ID of the object
  272. idSchema := tableSchema.Indexes[id]
  273. idIndexer := idSchema.Indexer.(SingleIndexer)
  274. ok, idVal, err := idIndexer.FromObject(obj)
  275. if err != nil {
  276. return fmt.Errorf("failed to build primary index: %v", err)
  277. }
  278. if !ok {
  279. return fmt.Errorf("object missing primary index")
  280. }
  281. // Lookup the object by ID first, check fi we should continue
  282. idTxn := txn.writableIndex(table, id)
  283. existing, ok := idTxn.Get(idVal)
  284. if !ok {
  285. return ErrNotFound
  286. }
  287. // Remove the object from all the indexes
  288. for name, indexSchema := range tableSchema.Indexes {
  289. indexTxn := txn.writableIndex(table, name)
  290. // Handle the update by deleting from the index first
  291. var (
  292. ok bool
  293. vals [][]byte
  294. err error
  295. )
  296. switch indexer := indexSchema.Indexer.(type) {
  297. case SingleIndexer:
  298. var val []byte
  299. ok, val, err = indexer.FromObject(existing)
  300. vals = [][]byte{val}
  301. case MultiIndexer:
  302. ok, vals, err = indexer.FromObject(existing)
  303. }
  304. if err != nil {
  305. return fmt.Errorf("failed to build index '%s': %v", name, err)
  306. }
  307. if ok {
  308. // Handle non-unique index by computing a unique index.
  309. // This is done by appending the primary key which must
  310. // be unique anyways.
  311. for _, val := range vals {
  312. if !indexSchema.Unique {
  313. val = append(val, idVal...)
  314. }
  315. indexTxn.Delete(val)
  316. }
  317. }
  318. }
  319. if txn.changes != nil {
  320. txn.changes = append(txn.changes, Change{
  321. Table: table,
  322. Before: existing,
  323. After: nil, // Now nil indicates deletion
  324. primaryKey: idVal,
  325. })
  326. }
  327. return nil
  328. }
  329. // DeletePrefix is used to delete an entire subtree based on a prefix.
  330. // The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
  331. // These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
  332. // This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
  333. func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
  334. if !txn.write {
  335. return false, fmt.Errorf("cannot delete in read-only transaction")
  336. }
  337. if !strings.HasSuffix(prefix_index, "_prefix") {
  338. return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
  339. }
  340. deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
  341. // Get an iterator over all of the keys with the given prefix.
  342. entries, err := txn.Get(table, prefix_index, prefix)
  343. if err != nil {
  344. return false, fmt.Errorf("failed kvs lookup: %s", err)
  345. }
  346. // Get the table schema
  347. tableSchema, ok := txn.db.schema.Tables[table]
  348. if !ok {
  349. return false, fmt.Errorf("invalid table '%s'", table)
  350. }
  351. foundAny := false
  352. for entry := entries.Next(); entry != nil; entry = entries.Next() {
  353. if !foundAny {
  354. foundAny = true
  355. }
  356. // Get the primary ID of the object
  357. idSchema := tableSchema.Indexes[id]
  358. idIndexer := idSchema.Indexer.(SingleIndexer)
  359. ok, idVal, err := idIndexer.FromObject(entry)
  360. if err != nil {
  361. return false, fmt.Errorf("failed to build primary index: %v", err)
  362. }
  363. if !ok {
  364. return false, fmt.Errorf("object missing primary index")
  365. }
  366. if txn.changes != nil {
  367. // Record the deletion
  368. idTxn := txn.writableIndex(table, id)
  369. existing, ok := idTxn.Get(idVal)
  370. if ok {
  371. txn.changes = append(txn.changes, Change{
  372. Table: table,
  373. Before: existing,
  374. After: nil, // Now nil indicates deletion
  375. primaryKey: idVal,
  376. })
  377. }
  378. }
  379. // Remove the object from all the indexes except the given prefix index
  380. for name, indexSchema := range tableSchema.Indexes {
  381. if name == deletePrefixIndex {
  382. continue
  383. }
  384. indexTxn := txn.writableIndex(table, name)
  385. // Handle the update by deleting from the index first
  386. var (
  387. ok bool
  388. vals [][]byte
  389. err error
  390. )
  391. switch indexer := indexSchema.Indexer.(type) {
  392. case SingleIndexer:
  393. var val []byte
  394. ok, val, err = indexer.FromObject(entry)
  395. vals = [][]byte{val}
  396. case MultiIndexer:
  397. ok, vals, err = indexer.FromObject(entry)
  398. }
  399. if err != nil {
  400. return false, fmt.Errorf("failed to build index '%s': %v", name, err)
  401. }
  402. if ok {
  403. // Handle non-unique index by computing a unique index.
  404. // This is done by appending the primary key which must
  405. // be unique anyways.
  406. for _, val := range vals {
  407. if !indexSchema.Unique {
  408. val = append(val, idVal...)
  409. }
  410. indexTxn.Delete(val)
  411. }
  412. }
  413. }
  414. }
  415. if foundAny {
  416. indexTxn := txn.writableIndex(table, deletePrefixIndex)
  417. ok = indexTxn.DeletePrefix([]byte(prefix))
  418. if !ok {
  419. panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
  420. }
  421. return true, nil
  422. }
  423. return false, nil
  424. }
  425. // DeleteAll is used to delete all the objects in a given table
  426. // matching the constraints on the index
  427. func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
  428. if !txn.write {
  429. return 0, fmt.Errorf("cannot delete in read-only transaction")
  430. }
  431. // Get all the objects
  432. iter, err := txn.Get(table, index, args...)
  433. if err != nil {
  434. return 0, err
  435. }
  436. // Put them into a slice so there are no safety concerns while actually
  437. // performing the deletes
  438. var objs []interface{}
  439. for {
  440. obj := iter.Next()
  441. if obj == nil {
  442. break
  443. }
  444. objs = append(objs, obj)
  445. }
  446. // Do the deletes
  447. num := 0
  448. for _, obj := range objs {
  449. if err := txn.Delete(table, obj); err != nil {
  450. return num, err
  451. }
  452. num++
  453. }
  454. return num, nil
  455. }
  456. // FirstWatch is used to return the first matching object for
  457. // the given constraints on the index along with the watch channel
  458. func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
  459. // Get the index value
  460. indexSchema, val, err := txn.getIndexValue(table, index, args...)
  461. if err != nil {
  462. return nil, nil, err
  463. }
  464. // Get the index itself
  465. indexTxn := txn.readableIndex(table, indexSchema.Name)
  466. // Do an exact lookup
  467. if indexSchema.Unique && val != nil && indexSchema.Name == index {
  468. watch, obj, ok := indexTxn.GetWatch(val)
  469. if !ok {
  470. return watch, nil, nil
  471. }
  472. return watch, obj, nil
  473. }
  474. // Handle non-unique index by using an iterator and getting the first value
  475. iter := indexTxn.Root().Iterator()
  476. watch := iter.SeekPrefixWatch(val)
  477. _, value, _ := iter.Next()
  478. return watch, value, nil
  479. }
  480. // LastWatch is used to return the last matching object for
  481. // the given constraints on the index along with the watch channel
  482. func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
  483. // Get the index value
  484. indexSchema, val, err := txn.getIndexValue(table, index, args...)
  485. if err != nil {
  486. return nil, nil, err
  487. }
  488. // Get the index itself
  489. indexTxn := txn.readableIndex(table, indexSchema.Name)
  490. // Do an exact lookup
  491. if indexSchema.Unique && val != nil && indexSchema.Name == index {
  492. watch, obj, ok := indexTxn.GetWatch(val)
  493. if !ok {
  494. return watch, nil, nil
  495. }
  496. return watch, obj, nil
  497. }
  498. // Handle non-unique index by using an iterator and getting the last value
  499. iter := indexTxn.Root().ReverseIterator()
  500. watch := iter.SeekPrefixWatch(val)
  501. _, value, _ := iter.Previous()
  502. return watch, value, nil
  503. }
  504. // First is used to return the first matching object for
  505. // the given constraints on the index
  506. func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
  507. _, val, err := txn.FirstWatch(table, index, args...)
  508. return val, err
  509. }
  510. // Last is used to return the last matching object for
  511. // the given constraints on the index
  512. func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) {
  513. _, val, err := txn.LastWatch(table, index, args...)
  514. return val, err
  515. }
  516. // LongestPrefix is used to fetch the longest prefix match for the given
  517. // constraints on the index. Note that this will not work with the memdb
  518. // StringFieldIndex because it adds null terminators which prevent the
  519. // algorithm from correctly finding a match (it will get to right before the
  520. // null and fail to find a leaf node). This should only be used where the prefix
  521. // given is capable of matching indexed entries directly, which typically only
  522. // applies to a custom indexer. See the unit test for an example.
  523. func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
  524. // Enforce that this only works on prefix indexes.
  525. if !strings.HasSuffix(index, "_prefix") {
  526. return nil, fmt.Errorf("must use '%s_prefix' on index", index)
  527. }
  528. // Get the index value.
  529. indexSchema, val, err := txn.getIndexValue(table, index, args...)
  530. if err != nil {
  531. return nil, err
  532. }
  533. // This algorithm only makes sense against a unique index, otherwise the
  534. // index keys will have the IDs appended to them.
  535. if !indexSchema.Unique {
  536. return nil, fmt.Errorf("index '%s' is not unique", index)
  537. }
  538. // Find the longest prefix match with the given index.
  539. indexTxn := txn.readableIndex(table, indexSchema.Name)
  540. if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
  541. return value, nil
  542. }
  543. return nil, nil
  544. }
  545. // getIndexValue is used to get the IndexSchema and the value
  546. // used to scan the index given the parameters. This handles prefix based
  547. // scans when the index has the "_prefix" suffix. The index must support
  548. // prefix iteration.
  549. func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexSchema, []byte, error) {
  550. // Get the table schema
  551. tableSchema, ok := txn.db.schema.Tables[table]
  552. if !ok {
  553. return nil, nil, fmt.Errorf("invalid table '%s'", table)
  554. }
  555. // Check for a prefix scan
  556. prefixScan := false
  557. if strings.HasSuffix(index, "_prefix") {
  558. index = strings.TrimSuffix(index, "_prefix")
  559. prefixScan = true
  560. }
  561. // Get the index schema
  562. indexSchema, ok := tableSchema.Indexes[index]
  563. if !ok {
  564. return nil, nil, fmt.Errorf("invalid index '%s'", index)
  565. }
  566. // Hot-path for when there are no arguments
  567. if len(args) == 0 {
  568. return indexSchema, nil, nil
  569. }
  570. // Special case the prefix scanning
  571. if prefixScan {
  572. prefixIndexer, ok := indexSchema.Indexer.(PrefixIndexer)
  573. if !ok {
  574. return indexSchema, nil,
  575. fmt.Errorf("index '%s' does not support prefix scanning", index)
  576. }
  577. val, err := prefixIndexer.PrefixFromArgs(args...)
  578. if err != nil {
  579. return indexSchema, nil, fmt.Errorf("index error: %v", err)
  580. }
  581. return indexSchema, val, err
  582. }
  583. // Get the exact match index
  584. val, err := indexSchema.Indexer.FromArgs(args...)
  585. if err != nil {
  586. return indexSchema, nil, fmt.Errorf("index error: %v", err)
  587. }
  588. return indexSchema, val, err
  589. }
  590. // ResultIterator is used to iterate over a list of results from a query on a table.
  591. //
  592. // When a ResultIterator is created from a write transaction, the results from
  593. // Next will reflect a snapshot of the table at the time the ResultIterator is
  594. // created.
  595. // This means that calling Insert or Delete on a transaction while iterating is
  596. // allowed, but the changes made by Insert or Delete will not be observed in the
  597. // results returned from subsequent calls to Next. For example if an item is deleted
  598. // from the index used by the iterator it will still be returned by Next. If an
  599. // item is inserted into the index used by the iterator, it will not be returned
  600. // by Next. However, an iterator created after a call to Insert or Delete will
  601. // reflect the modifications.
  602. //
  603. // When a ResultIterator is created from a write transaction, and there are already
  604. // modifications to the index used by the iterator, the modification cache of the
  605. // index will be invalidated. This may result in some additional allocations if
  606. // the same node in the index is modified again.
  607. type ResultIterator interface {
  608. WatchCh() <-chan struct{}
  609. // Next returns the next result from the iterator. If there are no more results
  610. // nil is returned.
  611. Next() interface{}
  612. }
  613. // Get is used to construct a ResultIterator over all the rows that match the
  614. // given constraints of an index. The index values must match exactly (this
  615. // is not a range-based or prefix-based lookup) by default.
  616. //
  617. // Prefix lookups: if the named index implements PrefixIndexer, you may perform
  618. // prefix-based lookups by appending "_prefix" to the index name. In this
  619. // scenario, the index values given in args are treated as prefix lookups. For
  620. // example, a StringFieldIndex will match any string with the given value
  621. // as a prefix: "mem" matches "memdb".
  622. //
  623. // See the documentation for ResultIterator to understand the behaviour of the
  624. // returned ResultIterator.
  625. func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
  626. indexIter, val, err := txn.getIndexIterator(table, index, args...)
  627. if err != nil {
  628. return nil, err
  629. }
  630. // Seek the iterator to the appropriate sub-set
  631. watchCh := indexIter.SeekPrefixWatch(val)
  632. // Create an iterator
  633. iter := &radixIterator{
  634. iter: indexIter,
  635. watchCh: watchCh,
  636. }
  637. return iter, nil
  638. }
  639. // GetReverse is used to construct a Reverse ResultIterator over all the
  640. // rows that match the given constraints of an index.
  641. // The returned ResultIterator's Next() will return the next Previous value.
  642. //
  643. // See the documentation on Get for details on arguments.
  644. // See the documentation for ResultIterator to understand the behaviour of the
  645. // returned ResultIterator.
  646. func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) {
  647. indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
  648. if err != nil {
  649. return nil, err
  650. }
  651. // Seek the iterator to the appropriate sub-set
  652. watchCh := indexIter.SeekPrefixWatch(val)
  653. // Create an iterator
  654. iter := &radixReverseIterator{
  655. iter: indexIter,
  656. watchCh: watchCh,
  657. }
  658. return iter, nil
  659. }
  660. // LowerBound is used to construct a ResultIterator over all the the range of
  661. // rows that have an index value greater than or equal to the provide args.
  662. // Calling this then iterating until the rows are larger than required allows
  663. // range scans within an index. It is not possible to watch the resulting
  664. // iterator since the radix tree doesn't efficiently allow watching on lower
  665. // bound changes. The WatchCh returned will be nill and so will block forever.
  666. //
  667. // See the documentation for ResultIterator to understand the behaviour of the
  668. // returned ResultIterator.
  669. func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
  670. indexIter, val, err := txn.getIndexIterator(table, index, args...)
  671. if err != nil {
  672. return nil, err
  673. }
  674. // Seek the iterator to the appropriate sub-set
  675. indexIter.SeekLowerBound(val)
  676. // Create an iterator
  677. iter := &radixIterator{
  678. iter: indexIter,
  679. }
  680. return iter, nil
  681. }
  682. // ReverseLowerBound is used to construct a Reverse ResultIterator over all the
  683. // the range of rows that have an index value less than or equal to the
  684. // provide args. Calling this then iterating until the rows are lower than
  685. // required allows range scans within an index. It is not possible to watch the
  686. // resulting iterator since the radix tree doesn't efficiently allow watching
  687. // on lower bound changes. The WatchCh returned will be nill and so will block
  688. // forever.
  689. //
  690. // See the documentation for ResultIterator to understand the behaviour of the
  691. // returned ResultIterator.
  692. func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
  693. indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
  694. if err != nil {
  695. return nil, err
  696. }
  697. // Seek the iterator to the appropriate sub-set
  698. indexIter.SeekReverseLowerBound(val)
  699. // Create an iterator
  700. iter := &radixReverseIterator{
  701. iter: indexIter,
  702. }
  703. return iter, nil
  704. }
  705. // objectID is a tuple of table name and the raw internal id byte slice
  706. // converted to a string. It's only converted to a string to make it comparable
  707. // so this struct can be used as a map index.
  708. type objectID struct {
  709. Table string
  710. IndexVal string
  711. }
  712. // mutInfo stores metadata about mutations to allow collapsing multiple
  713. // mutations to the same object into one.
  714. type mutInfo struct {
  715. firstBefore interface{}
  716. lastIdx int
  717. }
  718. // Changes returns the set of object changes that have been made in the
  719. // transaction so far. If change tracking is not enabled it wil always return
  720. // nil. It can be called before or after Commit. If it is before Commit it will
  721. // return all changes made so far which may not be the same as the final
  722. // Changes. After abort it will always return nil. As with other Txn methods
  723. // it's not safe to call this from a different goroutine than the one making
  724. // mutations or committing the transaction. Mutations will appear in the order
  725. // they were performed in the transaction but multiple operations to the same
  726. // object will be collapsed so only the effective overall change to that object
  727. // is present. If transaction operations are dependent (e.g. copy object X to Y
  728. // then delete X) this might mean the set of mutations is incomplete to verify
  729. // history, but it is complete in that the net effect is preserved (Y got a new
  730. // value, X got removed).
  731. func (txn *Txn) Changes() Changes {
  732. if txn.changes == nil {
  733. return nil
  734. }
  735. // De-duplicate mutations by key so all take effect at the point of the last
  736. // write but we keep the mutations in order.
  737. dups := make(map[objectID]mutInfo)
  738. for i, m := range txn.changes {
  739. oid := objectID{
  740. Table: m.Table,
  741. IndexVal: string(m.primaryKey),
  742. }
  743. // Store the latest mutation index for each key value
  744. mi, ok := dups[oid]
  745. if !ok {
  746. // First entry for key, store the before value
  747. mi.firstBefore = m.Before
  748. }
  749. mi.lastIdx = i
  750. dups[oid] = mi
  751. }
  752. if len(dups) == len(txn.changes) {
  753. // No duplicates found, fast path return it as is
  754. return txn.changes
  755. }
  756. // Need to remove the duplicates
  757. cs := make(Changes, 0, len(dups))
  758. for i, m := range txn.changes {
  759. oid := objectID{
  760. Table: m.Table,
  761. IndexVal: string(m.primaryKey),
  762. }
  763. mi := dups[oid]
  764. if mi.lastIdx == i {
  765. // This was the latest value for this key copy it with the before value in
  766. // case it's different. Note that m is not a pointer so we are not
  767. // modifying the txn.changeSet here - it's already a copy.
  768. m.Before = mi.firstBefore
  769. // Edge case - if the object was inserted and then eventually deleted in
  770. // the same transaction, then the net affect on that key is a no-op. Don't
  771. // emit a mutation with nil for before and after as it's meaningless and
  772. // might violate expectations and cause a panic in code that assumes at
  773. // least one must be set.
  774. if m.Before == nil && m.After == nil {
  775. continue
  776. }
  777. cs = append(cs, m)
  778. }
  779. }
  780. // Store the de-duped version in case this is called again
  781. txn.changes = cs
  782. return cs
  783. }
  784. func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) {
  785. // Get the index value to scan
  786. indexSchema, val, err := txn.getIndexValue(table, index, args...)
  787. if err != nil {
  788. return nil, nil, err
  789. }
  790. // Get the index itself
  791. indexTxn := txn.readableIndex(table, indexSchema.Name)
  792. indexRoot := indexTxn.Root()
  793. // Get an iterator over the index
  794. indexIter := indexRoot.Iterator()
  795. return indexIter, val, nil
  796. }
  797. func (txn *Txn) getIndexIteratorReverse(table, index string, args ...interface{}) (*iradix.ReverseIterator, []byte, error) {
  798. // Get the index value to scan
  799. indexSchema, val, err := txn.getIndexValue(table, index, args...)
  800. if err != nil {
  801. return nil, nil, err
  802. }
  803. // Get the index itself
  804. indexTxn := txn.readableIndex(table, indexSchema.Name)
  805. indexRoot := indexTxn.Root()
  806. // Get an interator over the index
  807. indexIter := indexRoot.ReverseIterator()
  808. return indexIter, val, nil
  809. }
  810. // Defer is used to push a new arbitrary function onto a stack which
  811. // gets called when a transaction is committed and finished. Deferred
  812. // functions are called in LIFO order, and only invoked at the end of
  813. // write transactions.
  814. func (txn *Txn) Defer(fn func()) {
  815. txn.after = append(txn.after, fn)
  816. }
  817. // radixIterator is used to wrap an underlying iradix iterator.
  818. // This is much more efficient than a sliceIterator as we are not
  819. // materializing the entire view.
  820. type radixIterator struct {
  821. iter *iradix.Iterator
  822. watchCh <-chan struct{}
  823. }
  824. func (r *radixIterator) WatchCh() <-chan struct{} {
  825. return r.watchCh
  826. }
  827. func (r *radixIterator) Next() interface{} {
  828. _, value, ok := r.iter.Next()
  829. if !ok {
  830. return nil
  831. }
  832. return value
  833. }
  834. type radixReverseIterator struct {
  835. iter *iradix.ReverseIterator
  836. watchCh <-chan struct{}
  837. }
  838. func (r *radixReverseIterator) Next() interface{} {
  839. _, value, ok := r.iter.Previous()
  840. if !ok {
  841. return nil
  842. }
  843. return value
  844. }
  845. func (r *radixReverseIterator) WatchCh() <-chan struct{} {
  846. return r.watchCh
  847. }
  848. // Snapshot creates a snapshot of the current state of the transaction.
  849. // Returns a new read-only transaction or nil if the transaction is already
  850. // aborted or committed.
  851. func (txn *Txn) Snapshot() *Txn {
  852. if txn.rootTxn == nil {
  853. return nil
  854. }
  855. snapshot := &Txn{
  856. db: txn.db,
  857. rootTxn: txn.rootTxn.Clone(),
  858. }
  859. // Commit sub-transactions into the snapshot
  860. for key, subTxn := range txn.modified {
  861. path := indexPath(key.Table, key.Index)
  862. final := subTxn.CommitOnly()
  863. snapshot.rootTxn.Insert(path, final)
  864. }
  865. return snapshot
  866. }