libnetwork: remove more datastore scope plumbing
Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
parent
142b522946
commit
befff0e13f
9 changed files with 73 additions and 101 deletions
|
@ -360,7 +360,7 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string
|
|||
|
||||
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
|
||||
if err != nil {
|
||||
if _, ok := err.(libnetwork.ErrDataStoreNotInitialized); ok {
|
||||
if errors.Is(err, libnetwork.ErrDataStoreNotInitialized) {
|
||||
//nolint: revive
|
||||
return nil, errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ type Controller struct {
|
|||
drvRegistry *drvregistry.DrvRegistry
|
||||
sandboxes sandboxTable
|
||||
cfg *config.Config
|
||||
stores []datastore.DataStore
|
||||
store datastore.DataStore
|
||||
extKeyListener net.Listener
|
||||
watchCh chan *Endpoint
|
||||
unWatchCh chan *Endpoint
|
||||
|
@ -130,7 +130,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
drvRegistry, err := drvregistry.New(c.getStore(datastore.LocalScope), c.getStore(datastore.GlobalScope), c.RegisterDriver, nil, c.cfg.PluginGetter)
|
||||
drvRegistry, err := drvregistry.New(c.getStore(), nil, c.RegisterDriver, nil, c.cfg.PluginGetter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ func New(cfgOptions ...config.Option) (*Controller, error) {
|
|||
}
|
||||
}
|
||||
|
||||
if err = initIPAMDrivers(drvRegistry, nil, c.getStore(datastore.GlobalScope), c.cfg.DefaultAddressPool); err != nil {
|
||||
if err = initIPAMDrivers(drvRegistry, nil, nil, c.cfg.DefaultAddressPool); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -696,7 +696,7 @@ var joinCluster NetworkWalker = func(nw Network) bool {
|
|||
}
|
||||
|
||||
func (c *Controller) reservePools() {
|
||||
networks, err := c.getNetworksForScope(datastore.LocalScope)
|
||||
networks, err := c.getNetworks()
|
||||
if err != nil {
|
||||
logrus.Warnf("Could not retrieve networks from local store during ipam allocation for existing networks: %v", err)
|
||||
return
|
||||
|
|
|
@ -1151,7 +1151,7 @@ func (c *Controller) cleanupLocalEndpoints() {
|
|||
eps[ep.id] = true
|
||||
}
|
||||
}
|
||||
nl, err := c.getNetworksForScope(datastore.LocalScope)
|
||||
nl, err := c.getNetworks()
|
||||
if err != nil {
|
||||
logrus.Warnf("Could not get list of networks during endpoint cleanup: %v", err)
|
||||
return
|
||||
|
|
|
@ -109,7 +109,7 @@ func (ec *endpointCnt) EndpointCnt() uint64 {
|
|||
}
|
||||
|
||||
func (ec *endpointCnt) updateStore() error {
|
||||
store := ec.n.getController().getStore(ec.DataScope())
|
||||
store := ec.n.getController().getStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("store not found for scope %s on endpoint count update", ec.DataScope())
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func (ec *endpointCnt) setCnt(cnt uint64) error {
|
|||
}
|
||||
|
||||
func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
|
||||
store := ec.n.getController().getStore(ec.DataScope())
|
||||
store := ec.n.getController().getStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("store not found for scope %s", ec.DataScope())
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package libnetwork
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
@ -186,8 +187,4 @@ func (mr ManagerRedirectError) Maskable() {}
|
|||
|
||||
// ErrDataStoreNotInitialized is returned if an invalid data scope is passed
|
||||
// for getting data store
|
||||
type ErrDataStoreNotInitialized string
|
||||
|
||||
func (dsni ErrDataStoreNotInitialized) Error() string {
|
||||
return fmt.Sprintf("datastore for scope %q is not initialized", string(dsni))
|
||||
}
|
||||
var ErrDataStoreNotInitialized = errors.New("datastore is not initialized")
|
||||
|
|
|
@ -187,7 +187,7 @@ func (sb *Sandbox) storeDelete() error {
|
|||
}
|
||||
|
||||
func (c *Controller) sandboxCleanup(activeSandboxes map[string]interface{}) {
|
||||
store := c.getStore(datastore.LocalScope)
|
||||
store := c.getStore()
|
||||
if store == nil {
|
||||
logrus.Error("Could not find local scope store while trying to cleanup sandboxes")
|
||||
return
|
||||
|
|
|
@ -22,40 +22,27 @@ func (c *Controller) initStores() error {
|
|||
if c.cfg == nil {
|
||||
return nil
|
||||
}
|
||||
store, err := datastore.NewDataStore(c.cfg.Scope)
|
||||
var err error
|
||||
c.store, err = datastore.NewDataStore(c.cfg.Scope)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.stores = []datastore.DataStore{store}
|
||||
|
||||
c.startWatch()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) closeStores() {
|
||||
for _, store := range c.getStores() {
|
||||
if store := c.store; store != nil {
|
||||
store.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) getStore(scope string) datastore.DataStore {
|
||||
func (c *Controller) getStore() datastore.DataStore {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for _, store := range c.stores {
|
||||
if store.Scope() == scope {
|
||||
return store
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) getStores() []datastore.DataStore {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
return c.stores
|
||||
return c.store
|
||||
}
|
||||
|
||||
func (c *Controller) getNetworkFromStore(nid string) (*network, error) {
|
||||
|
@ -67,10 +54,10 @@ func (c *Controller) getNetworkFromStore(nid string) (*network, error) {
|
|||
return nil, ErrNoSuchNetwork(nid)
|
||||
}
|
||||
|
||||
func (c *Controller) getNetworksForScope(scope string) ([]*network, error) {
|
||||
func (c *Controller) getNetworks() ([]*network, error) {
|
||||
var nl []*network
|
||||
|
||||
store := c.getStore(scope)
|
||||
store := c.getStore()
|
||||
if store == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -78,8 +65,7 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) {
|
|||
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
|
||||
&network{ctrlr: c})
|
||||
if err != nil && err != datastore.ErrKeyNotFound {
|
||||
return nil, fmt.Errorf("failed to get networks for scope %s: %v",
|
||||
scope, err)
|
||||
return nil, fmt.Errorf("failed to get networks: %w", err)
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
|
@ -95,7 +81,7 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) {
|
|||
|
||||
n.epCnt = ec
|
||||
if n.scope == "" {
|
||||
n.scope = scope
|
||||
n.scope = store.Scope()
|
||||
}
|
||||
nl = append(nl, n)
|
||||
}
|
||||
|
@ -103,92 +89,80 @@ func (c *Controller) getNetworksForScope(scope string) ([]*network, error) {
|
|||
return nl, nil
|
||||
}
|
||||
|
||||
func (c *Controller) getNetworksFromStore() []*network {
|
||||
func (c *Controller) getNetworksFromStore() []*network { // FIXME: unify with c.getNetworks()
|
||||
var nl []*network
|
||||
|
||||
for _, store := range c.getStores() {
|
||||
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
|
||||
// Continue searching in the next store if no keys found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err)
|
||||
}
|
||||
continue
|
||||
store := c.getStore()
|
||||
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
logrus.Debugf("failed to get networks from store: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
|
||||
if err != nil && err != datastore.ErrKeyNotFound {
|
||||
logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
|
||||
}
|
||||
kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
|
||||
if err != nil && err != datastore.ErrKeyNotFound {
|
||||
logrus.Warnf("failed to get endpoint_count map from store: %v", err)
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
n := kvo.(*network)
|
||||
n.mu.Lock()
|
||||
n.ctrlr = c
|
||||
ec := &endpointCnt{n: n}
|
||||
// Trim the leading & trailing "/" to make it consistent across all stores
|
||||
if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
|
||||
ec = val.(*endpointCnt)
|
||||
ec.n = n
|
||||
n.epCnt = ec
|
||||
}
|
||||
if n.scope == "" {
|
||||
n.scope = store.Scope()
|
||||
}
|
||||
n.mu.Unlock()
|
||||
nl = append(nl, n)
|
||||
for _, kvo := range kvol {
|
||||
n := kvo.(*network)
|
||||
n.mu.Lock()
|
||||
n.ctrlr = c
|
||||
ec := &endpointCnt{n: n}
|
||||
// Trim the leading & trailing "/" to make it consistent across all stores
|
||||
if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
|
||||
ec = val.(*endpointCnt)
|
||||
ec.n = n
|
||||
n.epCnt = ec
|
||||
}
|
||||
if n.scope == "" {
|
||||
n.scope = store.Scope()
|
||||
}
|
||||
n.mu.Unlock()
|
||||
nl = append(nl, n)
|
||||
}
|
||||
|
||||
return nl
|
||||
}
|
||||
|
||||
func (n *network) getEndpointFromStore(eid string) (*Endpoint, error) {
|
||||
var errors []string
|
||||
for _, store := range n.ctrlr.getStores() {
|
||||
ep := &Endpoint{id: eid, network: n}
|
||||
err := store.GetObject(datastore.Key(ep.Key()...), ep)
|
||||
// Continue searching in the next store if the key is not found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err))
|
||||
logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return ep, nil
|
||||
store := n.ctrlr.getStore()
|
||||
ep := &Endpoint{id: eid, network: n}
|
||||
err := store.GetObject(datastore.Key(ep.Key()...), ep)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
|
||||
}
|
||||
return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors)
|
||||
return ep, nil
|
||||
}
|
||||
|
||||
func (n *network) getEndpointsFromStore() ([]*Endpoint, error) {
|
||||
var epl []*Endpoint
|
||||
|
||||
tmp := Endpoint{network: n}
|
||||
for _, store := range n.getController().getStores() {
|
||||
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
|
||||
// Continue searching in the next store if no keys found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
logrus.Debugf("failed to get endpoints for network %s scope %s: %v",
|
||||
n.Name(), store.Scope(), err)
|
||||
}
|
||||
continue
|
||||
store := n.getController().getStore()
|
||||
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w",
|
||||
n.Name(), store.Scope(), err)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
ep := kvo.(*Endpoint)
|
||||
epl = append(epl, ep)
|
||||
}
|
||||
for _, kvo := range kvol {
|
||||
ep := kvo.(*Endpoint)
|
||||
epl = append(epl, ep)
|
||||
}
|
||||
|
||||
return epl, nil
|
||||
}
|
||||
|
||||
func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
|
||||
cs := c.getStore(kvObject.DataScope())
|
||||
cs := c.getStore()
|
||||
if cs == nil {
|
||||
return ErrDataStoreNotInitialized(kvObject.DataScope())
|
||||
return ErrDataStoreNotInitialized
|
||||
}
|
||||
|
||||
if err := cs.PutObjectAtomic(kvObject); err != nil {
|
||||
|
@ -202,9 +176,9 @@ func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
|
|||
}
|
||||
|
||||
func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error {
|
||||
cs := c.getStore(kvObject.DataScope())
|
||||
cs := c.getStore()
|
||||
if cs == nil {
|
||||
return ErrDataStoreNotInitialized(kvObject.DataScope())
|
||||
return ErrDataStoreNotInitialized
|
||||
}
|
||||
|
||||
retry:
|
||||
|
@ -258,7 +232,8 @@ func (c *Controller) networkWatchLoop(nw *netWatch, ep *Endpoint, ecCh <-chan da
|
|||
|
||||
epl, err := ec.n.getEndpointsFromStore()
|
||||
if err != nil {
|
||||
break
|
||||
logrus.WithError(err).Debug("error getting endpoints from store")
|
||||
continue
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
@ -356,7 +331,7 @@ func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoi
|
|||
nw.stopCh = make(chan struct{})
|
||||
c.mu.Unlock()
|
||||
|
||||
store := c.getStore(n.DataScope())
|
||||
store := c.getStore()
|
||||
if store == nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestNoPersist(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Error creating endpoint: %v", err)
|
||||
}
|
||||
store := ctrl.getStore(datastore.LocalScope).KVStore()
|
||||
store := ctrl.getStore().KVStore()
|
||||
if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists {
|
||||
t.Fatalf("Network with persist=false should not be stored in KV Store")
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
|
|||
if err != nil {
|
||||
t.Fatalf("Error creating endpoint: %v", err)
|
||||
}
|
||||
store := ctrl.getStore(datastore.LocalScope).KVStore()
|
||||
store := ctrl.getStore().KVStore()
|
||||
if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil {
|
||||
t.Fatalf("Network key should have been created.")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue