Bläddra i källkod

Merge pull request #598 from mrjana/store

Share libkv store handles across datastore handles
aboch 9 år sedan
förälder
incheckning
2e43b58b95
2 ändrade filer med 89 tillägg och 20 borttagningar
  1. 73 15
      libnetwork/datastore/datastore.go
  2. 16 5
      libnetwork/store_test.go

+ 73 - 15
libnetwork/datastore/datastore.go

@@ -5,6 +5,7 @@ import (
 	"log"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/docker/libkv"
@@ -56,6 +57,7 @@ type datastore struct {
 	scope string
 	store store.Store
 	cache *cache
+	cfg   ScopeCfg
 }
 
 // KVObject is  Key/Value interface used by objects to be part of the DataStore
@@ -104,6 +106,12 @@ type ScopeClientCfg struct {
 	Config   *store.Config
 }
 
+type storeTableData struct {
+	refCnt int
+	store  store.Store
+	once   sync.Once
+}
+
 const (
 	// LocalScope indicates to store the KV object in local datastore such as boltdb
 	LocalScope = "local"
@@ -121,6 +129,8 @@ const (
 
 var (
 	defaultScopes = makeDefaultScopes()
+	storeLock     sync.Mutex
+	storeTable    = make(map[ScopeCfg]*storeTableData)
 )
 
 func makeDefaultScopes() map[string]*ScopeCfg {
@@ -190,11 +200,7 @@ func ParseKey(key string) ([]string, error) {
 }
 
 // newClient used to connect to KV Store
-func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (DataStore, error) {
-	if cached && scope != LocalScope {
-		return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
-	}
-
+func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (*datastore, error) {
 	if config == nil {
 		config = &store.Config{}
 	}
@@ -213,24 +219,76 @@ func newClient(scope string, kv string, addrs string, config *store.Config, cach
 
 // NewDataStore creates a new instance of LibKV data store
 func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
-	if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
-		c, ok := defaultScopes[scope]
-		if !ok || c.Client.Provider == "" || c.Client.Address == "" {
-			return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
-		}
+	var (
+		err error
+		ds  *datastore
+	)
 
-		cfg = c
+	if !cfg.IsValid() {
+		return nil, fmt.Errorf("invalid datastore configuration passed for scope %s", scope)
 	}
 
-	var cached bool
-	if scope == LocalScope {
-		cached = true
+	storeLock.Lock()
+	sdata, ok := storeTable[*cfg]
+	if ok {
+		sdata.refCnt++
+		// If sdata already has a store nothing to do. Just
+		// create a datastore handle using it and return with
+		// that.
+		if sdata.store != nil {
+			storeLock.Unlock()
+			return &datastore{scope: scope, cfg: *cfg, store: sdata.store}, nil
+		}
+	} else {
+		// If sdata is not present create one and add ito
+		// storeTable while holding the lock.
+		sdata = &storeTableData{refCnt: 1}
+		storeTable[*cfg] = sdata
+	}
+	storeLock.Unlock()
+
+	// We come here either because:
+	//
+	// 1. We just created the store table data OR
+	// 2. We picked up the store table data from table but store was not initialized.
+	//
+	// In both cases the once function will ensure the store
+	// initialization happens exactly once
+	sdata.once.Do(func() {
+		ds, err = newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, scope == LocalScope)
+		if err != nil {
+			return
+		}
+
+		ds.cfg = *cfg
+		sdata.store = ds.store
+	})
+
+	if err != nil {
+		return nil, err
 	}
 
-	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
+	return ds, nil
 }
 
 func (ds *datastore) Close() {
+	storeLock.Lock()
+	sdata := storeTable[ds.cfg]
+
+	if sdata == nil {
+		storeLock.Unlock()
+		return
+	}
+
+	sdata.refCnt--
+	if sdata.refCnt > 0 {
+		storeLock.Unlock()
+		return
+	}
+
+	delete(storeTable, ds.cfg)
+	storeLock.Unlock()
+
 	ds.store.Close()
 }
 

+ 16 - 5
libnetwork/store_test.go

@@ -43,9 +43,17 @@ func TestBoltdbBackend(t *testing.T) {
 
 func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Config) {
 	cfgOptions := []config.Option{}
-	cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
-	cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
-	cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
+	if provider != "" {
+		cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
+	}
+
+	if url != "" {
+		cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
+	}
+
+	if storeConfig != nil {
+		cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
+	}
 
 	driverOptions := options.Generic{}
 	genericOption := make(map[string]interface{})
@@ -71,7 +79,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
 	if exists, err := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, string(nw.ID()), string(ep.ID())}...)); exists || err != nil {
 		t.Fatalf("Endpoint key shouldn't have been created.")
 	}
-	store.Close()
+	ctrl.(*controller).getStore(datastore.LocalScope).Close()
 
 	// test restore of local store
 	ctrl, err = New(cfgOptions...)
@@ -138,7 +146,10 @@ func TestLocalStoreLockTimeout(t *testing.T) {
 	}
 	defer ctrl1.Stop()
 	// Use the same boltdb file without closing the previous controller
-	_, err = New(cfgOptions...)
+	// with a slightly altered configuration
+	sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 1 * time.Second}
+	_, err = New(append(cfgOptions[:len(cfgOptions)-1],
+		config.OptionLocalKVProviderConfig(sCfg))...)
 	if err == nil {
 		t.Fatalf("Expected to fail but succeeded")
 	}