Selaa lähdekoodia

Add an unsafe memory discovery store for testing.

Signed-off-by: David Calavera <david.calavera@gmail.com>
David Calavera 9 vuotta sitten
vanhempi
commit
22a81a2c58
2 muutettua tiedostoa jossa 130 lisäystä ja 0 poistoa
  1. 82 0
      pkg/discovery/memory/memory.go
  2. 48 0
      pkg/discovery/memory/memory_test.go

+ 82 - 0
pkg/discovery/memory/memory.go

@@ -0,0 +1,82 @@
+package memory
+
+import (
+	"time"
+
+	"github.com/docker/docker/pkg/discovery"
+)
+
+// Discovery implements a descovery backend that keeps
+// data in memory.
+type Discovery struct {
+	heartbeat time.Duration
+	values    []string
+}
+
+func init() {
+	Init()
+}
+
+// Init registers the memory backend on demand.
+func Init() {
+	discovery.Register("memory", &Discovery{})
+}
+
+// Initialize sets the heartbeat for the memory backend.
+func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duration, _ map[string]string) error {
+	s.heartbeat = heartbeat
+	return nil
+}
+
+// Watch sends periodic discovery updates to a channel.
+func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
+	ch := make(chan discovery.Entries)
+	errCh := make(chan error)
+	ticker := time.NewTicker(s.heartbeat)
+
+	go func() {
+		defer close(errCh)
+		defer close(ch)
+
+		// Send the initial entries if available.
+		var currentEntries discovery.Entries
+		if len(s.values) > 0 {
+			var err error
+			currentEntries, err = discovery.CreateEntries(s.values)
+			if err != nil {
+				errCh <- err
+			} else {
+				ch <- currentEntries
+			}
+		}
+
+		// Periodically send updates.
+		for {
+			select {
+			case <-ticker.C:
+				newEntries, err := discovery.CreateEntries(s.values)
+				if err != nil {
+					errCh <- err
+					continue
+				}
+
+				// Check if the file has really changed.
+				if !newEntries.Equals(currentEntries) {
+					ch <- newEntries
+				}
+				currentEntries = newEntries
+			case <-stopCh:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return ch, errCh
+}
+
+// Register adds a new address to the discovery.
+func (s *Discovery) Register(addr string) error {
+	s.values = append(s.values, addr)
+	return nil
+}

+ 48 - 0
pkg/discovery/memory/memory_test.go

@@ -0,0 +1,48 @@
+package memory
+
+import (
+	"testing"
+
+	"github.com/docker/docker/pkg/discovery"
+	"github.com/go-check/check"
+)
+
+// Hook up gocheck into the "go test" runner.
+func Test(t *testing.T) { check.TestingT(t) }
+
+type discoverySuite struct{}
+
+var _ = check.Suite(&discoverySuite{})
+
+func (s *discoverySuite) TestWatch(c *check.C) {
+	d := &Discovery{}
+	d.Initialize("foo", 1000, 0, nil)
+	stopCh := make(chan struct{})
+	ch, errCh := d.Watch(stopCh)
+
+	// We have to drain the error channel otherwise Watch will get stuck.
+	go func() {
+		for range errCh {
+		}
+	}()
+
+	expected := discovery.Entries{
+		&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
+	}
+
+	c.Assert(d.Register("1.1.1.1:1111"), check.IsNil)
+	c.Assert(<-ch, check.DeepEquals, expected)
+
+	expected = discovery.Entries{
+		&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
+		&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
+	}
+
+	c.Assert(d.Register("2.2.2.2:2222"), check.IsNil)
+	c.Assert(<-ch, check.DeepEquals, expected)
+
+	// Stop and make sure it closes all channels.
+	close(stopCh)
+	c.Assert(<-ch, check.IsNil)
+	c.Assert(<-errCh, check.IsNil)
+}