Просмотр исходного кода

Merge pull request #46 from sanimej/bolt

BoltDB backend support for libkv
Alexandre Beslic 10 лет назад
Родитель
Сommit
6c6315a1b3

+ 307 - 0
libnetwork/internal/kvstore/boltdb/boltdb.go

@@ -0,0 +1,307 @@
+package boltdb
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"sync/atomic"
+
+	"github.com/boltdb/bolt"
+	"github.com/docker/libkv/store"
+)
+
+var (
+	// ErrMultipleEndpointsUnsupported is thrown when multiple endpoints specified for
+	// BoltDB. Endpoint has to be a local file path
+	ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path")
+	// ErrBoltBucketNotFound is thrown when specified BoltBD bucket doesn't exist in the DB
+	ErrBoltBucketNotFound = errors.New("boltdb bucket doesn't exist")
+	// ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing
+	ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
+	// ErrBoltAPIUnsupported is thrown when an APIs unsupported by BoltDB backend is called
+	ErrBoltAPIUnsupported = errors.New("API not supported by BoltDB backend")
+)
+
+//BoltDB type implements the Store interface
+type BoltDB struct {
+	client     *bolt.DB
+	boltBucket []byte
+	dbIndex    uint64
+}
+
+const (
+	libkvmetadatalen = 8
+)
+
+// New opens a new BoltDB connection to the specified path and bucket
+func New(endpoints []string, options *store.Config) (store.Store, error) {
+	if len(endpoints) > 1 {
+		return nil, ErrMultipleEndpointsUnsupported
+	}
+
+	if (options == nil) || (len(options.Bucket) == 0) {
+		return nil, ErrBoltBucketOptionMissing
+	}
+
+	db, err := bolt.Open(endpoints[0], 0644, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	b := &BoltDB{}
+
+	b.client = db
+	b.boltBucket = []byte(options.Bucket)
+	return b, nil
+}
+
+// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
+// by a atomic counter maintained by the libkv and appened to the value passed by the client.
+func (b *BoltDB) Get(key string) (*store.KVPair, error) {
+	var val []byte
+
+	db := b.client
+	err := db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return (ErrBoltBucketNotFound)
+		}
+
+		val = bucket.Get([]byte(key))
+
+		return nil
+	})
+
+	if len(val) == 0 {
+		return nil, store.ErrKeyNotFound
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
+	val = val[libkvmetadatalen:]
+
+	return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil
+}
+
+//Put the key, value pair. index number metadata is prepended to the value
+func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
+	var dbIndex uint64
+	db := b.client
+	dbval := make([]byte, libkvmetadatalen)
+
+	err := db.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
+		if err != nil {
+			return err
+		}
+
+		dbIndex = atomic.AddUint64(&b.dbIndex, 1)
+		binary.LittleEndian.PutUint64(dbval, dbIndex)
+		dbval = append(dbval, value...)
+
+		err = bucket.Put([]byte(key), dbval)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+	return err
+}
+
+//Delete the value for the given key.
+func (b *BoltDB) Delete(key string) error {
+	db := b.client
+
+	err := db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return (ErrBoltBucketNotFound)
+		}
+		err := bucket.Delete([]byte(key))
+		return err
+	})
+	return err
+}
+
+// Exists checks if the key exists inside the store
+func (b *BoltDB) Exists(key string) (bool, error) {
+	var val []byte
+
+	db := b.client
+	err := db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return (ErrBoltBucketNotFound)
+		}
+
+		val = bucket.Get([]byte(key))
+
+		return nil
+	})
+
+	if len(val) == 0 {
+		return false, err
+	}
+	return true, err
+}
+
+// List returns the range of keys starting with the passed in prefix
+func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
+	kv := []*store.KVPair{}
+
+	db := b.client
+	err := db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return (ErrBoltBucketNotFound)
+		}
+
+		cursor := bucket.Cursor()
+		prefix := []byte(keyPrefix)
+
+		for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() {
+
+			dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
+			val = val[libkvmetadatalen:]
+
+			kv = append(kv, &store.KVPair{
+				Key:       string(key),
+				Value:     val,
+				LastIndex: dbIndex,
+			})
+		}
+		return nil
+	})
+	if len(kv) == 0 {
+		return nil, store.ErrKeyNotFound
+	}
+	return kv, err
+}
+
+// AtomicDelete deletes a value at "key" if the key
+// has not been modified in the meantime, throws an
+// error if this is the case
+func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
+	var val []byte
+	var dbIndex uint64
+
+	if previous == nil {
+		return false, store.ErrPreviousNotSpecified
+	}
+	db := b.client
+
+	err := db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return ErrBoltBucketNotFound
+		}
+
+		val = bucket.Get([]byte(key))
+		dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
+		if dbIndex != previous.LastIndex {
+			return store.ErrKeyModified
+		}
+		err := bucket.Delete([]byte(key))
+		return err
+	})
+	if err != nil {
+		return false, err
+	}
+	return true, err
+}
+
+// AtomicPut puts a value at "key" if the key has not been
+// modified since the last Put, throws an error if this is the case
+func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
+	var val []byte
+	var dbIndex uint64
+	dbval := make([]byte, libkvmetadatalen)
+
+	db := b.client
+
+	err := db.Update(func(tx *bolt.Tx) error {
+		var err error
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			if previous != nil {
+				return ErrBoltBucketNotFound
+			}
+			bucket, err = tx.CreateBucket(b.boltBucket)
+			if err != nil {
+				return err
+			}
+		}
+		// AtomicPut is equivalent to Put if previous is nil and the Ky
+		// doesn't exist in the DB.
+		val = bucket.Get([]byte(key))
+		if previous == nil && len(val) != 0 {
+			return store.ErrKeyModified
+		}
+		if previous != nil {
+			dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
+			if dbIndex != previous.LastIndex {
+				return store.ErrKeyModified
+			}
+		}
+		dbIndex = atomic.AddUint64(&b.dbIndex, 1)
+		binary.LittleEndian.PutUint64(dbval, b.dbIndex)
+		dbval = append(dbval, value...)
+		return (bucket.Put([]byte(key), dbval))
+	})
+	if err != nil {
+		return false, nil, err
+	}
+
+	updated := &store.KVPair{
+		Key:       key,
+		Value:     value,
+		LastIndex: dbIndex,
+	}
+
+	return true, updated, nil
+}
+
+// Close the db connection to the BoltDB
+func (b *BoltDB) Close() {
+	db := b.client
+
+	db.Close()
+}
+
+// DeleteTree deletes a range of keys with a given prefix
+func (b *BoltDB) DeleteTree(keyPrefix string) error {
+	db := b.client
+	err := db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(b.boltBucket)
+		if bucket == nil {
+			return (ErrBoltBucketNotFound)
+		}
+
+		cursor := bucket.Cursor()
+		prefix := []byte(keyPrefix)
+
+		for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() {
+			_ = bucket.Delete([]byte(key))
+		}
+		return nil
+	})
+
+	return err
+}
+
+// NewLock has to implemented at the library level since its not supported by BoltDB
+func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
+	return nil, ErrBoltAPIUnsupported
+}
+
+// Watch has to implemented at the library level since its not supported by BoltDB
+func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
+	return nil, ErrBoltAPIUnsupported
+}
+
+// WatchTree has to implemented at the library level since its not supported by BoltDB
+func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
+	return nil, ErrBoltAPIUnsupported
+}

+ 3 - 0
libnetwork/internal/kvstore/kvstore.go

@@ -16,6 +16,8 @@ const (
 	ETCD Backend = "etcd"
 	// ZK backend
 	ZK Backend = "zk"
+	// BOLTDB backend
+	BOLTDB Backend = "boltdb"
 )
 
 var (
@@ -39,6 +41,7 @@ var (
 type Config struct {
 	TLS               *tls.Config
 	ConnectionTimeout time.Duration
+	Bucket            string
 }
 
 // Store represents the backend K/V storage

+ 2 - 0
libnetwork/internal/kvstore/kvstore_manage.go

@@ -67,6 +67,7 @@ import (
 	"strings"
 
 	"github.com/docker/libkv/store"
+	"github.com/docker/libkv/store/boltdb"
 	"github.com/docker/libkv/store/consul"
 	"github.com/docker/libkv/store/etcd"
 	"github.com/docker/libkv/store/zookeeper"
@@ -81,6 +82,7 @@ var (
 		store.CONSUL: consul.New,
 		store.ETCD:   etcd.New,
 		store.ZK:     zookeeper.New,
+		store.BOLTDB: boltdb.New,
 	}
 	supportedBackend = func() string {
 		keys := make([]string, 0, len(initializers))