diff --git a/libnetwork/internal/kvstore/boltdb/boltdb.go b/libnetwork/internal/kvstore/boltdb/boltdb.go index a9213f1090..417deced6e 100644 --- a/libnetwork/internal/kvstore/boltdb/boltdb.go +++ b/libnetwork/internal/kvstore/boltdb/boltdb.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sync/atomic" + "time" "github.com/boltdb/bolt" "github.com/docker/libkv" @@ -23,15 +24,28 @@ var ( ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing") ) +const ( + filePerm os.FileMode = 0644 +) + //BoltDB type implements the Store interface type BoltDB struct { client *bolt.DB boltBucket []byte dbIndex uint64 + path string + timeout time.Duration + // By default libkv opens and closes the bolt DB connection for every + // get/put operation. This allows multiple apps to use a Bolt DB at the + // same time. + // PersistConnection flag provides an option to override ths behavior. + // ie: open the connection in New and use it till Close is called. + PersistConnection bool } const ( libkvmetadatalen = 8 + transientTimeout = time.Duration(10) * time.Second ) // Register registers boltdb to libkv @@ -41,6 +55,12 @@ func Register() { // New opens a new BoltDB connection to the specified path and bucket func New(endpoints []string, options *store.Config) (store.Store, error) { + var ( + db *bolt.DB + err error + boltOptions *bolt.Options + ) + if len(endpoints) > 1 { return nil, ErrMultipleEndpointsUnsupported } @@ -50,39 +70,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { } dir, _ := filepath.Split(endpoints[0]) - if err := os.MkdirAll(dir, 0750); err != nil { + if err = os.MkdirAll(dir, 0750); err != nil { return nil, err } - var boltOptions *bolt.Options - if options != nil { + if options.PersistConnection { boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} - } - db, err := bolt.Open(endpoints[0], 0644, boltOptions) - if err != nil { - return nil, err + db, err = bolt.Open(endpoints[0], filePerm, boltOptions) + if err != nil { + return nil, err + } } - b := &BoltDB{} + b := &BoltDB{ + client: db, + path: endpoints[0], + boltBucket: []byte(options.Bucket), + timeout: transientTimeout, + PersistConnection: options.PersistConnection, + } - b.client = db - b.boltBucket = []byte(options.Bucket) return b, nil } +func (b *BoltDB) reset() { + b.path = "" + b.boltBucket = []byte{} +} + +func (b *BoltDB) getDBhandle() (*bolt.DB, error) { + var ( + db *bolt.DB + err error + ) + if !b.PersistConnection { + boltOptions := &bolt.Options{Timeout: b.timeout} + if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil { + return nil, err + } + b.client = db + } + + return b.client, nil +} + +func (b *BoltDB) releaseDBhandle() { + if !b.PersistConnection { + b.client.Close() + } +} + // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by // by a atomic counter maintained by the libkv and appened to the value passed by the client. func (b *BoltDB) Get(key string) (*store.KVPair, error) { - var val []byte + var ( + val []byte + db *bolt.DB + err error + ) - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } - val = bucket.Get([]byte(key)) + v := bucket.Get([]byte(key)) + val = make([]byte, len(v)) + copy(val, v) return nil }) @@ -102,11 +162,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) { //Put the key, value pair. index number metadata is prepended to the value func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { - var dbIndex uint64 - db := b.client + var ( + dbIndex uint64 + db *bolt.DB + err error + ) dbval := make([]byte, libkvmetadatalen) - err := db.Update(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(b.boltBucket) if err != nil { return err @@ -127,12 +195,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { //Delete the value for the given key. func (b *BoltDB) Delete(key string) error { - db := b.client + var ( + db *bolt.DB + err error + ) + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } err := bucket.Delete([]byte(key)) return err @@ -142,13 +217,21 @@ func (b *BoltDB) Delete(key string) error { // Exists checks if the key exists inside the store func (b *BoltDB) Exists(key string) (bool, error) { - var val []byte + var ( + val []byte + db *bolt.DB + err error + ) - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } val = bucket.Get([]byte(key)) @@ -164,22 +247,32 @@ func (b *BoltDB) Exists(key string) (bool, error) { // List returns the range of keys starting with the passed in prefix func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { + var ( + db *bolt.DB + err error + ) kv := []*store.KVPair{} - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } cursor := bucket.Cursor() prefix := []byte(keyPrefix) - for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() { + for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() { - dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) - val = val[libkvmetadatalen:] + dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen]) + v = v[libkvmetadatalen:] + val := make([]byte, len(v)) + copy(val, v) kv = append(kv, &store.KVPair{ Key: string(key), @@ -199,22 +292,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { // has not been modified in the meantime, throws an // error if this is the case func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - var val []byte - var dbIndex uint64 + var ( + val []byte + db *bolt.DB + err error + ) if previous == nil { return false, store.ErrPreviousNotSpecified } - db := b.client + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { return ErrBoltBucketNotFound } val = bucket.Get([]byte(key)) - dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) if dbIndex != previous.LastIndex { return store.ErrKeyModified } @@ -230,13 +329,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) // AtomicPut puts a value at "key" if the key has not been // modified since the last Put, throws an error if this is the case func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { - var val []byte - var dbIndex uint64 + var ( + val []byte + dbIndex uint64 + db *bolt.DB + err error + ) dbval := make([]byte, libkvmetadatalen) - db := b.client + if db, err = b.getDBhandle(); err != nil { + return false, nil, err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { var err error bucket := tx.Bucket(b.boltBucket) if bucket == nil { @@ -283,18 +389,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt // Close the db connection to the BoltDB func (b *BoltDB) Close() { - db := b.client - - db.Close() + if !b.PersistConnection { + b.reset() + } else { + b.client.Close() + } + return } // DeleteTree deletes a range of keys with a given prefix func (b *BoltDB) DeleteTree(keyPrefix string) error { - db := b.client - err := db.Update(func(tx *bolt.Tx) error { + var ( + db *bolt.DB + err error + ) + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } cursor := bucket.Cursor() diff --git a/libnetwork/internal/kvstore/kvstore.go b/libnetwork/internal/kvstore/kvstore.go index 06889e5468..0df01b6c83 100644 --- a/libnetwork/internal/kvstore/kvstore.go +++ b/libnetwork/internal/kvstore/kvstore.go @@ -43,6 +43,7 @@ type Config struct { TLS *tls.Config ConnectionTimeout time.Duration Bucket string + PersistConnection bool } // ClientTLSConfig contains data for a Client TLS configuration in the form