|
@@ -22,9 +22,7 @@ var (
|
|
|
ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
|
|
|
)
|
|
|
|
|
|
-const (
|
|
|
- filePerm os.FileMode = 0644
|
|
|
-)
|
|
|
+const filePerm = 0o644
|
|
|
|
|
|
// BoltDB type implements the Store interface
|
|
|
type BoltDB struct {
|
|
@@ -54,13 +52,6 @@ func Register() {
|
|
|
|
|
|
// New opens a new BoltDB connection to the specified path and bucket
|
|
|
func New(endpoints []string, options *store.Config) (store.Store, error) {
|
|
|
- var (
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- boltOptions *bolt.Options
|
|
|
- timeout = transientTimeout
|
|
|
- )
|
|
|
-
|
|
|
if len(endpoints) > 1 {
|
|
|
return nil, ErrMultipleEndpointsUnsupported
|
|
|
}
|
|
@@ -70,18 +61,22 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
|
|
|
}
|
|
|
|
|
|
dir, _ := filepath.Split(endpoints[0])
|
|
|
- if err = os.MkdirAll(dir, 0750); err != nil {
|
|
|
+ if err := os.MkdirAll(dir, 0o750); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ var db *bolt.DB
|
|
|
if options.PersistConnection {
|
|
|
- boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
|
|
|
- db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
|
|
|
+ var err error
|
|
|
+ db, err = bolt.Open(endpoints[0], filePerm, &bolt.Options{
|
|
|
+ Timeout: options.ConnectionTimeout,
|
|
|
+ })
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ timeout := transientTimeout
|
|
|
if options.ConnectionTimeout != 0 {
|
|
|
timeout = options.ConnectionTimeout
|
|
|
}
|
|
@@ -103,18 +98,13 @@ func (b *BoltDB) reset() {
|
|
|
}
|
|
|
|
|
|
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
|
|
|
- var (
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
if !b.PersistConnection {
|
|
|
- boltOptions := &bolt.Options{Timeout: b.timeout}
|
|
|
- if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
|
|
|
+ db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
|
|
|
+ if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
b.client = db
|
|
|
}
|
|
|
-
|
|
|
return b.client, nil
|
|
|
}
|
|
|
|
|
@@ -127,19 +117,16 @@ func (b *BoltDB) releaseDBhandle() {
|
|
|
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
|
|
|
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
|
|
|
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
|
- var (
|
|
|
- val []byte
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
b.Lock()
|
|
|
defer b.Unlock()
|
|
|
|
|
|
- if db, err = b.getDBhandle(); err != nil {
|
|
|
+ db, err := b.getDBhandle()
|
|
|
+ if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
defer b.releaseDBhandle()
|
|
|
|
|
|
+ var val []byte
|
|
|
err = db.View(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
|
if bucket == nil {
|
|
@@ -163,100 +150,78 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
|
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
|
|
|
val = val[libkvmetadatalen:]
|
|
|
|
|
|
- return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil
|
|
|
+ return &store.KVPair{Key: key, Value: val, LastIndex: dbIndex}, nil
|
|
|
}
|
|
|
|
|
|
// Put the key, value pair. index number metadata is prepended to the value
|
|
|
func (b *BoltDB) Put(key string, value []byte) error {
|
|
|
- var (
|
|
|
- dbIndex uint64
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
b.Lock()
|
|
|
defer b.Unlock()
|
|
|
|
|
|
- dbval := make([]byte, libkvmetadatalen)
|
|
|
-
|
|
|
- if db, err = b.getDBhandle(); err != nil {
|
|
|
+ db, err := b.getDBhandle()
|
|
|
+ if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
defer b.releaseDBhandle()
|
|
|
|
|
|
- err = db.Update(func(tx *bolt.Tx) error {
|
|
|
+ return db.Update(func(tx *bolt.Tx) error {
|
|
|
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- dbIndex = atomic.AddUint64(&b.dbIndex, 1)
|
|
|
+ dbIndex := atomic.AddUint64(&b.dbIndex, 1)
|
|
|
+ dbval := make([]byte, libkvmetadatalen)
|
|
|
binary.LittleEndian.PutUint64(dbval, dbIndex)
|
|
|
dbval = append(dbval, value...)
|
|
|
|
|
|
- err = bucket.Put([]byte(key), dbval)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return nil
|
|
|
+ return bucket.Put([]byte(key), dbval)
|
|
|
})
|
|
|
- return err
|
|
|
}
|
|
|
|
|
|
// Delete the value for the given key.
|
|
|
func (b *BoltDB) Delete(key string) error {
|
|
|
- var (
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
b.Lock()
|
|
|
defer b.Unlock()
|
|
|
|
|
|
- if db, err = b.getDBhandle(); err != nil {
|
|
|
+ db, err := b.getDBhandle()
|
|
|
+ if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
defer b.releaseDBhandle()
|
|
|
|
|
|
- err = db.Update(func(tx *bolt.Tx) error {
|
|
|
+ return db.Update(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
|
if bucket == nil {
|
|
|
return store.ErrKeyNotFound
|
|
|
}
|
|
|
- err := bucket.Delete([]byte(key))
|
|
|
- return err
|
|
|
+ return bucket.Delete([]byte(key))
|
|
|
})
|
|
|
- return err
|
|
|
}
|
|
|
|
|
|
// Exists checks if the key exists inside the store
|
|
|
func (b *BoltDB) Exists(key string) (bool, error) {
|
|
|
- var (
|
|
|
- val []byte
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
b.Lock()
|
|
|
defer b.Unlock()
|
|
|
|
|
|
- if db, err = b.getDBhandle(); err != nil {
|
|
|
+ db, err := b.getDBhandle()
|
|
|
+ if err != nil {
|
|
|
return false, err
|
|
|
}
|
|
|
defer b.releaseDBhandle()
|
|
|
|
|
|
+ var exists bool
|
|
|
err = db.View(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
|
if bucket == nil {
|
|
|
return store.ErrKeyNotFound
|
|
|
}
|
|
|
|
|
|
- val = bucket.Get([]byte(key))
|
|
|
-
|
|
|
+ exists = len(bucket.Get([]byte(key))) > 0
|
|
|
return nil
|
|
|
})
|
|
|
|
|
|
- if len(val) == 0 {
|
|
|
- return false, err
|
|
|
- }
|
|
|
- return true, err
|
|
|
+ return exists, err
|
|
|
}
|
|
|
|
|
|
// List returns the range of keys starting with the passed in prefix
|
|
@@ -342,29 +307,24 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
|
|
|
// AtomicPut puts a value at "key" if the key has not been
|
|
|
// modified since the last Put, throws an error if this is the case
|
|
|
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
|
|
|
- var (
|
|
|
- val []byte
|
|
|
- dbIndex uint64
|
|
|
- db *bolt.DB
|
|
|
- err error
|
|
|
- )
|
|
|
b.Lock()
|
|
|
defer b.Unlock()
|
|
|
|
|
|
- dbval := make([]byte, libkvmetadatalen)
|
|
|
-
|
|
|
- if db, err = b.getDBhandle(); err != nil {
|
|
|
+ db, err := b.getDBhandle()
|
|
|
+ if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
defer b.releaseDBhandle()
|
|
|
|
|
|
+ var dbIndex uint64
|
|
|
+ dbval := make([]byte, libkvmetadatalen)
|
|
|
err = db.Update(func(tx *bolt.Tx) error {
|
|
|
- var err error
|
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
|
if bucket == nil {
|
|
|
if previous != nil {
|
|
|
return store.ErrKeyNotFound
|
|
|
}
|
|
|
+ var err error
|
|
|
bucket, err = tx.CreateBucket(b.boltBucket)
|
|
|
if err != nil {
|
|
|
return err
|
|
@@ -372,7 +332,7 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*s
|
|
|
}
|
|
|
// AtomicPut is equivalent to Put if previous is nil and the Ky
|
|
|
// doesn't exist in the DB.
|
|
|
- val = bucket.Get([]byte(key))
|
|
|
+ val := bucket.Get([]byte(key))
|
|
|
if previous == nil && len(val) != 0 {
|
|
|
return store.ErrKeyExists
|
|
|
}
|
|
@@ -388,7 +348,7 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*s
|
|
|
dbIndex = atomic.AddUint64(&b.dbIndex, 1)
|
|
|
binary.LittleEndian.PutUint64(dbval, b.dbIndex)
|
|
|
dbval = append(dbval, value...)
|
|
|
- return (bucket.Put([]byte(key), dbval))
|
|
|
+ return bucket.Put([]byte(key), dbval)
|
|
|
})
|
|
|
if err != nil {
|
|
|
return nil, err
|