|
@@ -7,6 +7,7 @@ import (
|
|
"os"
|
|
"os"
|
|
"path/filepath"
|
|
"path/filepath"
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/boltdb/bolt"
|
|
"github.com/boltdb/bolt"
|
|
"github.com/docker/libkv"
|
|
"github.com/docker/libkv"
|
|
@@ -25,15 +26,28 @@ var (
|
|
ErrBoltAPIUnsupported = errors.New("API not supported by BoltDB backend")
|
|
ErrBoltAPIUnsupported = errors.New("API not supported by BoltDB backend")
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+const (
|
|
|
|
+ filePerm os.FileMode = 0644
|
|
|
|
+)
|
|
|
|
+
|
|
//BoltDB type implements the Store interface
|
|
//BoltDB type implements the Store interface
|
|
type BoltDB struct {
|
|
type BoltDB struct {
|
|
client *bolt.DB
|
|
client *bolt.DB
|
|
boltBucket []byte
|
|
boltBucket []byte
|
|
dbIndex uint64
|
|
dbIndex uint64
|
|
|
|
+ path string
|
|
|
|
+ timeout time.Duration
|
|
|
|
+ // By default libkv opens and closes the bolt DB connection for every
|
|
|
|
+ // get/put operation. This allows multiple apps to use a Bolt DB at the
|
|
|
|
+ // same time.
|
|
|
|
+ // PersistConnection flag provides an option to override ths behavior.
|
|
|
|
+ // ie: open the connection in New and use it till Close is called.
|
|
|
|
+ PersistConnection bool
|
|
}
|
|
}
|
|
|
|
|
|
const (
|
|
const (
|
|
libkvmetadatalen = 8
|
|
libkvmetadatalen = 8
|
|
|
|
+ transientTimeout = time.Duration(10) * time.Second
|
|
)
|
|
)
|
|
|
|
|
|
// Register registers boltdb to libkv
|
|
// Register registers boltdb to libkv
|
|
@@ -43,6 +57,12 @@ func Register() {
|
|
|
|
|
|
// New opens a new BoltDB connection to the specified path and bucket
|
|
// New opens a new BoltDB connection to the specified path and bucket
|
|
func New(endpoints []string, options *store.Config) (store.Store, error) {
|
|
func New(endpoints []string, options *store.Config) (store.Store, error) {
|
|
|
|
+ var (
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ boltOptions *bolt.Options
|
|
|
|
+ )
|
|
|
|
+
|
|
if len(endpoints) > 1 {
|
|
if len(endpoints) > 1 {
|
|
return nil, ErrMultipleEndpointsUnsupported
|
|
return nil, ErrMultipleEndpointsUnsupported
|
|
}
|
|
}
|
|
@@ -52,39 +72,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
|
|
}
|
|
}
|
|
|
|
|
|
dir, _ := filepath.Split(endpoints[0])
|
|
dir, _ := filepath.Split(endpoints[0])
|
|
- if err := os.MkdirAll(dir, 0750); err != nil {
|
|
|
|
|
|
+ if err = os.MkdirAll(dir, 0750); err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- var boltOptions *bolt.Options
|
|
|
|
- if options != nil {
|
|
|
|
|
|
+ if options.PersistConnection {
|
|
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
|
|
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
|
|
- }
|
|
|
|
- db, err := bolt.Open(endpoints[0], 0644, boltOptions)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
|
|
+ db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- b := &BoltDB{}
|
|
|
|
|
|
+ b := &BoltDB{
|
|
|
|
+ client: db,
|
|
|
|
+ path: endpoints[0],
|
|
|
|
+ boltBucket: []byte(options.Bucket),
|
|
|
|
+ timeout: transientTimeout,
|
|
|
|
+ PersistConnection: options.PersistConnection,
|
|
|
|
+ }
|
|
|
|
|
|
- b.client = db
|
|
|
|
- b.boltBucket = []byte(options.Bucket)
|
|
|
|
return b, nil
|
|
return b, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (b *BoltDB) reset() {
|
|
|
|
+ b.path = ""
|
|
|
|
+ b.boltBucket = []byte{}
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
|
|
|
|
+ var (
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+ if !b.PersistConnection {
|
|
|
|
+ boltOptions := &bolt.Options{Timeout: b.timeout}
|
|
|
|
+ if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ b.client = db
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return b.client, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (b *BoltDB) releaseDBhandle() {
|
|
|
|
+ if !b.PersistConnection {
|
|
|
|
+ b.client.Close()
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
|
|
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
|
|
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
|
|
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
|
|
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
- var val []byte
|
|
|
|
|
|
+ var (
|
|
|
|
+ val []byte
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
|
|
- db := b.client
|
|
|
|
- err := db.View(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ err = db.View(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
- return (ErrBoltBucketNotFound)
|
|
|
|
|
|
+ return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
|
|
|
|
- val = bucket.Get([]byte(key))
|
|
|
|
|
|
+ v := bucket.Get([]byte(key))
|
|
|
|
+ val = make([]byte, len(v))
|
|
|
|
+ copy(val, v)
|
|
|
|
|
|
return nil
|
|
return nil
|
|
})
|
|
})
|
|
@@ -104,11 +164,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
|
|
|
|
//Put the key, value pair. index number metadata is prepended to the value
|
|
//Put the key, value pair. index number metadata is prepended to the value
|
|
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
|
|
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
|
|
- var dbIndex uint64
|
|
|
|
- db := b.client
|
|
|
|
|
|
+ var (
|
|
|
|
+ dbIndex uint64
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
dbval := make([]byte, libkvmetadatalen)
|
|
dbval := make([]byte, libkvmetadatalen)
|
|
|
|
|
|
- err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
+
|
|
|
|
+ err = db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
|
|
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
@@ -129,12 +197,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
|
|
|
|
|
|
//Delete the value for the given key.
|
|
//Delete the value for the given key.
|
|
func (b *BoltDB) Delete(key string) error {
|
|
func (b *BoltDB) Delete(key string) error {
|
|
- db := b.client
|
|
|
|
|
|
+ var (
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
|
|
- err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ err = db.Update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
- return (ErrBoltBucketNotFound)
|
|
|
|
|
|
+ return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
err := bucket.Delete([]byte(key))
|
|
err := bucket.Delete([]byte(key))
|
|
return err
|
|
return err
|
|
@@ -144,13 +219,21 @@ func (b *BoltDB) Delete(key string) error {
|
|
|
|
|
|
// Exists checks if the key exists inside the store
|
|
// Exists checks if the key exists inside the store
|
|
func (b *BoltDB) Exists(key string) (bool, error) {
|
|
func (b *BoltDB) Exists(key string) (bool, error) {
|
|
- var val []byte
|
|
|
|
|
|
+ var (
|
|
|
|
+ val []byte
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
|
|
- db := b.client
|
|
|
|
- err := db.View(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
+
|
|
|
|
+ err = db.View(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
- return (ErrBoltBucketNotFound)
|
|
|
|
|
|
+ return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
|
|
|
|
val = bucket.Get([]byte(key))
|
|
val = bucket.Get([]byte(key))
|
|
@@ -166,22 +249,32 @@ func (b *BoltDB) Exists(key string) (bool, error) {
|
|
|
|
|
|
// List returns the range of keys starting with the passed in prefix
|
|
// List returns the range of keys starting with the passed in prefix
|
|
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|
|
|
+ var (
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
kv := []*store.KVPair{}
|
|
kv := []*store.KVPair{}
|
|
|
|
|
|
- db := b.client
|
|
|
|
- err := db.View(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
+
|
|
|
|
+ err = db.View(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
- return (ErrBoltBucketNotFound)
|
|
|
|
|
|
+ return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
|
|
|
|
cursor := bucket.Cursor()
|
|
cursor := bucket.Cursor()
|
|
prefix := []byte(keyPrefix)
|
|
prefix := []byte(keyPrefix)
|
|
|
|
|
|
- for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() {
|
|
|
|
|
|
+ for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
|
|
|
|
|
|
- dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
|
|
|
|
- val = val[libkvmetadatalen:]
|
|
|
|
|
|
+ dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
|
|
|
|
+ v = v[libkvmetadatalen:]
|
|
|
|
+ val := make([]byte, len(v))
|
|
|
|
+ copy(val, v)
|
|
|
|
|
|
kv = append(kv, &store.KVPair{
|
|
kv = append(kv, &store.KVPair{
|
|
Key: string(key),
|
|
Key: string(key),
|
|
@@ -201,22 +294,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|
// has not been modified in the meantime, throws an
|
|
// has not been modified in the meantime, throws an
|
|
// error if this is the case
|
|
// error if this is the case
|
|
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
|
|
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
|
|
- var val []byte
|
|
|
|
- var dbIndex uint64
|
|
|
|
|
|
+ var (
|
|
|
|
+ val []byte
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
|
|
if previous == nil {
|
|
if previous == nil {
|
|
return false, store.ErrPreviousNotSpecified
|
|
return false, store.ErrPreviousNotSpecified
|
|
}
|
|
}
|
|
- db := b.client
|
|
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
|
|
- err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ err = db.Update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
return ErrBoltBucketNotFound
|
|
return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
|
|
|
|
val = bucket.Get([]byte(key))
|
|
val = bucket.Get([]byte(key))
|
|
- dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
|
|
|
|
|
|
+ dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
|
|
if dbIndex != previous.LastIndex {
|
|
if dbIndex != previous.LastIndex {
|
|
return store.ErrKeyModified
|
|
return store.ErrKeyModified
|
|
}
|
|
}
|
|
@@ -232,13 +331,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
|
|
// AtomicPut puts a value at "key" if the key has not been
|
|
// AtomicPut puts a value at "key" if the key has not been
|
|
// modified since the last Put, throws an error if this is the case
|
|
// modified since the last Put, throws an error if this is the case
|
|
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
|
|
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
|
|
- var val []byte
|
|
|
|
- var dbIndex uint64
|
|
|
|
|
|
+ var (
|
|
|
|
+ val []byte
|
|
|
|
+ dbIndex uint64
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
dbval := make([]byte, libkvmetadatalen)
|
|
dbval := make([]byte, libkvmetadatalen)
|
|
|
|
|
|
- db := b.client
|
|
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return false, nil, err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
|
|
- err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ err = db.Update(func(tx *bolt.Tx) error {
|
|
var err error
|
|
var err error
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
@@ -285,18 +391,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
|
|
|
|
|
|
// Close the db connection to the BoltDB
|
|
// Close the db connection to the BoltDB
|
|
func (b *BoltDB) Close() {
|
|
func (b *BoltDB) Close() {
|
|
- db := b.client
|
|
|
|
-
|
|
|
|
- db.Close()
|
|
|
|
|
|
+ if !b.PersistConnection {
|
|
|
|
+ b.reset()
|
|
|
|
+ } else {
|
|
|
|
+ b.client.Close()
|
|
|
|
+ }
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
|
|
// DeleteTree deletes a range of keys with a given prefix
|
|
// DeleteTree deletes a range of keys with a given prefix
|
|
func (b *BoltDB) DeleteTree(keyPrefix string) error {
|
|
func (b *BoltDB) DeleteTree(keyPrefix string) error {
|
|
- db := b.client
|
|
|
|
- err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
|
|
+ var (
|
|
|
|
+ db *bolt.DB
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+ if db, err = b.getDBhandle(); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer b.releaseDBhandle()
|
|
|
|
+
|
|
|
|
+ err = db.Update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
bucket := tx.Bucket(b.boltBucket)
|
|
if bucket == nil {
|
|
if bucket == nil {
|
|
- return (ErrBoltBucketNotFound)
|
|
|
|
|
|
+ return ErrBoltBucketNotFound
|
|
}
|
|
}
|
|
|
|
|
|
cursor := bucket.Cursor()
|
|
cursor := bucket.Cursor()
|