Browse Source

Add transient connection option to bolt db

Signed-off-by: Santhosh Manohar <santhosh@docker.com>
Santhosh Manohar 9 years ago
parent
commit
0b81042dc3
2 changed files with 164 additions and 46 deletions
  1. 163 46
      libnetwork/internal/kvstore/boltdb/boltdb.go
  2. 1 0
      libnetwork/internal/kvstore/kvstore.go

+ 163 - 46
libnetwork/internal/kvstore/boltdb/boltdb.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync/atomic"
+	"time"
 
 	"github.com/boltdb/bolt"
 	"github.com/docker/libkv"
@@ -23,15 +24,28 @@ var (
 	ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
 )
 
+const (
+	filePerm os.FileMode = 0644
+)
+
 //BoltDB type implements the Store interface
 type BoltDB struct {
 	client     *bolt.DB
 	boltBucket []byte
 	dbIndex    uint64
+	path       string
+	timeout    time.Duration
+	// By default libkv opens and closes the bolt DB connection  for every
+	// get/put operation. This allows multiple apps to use a Bolt DB at the
+	// same time.
+	// PersistConnection flag provides an option to override ths behavior.
+	// ie: open the connection in New and use it till Close is called.
+	PersistConnection bool
 }
 
 const (
 	libkvmetadatalen = 8
+	transientTimeout = time.Duration(10) * time.Second
 )
 
 // Register registers boltdb to libkv
@@ -41,6 +55,12 @@ func Register() {
 
 // New opens a new BoltDB connection to the specified path and bucket
 func New(endpoints []string, options *store.Config) (store.Store, error) {
+	var (
+		db          *bolt.DB
+		err         error
+		boltOptions *bolt.Options
+	)
+
 	if len(endpoints) > 1 {
 		return nil, ErrMultipleEndpointsUnsupported
 	}
@@ -50,39 +70,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
 	}
 
 	dir, _ := filepath.Split(endpoints[0])
-	if err := os.MkdirAll(dir, 0750); err != nil {
+	if err = os.MkdirAll(dir, 0750); err != nil {
 		return nil, err
 	}
 
-	var boltOptions *bolt.Options
-	if options != nil {
+	if options.PersistConnection {
 		boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
-	}
-	db, err := bolt.Open(endpoints[0], 0644, boltOptions)
-	if err != nil {
-		return nil, err
+		db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	b := &BoltDB{}
+	b := &BoltDB{
+		client:            db,
+		path:              endpoints[0],
+		boltBucket:        []byte(options.Bucket),
+		timeout:           transientTimeout,
+		PersistConnection: options.PersistConnection,
+	}
 
-	b.client = db
-	b.boltBucket = []byte(options.Bucket)
 	return b, nil
 }
 
+func (b *BoltDB) reset() {
+	b.path = ""
+	b.boltBucket = []byte{}
+}
+
+func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
+	var (
+		db  *bolt.DB
+		err error
+	)
+	if !b.PersistConnection {
+		boltOptions := &bolt.Options{Timeout: b.timeout}
+		if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
+			return nil, err
+		}
+		b.client = db
+	}
+
+	return b.client, nil
+}
+
+func (b *BoltDB) releaseDBhandle() {
+	if !b.PersistConnection {
+		b.client.Close()
+	}
+}
+
 // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
 // by a atomic counter maintained by the libkv and appened to the value passed by the client.
 func (b *BoltDB) Get(key string) (*store.KVPair, error) {
-	var val []byte
+	var (
+		val []byte
+		db  *bolt.DB
+		err error
+	)
+
+	if db, err = b.getDBhandle(); err != nil {
+		return nil, err
+	}
+	defer b.releaseDBhandle()
 
-	db := b.client
-	err := db.View(func(tx *bolt.Tx) error {
+	err = db.View(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
-			return (ErrBoltBucketNotFound)
+			return ErrBoltBucketNotFound
 		}
 
-		val = bucket.Get([]byte(key))
+		v := bucket.Get([]byte(key))
+		val = make([]byte, len(v))
+		copy(val, v)
 
 		return nil
 	})
@@ -102,11 +162,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
 
 //Put the key, value pair. index number metadata is prepended to the value
 func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
-	var dbIndex uint64
-	db := b.client
+	var (
+		dbIndex uint64
+		db      *bolt.DB
+		err     error
+	)
 	dbval := make([]byte, libkvmetadatalen)
 
-	err := db.Update(func(tx *bolt.Tx) error {
+	if db, err = b.getDBhandle(); err != nil {
+		return err
+	}
+	defer b.releaseDBhandle()
+
+	err = db.Update(func(tx *bolt.Tx) error {
 		bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
 		if err != nil {
 			return err
@@ -127,12 +195,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
 
 //Delete the value for the given key.
 func (b *BoltDB) Delete(key string) error {
-	db := b.client
+	var (
+		db  *bolt.DB
+		err error
+	)
+	if db, err = b.getDBhandle(); err != nil {
+		return err
+	}
+	defer b.releaseDBhandle()
 
-	err := db.Update(func(tx *bolt.Tx) error {
+	err = db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
-			return (ErrBoltBucketNotFound)
+			return ErrBoltBucketNotFound
 		}
 		err := bucket.Delete([]byte(key))
 		return err
@@ -142,13 +217,21 @@ func (b *BoltDB) Delete(key string) error {
 
 // Exists checks if the key exists inside the store
 func (b *BoltDB) Exists(key string) (bool, error) {
-	var val []byte
+	var (
+		val []byte
+		db  *bolt.DB
+		err error
+	)
 
-	db := b.client
-	err := db.View(func(tx *bolt.Tx) error {
+	if db, err = b.getDBhandle(); err != nil {
+		return false, err
+	}
+	defer b.releaseDBhandle()
+
+	err = db.View(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
-			return (ErrBoltBucketNotFound)
+			return ErrBoltBucketNotFound
 		}
 
 		val = bucket.Get([]byte(key))
@@ -164,22 +247,32 @@ func (b *BoltDB) Exists(key string) (bool, error) {
 
 // List returns the range of keys starting with the passed in prefix
 func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
+	var (
+		db  *bolt.DB
+		err error
+	)
 	kv := []*store.KVPair{}
 
-	db := b.client
-	err := db.View(func(tx *bolt.Tx) error {
+	if db, err = b.getDBhandle(); err != nil {
+		return nil, err
+	}
+	defer b.releaseDBhandle()
+
+	err = db.View(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
-			return (ErrBoltBucketNotFound)
+			return ErrBoltBucketNotFound
 		}
 
 		cursor := bucket.Cursor()
 		prefix := []byte(keyPrefix)
 
-		for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() {
+		for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
 
-			dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
-			val = val[libkvmetadatalen:]
+			dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
+			v = v[libkvmetadatalen:]
+			val := make([]byte, len(v))
+			copy(val, v)
 
 			kv = append(kv, &store.KVPair{
 				Key:       string(key),
@@ -199,22 +292,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
 // has not been modified in the meantime, throws an
 // error if this is the case
 func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
-	var val []byte
-	var dbIndex uint64
+	var (
+		val []byte
+		db  *bolt.DB
+		err error
+	)
 
 	if previous == nil {
 		return false, store.ErrPreviousNotSpecified
 	}
-	db := b.client
+	if db, err = b.getDBhandle(); err != nil {
+		return false, err
+	}
+	defer b.releaseDBhandle()
 
-	err := db.Update(func(tx *bolt.Tx) error {
+	err = db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
 			return ErrBoltBucketNotFound
 		}
 
 		val = bucket.Get([]byte(key))
-		dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
+		dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
 		if dbIndex != previous.LastIndex {
 			return store.ErrKeyModified
 		}
@@ -230,13 +329,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
 // AtomicPut puts a value at "key" if the key has not been
 // modified since the last Put, throws an error if this is the case
 func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
-	var val []byte
-	var dbIndex uint64
+	var (
+		val     []byte
+		dbIndex uint64
+		db      *bolt.DB
+		err     error
+	)
 	dbval := make([]byte, libkvmetadatalen)
 
-	db := b.client
+	if db, err = b.getDBhandle(); err != nil {
+		return false, nil, err
+	}
+	defer b.releaseDBhandle()
 
-	err := db.Update(func(tx *bolt.Tx) error {
+	err = db.Update(func(tx *bolt.Tx) error {
 		var err error
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
@@ -283,18 +389,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
 
 // Close the db connection to the BoltDB
 func (b *BoltDB) Close() {
-	db := b.client
-
-	db.Close()
+	if !b.PersistConnection {
+		b.reset()
+	} else {
+		b.client.Close()
+	}
+	return
 }
 
 // DeleteTree deletes a range of keys with a given prefix
 func (b *BoltDB) DeleteTree(keyPrefix string) error {
-	db := b.client
-	err := db.Update(func(tx *bolt.Tx) error {
+	var (
+		db  *bolt.DB
+		err error
+	)
+	if db, err = b.getDBhandle(); err != nil {
+		return err
+	}
+	defer b.releaseDBhandle()
+
+	err = db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(b.boltBucket)
 		if bucket == nil {
-			return (ErrBoltBucketNotFound)
+			return ErrBoltBucketNotFound
 		}
 
 		cursor := bucket.Cursor()

+ 1 - 0
libnetwork/internal/kvstore/kvstore.go

@@ -43,6 +43,7 @@ type Config struct {
 	TLS               *tls.Config
 	ConnectionTimeout time.Duration
 	Bucket            string
+	PersistConnection bool
 }
 
 // ClientTLSConfig contains data for a Client TLS configuration in the form