diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/README.md b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/README.md deleted file mode 100644 index 3c9feeda83..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/README.md +++ /dev/null @@ -1,274 +0,0 @@ ---- -page_title: Docker Swarm discovery -page_description: Swarm discovery -page_keywords: docker, swarm, clustering, discovery ---- - -# Discovery - -Docker Swarm comes with multiple Discovery backends. - -## Backends - -### Hosted Discovery with Docker Hub - -First we create a cluster. - -```bash -# create a cluster -$ swarm create -6856663cdefdec325839a4b7e1de38e8 # <- this is your unique -``` - -Then we create each node and join them to the cluster. - -```bash -# on each of your nodes, start the swarm agent -# doesn't have to be public (eg. 192.168.0.X), -# as long as the swarm manager can access it. -$ swarm join --advertise= token:// -``` - -Finally, we start the Swarm manager. This can be on any machine or even -your laptop. - -```bash -$ swarm manage -H tcp:// token:// -``` - -You can then use regular Docker commands to interact with your swarm. - -```bash -docker -H tcp:// info -docker -H tcp:// run ... -docker -H tcp:// ps -docker -H tcp:// logs ... -... -``` - -You can also list the nodes in your cluster. - -```bash -swarm list token:// - -``` - -### Using a static file describing the cluster - -For each of your nodes, add a line to a file. The node IP address -doesn't need to be public as long the Swarm manager can access it. - -```bash -echo >> /tmp/my_cluster -echo >> /tmp/my_cluster -echo >> /tmp/my_cluster -``` - -Then start the Swarm manager on any machine. - -```bash -swarm manage -H tcp:// file:///tmp/my_cluster -``` - -And then use the regular Docker commands. - -```bash -docker -H tcp:// info -docker -H tcp:// run ... -docker -H tcp:// ps -docker -H tcp:// logs ... -... -``` - -You can list the nodes in your cluster. - -```bash -$ swarm list file:///tmp/my_cluster - - - -``` - -### Using etcd - -On each of your nodes, start the Swarm agent. The node IP address -doesn't have to be public as long as the swarm manager can access it. - -```bash -swarm join --advertise= etcd:/// -``` - -Start the manager on any machine or your laptop. - -```bash -swarm manage -H tcp:// etcd:/// -``` - -And then use the regular Docker commands. - -```bash -docker -H tcp:// info -docker -H tcp:// run ... -docker -H tcp:// ps -docker -H tcp:// logs ... -... -``` - -You can list the nodes in your cluster. - -```bash -swarm list etcd:/// - -``` - -### Using consul - -On each of your nodes, start the Swarm agent. The node IP address -doesn't need to be public as long as the Swarm manager can access it. - -```bash -swarm join --advertise= consul:/// -``` - -Start the manager on any machine or your laptop. - -```bash -swarm manage -H tcp:// consul:/// -``` - -And then use the regular Docker commands. - -```bash -docker -H tcp:// info -docker -H tcp:// run ... -docker -H tcp:// ps -docker -H tcp:// logs ... -... -``` - -You can list the nodes in your cluster. - -```bash -swarm list consul:/// - -``` - -### Using zookeeper - -On each of your nodes, start the Swarm agent. The node IP doesn't have -to be public as long as the swarm manager can access it. - -```bash -swarm join --advertise= zk://,/ -``` - -Start the manager on any machine or your laptop. - -```bash -swarm manage -H tcp:// zk://,/ -``` - -You can then use the regular Docker commands. - -```bash -docker -H tcp:// info -docker -H tcp:// run ... -docker -H tcp:// ps -docker -H tcp:// logs ... -... -``` - -You can list the nodes in the cluster. - -```bash -swarm list zk://,/ - -``` - -### Using a static list of IP addresses - -Start the manager on any machine or your laptop - -```bash -swarm manage -H nodes://, -``` - -Or - -```bash -swarm manage -H , -``` - -Then use the regular Docker commands. - -```bash -docker -H info -docker -H run ... -docker -H ps -docker -H logs ... -... -``` - -### Range pattern for IP addresses - -The `file` and `nodes` discoveries support a range pattern to specify IP -addresses, i.e., `10.0.0.[10:200]` will be a list of nodes starting from -`10.0.0.10` to `10.0.0.200`. - -For example for the `file` discovery method. - -```bash -$ echo "10.0.0.[11:100]:2375" >> /tmp/my_cluster -$ echo "10.0.1.[15:20]:2375" >> /tmp/my_cluster -$ echo "192.168.1.2:[2:20]375" >> /tmp/my_cluster -``` - -Then start the manager. - -```bash -swarm manage -H tcp:// file:///tmp/my_cluster -``` - -And for the `nodes` discovery method. - -```bash -swarm manage -H "nodes://10.0.0.[10:200]:2375,10.0.1.[2:250]:2375" -``` - -## Contributing a new discovery backend - -Contributing a new discovery backend is easy, simply implement this -interface: - -```go -type Discovery interface { - Initialize(string, int) error - Fetch() ([]string, error) - Watch(WatchCallback) - Register(string) error -} -``` - -### Initialize - -The parameters are `discovery` location without the scheme and a heartbeat (in seconds). - -### Fetch - -Returns the list of all the nodes from the discovery. - -### Watch - -Triggers an update (`Fetch`). This can happen either via a timer (like -`token`) or use backend specific features (like `etcd`). - -### Register - -Add a new node to the discovery service. - -## Docker Swarm documentation index - -- [User guide](./index.md) -- [Sheduler strategies](./scheduler/strategy.md) -- [Sheduler filters](./scheduler/filter.md) -- [Swarm API](./API.md) diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery.go deleted file mode 100644 index 9942de7cef..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery.go +++ /dev/null @@ -1,166 +0,0 @@ -package discovery - -import ( - "errors" - "fmt" - "net" - "strings" - "time" - - log "github.com/Sirupsen/logrus" -) - -// An Entry represents a swarm host. -type Entry struct { - Host string - Port string -} - -// NewEntry creates a new entry. -func NewEntry(url string) (*Entry, error) { - host, port, err := net.SplitHostPort(url) - if err != nil { - return nil, err - } - return &Entry{host, port}, nil -} - -// String returns the string form of an entry. -func (e *Entry) String() string { - return fmt.Sprintf("%s:%s", e.Host, e.Port) -} - -// Equals returns true if cmp contains the same data. -func (e *Entry) Equals(cmp *Entry) bool { - return e.Host == cmp.Host && e.Port == cmp.Port -} - -// Entries is a list of *Entry with some helpers. -type Entries []*Entry - -// Equals returns true if cmp contains the same data. -func (e Entries) Equals(cmp Entries) bool { - // Check if the file has really changed. - if len(e) != len(cmp) { - return false - } - for i := range e { - if !e[i].Equals(cmp[i]) { - return false - } - } - return true -} - -// Contains returns true if the Entries contain a given Entry. -func (e Entries) Contains(entry *Entry) bool { - for _, curr := range e { - if curr.Equals(entry) { - return true - } - } - return false -} - -// Diff compares two entries and returns the added and removed entries. -func (e Entries) Diff(cmp Entries) (Entries, Entries) { - added := Entries{} - for _, entry := range cmp { - if !e.Contains(entry) { - added = append(added, entry) - } - } - - removed := Entries{} - for _, entry := range e { - if !cmp.Contains(entry) { - removed = append(removed, entry) - } - } - - return added, removed -} - -// The Discovery interface is implemented by Discovery backends which -// manage swarm host entries. -type Discovery interface { - // Initialize the discovery with URIs, a heartbeat and a ttl. - Initialize(string, time.Duration, time.Duration) error - - // Watch the discovery for entry changes. - // Returns a channel that will receive changes or an error. - // Providing a non-nil stopCh can be used to stop watching. - Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) - - // Register to the discovery - Register(string) error -} - -var ( - discoveries map[string]Discovery - // ErrNotSupported is returned when a discovery service is not supported. - ErrNotSupported = errors.New("discovery service not supported") - // ErrNotImplemented is returned when discovery feature is not implemented - // by discovery backend. - ErrNotImplemented = errors.New("not implemented in this discovery service") -) - -func init() { - discoveries = make(map[string]Discovery) -} - -// Register makes a discovery backend available by the provided scheme. -// If Register is called twice with the same scheme an error is returned. -func Register(scheme string, d Discovery) error { - if _, exists := discoveries[scheme]; exists { - return fmt.Errorf("scheme already registered %s", scheme) - } - log.WithField("name", scheme).Debug("Registering discovery service") - discoveries[scheme] = d - - return nil -} - -func parse(rawurl string) (string, string) { - parts := strings.SplitN(rawurl, "://", 2) - - // nodes:port,node2:port => nodes://node1:port,node2:port - if len(parts) == 1 { - return "nodes", parts[0] - } - return parts[0], parts[1] -} - -// New returns a new Discovery given a URL, heartbeat and ttl settings. -// Returns an error if the URL scheme is not supported. -func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Discovery, error) { - scheme, uri := parse(rawurl) - - if discovery, exists := discoveries[scheme]; exists { - log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") - err := discovery.Initialize(uri, heartbeat, ttl) - return discovery, err - } - - return nil, ErrNotSupported -} - -// CreateEntries returns an array of entries based on the given addresses. -func CreateEntries(addrs []string) (Entries, error) { - entries := Entries{} - if addrs == nil { - return entries, nil - } - - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - entry, err := NewEntry(addr) - if err != nil { - return nil, err - } - entries = append(entries, entry) - } - return entries, nil -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery_test.go deleted file mode 100644 index b7128ff258..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/discovery_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package discovery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNewEntry(t *testing.T) { - entry, err := NewEntry("127.0.0.1:2375") - assert.NoError(t, err) - assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"})) - assert.Equal(t, entry.String(), "127.0.0.1:2375") - - _, err = NewEntry("127.0.0.1") - assert.Error(t, err) -} - -func TestParse(t *testing.T) { - scheme, uri := parse("127.0.0.1:2375") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "127.0.0.1:2375") - - scheme, uri = parse("localhost:2375") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "localhost:2375") - - scheme, uri = parse("scheme://127.0.0.1:2375") - assert.Equal(t, scheme, "scheme") - assert.Equal(t, uri, "127.0.0.1:2375") - - scheme, uri = parse("scheme://localhost:2375") - assert.Equal(t, scheme, "scheme") - assert.Equal(t, uri, "localhost:2375") - - scheme, uri = parse("") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "") -} - -func TestCreateEntries(t *testing.T) { - entries, err := CreateEntries(nil) - assert.Equal(t, entries, Entries{}) - assert.NoError(t, err) - - entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.NoError(t, err) - expected := Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - } - assert.True(t, entries.Equals(expected)) - - _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) - assert.Error(t, err) -} - -func TestContainsEntry(t *testing.T) { - entries, err := CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.NoError(t, err) - assert.True(t, entries.Contains(&Entry{Host: "127.0.0.1", Port: "2375"})) - assert.False(t, entries.Contains(&Entry{Host: "127.0.0.3", Port: "2375"})) -} - -func TestEntriesEquality(t *testing.T) { - entries := Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - } - - // Same - assert.True(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - })) - - // Different size - assert.False(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - &Entry{Host: "127.0.0.3", Port: "2375"}, - })) - - // Different content - assert.False(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.42", Port: "2375"}, - })) -} - -func TestEntriesDiff(t *testing.T) { - entry1 := &Entry{Host: "1.1.1.1", Port: "1111"} - entry2 := &Entry{Host: "2.2.2.2", Port: "2222"} - entry3 := &Entry{Host: "3.3.3.3", Port: "3333"} - entries := Entries{entry1, entry2} - - // No diff - added, removed := entries.Diff(Entries{entry2, entry1}) - assert.Empty(t, added) - assert.Empty(t, removed) - - // Add - added, removed = entries.Diff(Entries{entry2, entry3, entry1}) - assert.Len(t, added, 1) - assert.True(t, added.Contains(entry3)) - assert.Empty(t, removed) - - // Remove - added, removed = entries.Diff(Entries{entry2}) - assert.Empty(t, added) - assert.Len(t, removed, 1) - assert.True(t, removed.Contains(entry1)) - - // Add and remove - added, removed = entries.Diff(Entries{entry1, entry3}) - assert.Len(t, added, 1) - assert.True(t, added.Contains(entry3)) - assert.Len(t, removed, 1) - assert.True(t, removed.Contains(entry2)) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file.go deleted file mode 100644 index 3e1566ea59..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file.go +++ /dev/null @@ -1,109 +0,0 @@ -package file - -import ( - "fmt" - "io/ioutil" - "strings" - "time" - - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - heartbeat time.Duration - path string -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("file", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error { - s.path = path - s.heartbeat = heartbeat - return nil -} - -func parseFileContent(content []byte) []string { - var result []string - for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") { - line = strings.TrimSpace(line) - // Ignoring line starts with # - if strings.HasPrefix(line, "#") { - continue - } - // Inlined # comment also ignored. - if strings.Contains(line, "#") { - line = line[0:strings.Index(line, "#")] - // Trim additional spaces caused by above stripping. - line = strings.TrimSpace(line) - } - for _, ip := range discovery.Generate(line) { - result = append(result, ip) - } - } - return result -} - -func (s *Discovery) fetch() (discovery.Entries, error) { - fileContent, err := ioutil.ReadFile(s.path) - if err != nil { - return nil, fmt.Errorf("failed to read '%s': %v", s.path, err) - } - return discovery.CreateEntries(parseFileContent(fileContent)) -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - errCh := make(chan error) - ticker := time.NewTicker(s.heartbeat) - - go func() { - defer close(errCh) - defer close(ch) - - // Send the initial entries if available. - currentEntries, err := s.fetch() - if err != nil { - errCh <- err - } else { - ch <- currentEntries - } - - // Periodically send updates. - for { - select { - case <-ticker.C: - newEntries, err := s.fetch() - if err != nil { - errCh <- err - continue - } - - // Check if the file has really changed. - if !newEntries.Equals(currentEntries) { - ch <- newEntries - } - currentEntries = newEntries - case <-stopCh: - ticker.Stop() - return - } - } - }() - - return ch, errCh -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - return discovery.ErrNotImplemented -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file_test.go deleted file mode 100644 index 6e861f9a82..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/file/file_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package file - -import ( - "io/ioutil" - "os" - "testing" - - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - d := &Discovery{} - d.Initialize("/path/to/file", 1000, 0) - assert.Equal(t, d.path, "/path/to/file") -} - -func TestNew(t *testing.T) { - d, err := discovery.New("file:///path/to/file", 0, 0) - assert.NoError(t, err) - assert.Equal(t, d.(*Discovery).path, "/path/to/file") -} - -func TestContent(t *testing.T) { - data := ` -1.1.1.[1:2]:1111 -2.2.2.[2:4]:2222 -` - ips := parseFileContent([]byte(data)) - assert.Len(t, ips, 5) - assert.Equal(t, ips[0], "1.1.1.1:1111") - assert.Equal(t, ips[1], "1.1.1.2:1111") - assert.Equal(t, ips[2], "2.2.2.2:2222") - assert.Equal(t, ips[3], "2.2.2.3:2222") - assert.Equal(t, ips[4], "2.2.2.4:2222") -} - -func TestRegister(t *testing.T) { - discovery := &Discovery{path: "/path/to/file"} - assert.Error(t, discovery.Register("0.0.0.0")) -} - -func TestParsingContentsWithComments(t *testing.T) { - data := ` -### test ### -1.1.1.1:1111 # inline comment -# 2.2.2.2:2222 - ### empty line with comment - 3.3.3.3:3333 -### test ### -` - ips := parseFileContent([]byte(data)) - assert.Len(t, ips, 2) - assert.Equal(t, "1.1.1.1:1111", ips[0]) - assert.Equal(t, "3.3.3.3:3333", ips[1]) -} - -func TestWatch(t *testing.T) { - data := ` -1.1.1.1:1111 -2.2.2.2:2222 -` - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - - // Create a temporary file and remove it. - tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test") - assert.NoError(t, err) - assert.NoError(t, tmp.Close()) - assert.NoError(t, os.Remove(tmp.Name())) - - // Set up file discovery. - d := &Discovery{} - d.Initialize(tmp.Name(), 1000, 0) - stopCh := make(chan struct{}) - ch, errCh := d.Watch(stopCh) - - // Make sure it fires errors since the file doesn't exist. - assert.Error(t, <-errCh) - // We have to drain the error channel otherwise Watch will get stuck. - go func() { - for _ = range errCh { - } - }() - - // Write the file and make sure we get the expected value back. - assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) - assert.Equal(t, expected, <-ch) - - // Add a new entry and look it up. - expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) - f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0600) - assert.NoError(t, err) - assert.NotNil(t, f) - _, err = f.WriteString("\n3.3.3.3:3333\n") - assert.NoError(t, err) - f.Close() - assert.Equal(t, expected, <-ch) - - // Stop and make sure it closes all channels. - close(stopCh) - assert.Nil(t, <-ch) - assert.Nil(t, <-errCh) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator.go deleted file mode 100644 index d22298298f..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator.go +++ /dev/null @@ -1,35 +0,0 @@ -package discovery - -import ( - "fmt" - "regexp" - "strconv" -) - -// Generate takes care of IP generation -func Generate(pattern string) []string { - re, _ := regexp.Compile(`\[(.+):(.+)\]`) - submatch := re.FindStringSubmatch(pattern) - if submatch == nil { - return []string{pattern} - } - - from, err := strconv.Atoi(submatch[1]) - if err != nil { - return []string{pattern} - } - to, err := strconv.Atoi(submatch[2]) - if err != nil { - return []string{pattern} - } - - template := re.ReplaceAllString(pattern, "%d") - - var result []string - for val := from; val <= to; val++ { - entry := fmt.Sprintf(template, val) - result = append(result, entry) - } - - return result -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator_test.go deleted file mode 100644 index 747334452f..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/generator_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package discovery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGeneratorNotGenerate(t *testing.T) { - ips := Generate("127.0.0.1") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.1") -} - -func TestGeneratorWithPortNotGenerate(t *testing.T) { - ips := Generate("127.0.0.1:8080") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.1:8080") -} - -func TestGeneratorMatchFailedNotGenerate(t *testing.T) { - ips := Generate("127.0.0.[1]") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.[1]") -} - -func TestGeneratorWithPort(t *testing.T) { - ips := Generate("127.0.0.[1:11]:2375") - assert.Equal(t, len(ips), 11) - assert.Equal(t, ips[0], "127.0.0.1:2375") - assert.Equal(t, ips[1], "127.0.0.2:2375") - assert.Equal(t, ips[2], "127.0.0.3:2375") - assert.Equal(t, ips[3], "127.0.0.4:2375") - assert.Equal(t, ips[4], "127.0.0.5:2375") - assert.Equal(t, ips[5], "127.0.0.6:2375") - assert.Equal(t, ips[6], "127.0.0.7:2375") - assert.Equal(t, ips[7], "127.0.0.8:2375") - assert.Equal(t, ips[8], "127.0.0.9:2375") - assert.Equal(t, ips[9], "127.0.0.10:2375") - assert.Equal(t, ips[10], "127.0.0.11:2375") -} - -func TestGenerateWithMalformedInputAtRangeStart(t *testing.T) { - malformedInput := "127.0.0.[x:11]:2375" - ips := Generate(malformedInput) - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], malformedInput) -} - -func TestGenerateWithMalformedInputAtRangeEnd(t *testing.T) { - malformedInput := "127.0.0.[1:x]:2375" - ips := Generate(malformedInput) - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], malformedInput) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv.go deleted file mode 100644 index cd5f6506cf..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv.go +++ /dev/null @@ -1,140 +0,0 @@ -package kv - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" - "github.com/docker/swarm/pkg/store" -) - -const ( - discoveryPath = "docker/swarm/nodes" -) - -// Discovery is exported -type Discovery struct { - backend store.Backend - store store.Store - heartbeat time.Duration - ttl time.Duration - path string -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("zk", &Discovery{backend: store.ZK}) - discovery.Register("consul", &Discovery{backend: store.CONSUL}) - discovery.Register("etcd", &Discovery{backend: store.ETCD}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error { - var ( - parts = strings.SplitN(uris, "/", 2) - addrs = strings.Split(parts[0], ",") - prefix = "" - err error - ) - - // A custom prefix to the path can be optionally used. - if len(parts) == 2 { - prefix = parts[1] - } - - s.heartbeat = heartbeat - s.ttl = ttl - s.path = path.Join(prefix, discoveryPath) - - // Creates a new store, will ignore options given - // if not supported by the chosen store - s.store, err = store.NewStore( - s.backend, - addrs, - &store.Config{ - EphemeralTTL: s.ttl, - }, - ) - - return err -} - -// Watch the store until either there's a store error or we receive a stop request. -// Returns false if we shouldn't attempt watching the store anymore (stop request received). -func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool { - for { - select { - case pairs := <-watchCh: - if pairs == nil { - return true - } - - log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs)) - - // Convert `KVPair` into `discovery.Entry`. - addrs := make([]string, len(pairs)) - for _, pair := range pairs { - addrs = append(addrs, string(pair.Value)) - } - - entries, err := discovery.CreateEntries(addrs) - if err != nil { - errCh <- err - } else { - discoveryCh <- entries - } - case <-stopCh: - // We were requested to stop watching. - return false - } - } -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - errCh := make(chan error) - - go func() { - defer close(ch) - defer close(errCh) - - // Forever: Create a store watch, watch until we get an error and then try again. - // Will only stop if we receive a stopCh request. - for { - // Set up a watch. - watchCh, err := s.store.WatchTree(s.path, stopCh) - if err != nil { - errCh <- err - } else { - if !s.watchOnce(stopCh, watchCh, ch, errCh) { - return - } - } - - // If we get here it means the store watch channel was closed. This - // is unexpected so let's retry later. - errCh <- fmt.Errorf("Unexpected watch error") - time.Sleep(s.heartbeat) - } - }() - return ch, errCh -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat} - return s.store.Put(path.Join(s.path, addr), []byte(addr), opts) -} - -// Store returns the underlying store used by KV discovery. -func (s *Discovery) Store() store.Store { - return s.store -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv_test.go deleted file mode 100644 index 39d5d74063..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/kv/kv_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package kv - -import ( - "errors" - "path" - "testing" - "time" - - "github.com/docker/swarm/discovery" - "github.com/docker/swarm/pkg/store" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -func TestInitialize(t *testing.T) { - d := &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1", 0, 0)) - s := d.store.(*store.Mock) - assert.Len(t, s.Endpoints, 1) - assert.Equal(t, s.Endpoints[0], "127.0.0.1") - assert.Equal(t, d.path, discoveryPath) - - d = &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0)) - s = d.store.(*store.Mock) - assert.Len(t, s.Endpoints, 1) - assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") - assert.Equal(t, d.path, "path/"+discoveryPath) - - d = &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0)) - s = d.store.(*store.Mock) - assert.Len(t, s.Endpoints, 3) - assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") - assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") - assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") - assert.Equal(t, d.path, "path/"+discoveryPath) -} - -func TestWatch(t *testing.T) { - d := &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0)) - s := d.store.(*store.Mock) - - mockCh := make(chan []*store.KVPair) - - // The first watch will fail. - s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once() - // The second one will succeed. - s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once() - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - kvs := []*store.KVPair{ - {Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")}, - {Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")}, - } - - stopCh := make(chan struct{}) - ch, errCh := d.Watch(stopCh) - - // It should fire an error since the first WatchRange call failed. - assert.EqualError(t, <-errCh, "test error") - // We have to drain the error channel otherwise Watch will get stuck. - go func() { - for _ = range errCh { - } - }() - - // Push the entries into the store channel and make sure discovery emits. - mockCh <- kvs - assert.Equal(t, <-ch, expected) - - // Add a new entry. - expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) - kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")}) - mockCh <- kvs - assert.Equal(t, <-ch, expected) - - // Make sure that if an error occurs it retries. - // This third call to WatchTree will be checked later by AssertExpectations. - s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil) - close(mockCh) - // Give it enough time to call WatchTree. - time.Sleep(3) - - // Stop and make sure it closes all channels. - close(stopCh) - assert.Nil(t, <-ch) - assert.Nil(t, <-errCh) - - s.AssertExpectations(t) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes.go deleted file mode 100644 index 7de7ae2f57..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes.go +++ /dev/null @@ -1,53 +0,0 @@ -package nodes - -import ( - "strings" - "time" - - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - entries discovery.Entries -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("nodes", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error { - for _, input := range strings.Split(uris, ",") { - for _, ip := range discovery.Generate(input) { - entry, err := discovery.NewEntry(ip) - if err != nil { - return err - } - s.entries = append(s.entries, entry) - } - } - - return nil -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - go func() { - defer close(ch) - ch <- s.entries - <-stopCh - }() - return ch, nil -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - return discovery.ErrNotImplemented -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes_test.go deleted file mode 100644 index d59e38621d..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/nodes/nodes_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package nodes - -import ( - "testing" - - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) - assert.Equal(t, len(d.entries), 2) - assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222") -} - -func TestInitializeWithPattern(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0) - assert.Equal(t, len(d.entries), 5) - assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111") - assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222") - assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222") - assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222") -} - -func TestWatch(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - ch, _ := d.Watch(nil) - assert.True(t, expected.Equals(<-ch)) -} - -func TestRegister(t *testing.T) { - d := &Discovery{} - assert.Error(t, d.Register("0.0.0.0")) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/README.md b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/README.md deleted file mode 100644 index 78d6cc7f2c..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/README.md +++ /dev/null @@ -1,31 +0,0 @@ -#discovery-stage.hub.docker.com - -Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com) - -The discovery service is still in alpha stage and currently hosted at `https://discovery-stage.hub.docker.com` - -#####Create a new cluster -`-> POST https://discovery-stage.hub.docker.com/v1/clusters` - -`<- ` - -#####Add new nodes to a cluster -`-> POST https://discovery-stage.hub.docker.com/v1/clusters/ Request body: ":"` - -`<- OK` - -`-> POST https://discovery-stage.hub.docker.com/v1/clusters/ Request body: ":")` - -`<- OK` - - -#####List nodes in a cluster -`-> GET https://discovery-stage.hub.docker.com/v1/clusters/` - -`<- [":", ":"]` - - -#####Delete a cluster (all the nodes in a cluster) -`-> DELETE https://discovery-stage.hub.docker.com/v1/clusters/` - -`<- OK` diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token.go deleted file mode 100644 index 71f03b7179..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token.go +++ /dev/null @@ -1,143 +0,0 @@ -package token - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "strings" - "time" - - "github.com/docker/swarm/discovery" -) - -// DiscoveryUrl is exported -const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1" - -// Discovery is exported -type Discovery struct { - heartbeat time.Duration - ttl time.Duration - url string - token string -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("token", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration, ttl time.Duration) error { - if i := strings.LastIndex(urltoken, "/"); i != -1 { - s.url = "https://" + urltoken[:i] - s.token = urltoken[i+1:] - } else { - s.url = DiscoveryURL - s.token = urltoken - } - - if s.token == "" { - return errors.New("token is empty") - } - s.heartbeat = heartbeat - s.ttl = ttl - - return nil -} - -// Fetch returns the list of entries for the discovery service at the specified endpoint -func (s *Discovery) fetch() (discovery.Entries, error) { - resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) - if err != nil { - return nil, err - } - - defer resp.Body.Close() - - var addrs []string - if resp.StatusCode == http.StatusOK { - if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { - return nil, fmt.Errorf("Failed to decode response: %v", err) - } - } else { - return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) - } - - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - ticker := time.NewTicker(s.heartbeat) - errCh := make(chan error) - - go func() { - defer close(ch) - defer close(errCh) - - // Send the initial entries if available. - currentEntries, err := s.fetch() - if err != nil { - errCh <- err - } else { - ch <- currentEntries - } - - // Periodically send updates. - for { - select { - case <-ticker.C: - newEntries, err := s.fetch() - if err != nil { - errCh <- err - continue - } - - // Check if the file has really changed. - if !newEntries.Equals(currentEntries) { - ch <- newEntries - } - currentEntries = newEntries - case <-stopCh: - ticker.Stop() - return - } - } - }() - - return ch, nil -} - -// Register adds a new entry identified by the into the discovery service -func (s *Discovery) Register(addr string) error { - buf := strings.NewReader(addr) - - resp, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url, - "clusters", s.token), "application/json", buf) - - if err != nil { - return err - } - - resp.Body.Close() - return nil -} - -// CreateCluster returns a unique cluster token -func (s *Discovery) CreateCluster() (string, error) { - resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil) - if err != nil { - return "", err - } - - defer resp.Body.Close() - token, err := ioutil.ReadAll(resp.Body) - return string(token), err -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token_test.go deleted file mode 100644 index 16e3fa6ab9..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/discovery/token/token_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package token - -import ( - "testing" - "time" - - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - discovery := &Discovery{} - err := discovery.Initialize("token", 0, 0) - assert.NoError(t, err) - assert.Equal(t, discovery.token, "token") - assert.Equal(t, discovery.url, DiscoveryURL) - - err = discovery.Initialize("custom/path/token", 0, 0) - assert.NoError(t, err) - assert.Equal(t, discovery.token, "token") - assert.Equal(t, discovery.url, "https://custom/path") - - err = discovery.Initialize("", 0, 0) - assert.Error(t, err) -} - -func TestRegister(t *testing.T) { - d := &Discovery{token: "TEST_TOKEN", url: DiscoveryURL, heartbeat: 1} - expected := "127.0.0.1:2675" - expectedEntries, err := discovery.CreateEntries([]string{expected}) - assert.NoError(t, err) - - // Register - assert.NoError(t, d.Register(expected)) - - // Watch - ch, errCh := d.Watch(nil) - select { - case entries := <-ch: - assert.True(t, entries.Equals(expectedEntries)) - case err := <-errCh: - t.Fatal(err) - case <-time.After(5 * time.Second): - t.Fatal("Timed out") - } - - assert.NoError(t, d.Register(expected)) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/README.md b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/README.md deleted file mode 100644 index 23fd0f5f14..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Storage - -The goal of `pkg/store` is to abstract common store operations for multiple Key/Value backends. - -For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster. - -As of now, `pkg/store` offers support for `Consul`, `Etcd` and `Zookeeper`. - -## Example of usage - -### Create a new store and use Put/Get - -```go -package main - -import ( - "fmt" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/store" -) - -func main() { - var ( - // Consul local address - client = "localhost:8500" - ) - - // Initialize a new store with consul - kv, err = store.NewStore( - store.CONSUL, // or "consul" - []string{client}, - &store.Config{ - Timeout: 10*time.Second, - }, - ) - if err != nil { - log.Error("Cannot create store consul") - } - - key := "foo" - err = kv.Put(key, []byte("bar"), nil) - if err != nil { - log.Error("Error trying to put value at key `", key, "`") - } - - pair, err := kv.Get(key) - if err != nil { - log.Error("Error trying accessing value at key `", key, "`") - } - - log.Info("value: ", string(pair.Value)) -} -``` - - - -## Contributing to a new storage backend - -A new **storage backend** should include those calls: - -```go -type Store interface { - Put(key string, value []byte, options *WriteOptions) error - Get(key string) (*KVPair, error) - Delete(key string) error - Exists(key string) (bool, error) - Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) - WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - NewLock(key string, options *LockOptions) (Locker, error) - List(prefix string) ([]*KVPair, error) - DeleteTree(prefix string) error - AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) - AtomicDelete(key string, previous *KVPair) (bool, error) -} -``` - -In the case of Swarm and to be eligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchTree` and `List`. - -`Put` should support usage of `ttl` to be able to remove entries in case of a node failure. - -You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend. diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul.go deleted file mode 100644 index 159840a6b0..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul.go +++ /dev/null @@ -1,403 +0,0 @@ -package store - -import ( - "crypto/tls" - "net/http" - "strings" - "sync" - "time" - - log "github.com/Sirupsen/logrus" - api "github.com/hashicorp/consul/api" -) - -const ( - // DefaultWatchWaitTime is how long we block for at a time to check if the - // watched key has changed. This affects the minimum time it takes to - // cancel a watch. - DefaultWatchWaitTime = 15 * time.Second -) - -// Consul embeds the client and watches -type Consul struct { - sync.Mutex - config *api.Config - client *api.Client - ephemeralTTL time.Duration - ephemeralSession string -} - -type consulLock struct { - lock *api.Lock -} - -// InitializeConsul creates a new Consul client given -// a list of endpoints and optional tls config -func InitializeConsul(endpoints []string, options *Config) (Store, error) { - s := &Consul{} - - // Create Consul client - config := api.DefaultConfig() - s.config = config - config.HttpClient = http.DefaultClient - config.Address = endpoints[0] - config.Scheme = "http" - - // Set options - if options != nil { - if options.TLS != nil { - s.setTLS(options.TLS) - } - if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) - } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } - } - - // Creates a new client - client, err := api.NewClient(config) - if err != nil { - log.Errorf("Couldn't initialize consul client..") - return nil, err - } - s.client = client - - return s, nil -} - -// SetTLS sets Consul TLS options -func (s *Consul) setTLS(tls *tls.Config) { - s.config.HttpClient.Transport = &http.Transport{ - TLSClientConfig: tls, - } - s.config.Scheme = "https" -} - -// SetTimeout sets the timout for connecting to Consul -func (s *Consul) setTimeout(time time.Duration) { - s.config.WaitTime = time -} - -// SetEphemeralTTL sets the ttl for ephemeral nodes -func (s *Consul) setEphemeralTTL(ttl time.Duration) { - s.ephemeralTTL = ttl -} - -// createEphemeralSession creates the global session -// once that is used to delete keys at node failure -func (s *Consul) createEphemeralSession() error { - s.Lock() - defer s.Unlock() - - // Create new session - if s.ephemeralSession == "" { - entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, - TTL: s.ephemeralTTL.String(), - } - // Create global ephemeral keys session - session, _, err := s.client.Session().Create(entry, nil) - if err != nil { - return err - } - s.ephemeralSession = session - } - return nil -} - -// checkActiveSession checks if the key already has a session attached -func (s *Consul) checkActiveSession(key string) (string, error) { - pair, _, err := s.client.KV().Get(key, nil) - if err != nil { - return "", err - } - if pair != nil && pair.Session != "" { - return pair.Session, nil - } - return "", nil -} - -// Normalize the key for usage in Consul -func (s *Consul) normalize(key string) string { - key = normalize(key) - return strings.TrimPrefix(key, "/") -} - -// Get the value at "key", returns the last modified index -// to use in conjunction to CAS calls -func (s *Consul) Get(key string) (*KVPair, error) { - options := &api.QueryOptions{ - AllowStale: false, - RequireConsistent: true, - } - pair, meta, err := s.client.KV().Get(s.normalize(key), options) - if err != nil { - return nil, err - } - if pair == nil { - return nil, ErrKeyNotFound - } - return &KVPair{pair.Key, pair.Value, meta.LastIndex}, nil -} - -// Put a value at "key" -func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { - - key = s.normalize(key) - - p := &api.KVPair{ - Key: key, - Value: value, - } - - if opts != nil && opts.Ephemeral { - // Check if there is any previous session with an active TTL - previous, err := s.checkActiveSession(key) - if err != nil { - return err - } - - // Create the global ephemeral session if it does not exist yet - if s.ephemeralSession == "" { - if err = s.createEphemeralSession(); err != nil { - return err - } - } - - // If a previous session is still active for that key, use it - // else we use the global ephemeral session - if previous != "" { - p.Session = previous - } else { - p.Session = s.ephemeralSession - } - - // Create lock option with the - // EphemeralSession - lockOpts := &api.LockOptions{ - Key: key, - Session: p.Session, - } - - // Lock and ignore if lock is held - // It's just a placeholder for the - // ephemeral behavior - lock, _ := s.client.LockOpts(lockOpts) - if lock != nil { - lock.Lock(nil) - } - - // Renew the session - _, _, err = s.client.Session().Renew(p.Session, nil) - if err != nil { - s.ephemeralSession = "" - return err - } - } - - _, err := s.client.KV().Put(p, nil) - return err -} - -// Delete a value at "key" -func (s *Consul) Delete(key string) error { - _, err := s.client.KV().Delete(s.normalize(key), nil) - return err -} - -// Exists checks that the key exists inside the store -func (s *Consul) Exists(key string) (bool, error) { - _, err := s.Get(key) - if err != nil && err == ErrKeyNotFound { - return false, err - } - return true, nil -} - -// List the content of a given prefix -func (s *Consul) List(prefix string) ([]*KVPair, error) { - pairs, _, err := s.client.KV().List(s.normalize(prefix), nil) - if err != nil { - return nil, err - } - if len(pairs) == 0 { - return nil, ErrKeyNotFound - } - kv := []*KVPair{} - for _, pair := range pairs { - if pair.Key == prefix { - continue - } - kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex}) - } - return kv, nil -} - -// DeleteTree deletes a range of keys based on prefix -func (s *Consul) DeleteTree(prefix string) error { - _, err := s.client.KV().DeleteTree(s.normalize(prefix), nil) - return err -} - -// Watch changes on a key. -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { - key = s.normalize(key) - kv := s.client.KV() - watchCh := make(chan *KVPair) - - go func() { - defer close(watchCh) - - // Use a wait time in order to check if we should quit from time to - // time. - opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} - for { - // Check if we should quit - select { - case <-stopCh: - return - default: - } - pair, meta, err := kv.Get(key, opts) - if err != nil { - log.Errorf("consul: %v", err) - return - } - // If LastIndex didn't change then it means `Get` returned because - // of the WaitTime and the key didn't change. - if opts.WaitIndex == meta.LastIndex { - continue - } - opts.WaitIndex = meta.LastIndex - // FIXME: What happens when a key is deleted? - if pair != nil { - watchCh <- &KVPair{pair.Key, pair.Value, pair.ModifyIndex} - } - } - }() - - return watchCh, nil -} - -// WatchTree watches changes on a "directory" -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { - prefix = s.normalize(prefix) - kv := s.client.KV() - watchCh := make(chan []*KVPair) - - go func() { - defer close(watchCh) - - // Use a wait time in order to check if we should quit from time to - // time. - opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} - for { - // Check if we should quit - select { - case <-stopCh: - return - default: - } - - pairs, meta, err := kv.List(prefix, opts) - if err != nil { - log.Errorf("consul: %v", err) - return - } - // If LastIndex didn't change then it means `Get` returned because - // of the WaitTime and the key didn't change. - if opts.WaitIndex == meta.LastIndex { - continue - } - opts.WaitIndex = meta.LastIndex - kv := []*KVPair{} - for _, pair := range pairs { - if pair.Key == prefix { - continue - } - kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex}) - } - watchCh <- kv - } - }() - - return watchCh, nil -} - -// NewLock returns a handle to a lock struct which can be used to acquire and -// release the mutex. -func (s *Consul) NewLock(key string, options *LockOptions) (Locker, error) { - consulOpts := &api.LockOptions{ - Key: s.normalize(key), - } - if options != nil { - consulOpts.Value = options.Value - } - l, err := s.client.LockOpts(consulOpts) - if err != nil { - return nil, err - } - return &consulLock{lock: l}, nil -} - -// Lock attempts to acquire the lock and blocks while doing so. -// Returns a channel that is closed if our lock is lost or an error. -func (l *consulLock) Lock() (<-chan struct{}, error) { - return l.lock.Lock(nil) -} - -// Unlock released the lock. It is an error to call this -// if the lock is not currently held. -func (l *consulLock) Unlock() error { - return l.lock.Unlock() -} - -// AtomicPut put a value at "key" if the key has not been -// modified in the meantime, throws an error if this is the case -func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) { - if previous == nil { - return false, nil, ErrPreviousNotSpecified - } - - p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex} - if work, _, err := s.client.KV().CAS(p, nil); err != nil { - return false, nil, err - } else if !work { - return false, nil, ErrKeyModified - } - - pair, err := s.Get(key) - if err != nil { - return false, nil, err - } - return true, pair, nil -} - -// AtomicDelete deletes a value at "key" if the key has not -// been modified in the meantime, throws an error if this is the case -func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error) { - if previous == nil { - return false, ErrPreviousNotSpecified - } - - p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex} - if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { - return false, err - } else if !work { - return false, ErrKeyModified - } - return true, nil -} - -// Close closes the client connection -func (s *Consul) Close() { - return -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul_test.go deleted file mode 100644 index 380497f531..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/consul_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package store - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func makeConsulClient(t *testing.T) Store { - client := "localhost:8500" - - kv, err := NewStore( - CONSUL, - []string{client}, - &Config{ - ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, - }, - ) - if err != nil { - t.Fatalf("cannot create store: %v", err) - } - - return kv -} - -func TestConsulStore(t *testing.T) { - kv := makeConsulClient(t) - - testStore(t, kv) -} - -func TestCreateEphemeralSession(t *testing.T) { - kv := makeConsulClient(t) - - consul := kv.(*Consul) - - err := consul.createEphemeralSession() - assert.NoError(t, err) - assert.NotEqual(t, consul.ephemeralSession, "") -} - -func TestCheckActiveSession(t *testing.T) { - kv := makeConsulClient(t) - - consul := kv.(*Consul) - - key := "foo" - value := []byte("bar") - - // Put the first key with the Ephemeral flag - err := kv.Put(key, value, &WriteOptions{Ephemeral: true}) - assert.NoError(t, err) - - // Session should not be empty - session, err := consul.checkActiveSession(key) - assert.NoError(t, err) - assert.NotEqual(t, session, "") - - // Delete the key - err = kv.Delete(key) - assert.NoError(t, err) - - // Check the session again, it should return nothing - session, err = consul.checkActiveSession(key) - assert.NoError(t, err) - assert.Equal(t, session, "") -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd.go deleted file mode 100644 index 6b6460ebde..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd.go +++ /dev/null @@ -1,444 +0,0 @@ -package store - -import ( - "crypto/tls" - "net" - "net/http" - "strings" - "time" - - etcd "github.com/coreos/go-etcd/etcd" -) - -// Etcd embeds the client -type Etcd struct { - client *etcd.Client - ephemeralTTL time.Duration -} - -type etcdLock struct { - client *etcd.Client - stopLock chan struct{} - key string - value string - last *etcd.Response - ttl uint64 -} - -const ( - defaultLockTTL = 20 * time.Second - defaultUpdateTime = 5 * time.Second - - // periodicSync is the time between each call to SyncCluster - periodicSync = 10 * time.Minute -) - -// InitializeEtcd creates a new Etcd client given -// a list of endpoints and optional tls config -func InitializeEtcd(addrs []string, options *Config) (Store, error) { - s := &Etcd{} - - entries := createEndpoints(addrs, "http") - s.client = etcd.NewClient(entries) - - // Set options - if options != nil { - if options.TLS != nil { - s.setTLS(options.TLS) - } - if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) - } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } - } - - go func() { - for { - s.client.SyncCluster() - time.Sleep(periodicSync) - } - }() - return s, nil -} - -// SetTLS sets the tls configuration given the path -// of certificate files -func (s *Etcd) setTLS(tls *tls.Config) { - // Change to https scheme - var addrs []string - entries := s.client.GetCluster() - for _, entry := range entries { - addrs = append(addrs, strings.Replace(entry, "http", "https", -1)) - } - s.client.SetCluster(addrs) - - // Set transport - t := http.Transport{ - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, // default timeout - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: tls, - } - s.client.SetTransport(&t) -} - -// SetTimeout sets the timeout used for connecting to the store -func (s *Etcd) setTimeout(time time.Duration) { - s.client.SetDialTimeout(time) -} - -// SetHeartbeat sets the heartbeat value to notify we are alive -func (s *Etcd) setEphemeralTTL(time time.Duration) { - s.ephemeralTTL = time -} - -// Create the entire path for a directory that does not exist -func (s *Etcd) createDirectory(path string) error { - if _, err := s.client.CreateDir(normalize(path), 10); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode != 105 { // Skip key already exists - return err - } - } else { - return err - } - } - return nil -} - -// Get the value at "key", returns the last modified index -// to use in conjunction to CAS calls -func (s *Etcd) Get(key string) (*KVPair, error) { - result, err := s.client.Get(normalize(key), false, false) - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return nil, ErrKeyNotFound - } - } - return nil, err - } - return &KVPair{key, []byte(result.Node.Value), result.Node.ModifiedIndex}, nil -} - -// Put a value at "key" -func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error { - - // Default TTL = 0 means no expiration - var ttl uint64 - if opts != nil && opts.Ephemeral { - ttl = uint64(s.ephemeralTTL.Seconds()) - } - - if _, err := s.client.Set(key, string(value), ttl); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode == 104 { // Not a directory - // Remove the last element (the actual key) and set the prefix as a dir - err = s.createDirectory(getDirectory(key)) - if _, err := s.client.Set(key, string(value), ttl); err != nil { - return err - } - } - } - return err - } - return nil -} - -// Delete a value at "key" -func (s *Etcd) Delete(key string) error { - if _, err := s.client.Delete(normalize(key), false); err != nil { - return err - } - return nil -} - -// Exists checks if the key exists inside the store -func (s *Etcd) Exists(key string) (bool, error) { - entry, err := s.Get(key) - if err != nil { - if err == ErrKeyNotFound || entry.Value == nil { - return false, nil - } - return false, err - } - return true, nil -} - -// Watch changes on a key. -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { - // Get the current value - current, err := s.Get(key) - if err != nil { - return nil, err - } - - // Start an etcd watch. - // Note: etcd will send the current value through the channel. - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(normalize(key), 0, false, etcdWatchCh, etcdStopCh) - - // Adapter goroutine: The goal here is to convert wathever format etcd is - // using into our interface. - watchCh := make(chan *KVPair) - go func() { - defer close(watchCh) - - // Push the current value through the channel. - watchCh <- current - - for { - select { - case result := <-etcdWatchCh: - watchCh <- &KVPair{ - key, - []byte(result.Node.Value), - result.Node.ModifiedIndex, - } - case <-stopCh: - etcdStopCh <- true - return - } - } - }() - return watchCh, nil -} - -// WatchTree watches changes on a "directory" -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { - // Get the current value - current, err := s.List(prefix) - if err != nil { - return nil, err - } - - // Start an etcd watch. - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(normalize(prefix), 0, true, etcdWatchCh, etcdStopCh) - - // Adapter goroutine: The goal here is to convert wathever format etcd is - // using into our interface. - watchCh := make(chan []*KVPair) - go func() { - defer close(watchCh) - - // Push the current value through the channel. - watchCh <- current - - for { - select { - case <-etcdWatchCh: - // FIXME: We should probably use the value pushed by the channel. - // However, .Node.Nodes seems to be empty. - if list, err := s.List(prefix); err == nil { - watchCh <- list - } - case <-stopCh: - etcdStopCh <- true - return - } - } - }() - return watchCh, nil -} - -// AtomicPut put a value at "key" if the key has not been -// modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) { - if previous == nil { - return false, nil, ErrPreviousNotSpecified - } - - meta, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex) - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode == 101 { // Compare failed - return false, nil, ErrKeyModified - } - } - return false, nil, err - } - return true, &KVPair{Key: key, Value: value, LastIndex: meta.Node.ModifiedIndex}, nil -} - -// AtomicDelete deletes a value at "key" if the key has not -// been modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) { - if previous == nil { - return false, ErrPreviousNotSpecified - } - - _, err := s.client.CompareAndDelete(normalize(key), "", previous.LastIndex) - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode == 101 { // Compare failed - return false, ErrKeyModified - } - } - return false, err - } - return true, nil -} - -// List the content of a given prefix -func (s *Etcd) List(prefix string) ([]*KVPair, error) { - resp, err := s.client.Get(normalize(prefix), true, true) - if err != nil { - return nil, err - } - kv := []*KVPair{} - for _, n := range resp.Node.Nodes { - key := strings.TrimLeft(n.Key, "/") - kv = append(kv, &KVPair{key, []byte(n.Value), n.ModifiedIndex}) - } - return kv, nil -} - -// DeleteTree deletes a range of keys based on prefix -func (s *Etcd) DeleteTree(prefix string) error { - if _, err := s.client.Delete(normalize(prefix), true); err != nil { - return err - } - return nil -} - -// NewLock returns a handle to a lock struct which can be used to acquire and -// release the mutex. -func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error) { - var value string - ttl := uint64(time.Duration(defaultLockTTL).Seconds()) - - // Apply options - if options != nil { - if options.Value != nil { - value = string(options.Value) - } - if options.TTL != 0 { - ttl = uint64(options.TTL.Seconds()) - } - } - - // Create lock object - lock := &etcdLock{ - client: s.client, - key: key, - value: value, - ttl: ttl, - } - - return lock, nil -} - -// Lock attempts to acquire the lock and blocks while doing so. -// Returns a channel that is closed if our lock is lost or an error. -func (l *etcdLock) Lock() (<-chan struct{}, error) { - - key := normalize(l.key) - - // Lock holder channels - lockHeld := make(chan struct{}) - stopLocking := make(chan struct{}) - - var lastIndex uint64 - - for { - resp, err := l.client.Create(key, l.value, l.ttl) - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Key already exists - if etcdError.ErrorCode != 105 { - lastIndex = ^uint64(0) - } - } - } else { - lastIndex = resp.Node.ModifiedIndex - } - - _, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) - - if err == nil { - // Leader section - l.stopLock = stopLocking - go l.holdLock(key, lockHeld, stopLocking) - break - } else { - // Seeker section - chW := make(chan *etcd.Response) - chWStop := make(chan bool) - l.waitLock(key, chW, chWStop) - - // Delete or Expire event occured - // Retry - } - } - - return lockHeld, nil -} - -// Hold the lock as long as we can -// Updates the key ttl periodically until we receive -// an explicit stop signal from the Unlock method -func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) { - defer close(lockHeld) - - update := time.NewTicker(defaultUpdateTime) - defer update.Stop() - - var err error - - for { - select { - case <-update.C: - l.last, err = l.client.Update(key, l.value, l.ttl) - if err != nil { - return - } - - case <-stopLocking: - return - } - } -} - -// WaitLock simply waits for the key to be available for creation -func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) { - go l.client.Watch(key, 0, false, eventCh, stopWatchCh) - for event := range eventCh { - if event.Action == "delete" || event.Action == "expire" { - return - } - } -} - -// Unlock released the lock. It is an error to call this -// if the lock is not currently held. -func (l *etcdLock) Unlock() error { - if l.stopLock != nil { - l.stopLock <- struct{}{} - } - if l.last != nil { - _, err := l.client.CompareAndDelete(normalize(l.key), l.value, l.last.Node.ModifiedIndex) - if err != nil { - return err - } - } - return nil -} - -// Close closes the client connection -func (s *Etcd) Close() { - return -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd_test.go deleted file mode 100644 index da3bf5f4ad..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/etcd_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package store - -import ( - "testing" - "time" -) - -func makeEtcdClient(t *testing.T) Store { - client := "localhost:4001" - - kv, err := NewStore( - ETCD, - []string{client}, - &Config{ - ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, - }, - ) - if err != nil { - t.Fatalf("cannot create store: %v", err) - } - - return kv -} - -func TestEtcdStore(t *testing.T) { - kv := makeEtcdClient(t) - - testStore(t, kv) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/helpers.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/helpers.go deleted file mode 100644 index a9386e31c1..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/helpers.go +++ /dev/null @@ -1,46 +0,0 @@ -package store - -import ( - "strings" -) - -// Creates a list of endpoints given the right scheme -func createEndpoints(addrs []string, scheme string) (entries []string) { - for _, addr := range addrs { - entries = append(entries, scheme+"://"+addr) - } - return entries -} - -// Normalize the key for each store to the form: -// -// /path/to/key -// -func normalize(key string) string { - return "/" + join(splitKey(key)) -} - -// Get the full directory part of the key to the form: -// -// /path/to/ -// -func getDirectory(key string) string { - parts := splitKey(key) - parts = parts[:len(parts)-1] - return "/" + join(parts) -} - -// SplitKey splits the key to extract path informations -func splitKey(key string) (path []string) { - if strings.Contains(key, "/") { - path = strings.Split(key, "/") - } else { - path = []string{key} - } - return path -} - -// Join the path parts with '/' -func join(parts []string) string { - return strings.Join(parts, "/") -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/mock.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/mock.go deleted file mode 100644 index e7c3396973..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/mock.go +++ /dev/null @@ -1,109 +0,0 @@ -package store - -import "github.com/stretchr/testify/mock" - -// Mock store. Mocks all Store functions using testify.Mock. -type Mock struct { - mock.Mock - - // Endpoints passed to InitializeMock - Endpoints []string - // Options passed to InitializeMock - Options *Config -} - -// InitializeMock creates a Mock store. -func InitializeMock(endpoints []string, options *Config) (Store, error) { - s := &Mock{} - s.Endpoints = endpoints - s.Options = options - return s, nil -} - -// Put mock -func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error { - args := s.Mock.Called(key, value, opts) - return args.Error(0) -} - -// Get mock -func (s *Mock) Get(key string) (*KVPair, error) { - args := s.Mock.Called(key) - return args.Get(0).(*KVPair), args.Error(1) -} - -// Delete mock -func (s *Mock) Delete(key string) error { - args := s.Mock.Called(key) - return args.Error(0) -} - -// Exists mock -func (s *Mock) Exists(key string) (bool, error) { - args := s.Mock.Called(key) - return args.Bool(0), args.Error(1) -} - -// Watch mock -func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { - args := s.Mock.Called(key, stopCh) - return args.Get(0).(<-chan *KVPair), args.Error(1) -} - -// WatchTree mock -func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { - args := s.Mock.Called(prefix, stopCh) - return args.Get(0).(chan []*KVPair), args.Error(1) -} - -// NewLock mock -func (s *Mock) NewLock(key string, options *LockOptions) (Locker, error) { - args := s.Mock.Called(key, options) - return args.Get(0).(Locker), args.Error(1) -} - -// List mock -func (s *Mock) List(prefix string) ([]*KVPair, error) { - args := s.Mock.Called(prefix) - return args.Get(0).([]*KVPair), args.Error(1) -} - -// DeleteTree mock -func (s *Mock) DeleteTree(prefix string) error { - args := s.Mock.Called(prefix) - return args.Error(0) -} - -// AtomicPut mock -func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, *KVPair, error) { - args := s.Mock.Called(key, value, previous, opts) - return args.Bool(0), args.Get(1).(*KVPair), args.Error(2) -} - -// AtomicDelete mock -func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error) { - args := s.Mock.Called(key, previous) - return args.Bool(0), args.Error(1) -} - -// MockLock mock implementation of Locker -type MockLock struct { - mock.Mock -} - -// Lock mock -func (l *MockLock) Lock() (<-chan struct{}, error) { - args := l.Mock.Called() - return args.Get(0).(<-chan struct{}), args.Error(1) -} - -// Unlock mock -func (l *MockLock) Unlock() error { - args := l.Mock.Called() - return args.Error(0) -} - -// Close mock -func (s *Mock) Close() { - return -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store.go deleted file mode 100644 index 0a4813aa34..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store.go +++ /dev/null @@ -1,154 +0,0 @@ -package store - -import ( - "crypto/tls" - "errors" - "time" - - log "github.com/Sirupsen/logrus" -) - -// Backend represents a KV Store Backend -type Backend string - -const ( - // MOCK backend - MOCK Backend = "mock" - // CONSUL backend - CONSUL = "consul" - // ETCD backend - ETCD = "etcd" - // ZK backend - ZK = "zk" -) - -var ( - // ErrInvalidTTL is a specific error to consul - ErrInvalidTTL = errors.New("Invalid TTL, please change the value to the miminum allowed ttl for the chosen store") - // ErrNotSupported is exported - ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") - // ErrNotImplemented is exported - ErrNotImplemented = errors.New("Call not implemented in current backend") - // ErrNotReachable is exported - ErrNotReachable = errors.New("Api not reachable") - // ErrCannotLock is exported - ErrCannotLock = errors.New("Error acquiring the lock") - // ErrWatchDoesNotExist is exported - ErrWatchDoesNotExist = errors.New("No watch found for specified key") - // ErrKeyModified is exported - ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") - // ErrKeyNotFound is exported - ErrKeyNotFound = errors.New("Key not found in store") - // ErrPreviousNotSpecified is exported - ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation") -) - -// Config contains the options for a storage client -type Config struct { - TLS *tls.Config - ConnectionTimeout time.Duration - EphemeralTTL time.Duration -} - -// Store represents the backend K/V storage -// Each store should support every call listed -// here. Or it couldn't be implemented as a K/V -// backend for libkv -type Store interface { - // Put a value at the specified key - Put(key string, value []byte, options *WriteOptions) error - - // Get a value given its key - Get(key string) (*KVPair, error) - - // Delete the value at the specified key - Delete(key string) error - - // Verify if a Key exists in the store - Exists(key string) (bool, error) - - // Watch changes on a key. - // Returns a channel that will receive changes or an error. - // Upon creating a watch, the current value will be sent to the channel. - // Providing a non-nil stopCh can be used to stop watching. - Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) - - // WatchTree watches changes on a "directory" - // Returns a channel that will receive changes or an error. - // Upon creating a watch, the current value will be sent to the channel. - // Providing a non-nil stopCh can be used to stop watching. - WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - - // CreateLock for a given key. - // The returned Locker is not held and must be acquired with `.Lock`. - // value is optional. - NewLock(key string, options *LockOptions) (Locker, error) - - // List the content of a given prefix - List(prefix string) ([]*KVPair, error) - - // DeleteTree deletes a range of keys based on prefix - DeleteTree(prefix string) error - - // Atomic operation on a single value - AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) - - // Atomic delete of a single value - AtomicDelete(key string, previous *KVPair) (bool, error) - - // Close the store connection - Close() -} - -// KVPair represents {Key, Value, Lastindex} tuple -type KVPair struct { - Key string - Value []byte - LastIndex uint64 -} - -// WriteOptions contains optional request parameters -type WriteOptions struct { - Heartbeat time.Duration - Ephemeral bool -} - -// LockOptions contains optional request parameters -type LockOptions struct { - Value []byte // Optional, value to associate with the lock - TTL time.Duration // Optional, expiration ttl associated with the lock -} - -// WatchCallback is used for watch methods on keys -// and is triggered on key change -type WatchCallback func(entries ...*KVPair) - -// Locker provides locking mechanism on top of the store. -// Similar to `sync.Lock` except it may return errors. -type Locker interface { - Lock() (<-chan struct{}, error) - Unlock() error -} - -// Initialize creates a new Store object, initializing the client -type Initialize func(addrs []string, options *Config) (Store, error) - -var ( - // Backend initializers - initializers = map[Backend]Initialize{ - MOCK: InitializeMock, - CONSUL: InitializeConsul, - ETCD: InitializeEtcd, - ZK: InitializeZookeeper, - } -) - -// NewStore creates a an instance of store -func NewStore(backend Backend, addrs []string, options *Config) (Store, error) { - if init, exists := initializers[backend]; exists { - log.WithFields(log.Fields{"backend": backend}).Debug("Initializing store service") - return init(addrs, options) - } - - return nil, ErrNotSupported -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store_test.go deleted file mode 100644 index 89c982911b..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/store_test.go +++ /dev/null @@ -1,401 +0,0 @@ -package store - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func testStore(t *testing.T, kv Store) { - testPutGetDelete(t, kv) - testWatch(t, kv) - testWatchTree(t, kv) - testAtomicPut(t, kv) - testAtomicDelete(t, kv) - testLockUnlock(t, kv) - testPutEphemeral(t, kv) - testList(t, kv) - testDeleteTree(t, kv) -} - -func testPutGetDelete(t *testing.T, kv Store) { - key := "foo" - value := []byte("bar") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // Delete the key - err = kv.Delete(key) - assert.NoError(t, err) - - // Get should fail - pair, err = kv.Get(key) - assert.Error(t, err) - assert.Nil(t, pair) -} - -func testWatch(t *testing.T, kv Store) { - key := "hello" - value := []byte("world") - newValue := []byte("world!") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - stopCh := make(<-chan struct{}) - events, err := kv.Watch(key, stopCh) - assert.NoError(t, err) - assert.NotNil(t, events) - - // Update loop - go func() { - timeout := time.After(1 * time.Second) - tick := time.Tick(250 * time.Millisecond) - for { - select { - case <-timeout: - return - case <-tick: - err := kv.Put(key, newValue, nil) - if assert.NoError(t, err) { - continue - } - return - } - } - }() - - // Check for updates - timeout := time.After(2 * time.Second) - eventCount := 1 - for { - select { - case event := <-events: - assert.NotNil(t, event) - if eventCount == 1 { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, value) - } else { - assert.Equal(t, event.Key, key) - assert.Equal(t, event.Value, newValue) - } - eventCount++ - // We received all the events we wanted to check - if eventCount >= 4 { - return - } - case <-timeout: - t.Fatal("Timeout reached") - return - } - } -} - -func testWatchTree(t *testing.T, kv Store) { - dir := "tree" - - node1 := "tree/node1" - value1 := []byte("node1") - - node2 := "tree/node2" - value2 := []byte("node2") - - node3 := "tree/node3" - value3 := []byte("node3") - - err := kv.Put(node1, value1, nil) - assert.NoError(t, err) - err = kv.Put(node2, value2, nil) - assert.NoError(t, err) - err = kv.Put(node3, value3, nil) - assert.NoError(t, err) - - stopCh := make(<-chan struct{}) - events, err := kv.WatchTree(dir, stopCh) - assert.NoError(t, err) - assert.NotNil(t, events) - - // Update loop - go func() { - timeout := time.After(250 * time.Millisecond) - for { - select { - case <-timeout: - err := kv.Delete(node3) - assert.NoError(t, err) - return - } - } - }() - - // Check for updates - timeout := time.After(4 * time.Second) - for { - select { - case event := <-events: - assert.NotNil(t, event) - // We received the Delete event on a child node - // Exit test successfully - if len(event) == 2 { - return - } - case <-timeout: - t.Fatal("Timeout reached") - return - } - } -} - -func testAtomicPut(t *testing.T, kv Store) { - key := "hello" - value := []byte("world") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // This CAS should succeed - success, _, err := kv.AtomicPut("hello", []byte("WORLD"), pair, nil) - assert.NoError(t, err) - assert.True(t, success) - - // This CAS should fail - pair.LastIndex = 0 - success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) - assert.Error(t, err) - assert.False(t, success) -} - -func testAtomicDelete(t *testing.T, kv Store) { - key := "atomic" - value := []byte("world") - - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) - - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - tempIndex := pair.LastIndex - - // AtomicDelete should fail - pair.LastIndex = 0 - success, err := kv.AtomicDelete(key, pair) - assert.Error(t, err) - assert.False(t, success) - - // AtomicDelete should succeed - pair.LastIndex = tempIndex - success, err = kv.AtomicDelete(key, pair) - assert.NoError(t, err) - assert.True(t, success) -} - -func testLockUnlock(t *testing.T, kv Store) { - t.Parallel() - - key := "foo" - value := []byte("bar") - - // We should be able to create a new lock on key - lock, err := kv.NewLock(key, &LockOptions{Value: value}) - assert.NoError(t, err) - assert.NotNil(t, lock) - - // Lock should successfully succeed or block - lockChan, err := lock.Lock() - assert.NoError(t, err) - assert.NotNil(t, lockChan) - - // Get should work - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // Unlock should succeed - err = lock.Unlock() - assert.NoError(t, err) - - // Get should work - pair, err = kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) -} - -// FIXME Gracefully handle Zookeeper -func testPutEphemeral(t *testing.T, kv Store) { - // Zookeeper: initialize client here (Close() hangs otherwise) - zookeeper := false - if _, ok := kv.(*Zookeeper); ok { - zookeeper = true - kv = makeZkClient(t) - } - - firstKey := "first" - firstValue := []byte("foo") - - secondKey := "second" - secondValue := []byte("bar") - - // Put the first key with the Ephemeral flag - err := kv.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true}) - assert.NoError(t, err) - - // Put a second key with the Ephemeral flag - err = kv.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true}) - assert.NoError(t, err) - - // Get on firstKey should work - pair, err := kv.Get(firstKey) - assert.NoError(t, err) - assert.NotNil(t, pair) - - // Get on secondKey should work - pair, err = kv.Get(secondKey) - assert.NoError(t, err) - assert.NotNil(t, pair) - - // Zookeeper: close client connection - if zookeeper { - kv.Close() - } - - // Let the session expire - time.Sleep(5 * time.Second) - - // Zookeeper: re-create the client - if zookeeper { - kv = makeZkClient(t) - } - - // Get on firstKey shouldn't work - pair, err = kv.Get(firstKey) - assert.Error(t, err) - assert.Nil(t, pair) - - // Get on secondKey shouldn't work - pair, err = kv.Get(secondKey) - assert.Error(t, err) - assert.Nil(t, pair) -} - -func testList(t *testing.T, kv Store) { - prefix := "nodes" - - firstKey := "nodes/first" - firstValue := []byte("first") - - secondKey := "nodes/second" - secondValue := []byte("second") - - // Put the first key - err := kv.Put(firstKey, firstValue, nil) - assert.NoError(t, err) - - // Put the second key - err = kv.Put(secondKey, secondValue, nil) - assert.NoError(t, err) - - // List should work and return the two correct values - pairs, err := kv.List(prefix) - assert.NoError(t, err) - if assert.NotNil(t, pairs) { - assert.Equal(t, len(pairs), 2) - } - - // Check pairs, those are not necessarily in Put order - for _, pair := range pairs { - if pair.Key == firstKey { - assert.Equal(t, pair.Value, firstValue) - } - if pair.Key == secondKey { - assert.Equal(t, pair.Value, secondValue) - } - } -} - -func testDeleteTree(t *testing.T, kv Store) { - prefix := "nodes" - - firstKey := "nodes/first" - firstValue := []byte("first") - - secondKey := "nodes/second" - secondValue := []byte("second") - - // Put the first key - err := kv.Put(firstKey, firstValue, nil) - assert.NoError(t, err) - - // Put the second key - err = kv.Put(secondKey, secondValue, nil) - assert.NoError(t, err) - - // Get should work on the first Key - pair, err := kv.Get(firstKey) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, firstValue) - assert.NotEqual(t, pair.LastIndex, 0) - - // Get should work on the second Key - pair, err = kv.Get(secondKey) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) - } - assert.Equal(t, pair.Value, secondValue) - assert.NotEqual(t, pair.LastIndex, 0) - - // Delete Values under directory `nodes` - err = kv.DeleteTree(prefix) - assert.NoError(t, err) - - // Get should fail on both keys - pair, err = kv.Get(firstKey) - assert.Error(t, err) - assert.Nil(t, pair) - - pair, err = kv.Get(secondKey) - assert.Error(t, err) - assert.Nil(t, pair) -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper.go deleted file mode 100644 index 355f513332..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper.go +++ /dev/null @@ -1,311 +0,0 @@ -package store - -import ( - "strings" - "time" - - log "github.com/Sirupsen/logrus" - zk "github.com/samuel/go-zookeeper/zk" -) - -const defaultTimeout = 10 * time.Second - -// Zookeeper embeds the zookeeper client -type Zookeeper struct { - timeout time.Duration - client *zk.Conn -} - -type zookeeperLock struct { - client *zk.Conn - lock *zk.Lock - key string - value []byte -} - -// InitializeZookeeper creates a new Zookeeper client -// given a list of endpoints and optional tls config -func InitializeZookeeper(endpoints []string, options *Config) (Store, error) { - s := &Zookeeper{} - s.timeout = defaultTimeout - - // Set options - if options != nil { - if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) - } - } - - conn, _, err := zk.Connect(endpoints, s.timeout) - if err != nil { - log.Error(err) - return nil, err - } - s.client = conn - return s, nil -} - -// SetTimeout sets the timout for connecting to Zookeeper -func (s *Zookeeper) setTimeout(time time.Duration) { - s.timeout = time -} - -// Get the value at "key", returns the last modified index -// to use in conjunction to CAS calls -func (s *Zookeeper) Get(key string) (*KVPair, error) { - resp, meta, err := s.client.Get(normalize(key)) - if err != nil { - return nil, err - } - if resp == nil { - return nil, ErrKeyNotFound - } - return &KVPair{key, resp, uint64(meta.Version)}, nil -} - -// Create the entire path for a directory that does not exist -func (s *Zookeeper) createFullpath(path []string, ephemeral bool) error { - for i := 1; i <= len(path); i++ { - newpath := "/" + strings.Join(path[:i], "/") - if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) - return err - } - _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - // Skip if node already exists - if err != zk.ErrNodeExists { - return err - } - } - } - return nil -} - -// Put a value at "key" -func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error { - fkey := normalize(key) - exists, err := s.Exists(key) - if err != nil { - return err - } - if !exists { - if opts != nil && opts.Ephemeral { - s.createFullpath(splitKey(key), opts.Ephemeral) - } else { - s.createFullpath(splitKey(key), false) - } - } - _, err = s.client.Set(fkey, value, -1) - return err -} - -// Delete a value at "key" -func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(normalize(key), -1) - return err -} - -// Exists checks if the key exists inside the store -func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(normalize(key)) - if err != nil { - return false, err - } - return exists, nil -} - -// Watch changes on a key. -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { - fkey := normalize(key) - pair, err := s.Get(key) - if err != nil { - return nil, err - } - - // Catch zk notifications and fire changes into the channel. - watchCh := make(chan *KVPair) - go func() { - defer close(watchCh) - - // Get returns the current value before setting the watch. - watchCh <- pair - for { - _, _, eventCh, err := s.client.GetW(fkey) - if err != nil { - return - } - select { - case e := <-eventCh: - if e.Type == zk.EventNodeDataChanged { - if entry, err := s.Get(key); err == nil { - watchCh <- entry - } - } - case <-stopCh: - // There is no way to stop GetW so just quit - return - } - } - }() - - return watchCh, nil -} - -// WatchTree watches changes on a "directory" -// Returns a channel that will receive changes or an error. -// Upon creating a watch, the current value will be sent to the channel. -// Providing a non-nil stopCh can be used to stop watching. -func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { - fprefix := normalize(prefix) - entries, err := s.List(prefix) - if err != nil { - return nil, err - } - - // Catch zk notifications and fire changes into the channel. - watchCh := make(chan []*KVPair) - go func() { - defer close(watchCh) - - // List returns the current values before setting the watch. - watchCh <- entries - - for { - _, _, eventCh, err := s.client.ChildrenW(fprefix) - if err != nil { - return - } - select { - case e := <-eventCh: - if e.Type == zk.EventNodeChildrenChanged { - if kv, err := s.List(prefix); err == nil { - watchCh <- kv - } - } - case <-stopCh: - // There is no way to stop GetW so just quit - return - } - } - }() - - return watchCh, nil -} - -// List the content of a given prefix -func (s *Zookeeper) List(prefix string) ([]*KVPair, error) { - keys, stat, err := s.client.Children(normalize(prefix)) - if err != nil { - return nil, err - } - kv := []*KVPair{} - for _, key := range keys { - // FIXME Costly Get request for each child key.. - pair, err := s.Get(prefix + normalize(key)) - if err != nil { - return nil, err - } - kv = append(kv, &KVPair{key, []byte(pair.Value), uint64(stat.Version)}) - } - return kv, nil -} - -// DeleteTree deletes a range of keys based on prefix -func (s *Zookeeper) DeleteTree(prefix string) error { - pairs, err := s.List(prefix) - if err != nil { - return err - } - var reqs []interface{} - for _, pair := range pairs { - reqs = append(reqs, &zk.DeleteRequest{ - Path: normalize(prefix + "/" + pair.Key), - Version: -1, - }) - } - _, err = s.client.Multi(reqs...) - return err -} - -// AtomicPut put a value at "key" if the key has not been -// modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, _ *WriteOptions) (bool, *KVPair, error) { - if previous == nil { - return false, nil, ErrPreviousNotSpecified - } - - meta, err := s.client.Set(normalize(key), value, int32(previous.LastIndex)) - if err != nil { - if err == zk.ErrBadVersion { - return false, nil, ErrKeyModified - } - return false, nil, err - } - return true, &KVPair{Key: key, Value: value, LastIndex: uint64(meta.Version)}, nil -} - -// AtomicDelete deletes a value at "key" if the key has not -// been modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error) { - if previous == nil { - return false, ErrPreviousNotSpecified - } - - err := s.client.Delete(normalize(key), int32(previous.LastIndex)) - if err != nil { - if err == zk.ErrBadVersion { - return false, ErrKeyModified - } - return false, err - } - return true, nil -} - -// NewLock returns a handle to a lock struct which can be used to acquire and -// release the mutex. -func (s *Zookeeper) NewLock(key string, options *LockOptions) (Locker, error) { - value := []byte("") - - // Apply options - if options != nil { - if options.Value != nil { - value = options.Value - } - } - - return &zookeeperLock{ - client: s.client, - key: normalize(key), - value: value, - lock: zk.NewLock(s.client, normalize(key), zk.WorldACL(zk.PermAll)), - }, nil -} - -// Lock attempts to acquire the lock and blocks while doing so. -// Returns a channel that is closed if our lock is lost or an error. -func (l *zookeeperLock) Lock() (<-chan struct{}, error) { - err := l.lock.Lock() - - if err == nil { - // We hold the lock, we can set our value - // FIXME: When the last leader leaves the election, this value will be left behind - _, err = l.client.Set(l.key, l.value, -1) - } - - return make(chan struct{}), err -} - -// Unlock released the lock. It is an error to call this -// if the lock is not currently held. -func (l *zookeeperLock) Unlock() error { - return l.lock.Unlock() -} - -// Close closes the client connection -func (s *Zookeeper) Close() { - s.client.Close() -} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper_test.go deleted file mode 100644 index f5e4fec0db..0000000000 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/swarm/pkg/store/zookeeper_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package store - -import ( - "testing" - "time" -) - -func makeZkClient(t *testing.T) Store { - client := "localhost:2181" - - kv, err := NewStore( - ZK, - []string{client}, - &Config{ - ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, - }, - ) - if err != nil { - t.Fatalf("cannot create store: %v", err) - } - - return kv -} - -func TestZkStore(t *testing.T) { - kv := makeZkClient(t) - - testStore(t, kv) -}