Merge pull request #79 from sanimej/bolt

Add transient connection option to bolt db
This commit is contained in:
Alexandre Beslic 2015-10-08 13:24:52 -07:00
commit 2a86b2bdb9
2 changed files with 164 additions and 46 deletions

View file

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
"github.com/docker/libkv"
@ -23,15 +24,28 @@ var (
ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
)
const (
filePerm os.FileMode = 0644
)
//BoltDB type implements the Store interface
type BoltDB struct {
client *bolt.DB
boltBucket []byte
dbIndex uint64
path string
timeout time.Duration
// By default libkv opens and closes the bolt DB connection for every
// get/put operation. This allows multiple apps to use a Bolt DB at the
// same time.
// PersistConnection flag provides an option to override ths behavior.
// ie: open the connection in New and use it till Close is called.
PersistConnection bool
}
const (
libkvmetadatalen = 8
transientTimeout = time.Duration(10) * time.Second
)
// Register registers boltdb to libkv
@ -41,6 +55,12 @@ func Register() {
// New opens a new BoltDB connection to the specified path and bucket
func New(endpoints []string, options *store.Config) (store.Store, error) {
var (
db *bolt.DB
err error
boltOptions *bolt.Options
)
if len(endpoints) > 1 {
return nil, ErrMultipleEndpointsUnsupported
}
@ -50,39 +70,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
}
dir, _ := filepath.Split(endpoints[0])
if err := os.MkdirAll(dir, 0750); err != nil {
if err = os.MkdirAll(dir, 0750); err != nil {
return nil, err
}
var boltOptions *bolt.Options
if options != nil {
if options.PersistConnection {
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
}
db, err := bolt.Open(endpoints[0], 0644, boltOptions)
if err != nil {
return nil, err
db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
if err != nil {
return nil, err
}
}
b := &BoltDB{}
b := &BoltDB{
client: db,
path: endpoints[0],
boltBucket: []byte(options.Bucket),
timeout: transientTimeout,
PersistConnection: options.PersistConnection,
}
b.client = db
b.boltBucket = []byte(options.Bucket)
return b, nil
}
func (b *BoltDB) reset() {
b.path = ""
b.boltBucket = []byte{}
}
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
var (
db *bolt.DB
err error
)
if !b.PersistConnection {
boltOptions := &bolt.Options{Timeout: b.timeout}
if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
return nil, err
}
b.client = db
}
return b.client, nil
}
func (b *BoltDB) releaseDBhandle() {
if !b.PersistConnection {
b.client.Close()
}
}
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
var val []byte
var (
val []byte
db *bolt.DB
err error
)
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
v := bucket.Get([]byte(key))
val = make([]byte, len(v))
copy(val, v)
return nil
})
@ -102,11 +162,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
//Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
var dbIndex uint64
db := b.client
var (
dbIndex uint64
db *bolt.DB
err error
)
dbval := make([]byte, libkvmetadatalen)
err := db.Update(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
if err != nil {
return err
@ -127,12 +195,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
//Delete the value for the given key.
func (b *BoltDB) Delete(key string) error {
db := b.client
var (
db *bolt.DB
err error
)
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
err := bucket.Delete([]byte(key))
return err
@ -142,13 +217,21 @@ func (b *BoltDB) Delete(key string) error {
// Exists checks if the key exists inside the store
func (b *BoltDB) Exists(key string) (bool, error) {
var val []byte
var (
val []byte
db *bolt.DB
err error
)
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
@ -164,22 +247,32 @@ func (b *BoltDB) Exists(key string) (bool, error) {
// List returns the range of keys starting with the passed in prefix
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
var (
db *bolt.DB
err error
)
kv := []*store.KVPair{}
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
cursor := bucket.Cursor()
prefix := []byte(keyPrefix)
for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() {
for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
val = val[libkvmetadatalen:]
dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
v = v[libkvmetadatalen:]
val := make([]byte, len(v))
copy(val, v)
kv = append(kv, &store.KVPair{
Key: string(key),
@ -199,22 +292,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
// has not been modified in the meantime, throws an
// error if this is the case
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
var val []byte
var dbIndex uint64
var (
val []byte
db *bolt.DB
err error
)
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
db := b.client
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
if dbIndex != previous.LastIndex {
return store.ErrKeyModified
}
@ -230,13 +329,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
// AtomicPut puts a value at "key" if the key has not been
// modified since the last Put, throws an error if this is the case
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
var val []byte
var dbIndex uint64
var (
val []byte
dbIndex uint64
db *bolt.DB
err error
)
dbval := make([]byte, libkvmetadatalen)
db := b.client
if db, err = b.getDBhandle(); err != nil {
return false, nil, err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
var err error
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
@ -283,18 +389,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
// Close the db connection to the BoltDB
func (b *BoltDB) Close() {
db := b.client
db.Close()
if !b.PersistConnection {
b.reset()
} else {
b.client.Close()
}
return
}
// DeleteTree deletes a range of keys with a given prefix
func (b *BoltDB) DeleteTree(keyPrefix string) error {
db := b.client
err := db.Update(func(tx *bolt.Tx) error {
var (
db *bolt.DB
err error
)
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
cursor := bucket.Cursor()

View file

@ -43,6 +43,7 @@ type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
Bucket string
PersistConnection bool
}
// ClientTLSConfig contains data for a Client TLS configuration in the form