diff --git a/libnetwork/internal/kvstore/boltdb/boltdb.go b/libnetwork/internal/kvstore/boltdb/boltdb.go new file mode 100644 index 0000000000..d62979cb72 --- /dev/null +++ b/libnetwork/internal/kvstore/boltdb/boltdb.go @@ -0,0 +1,474 @@ +package boltdb + +import ( + "bytes" + "encoding/binary" + "errors" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + bolt "go.etcd.io/bbolt" +) + +var ( + // ErrMultipleEndpointsUnsupported is thrown when multiple endpoints specified for + // BoltDB. Endpoint has to be a local file path + ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path") + // ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing + ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing") +) + +const ( + filePerm os.FileMode = 0644 +) + +//BoltDB type implements the Store interface +type BoltDB struct { + client *bolt.DB + boltBucket []byte + dbIndex uint64 + path string + timeout time.Duration + // By default libkv opens and closes the bolt DB connection for every + // get/put operation. This allows multiple apps to use a Bolt DB at the + // same time. + // PersistConnection flag provides an option to override ths behavior. + // ie: open the connection in New and use it till Close is called. + PersistConnection bool + sync.Mutex +} + +const ( + libkvmetadatalen = 8 + transientTimeout = time.Duration(10) * time.Second +) + +// Register registers boltdb to libkv +func Register() { + libkv.AddStore(store.BOLTDB, New) +} + +// New opens a new BoltDB connection to the specified path and bucket +func New(endpoints []string, options *store.Config) (store.Store, error) { + var ( + db *bolt.DB + err error + boltOptions *bolt.Options + timeout = transientTimeout + ) + + if len(endpoints) > 1 { + return nil, ErrMultipleEndpointsUnsupported + } + + if (options == nil) || (len(options.Bucket) == 0) { + return nil, ErrBoltBucketOptionMissing + } + + dir, _ := filepath.Split(endpoints[0]) + if err = os.MkdirAll(dir, 0750); err != nil { + return nil, err + } + + if options.PersistConnection { + boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} + db, err = bolt.Open(endpoints[0], filePerm, boltOptions) + if err != nil { + return nil, err + } + } + + if options.ConnectionTimeout != 0 { + timeout = options.ConnectionTimeout + } + + b := &BoltDB{ + client: db, + path: endpoints[0], + boltBucket: []byte(options.Bucket), + timeout: timeout, + PersistConnection: options.PersistConnection, + } + + return b, nil +} + +func (b *BoltDB) reset() { + b.path = "" + b.boltBucket = []byte{} +} + +func (b *BoltDB) getDBhandle() (*bolt.DB, error) { + var ( + db *bolt.DB + err error + ) + if !b.PersistConnection { + boltOptions := &bolt.Options{Timeout: b.timeout} + if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil { + return nil, err + } + b.client = db + } + + return b.client, nil +} + +func (b *BoltDB) releaseDBhandle() { + if !b.PersistConnection { + b.client.Close() + } +} + +// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by +// by a atomic counter maintained by the libkv and appened to the value passed by the client. +func (b *BoltDB) Get(key string) (*store.KVPair, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + v := bucket.Get([]byte(key)) + val = make([]byte, len(v)) + copy(val, v) + + return nil + }) + + if len(val) == 0 { + return nil, store.ErrKeyNotFound + } + if err != nil { + return nil, err + } + + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + val = val[libkvmetadatalen:] + + return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil +} + +//Put the key, value pair. index number metadata is prepended to the value +func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { + var ( + dbIndex uint64 + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + dbval := make([]byte, libkvmetadatalen) + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(b.boltBucket) + if err != nil { + return err + } + + dbIndex = atomic.AddUint64(&b.dbIndex, 1) + binary.LittleEndian.PutUint64(dbval, dbIndex) + dbval = append(dbval, value...) + + err = bucket.Put([]byte(key), dbval) + if err != nil { + return err + } + return nil + }) + return err +} + +//Delete the value for the given key. +func (b *BoltDB) Delete(key string) error { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + err := bucket.Delete([]byte(key)) + return err + }) + return err +} + +// Exists checks if the key exists inside the store +func (b *BoltDB) Exists(key string) (bool, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + val = bucket.Get([]byte(key)) + + return nil + }) + + if len(val) == 0 { + return false, err + } + return true, err +} + +// List returns the range of keys starting with the passed in prefix +func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + kv := []*store.KVPair{} + + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + cursor := bucket.Cursor() + prefix := []byte(keyPrefix) + + for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() { + + dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen]) + v = v[libkvmetadatalen:] + val := make([]byte, len(v)) + copy(val, v) + + kv = append(kv, &store.KVPair{ + Key: string(key), + Value: val, + LastIndex: dbIndex, + }) + } + return nil + }) + if len(kv) == 0 { + return nil, store.ErrKeyNotFound + } + return kv, err +} + +// AtomicDelete deletes a value at "key" if the key +// has not been modified in the meantime, throws an +// error if this is the case +func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) { + var ( + val []byte + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if previous == nil { + return false, store.ErrPreviousNotSpecified + } + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + val = bucket.Get([]byte(key)) + if val == nil { + return store.ErrKeyNotFound + } + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + if dbIndex != previous.LastIndex { + return store.ErrKeyModified + } + err := bucket.Delete([]byte(key)) + return err + }) + if err != nil { + return false, err + } + return true, err +} + +// AtomicPut puts a value at "key" if the key has not been +// modified since the last Put, throws an error if this is the case +func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { + var ( + val []byte + dbIndex uint64 + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + dbval := make([]byte, libkvmetadatalen) + + if db, err = b.getDBhandle(); err != nil { + return false, nil, err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + var err error + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + if previous != nil { + return store.ErrKeyNotFound + } + bucket, err = tx.CreateBucket(b.boltBucket) + if err != nil { + return err + } + } + // AtomicPut is equivalent to Put if previous is nil and the Ky + // doesn't exist in the DB. + val = bucket.Get([]byte(key)) + if previous == nil && len(val) != 0 { + return store.ErrKeyExists + } + if previous != nil { + if len(val) == 0 { + return store.ErrKeyNotFound + } + dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + if dbIndex != previous.LastIndex { + return store.ErrKeyModified + } + } + dbIndex = atomic.AddUint64(&b.dbIndex, 1) + binary.LittleEndian.PutUint64(dbval, b.dbIndex) + dbval = append(dbval, value...) + return (bucket.Put([]byte(key), dbval)) + }) + if err != nil { + return false, nil, err + } + + updated := &store.KVPair{ + Key: key, + Value: value, + LastIndex: dbIndex, + } + + return true, updated, nil +} + +// Close the db connection to the BoltDB +func (b *BoltDB) Close() { + b.Lock() + defer b.Unlock() + + if !b.PersistConnection { + b.reset() + } else { + b.client.Close() + } + return +} + +// DeleteTree deletes a range of keys with a given prefix +func (b *BoltDB) DeleteTree(keyPrefix string) error { + var ( + db *bolt.DB + err error + ) + b.Lock() + defer b.Unlock() + + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(b.boltBucket) + if bucket == nil { + return store.ErrKeyNotFound + } + + cursor := bucket.Cursor() + prefix := []byte(keyPrefix) + + for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() { + _ = bucket.Delete([]byte(key)) + } + return nil + }) + + return err +} + +// NewLock has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) { + return nil, store.ErrCallNotSupported +} + +// Watch has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + return nil, store.ErrCallNotSupported +} + +// WatchTree has to implemented at the library level since its not supported by BoltDB +func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + return nil, store.ErrCallNotSupported +} diff --git a/libnetwork/internal/kvstore/kvstore.go b/libnetwork/internal/kvstore/kvstore.go new file mode 100644 index 0000000000..7a4850c019 --- /dev/null +++ b/libnetwork/internal/kvstore/kvstore.go @@ -0,0 +1,132 @@ +package store + +import ( + "crypto/tls" + "errors" + "time" +) + +// Backend represents a KV Store Backend +type Backend string + +const ( + // CONSUL backend + CONSUL Backend = "consul" + // ETCD backend + ETCD Backend = "etcd" + // ZK backend + ZK Backend = "zk" + // BOLTDB backend + BOLTDB Backend = "boltdb" +) + +var ( + // ErrBackendNotSupported is thrown when the backend k/v store is not supported by libkv + ErrBackendNotSupported = errors.New("Backend storage not supported yet, please choose one of") + // ErrCallNotSupported is thrown when a method is not implemented/supported by the current backend + ErrCallNotSupported = errors.New("The current call is not supported with this backend") + // ErrNotReachable is thrown when the API cannot be reached for issuing common store operations + ErrNotReachable = errors.New("Api not reachable") + // ErrCannotLock is thrown when there is an error acquiring a lock on a key + ErrCannotLock = errors.New("Error acquiring the lock") + // ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store + ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") + // ErrKeyNotFound is thrown when the key is not found in the store during a Get operation + ErrKeyNotFound = errors.New("Key not found in store") + // ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation + ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation") + // ErrKeyExists is thrown when the previous value exists in the case of an AtomicPut + ErrKeyExists = errors.New("Previous K/V pair exists, cannot complete Atomic operation") +) + +// Config contains the options for a storage client +type Config struct { + ClientTLS *ClientTLSConfig + TLS *tls.Config + ConnectionTimeout time.Duration + Bucket string + PersistConnection bool + Username string + Password string +} + +// ClientTLSConfig contains data for a Client TLS configuration in the form +// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. +type ClientTLSConfig struct { + CertFile string + KeyFile string + CACertFile string +} + +// Store represents the backend K/V storage +// Each store should support every call listed +// here. Or it couldn't be implemented as a K/V +// backend for libkv +type Store interface { + // Put a value at the specified key + Put(key string, value []byte, options *WriteOptions) error + + // Get a value given its key + Get(key string) (*KVPair, error) + + // Delete the value at the specified key + Delete(key string) error + + // Verify if a Key exists in the store + Exists(key string) (bool, error) + + // Watch for changes on a key + Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) + + // WatchTree watches for changes on child nodes under + // a given directory + WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) + + // NewLock creates a lock for a given key. + // The returned Locker is not held and must be acquired + // with `.Lock`. The Value is optional. + NewLock(key string, options *LockOptions) (Locker, error) + + // List the content of a given prefix + List(directory string) ([]*KVPair, error) + + // DeleteTree deletes a range of keys under a given directory + DeleteTree(directory string) error + + // Atomic CAS operation on a single value. + // Pass previous = nil to create a new key. + AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) + + // Atomic delete of a single value + AtomicDelete(key string, previous *KVPair) (bool, error) + + // Close the store connection + Close() +} + +// KVPair represents {Key, Value, Lastindex} tuple +type KVPair struct { + Key string + Value []byte + LastIndex uint64 +} + +// WriteOptions contains optional request parameters +type WriteOptions struct { + IsDir bool + TTL time.Duration +} + +// LockOptions contains optional request parameters +type LockOptions struct { + Value []byte // Optional, value to associate with the lock + TTL time.Duration // Optional, expiration ttl associated with the lock + RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock +} + +// Locker provides locking mechanism on top of the store. +// Similar to `sync.Lock` except it may return errors. +type Locker interface { + Lock(stopChan chan struct{}) (<-chan struct{}, error) + Unlock() error +} diff --git a/libnetwork/internal/kvstore/kvstore_manage.go b/libnetwork/internal/kvstore/kvstore_manage.go new file mode 100644 index 0000000000..bdb8c7529f --- /dev/null +++ b/libnetwork/internal/kvstore/kvstore_manage.go @@ -0,0 +1,40 @@ +package libkv + +import ( + "fmt" + "sort" + "strings" + + "github.com/docker/libkv/store" +) + +// Initialize creates a new Store object, initializing the client +type Initialize func(addrs []string, options *store.Config) (store.Store, error) + +var ( + // Backend initializers + initializers = make(map[store.Backend]Initialize) + + supportedBackend = func() string { + keys := make([]string, 0, len(initializers)) + for k := range initializers { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return strings.Join(keys, ", ") + }() +) + +// NewStore creates an instance of store +func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) { + if init, exists := initializers[backend]; exists { + return init(addrs, options) + } + + return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend) +} + +// AddStore adds a new store backend to libkv +func AddStore(store store.Backend, init Initialize) { + initializers[store] = init +}