Add local store caching support

Add local scope store caching support as
well as do some refactoring to make it datastore
scope aware and manage scope specific config.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2015-10-05 04:18:10 -07:00
parent 5f21cf3df1
commit d74384b1d4
3 changed files with 461 additions and 45 deletions

View file

@ -0,0 +1,153 @@
package datastore
import (
"fmt"
"sync"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
)
type kvMap map[string]KVObject
type cache struct {
sync.Mutex
kmm map[string]kvMap
ds *datastore
}
func newCache(ds *datastore) *cache {
return &cache{kmm: make(map[string]kvMap), ds: ds}
}
func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
var err error
c.Lock()
keyPrefix := Key(kvObject.KeyPrefix()...)
kmap, ok := c.kmm[keyPrefix]
c.Unlock()
if ok {
return kmap, nil
}
kmap = kvMap{}
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error while populating kmap, object does not implement KVConstructor interface")
}
kvList, err := c.ds.store.List(keyPrefix)
if err != nil {
// In case of BoltDB it may return ErrBoltBucketNotFound when no writes
// have ever happened on the db bucket. So check for both err codes
if err == store.ErrKeyNotFound || err == boltdb.ErrBoltBucketNotFound {
// If the store doesn't have anything then there is nothing to
// populate in the cache. Just bail out.
goto out
}
return nil, fmt.Errorf("error while populating kmap: %v", err)
}
for _, kvPair := range kvList {
// Ignore empty kvPair values
if len(kvPair.Value) == 0 {
continue
}
dstO := ctor.New()
err = dstO.SetValue(kvPair.Value)
if err != nil {
return nil, err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
kmap[Key(dstO.Key()...)] = dstO
}
out:
// There may multiple go routines racing to fill the
// cache. The one which places the kmap in c.kmm first
// wins. The others should just use what the first populated.
c.Lock()
kmapNew, ok := c.kmm[keyPrefix]
if ok {
c.Unlock()
return kmapNew, nil
}
c.kmm[keyPrefix] = kmap
c.Unlock()
return kmap, nil
}
func (c *cache) add(kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
kmap[Key(kvObject.Key()...)] = kvObject
c.Unlock()
return nil
}
func (c *cache) del(kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
delete(kmap, Key(kvObject.Key()...))
c.Unlock()
return nil
}
func (c *cache) get(key string, kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
o, ok := kmap[Key(kvObject.Key()...)]
if !ok {
return ErrKeyNotFound
}
ctor, ok := o.(KVConstructor)
if !ok {
return fmt.Errorf("kvobject does not implement KVConstructor interface. could not get object")
}
return ctor.CopyTo(kvObject)
}
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
kmap, err := c.kmap(kvObject)
if err != nil {
return nil, err
}
c.Lock()
defer c.Unlock()
var kvol []KVObject
for _, v := range kmap {
kvol = append(kvol, v)
}
return kvol, nil
}

View file

@ -1,8 +1,11 @@
package datastore
import (
"fmt"
"log"
"reflect"
"strings"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
@ -10,26 +13,37 @@ import (
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/types"
)
//DataStore exported
type DataStore interface {
// GetObject gets data from datastore and unmarshals to the specified object
GetObject(key string, o KV) error
GetObject(key string, o KVObject) error
// PutObject adds a new Record based on an object into the datastore
PutObject(kvObject KV) error
PutObject(kvObject KVObject) error
// PutObjectAtomic provides an atomic add and update operation for a Record
PutObjectAtomic(kvObject KV) error
PutObjectAtomic(kvObject KVObject) error
// DeleteObject deletes a record
DeleteObject(kvObject KV) error
DeleteObject(kvObject KVObject) error
// DeleteObjectAtomic performs an atomic delete operation
DeleteObjectAtomic(kvObject KV) error
DeleteObjectAtomic(kvObject KVObject) error
// DeleteTree deletes a record
DeleteTree(kvObject KV) error
DeleteTree(kvObject KVObject) error
// Watchable returns whether the store is watchable are not
Watchable() bool
// Watch for changes on a KVObject
Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
// List returns of a list of KVObjects belonging to the parent
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
List(string, KVObject) ([]KVObject, error)
// Scope returns the scope of the store
Scope() string
// KVStore returns access to the KV Store
KVStore() store.Store
// Close closes the data store
Close()
}
// ErrKeyModified is raised for an atomic update when the update is working on a stale state
@ -39,11 +53,13 @@ var (
)
type datastore struct {
scope string
store store.Store
cache *cache
}
//KV Key Value interface used by objects to be part of the DataStore
type KV interface {
// KVObject is Key/Value interface used by objects to be part of the DataStore
type KVObject interface {
// Key method lets an object to provide the Key to be used in KV Store
Key() []string
// KeyPrefix method lets an object to return immediate parent key that can be used for tree walk
@ -60,19 +76,40 @@ type KV interface {
// When SetIndex() is called, the object has been stored.
Exists() bool
// DataScope indicates the storage scope of the KV object
DataScope() DataScope
DataScope() string
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
Skip() bool
}
// DataScope indicates the storage scope
type DataScope int
// KVConstructor interface defines methods which can construct a KVObject from another.
type KVConstructor interface {
// New returns a new object which is created based on the
// source object
New() KVObject
// CopyTo deep copies the contents of the implementing object
// to the passed destination object
CopyTo(KVObject) error
}
// ScopeCfg represents Datastore configuration.
type ScopeCfg struct {
Embedded bool
Client ScopeClientCfg
}
// ScopeClientCfg represents Datastore Client-only mode configuration
type ScopeClientCfg struct {
Provider string
Address string
Config *store.Config
}
const (
// LocalScope indicates to store the KV object in local datastore such as boltdb
LocalScope DataScope = iota
LocalScope = "local"
// GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
GlobalScope
GlobalScope = "global"
defaultPrefix = "/var/lib/docker/network/files"
)
const (
@ -82,6 +119,27 @@ const (
EndpointKeyPrefix = "endpoint"
)
var (
defaultScopes = makeDefaultScopes()
)
func makeDefaultScopes() map[string]*ScopeCfg {
def := make(map[string]*ScopeCfg)
def[LocalScope] = &ScopeCfg{
Embedded: true,
Client: ScopeClientCfg{
Provider: "boltdb",
Address: defaultPrefix + "/boltdb.db",
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: 3 * time.Second,
},
},
}
return def
}
var rootChain = []string{"docker", "libnetwork"}
func init() {
@ -91,6 +149,28 @@ func init() {
boltdb.Register()
}
// DefaultScopes returns a map of default scopes and it's config for clients to use.
func DefaultScopes(dataDir string) map[string]*ScopeCfg {
if dataDir != "" {
defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/boltdb.db"
return defaultScopes
}
defaultScopes[LocalScope].Client.Address = defaultPrefix + "/boltdb.db"
return defaultScopes
}
// IsValid checks if the scope config has valid configuration.
func (cfg *ScopeCfg) IsValid() bool {
if cfg == nil ||
strings.TrimSpace(cfg.Client.Provider) == "" ||
strings.TrimSpace(cfg.Client.Address) == "" {
return false
}
return true
}
//Key provides convenient method to create a Key
func Key(key ...string) string {
keychain := append(rootChain, key...)
@ -110,7 +190,11 @@ func ParseKey(key string) ([]string, error) {
}
// newClient used to connect to KV Store
func newClient(kv string, addrs string, config *store.Config) (DataStore, error) {
func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (DataStore, error) {
if cached && scope != LocalScope {
return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
}
if config == nil {
config = &store.Config{}
}
@ -118,22 +202,82 @@ func newClient(kv string, addrs string, config *store.Config) (DataStore, error)
if err != nil {
return nil, err
}
ds := &datastore{store: store}
ds := &datastore{scope: scope, store: store}
if cached {
ds.cache = newCache(ds)
}
return ds, nil
}
// NewDataStore creates a new instance of LibKV data store
func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) {
if cfg == nil {
return nil, types.BadRequestErrorf("invalid configuration passed to datastore")
func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
c, ok := defaultScopes[scope]
if !ok || c.Client.Provider == "" || c.Client.Address == "" {
return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
}
cfg = c
}
// TODO : cfg.Embedded case
return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
var cached bool
if scope == LocalScope {
cached = true
}
return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
}
// NewCustomDataStore can be used by clients to plugin cusom datatore that adhers to store.Store
func NewCustomDataStore(customStore store.Store) DataStore {
return &datastore{store: customStore}
func (ds *datastore) Close() {
ds.store.Close()
}
func (ds *datastore) Scope() string {
return ds.scope
}
func (ds *datastore) Watchable() bool {
return ds.scope != LocalScope
}
func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
sCh := make(chan struct{})
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
}
kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
return nil, err
}
kvoCh := make(chan KVObject)
go func() {
for {
select {
case <-stopCh:
close(sCh)
return
case kvPair := <-kvpCh:
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
break
}
dstO.SetIndex(kvPair.LastIndex)
kvoCh <- dstO
}
}
}()
return kvoCh, nil
}
func (ds *datastore) KVStore() store.Store {
@ -141,40 +285,71 @@ func (ds *datastore) KVStore() store.Store {
}
// PutObjectAtomic adds a new Record based on an object into the datastore
func (ds *datastore) PutObjectAtomic(kvObject KV) error {
func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
var (
previous *store.KVPair
pair *store.KVPair
err error
)
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
kvObjValue := kvObject.Value()
if kvObjValue == nil {
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
var previous *store.KVPair
if kvObject.Skip() {
goto add_cache
}
if kvObject.Exists() {
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
} else {
previous = nil
}
_, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
_, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
if err != nil {
return err
}
kvObject.SetIndex(pair.LastIndex)
add_cache:
if ds.cache != nil {
return ds.cache.add(kvObject)
}
return nil
}
// PutObject adds a new Record based on an object into the datastore
func (ds *datastore) PutObject(kvObject KV) error {
func (ds *datastore) PutObject(kvObject KVObject) error {
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
return ds.putObjectWithKey(kvObject, kvObject.Key()...)
if kvObject.Skip() {
goto add_cache
}
if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
return err
}
add_cache:
if ds.cache != nil {
return ds.cache.add(kvObject)
}
return nil
}
func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
kvObjValue := kvObject.Value()
if kvObjValue == nil {
@ -184,39 +359,128 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
}
// GetObject returns a record matching the key
func (ds *datastore) GetObject(key string, o KV) error {
func (ds *datastore) GetObject(key string, o KVObject) error {
if ds.cache != nil {
return ds.cache.get(key, o)
}
kvPair, err := ds.store.Get(key)
if err != nil {
return err
}
err = o.SetValue(kvPair.Value)
if err != nil {
if err := o.SetValue(kvPair.Value); err != nil {
return err
}
// Make sure the object has a correct view of the DB index in case we need to modify it
// and update the DB.
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
o.SetIndex(kvPair.LastIndex)
return nil
}
func (ds *datastore) ensureKey(key string) error {
exists, err := ds.store.Exists(key)
if err != nil {
return err
}
if exists {
return nil
}
return ds.store.Put(key, []byte{}, nil)
}
func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
if ds.cache != nil {
return ds.cache.list(kvObject)
}
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
}
// Make sure the parent key exists
if err := ds.ensureKey(key); err != nil {
return nil, err
}
kvList, err := ds.store.List(key)
if err != nil {
return nil, err
}
var kvol []KVObject
for _, kvPair := range kvList {
if len(kvPair.Value) == 0 {
continue
}
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
return nil, err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
kvol = append(kvol, dstO)
}
return kvol, nil
}
// DeleteObject unconditionally deletes a record from the store
func (ds *datastore) DeleteObject(kvObject KV) error {
func (ds *datastore) DeleteObject(kvObject KVObject) error {
// cleaup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)
}
if kvObject.Skip() {
return nil
}
return ds.store.Delete(Key(kvObject.Key()...))
}
// DeleteObjectAtomic performs atomic delete on a record
func (ds *datastore) DeleteObjectAtomic(kvObject KV) error {
func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
_, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous)
return err
if kvObject.Skip() {
goto del_cache
}
if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
return err
}
del_cache:
// cleanup the cache only if AtomicDelete went through successfully
if ds.cache != nil {
return ds.cache.del(kvObject)
}
return nil
}
// DeleteTree unconditionally deletes a record from the store
func (ds *datastore) DeleteTree(kvObject KV) error {
func (ds *datastore) DeleteTree(kvObject KVObject) error {
// cleaup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)
}
if kvObject.Skip() {
return nil
}
return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
}

View file

@ -5,7 +5,6 @@ import (
"reflect"
"testing"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/options"
_ "github.com/docker/libnetwork/testutils"
"github.com/stretchr/testify/assert"
@ -15,7 +14,7 @@ var dummyKey = "dummy"
// NewCustomDataStore can be used by other Tests in order to use custom datastore
func NewTestDataStore() DataStore {
return &datastore{store: NewMockStore()}
return &datastore{scope: LocalScope, store: NewMockStore()}
}
func TestKey(t *testing.T) {
@ -38,11 +37,11 @@ func TestParseKey(t *testing.T) {
}
func TestInvalidDataStore(t *testing.T) {
config := &config.DatastoreCfg{}
config := &ScopeCfg{}
config.Embedded = false
config.Client.Provider = "invalid"
config.Client.Address = "localhost:8500"
_, err := NewDataStore(config)
_, err := NewDataStore(GlobalScope, config)
if err == nil {
t.Fatal("Invalid Datastore connection configuration must result in a failure")
}
@ -167,7 +166,7 @@ func (n *dummyObject) Skip() bool {
return n.SkipSave
}
func (n *dummyObject) DataScope() DataScope {
func (n *dummyObject) DataScope() string {
return LocalScope
}