Merge pull request #45870 from thaJeztah/libnetwork_cleanup_kvstore

libnetwork: remove dead / unused code from datastore and kvstore
Commit 569e1a4b77 by Bjorn Neergaard, 2023-07-05 07:00:55 -06:00 (committed by GitHub)
15 changed files with 160 additions and 671 deletions


@@ -312,9 +312,9 @@ func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
ch, cancel := nDB.Watch(libnetworkEPTable, "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
cancelList = append(cancelList, cancel)
c.mu.Lock()
@@ -783,7 +783,7 @@ func (n *network) addDriverWatches() {
return
}
for _, table := range n.driverTables {
ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
ch, cancel := agent.networkDB.Watch(table.name, n.ID())
agent.Lock()
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
agent.Unlock()


@@ -45,7 +45,7 @@ func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nDB, ok := ctx.(*networkdb.NetworkDB)
if ok {
ch, cancel := nDB.Watch(tableName, "", "")
ch, cancel := nDB.Watch(tableName, "")
clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)}
go handleTableEvents(tableName, ch)


@@ -7,7 +7,6 @@ import (
"github.com/containerd/containerd/log"
"github.com/docker/docker/libnetwork/cluster"
"github.com/docker/docker/libnetwork/datastore"
store "github.com/docker/docker/libnetwork/internal/kvstore"
"github.com/docker/docker/libnetwork/ipamutils"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/osl"
@@ -141,30 +140,6 @@ func IsValidName(name string) bool {
return strings.TrimSpace(name) != ""
}
// OptionLocalKVProvider function returns an option setter for kvstore provider
func OptionLocalKVProvider(provider string) Option {
return func(c *Config) {
log.G(context.TODO()).Debugf("Option OptionLocalKVProvider: %s", provider)
c.Scope.Client.Provider = strings.TrimSpace(provider)
}
}
// OptionLocalKVProviderURL function returns an option setter for kvstore url
func OptionLocalKVProviderURL(url string) Option {
return func(c *Config) {
log.G(context.TODO()).Debugf("Option OptionLocalKVProviderURL: %s", url)
c.Scope.Client.Address = strings.TrimSpace(url)
}
}
// OptionLocalKVProviderConfig function returns an option setter for kvstore config
func OptionLocalKVProviderConfig(config *store.Config) Option {
return func(c *Config) {
log.G(context.TODO()).Debugf("Option OptionLocalKVProviderConfig: %v", config)
c.Scope.Client.Config = config
}
}
// OptionActiveSandboxes function returns an option setter for passing the sandboxes
// which were active during previous daemon life
func OptionActiveSandboxes(sandboxes map[string]interface{}) Option {
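The three OptionLocalKV* helpers removed above had no remaining callers outside the libnetwork tests, which now set the fields inline (see the store test changes further down). A sketch of that replacement pattern; the helper name is hypothetical, and internal/kvstore is only importable from within the moby module:

package example

import (
	"github.com/docker/docker/libnetwork/config"
	store "github.com/docker/docker/libnetwork/internal/kvstore"
)

// optionLocalKVStore stands in for the removed helpers: a single
// config.Option closure setting the same three Scope.Client fields.
func optionLocalKVStore(provider, address string, cfg *store.Config) config.Option {
	return func(c *config.Config) {
		c.Scope.Client.Provider = provider
		c.Scope.Client.Address = address
		c.Scope.Client.Config = cfg
	}
}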


@@ -138,7 +138,7 @@ func (c *cache) del(kvObject KVObject, atomic bool) error {
return nil
}
func (c *cache) get(key string, kvObject KVObject) error {
func (c *cache) get(kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
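The key argument dropped here was redundant: a KVObject can always produce its own key. An in-package sketch of the derivation the cache now relies on (the helper is illustrative; the real code inlines the expression):

package datastore

// cacheKey shows why get() needs no separate key argument: the cache map
// key is derived from the object's own key parts via Key().
func cacheKey(kvObject KVObject) string {
	return Key(kvObject.Key()...)
}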


@@ -2,7 +2,6 @@ package datastore
import (
"fmt"
"log"
"reflect"
"strings"
"sync"
@@ -17,24 +16,10 @@ import (
type DataStore interface {
// GetObject gets data from datastore and unmarshals to the specified object
GetObject(key string, o KVObject) error
// PutObject adds a new Record based on an object into the datastore
PutObject(kvObject KVObject) error
// PutObjectAtomic provides an atomic add and update operation for a Record
PutObjectAtomic(kvObject KVObject) error
// DeleteObject deletes a record
DeleteObject(kvObject KVObject) error
// DeleteObjectAtomic performs an atomic delete operation
DeleteObjectAtomic(kvObject KVObject) error
// DeleteTree deletes a record
DeleteTree(kvObject KVObject) error
// Watchable returns whether the store is watchable or not
Watchable() bool
// Watch for changes on a KVObject
Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
// RestartWatch retriggers stopped Watches
RestartWatch()
// Active returns if the store is active
Active() bool
// List returns a list of KVObjects belonging to the parent
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
@@ -56,13 +41,10 @@ var (
)
type datastore struct {
scope string
store store.Store
cache *cache
watchCh chan struct{}
active bool
sequential bool
sync.Mutex
mu sync.Mutex
scope string
store store.Store
cache *cache
}
// KVObject is Key/Value interface used by objects to be part of the DataStore
@@ -210,7 +192,7 @@ func newClient(kv string, addr string, config *store.Config) (DataStore, error)
return nil, err
}
ds := &datastore{scope: LocalScope, store: s, active: true, watchCh: make(chan struct{}), sequential: true}
ds := &datastore{scope: LocalScope, store: s}
ds.cache = newCache(ds)
return ds, nil
@@ -261,91 +243,6 @@ func (ds *datastore) Scope() string {
return ds.scope
}
func (ds *datastore) Active() bool {
return ds.active
}
func (ds *datastore) Watchable() bool {
return ds.scope != LocalScope
}
func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
sCh := make(chan struct{})
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
}
kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
return nil, err
}
kvoCh := make(chan KVObject)
go func() {
retry_watch:
var err error
// Make sure to get a new instance of watch channel
ds.Lock()
watchCh := ds.watchCh
ds.Unlock()
loop:
for {
select {
case <-stopCh:
close(sCh)
return
case kvPair := <-kvpCh:
// If the backend KV store gets reset libkv's go routine
// for the watch can exit resulting in a nil value in
// channel.
if kvPair == nil {
ds.Lock()
ds.active = false
ds.Unlock()
break loop
}
dstO := ctor.New()
if err = dstO.SetValue(kvPair.Value); err != nil {
log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
break
}
dstO.SetIndex(kvPair.LastIndex)
kvoCh <- dstO
}
}
// Wait on watch channel for a re-trigger when datastore becomes active
<-watchCh
kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
}
goto retry_watch
}()
return kvoCh, nil
}
func (ds *datastore) RestartWatch() {
ds.Lock()
defer ds.Unlock()
ds.active = true
watchCh := ds.watchCh
ds.watchCh = make(chan struct{})
close(watchCh)
}
func (ds *datastore) KVStore() store.Store {
return ds.store
}
@@ -357,10 +254,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
pair *store.KVPair
err error
)
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
ds.mu.Lock()
defer ds.mu.Unlock()
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
@@ -382,7 +277,7 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
previous = nil
}
_, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
if err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
@@ -402,53 +297,13 @@ add_cache:
return nil
}
// PutObject adds a new Record based on an object into the datastore
func (ds *datastore) PutObject(kvObject KVObject) error {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
if kvObject.Skip() {
goto add_cache
}
if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
return err
}
add_cache:
if ds.cache != nil {
// If persistent store is skipped, sequencing needs to
// happen in cache.
return ds.cache.add(kvObject, kvObject.Skip())
}
return nil
}
func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
kvObjValue := kvObject.Value()
if kvObjValue == nil {
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
return ds.store.Put(Key(key...), kvObjValue, nil)
}
// GetObject returns a record matching the key
func (ds *datastore) GetObject(key string, o KVObject) error {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.cache != nil {
return ds.cache.get(key, o)
return ds.cache.get(o)
}
kvPair, err := ds.store.Get(key)
@@ -474,14 +329,12 @@ func (ds *datastore) ensureParent(parent string) error {
if exists {
return nil
}
return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
return ds.store.Put(parent, []byte{})
}
func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.cache != nil {
return ds.cache.list(kvObject)
@@ -535,10 +388,8 @@ func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, call
}
func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
ds.mu.Lock()
defer ds.mu.Unlock()
kvol := make(map[string]KVObject)
cb := func(key string, val KVObject) {
@@ -552,33 +403,10 @@ func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, er
return kvol, nil
}
// DeleteObject unconditionally deletes a record from the store
func (ds *datastore) DeleteObject(kvObject KVObject) error {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
// cleanup the cache first
if ds.cache != nil {
// If persistent store is skipped, sequencing needs to
// happen in cache.
ds.cache.del(kvObject, kvObject.Skip())
}
if kvObject.Skip() {
return nil
}
return ds.store.Delete(Key(kvObject.Key()...))
}
// DeleteObjectAtomic performs atomic delete on a record
func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
ds.mu.Lock()
defer ds.mu.Unlock()
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
@@ -590,7 +418,7 @@ func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
goto del_cache
}
if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
}
@@ -607,24 +435,3 @@ del_cache:
return nil
}
// DeleteTree unconditionally deletes a record from the store
func (ds *datastore) DeleteTree(kvObject KVObject) error {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
// cleanup the cache first
if ds.cache != nil {
// If persistent store is skipped, sequencing needs to
// happen in cache.
ds.cache.del(kvObject, kvObject.Skip())
}
if kvObject.Skip() {
return nil
}
return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
}
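With PutObject, DeleteObject, and DeleteTree gone, PutObjectAtomic and DeleteObjectAtomic are the only write paths, and callers handle ErrKeyModified themselves. An in-package sketch of the read-modify-write loop this implies (helper and callback names are illustrative):

package datastore

// putWithRetry reloads the object and retries whenever a concurrent
// writer bumped its index, the optimistic-concurrency pattern the
// remaining API expects.
func putWithRetry(ds DataStore, key string, obj KVObject, mutate func(KVObject)) error {
	for {
		if err := ds.GetObject(key, obj); err != nil {
			return err
		}
		mutate(obj)
		if err := ds.PutObjectAtomic(obj); err != ErrKeyModified {
			return err // nil on success, or a non-retryable error
		}
		// lost the race; reload and try again
	}
}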


@@ -51,7 +51,7 @@ func TestInvalidDataStore(t *testing.T) {
func TestKVObjectFlatKey(t *testing.T) {
store := NewTestDataStore()
expected := dummyKVObject("1000", true)
err := store.PutObject(expected)
err := store.PutObjectAtomic(expected)
if err != nil {
t.Fatal(err)
}


@@ -38,7 +38,7 @@ func (s *MockStore) Get(key string) (*store.KVPair, error) {
}
// Put a value at "key"
func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) error {
func (s *MockStore) Put(key string, value []byte) error {
mData := s.db[key]
if mData == nil {
mData = &MockData{value, 0}
@@ -48,12 +48,6 @@ func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) e
return nil
}
// Delete a value at "key"
func (s *MockStore) Delete(key string) error {
delete(s.db, key)
return nil
}
// Exists checks that the key exists inside the store
func (s *MockStore) Exists(key string) (bool, error) {
_, ok := s.db[key]
@@ -65,59 +59,38 @@ func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
return nil, ErrNotImplemented
}
// DeleteTree deletes a range of values at "directory"
func (s *MockStore) DeleteTree(prefix string) error {
delete(s.db, prefix)
return nil
}
// Watch a single key for modifications
func (s *MockStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
return nil, ErrNotImplemented
}
// WatchTree triggers a watch on a range of values at "directory"
func (s *MockStore) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
return nil, ErrNotImplemented
}
// NewLock exposed
func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
return nil, ErrNotImplemented
}
// AtomicPut puts a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair) (*store.KVPair, error) {
mData := s.db[key]
if previous == nil {
if mData != nil {
return false, nil, types.BadRequestErrorf("atomic put failed because key exists")
return nil, types.BadRequestErrorf("atomic put failed because key exists")
} // Else OK.
} else {
if mData == nil {
return false, nil, types.BadRequestErrorf("atomic put failed because key exists")
return nil, types.BadRequestErrorf("atomic put failed because key exists")
}
if mData != nil && mData.Index != previous.LastIndex {
return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index")
return nil, types.BadRequestErrorf("atomic put failed due to mismatched Index")
} // Else OK.
}
err := s.Put(key, newValue, nil)
if err != nil {
return false, nil, err
if err := s.Put(key, newValue); err != nil {
return nil, err
}
return true, &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil
return &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) error {
mData := s.db[key]
if mData != nil && mData.Index != previous.LastIndex {
return false, types.BadRequestErrorf("atomic delete failed due to mismatched Index")
return types.BadRequestErrorf("atomic delete failed due to mismatched Index")
}
return true, s.Delete(key)
delete(s.db, key)
return nil
}
// Close closes the client connection


@@ -22,12 +22,11 @@ var (
ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
)
const (
filePerm os.FileMode = 0644
)
const filePerm = 0o644
// BoltDB type implements the Store interface
type BoltDB struct {
mu sync.Mutex
client *bolt.DB
boltBucket []byte
dbIndex uint64
@@ -39,7 +38,6 @@ type BoltDB struct {
// PersistConnection flag provides an option to override this behavior.
// ie: open the connection in New and use it till Close is called.
PersistConnection bool
sync.Mutex
}
const (
@@ -54,13 +52,6 @@ func Register() {
// New opens a new BoltDB connection to the specified path and bucket
func New(endpoints []string, options *store.Config) (store.Store, error) {
var (
db *bolt.DB
err error
boltOptions *bolt.Options
timeout = transientTimeout
)
if len(endpoints) > 1 {
return nil, ErrMultipleEndpointsUnsupported
}
@@ -70,18 +61,22 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
}
dir, _ := filepath.Split(endpoints[0])
if err = os.MkdirAll(dir, 0750); err != nil {
if err := os.MkdirAll(dir, 0o750); err != nil {
return nil, err
}
var db *bolt.DB
if options.PersistConnection {
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
var err error
db, err = bolt.Open(endpoints[0], filePerm, &bolt.Options{
Timeout: options.ConnectionTimeout,
})
if err != nil {
return nil, err
}
}
timeout := transientTimeout
if options.ConnectionTimeout != 0 {
timeout = options.ConnectionTimeout
}
@@ -103,18 +98,13 @@ func (b *BoltDB) reset() {
}
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
var (
db *bolt.DB
err error
)
if !b.PersistConnection {
boltOptions := &bolt.Options{Timeout: b.timeout}
if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
if err != nil {
return nil, err
}
b.client = db
}
return b.client, nil
}
@@ -127,19 +117,16 @@ func (b *BoltDB) releaseDBhandle() {
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. It's implemented
// by an atomic counter maintained by libkv and appended to the value passed by the client.
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
b.mu.Lock()
defer b.mu.Unlock()
if db, err = b.getDBhandle(); err != nil {
db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()
var val []byte
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
@@ -152,129 +139,87 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
return nil
})
if len(val) == 0 {
return nil, store.ErrKeyNotFound
}
if err != nil {
return nil, err
}
if len(val) == 0 {
return nil, store.ErrKeyNotFound
}
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
val = val[libkvmetadatalen:]
return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil
return &store.KVPair{Key: key, Value: val, LastIndex: dbIndex}, nil
}
// Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
var (
dbIndex uint64
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
func (b *BoltDB) Put(key string, value []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
dbval := make([]byte, libkvmetadatalen)
if db, err = b.getDBhandle(); err != nil {
db, err := b.getDBhandle()
if err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
if err != nil {
return err
}
dbIndex = atomic.AddUint64(&b.dbIndex, 1)
dbIndex := atomic.AddUint64(&b.dbIndex, 1)
dbval := make([]byte, libkvmetadatalen)
binary.LittleEndian.PutUint64(dbval, dbIndex)
dbval = append(dbval, value...)
err = bucket.Put([]byte(key), dbval)
if err != nil {
return err
}
return nil
return bucket.Put([]byte(key), dbval)
})
return err
}
// Delete the value for the given key.
func (b *BoltDB) Delete(key string) error {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
err := bucket.Delete([]byte(key))
return err
})
return err
}
// Exists checks if the key exists inside the store
func (b *BoltDB) Exists(key string) (bool, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
b.mu.Lock()
defer b.mu.Unlock()
if db, err = b.getDBhandle(); err != nil {
db, err := b.getDBhandle()
if err != nil {
return false, err
}
defer b.releaseDBhandle()
var exists bool
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
val = bucket.Get([]byte(key))
exists = len(bucket.Get([]byte(key))) > 0
return nil
})
if len(val) == 0 {
if err != nil {
return false, err
}
return true, err
if !exists {
return false, store.ErrKeyNotFound
}
return true, nil
}
// List returns the range of keys starting with the passed in prefix
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
b.mu.Lock()
defer b.mu.Unlock()
kv := []*store.KVPair{}
if db, err = b.getDBhandle(); err != nil {
db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()
var kv []*store.KVPair
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
@@ -285,7 +230,6 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
prefix := []byte(keyPrefix)
for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
v = v[libkvmetadatalen:]
val := make([]byte, len(v))
@@ -299,39 +243,38 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
}
return nil
})
if err != nil {
return nil, err
}
if len(kv) == 0 {
return nil, store.ErrKeyNotFound
}
return kv, err
return kv, nil
}
// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
b.mu.Lock()
defer b.mu.Unlock()
if previous == nil {
return false, store.ErrPreviousNotSpecified
return store.ErrPreviousNotSpecified
}
if db, err = b.getDBhandle(); err != nil {
return false, err
db, err := b.getDBhandle()
if err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
val = bucket.Get([]byte(key))
val := bucket.Get([]byte(key))
if val == nil {
return store.ErrKeyNotFound
}
@@ -339,41 +282,31 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
if dbIndex != previous.LastIndex {
return store.ErrKeyModified
}
err := bucket.Delete([]byte(key))
return err
return bucket.Delete([]byte(key))
})
if err != nil {
return false, err
}
return true, err
}
// AtomicPut puts a value at "key" if the key has not been
// modified since the last Put, throws an error if this is the case
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
var (
val []byte
dbIndex uint64
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
b.mu.Lock()
defer b.mu.Unlock()
dbval := make([]byte, libkvmetadatalen)
if db, err = b.getDBhandle(); err != nil {
return false, nil, err
db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()
var dbIndex uint64
dbval := make([]byte, libkvmetadatalen)
err = db.Update(func(tx *bolt.Tx) error {
var err error
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
if previous != nil {
return store.ErrKeyNotFound
}
var err error
bucket, err = tx.CreateBucket(b.boltBucket)
if err != nil {
return err
@@ -381,7 +314,7 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
}
// AtomicPut is equivalent to Put if previous is nil and the key
// doesn't exist in the DB.
val = bucket.Get([]byte(key))
val := bucket.Get([]byte(key))
if previous == nil && len(val) != 0 {
return store.ErrKeyExists
}
@@ -397,25 +330,18 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
dbIndex = atomic.AddUint64(&b.dbIndex, 1)
binary.LittleEndian.PutUint64(dbval, b.dbIndex)
dbval = append(dbval, value...)
return (bucket.Put([]byte(key), dbval))
return bucket.Put([]byte(key), dbval)
})
if err != nil {
return false, nil, err
return nil, err
}
updated := &store.KVPair{
Key: key,
Value: value,
LastIndex: dbIndex,
}
return true, updated, nil
return &store.KVPair{Key: key, Value: value, LastIndex: dbIndex}, nil
}
// Close the db connection to the BoltDB
func (b *BoltDB) Close() {
b.Lock()
defer b.Unlock()
b.mu.Lock()
defer b.mu.Unlock()
if !b.PersistConnection {
b.reset()
@@ -423,50 +349,3 @@ func (b *BoltDB) Close() {
b.client.Close()
}
}
// DeleteTree deletes a range of keys with a given prefix
func (b *BoltDB) DeleteTree(keyPrefix string) error {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
cursor := bucket.Cursor()
prefix := []byte(keyPrefix)
for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() {
_ = bucket.Delete(key)
}
return nil
})
return err
}
// NewLock has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
return nil, store.ErrCallNotSupported
}
// Watch has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
return nil, store.ErrCallNotSupported
}
// WatchTree has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
return nil, store.ErrCallNotSupported
}
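Every write path in this file frames values the same way: an 8-byte little-endian sequence counter (libkvmetadatalen) is prepended to the caller's bytes and surfaced again as KVPair.LastIndex on reads. An in-package sketch of that framing (the helper is illustrative; Put and AtomicPut inline it):

package boltdb

import "encoding/binary"

// encodeValue prepends the store's monotonic index to a value before it
// is written to the bucket.
func encodeValue(dbIndex uint64, value []byte) []byte {
	dbval := make([]byte, libkvmetadatalen, libkvmetadatalen+len(value))
	binary.LittleEndian.PutUint64(dbval, dbIndex)
	return append(dbval, value...)
}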


@@ -1,7 +1,6 @@
package kvstore
import (
"crypto/tls"
"errors"
"time"
)
@@ -15,8 +14,6 @@ const BOLTDB Backend = "boltdb"
var (
// ErrBackendNotSupported is thrown when the backend k/v store is not supported by libkv
ErrBackendNotSupported = errors.New("Backend storage not supported yet, please choose one of")
// ErrCallNotSupported is thrown when a method is not implemented/supported by the current backend
ErrCallNotSupported = errors.New("The current call is not supported with this backend")
// ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
// ErrKeyNotFound is thrown when the key is not found in the store during a Get operation
@@ -29,21 +26,9 @@ var (
// Config contains the options for a storage client
type Config struct {
ClientTLS *ClientTLSConfig
TLS *tls.Config
ConnectionTimeout time.Duration
Bucket string
PersistConnection bool
Username string
Password string
}
// ClientTLSConfig contains data for a Client TLS configuration in the form
// the etcd client wants it. Eventually we'll adapt it for ZK and Consul.
type ClientTLSConfig struct {
CertFile string
KeyFile string
CACertFile string
}
// Store represents the backend K/V storage
@@ -52,41 +37,23 @@ type ClientTLSConfig struct {
// backend for libkv
type Store interface {
// Put a value at the specified key
Put(key string, value []byte, options *WriteOptions) error
Put(key string, value []byte) error
// Get a value given its key
Get(key string) (*KVPair, error)
// Delete the value at the specified key
Delete(key string) error
// Verify if a Key exists in the store
// Exists verifies if a Key exists in the store.
Exists(key string) (bool, error)
// Watch for changes on a key
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// WatchTree watches for changes on child nodes under
// a given directory
WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// NewLock creates a lock for a given key.
// The returned Locker is not held and must be acquired
// with `.Lock`. The Value is optional.
NewLock(key string, options *LockOptions) (Locker, error)
// List the content of a given prefix
List(directory string) ([]*KVPair, error)
// DeleteTree deletes a range of keys under a given directory
DeleteTree(directory string) error
// Atomic CAS operation on a single value.
// AtomicPut performs an atomic CAS operation on a single value.
// Pass previous = nil to create a new key.
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
AtomicPut(key string, value []byte, previous *KVPair) (*KVPair, error)
// Atomic delete of a single value
AtomicDelete(key string, previous *KVPair) (bool, error)
// AtomicDelete performs an atomic delete of a single value.
AtomicDelete(key string, previous *KVPair) error
// Close the store connection
Close()
@@ -98,23 +65,3 @@ type KVPair struct {
Value []byte
LastIndex uint64
}
// WriteOptions contains optional request parameters
type WriteOptions struct {
IsDir bool
TTL time.Duration
}
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock
}
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock(stopChan chan struct{}) (<-chan struct{}, error)
Unlock() error
}
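What's left is a compact CAS contract: AtomicPut with previous == nil creates the key, the returned KVPair guards the next write, and AtomicDelete takes the same guard. An in-package usage sketch (names illustrative):

package kvstore

// casRoundTrip walks the create / guarded-update / guarded-delete flow.
func casRoundTrip(s Store) error {
	pair, err := s.AtomicPut("key", []byte("v1"), nil) // nil previous: create
	if err != nil {
		return err
	}
	pair, err = s.AtomicPut("key", []byte("v2"), pair) // update iff unchanged
	if err != nil {
		return err
	}
	return s.AtomicDelete("key", pair) // delete iff unchanged
}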


@@ -32,15 +32,13 @@ func TestMain(m *testing.M) {
func newController(t *testing.T) *libnetwork.Controller {
t.Helper()
genericOption := map[string]interface{}{
netlabel.GenericData: options.Generic{
"EnableIPForwarding": true,
},
}
c, err := libnetwork.New(
libnetwork.OptionBoltdbWithRandomDBFile(t),
config.OptionDriverConfig(bridgeNetType, genericOption),
config.OptionDriverConfig(bridgeNetType, map[string]interface{}{
netlabel.GenericData: options.Generic{
"EnableIPForwarding": true,
},
}),
)
if err != nil {
t.Fatal(err)
@@ -56,9 +54,7 @@ func createTestNetwork(c *libnetwork.Controller, networkType, networkName string
}
func getEmptyGenericOption() map[string]interface{} {
genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = map[string]string{}
return genericOption
return map[string]interface{}{netlabel.GenericData: map[string]string{}}
}
func getPortMapping() []types.PortBinding {


@@ -368,7 +368,7 @@ func TestNetworkDBWatch(t *testing.T) {
err = dbs[1].JoinNetwork("network1")
assert.NilError(t, err)
ch, cancel := dbs[1].Watch("", "", "")
ch, cancel := dbs[1].Watch("", "")
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NilError(t, err)


@@ -39,14 +39,14 @@ type UpdateEvent event
type DeleteEvent event
// Watch creates a watcher with filters for a particular table or
// network or key or any combination of the tuple. If any of the
// network or any combination of the tuple. If any of the
// filter is an empty string it acts as a wildcard for that
// field. Watch returns a channel of events, where the events will be
// sent.
func (nDB *NetworkDB) Watch(tname, nid, key string) (*events.Channel, func()) {
func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
var matcher events.Matcher
if tname != "" || nid != "" || key != "" {
if tname != "" || nid != "" {
matcher = events.MatcherFunc(func(ev events.Event) bool {
var evt event
switch ev := ev.(type) {
@@ -66,10 +66,6 @@ func (nDB *NetworkDB) Watch(tname, nid, key string) (*events.Channel, func()) {
return false
}
if key != "" && evt.Key != key {
return false
}
return true
})
}
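With the key filter gone, consumers that care about a single key filter on the event fields themselves. A usage sketch against a joined NetworkDB (table name and handling are illustrative):

package example

import "github.com/docker/docker/libnetwork/networkdb"

// watchOneTable drains events for one table across all networks ("" is
// the wildcard for the network ID).
func watchOneTable(nDB *networkdb.NetworkDB) {
	ch, cancel := nDB.Watch("endpoint_table", "")
	defer cancel()
	for ev := range ch.C {
		switch e := ev.(type) {
		case networkdb.CreateEvent:
			_ = e.Key // filter by key here if needed
		case networkdb.UpdateEvent, networkdb.DeleteEvent:
			// handle updates and deletes
		}
	}
}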


@@ -200,7 +200,6 @@ retry:
type netWatch struct {
localEps map[string]*Endpoint
remoteEps map[string]*Endpoint
stopCh chan struct{}
}
func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint {
@@ -223,71 +222,6 @@ func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
c.unWatchCh <- ep
}
func (c *Controller) networkWatchLoop(nw *netWatch, ep *Endpoint, ecCh <-chan datastore.KVObject) {
for {
select {
case <-nw.stopCh:
return
case o := <-ecCh:
ec := o.(*endpointCnt)
epl, err := ec.n.getEndpointsFromStore()
if err != nil {
log.G(context.TODO()).WithError(err).Debug("error getting endpoints from store")
continue
}
c.mu.Lock()
var addEp []*Endpoint
delEpMap := make(map[string]*Endpoint)
renameEpMap := make(map[string]bool)
for k, v := range nw.remoteEps {
delEpMap[k] = v
}
for _, lEp := range epl {
if _, ok := nw.localEps[lEp.ID()]; ok {
continue
}
if ep, ok := nw.remoteEps[lEp.ID()]; ok {
// On a container rename EP ID will remain
// the same but the name will change. service
// records should reflect the change.
// Keep old EP entry in the delEpMap and add
// EP from the store (which has the new name)
// into the new list
if lEp.name == ep.name {
delete(delEpMap, lEp.ID())
continue
}
renameEpMap[lEp.ID()] = true
}
nw.remoteEps[lEp.ID()] = lEp
addEp = append(addEp, lEp)
}
// EPs whose name are to be deleted from the svc records
// should also be removed from nw's remote EP list, except
// the ones that are getting renamed.
for _, lEp := range delEpMap {
if !renameEpMap[lEp.ID()] {
delete(nw.remoteEps, lEp.ID())
}
}
c.mu.Unlock()
for _, lEp := range delEpMap {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
}
for _, lEp := range addEp {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
}
}
}
}
func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
n := ep.getNetwork()
if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
@@ -329,25 +263,7 @@ func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoi
c.mu.Lock()
nw.localEps[endpointID] = ep
nmap[networkID] = nw
nw.stopCh = make(chan struct{})
c.mu.Unlock()
store := c.getStore()
if store == nil {
return
}
if !store.Watchable() {
return
}
ch, err := store.Watch(n.getEpCnt(), nw.stopCh)
if err != nil {
log.G(context.TODO()).Warnf("Error creating watch for network: %v", err)
return
}
go c.networkWatchLoop(nw, ep, ch)
}
func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
@@ -373,8 +289,6 @@ func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoi
c.mu.Lock()
if len(nw.localEps) == 0 {
close(nw.stopCh)
// This is the last container going away for the network. Destroy
// this network's svc db entry
delete(c.svcRecords, networkID)


@@ -2,6 +2,7 @@ package libnetwork
import (
"os"
"path/filepath"
"testing"
"github.com/docker/docker/libnetwork/datastore"
@@ -11,18 +12,19 @@ import (
func TestBoltdbBackend(t *testing.T) {
defer os.Remove(datastore.DefaultScope("").Client.Address)
testLocalBackend(t, "", "", nil)
defer os.Remove("/tmp/boltdb.db")
config := &store.Config{Bucket: "testBackend"}
testLocalBackend(t, "boltdb", "/tmp/boltdb.db", config)
tmpPath := filepath.Join(t.TempDir(), "boltdb.db")
testLocalBackend(t, "boltdb", tmpPath, &store.Config{
Bucket: "testBackend",
})
}
func TestNoPersist(t *testing.T) {
ctrl, err := New(OptionBoltdbWithRandomDBFile(t))
testController, err := New(OptionBoltdbWithRandomDBFile(t))
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
defer ctrl.Stop()
nw, err := ctrl.NewNetwork("host", "host", "", NetworkOptionPersist(false))
defer testController.Stop()
nw, err := testController.NewNetwork("host", "host", "", NetworkOptionPersist(false))
if err != nil {
t.Fatalf("Error creating default \"host\" network: %v", err)
}
@@ -30,12 +32,12 @@ func TestNoPersist(t *testing.T) {
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.getStore().KVStore()
if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists {
kvStore := testController.getStore().KVStore()
if exists, _ := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists {
t.Fatalf("Network with persist=false should not be stored in KV Store")
}
if exists, _ := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); exists {
if exists, _ := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); exists {
t.Fatalf("Endpoint in Network with persist=false should not be stored in KV Store")
}
store.Close()
kvStore.Close()
}


@@ -13,22 +13,22 @@ import (
)
func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Config) {
cfgOptions := []config.Option{}
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
cfgOptions := []config.Option{func(c *config.Config) {
c.Scope.Client.Provider = provider
c.Scope.Client.Address = url
c.Scope.Client.Config = storeConfig
}}
driverOptions := options.Generic{}
genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = driverOptions
cfgOptions = append(cfgOptions, config.OptionDriverConfig("host", genericOption))
cfgOptions = append(cfgOptions, config.OptionDriverConfig("host", map[string]interface{}{
netlabel.GenericData: options.Generic{},
}))
ctrl, err := New(cfgOptions...)
testController, err := New(cfgOptions...)
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
defer ctrl.Stop()
nw, err := ctrl.NewNetwork("host", "host", "")
defer testController.Stop()
nw, err := testController.NewNetwork("host", "host", "")
if err != nil {
t.Fatalf("Error creating default \"host\" network: %v", err)
}
@@ -36,22 +36,22 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.getStore().KVStore()
if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil {
kvStore := testController.getStore().KVStore()
if exists, err := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil {
t.Fatalf("Network key should have been created.")
}
if exists, err := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); !exists || err != nil {
if exists, err := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); !exists || err != nil {
t.Fatalf("Endpoint key should have been created.")
}
store.Close()
kvStore.Close()
// test restore of local store
ctrl, err = New(cfgOptions...)
testController, err = New(cfgOptions...)
if err != nil {
t.Fatalf("Error creating controller: %v", err)
}
defer ctrl.Stop()
if _, err = ctrl.NetworkByID(nw.ID()); err != nil {
defer testController.Stop()
if _, err = testController.NetworkByID(nw.ID()); err != nil {
t.Fatalf("Error getting network %v", err)
}
}
@@ -65,9 +65,9 @@ func OptionBoltdbWithRandomDBFile(t *testing.T) config.Option {
}
return func(c *config.Config) {
config.OptionLocalKVProvider("boltdb")(c)
config.OptionLocalKVProviderURL(tmp)(c)
config.OptionLocalKVProviderConfig(&store.Config{Bucket: "testBackend"})(c)
c.Scope.Client.Provider = "boltdb"
c.Scope.Client.Address = tmp
c.Scope.Client.Config = &store.Config{Bucket: "testBackend"}
}
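With the config Option helpers gone, OptionBoltdbWithRandomDBFile is the single test hook for a throwaway bolt-backed store. A sketch of a typical call site, mirroring TestNoPersist above (the helper name is illustrative; it would live next to this test code, in package libnetwork):

package libnetwork

import "testing"

// newTestController builds a controller backed by a temporary boltdb file
// that is removed when the test finishes.
func newTestController(t *testing.T) *Controller {
	t.Helper()
	c, err := New(OptionBoltdbWithRandomDBFile(t))
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(c.Stop)
	return c
}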
}