libnetwork/datastore: remove Watch(), Watchable(), RestartWatch()

`store.Watch()` was only used in `Controller.processEndpointCreate()`,
and was skipped if the store was not "watchable" (`store.Watchable()`).

Whether a store is watchable depends on the datastore's `scope` field;
local stores are not watchable:

    func (ds *datastore) Watchable() bool {
        return ds.scope != LocalScope
    }

The datastore is only initialized in two locations, and both set the
scope field to LocalScope:

datastore.newClient() (also called by datastore.NewDataStore()):
3e4c9d90cf/libnetwork/datastore/datastore.go (L213)

datastore.NewTestDataStore() (used in tests):
3e4c9d90cf/libnetwork/datastore/datastore_test.go (L14-L17)
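
To make this concrete, here is a minimal standalone sketch (not the
moby source; the LocalScope value is assumed for illustration) showing
why Watchable() can never return true:

    package main

    import "fmt"

    const LocalScope = "local" // assumed value, for illustration only

    type datastore struct{ scope string }

    func (ds *datastore) Watchable() bool {
        return ds.scope != LocalScope
    }

    func main() {
        // Both real construction sites hard-code LocalScope:
        a := &datastore{scope: LocalScope} // as in datastore.newClient()
        b := &datastore{scope: LocalScope} // as in datastore.NewTestDataStore()
        fmt.Println(a.Watchable(), b.Watchable()) // prints: false false
    }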

Furthermore, the backing BoltDB kvstore does not implement the Watch()
method:

3e4c9d90cf/libnetwork/internal/kvstore/boltdb/boltdb.go (L464-L467)
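
Its stub amounts to the following (a sketch, assuming the internal fork
kept libkv's convention of returning ErrCallNotSupported for
unsupported operations):

    // Watch is a no-op stub in the BoltDB backend: every call fails
    // immediately, so nothing could ever be watched on a local store.
    func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
        return nil, store.ErrCallNotSupported
    }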

Based on the above:

- our datastore is never Watchable()
- so datastore.Watch() is never used

This patch removes the Watchable(), Watch(), and RestartWatch() functions,
as well as the code handling watching.
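
With the watch plumbing gone, the tail of
Controller.processEndpointCreate() reduces to plain bookkeeping
(reconstructed from the diff below, not verbatim source):

    c.mu.Lock()
    nw.localEps[endpointID] = ep
    nmap[networkID] = nw
    c.mu.Unlock()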

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
commit a3b0181503 (parent 2409a36e29)
Sebastiaan van Stijn, 2023-07-02 16:03:45 +02:00
2 changed files with 4 additions and 175 deletions

libnetwork/datastore/datastore.go

@@ -2,7 +2,6 @@ package datastore
 
 import (
 	"fmt"
-	"log"
 	"reflect"
 	"strings"
 	"sync"
@@ -27,12 +26,6 @@ type DataStore interface {
 	DeleteObjectAtomic(kvObject KVObject) error
 	// DeleteTree deletes a record
 	DeleteTree(kvObject KVObject) error
-	// Watchable returns whether the store is watchable or not
-	Watchable() bool
-	// Watch for changes on a KVObject
-	Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
-	// RestartWatch retriggers stopped Watches
-	RestartWatch()
	// List returns of a list of KVObjects belonging to the parent
	// key. The caller must pass a KVObject of the same type as
	// the objects that need to be listed
@@ -54,10 +47,9 @@ var (
 )
 
 type datastore struct {
-	scope   string
-	store   store.Store
-	cache   *cache
-	watchCh chan struct{}
+	scope string
+	store store.Store
+	cache *cache
 	sync.Mutex
 }
@@ -206,7 +198,7 @@ func newClient(kv string, addr string, config *store.Config) (DataStore, error)
 		return nil, err
 	}
 
-	ds := &datastore{scope: LocalScope, store: s, watchCh: make(chan struct{})}
+	ds := &datastore{scope: LocalScope, store: s}
 	ds.cache = newCache(ds)
 
 	return ds, nil
@@ -257,83 +249,6 @@ func (ds *datastore) Scope() string {
 	return ds.scope
 }
 
-func (ds *datastore) Watchable() bool {
-	return ds.scope != LocalScope
-}
-
-func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
-	sCh := make(chan struct{})
-
-	ctor, ok := kvObject.(KVConstructor)
-	if !ok {
-		return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
-	}
-
-	kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
-	if err != nil {
-		return nil, err
-	}
-
-	kvoCh := make(chan KVObject)
-
-	go func() {
-	retry_watch:
-		var err error
-
-		// Make sure to get a new instance of watch channel
-		ds.Lock()
-		watchCh := ds.watchCh
-		ds.Unlock()
-
-	loop:
-		for {
-			select {
-			case <-stopCh:
-				close(sCh)
-				return
-			case kvPair := <-kvpCh:
-				// If the backend KV store gets reset libkv's go routine
-				// for the watch can exit resulting in a nil value in
-				// channel.
-				if kvPair == nil {
-					break loop
-				}
-
-				dstO := ctor.New()
-
-				if err = dstO.SetValue(kvPair.Value); err != nil {
-					log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
-					break
-				}
-
-				dstO.SetIndex(kvPair.LastIndex)
-				kvoCh <- dstO
-			}
-		}
-
-		// Wait on watch channel for a re-trigger when datastore becomes active
-		<-watchCh
-
-		kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
-		if err != nil {
-			log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
-		}
-
-		goto retry_watch
-	}()
-
-	return kvoCh, nil
-}
-
-func (ds *datastore) RestartWatch() {
-	ds.Lock()
-	defer ds.Unlock()
-
-	watchCh := ds.watchCh
-	ds.watchCh = make(chan struct{})
-	close(watchCh)
-}
-
 func (ds *datastore) KVStore() store.Store {
 	return ds.store
 }

libnetwork/store.go

@@ -200,7 +200,6 @@ retry:
 type netWatch struct {
 	localEps  map[string]*Endpoint
 	remoteEps map[string]*Endpoint
-	stopCh    chan struct{}
 }
 
 func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint {
@@ -223,71 +222,6 @@ func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
 	c.unWatchCh <- ep
 }
 
-func (c *Controller) networkWatchLoop(nw *netWatch, ep *Endpoint, ecCh <-chan datastore.KVObject) {
-	for {
-		select {
-		case <-nw.stopCh:
-			return
-		case o := <-ecCh:
-			ec := o.(*endpointCnt)
-
-			epl, err := ec.n.getEndpointsFromStore()
-			if err != nil {
-				log.G(context.TODO()).WithError(err).Debug("error getting endpoints from store")
-				continue
-			}
-
-			c.mu.Lock()
-			var addEp []*Endpoint
-
-			delEpMap := make(map[string]*Endpoint)
-			renameEpMap := make(map[string]bool)
-			for k, v := range nw.remoteEps {
-				delEpMap[k] = v
-			}
-
-			for _, lEp := range epl {
-				if _, ok := nw.localEps[lEp.ID()]; ok {
-					continue
-				}
-
-				if ep, ok := nw.remoteEps[lEp.ID()]; ok {
-					// On a container rename EP ID will remain
-					// the same but the name will change. service
-					// records should reflect the change.
-					// Keep old EP entry in the delEpMap and add
-					// EP from the store (which has the new name)
-					// into the new list
-					if lEp.name == ep.name {
-						delete(delEpMap, lEp.ID())
-						continue
-					}
-					renameEpMap[lEp.ID()] = true
-				}
-				nw.remoteEps[lEp.ID()] = lEp
-				addEp = append(addEp, lEp)
-			}
-
-			// EPs whose name are to be deleted from the svc records
-			// should also be removed from nw's remote EP list, except
-			// the ones that are getting renamed.
-			for _, lEp := range delEpMap {
-				if !renameEpMap[lEp.ID()] {
-					delete(nw.remoteEps, lEp.ID())
-				}
-			}
-			c.mu.Unlock()
-
-			for _, lEp := range delEpMap {
-				ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
-			}
-
-			for _, lEp := range addEp {
-				ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
-			}
-		}
-	}
-}
-
 func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
 	n := ep.getNetwork()
 	if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
@@ -329,25 +263,7 @@ func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
 	c.mu.Lock()
 	nw.localEps[endpointID] = ep
 	nmap[networkID] = nw
-	nw.stopCh = make(chan struct{})
 	c.mu.Unlock()
-
-	store := c.getStore()
-	if store == nil {
-		return
-	}
-
-	if !store.Watchable() {
-		return
-	}
-
-	ch, err := store.Watch(n.getEpCnt(), nw.stopCh)
-	if err != nil {
-		log.G(context.TODO()).Warnf("Error creating watch for network: %v", err)
-		return
-	}
-
-	go c.networkWatchLoop(nw, ep, ch)
 }
 
 func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
@@ -373,8 +289,6 @@ func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
 
 	c.mu.Lock()
 	if len(nw.localEps) == 0 {
-		close(nw.stopCh)
-
 		// This is the last container going away for the network. Destroy
 		// this network's svc db entry
 		delete(c.svcRecords, networkID)