|
@@ -7,13 +7,18 @@ import (
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
"unsafe"
|
|
|
|
|
|
- "github.com/hashicorp/go-immutable-radix"
|
|
|
|
|
|
+ iradix "github.com/hashicorp/go-immutable-radix"
|
|
)
|
|
)
|
|
|
|
|
|
const (
|
|
const (
|
|
id = "id"
|
|
id = "id"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+var (
|
|
|
|
+ // ErrNotFound is returned when the requested item is not found
|
|
|
|
+ ErrNotFound = fmt.Errorf("not found")
|
|
|
|
+)
|
|
|
|
+
|
|
// tableIndex is a tuple of (Table, Index) used for lookups
|
|
// tableIndex is a tuple of (Table, Index) used for lookups
|
|
type tableIndex struct {
|
|
type tableIndex struct {
|
|
Table string
|
|
Table string
|
|
@@ -28,19 +33,35 @@ type Txn struct {
|
|
rootTxn *iradix.Txn
|
|
rootTxn *iradix.Txn
|
|
after []func()
|
|
after []func()
|
|
|
|
|
|
|
|
+ // changes is used to track the changes performed during the transaction. If
|
|
|
|
+ // it is nil at transaction start then changes are not tracked.
|
|
|
|
+ changes Changes
|
|
|
|
+
|
|
modified map[tableIndex]*iradix.Txn
|
|
modified map[tableIndex]*iradix.Txn
|
|
}
|
|
}
|
|
|
|
|
|
-// readableIndex returns a transaction usable for reading the given
|
|
|
|
-// index in a table. If a write transaction is in progress, we may need
|
|
|
|
-// to use an existing modified txn.
|
|
|
|
|
|
+// TrackChanges enables change tracking for the transaction. If called at any
|
|
|
|
+// point before commit, subsequent mutations will be recorded and can be
|
|
|
|
+// retrieved using ChangeSet. Once this has been called on a transaction it
|
|
|
|
+// can't be unset. As with other Txn methods it's not safe to call this from a
|
|
|
|
+// different goroutine than the one making mutations or committing the
|
|
|
|
+// transaction.
|
|
|
|
+func (txn *Txn) TrackChanges() {
|
|
|
|
+ if txn.changes == nil {
|
|
|
|
+ txn.changes = make(Changes, 0, 1)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// readableIndex returns a transaction usable for reading the given index in a
|
|
|
|
+// table. If the transaction is a write transaction with modifications, a clone of the
|
|
|
|
+// modified index will be returned.
|
|
func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
|
|
func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
|
|
// Look for existing transaction
|
|
// Look for existing transaction
|
|
if txn.write && txn.modified != nil {
|
|
if txn.write && txn.modified != nil {
|
|
key := tableIndex{table, index}
|
|
key := tableIndex{table, index}
|
|
exist, ok := txn.modified[key]
|
|
exist, ok := txn.modified[key]
|
|
if ok {
|
|
if ok {
|
|
- return exist
|
|
|
|
|
|
+ return exist.Clone()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -70,6 +91,11 @@ func (txn *Txn) writableIndex(table, index string) *iradix.Txn {
|
|
raw, _ := txn.rootTxn.Get(path)
|
|
raw, _ := txn.rootTxn.Get(path)
|
|
indexTxn := raw.(*iradix.Tree).Txn()
|
|
indexTxn := raw.(*iradix.Tree).Txn()
|
|
|
|
|
|
|
|
+ // If we are the primary DB, enable mutation tracking. Snapshots should
|
|
|
|
+ // not notify, otherwise we will trigger watches on the primary DB when
|
|
|
|
+ // the writes will not be visible.
|
|
|
|
+ indexTxn.TrackMutate(txn.db.primary)
|
|
|
|
+
|
|
// Keep this open for the duration of the txn
|
|
// Keep this open for the duration of the txn
|
|
txn.modified[key] = indexTxn
|
|
txn.modified[key] = indexTxn
|
|
return indexTxn
|
|
return indexTxn
|
|
@@ -91,6 +117,7 @@ func (txn *Txn) Abort() {
|
|
// Clear the txn
|
|
// Clear the txn
|
|
txn.rootTxn = nil
|
|
txn.rootTxn = nil
|
|
txn.modified = nil
|
|
txn.modified = nil
|
|
|
|
+ txn.changes = nil
|
|
|
|
|
|
// Release the writer lock since this is invalid
|
|
// Release the writer lock since this is invalid
|
|
txn.db.writer.Unlock()
|
|
txn.db.writer.Unlock()
|
|
@@ -112,14 +139,23 @@ func (txn *Txn) Commit() {
|
|
// Commit each sub-transaction scoped to (table, index)
|
|
// Commit each sub-transaction scoped to (table, index)
|
|
for key, subTxn := range txn.modified {
|
|
for key, subTxn := range txn.modified {
|
|
path := indexPath(key.Table, key.Index)
|
|
path := indexPath(key.Table, key.Index)
|
|
- final := subTxn.Commit()
|
|
|
|
|
|
+ final := subTxn.CommitOnly()
|
|
txn.rootTxn.Insert(path, final)
|
|
txn.rootTxn.Insert(path, final)
|
|
}
|
|
}
|
|
|
|
|
|
// Update the root of the DB
|
|
// Update the root of the DB
|
|
- newRoot := txn.rootTxn.Commit()
|
|
|
|
|
|
+ newRoot := txn.rootTxn.CommitOnly()
|
|
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
|
|
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
|
|
|
|
|
|
|
|
+ // Now issue all of the mutation updates (this is safe to call
|
|
|
|
+ // even if mutation tracking isn't enabled); we do this after
|
|
|
|
+ // the root pointer is swapped so that waking responders will
|
|
|
|
+ // see the new state.
|
|
|
|
+ for _, subTxn := range txn.modified {
|
|
|
|
+ subTxn.Notify()
|
|
|
|
+ }
|
|
|
|
+ txn.rootTxn.Notify()
|
|
|
|
+
|
|
// Clear the txn
|
|
// Clear the txn
|
|
txn.rootTxn = nil
|
|
txn.rootTxn = nil
|
|
txn.modified = nil
|
|
txn.modified = nil
|
|
@@ -134,7 +170,11 @@ func (txn *Txn) Commit() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// Insert is used to add or update an object into the given table
|
|
|
|
|
|
+// Insert is used to add or update an object into the given table.
|
|
|
|
+//
|
|
|
|
+// When updating an object, the obj provided should be a copy rather
|
|
|
|
+// than a value updated in-place. Modifying values in-place that are already
|
|
|
|
+// inserted into MemDB is not supported behavior.
|
|
func (txn *Txn) Insert(table string, obj interface{}) error {
|
|
func (txn *Txn) Insert(table string, obj interface{}) error {
|
|
if !txn.write {
|
|
if !txn.write {
|
|
return fmt.Errorf("cannot insert in read-only transaction")
|
|
return fmt.Errorf("cannot insert in read-only transaction")
|
|
@@ -246,11 +286,19 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
|
|
indexTxn.Insert(val, obj)
|
|
indexTxn.Insert(val, obj)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if txn.changes != nil {
|
|
|
|
+ txn.changes = append(txn.changes, Change{
|
|
|
|
+ Table: table,
|
|
|
|
+ Before: existing, // might be nil on a create
|
|
|
|
+ After: obj,
|
|
|
|
+ primaryKey: idVal,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-// Delete is used to delete a single object from the given table
|
|
|
|
-// This object must already exist in the table
|
|
|
|
|
|
+// Delete is used to delete a single object from the given table.
|
|
|
|
+// This object must already exist in the table.
|
|
func (txn *Txn) Delete(table string, obj interface{}) error {
|
|
func (txn *Txn) Delete(table string, obj interface{}) error {
|
|
if !txn.write {
|
|
if !txn.write {
|
|
return fmt.Errorf("cannot delete in read-only transaction")
|
|
return fmt.Errorf("cannot delete in read-only transaction")
|
|
@@ -277,7 +325,7 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
|
|
idTxn := txn.writableIndex(table, id)
|
|
idTxn := txn.writableIndex(table, id)
|
|
existing, ok := idTxn.Get(idVal)
|
|
existing, ok := idTxn.Get(idVal)
|
|
if !ok {
|
|
if !ok {
|
|
- return fmt.Errorf("not found")
|
|
|
|
|
|
+ return ErrNotFound
|
|
}
|
|
}
|
|
|
|
|
|
// Remove the object from all the indexes
|
|
// Remove the object from all the indexes
|
|
@@ -313,9 +361,121 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if txn.changes != nil {
|
|
|
|
+ txn.changes = append(txn.changes, Change{
|
|
|
|
+ Table: table,
|
|
|
|
+ Before: existing,
|
|
|
|
+ After: nil, // Now nil indicates deletion
|
|
|
|
+ primaryKey: idVal,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// DeletePrefix is used to delete an entire subtree based on a prefix.
|
|
|
|
+// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
|
|
|
|
+// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
|
|
|
|
+// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
|
|
|
|
+func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
|
|
|
|
+ if !txn.write {
|
|
|
|
+ return false, fmt.Errorf("cannot delete in read-only transaction")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if !strings.HasSuffix(prefix_index, "_prefix") {
|
|
|
|
+ return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
|
|
|
|
+
|
|
|
|
+ // Get an iterator over all of the keys with the given prefix.
|
|
|
|
+ entries, err := txn.Get(table, prefix_index, prefix)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, fmt.Errorf("failed kvs lookup: %s", err)
|
|
|
|
+ }
|
|
|
|
+ // Get the table schema
|
|
|
|
+ tableSchema, ok := txn.db.schema.Tables[table]
|
|
|
|
+ if !ok {
|
|
|
|
+ return false, fmt.Errorf("invalid table '%s'", table)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ foundAny := false
|
|
|
|
+ for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
|
|
+ if !foundAny {
|
|
|
|
+ foundAny = true
|
|
|
|
+ }
|
|
|
|
+ // Get the primary ID of the object
|
|
|
|
+ idSchema := tableSchema.Indexes[id]
|
|
|
|
+ idIndexer := idSchema.Indexer.(SingleIndexer)
|
|
|
|
+ ok, idVal, err := idIndexer.FromObject(entry)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, fmt.Errorf("failed to build primary index: %v", err)
|
|
|
|
+ }
|
|
|
|
+ if !ok {
|
|
|
|
+ return false, fmt.Errorf("object missing primary index")
|
|
|
|
+ }
|
|
|
|
+ if txn.changes != nil {
|
|
|
|
+ // Record the deletion
|
|
|
|
+ idTxn := txn.writableIndex(table, id)
|
|
|
|
+ existing, ok := idTxn.Get(idVal)
|
|
|
|
+ if ok {
|
|
|
|
+ txn.changes = append(txn.changes, Change{
|
|
|
|
+ Table: table,
|
|
|
|
+ Before: existing,
|
|
|
|
+ After: nil, // Now nil indicates deletion
|
|
|
|
+ primaryKey: idVal,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // Remove the object from all the indexes except the given prefix index
|
|
|
|
+ for name, indexSchema := range tableSchema.Indexes {
|
|
|
|
+ if name == deletePrefixIndex {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ indexTxn := txn.writableIndex(table, name)
|
|
|
|
+
|
|
|
|
+ // Handle the update by deleting from the index first
|
|
|
|
+ var (
|
|
|
|
+ ok bool
|
|
|
|
+ vals [][]byte
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+ switch indexer := indexSchema.Indexer.(type) {
|
|
|
|
+ case SingleIndexer:
|
|
|
|
+ var val []byte
|
|
|
|
+ ok, val, err = indexer.FromObject(entry)
|
|
|
|
+ vals = [][]byte{val}
|
|
|
|
+ case MultiIndexer:
|
|
|
|
+ ok, vals, err = indexer.FromObject(entry)
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, fmt.Errorf("failed to build index '%s': %v", name, err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ok {
|
|
|
|
+ // Handle non-unique index by computing a unique index.
|
|
|
|
+ // This is done by appending the primary key which must
|
|
|
|
+ // be unique anyways.
|
|
|
|
+ for _, val := range vals {
|
|
|
|
+ if !indexSchema.Unique {
|
|
|
|
+ val = append(val, idVal...)
|
|
|
|
+ }
|
|
|
|
+ indexTxn.Delete(val)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ if foundAny {
|
|
|
|
+ indexTxn := txn.writableIndex(table, deletePrefixIndex)
|
|
|
|
+ ok = indexTxn.DeletePrefix([]byte(prefix))
|
|
|
|
+ if !ok {
|
|
|
|
+ panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
|
|
|
|
+ }
|
|
|
|
+ return true, nil
|
|
|
|
+ }
|
|
|
|
+ return false, nil
|
|
|
|
+}
|
|
|
|
+
|
|
// DeleteAll is used to delete all the objects in a given table
|
|
// DeleteAll is used to delete all the objects in a given table
|
|
// matching the constraints on the index
|
|
// matching the constraints on the index
|
|
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
|
|
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
|
|
@@ -352,13 +512,13 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error)
|
|
return num, nil
|
|
return num, nil
|
|
}
|
|
}
|
|
|
|
|
|
-// First is used to return the first matching object for
|
|
|
|
-// the given constraints on the index
|
|
|
|
-func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
|
|
|
|
|
|
+// FirstWatch is used to return the first matching object for
|
|
|
|
+// the given constraints on the index along with the watch channel
|
|
|
|
+func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
|
|
// Get the index value
|
|
// Get the index value
|
|
indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, err
|
|
|
|
|
|
+ return nil, nil, err
|
|
}
|
|
}
|
|
|
|
|
|
// Get the index itself
|
|
// Get the index itself
|
|
@@ -366,18 +526,60 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er
|
|
|
|
|
|
// Do an exact lookup
|
|
// Do an exact lookup
|
|
if indexSchema.Unique && val != nil && indexSchema.Name == index {
|
|
if indexSchema.Unique && val != nil && indexSchema.Name == index {
|
|
- obj, ok := indexTxn.Get(val)
|
|
|
|
|
|
+ watch, obj, ok := indexTxn.GetWatch(val)
|
|
if !ok {
|
|
if !ok {
|
|
- return nil, nil
|
|
|
|
|
|
+ return watch, nil, nil
|
|
}
|
|
}
|
|
- return obj, nil
|
|
|
|
|
|
+ return watch, obj, nil
|
|
}
|
|
}
|
|
|
|
|
|
// Handle non-unique index by using an iterator and getting the first value
|
|
// Handle non-unique index by using an iterator and getting the first value
|
|
iter := indexTxn.Root().Iterator()
|
|
iter := indexTxn.Root().Iterator()
|
|
- iter.SeekPrefix(val)
|
|
|
|
|
|
+ watch := iter.SeekPrefixWatch(val)
|
|
_, value, _ := iter.Next()
|
|
_, value, _ := iter.Next()
|
|
- return value, nil
|
|
|
|
|
|
+ return watch, value, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// LastWatch is used to return the last matching object for
|
|
|
|
+// the given constraints on the index along with the watch channel
|
|
|
|
+func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
|
|
|
|
+ // Get the index value
|
|
|
|
+ indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Get the index itself
|
|
|
|
+ indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
|
|
+
|
|
|
|
+ // Do an exact lookup
|
|
|
|
+ if indexSchema.Unique && val != nil && indexSchema.Name == index {
|
|
|
|
+ watch, obj, ok := indexTxn.GetWatch(val)
|
|
|
|
+ if !ok {
|
|
|
|
+ return watch, nil, nil
|
|
|
|
+ }
|
|
|
|
+ return watch, obj, nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Handle non-unique index by using an iterator and getting the last value
|
|
|
|
+ iter := indexTxn.Root().ReverseIterator()
|
|
|
|
+ watch := iter.SeekPrefixWatch(val)
|
|
|
|
+ _, value, _ := iter.Previous()
|
|
|
|
+ return watch, value, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// First is used to return the first matching object for
|
|
|
|
+// the given constraints on the index
|
|
|
|
+func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
|
|
|
|
+ _, val, err := txn.FirstWatch(table, index, args...)
|
|
|
|
+ return val, err
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Last is used to return the last matching object for
|
|
|
|
+// the given constraints on the index
|
|
|
|
+func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) {
|
|
|
|
+ _, val, err := txn.LastWatch(table, index, args...)
|
|
|
|
+ return val, err
|
|
}
|
|
}
|
|
|
|
|
|
// LongestPrefix is used to fetch the longest prefix match for the given
|
|
// LongestPrefix is used to fetch the longest prefix match for the given
|
|
@@ -465,30 +667,100 @@ func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexS
|
|
return indexSchema, val, err
|
|
return indexSchema, val, err
|
|
}
|
|
}
|
|
|
|
|
|
-// ResultIterator is used to iterate over a list of results
|
|
|
|
-// from a Get query on a table.
|
|
|
|
|
|
+// ResultIterator is used to iterate over a list of results from a query on a table.
|
|
|
|
+//
|
|
|
|
+// When a ResultIterator is created from a write transaction, the results from
|
|
|
|
+// Next will reflect a snapshot of the table at the time the ResultIterator is
|
|
|
|
+// created.
|
|
|
|
+// This means that calling Insert or Delete on a transaction while iterating is
|
|
|
|
+// allowed, but the changes made by Insert or Delete will not be observed in the
|
|
|
|
+// results returned from subsequent calls to Next. For example if an item is deleted
|
|
|
|
+// from the index used by the iterator it will still be returned by Next. If an
|
|
|
|
+// item is inserted into the index used by the iterator, it will not be returned
|
|
|
|
+// by Next. However, an iterator created after a call to Insert or Delete will
|
|
|
|
+// reflect the modifications.
|
|
|
|
+//
|
|
|
|
+// When a ResultIterator is created from a write transaction, and there are already
|
|
|
|
+// modifications to the index used by the iterator, the modification cache of the
|
|
|
|
+// index will be invalidated. This may result in some additional allocations if
|
|
|
|
+// the same node in the index is modified again.
|
|
type ResultIterator interface {
|
|
type ResultIterator interface {
|
|
|
|
+ WatchCh() <-chan struct{}
|
|
|
|
+ // Next returns the next result from the iterator. If there are no more results
|
|
|
|
+ // nil is returned.
|
|
Next() interface{}
|
|
Next() interface{}
|
|
}
|
|
}
|
|
|
|
|
|
-// Get is used to construct a ResultIterator over all the
|
|
|
|
-// rows that match the given constraints of an index.
|
|
|
|
|
|
+// Get is used to construct a ResultIterator over all the rows that match the
|
|
|
|
+// given constraints of an index. The index values must match exactly (this
|
|
|
|
+// is not a range-based or prefix-based lookup) by default.
|
|
|
|
+//
|
|
|
|
+// Prefix lookups: if the named index implements PrefixIndexer, you may perform
|
|
|
|
+// prefix-based lookups by appending "_prefix" to the index name. In this
|
|
|
|
+// scenario, the index values given in args are treated as prefix lookups. For
|
|
|
|
+// example, a StringFieldIndex will match any string with the given value
|
|
|
|
+// as a prefix: "mem" matches "memdb".
|
|
|
|
+//
|
|
|
|
+// See the documentation for ResultIterator to understand the behaviour of the
|
|
|
|
+// returned ResultIterator.
|
|
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
- // Get the index value to scan
|
|
|
|
- indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
|
|
|
|
+ indexIter, val, err := txn.getIndexIterator(table, index, args...)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- // Get the index itself
|
|
|
|
- indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
|
|
- indexRoot := indexTxn.Root()
|
|
|
|
|
|
+ // Seek the iterator to the appropriate sub-set
|
|
|
|
+ watchCh := indexIter.SeekPrefixWatch(val)
|
|
|
|
|
|
- // Get an interator over the index
|
|
|
|
- indexIter := indexRoot.Iterator()
|
|
|
|
|
|
+ // Create an iterator
|
|
|
|
+ iter := &radixIterator{
|
|
|
|
+ iter: indexIter,
|
|
|
|
+ watchCh: watchCh,
|
|
|
|
+ }
|
|
|
|
+ return iter, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// GetReverse is used to construct a Reverse ResultIterator over all the
|
|
|
|
+// rows that match the given constraints of an index.
|
|
|
|
+// The returned ResultIterator's Next() will return the next Previous value.
|
|
|
|
+//
|
|
|
|
+// See the documentation on Get for details on arguments.
|
|
|
|
+// See the documentation for ResultIterator to understand the behaviour of the
|
|
|
|
+// returned ResultIterator.
|
|
|
|
+func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
|
|
+ indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Seek the iterator to the appropriate sub-set
|
|
|
|
+ watchCh := indexIter.SeekPrefixWatch(val)
|
|
|
|
+
|
|
|
|
+ // Create an iterator
|
|
|
|
+ iter := &radixReverseIterator{
|
|
|
|
+ iter: indexIter,
|
|
|
|
+ watchCh: watchCh,
|
|
|
|
+ }
|
|
|
|
+ return iter, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// LowerBound is used to construct a ResultIterator over all the the range of
|
|
|
|
+// rows that have an index value greater than or equal to the provide args.
|
|
|
|
+// Calling this then iterating until the rows are larger than required allows
|
|
|
|
+// range scans within an index. It is not possible to watch the resulting
|
|
|
|
+// iterator since the radix tree doesn't efficiently allow watching on lower
|
|
|
|
+// bound changes. The WatchCh returned will be nill and so will block forever.
|
|
|
|
+//
|
|
|
|
+// See the documentation for ResultIterator to understand the behaviour of the
|
|
|
|
+// returned ResultIterator.
|
|
|
|
+func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
|
|
+ indexIter, val, err := txn.getIndexIterator(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
|
|
// Seek the iterator to the appropriate sub-set
|
|
// Seek the iterator to the appropriate sub-set
|
|
- indexIter.SeekPrefix(val)
|
|
|
|
|
|
+ indexIter.SeekLowerBound(val)
|
|
|
|
|
|
// Create an iterator
|
|
// Create an iterator
|
|
iter := &radixIterator{
|
|
iter := &radixIterator{
|
|
@@ -497,6 +769,149 @@ func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, e
|
|
return iter, nil
|
|
return iter, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// ReverseLowerBound is used to construct a Reverse ResultIterator over all the
|
|
|
|
+// the range of rows that have an index value less than or equal to the
|
|
|
|
+// provide args. Calling this then iterating until the rows are lower than
|
|
|
|
+// required allows range scans within an index. It is not possible to watch the
|
|
|
|
+// resulting iterator since the radix tree doesn't efficiently allow watching
|
|
|
|
+// on lower bound changes. The WatchCh returned will be nill and so will block
|
|
|
|
+// forever.
|
|
|
|
+//
|
|
|
|
+// See the documentation for ResultIterator to understand the behaviour of the
|
|
|
|
+// returned ResultIterator.
|
|
|
|
+func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
|
|
+ indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Seek the iterator to the appropriate sub-set
|
|
|
|
+ indexIter.SeekReverseLowerBound(val)
|
|
|
|
+
|
|
|
|
+ // Create an iterator
|
|
|
|
+ iter := &radixReverseIterator{
|
|
|
|
+ iter: indexIter,
|
|
|
|
+ }
|
|
|
|
+ return iter, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// objectID is a tuple of table name and the raw internal id byte slice
|
|
|
|
+// converted to a string. It's only converted to a string to make it comparable
|
|
|
|
+// so this struct can be used as a map index.
|
|
|
|
+type objectID struct {
|
|
|
|
+ Table string
|
|
|
|
+ IndexVal string
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// mutInfo stores metadata about mutations to allow collapsing multiple
|
|
|
|
+// mutations to the same object into one.
|
|
|
|
+type mutInfo struct {
|
|
|
|
+ firstBefore interface{}
|
|
|
|
+ lastIdx int
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Changes returns the set of object changes that have been made in the
|
|
|
|
+// transaction so far. If change tracking is not enabled it wil always return
|
|
|
|
+// nil. It can be called before or after Commit. If it is before Commit it will
|
|
|
|
+// return all changes made so far which may not be the same as the final
|
|
|
|
+// Changes. After abort it will always return nil. As with other Txn methods
|
|
|
|
+// it's not safe to call this from a different goroutine than the one making
|
|
|
|
+// mutations or committing the transaction. Mutations will appear in the order
|
|
|
|
+// they were performed in the transaction but multiple operations to the same
|
|
|
|
+// object will be collapsed so only the effective overall change to that object
|
|
|
|
+// is present. If transaction operations are dependent (e.g. copy object X to Y
|
|
|
|
+// then delete X) this might mean the set of mutations is incomplete to verify
|
|
|
|
+// history, but it is complete in that the net effect is preserved (Y got a new
|
|
|
|
+// value, X got removed).
|
|
|
|
+func (txn *Txn) Changes() Changes {
|
|
|
|
+ if txn.changes == nil {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // De-duplicate mutations by key so all take effect at the point of the last
|
|
|
|
+ // write but we keep the mutations in order.
|
|
|
|
+ dups := make(map[objectID]mutInfo)
|
|
|
|
+ for i, m := range txn.changes {
|
|
|
|
+ oid := objectID{
|
|
|
|
+ Table: m.Table,
|
|
|
|
+ IndexVal: string(m.primaryKey),
|
|
|
|
+ }
|
|
|
|
+ // Store the latest mutation index for each key value
|
|
|
|
+ mi, ok := dups[oid]
|
|
|
|
+ if !ok {
|
|
|
|
+ // First entry for key, store the before value
|
|
|
|
+ mi.firstBefore = m.Before
|
|
|
|
+ }
|
|
|
|
+ mi.lastIdx = i
|
|
|
|
+ dups[oid] = mi
|
|
|
|
+ }
|
|
|
|
+ if len(dups) == len(txn.changes) {
|
|
|
|
+ // No duplicates found, fast path return it as is
|
|
|
|
+ return txn.changes
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Need to remove the duplicates
|
|
|
|
+ cs := make(Changes, 0, len(dups))
|
|
|
|
+ for i, m := range txn.changes {
|
|
|
|
+ oid := objectID{
|
|
|
|
+ Table: m.Table,
|
|
|
|
+ IndexVal: string(m.primaryKey),
|
|
|
|
+ }
|
|
|
|
+ mi := dups[oid]
|
|
|
|
+ if mi.lastIdx == i {
|
|
|
|
+ // This was the latest value for this key copy it with the before value in
|
|
|
|
+ // case it's different. Note that m is not a pointer so we are not
|
|
|
|
+ // modifying the txn.changeSet here - it's already a copy.
|
|
|
|
+ m.Before = mi.firstBefore
|
|
|
|
+
|
|
|
|
+ // Edge case - if the object was inserted and then eventually deleted in
|
|
|
|
+ // the same transaction, then the net affect on that key is a no-op. Don't
|
|
|
|
+ // emit a mutation with nil for before and after as it's meaningless and
|
|
|
|
+ // might violate expectations and cause a panic in code that assumes at
|
|
|
|
+ // least one must be set.
|
|
|
|
+ if m.Before == nil && m.After == nil {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ cs = append(cs, m)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // Store the de-duped version in case this is called again
|
|
|
|
+ txn.changes = cs
|
|
|
|
+ return cs
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) {
|
|
|
|
+ // Get the index value to scan
|
|
|
|
+ indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Get the index itself
|
|
|
|
+ indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
|
|
+ indexRoot := indexTxn.Root()
|
|
|
|
+
|
|
|
|
+ // Get an iterator over the index
|
|
|
|
+ indexIter := indexRoot.Iterator()
|
|
|
|
+ return indexIter, val, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (txn *Txn) getIndexIteratorReverse(table, index string, args ...interface{}) (*iradix.ReverseIterator, []byte, error) {
|
|
|
|
+ // Get the index value to scan
|
|
|
|
+ indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Get the index itself
|
|
|
|
+ indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
|
|
+ indexRoot := indexTxn.Root()
|
|
|
|
+
|
|
|
|
+ // Get an interator over the index
|
|
|
|
+ indexIter := indexRoot.ReverseIterator()
|
|
|
|
+ return indexIter, val, nil
|
|
|
|
+}
|
|
|
|
+
|
|
// Defer is used to push a new arbitrary function onto a stack which
|
|
// Defer is used to push a new arbitrary function onto a stack which
|
|
// gets called when a transaction is committed and finished. Deferred
|
|
// gets called when a transaction is committed and finished. Deferred
|
|
// functions are called in LIFO order, and only invoked at the end of
|
|
// functions are called in LIFO order, and only invoked at the end of
|
|
@@ -506,10 +921,15 @@ func (txn *Txn) Defer(fn func()) {
|
|
}
|
|
}
|
|
|
|
|
|
// radixIterator is used to wrap an underlying iradix iterator.
|
|
// radixIterator is used to wrap an underlying iradix iterator.
|
|
-// This is much mroe efficient than a sliceIterator as we are not
|
|
|
|
|
|
+// This is much more efficient than a sliceIterator as we are not
|
|
// materializing the entire view.
|
|
// materializing the entire view.
|
|
type radixIterator struct {
|
|
type radixIterator struct {
|
|
- iter *iradix.Iterator
|
|
|
|
|
|
+ iter *iradix.Iterator
|
|
|
|
+ watchCh <-chan struct{}
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *radixIterator) WatchCh() <-chan struct{} {
|
|
|
|
+ return r.watchCh
|
|
}
|
|
}
|
|
|
|
|
|
func (r *radixIterator) Next() interface{} {
|
|
func (r *radixIterator) Next() interface{} {
|
|
@@ -519,3 +939,43 @@ func (r *radixIterator) Next() interface{} {
|
|
}
|
|
}
|
|
return value
|
|
return value
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+type radixReverseIterator struct {
|
|
|
|
+ iter *iradix.ReverseIterator
|
|
|
|
+ watchCh <-chan struct{}
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *radixReverseIterator) Next() interface{} {
|
|
|
|
+ _, value, ok := r.iter.Previous()
|
|
|
|
+ if !ok {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ return value
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *radixReverseIterator) WatchCh() <-chan struct{} {
|
|
|
|
+ return r.watchCh
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Snapshot creates a snapshot of the current state of the transaction.
|
|
|
|
+// Returns a new read-only transaction or nil if the transaction is already
|
|
|
|
+// aborted or committed.
|
|
|
|
+func (txn *Txn) Snapshot() *Txn {
|
|
|
|
+ if txn.rootTxn == nil {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ snapshot := &Txn{
|
|
|
|
+ db: txn.db,
|
|
|
|
+ rootTxn: txn.rootTxn.Clone(),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Commit sub-transactions into the snapshot
|
|
|
|
+ for key, subTxn := range txn.modified {
|
|
|
|
+ path := indexPath(key.Table, key.Index)
|
|
|
|
+ final := subTxn.CommitOnly()
|
|
|
|
+ snapshot.rootTxn.Insert(path, final)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return snapshot
|
|
|
|
+}
|