diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 518e6bec84..7086648b49 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -312,9 +312,9 @@ func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func) var cancelList []func() - ch, cancel := nDB.Watch(libnetworkEPTable, "", "") + ch, cancel := nDB.Watch(libnetworkEPTable, "") cancelList = append(cancelList, cancel) - nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "") + nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "") cancelList = append(cancelList, cancel) c.mu.Lock() @@ -783,7 +783,7 @@ func (n *network) addDriverWatches() { return } for _, table := range n.driverTables { - ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "") + ch, cancel := agent.networkDB.Watch(table.name, n.ID()) agent.Lock() agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel) agent.Unlock() diff --git a/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go b/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go index 2bf56fae70..3477296670 100644 --- a/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go +++ b/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go @@ -45,7 +45,7 @@ func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) { nDB, ok := ctx.(*networkdb.NetworkDB) if ok { - ch, cancel := nDB.Watch(tableName, "", "") + ch, cancel := nDB.Watch(tableName, "") clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)} go handleTableEvents(tableName, ch) diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go index 9da3b588eb..8e0a27f863 100644 --- a/libnetwork/config/config.go +++ b/libnetwork/config/config.go @@ -7,7 +7,6 @@ import ( "github.com/containerd/containerd/log" "github.com/docker/docker/libnetwork/cluster" "github.com/docker/docker/libnetwork/datastore" - store "github.com/docker/docker/libnetwork/internal/kvstore" "github.com/docker/docker/libnetwork/ipamutils" "github.com/docker/docker/libnetwork/netlabel" "github.com/docker/docker/libnetwork/osl" @@ -141,30 +140,6 @@ func IsValidName(name string) bool { return strings.TrimSpace(name) != "" } -// OptionLocalKVProvider function returns an option setter for kvstore provider -func OptionLocalKVProvider(provider string) Option { - return func(c *Config) { - log.G(context.TODO()).Debugf("Option OptionLocalKVProvider: %s", provider) - c.Scope.Client.Provider = strings.TrimSpace(provider) - } -} - -// OptionLocalKVProviderURL function returns an option setter for kvstore url -func OptionLocalKVProviderURL(url string) Option { - return func(c *Config) { - log.G(context.TODO()).Debugf("Option OptionLocalKVProviderURL: %s", url) - c.Scope.Client.Address = strings.TrimSpace(url) - } -} - -// OptionLocalKVProviderConfig function returns an option setter for kvstore config -func OptionLocalKVProviderConfig(config *store.Config) Option { - return func(c *Config) { - log.G(context.TODO()).Debugf("Option OptionLocalKVProviderConfig: %v", config) - c.Scope.Client.Config = config - } -} - // OptionActiveSandboxes function returns an option setter for passing the sandboxes // which were active during previous daemon life func OptionActiveSandboxes(sandboxes map[string]interface{}) Option { diff --git a/libnetwork/datastore/cache.go b/libnetwork/datastore/cache.go index f001fc8138..61bc67e8ac 100644 --- a/libnetwork/datastore/cache.go +++ b/libnetwork/datastore/cache.go @@ -138,7 +138,7 @@ func (c *cache) del(kvObject KVObject, atomic bool) error { return nil } -func (c *cache) get(key string, kvObject KVObject) error { +func (c *cache) get(kvObject KVObject) error { kmap, err := c.kmap(kvObject) if err != nil { return err diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index b7963f04d6..f1456616b0 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -2,7 +2,6 @@ package datastore import ( "fmt" - "log" "reflect" "strings" "sync" @@ -17,24 +16,10 @@ import ( type DataStore interface { // GetObject gets data from datastore and unmarshals to the specified object GetObject(key string, o KVObject) error - // PutObject adds a new Record based on an object into the datastore - PutObject(kvObject KVObject) error // PutObjectAtomic provides an atomic add and update operation for a Record PutObjectAtomic(kvObject KVObject) error - // DeleteObject deletes a record - DeleteObject(kvObject KVObject) error // DeleteObjectAtomic performs an atomic delete operation DeleteObjectAtomic(kvObject KVObject) error - // DeleteTree deletes a record - DeleteTree(kvObject KVObject) error - // Watchable returns whether the store is watchable or not - Watchable() bool - // Watch for changes on a KVObject - Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) - // RestartWatch retriggers stopped Watches - RestartWatch() - // Active returns if the store is active - Active() bool // List returns of a list of KVObjects belonging to the parent // key. The caller must pass a KVObject of the same type as // the objects that need to be listed @@ -56,13 +41,10 @@ var ( ) type datastore struct { - scope string - store store.Store - cache *cache - watchCh chan struct{} - active bool - sequential bool - sync.Mutex + mu sync.Mutex + scope string + store store.Store + cache *cache } // KVObject is Key/Value interface used by objects to be part of the DataStore @@ -210,7 +192,7 @@ func newClient(kv string, addr string, config *store.Config) (DataStore, error) return nil, err } - ds := &datastore{scope: LocalScope, store: s, active: true, watchCh: make(chan struct{}), sequential: true} + ds := &datastore{scope: LocalScope, store: s} ds.cache = newCache(ds) return ds, nil @@ -261,91 +243,6 @@ func (ds *datastore) Scope() string { return ds.scope } -func (ds *datastore) Active() bool { - return ds.active -} - -func (ds *datastore) Watchable() bool { - return ds.scope != LocalScope -} - -func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) { - sCh := make(chan struct{}) - - ctor, ok := kvObject.(KVConstructor) - if !ok { - return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject) - } - - kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh) - if err != nil { - return nil, err - } - - kvoCh := make(chan KVObject) - - go func() { - retry_watch: - var err error - - // Make sure to get a new instance of watch channel - ds.Lock() - watchCh := ds.watchCh - ds.Unlock() - - loop: - for { - select { - case <-stopCh: - close(sCh) - return - case kvPair := <-kvpCh: - // If the backend KV store gets reset libkv's go routine - // for the watch can exit resulting in a nil value in - // channel. - if kvPair == nil { - ds.Lock() - ds.active = false - ds.Unlock() - break loop - } - - dstO := ctor.New() - - if err = dstO.SetValue(kvPair.Value); err != nil { - log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value)) - break - } - - dstO.SetIndex(kvPair.LastIndex) - kvoCh <- dstO - } - } - - // Wait on watch channel for a re-trigger when datastore becomes active - <-watchCh - - kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh) - if err != nil { - log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err) - } - - goto retry_watch - }() - - return kvoCh, nil -} - -func (ds *datastore) RestartWatch() { - ds.Lock() - defer ds.Unlock() - - ds.active = true - watchCh := ds.watchCh - ds.watchCh = make(chan struct{}) - close(watchCh) -} - func (ds *datastore) KVStore() store.Store { return ds.store } @@ -357,10 +254,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error { pair *store.KVPair err error ) - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } + ds.mu.Lock() + defer ds.mu.Unlock() if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") @@ -382,7 +277,7 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error { previous = nil } - _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil) + pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous) if err != nil { if err == store.ErrKeyExists { return ErrKeyModified @@ -402,53 +297,13 @@ add_cache: return nil } -// PutObject adds a new Record based on an object into the datastore -func (ds *datastore) PutObject(kvObject KVObject) error { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } - - if kvObject == nil { - return types.BadRequestErrorf("invalid KV Object : nil") - } - - if kvObject.Skip() { - goto add_cache - } - - if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil { - return err - } - -add_cache: - if ds.cache != nil { - // If persistent store is skipped, sequencing needs to - // happen in cache. - return ds.cache.add(kvObject, kvObject.Skip()) - } - - return nil -} - -func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error { - kvObjValue := kvObject.Value() - - if kvObjValue == nil { - return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...)) - } - return ds.store.Put(Key(key...), kvObjValue, nil) -} - // GetObject returns a record matching the key func (ds *datastore) GetObject(key string, o KVObject) error { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } + ds.mu.Lock() + defer ds.mu.Unlock() if ds.cache != nil { - return ds.cache.get(key, o) + return ds.cache.get(o) } kvPair, err := ds.store.Get(key) @@ -474,14 +329,12 @@ func (ds *datastore) ensureParent(parent string) error { if exists { return nil } - return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true}) + return ds.store.Put(parent, []byte{}) } func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } + ds.mu.Lock() + defer ds.mu.Unlock() if ds.cache != nil { return ds.cache.list(kvObject) @@ -535,10 +388,8 @@ func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, call } func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } + ds.mu.Lock() + defer ds.mu.Unlock() kvol := make(map[string]KVObject) cb := func(key string, val KVObject) { @@ -552,33 +403,10 @@ func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, er return kvol, nil } -// DeleteObject unconditionally deletes a record from the store -func (ds *datastore) DeleteObject(kvObject KVObject) error { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } - - // cleanup the cache first - if ds.cache != nil { - // If persistent store is skipped, sequencing needs to - // happen in cache. - ds.cache.del(kvObject, kvObject.Skip()) - } - - if kvObject.Skip() { - return nil - } - - return ds.store.Delete(Key(kvObject.Key()...)) -} - // DeleteObjectAtomic performs atomic delete on a record func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } + ds.mu.Lock() + defer ds.mu.Unlock() if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") @@ -590,7 +418,7 @@ func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error { goto del_cache } - if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil { + if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil { if err == store.ErrKeyExists { return ErrKeyModified } @@ -607,24 +435,3 @@ del_cache: return nil } - -// DeleteTree unconditionally deletes a record from the store -func (ds *datastore) DeleteTree(kvObject KVObject) error { - if ds.sequential { - ds.Lock() - defer ds.Unlock() - } - - // cleanup the cache first - if ds.cache != nil { - // If persistent store is skipped, sequencing needs to - // happen in cache. - ds.cache.del(kvObject, kvObject.Skip()) - } - - if kvObject.Skip() { - return nil - } - - return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...)) -} diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go index 02493bd13e..873c7b459e 100644 --- a/libnetwork/datastore/datastore_test.go +++ b/libnetwork/datastore/datastore_test.go @@ -51,7 +51,7 @@ func TestInvalidDataStore(t *testing.T) { func TestKVObjectFlatKey(t *testing.T) { store := NewTestDataStore() expected := dummyKVObject("1000", true) - err := store.PutObject(expected) + err := store.PutObjectAtomic(expected) if err != nil { t.Fatal(err) } diff --git a/libnetwork/datastore/mock_store.go b/libnetwork/datastore/mock_store.go index d6a513815f..9e7a0bb84b 100644 --- a/libnetwork/datastore/mock_store.go +++ b/libnetwork/datastore/mock_store.go @@ -38,7 +38,7 @@ func (s *MockStore) Get(key string) (*store.KVPair, error) { } // Put a value at "key" -func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) error { +func (s *MockStore) Put(key string, value []byte) error { mData := s.db[key] if mData == nil { mData = &MockData{value, 0} @@ -48,12 +48,6 @@ func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) e return nil } -// Delete a value at "key" -func (s *MockStore) Delete(key string) error { - delete(s.db, key) - return nil -} - // Exists checks that the key exists inside the store func (s *MockStore) Exists(key string) (bool, error) { _, ok := s.db[key] @@ -65,59 +59,38 @@ func (s *MockStore) List(prefix string) ([]*store.KVPair, error) { return nil, ErrNotImplemented } -// DeleteTree deletes a range of values at "directory" -func (s *MockStore) DeleteTree(prefix string) error { - delete(s.db, prefix) - return nil -} - -// Watch a single key for modifications -func (s *MockStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - return nil, ErrNotImplemented -} - -// WatchTree triggers a watch on a range of values at "directory" -func (s *MockStore) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - return nil, ErrNotImplemented -} - -// NewLock exposed -func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locker, error) { - return nil, ErrNotImplemented -} - // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { +func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair) (*store.KVPair, error) { mData := s.db[key] if previous == nil { if mData != nil { - return false, nil, types.BadRequestErrorf("atomic put failed because key exists") + return nil, types.BadRequestErrorf("atomic put failed because key exists") } // Else OK. } else { if mData == nil { - return false, nil, types.BadRequestErrorf("atomic put failed because key exists") + return nil, types.BadRequestErrorf("atomic put failed because key exists") } if mData != nil && mData.Index != previous.LastIndex { - return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index") + return nil, types.BadRequestErrorf("atomic put failed due to mismatched Index") } // Else OK. } - err := s.Put(key, newValue, nil) - if err != nil { - return false, nil, err + if err := s.Put(key, newValue); err != nil { + return nil, err } - return true, &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil + return &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil } // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case -func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) { +func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) error { mData := s.db[key] if mData != nil && mData.Index != previous.LastIndex { - return false, types.BadRequestErrorf("atomic delete failed due to mismatched Index") + return types.BadRequestErrorf("atomic delete failed due to mismatched Index") } - return true, s.Delete(key) + delete(s.db, key) + return nil } // Close closes the client connection diff --git a/libnetwork/internal/kvstore/boltdb/boltdb.go b/libnetwork/internal/kvstore/boltdb/boltdb.go index f4b97e030d..e5a4b949fb 100644 --- a/libnetwork/internal/kvstore/boltdb/boltdb.go +++ b/libnetwork/internal/kvstore/boltdb/boltdb.go @@ -22,12 +22,11 @@ var ( ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing") ) -const ( - filePerm os.FileMode = 0644 -) +const filePerm = 0o644 // BoltDB type implements the Store interface type BoltDB struct { + mu sync.Mutex client *bolt.DB boltBucket []byte dbIndex uint64 @@ -39,7 +38,6 @@ type BoltDB struct { // PersistConnection flag provides an option to override ths behavior. // ie: open the connection in New and use it till Close is called. PersistConnection bool - sync.Mutex } const ( @@ -54,13 +52,6 @@ func Register() { // New opens a new BoltDB connection to the specified path and bucket func New(endpoints []string, options *store.Config) (store.Store, error) { - var ( - db *bolt.DB - err error - boltOptions *bolt.Options - timeout = transientTimeout - ) - if len(endpoints) > 1 { return nil, ErrMultipleEndpointsUnsupported } @@ -70,18 +61,22 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { } dir, _ := filepath.Split(endpoints[0]) - if err = os.MkdirAll(dir, 0750); err != nil { + if err := os.MkdirAll(dir, 0o750); err != nil { return nil, err } + var db *bolt.DB if options.PersistConnection { - boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} - db, err = bolt.Open(endpoints[0], filePerm, boltOptions) + var err error + db, err = bolt.Open(endpoints[0], filePerm, &bolt.Options{ + Timeout: options.ConnectionTimeout, + }) if err != nil { return nil, err } } + timeout := transientTimeout if options.ConnectionTimeout != 0 { timeout = options.ConnectionTimeout } @@ -103,18 +98,13 @@ func (b *BoltDB) reset() { } func (b *BoltDB) getDBhandle() (*bolt.DB, error) { - var ( - db *bolt.DB - err error - ) if !b.PersistConnection { - boltOptions := &bolt.Options{Timeout: b.timeout} - if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil { + db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout}) + if err != nil { return nil, err } b.client = db } - return b.client, nil } @@ -127,19 +117,16 @@ func (b *BoltDB) releaseDBhandle() { // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by // by a atomic counter maintained by the libkv and appened to the value passed by the client. func (b *BoltDB) Get(key string) (*store.KVPair, error) { - var ( - val []byte - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() + b.mu.Lock() + defer b.mu.Unlock() - if db, err = b.getDBhandle(); err != nil { + db, err := b.getDBhandle() + if err != nil { return nil, err } defer b.releaseDBhandle() + var val []byte err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { @@ -152,129 +139,87 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) { return nil }) - - if len(val) == 0 { - return nil, store.ErrKeyNotFound - } if err != nil { return nil, err } + if len(val) == 0 { + return nil, store.ErrKeyNotFound + } dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) val = val[libkvmetadatalen:] - return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil + return &store.KVPair{Key: key, Value: val, LastIndex: dbIndex}, nil } // Put the key, value pair. index number metadata is prepended to the value -func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { - var ( - dbIndex uint64 - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() +func (b *BoltDB) Put(key string, value []byte) error { + b.mu.Lock() + defer b.mu.Unlock() - dbval := make([]byte, libkvmetadatalen) - - if db, err = b.getDBhandle(); err != nil { + db, err := b.getDBhandle() + if err != nil { return err } defer b.releaseDBhandle() - err = db.Update(func(tx *bolt.Tx) error { + return db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(b.boltBucket) if err != nil { return err } - dbIndex = atomic.AddUint64(&b.dbIndex, 1) + dbIndex := atomic.AddUint64(&b.dbIndex, 1) + dbval := make([]byte, libkvmetadatalen) binary.LittleEndian.PutUint64(dbval, dbIndex) dbval = append(dbval, value...) - err = bucket.Put([]byte(key), dbval) - if err != nil { - return err - } - return nil + return bucket.Put([]byte(key), dbval) }) - return err -} - -// Delete the value for the given key. -func (b *BoltDB) Delete(key string) error { - var ( - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() - - if db, err = b.getDBhandle(); err != nil { - return err - } - defer b.releaseDBhandle() - - err = db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(b.boltBucket) - if bucket == nil { - return store.ErrKeyNotFound - } - err := bucket.Delete([]byte(key)) - return err - }) - return err } // Exists checks if the key exists inside the store func (b *BoltDB) Exists(key string) (bool, error) { - var ( - val []byte - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() + b.mu.Lock() + defer b.mu.Unlock() - if db, err = b.getDBhandle(); err != nil { + db, err := b.getDBhandle() + if err != nil { return false, err } defer b.releaseDBhandle() + var exists bool err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { return store.ErrKeyNotFound } - val = bucket.Get([]byte(key)) - + exists = len(bucket.Get([]byte(key))) > 0 return nil }) - - if len(val) == 0 { + if err != nil { return false, err } - return true, err + if !exists { + return false, store.ErrKeyNotFound + } + return true, nil } // List returns the range of keys starting with the passed in prefix func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { - var ( - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() + b.mu.Lock() + defer b.mu.Unlock() - kv := []*store.KVPair{} - - if db, err = b.getDBhandle(); err != nil { + db, err := b.getDBhandle() + if err != nil { return nil, err } defer b.releaseDBhandle() + var kv []*store.KVPair err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { @@ -285,7 +230,6 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { prefix := []byte(keyPrefix) for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() { - dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen]) v = v[libkvmetadatalen:] val := make([]byte, len(v)) @@ -299,39 +243,38 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { } return nil }) + if err != nil { + return nil, err + } if len(kv) == 0 { return nil, store.ErrKeyNotFound } - return kv, err + return kv, nil } // AtomicDelete deletes a value at "key" if the key // has not been modified in the meantime, throws an // error if this is the case -func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - var ( - val []byte - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() +func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error { + b.mu.Lock() + defer b.mu.Unlock() if previous == nil { - return false, store.ErrPreviousNotSpecified + return store.ErrPreviousNotSpecified } - if db, err = b.getDBhandle(); err != nil { - return false, err + db, err := b.getDBhandle() + if err != nil { + return err } defer b.releaseDBhandle() - err = db.Update(func(tx *bolt.Tx) error { + return db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { return store.ErrKeyNotFound } - val = bucket.Get([]byte(key)) + val := bucket.Get([]byte(key)) if val == nil { return store.ErrKeyNotFound } @@ -339,41 +282,31 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) if dbIndex != previous.LastIndex { return store.ErrKeyModified } - err := bucket.Delete([]byte(key)) - return err + return bucket.Delete([]byte(key)) }) - if err != nil { - return false, err - } - return true, err } // AtomicPut puts a value at "key" if the key has not been // modified since the last Put, throws an error if this is the case -func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { - var ( - val []byte - dbIndex uint64 - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() +func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) { + b.mu.Lock() + defer b.mu.Unlock() - dbval := make([]byte, libkvmetadatalen) - - if db, err = b.getDBhandle(); err != nil { - return false, nil, err + db, err := b.getDBhandle() + if err != nil { + return nil, err } defer b.releaseDBhandle() + var dbIndex uint64 + dbval := make([]byte, libkvmetadatalen) err = db.Update(func(tx *bolt.Tx) error { - var err error bucket := tx.Bucket(b.boltBucket) if bucket == nil { if previous != nil { return store.ErrKeyNotFound } + var err error bucket, err = tx.CreateBucket(b.boltBucket) if err != nil { return err @@ -381,7 +314,7 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt } // AtomicPut is equivalent to Put if previous is nil and the Ky // doesn't exist in the DB. - val = bucket.Get([]byte(key)) + val := bucket.Get([]byte(key)) if previous == nil && len(val) != 0 { return store.ErrKeyExists } @@ -397,25 +330,18 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt dbIndex = atomic.AddUint64(&b.dbIndex, 1) binary.LittleEndian.PutUint64(dbval, b.dbIndex) dbval = append(dbval, value...) - return (bucket.Put([]byte(key), dbval)) + return bucket.Put([]byte(key), dbval) }) if err != nil { - return false, nil, err + return nil, err } - - updated := &store.KVPair{ - Key: key, - Value: value, - LastIndex: dbIndex, - } - - return true, updated, nil + return &store.KVPair{Key: key, Value: value, LastIndex: dbIndex}, nil } // Close the db connection to the BoltDB func (b *BoltDB) Close() { - b.Lock() - defer b.Unlock() + b.mu.Lock() + defer b.mu.Unlock() if !b.PersistConnection { b.reset() @@ -423,50 +349,3 @@ func (b *BoltDB) Close() { b.client.Close() } } - -// DeleteTree deletes a range of keys with a given prefix -func (b *BoltDB) DeleteTree(keyPrefix string) error { - var ( - db *bolt.DB - err error - ) - b.Lock() - defer b.Unlock() - - if db, err = b.getDBhandle(); err != nil { - return err - } - defer b.releaseDBhandle() - - err = db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(b.boltBucket) - if bucket == nil { - return store.ErrKeyNotFound - } - - cursor := bucket.Cursor() - prefix := []byte(keyPrefix) - - for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() { - _ = bucket.Delete(key) - } - return nil - }) - - return err -} - -// NewLock has to implemented at the library level since its not supported by BoltDB -func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) { - return nil, store.ErrCallNotSupported -} - -// Watch has to implemented at the library level since its not supported by BoltDB -func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - return nil, store.ErrCallNotSupported -} - -// WatchTree has to implemented at the library level since its not supported by BoltDB -func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - return nil, store.ErrCallNotSupported -} diff --git a/libnetwork/internal/kvstore/kvstore.go b/libnetwork/internal/kvstore/kvstore.go index bc7a24f462..e8b61046ad 100644 --- a/libnetwork/internal/kvstore/kvstore.go +++ b/libnetwork/internal/kvstore/kvstore.go @@ -1,7 +1,6 @@ package kvstore import ( - "crypto/tls" "errors" "time" ) @@ -15,8 +14,6 @@ const BOLTDB Backend = "boltdb" var ( // ErrBackendNotSupported is thrown when the backend k/v store is not supported by libkv ErrBackendNotSupported = errors.New("Backend storage not supported yet, please choose one of") - // ErrCallNotSupported is thrown when a method is not implemented/supported by the current backend - ErrCallNotSupported = errors.New("The current call is not supported with this backend") // ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") // ErrKeyNotFound is thrown when the key is not found in the store during a Get operation @@ -29,21 +26,9 @@ var ( // Config contains the options for a storage client type Config struct { - ClientTLS *ClientTLSConfig - TLS *tls.Config ConnectionTimeout time.Duration Bucket string PersistConnection bool - Username string - Password string -} - -// ClientTLSConfig contains data for a Client TLS configuration in the form -// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. -type ClientTLSConfig struct { - CertFile string - KeyFile string - CACertFile string } // Store represents the backend K/V storage @@ -52,41 +37,23 @@ type ClientTLSConfig struct { // backend for libkv type Store interface { // Put a value at the specified key - Put(key string, value []byte, options *WriteOptions) error + Put(key string, value []byte) error // Get a value given its key Get(key string) (*KVPair, error) - // Delete the value at the specified key - Delete(key string) error - - // Verify if a Key exists in the store + // Exists verifies if a Key exists in the store. Exists(key string) (bool, error) - // Watch for changes on a key - Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) - - // WatchTree watches for changes on child nodes under - // a given directory - WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - - // NewLock creates a lock for a given key. - // The returned Locker is not held and must be acquired - // with `.Lock`. The Value is optional. - NewLock(key string, options *LockOptions) (Locker, error) - // List the content of a given prefix List(directory string) ([]*KVPair, error) - // DeleteTree deletes a range of keys under a given directory - DeleteTree(directory string) error - - // Atomic CAS operation on a single value. + // AtomicPut performs an atomic CAS operation on a single value. // Pass previous = nil to create a new key. - AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) + AtomicPut(key string, value []byte, previous *KVPair) (*KVPair, error) - // Atomic delete of a single value - AtomicDelete(key string, previous *KVPair) (bool, error) + // AtomicDelete performs an atomic delete of a single value. + AtomicDelete(key string, previous *KVPair) error // Close the store connection Close() @@ -98,23 +65,3 @@ type KVPair struct { Value []byte LastIndex uint64 } - -// WriteOptions contains optional request parameters -type WriteOptions struct { - IsDir bool - TTL time.Duration -} - -// LockOptions contains optional request parameters -type LockOptions struct { - Value []byte // Optional, value to associate with the lock - TTL time.Duration // Optional, expiration ttl associated with the lock - RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock -} - -// Locker provides locking mechanism on top of the store. -// Similar to `sync.Lock` except it may return errors. -type Locker interface { - Lock(stopChan chan struct{}) (<-chan struct{}, error) - Unlock() error -} diff --git a/libnetwork/libnetwork_test.go b/libnetwork/libnetwork_test.go index df9cfeba04..780ce18258 100644 --- a/libnetwork/libnetwork_test.go +++ b/libnetwork/libnetwork_test.go @@ -32,15 +32,13 @@ func TestMain(m *testing.M) { func newController(t *testing.T) *libnetwork.Controller { t.Helper() - genericOption := map[string]interface{}{ - netlabel.GenericData: options.Generic{ - "EnableIPForwarding": true, - }, - } - c, err := libnetwork.New( libnetwork.OptionBoltdbWithRandomDBFile(t), - config.OptionDriverConfig(bridgeNetType, genericOption), + config.OptionDriverConfig(bridgeNetType, map[string]interface{}{ + netlabel.GenericData: options.Generic{ + "EnableIPForwarding": true, + }, + }), ) if err != nil { t.Fatal(err) @@ -56,9 +54,7 @@ func createTestNetwork(c *libnetwork.Controller, networkType, networkName string } func getEmptyGenericOption() map[string]interface{} { - genericOption := make(map[string]interface{}) - genericOption[netlabel.GenericData] = map[string]string{} - return genericOption + return map[string]interface{}{netlabel.GenericData: map[string]string{}} } func getPortMapping() []types.PortBinding { diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 48b559c07f..19969210bc 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -368,7 +368,7 @@ func TestNetworkDBWatch(t *testing.T) { err = dbs[1].JoinNetwork("network1") assert.NilError(t, err) - ch, cancel := dbs[1].Watch("", "", "") + ch, cancel := dbs[1].Watch("", "") err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) assert.NilError(t, err) diff --git a/libnetwork/networkdb/watch.go b/libnetwork/networkdb/watch.go index 2ef30422a8..c622787d50 100644 --- a/libnetwork/networkdb/watch.go +++ b/libnetwork/networkdb/watch.go @@ -39,14 +39,14 @@ type UpdateEvent event type DeleteEvent event // Watch creates a watcher with filters for a particular table or -// network or key or any combination of the tuple. If any of the +// network or any combination of the tuple. If any of the // filter is an empty string it acts as a wildcard for that // field. Watch returns a channel of events, where the events will be // sent. -func (nDB *NetworkDB) Watch(tname, nid, key string) (*events.Channel, func()) { +func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) { var matcher events.Matcher - if tname != "" || nid != "" || key != "" { + if tname != "" || nid != "" { matcher = events.MatcherFunc(func(ev events.Event) bool { var evt event switch ev := ev.(type) { @@ -66,10 +66,6 @@ func (nDB *NetworkDB) Watch(tname, nid, key string) (*events.Channel, func()) { return false } - if key != "" && evt.Key != key { - return false - } - return true }) } diff --git a/libnetwork/store.go b/libnetwork/store.go index 89c8efcdec..119a51731c 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -200,7 +200,6 @@ retry: type netWatch struct { localEps map[string]*Endpoint remoteEps map[string]*Endpoint - stopCh chan struct{} } func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint { @@ -223,71 +222,6 @@ func (c *Controller) unWatchSvcRecord(ep *Endpoint) { c.unWatchCh <- ep } -func (c *Controller) networkWatchLoop(nw *netWatch, ep *Endpoint, ecCh <-chan datastore.KVObject) { - for { - select { - case <-nw.stopCh: - return - case o := <-ecCh: - ec := o.(*endpointCnt) - - epl, err := ec.n.getEndpointsFromStore() - if err != nil { - log.G(context.TODO()).WithError(err).Debug("error getting endpoints from store") - continue - } - - c.mu.Lock() - var addEp []*Endpoint - - delEpMap := make(map[string]*Endpoint) - renameEpMap := make(map[string]bool) - for k, v := range nw.remoteEps { - delEpMap[k] = v - } - - for _, lEp := range epl { - if _, ok := nw.localEps[lEp.ID()]; ok { - continue - } - - if ep, ok := nw.remoteEps[lEp.ID()]; ok { - // On a container rename EP ID will remain - // the same but the name will change. service - // records should reflect the change. - // Keep old EP entry in the delEpMap and add - // EP from the store (which has the new name) - // into the new list - if lEp.name == ep.name { - delete(delEpMap, lEp.ID()) - continue - } - renameEpMap[lEp.ID()] = true - } - nw.remoteEps[lEp.ID()] = lEp - addEp = append(addEp, lEp) - } - - // EPs whose name are to be deleted from the svc records - // should also be removed from nw's remote EP list, except - // the ones that are getting renamed. - for _, lEp := range delEpMap { - if !renameEpMap[lEp.ID()] { - delete(nw.remoteEps, lEp.ID()) - } - } - c.mu.Unlock() - - for _, lEp := range delEpMap { - ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false) - } - for _, lEp := range addEp { - ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true) - } - } - } -} - func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) { n := ep.getNetwork() if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() { @@ -329,25 +263,7 @@ func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoi c.mu.Lock() nw.localEps[endpointID] = ep nmap[networkID] = nw - nw.stopCh = make(chan struct{}) c.mu.Unlock() - - store := c.getStore() - if store == nil { - return - } - - if !store.Watchable() { - return - } - - ch, err := store.Watch(n.getEpCnt(), nw.stopCh) - if err != nil { - log.G(context.TODO()).Warnf("Error creating watch for network: %v", err) - return - } - - go c.networkWatchLoop(nw, ep, ch) } func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) { @@ -373,8 +289,6 @@ func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoi c.mu.Lock() if len(nw.localEps) == 0 { - close(nw.stopCh) - // This is the last container going away for the network. Destroy // this network's svc db entry delete(c.svcRecords, networkID) diff --git a/libnetwork/store_linux_test.go b/libnetwork/store_linux_test.go index 19a74ec709..fbd0720311 100644 --- a/libnetwork/store_linux_test.go +++ b/libnetwork/store_linux_test.go @@ -2,6 +2,7 @@ package libnetwork import ( "os" + "path/filepath" "testing" "github.com/docker/docker/libnetwork/datastore" @@ -11,18 +12,19 @@ import ( func TestBoltdbBackend(t *testing.T) { defer os.Remove(datastore.DefaultScope("").Client.Address) testLocalBackend(t, "", "", nil) - defer os.Remove("/tmp/boltdb.db") - config := &store.Config{Bucket: "testBackend"} - testLocalBackend(t, "boltdb", "/tmp/boltdb.db", config) + tmpPath := filepath.Join(t.TempDir(), "boltdb.db") + testLocalBackend(t, "boltdb", tmpPath, &store.Config{ + Bucket: "testBackend", + }) } func TestNoPersist(t *testing.T) { - ctrl, err := New(OptionBoltdbWithRandomDBFile(t)) + testController, err := New(OptionBoltdbWithRandomDBFile(t)) if err != nil { t.Fatalf("Error new controller: %v", err) } - defer ctrl.Stop() - nw, err := ctrl.NewNetwork("host", "host", "", NetworkOptionPersist(false)) + defer testController.Stop() + nw, err := testController.NewNetwork("host", "host", "", NetworkOptionPersist(false)) if err != nil { t.Fatalf("Error creating default \"host\" network: %v", err) } @@ -30,12 +32,12 @@ func TestNoPersist(t *testing.T) { if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.getStore().KVStore() - if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists { + kvStore := testController.getStore().KVStore() + if exists, _ := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists { t.Fatalf("Network with persist=false should not be stored in KV Store") } - if exists, _ := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); exists { + if exists, _ := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); exists { t.Fatalf("Endpoint in Network with persist=false should not be stored in KV Store") } - store.Close() + kvStore.Close() } diff --git a/libnetwork/store_test.go b/libnetwork/store_test.go index 11d702c19d..b129179afd 100644 --- a/libnetwork/store_test.go +++ b/libnetwork/store_test.go @@ -13,22 +13,22 @@ import ( ) func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Config) { - cfgOptions := []config.Option{} - cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider)) - cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url)) - cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig)) + cfgOptions := []config.Option{func(c *config.Config) { + c.Scope.Client.Provider = provider + c.Scope.Client.Address = url + c.Scope.Client.Config = storeConfig + }} - driverOptions := options.Generic{} - genericOption := make(map[string]interface{}) - genericOption[netlabel.GenericData] = driverOptions - cfgOptions = append(cfgOptions, config.OptionDriverConfig("host", genericOption)) + cfgOptions = append(cfgOptions, config.OptionDriverConfig("host", map[string]interface{}{ + netlabel.GenericData: options.Generic{}, + })) - ctrl, err := New(cfgOptions...) + testController, err := New(cfgOptions...) if err != nil { t.Fatalf("Error new controller: %v", err) } - defer ctrl.Stop() - nw, err := ctrl.NewNetwork("host", "host", "") + defer testController.Stop() + nw, err := testController.NewNetwork("host", "host", "") if err != nil { t.Fatalf("Error creating default \"host\" network: %v", err) } @@ -36,22 +36,22 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.getStore().KVStore() - if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil { + kvStore := testController.getStore().KVStore() + if exists, err := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil { t.Fatalf("Network key should have been created.") } - if exists, err := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); !exists || err != nil { + if exists, err := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); !exists || err != nil { t.Fatalf("Endpoint key should have been created.") } - store.Close() + kvStore.Close() // test restore of local store - ctrl, err = New(cfgOptions...) + testController, err = New(cfgOptions...) if err != nil { t.Fatalf("Error creating controller: %v", err) } - defer ctrl.Stop() - if _, err = ctrl.NetworkByID(nw.ID()); err != nil { + defer testController.Stop() + if _, err = testController.NetworkByID(nw.ID()); err != nil { t.Fatalf("Error getting network %v", err) } } @@ -65,9 +65,9 @@ func OptionBoltdbWithRandomDBFile(t *testing.T) config.Option { } return func(c *config.Config) { - config.OptionLocalKVProvider("boltdb")(c) - config.OptionLocalKVProviderURL(tmp)(c) - config.OptionLocalKVProviderConfig(&store.Config{Bucket: "testBackend"})(c) + c.Scope.Client.Provider = "boltdb" + c.Scope.Client.Address = tmp + c.Scope.Client.Config = &store.Config{Bucket: "testBackend"} } }