Pārlūkot izejas kodu

Update libkv to latest commit

Signed-off-by: Chun Chen <ramichen@tencent.com>
Chun Chen 9 gadi atpakaļ
vecāks
revīzija
ebbca4814e

+ 1 - 1
libnetwork/Godeps/Godeps.json

@@ -76,7 +76,7 @@
 		},
 		{
 			"ImportPath": "github.com/docker/libkv",
-			"Rev": "60c7c881345b3c67defc7f93a8297debf041d43c"
+			"Rev": "a0a57ed3755665e9a402a3df315402134eb6625f"
 		},
 		{
 			"ImportPath": "github.com/godbus/dbus",

+ 14 - 6
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md

@@ -12,7 +12,7 @@ For example, you can use it to store your metadata or for service discovery to r
 
 You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package).
 
-As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`.
+As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` and `BoltDB`.
 
 ## Example of usage
 
@@ -24,17 +24,23 @@ package main
 import (
 	"fmt"
 	"time"
-	
+
 	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
+	"github.com/docker/libkv/store/consul"
 	log "github.com/Sirupsen/logrus"
 )
 
+func init() {
+	// Register consul store to libkv
+	consul.Register()
+}
+
 func main() {
 	client := "localhost:8500"
 
 	// Initialize a new store with consul
-	kv, err = libkv.NewStore(
+	kv, err := libkv.NewStore(
 		store.CONSUL, // or "consul"
 		[]string{client},
 		&store.Config{
@@ -62,11 +68,13 @@ func main() {
 
 You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories.
 
-## Details
+## Warning
+
+There are a few consistency issues with *etcd*, on the notion of *directory* and *key*. If you want to use the three KV backends in an interchangeable way, you should only put data on leaves (see [Issue 20](https://github.com/docker/libkv/issues/20) for more details). This will be fixed when *etcd* API v3 will be made available (API v3 drops the *directory/key* distinction). An official release for *libkv* with a tag is likely to come after this issue being marked as **solved**.
 
-You should expect the same experience for basic operations like `Get`/`Put`, etc.
+Other than that, you should expect the same experience for basic operations like `Get`/`Put`, etc.
 
-However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly).
+Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**).
 
 ## Create a new storage backend
 

+ 81 - 9
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go

@@ -1,10 +1,72 @@
+// Package libkv provides a Go native library to store metadata.
+//
+// The goal of libkv is to abstract common store operations for multiple
+// Key/Value backends and offer the same experience no matter which one of the
+// backend you want to use.
+//
+// For example, you can use it to store your metadata or for service discovery to
+// register machines and endpoints inside your cluster.
+//
+// As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`.
+//
+// ## Example of usage
+//
+// ### Create a new store and use Put/Get
+//
+//
+//     package main
+//
+//     import (
+//     	"fmt"
+//     	"time"
+//
+//     	"github.com/docker/libkv"
+//     	"github.com/docker/libkv/store"
+//     	log "github.com/Sirupsen/logrus"
+//     )
+//
+//     func main() {
+//     	client := "localhost:8500"
+//
+//     	// Initialize a new store with consul
+//     	kv, err := libkv.NewStore(
+//     		store.CONSUL, // or "consul"
+//     		[]string{client},
+//     		&store.Config{
+//     			ConnectionTimeout: 10*time.Second,
+//     		},
+//     	)
+//     	if err != nil {
+//     		log.Fatal("Cannot create store consul")
+//     	}
+//
+//     	key := "foo"
+//     	err = kv.Put(key, []byte("bar"), nil)
+//     	if err != nil {
+//     		log.Error("Error trying to put value at key `", key, "`")
+//     	}
+//
+//     	pair, err := kv.Get(key)
+//     	if err != nil {
+//     		log.Error("Error trying accessing value at key `", key, "`")
+//     	}
+//
+//     	log.Info("value: ", string(pair.Value))
+//     }
+//
+// ##Copyright and license
+//
+// Code and documentation copyright 2015 Docker, inc. Code released under the
+// Apache 2.0 license. Docs released under Creative commons.
+//
 package libkv
 
 import (
+	"fmt"
+	"sort"
+	"strings"
+
 	"github.com/docker/libkv/store"
-	"github.com/docker/libkv/store/consul"
-	"github.com/docker/libkv/store/etcd"
-	"github.com/docker/libkv/store/zookeeper"
 )
 
 // Initialize creates a new Store object, initializing the client
@@ -12,11 +74,16 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error)
 
 var (
 	// Backend initializers
-	initializers = map[store.Backend]Initialize{
-		store.CONSUL: consul.New,
-		store.ETCD:   etcd.New,
-		store.ZK:     zookeeper.New,
-	}
+	initializers = make(map[store.Backend]Initialize)
+
+	supportedBackend = func() string {
+		keys := make([]string, 0, len(initializers))
+		for k := range initializers {
+			keys = append(keys, string(k))
+		}
+		sort.Strings(keys)
+		return strings.Join(keys, ", ")
+	}()
 )
 
 // NewStore creates a an instance of store
@@ -25,5 +92,10 @@ func NewStore(backend store.Backend, addrs []string, options *store.Config) (sto
 		return init(addrs, options)
 	}
 
-	return nil, store.ErrNotSupported
+	return nil, fmt.Errorf("%s %s", store.ErrNotSupported.Error(), supportedBackend)
+}
+
+// AddStore adds a new store backend to libkv
+func AddStore(store store.Backend, init Initialize) {
+	initializers[store] = init
 }

+ 1 - 57
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go

@@ -5,66 +5,9 @@ import (
 	"time"
 
 	"github.com/docker/libkv/store"
-	"github.com/docker/libkv/store/consul"
-	"github.com/docker/libkv/store/etcd"
-	"github.com/docker/libkv/store/zookeeper"
 	"github.com/stretchr/testify/assert"
 )
 
-func TestNewStoreConsul(t *testing.T) {
-	client := "localhost:8500"
-
-	kv, err := NewStore(
-		store.CONSUL,
-		[]string{client},
-		&store.Config{
-			ConnectionTimeout: 10 * time.Second,
-		},
-	)
-	assert.NoError(t, err)
-	assert.NotNil(t, kv)
-
-	if _, ok := kv.(*consul.Consul); !ok {
-		t.Fatal("Error while initializing store consul")
-	}
-}
-
-func TestNewStoreEtcd(t *testing.T) {
-	client := "localhost:4001"
-
-	kv, err := NewStore(
-		store.ETCD,
-		[]string{client},
-		&store.Config{
-			ConnectionTimeout: 10 * time.Second,
-		},
-	)
-	assert.NoError(t, err)
-	assert.NotNil(t, kv)
-
-	if _, ok := kv.(*etcd.Etcd); !ok {
-		t.Fatal("Error while initializing store etcd")
-	}
-}
-
-func TestNewStoreZookeeper(t *testing.T) {
-	client := "localhost:2181"
-
-	kv, err := NewStore(
-		store.ZK,
-		[]string{client},
-		&store.Config{
-			ConnectionTimeout: 10 * time.Second,
-		},
-	)
-	assert.NoError(t, err)
-	assert.NotNil(t, kv)
-
-	if _, ok := kv.(*zookeeper.Zookeeper); !ok {
-		t.Fatal("Error while initializing store zookeeper")
-	}
-}
-
 func TestNewStoreUnsupported(t *testing.T) {
 	client := "localhost:9999"
 
@@ -77,4 +20,5 @@ func TestNewStoreUnsupported(t *testing.T) {
 	)
 	assert.Error(t, err)
 	assert.Nil(t, kv)
+	assert.Equal(t, "Backend storage not supported yet, please choose one of ", err.Error())
 }

+ 36 - 19
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go

@@ -2,11 +2,13 @@ package consul
 
 import (
 	"crypto/tls"
+	"errors"
 	"net/http"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	api "github.com/hashicorp/consul/api"
 )
@@ -18,22 +20,36 @@ const (
 	DefaultWatchWaitTime = 15 * time.Second
 )
 
+var (
+	// ErrMultipleEndpointsUnsupported is thrown when there are
+	// multiple endpoints specified for Consul
+	ErrMultipleEndpointsUnsupported = errors.New("consul does not support multiple endpoints")
+)
+
 // Consul is the receiver type for the
 // Store interface
 type Consul struct {
 	sync.Mutex
-	config       *api.Config
-	client       *api.Client
-	ephemeralTTL time.Duration
+	config *api.Config
+	client *api.Client
 }
 
 type consulLock struct {
 	lock *api.Lock
 }
 
+// Register registers consul to libkv
+func Register() {
+	libkv.AddStore(store.CONSUL, New)
+}
+
 // New creates a new Consul client given a list
 // of endpoints and optional tls config
 func New(endpoints []string, options *store.Config) (store.Store, error) {
+	if len(endpoints) > 1 {
+		return nil, ErrMultipleEndpointsUnsupported
+	}
+
 	s := &Consul{}
 
 	// Create Consul client
@@ -51,9 +67,6 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
 		if options.ConnectionTimeout != 0 {
 			s.setTimeout(options.ConnectionTimeout)
 		}
-		if options.EphemeralTTL != 0 {
-			s.setEphemeralTTL(options.EphemeralTTL)
-		}
 	}
 
 	// Creates a new client
@@ -79,18 +92,13 @@ func (s *Consul) setTimeout(time time.Duration) {
 	s.config.WaitTime = time
 }
 
-// SetEphemeralTTL sets the ttl for ephemeral nodes
-func (s *Consul) setEphemeralTTL(ttl time.Duration) {
-	s.ephemeralTTL = ttl
-}
-
 // Normalize the key for usage in Consul
 func (s *Consul) normalize(key string) string {
 	key = store.Normalize(key)
 	return strings.TrimPrefix(key, "/")
 }
 
-func (s *Consul) refreshSession(pair *api.KVPair) error {
+func (s *Consul) refreshSession(pair *api.KVPair, ttl time.Duration) error {
 	// Check if there is any previous session with an active TTL
 	session, err := s.getActiveSession(pair.Key)
 	if err != nil {
@@ -99,9 +107,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
 
 	if session == "" {
 		entry := &api.SessionEntry{
-			Behavior:  api.SessionBehaviorDelete,       // Delete the key when the session expires
-			TTL:       ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x
-			LockDelay: 1 * time.Millisecond,            // Virtually disable lock delay
+			Behavior:  api.SessionBehaviorDelete, // Delete the key when the session expires
+			TTL:       (ttl / 2).String(),        // Consul multiplies the TTL by 2x
+			LockDelay: 1 * time.Millisecond,      // Virtually disable lock delay
 		}
 
 		// Create the key session
@@ -126,7 +134,7 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
 
 	_, _, err = s.client.Session().Renew(session, nil)
 	if err != nil {
-		return s.refreshSession(pair)
+		return s.refreshSession(pair, ttl)
 	}
 	return nil
 }
@@ -174,9 +182,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
 		Value: value,
 	}
 
-	if opts != nil && opts.Ephemeral {
+	if opts != nil && opts.TTL > 0 {
 		// Create or refresh the session
-		err := s.refreshSession(p)
+		err := s.refreshSession(p, opts.TTL)
 		if err != nil {
 			return err
 		}
@@ -188,6 +196,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
 
 // Delete a value at "key"
 func (s *Consul) Delete(key string) error {
+	if _, err := s.Get(key); err != nil {
+		return err
+	}
 	_, err := s.client.KV().Delete(s.normalize(key), nil)
 	return err
 }
@@ -195,7 +206,10 @@ func (s *Consul) Delete(key string) error {
 // Exists checks that the key exists inside the store
 func (s *Consul) Exists(key string) (bool, error) {
 	_, err := s.Get(key)
-	if err != nil && err == store.ErrKeyNotFound {
+	if err != nil {
+		if err == store.ErrKeyNotFound {
+			return false, nil
+		}
 		return false, err
 	}
 	return true, nil
@@ -229,6 +243,9 @@ func (s *Consul) List(directory string) ([]*store.KVPair, error) {
 
 // DeleteTree deletes a range of keys under a given directory
 func (s *Consul) DeleteTree(directory string) error {
+	if _, err := s.List(directory); err != nil {
+		return err
+	}
 	_, err := s.client.KV().DeleteTree(s.normalize(directory), nil)
 	return err
 }

+ 24 - 4
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go

@@ -4,19 +4,22 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/testutils"
 	"github.com/stretchr/testify/assert"
 )
 
+var (
+	client = "localhost:8500"
+)
+
 func makeConsulClient(t *testing.T) store.Store {
-	client := "localhost:8500"
 
 	kv, err := New(
 		[]string{client},
 		&store.Config{
 			ConnectionTimeout: 3 * time.Second,
-			EphemeralTTL:      2 * time.Second,
 		},
 	)
 
@@ -27,11 +30,28 @@ func makeConsulClient(t *testing.T) store.Store {
 	return kv
 }
 
+func TestRegister(t *testing.T) {
+	Register()
+
+	kv, err := libkv.NewStore(store.CONSUL, []string{client}, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, kv)
+
+	if _, ok := kv.(*Consul); !ok {
+		t.Fatal("Error registering and initializing consul")
+	}
+}
+
 func TestConsulStore(t *testing.T) {
 	kv := makeConsulClient(t)
 	backup := makeConsulClient(t)
 
-	testutils.RunTestStore(t, kv, backup)
+	testutils.RunTestCommon(t, kv)
+	testutils.RunTestAtomic(t, kv)
+	testutils.RunTestWatch(t, kv)
+	testutils.RunTestLock(t, kv)
+	testutils.RunTestTTL(t, kv, backup)
+	testutils.RunCleanup(t, kv)
 }
 
 func TestGetActiveSession(t *testing.T) {
@@ -43,7 +63,7 @@ func TestGetActiveSession(t *testing.T) {
 	value := []byte("bar")
 
 	// Put the first key with the Ephemeral flag
-	err := kv.Put(key, value, &store.WriteOptions{Ephemeral: true})
+	err := kv.Put(key, value, &store.WriteOptions{TTL: 2 * time.Second})
 	assert.NoError(t, err)
 
 	// Session should not be empty

+ 60 - 36
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go

@@ -8,14 +8,14 @@ import (
 	"time"
 
 	etcd "github.com/coreos/go-etcd/etcd"
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 )
 
 // Etcd is the receiver type for the
 // Store interface
 type Etcd struct {
-	client       *etcd.Client
-	ephemeralTTL time.Duration
+	client *etcd.Client
 }
 
 type etcdLock struct {
@@ -33,6 +33,11 @@ const (
 	defaultUpdateTime = 5 * time.Second
 )
 
+// Register registers etcd to libkv
+func Register() {
+	libkv.AddStore(store.ETCD, New)
+}
+
 // New creates a new Etcd client given a list
 // of endpoints and an optional tls config
 func New(addrs []string, options *store.Config) (store.Store, error) {
@@ -49,9 +54,6 @@ func New(addrs []string, options *store.Config) (store.Store, error) {
 		if options.ConnectionTimeout != 0 {
 			s.setTimeout(options.ConnectionTimeout)
 		}
-		if options.EphemeralTTL != 0 {
-			s.setEphemeralTTL(options.EphemeralTTL)
-		}
 	}
 
 	// Periodic SyncCluster
@@ -93,12 +95,6 @@ func (s *Etcd) setTimeout(time time.Duration) {
 	s.client.SetDialTimeout(time)
 }
 
-// setEphemeralHeartbeat sets the heartbeat value to notify
-// that a node is alive
-func (s *Etcd) setEphemeralTTL(time time.Duration) {
-	s.ephemeralTTL = time
-}
-
 // createDirectory creates the entire path for a directory
 // that does not exist
 func (s *Etcd) createDirectory(path string) error {
@@ -120,11 +116,8 @@ func (s *Etcd) createDirectory(path string) error {
 func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
 	result, err := s.client.Get(store.Normalize(key), false, false)
 	if err != nil {
-		if etcdError, ok := err.(*etcd.EtcdError); ok {
-			// Not a Directory or Not a file
-			if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
-				return nil, store.ErrKeyNotFound
-			}
+		if isKeyNotFoundError(err) {
+			return nil, store.ErrKeyNotFound
 		}
 		return nil, err
 	}
@@ -143,8 +136,8 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
 
 	// Default TTL = 0 means no expiration
 	var ttl uint64
-	if opts != nil && opts.Ephemeral {
-		ttl = uint64(s.ephemeralTTL.Seconds())
+	if opts != nil && opts.TTL > 0 {
+		ttl = uint64(opts.TTL.Seconds())
 	}
 
 	if _, err := s.client.Set(key, string(value), ttl); err != nil {
@@ -173,14 +166,17 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
 // Delete a value at "key"
 func (s *Etcd) Delete(key string) error {
 	_, err := s.client.Delete(store.Normalize(key), false)
+	if isKeyNotFoundError(err) {
+		return store.ErrKeyNotFound
+	}
 	return err
 }
 
 // Exists checks if the key exists inside the store
 func (s *Etcd) Exists(key string) (bool, error) {
-	entry, err := s.Get(key)
-	if err != nil && entry != nil {
-		if err == store.ErrKeyNotFound || entry.Value == nil {
+	_, err := s.Get(key)
+	if err != nil {
+		if err == store.ErrKeyNotFound {
 			return false, nil
 		}
 		return false, err
@@ -194,12 +190,6 @@ func (s *Etcd) Exists(key string) (bool, error) {
 // be sent to the channel. Providing a non-nil stopCh can
 // be used to stop watching.
 func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
-	// Get the current value
-	current, err := s.Get(key)
-	if err != nil {
-		return nil, err
-	}
-
 	// Start an etcd watch.
 	// Note: etcd will send the current value through the channel.
 	etcdWatchCh := make(chan *etcd.Response)
@@ -212,12 +202,23 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
 	go func() {
 		defer close(watchCh)
 
+		// Get the current value
+		current, err := s.Get(key)
+		if err != nil {
+			return
+		}
+
 		// Push the current value through the channel.
 		watchCh <- current
 
 		for {
 			select {
 			case result := <-etcdWatchCh:
+				if result == nil || result.Node == nil {
+					// Something went wrong, exit
+					// No need to stop the chan as the watch already ended
+					return
+				}
 				watchCh <- &store.KVPair{
 					Key:       key,
 					Value:     []byte(result.Node.Value),
@@ -238,12 +239,6 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
 // will be sent to the channel .Providing a non-nil stopCh can
 // be used to stop watching.
 func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
-	// Get child values
-	current, err := s.List(directory)
-	if err != nil {
-		return nil, err
-	}
-
 	// Start the watch
 	etcdWatchCh := make(chan *etcd.Response)
 	etcdStopCh := make(chan bool)
@@ -255,12 +250,23 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st
 	go func() {
 		defer close(watchCh)
 
+		// Get child values
+		current, err := s.List(directory)
+		if err != nil {
+			return
+		}
+
 		// Push the current value through the channel.
 		watchCh <- current
 
 		for {
 			select {
-			case <-etcdWatchCh:
+			case event := <-etcdWatchCh:
+				if event == nil {
+					// Something went wrong, exit
+					// No need to stop the chan as the watch already ended
+					return
+				}
 				// FIXME: We should probably use the value pushed by the channel.
 				// However, Node.Nodes seems to be empty.
 				if list, err := s.List(directory); err == nil {
@@ -349,6 +355,9 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
 func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
 	resp, err := s.client.Get(store.Normalize(directory), true, true)
 	if err != nil {
+		if isKeyNotFoundError(err) {
+			return nil, store.ErrKeyNotFound
+		}
 		return nil, err
 	}
 	kv := []*store.KVPair{}
@@ -366,6 +375,9 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
 // DeleteTree deletes a range of keys under a given directory
 func (s *Etcd) DeleteTree(directory string) error {
 	_, err := s.client.Delete(store.Normalize(directory), true)
+	if isKeyNotFoundError(err) {
+		return store.ErrKeyNotFound
+	}
 	return err
 }
 
@@ -422,7 +434,7 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) {
 			lastIndex = resp.Node.ModifiedIndex
 		}
 
-		_, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
+		l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
 
 		if err == nil {
 			// Leader section
@@ -457,7 +469,7 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan
 	for {
 		select {
 		case <-update.C:
-			l.last, err = l.client.Update(key, l.value, l.ttl)
+			l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex)
 			if err != nil {
 				return
 			}
@@ -497,3 +509,15 @@ func (l *etcdLock) Unlock() error {
 func (s *Etcd) Close() {
 	return
 }
+
+func isKeyNotFoundError(err error) bool {
+	if err != nil {
+		if etcdError, ok := err.(*etcd.EtcdError); ok {
+			// Not a Directory or Not a file
+			if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
+				return true
+			}
+		}
+	}
+	return false
+}

+ 24 - 4
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go

@@ -4,18 +4,21 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/testutils"
+	"github.com/stretchr/testify/assert"
 )
 
-func makeEtcdClient(t *testing.T) store.Store {
-	client := "localhost:4001"
+var (
+	client = "localhost:4001"
+)
 
+func makeEtcdClient(t *testing.T) store.Store {
 	kv, err := New(
 		[]string{client},
 		&store.Config{
 			ConnectionTimeout: 3 * time.Second,
-			EphemeralTTL:      2 * time.Second,
 		},
 	)
 
@@ -26,9 +29,26 @@ func makeEtcdClient(t *testing.T) store.Store {
 	return kv
 }
 
+func TestRegister(t *testing.T) {
+	Register()
+
+	kv, err := libkv.NewStore(store.ETCD, []string{client}, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, kv)
+
+	if _, ok := kv.(*Etcd); !ok {
+		t.Fatal("Error registering and initializing etcd")
+	}
+}
+
 func TestEtcdStore(t *testing.T) {
 	kv := makeEtcdClient(t)
 	backup := makeEtcdClient(t)
 
-	testutils.RunTestStore(t, kv, backup)
+	testutils.RunTestCommon(t, kv)
+	testutils.RunTestAtomic(t, kv)
+	testutils.RunTestWatch(t, kv)
+	testutils.RunTestLock(t, kv)
+	testutils.RunTestTTL(t, kv, backup)
+	testutils.RunCleanup(t, kv)
 }

+ 7 - 10
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go

@@ -16,11 +16,13 @@ const (
 	ETCD Backend = "etcd"
 	// ZK backend
 	ZK Backend = "zk"
+	// BOLTDB backend
+	BOLTDB Backend = "boltdb"
 )
 
 var (
 	// ErrNotSupported is thrown when the backend k/v store is not supported by libkv
-	ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
+	ErrNotSupported = errors.New("Backend storage not supported yet, please choose one of")
 	// ErrNotImplemented is thrown when a method is not implemented by the current backend
 	ErrNotImplemented = errors.New("Call not implemented in current backend")
 	// ErrNotReachable is thrown when the API cannot be reached for issuing common store operations
@@ -39,7 +41,7 @@ var (
 type Config struct {
 	TLS               *tls.Config
 	ConnectionTimeout time.Duration
-	EphemeralTTL      time.Duration
+	Bucket            string
 }
 
 // Store represents the backend K/V storage
@@ -63,10 +65,10 @@ type Store interface {
 	Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
 
 	// WatchTree watches for changes on child nodes under
-	// a given a directory
+	// a given directory
 	WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
 
-	// CreateLock for a given key.
+	// NewLock creates a lock for a given key.
 	// The returned Locker is not held and must be acquired
 	// with `.Lock`. The Value is optional.
 	NewLock(key string, options *LockOptions) (Locker, error)
@@ -97,8 +99,7 @@ type KVPair struct {
 
 // WriteOptions contains optional request parameters
 type WriteOptions struct {
-	Heartbeat time.Duration
-	Ephemeral bool
+	TTL time.Duration
 }
 
 // LockOptions contains optional request parameters
@@ -107,10 +108,6 @@ type LockOptions struct {
 	TTL   time.Duration // Optional, expiration ttl associated with the lock
 }
 
-// WatchCallback is used for watch methods on keys
-// and is triggered on key change
-type WatchCallback func(entries ...*KVPair)
-
 // Locker provides locking mechanism on top of the store.
 // Similar to `sync.Lock` except it may return errors.
 type Locker interface {

+ 49 - 26
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go

@@ -4,11 +4,14 @@ import (
 	"strings"
 	"time"
 
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	zk "github.com/samuel/go-zookeeper/zk"
 )
 
-const defaultTimeout = 10 * time.Second
+const (
+	defaultTimeout = 10 * time.Second
+)
 
 // Zookeeper is the receiver type for
 // the Store interface
@@ -24,6 +27,11 @@ type zookeeperLock struct {
 	value  []byte
 }
 
+// Register registers zookeeper to libkv
+func Register() {
+	libkv.AddStore(store.ZK, New)
+}
+
 // New creates a new Zookeeper client given a
 // list of endpoints and an optional tls config
 func New(endpoints []string, options *store.Config) (store.Store, error) {
@@ -55,16 +63,15 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
 // Get the value at "key", returns the last modified index
 // to use in conjunction to Atomic calls
 func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
-	resp, meta, err := s.client.Get(store.Normalize(key))
+	resp, meta, err := s.client.Get(s.normalize(key))
+
 	if err != nil {
+		if err == zk.ErrNoNode {
+			return nil, store.ErrKeyNotFound
+		}
 		return nil, err
 	}
 
-	// If resp is nil, the key does not exist
-	if resp == nil {
-		return nil, store.ErrKeyNotFound
-	}
-
 	pair = &store.KVPair{
 		Key:       key,
 		Value:     resp,
@@ -80,10 +87,10 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
 	for i := 1; i <= len(path); i++ {
 		newpath := "/" + strings.Join(path[:i], "/")
 		if i == len(path) && ephemeral {
-			_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
+			_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
 			return err
 		}
-		_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
+		_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
 		if err != nil {
 			// Skip if node already exists
 			if err != zk.ErrNodeExists {
@@ -96,7 +103,7 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
 
 // Put a value at "key"
 func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
-	fkey := store.Normalize(key)
+	fkey := s.normalize(key)
 
 	exists, err := s.Exists(key)
 	if err != nil {
@@ -104,10 +111,10 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro
 	}
 
 	if !exists {
-		if opts != nil && opts.Ephemeral {
-			s.createFullPath(store.SplitKey(key), opts.Ephemeral)
+		if opts != nil && opts.TTL > 0 {
+			s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
 		} else {
-			s.createFullPath(store.SplitKey(key), false)
+			s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
 		}
 	}
 
@@ -117,13 +124,16 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro
 
 // Delete a value at "key"
 func (s *Zookeeper) Delete(key string) error {
-	err := s.client.Delete(store.Normalize(key), -1)
+	err := s.client.Delete(s.normalize(key), -1)
+	if err == zk.ErrNoNode {
+		return store.ErrKeyNotFound
+	}
 	return err
 }
 
 // Exists checks if the key exists inside the store
 func (s *Zookeeper) Exists(key string) (bool, error) {
-	exists, _, err := s.client.Exists(store.Normalize(key))
+	exists, _, err := s.client.Exists(s.normalize(key))
 	if err != nil {
 		return false, err
 	}
@@ -151,7 +161,7 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP
 		// to listening to any event that may occur on that key
 		watchCh <- pair
 		for {
-			_, _, eventCh, err := s.client.GetW(store.Normalize(key))
+			_, _, eventCh, err := s.client.GetW(s.normalize(key))
 			if err != nil {
 				return
 			}
@@ -195,7 +205,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan
 		watchCh <- entries
 
 		for {
-			_, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory))
+			_, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
 			if err != nil {
 				return
 			}
@@ -218,8 +228,11 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan
 
 // List child nodes of a given directory
 func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
-	keys, stat, err := s.client.Children(store.Normalize(directory))
+	keys, stat, err := s.client.Children(s.normalize(directory))
 	if err != nil {
+		if err == zk.ErrNoNode {
+			return nil, store.ErrKeyNotFound
+		}
 		return nil, err
 	}
 
@@ -227,8 +240,12 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
 
 	// FIXME Costly Get request for each child key..
 	for _, key := range keys {
-		pair, err := s.Get(directory + store.Normalize(key))
+		pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
 		if err != nil {
+			// If node is not found: List is out of date, retry
+			if err == zk.ErrNoNode {
+				return s.List(directory)
+			}
 			return nil, err
 		}
 
@@ -253,7 +270,7 @@ func (s *Zookeeper) DeleteTree(directory string) error {
 
 	for _, pair := range pairs {
 		reqs = append(reqs, &zk.DeleteRequest{
-			Path:    store.Normalize(directory + "/" + pair.Key),
+			Path:    s.normalize(directory + "/" + pair.Key),
 			Version: -1,
 		})
 	}
@@ -268,7 +285,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
 
 	var lastIndex uint64
 	if previous != nil {
-		meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
+		meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
 		if err != nil {
 			// Compare Failed
 			if err == zk.ErrBadVersion {
@@ -279,7 +296,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
 		lastIndex = uint64(meta.Version)
 	} else {
 		// Interpret previous == nil as create operation.
-		_, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll))
+		_, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
 		if err != nil {
 			// Zookeeper will complain if the directory doesn't exist.
 			if err == zk.ErrNoNode {
@@ -290,7 +307,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
 					// Failed to create the directory.
 					return false, nil, err
 				}
-				if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
+				if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
 					return false, nil, err
 				}
 
@@ -319,7 +336,7 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro
 		return false, store.ErrPreviousNotSpecified
 	}
 
-	err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex))
+	err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
 	if err != nil {
 		if err == zk.ErrBadVersion {
 			return false, store.ErrKeyModified
@@ -343,9 +360,9 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.
 
 	lock = &zookeeperLock{
 		client: s.client,
-		key:    store.Normalize(key),
+		key:    s.normalize(key),
 		value:  value,
-		lock:   zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)),
+		lock:   zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
 	}
 
 	return lock, err
@@ -377,3 +394,9 @@ func (l *zookeeperLock) Unlock() error {
 func (s *Zookeeper) Close() {
 	s.client.Close()
 }
+
+// Normalize the key for usage in Zookeeper
+func (s *Zookeeper) normalize(key string) string {
+	key = store.Normalize(key)
+	return strings.TrimSuffix(key, "/")
+}

+ 24 - 4
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go

@@ -4,18 +4,21 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/testutils"
+	"github.com/stretchr/testify/assert"
 )
 
-func makeZkClient(t *testing.T) store.Store {
-	client := "localhost:2181"
+var (
+	client = "localhost:2181"
+)
 
+func makeZkClient(t *testing.T) store.Store {
 	kv, err := New(
 		[]string{client},
 		&store.Config{
 			ConnectionTimeout: 3 * time.Second,
-			EphemeralTTL:      2 * time.Second,
 		},
 	)
 
@@ -26,9 +29,26 @@ func makeZkClient(t *testing.T) store.Store {
 	return kv
 }
 
+func TestRegister(t *testing.T) {
+	Register()
+
+	kv, err := libkv.NewStore(store.ZK, []string{client}, nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, kv)
+
+	if _, ok := kv.(*Zookeeper); !ok {
+		t.Fatal("Error registering and initializing zookeeper")
+	}
+}
+
 func TestZkStore(t *testing.T) {
 	kv := makeZkClient(t)
 	backup := makeZkClient(t)
 
-	testutils.RunTestStore(t, kv, backup)
+	testutils.RunTestCommon(t, kv)
+	testutils.RunTestAtomic(t, kv)
+	testutils.RunTestWatch(t, kv)
+	testutils.RunTestLock(t, kv)
+	testutils.RunTestTTL(t, kv, backup)
+	testutils.RunCleanup(t, kv)
 }

+ 136 - 72
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go

@@ -1,6 +1,7 @@
 package testutils
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -8,50 +9,89 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-// RunTestStore is an helper testing method that is
-// called by each K/V backend sub-package testing
-func RunTestStore(t *testing.T, kv store.Store, backup store.Store) {
-	testPutGetDelete(t, kv)
-	testWatch(t, kv)
-	testWatchTree(t, kv)
+// RunTestCommon tests the minimal required APIs which
+// should be supported by all K/V backends
+func RunTestCommon(t *testing.T, kv store.Store) {
+	testPutGetDeleteExists(t, kv)
+	testList(t, kv)
+	testDeleteTree(t, kv)
+}
+
+// RunTestAtomic tests the Atomic operations by the K/V
+// backends
+func RunTestAtomic(t *testing.T, kv store.Store) {
 	testAtomicPut(t, kv)
 	testAtomicPutCreate(t, kv)
 	testAtomicDelete(t, kv)
-	testLockUnlock(t, kv)
-	testPutEphemeral(t, kv, backup)
-	testList(t, kv)
-	testDeleteTree(t, kv)
 }
 
-func testPutGetDelete(t *testing.T, kv store.Store) {
-	key := "foo"
-	value := []byte("bar")
+// RunTestWatch tests the watch/monitor APIs supported
+// by the K/V backends.
+func RunTestWatch(t *testing.T, kv store.Store) {
+	testWatch(t, kv)
+	testWatchTree(t, kv)
+}
 
-	// Put the key
-	err := kv.Put(key, value, nil)
-	assert.NoError(t, err)
+// RunTestLock tests the KV pair Lock/Unlock APIs supported
+// by the K/V backends.
+func RunTestLock(t *testing.T, kv store.Store) {
+	testLockUnlock(t, kv)
+}
 
-	// Get should return the value and an incremented index
-	pair, err := kv.Get(key)
-	assert.NoError(t, err)
-	if assert.NotNil(t, pair) {
-		assert.NotNil(t, pair.Value)
-	}
-	assert.Equal(t, pair.Value, value)
-	assert.NotEqual(t, pair.LastIndex, 0)
+// RunTestTTL tests the TTL funtionality of the K/V backend.
+func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) {
+	testPutTTL(t, kv, backup)
+}
 
-	// Delete the key
-	err = kv.Delete(key)
-	assert.NoError(t, err)
+func testPutGetDeleteExists(t *testing.T, kv store.Store) {
+	// Get a not exist key should return ErrKeyNotFound
+	pair, err := kv.Get("/testPutGetDelete_not_exist_key")
+	assert.Equal(t, store.ErrKeyNotFound, err)
 
-	// Get should fail
-	pair, err = kv.Get(key)
-	assert.Error(t, err)
-	assert.Nil(t, pair)
+	value := []byte("bar")
+	for _, key := range []string{
+		"testPutGetDeleteExists",
+		"testPutGetDeleteExists/",
+		"testPutGetDeleteExists/testbar/",
+		"testPutGetDeleteExists/testbar/testfoobar",
+	} {
+		failMsg := fmt.Sprintf("Fail key %s", key)
+		// Put the key
+		err = kv.Put(key, value, nil)
+		assert.NoError(t, err, failMsg)
+
+		// Get should return the value and an incremented index
+		pair, err = kv.Get(key)
+		assert.NoError(t, err, failMsg)
+		if assert.NotNil(t, pair, failMsg) {
+			assert.NotNil(t, pair.Value, failMsg)
+		}
+		assert.Equal(t, pair.Value, value, failMsg)
+		assert.NotEqual(t, pair.LastIndex, 0, failMsg)
+
+		// Exists should return true
+		exists, err := kv.Exists(key)
+		assert.NoError(t, err, failMsg)
+		assert.True(t, exists, failMsg)
+
+		// Delete the key
+		err = kv.Delete(key)
+		assert.NoError(t, err, failMsg)
+
+		// Get should fail
+		pair, err = kv.Get(key)
+		assert.Error(t, err, failMsg)
+		assert.Nil(t, pair, failMsg)
+
+		// Exists should return false
+		exists, err = kv.Exists(key)
+		assert.NoError(t, err, failMsg)
+		assert.False(t, exists, failMsg)
+	}
 }
 
 func testWatch(t *testing.T, kv store.Store) {
-	key := "hello"
+	key := "testWatch"
 	value := []byte("world")
 	newValue := []byte("world!")
 
@@ -108,15 +148,15 @@ func testWatch(t *testing.T, kv store.Store) {
 }
 
 func testWatchTree(t *testing.T, kv store.Store) {
-	dir := "tree"
+	dir := "testWatchTree"
 
-	node1 := "tree/node1"
+	node1 := "testWatchTree/node1"
 	value1 := []byte("node1")
 
-	node2 := "tree/node2"
+	node2 := "testWatchTree/node2"
 	value2 := []byte("node2")
 
-	node3 := "tree/node3"
+	node3 := "testWatchTree/node3"
 	value3 := []byte("node3")
 
 	err := kv.Put(node1, value1, nil)
@@ -162,7 +202,7 @@ func testWatchTree(t *testing.T, kv store.Store) {
 }
 
 func testAtomicPut(t *testing.T, kv store.Store) {
-	key := "hello"
+	key := "testAtomicPut"
 	value := []byte("world")
 
 	// Put the key
@@ -179,18 +219,18 @@ func testAtomicPut(t *testing.T, kv store.Store) {
 	assert.NotEqual(t, pair.LastIndex, 0)
 
 	// This CAS should fail: previous exists.
-	success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil)
+	success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil)
 	assert.Error(t, err)
 	assert.False(t, success)
 
 	// This CAS should succeed
-	success, _, err = kv.AtomicPut("hello", []byte("WORLD"), pair, nil)
+	success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil)
 	assert.NoError(t, err)
 	assert.True(t, success)
 
 	// This CAS should fail, key exists.
 	pair.LastIndex = 0
-	success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
+	success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil)
 	assert.Error(t, err)
 	assert.False(t, success)
 }
@@ -198,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) {
 func testAtomicPutCreate(t *testing.T, kv store.Store) {
 	// Use a key in a new directory to ensure Stores will create directories
 	// that don't yet exist.
-	key := "put/create"
+	key := "testAtomicPutCreate/create"
 	value := []byte("putcreate")
 
 	// AtomicPut the key, previous = nil indicates create.
@@ -223,14 +263,10 @@ func testAtomicPutCreate(t *testing.T, kv store.Store) {
 	success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil)
 	assert.NoError(t, err)
 	assert.True(t, success)
-
-	// Delete the key, ensures runs of the test don't interfere with each other.
-	err = kv.DeleteTree("put")
-	assert.NoError(t, err)
 }
 
 func testAtomicDelete(t *testing.T, kv store.Store) {
-	key := "atomic"
+	key := "testAtomicDelete"
 	value := []byte("world")
 
 	// Put the key
@@ -262,11 +298,11 @@ func testAtomicDelete(t *testing.T, kv store.Store) {
 }
 
 func testLockUnlock(t *testing.T, kv store.Store) {
-	key := "foo"
+	key := "testLockUnlock"
 	value := []byte("bar")
 
 	// We should be able to create a new lock on key
-	lock, err := kv.NewLock(key, &store.LockOptions{Value: value})
+	lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second})
 	assert.NoError(t, err)
 	assert.NotNil(t, lock)
 
@@ -288,6 +324,11 @@ func testLockUnlock(t *testing.T, kv store.Store) {
 	err = lock.Unlock()
 	assert.NoError(t, err)
 
+	// Lock should succeed again
+	lockChan, err = lock.Lock()
+	assert.NoError(t, err)
+	assert.NotNil(t, lockChan)
+
 	// Get should work
 	pair, err = kv.Get(key)
 	assert.NoError(t, err)
@@ -298,19 +339,19 @@ func testLockUnlock(t *testing.T, kv store.Store) {
 	assert.NotEqual(t, pair.LastIndex, 0)
 }
 
-func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
-	firstKey := "first"
+func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) {
+	firstKey := "testPutTTL"
 	firstValue := []byte("foo")
 
 	secondKey := "second"
 	secondValue := []byte("bar")
 
 	// Put the first key with the Ephemeral flag
-	err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true})
+	err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second})
 	assert.NoError(t, err)
 
 	// Put a second key with the Ephemeral flag
-	err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true})
+	err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second})
 	assert.NoError(t, err)
 
 	// Get on firstKey should work
@@ -341,12 +382,12 @@ func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
 }
 
 func testList(t *testing.T, kv store.Store) {
-	prefix := "nodes"
+	prefix := "testList"
 
-	firstKey := "nodes/first"
+	firstKey := "testList/first"
 	firstValue := []byte("first")
 
-	secondKey := "nodes/second"
+	secondKey := "testList/second"
 	secondValue := []byte("second")
 
 	// Put the first key
@@ -358,35 +399,37 @@ func testList(t *testing.T, kv store.Store) {
 	assert.NoError(t, err)
 
 	// List should work and return the two correct values
-	pairs, err := kv.List(prefix)
-	assert.NoError(t, err)
-	if assert.NotNil(t, pairs) {
-		assert.Equal(t, len(pairs), 2)
-	}
-
-	// Check pairs, those are not necessarily in Put order
-	for _, pair := range pairs {
-		if pair.Key == firstKey {
-			assert.Equal(t, pair.Value, firstValue)
+	for _, parent := range []string{prefix, prefix + "/"} {
+		pairs, err := kv.List(parent)
+		assert.NoError(t, err)
+		if assert.NotNil(t, pairs) {
+			assert.Equal(t, len(pairs), 2)
 		}
-		if pair.Key == secondKey {
-			assert.Equal(t, pair.Value, secondValue)
+
+		// Check pairs, those are not necessarily in Put order
+		for _, pair := range pairs {
+			if pair.Key == firstKey {
+				assert.Equal(t, pair.Value, firstValue)
+			}
+			if pair.Key == secondKey {
+				assert.Equal(t, pair.Value, secondValue)
+			}
 		}
 	}
 
 	// List should fail: the key does not exist
-	pairs, err = kv.List("idontexist")
-	assert.Error(t, err)
+	pairs, err := kv.List("idontexist")
+	assert.Equal(t, store.ErrKeyNotFound, err)
 	assert.Nil(t, pairs)
 }
 
 func testDeleteTree(t *testing.T, kv store.Store) {
-	prefix := "nodes"
+	prefix := "testDeleteTree"
 
-	firstKey := "nodes/first"
+	firstKey := "testDeleteTree/first"
 	firstValue := []byte("first")
 
-	secondKey := "nodes/second"
+	secondKey := "testDeleteTree/second"
 	secondValue := []byte("second")
 
 	// Put the first key
@@ -428,3 +471,24 @@ func testDeleteTree(t *testing.T, kv store.Store) {
 	assert.Error(t, err)
 	assert.Nil(t, pair)
 }
+
+// RunCleanup cleans up keys introduced by the tests
+func RunCleanup(t *testing.T, kv store.Store) {
+	for _, key := range []string{
+		"testPutGetDeleteExists",
+		"testWatch",
+		"testWatchTree",
+		"testAtomicPut",
+		"testAtomicPutCreate",
+		"testAtomicDelete",
+		"testLockUnlock",
+		"testPutTTL",
+		"testList",
+		"testDeleteTree",
+	} {
+		err := kv.DeleteTree(key)
+		assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err))
+		err = kv.Delete(key)
+		assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err))
+	}
+}

+ 9 - 0
libnetwork/datastore/datastore.go

@@ -6,6 +6,9 @@ import (
 
 	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
+	"github.com/docker/libkv/store/consul"
+	"github.com/docker/libkv/store/etcd"
+	"github.com/docker/libkv/store/zookeeper"
 	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/types"
 )
@@ -66,6 +69,12 @@ const (
 
 var rootChain = []string{"docker", "libnetwork"}
 
+func init() {
+	consul.Register()
+	zookeeper.Register()
+	etcd.Register()
+}
+
 //Key provides convenient method to create a Key
 func Key(key ...string) string {
 	keychain := append(rootChain, key...)