瀏覽代碼

Updating to new Swarm discovery and store APIs

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 10 年之前
父節點
當前提交
0fda541b37

+ 2 - 2
libnetwork/cmd/test/libnetwork.toml

@@ -3,8 +3,8 @@ title = "LibNetwork Configuration file"
 [daemon]
   debug = false
 [cluster]
-  discovery = "token://ce5b9756aeab50fe8fda02624f093d1c"
-  Address = "1.1.1.1:90"
+  discovery = "token://22aa23948f4f6b31230687689636959e"
+  Address = "2.1.1.1"
 [datastore]
   embedded = false
 [datastore.client]

+ 8 - 3
libnetwork/cmd/test/main.go

@@ -2,16 +2,18 @@ package main
 
 import (
 	"fmt"
-	"log"
 	"net"
 	"os"
 	"time"
 
+	log "github.com/Sirupsen/logrus"
+
 	"github.com/docker/libnetwork"
 	"github.com/docker/libnetwork/options"
 )
 
 func main() {
+	log.SetLevel(log.DebugLevel)
 	os.Setenv("LIBNETWORK_CFG", "libnetwork.toml")
 	controller, err := libnetwork.New("libnetwork.toml")
 	if err != nil {
@@ -27,9 +29,12 @@ func main() {
 	for i := 0; i < 100; i++ {
 		netw, err := controller.NewNetwork(netType, fmt.Sprintf("Gordon-%d", i))
 		if err != nil {
-			log.Fatal(err)
+			if _, ok := err.(libnetwork.NetworkNameError); !ok {
+				log.Fatal(err)
+			}
+		} else {
+			fmt.Println("Network Created Successfully :", netw)
 		}
-		fmt.Println("Network Created Successfully :", netw)
 		time.Sleep(10 * time.Second)
 	}
 }

+ 52 - 36
libnetwork/controller.go

@@ -61,7 +61,6 @@ import (
 	"github.com/docker/libnetwork/hostdiscovery"
 	"github.com/docker/libnetwork/sandbox"
 	"github.com/docker/libnetwork/types"
-	"github.com/docker/swarm/pkg/store"
 )
 
 // NetworkController provides the interface for controller instance which manages
@@ -104,6 +103,7 @@ type controller struct {
 	sandboxes sandboxTable
 	cfg       *config.Config
 	store     datastore.DataStore
+	stopChan  chan struct{}
 	sync.Mutex
 }
 
@@ -133,6 +133,7 @@ func New(configFile string) (NetworkController, error) {
 		// But without that, datastore cannot be initialized.
 		log.Debugf("Unable to Parse LibNetwork Config file : %v", err)
 	}
+	c.stopChan = make(chan struct{})
 
 	return c, nil
 }
@@ -172,9 +173,7 @@ func (c *controller) initDataStore() error {
 	c.Lock()
 	c.store = store
 	c.Unlock()
-	go c.watchNewNetworks()
-
-	return nil
+	return c.watchNewNetworks()
 }
 
 func (c *controller) initDiscovery() error {
@@ -242,17 +241,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 
 	// Construct the network object
 	network := &network{
-		name:      name,
-		id:        types.UUID(stringid.GenerateRandomID()),
-		ctrlr:     c,
-		driver:    d,
-		endpoints: endpointTable{},
+		name:        name,
+		networkType: networkType,
+		id:          types.UUID(stringid.GenerateRandomID()),
+		ctrlr:       c,
+		driver:      d,
+		endpoints:   endpointTable{},
 	}
 
 	network.processOptions(options...)
-	if err := c.addNetworkToStore(network); err != nil {
-		return nil, err
-	}
 	// Create the network
 	if err := d.CreateNetwork(network.id, network.generic); err != nil {
 		return nil, err
@@ -263,6 +260,10 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 	c.networks[network.id] = network
 	c.Unlock()
 
+	if err := c.addNetworkToStore(network); err != nil {
+		return nil, err
+	}
+
 	return network, nil
 }
 
@@ -291,38 +292,53 @@ func (c *controller) addNetworkToStore(n *network) error {
 		log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
 		return nil
 	}
-	return cs.PutObjectAtomic(n)
+
+	// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
+	// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
+	// return cs.PutObjectAtomic(n)
+
+	return cs.PutObject(n)
 }
 
-func (c *controller) watchNewNetworks() {
+func (c *controller) watchNewNetworks() error {
 	c.Lock()
 	cs := c.store
 	c.Unlock()
 
-	cs.KVStore().WatchRange(datastore.Key(datastore.NetworkKeyPrefix), "", 0, func(kvi []store.KVEntry) {
-		for _, kve := range kvi {
-			var n network
-			err := json.Unmarshal(kve.Value(), &n)
-			if err != nil {
-				log.Error(err)
-				continue
-			}
-			n.dbIndex = kve.LastIndex()
-			c.Lock()
-			existing, ok := c.networks[n.id]
-			c.Unlock()
-			if ok && existing.dbIndex == n.dbIndex {
-				// Skip any watch notification for a network that has not changed
-				continue
-			} else if ok {
-				// Received an update for an existing network object
-				log.Debugf("Skipping network update for %s (%s)", n.name, n.id)
-				continue
+	kvPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case kvs := <-kvPairs:
+				for _, kve := range kvs {
+					var n network
+					err := json.Unmarshal(kve.Value, &n)
+					if err != nil {
+						log.Error(err)
+						continue
+					}
+					n.dbIndex = kve.LastIndex
+					c.Lock()
+					existing, ok := c.networks[n.id]
+					c.Unlock()
+					if ok && existing.dbIndex == n.dbIndex {
+						// Skip any watch notification for a network that has not changed
+						continue
+					} else if ok {
+						// Received an update for an existing network object
+						log.Debugf("Skipping network update for %s (%s)", n.name, n.id)
+						continue
+					}
+
+					c.newNetworkFromStore(&n)
+				}
 			}
-
-			c.newNetworkFromStore(&n)
 		}
-	})
+	}()
+	return nil
 }
 
 func (c *controller) Networks() []Network {

+ 6 - 8
libnetwork/datastore/datastore.go

@@ -53,7 +53,7 @@ var errInvalidAtomicRequest = errors.New("Invalid Atomic Request")
 
 // newClient used to connect to KV Store
 func newClient(kv string, addrs string) (DataStore, error) {
-	store, err := store.CreateStore(kv, []string{addrs}, store.Config{})
+	store, err := store.NewStore(store.Backend(kv), []string{addrs}, &store.Config{})
 	if err != nil {
 		return nil, err
 	}
@@ -89,16 +89,14 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error {
 	if kvObjValue == nil {
 		return errInvalidAtomicRequest
 	}
-	_, err := ds.store.AtomicPut(Key(kvObject.Key()...), []byte{}, kvObjValue, kvObject.Index())
-	if err != nil {
-		return err
-	}
 
-	_, index, err := ds.store.Get(Key(kvObject.Key()...))
+	previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
+	_, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
 	if err != nil {
 		return err
 	}
-	kvObject.SetIndex(index)
+
+	kvObject.SetIndex(pair.LastIndex)
 	return nil
 }
 
@@ -116,5 +114,5 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
 	if kvObjValue == nil {
 		return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...))
 	}
-	return ds.store.Put(Key(key...), kvObjValue)
+	return ds.store.Put(Key(key...), kvObjValue, nil)
 }

+ 5 - 5
libnetwork/datastore/datastore_test.go

@@ -35,12 +35,12 @@ func TestKVObjectFlatKey(t *testing.T) {
 		t.Fatal(err)
 	}
 	keychain := []string{dummyKey, "1000"}
-	data, _, err := store.KVStore().Get(Key(keychain...))
+	data, err := store.KVStore().Get(Key(keychain...))
 	if err != nil {
 		t.Fatal(err)
 	}
 	var n dummyObject
-	json.Unmarshal(data, &n)
+	json.Unmarshal(data.Value, &n)
 	if n.Name != expected.Name {
 		t.Fatalf("Dummy object doesnt match the expected object")
 	}
@@ -63,14 +63,14 @@ func TestAtomicKVObjectFlatKey(t *testing.T) {
 
 	// Get the latest index and try PutObjectAtomic again for the same Key
 	// This must succeed as well
-	data, index, err := store.KVStore().Get(Key(expected.Key()...))
+	data, err := store.KVStore().Get(Key(expected.Key()...))
 	if err != nil {
 		t.Fatal(err)
 	}
 	n := dummyObject{}
-	json.Unmarshal(data, &n)
+	json.Unmarshal(data.Value, &n)
 	n.ID = "1111"
-	n.DBIndex = index
+	n.DBIndex = data.LastIndex
 	n.ReturnValue = true
 	err = store.PutObjectAtomic(&n)
 	if err != nil {

+ 26 - 45
libnetwork/datastore/mock_store.go

@@ -2,7 +2,6 @@ package datastore
 
 import (
 	"errors"
-	"time"
 
 	"github.com/docker/swarm/pkg/store"
 )
@@ -31,17 +30,17 @@ func NewMockStore() *MockStore {
 
 // Get the value at "key", returns the last modified index
 // to use in conjunction to CAS calls
-func (s *MockStore) Get(key string) (value []byte, lastIndex uint64, err error) {
+func (s *MockStore) Get(key string) (*store.KVPair, error) {
 	mData := s.db[key]
 	if mData == nil {
-		return nil, 0, nil
+		return nil, nil
 	}
-	return mData.Data, mData.Index, nil
+	return &store.KVPair{Value: mData.Data, LastIndex: mData.Index}, nil
 
 }
 
 // Put a value at "key"
-func (s *MockStore) Put(key string, value []byte) error {
+func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) error {
 	mData := s.db[key]
 	if mData == nil {
 		mData = &MockData{value, 0}
@@ -63,68 +62,50 @@ func (s *MockStore) Exists(key string) (bool, error) {
 	return ok, nil
 }
 
-// GetRange gets a range of values at "directory"
-func (s *MockStore) GetRange(prefix string) (values []store.KVEntry, err error) {
+// List gets a range of values at "directory"
+func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
 	return nil, ErrNotImplmented
 }
 
-// DeleteRange deletes a range of values at "directory"
-func (s *MockStore) DeleteRange(prefix string) error {
+// DeleteTree deletes a range of values at "directory"
+func (s *MockStore) DeleteTree(prefix string) error {
 	return ErrNotImplmented
 }
 
 // Watch a single key for modifications
-func (s *MockStore) Watch(key string, heartbeat time.Duration, callback store.WatchCallback) error {
-	return ErrNotImplmented
-}
-
-// CancelWatch cancels a watch, sends a signal to the appropriate
-// stop channel
-func (s *MockStore) CancelWatch(key string) error {
-	return ErrNotImplmented
-}
-
-// Internal function to check if a key has changed
-func (s *MockStore) waitForChange(key string) <-chan uint64 {
-	return nil
-}
-
-// WatchRange triggers a watch on a range of values at "directory"
-func (s *MockStore) WatchRange(prefix string, filter string, heartbeat time.Duration, callback store.WatchCallback) error {
-	return ErrNotImplmented
-}
-
-// CancelWatchRange stops the watch on the range of values, sends
-// a signal to the appropriate stop channel
-func (s *MockStore) CancelWatchRange(prefix string) error {
-	return ErrNotImplmented
+func (s *MockStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
+	return nil, ErrNotImplmented
 }
 
-// Acquire the lock for "key"/"directory"
-func (s *MockStore) Acquire(key string, value []byte) (string, error) {
-	return "", ErrNotImplmented
+// WatchTree triggers a watch on a range of values at "directory"
+func (s *MockStore) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
+	return nil, ErrNotImplmented
 }
 
-// Release the lock for "key"/"directory"
-func (s *MockStore) Release(id string) error {
-	return ErrNotImplmented
+// NewLock exposed
+func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
+	return nil, ErrNotImplmented
 }
 
 // AtomicPut put a value at "key" if the key has not been
 // modified in the meantime, throws an error if this is the case
-func (s *MockStore) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
+func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
 	mData := s.db[key]
-	if mData != nil && mData.Index != index {
-		return false, errInvalidAtomicRequest
+	if mData != nil && mData.Index != previous.LastIndex {
+		return false, nil, errInvalidAtomicRequest
+	}
+	err := s.Put(key, newValue, nil)
+	if err != nil {
+		return false, nil, err
 	}
-	return true, s.Put(key, newValue)
+	return true, &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil
 }
 
 // AtomicDelete deletes a value at "key" if the key has not
 // been modified in the meantime, throws an error if this is the case
-func (s *MockStore) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
+func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
 	mData := s.db[key]
-	if mData != nil && mData.Index != index {
+	if mData != nil && mData.Index != previous.LastIndex {
 		return false, errInvalidAtomicRequest
 	}
 	return true, s.Delete(key)

+ 28 - 25
libnetwork/hostdiscovery/hostdiscovery.go

@@ -24,7 +24,7 @@ import (
 	_ "github.com/docker/swarm/discovery/token"
 )
 
-const defaultHeartbeat = 10
+const defaultHeartbeat = time.Duration(10) * time.Second
 
 type hostDiscovery struct {
 	discovery discovery.Discovery
@@ -43,17 +43,17 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join
 		return fmt.Errorf("discovery requires a valid configuration")
 	}
 
-	hb := cfg.Heartbeat
+	hb := time.Duration(cfg.Heartbeat) * time.Second
 	if hb == 0 {
 		hb = defaultHeartbeat
 	}
-	d, err := discovery.New(cfg.Discovery, hb)
+	d, err := discovery.New(cfg.Discovery, hb, 3*hb)
 	if err != nil {
 		return err
 	}
 
 	if ip := net.ParseIP(cfg.Address); ip == nil {
-		return errors.New("Address config should be either ipv4 or ipv6 address")
+		return errors.New("address config should be either ipv4 or ipv6 address")
 	}
 
 	if err := d.Register(cfg.Address + ":0"); err != nil {
@@ -64,14 +64,25 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join
 	h.discovery = d
 	h.Unlock()
 
-	go d.Watch(func(entries []*discovery.Entry) {
-		h.processCallback(entries, joinCallback, leaveCallback)
-	})
-
-	go sustainHeartbeat(d, hb, cfg, h.stopChan)
+	discoveryCh, errCh := d.Watch(h.stopChan)
+	go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback)
+	go h.sustainHeartbeat(d, hb, cfg)
 	return nil
 }
 
+func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) {
+	for {
+		select {
+		case entries := <-ch:
+			h.processCallback(entries, joinCallback, leaveCallback)
+		case err := <-errCh:
+			log.Errorf("discovery error: %v", err)
+		case <-h.stopChan:
+			return
+		}
+	}
+}
+
 func (h *hostDiscovery) StopDiscovery() error {
 	h.Lock()
 	stopChan := h.stopChan
@@ -82,12 +93,12 @@ func (h *hostDiscovery) StopDiscovery() error {
 	return nil
 }
 
-func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCfg, stopChan chan struct{}) {
+func (h *hostDiscovery) sustainHeartbeat(d discovery.Discovery, hb time.Duration, config *config.ClusterCfg) {
 	for {
 		select {
-		case <-stopChan:
+		case <-h.stopChan:
 			return
-		case <-time.After(time.Duration(hb) * time.Second):
+		case <-time.After(hb):
 			if err := d.Register(config.Address + ":0"); err != nil {
 				log.Warn(err)
 			}
@@ -95,7 +106,7 @@ func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCf
 	}
 }
 
-func (h *hostDiscovery) processCallback(entries []*discovery.Entry, joinCallback JoinCallback, leaveCallback LeaveCallback) {
+func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) {
 	updated := hosts(entries)
 	h.Lock()
 	existing := h.nodes
@@ -125,23 +136,15 @@ func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []ne
 
 func (h *hostDiscovery) Fetch() ([]net.IP, error) {
 	h.Lock()
-	hd := h.discovery
-	h.Unlock()
-	if hd == nil {
-		return nil, errors.New("No Active Discovery")
-	}
-	entries, err := hd.Fetch()
-	if err != nil {
-		return nil, err
-	}
+	defer h.Unlock()
 	ips := []net.IP{}
-	for _, entry := range entries {
-		ips = append(ips, net.ParseIP(entry.Host))
+	for _, ipstr := range h.nodes.ToSlice() {
+		ips = append(ips, net.ParseIP(ipstr.(string)))
 	}
 	return ips, nil
 }
 
-func hosts(entries []*discovery.Entry) mapset.Set {
+func hosts(entries discovery.Entries) mapset.Set {
 	hosts := mapset.NewSet()
 	for _, entry := range entries {
 		hosts.Add(entry.Host)

+ 2 - 2
libnetwork/hostdiscovery/hostdiscovery_test.go

@@ -15,7 +15,7 @@ import (
 )
 
 func TestDiscovery(t *testing.T) {
-	_, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80")
+	_, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second)
 	if err != nil {
 		t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
 	}
@@ -52,7 +52,7 @@ func TestDiscovery(t *testing.T) {
 }
 
 func TestBadDiscovery(t *testing.T) {
-	_, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80")
+	_, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second)
 	if err != nil {
 		t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
 	}