Pārlūkot izejas kodu

Swarm integration tests

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Victor Vieux <vieux@docker.com>
Tonis Tiigi 9 gadi atpakaļ
vecāks
revīzija
0d88d5b64b

+ 55 - 0
integration-cli/check_test.go

@@ -184,6 +184,61 @@ func (s *DockerDaemonSuite) TearDownTest(c *check.C) {
 	s.ds.TearDownTest(c)
 }
 
+const defaultSwarmPort = 2477
+
+func init() {
+	check.Suite(&DockerSwarmSuite{
+		ds: &DockerSuite{},
+	})
+}
+
+type DockerSwarmSuite struct {
+	ds        *DockerSuite
+	daemons   []*SwarmDaemon
+	portIndex int
+}
+
+func (s *DockerSwarmSuite) SetUpTest(c *check.C) {
+	testRequires(c, DaemonIsLinux)
+}
+
+func (s *DockerSwarmSuite) AddDaemon(c *check.C, joinSwarm, manager bool) *SwarmDaemon {
+	d := &SwarmDaemon{
+		Daemon: NewDaemon(c),
+		port:   defaultSwarmPort + s.portIndex,
+	}
+	d.listenAddr = fmt.Sprintf("0.0.0.0:%d", d.port)
+	err := d.StartWithBusybox()
+	c.Assert(err, check.IsNil)
+
+	if joinSwarm == true {
+		if len(s.daemons) > 0 {
+			c.Assert(d.Join(s.daemons[0].listenAddr, "", "", manager), check.IsNil)
+		} else {
+			aa := make(map[string]bool)
+			aa["worker"] = true
+			aa["manager"] = true
+			c.Assert(d.Init(aa, ""), check.IsNil)
+		}
+	}
+
+	s.portIndex++
+	s.daemons = append(s.daemons, d)
+
+	return d
+}
+
+func (s *DockerSwarmSuite) TearDownTest(c *check.C) {
+	testRequires(c, DaemonIsLinux)
+	for _, d := range s.daemons {
+		d.Stop()
+	}
+	s.daemons = nil
+	s.portIndex = 0
+
+	s.ds.TearDownTest(c)
+}
+
 func init() {
 	check.Suite(&DockerTrustSuite{
 		ds: &DockerSuite{},

+ 44 - 3
integration-cli/daemon.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -292,9 +293,9 @@ out1:
 		select {
 		case err := <-d.wait:
 			return err
-		case <-time.After(15 * time.Second):
+		case <-time.After(20 * time.Second):
 			// time for stopping jobs and run onShutdown hooks
-			d.c.Log("timeout")
+			d.c.Logf("timeout: %v", d.id)
 			break out1
 		}
 	}
@@ -306,7 +307,7 @@ out2:
 			return err
 		case <-tick:
 			i++
-			if i > 4 {
+			if i > 5 {
 				d.c.Logf("tried to interrupt daemon for %d times, now try to kill it", i)
 				break out2
 			}
@@ -452,6 +453,27 @@ func (d *Daemon) CmdWithArgs(daemonArgs []string, name string, arg ...string) (s
 	return string(b), err
 }
 
+// SockRequest executes a socket request on a daemon and returns statuscode and output.
+func (d *Daemon) SockRequest(method, endpoint string, data interface{}) (int, []byte, error) {
+	jsonData := bytes.NewBuffer(nil)
+	if err := json.NewEncoder(jsonData).Encode(data); err != nil {
+		return -1, nil, err
+	}
+
+	res, body, err := d.SockRequestRaw(method, endpoint, jsonData, "application/json")
+	if err != nil {
+		return -1, nil, err
+	}
+	b, err := readBody(body)
+	return res.StatusCode, b, err
+}
+
+// SockRequestRaw executes a socket request on a daemon and returns a http
+// response and a reader for the output data.
+func (d *Daemon) SockRequestRaw(method, endpoint string, data io.Reader, ct string) (*http.Response, io.ReadCloser, error) {
+	return sockRequestRawToDaemon(method, endpoint, data, ct, d.sock())
+}
+
 // LogFileName returns the path the the daemon's log file
 func (d *Daemon) LogFileName() string {
 	return d.logFile.Name()
@@ -461,6 +483,16 @@ func (d *Daemon) getIDByName(name string) (string, error) {
 	return d.inspectFieldWithError(name, "Id")
 }
 
+func (d *Daemon) activeContainers() (ids []string) {
+	out, _ := d.Cmd("ps", "-q")
+	for _, id := range strings.Split(out, "\n") {
+		if id = strings.TrimSpace(id); id != "" {
+			ids = append(ids, id)
+		}
+	}
+	return
+}
+
 func (d *Daemon) inspectFilter(name, filter string) (string, error) {
 	format := fmt.Sprintf("{{%s}}", filter)
 	out, err := d.Cmd("inspect", "-f", format, name)
@@ -486,3 +518,12 @@ func (d *Daemon) buildImageWithOut(name, dockerfile string, useCache bool, build
 	buildCmd := buildImageCmdWithHost(name, dockerfile, d.sock(), useCache, buildFlags...)
 	return runCommandWithOutput(buildCmd)
 }
+
+func (d *Daemon) checkActiveContainerCount(c *check.C) (interface{}, check.CommentInterface) {
+	out, err := d.Cmd("ps", "-q")
+	c.Assert(err, checker.IsNil)
+	if len(strings.TrimSpace(out)) == 0 {
+		return 0, nil
+	}
+	return len(strings.Split(strings.TrimSpace(out), "\n")), check.Commentf("output: %q", string(out))
+}

+ 178 - 0
integration-cli/daemon_swarm.go

@@ -0,0 +1,178 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/docker/docker/pkg/integration/checker"
+	"github.com/docker/engine-api/types"
+	"github.com/docker/engine-api/types/swarm"
+	"github.com/go-check/check"
+)
+
+// SwarmDaemon is a test daemon with helpers for participating in a swarm.
+type SwarmDaemon struct {
+	*Daemon
+	swarm.Info
+	port       int
+	listenAddr string
+}
+
+// Init initializes a new swarm cluster.
+func (d *SwarmDaemon) Init(autoAccept map[string]bool, secret string) error {
+	req := swarm.InitRequest{
+		ListenAddr: d.listenAddr,
+	}
+	for _, role := range []swarm.NodeRole{swarm.NodeRoleManager, swarm.NodeRoleWorker} {
+		req.Spec.AcceptancePolicy.Policies = append(req.Spec.AcceptancePolicy.Policies, swarm.Policy{
+			Role:       role,
+			Autoaccept: autoAccept[strings.ToLower(string(role))],
+			Secret:     secret,
+		})
+	}
+	status, out, err := d.SockRequest("POST", "/swarm/init", req)
+	if status != http.StatusOK {
+		return fmt.Errorf("initializing swarm: invalid statuscode %v, %q", status, out)
+	}
+	if err != nil {
+		return fmt.Errorf("initializing swarm: %v", err)
+	}
+	info, err := d.info()
+	if err != nil {
+		return err
+	}
+	d.Info = info
+	return nil
+}
+
+// Join joins a current daemon with existing cluster.
+func (d *SwarmDaemon) Join(remoteAddr, secret, cahash string, manager bool) error {
+	status, out, err := d.SockRequest("POST", "/swarm/join", swarm.JoinRequest{
+		ListenAddr:  d.listenAddr,
+		RemoteAddrs: []string{remoteAddr},
+		Manager:     manager,
+		Secret:      secret,
+		CACertHash:  cahash,
+	})
+	if status != http.StatusOK {
+		return fmt.Errorf("joining swarm: invalid statuscode %v, %q", status, out)
+	}
+	if err != nil {
+		return fmt.Errorf("joining swarm: %v", err)
+	}
+	info, err := d.info()
+	if err != nil {
+		return err
+	}
+	d.Info = info
+	return nil
+}
+
+// Leave forces daemon to leave current cluster.
+func (d *SwarmDaemon) Leave(force bool) error {
+	url := "/swarm/leave"
+	if force {
+		url += "?force=1"
+	}
+	status, out, err := d.SockRequest("POST", url, nil)
+	if status != http.StatusOK {
+		return fmt.Errorf("leaving swarm: invalid statuscode %v, %q", status, out)
+	}
+	if err != nil {
+		err = fmt.Errorf("leaving swarm: %v", err)
+	}
+	return err
+}
+
+func (d *SwarmDaemon) info() (swarm.Info, error) {
+	var info struct {
+		Swarm swarm.Info
+	}
+	status, dt, err := d.SockRequest("GET", "/info", nil)
+	if status != http.StatusOK {
+		return info.Swarm, fmt.Errorf("get swarm info: invalid statuscode %v", status)
+	}
+	if err != nil {
+		return info.Swarm, fmt.Errorf("get swarm info: %v", err)
+	}
+	if err := json.Unmarshal(dt, &info); err != nil {
+		return info.Swarm, err
+	}
+	return info.Swarm, nil
+}
+
+type serviceConstructor func(*swarm.Service)
+type nodeConstructor func(*swarm.Node)
+
+func (d *SwarmDaemon) createService(c *check.C, f ...serviceConstructor) string {
+	var service swarm.Service
+	for _, fn := range f {
+		fn(&service)
+	}
+	status, out, err := d.SockRequest("POST", "/services/create", service.Spec)
+
+	c.Assert(err, checker.IsNil)
+	c.Assert(status, checker.Equals, http.StatusCreated, check.Commentf("output: %q", string(out)))
+
+	var scr types.ServiceCreateResponse
+	c.Assert(json.Unmarshal(out, &scr), checker.IsNil)
+	return scr.ID
+}
+
+func (d *SwarmDaemon) getService(c *check.C, id string) *swarm.Service {
+	var service swarm.Service
+	status, out, err := d.SockRequest("GET", "/services/"+id, nil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+	c.Assert(err, checker.IsNil)
+	c.Assert(json.Unmarshal(out, &service), checker.IsNil)
+	c.Assert(service.ID, checker.Equals, id)
+	return &service
+}
+
+func (d *SwarmDaemon) updateService(c *check.C, service *swarm.Service, f ...serviceConstructor) {
+	for _, fn := range f {
+		fn(service)
+	}
+	url := fmt.Sprintf("/services/%s/update?version=%d", service.ID, service.Version.Index)
+	status, out, err := d.SockRequest("POST", url, service.Spec)
+	c.Assert(err, checker.IsNil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+}
+
+func (d *SwarmDaemon) removeService(c *check.C, id string) {
+	status, out, err := d.SockRequest("DELETE", "/services/"+id, nil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+	c.Assert(err, checker.IsNil)
+}
+
+func (d *SwarmDaemon) getNode(c *check.C, id string) *swarm.Node {
+	var node swarm.Node
+	status, out, err := d.SockRequest("GET", "/nodes/"+id, nil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+	c.Assert(err, checker.IsNil)
+	c.Assert(json.Unmarshal(out, &node), checker.IsNil)
+	c.Assert(node.ID, checker.Equals, id)
+	return &node
+}
+
+func (d *SwarmDaemon) updateNode(c *check.C, node *swarm.Node, f ...nodeConstructor) {
+	for _, fn := range f {
+		fn(node)
+	}
+	url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index)
+	status, out, err := d.SockRequest("POST", url, node.Spec)
+	c.Assert(err, checker.IsNil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+}
+
+func (d *SwarmDaemon) listNodes(c *check.C) []swarm.Node {
+	status, out, err := d.SockRequest("GET", "/nodes", nil)
+	c.Assert(err, checker.IsNil)
+	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+
+	nodes := []swarm.Node{}
+	c.Assert(json.Unmarshal(out, &nodes), checker.IsNil)
+	return nodes
+}

+ 2 - 2
integration-cli/docker_api_attach_test.go

@@ -17,7 +17,7 @@ func (s *DockerSuite) TestGetContainersAttachWebsocket(c *check.C) {
 	testRequires(c, DaemonIsLinux)
 	out, _ := dockerCmd(c, "run", "-dit", "busybox", "cat")
 
-	rwc, err := sockConn(time.Duration(10 * time.Second))
+	rwc, err := sockConn(time.Duration(10*time.Second), "")
 	c.Assert(err, checker.IsNil)
 
 	cleanedContainerID := strings.TrimSpace(out)
@@ -67,7 +67,7 @@ func (s *DockerSuite) TestGetContainersAttachWebsocket(c *check.C) {
 
 // regression gh14320
 func (s *DockerSuite) TestPostContainersAttachContainerNotFound(c *check.C) {
-	req, client, err := newRequestClient("POST", "/containers/doesnotexist/attach", nil, "")
+	req, client, err := newRequestClient("POST", "/containers/doesnotexist/attach", nil, "", "")
 	c.Assert(err, checker.IsNil)
 
 	resp, err := client.Do(req)

+ 1 - 1
integration-cli/docker_api_containers_test.go

@@ -1076,7 +1076,7 @@ func (s *DockerSuite) TestContainerApiChunkedEncoding(c *check.C) {
 	// TODO Windows CI: This can be ported
 	testRequires(c, DaemonIsLinux)
 
-	conn, err := sockConn(time.Duration(10 * time.Second))
+	conn, err := sockConn(time.Duration(10*time.Second), "")
 	c.Assert(err, checker.IsNil)
 	client := httputil.NewClientConn(conn, nil)
 	defer client.Close()

+ 573 - 0
integration-cli/docker_api_swarm_test.go

@@ -0,0 +1,573 @@
+// +build !windows
+
+package main
+
+import (
+	"net/http"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/docker/docker/pkg/integration/checker"
+	"github.com/docker/engine-api/types/swarm"
+	"github.com/go-check/check"
+)
+
+var defaultReconciliationTimeout = 30 * time.Second
+
+func (s *DockerSwarmSuite) TestApiSwarmInit(c *check.C) {
+	// todo: should find a better way to verify that components are running than /info
+	d1 := s.AddDaemon(c, true, true)
+	info, err := d1.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, true)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+
+	d2 := s.AddDaemon(c, true, false)
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, false)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+
+	// Leaving cluster
+	c.Assert(d2.Leave(false), checker.IsNil)
+
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, false)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+
+	c.Assert(d2.Join(d1.listenAddr, "", "", false), checker.IsNil)
+
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, false)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+
+	// Current state restoring after restarts
+	err = d1.Stop()
+	c.Assert(err, checker.IsNil)
+	err = d2.Stop()
+	c.Assert(err, checker.IsNil)
+
+	err = d1.Start()
+	c.Assert(err, checker.IsNil)
+	err = d2.Start()
+	c.Assert(err, checker.IsNil)
+
+	info, err = d1.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, true)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, false)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmManualAcceptance(c *check.C) {
+	s.testAPISwarmManualAcceptance(c, "")
+}
+func (s *DockerSwarmSuite) TestApiSwarmManualAcceptanceSecret(c *check.C) {
+	s.testAPISwarmManualAcceptance(c, "foobaz")
+}
+
+func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret string) {
+	d1 := s.AddDaemon(c, false, false)
+	c.Assert(d1.Init(map[string]bool{}, secret), checker.IsNil)
+
+	d2 := s.AddDaemon(c, false, false)
+	err := d2.Join(d1.listenAddr, "", "", false)
+	c.Assert(err, checker.NotNil)
+	if secret == "" {
+		c.Assert(err.Error(), checker.Contains, "Timeout reached")
+		info, err := d2.info()
+		c.Assert(err, checker.IsNil)
+		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
+		c.Assert(d2.Leave(false), checker.IsNil)
+		info, err = d2.info()
+		c.Assert(err, checker.IsNil)
+		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+	} else {
+		c.Assert(err.Error(), checker.Contains, "valid secret token is necessary")
+		info, err := d2.info()
+		c.Assert(err, checker.IsNil)
+		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+	}
+	d3 := s.AddDaemon(c, false, false)
+	go func() {
+		for i := 0; ; i++ {
+			info, err := d3.info()
+			c.Assert(err, checker.IsNil)
+			if info.NodeID != "" {
+				d1.updateNode(c, d1.getNode(c, info.NodeID), func(n *swarm.Node) {
+					n.Spec.Membership = swarm.NodeMembershipAccepted
+				})
+				return
+			}
+			if i >= 10 {
+				c.Errorf("could not find nodeID")
+			}
+			time.Sleep(300 * time.Millisecond)
+		}
+	}()
+	c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.IsNil)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) {
+	d1 := s.AddDaemon(c, false, false)
+	aa := make(map[string]bool)
+	aa["worker"] = true
+	c.Assert(d1.Init(aa, "foobar"), checker.IsNil)
+
+	d2 := s.AddDaemon(c, false, false)
+	err := d2.Join(d1.listenAddr, "", "", false)
+	c.Assert(err, checker.NotNil)
+	c.Assert(err.Error(), checker.Contains, "secret token is necessary")
+	info, err := d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+
+	err = d2.Join(d1.listenAddr, "foobaz", "", false)
+	c.Assert(err, checker.NotNil)
+	c.Assert(err.Error(), checker.Contains, "secret token is necessary")
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+
+	c.Assert(d2.Join(d1.listenAddr, "foobar", "", false), checker.IsNil)
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+	c.Assert(d2.Leave(false), checker.IsNil)
+	info, err = d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmCAHash(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, false, false)
+	err := d2.Join(d1.listenAddr, "", "foobar", false)
+	c.Assert(err, checker.NotNil)
+	c.Assert(err.Error(), checker.Contains, "invalid checksum digest format")
+
+	c.Assert(len(d1.CACertHash), checker.GreaterThan, 0)
+	c.Assert(d2.Join(d1.listenAddr, "", d1.CACertHash, false), checker.IsNil)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) {
+	d1 := s.AddDaemon(c, false, false)
+	c.Assert(d1.Init(map[string]bool{"worker": true}, ""), checker.IsNil)
+	d2 := s.AddDaemon(c, true, false)
+
+	info, err := d2.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.ControlAvailable, checker.Equals, false)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+
+	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+		n.Spec.Role = swarm.NodeRoleManager
+	})
+
+	for i := 0; ; i++ {
+		info, err := d2.info()
+		c.Assert(err, checker.IsNil)
+		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+		if info.ControlAvailable {
+			break
+		}
+		if i > 10 {
+			c.Errorf("node did not turn into manager")
+		} else {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+		n.Spec.Role = swarm.NodeRoleWorker
+	})
+
+	for i := 0; ; i++ {
+		info, err := d2.info()
+		c.Assert(err, checker.IsNil)
+		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
+		if !info.ControlAvailable {
+			break
+		}
+		if i > 10 {
+			c.Errorf("node did not turn into manager")
+		} else {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// todo: test raft qourum stability
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmServicesCreate(c *check.C) {
+	d := s.AddDaemon(c, true, true)
+
+	instances := 2
+	id := d.createService(c, simpleTestService, setInstances(instances))
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, instances)
+
+	service := d.getService(c, id)
+	instances = 5
+	d.updateService(c, service, setInstances(instances))
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, instances)
+
+	d.removeService(c, service.ID)
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, 0)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmServicesMultipleAgents(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, false)
+	d3 := s.AddDaemon(c, true, false)
+
+	time.Sleep(1 * time.Second) // make sure all daemons are ready to accept tasks
+
+	instances := 9
+	id := d1.createService(c, simpleTestService, setInstances(instances))
+
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.checkActiveContainerCount, checker.GreaterThan, 0)
+
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+	// reconciliation on d2 node down
+	c.Assert(d2.Stop(), checker.IsNil)
+
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+	// test downscaling
+	instances = 5
+	d1.updateService(c, d1.getService(c, id), setInstances(instances))
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmServicesCreateGlobal(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, false)
+	d3 := s.AddDaemon(c, true, false)
+
+	d1.createService(c, simpleTestService, setGlobalMode)
+
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, 1)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 1)
+	waitAndAssert(c, defaultReconciliationTimeout, d3.checkActiveContainerCount, checker.Equals, 1)
+
+	d4 := s.AddDaemon(c, true, false)
+	d5 := s.AddDaemon(c, true, false)
+
+	waitAndAssert(c, defaultReconciliationTimeout, d4.checkActiveContainerCount, checker.Equals, 1)
+	waitAndAssert(c, defaultReconciliationTimeout, d5.checkActiveContainerCount, checker.Equals, 1)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmServicesStateReporting(c *check.C) {
+	testRequires(c, SameHostDaemon)
+	testRequires(c, DaemonIsLinux)
+
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, true)
+	d3 := s.AddDaemon(c, true, false)
+
+	time.Sleep(1 * time.Second) // make sure all daemons are ready to accept
+
+	instances := 9
+	d1.createService(c, simpleTestService, setInstances(instances))
+
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+	getContainers := func() map[string]*SwarmDaemon {
+		m := make(map[string]*SwarmDaemon)
+		for _, d := range []*SwarmDaemon{d1, d2, d3} {
+			for _, id := range d.activeContainers() {
+				m[id] = d
+			}
+		}
+		return m
+	}
+
+	containers := getContainers()
+	c.Assert(containers, checker.HasLen, instances)
+	var toRemove string
+	for i := range containers {
+		toRemove = i
+	}
+
+	_, err := containers[toRemove].Cmd("stop", toRemove)
+	c.Assert(err, checker.IsNil)
+
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+	containers2 := getContainers()
+	c.Assert(containers2, checker.HasLen, instances)
+	for i := range containers {
+		if i == toRemove {
+			c.Assert(containers2[i], checker.IsNil)
+		} else {
+			c.Assert(containers2[i], checker.NotNil)
+		}
+	}
+
+	containers = containers2
+	for i := range containers {
+		toRemove = i
+	}
+
+	// try with killing process outside of docker
+	pidStr, err := containers[toRemove].Cmd("inspect", "-f", "{{.State.Pid}}", toRemove)
+	c.Assert(err, checker.IsNil)
+	pid, err := strconv.Atoi(strings.TrimSpace(pidStr))
+	c.Assert(err, checker.IsNil)
+	c.Assert(syscall.Kill(pid, syscall.SIGKILL), checker.IsNil)
+
+	time.Sleep(time.Second) // give some time to handle the signal
+
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances)
+
+	containers2 = getContainers()
+	c.Assert(containers2, checker.HasLen, instances)
+	for i := range containers {
+		if i == toRemove {
+			c.Assert(containers2[i], checker.IsNil)
+		} else {
+			c.Assert(containers2[i], checker.NotNil)
+		}
+	}
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmRaftQuorum(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, true)
+	d3 := s.AddDaemon(c, true, true)
+
+	d1.createService(c, simpleTestService)
+
+	c.Assert(d2.Stop(), checker.IsNil)
+
+	d1.createService(c, simpleTestService, func(s *swarm.Service) {
+		s.Spec.Name = "top1"
+	})
+
+	c.Assert(d3.Stop(), checker.IsNil)
+
+	var service swarm.Service
+	simpleTestService(&service)
+	service.Spec.Name = "top2"
+	status, out, err := d1.SockRequest("POST", "/services/create", service.Spec)
+	c.Assert(err, checker.IsNil)
+	c.Assert(status, checker.Equals, http.StatusInternalServerError, check.Commentf("deadline exceeded", string(out)))
+
+	c.Assert(d2.Start(), checker.IsNil)
+
+	d1.createService(c, simpleTestService, func(s *swarm.Service) {
+		s.Spec.Name = "top3"
+	})
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmListNodes(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, false)
+	d3 := s.AddDaemon(c, true, false)
+
+	nodes := d1.listNodes(c)
+	c.Assert(len(nodes), checker.Equals, 3, check.Commentf("nodes: %#v", nodes))
+
+loop0:
+	for _, n := range nodes {
+		for _, d := range []*SwarmDaemon{d1, d2, d3} {
+			if n.ID == d.NodeID {
+				continue loop0
+			}
+		}
+		c.Errorf("unknown nodeID %v", n.ID)
+	}
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmNodeUpdate(c *check.C) {
+	d := s.AddDaemon(c, true, true)
+
+	nodes := d.listNodes(c)
+
+	d.updateNode(c, d.getNode(c, nodes[0].ID), func(n *swarm.Node) {
+		n.Spec.Availability = swarm.NodeAvailabilityPause
+	})
+
+	n := d.getNode(c, nodes[0].ID)
+	c.Assert(n.Spec.Availability, checker.Equals, swarm.NodeAvailabilityPause)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+	d2 := s.AddDaemon(c, true, false)
+
+	time.Sleep(1 * time.Second) // make sure all daemons are ready to accept tasks
+
+	// start a service, expect balanced distribution
+	instances := 8
+	id := d1.createService(c, simpleTestService, setInstances(instances))
+
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances)
+
+	// drain d2, all containers should move to d1
+	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+		n.Spec.Availability = swarm.NodeAvailabilityDrain
+	})
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 0)
+
+	// set d2 back to active
+	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+		n.Spec.Availability = swarm.NodeAvailabilityActive
+	})
+
+	// change environment variable, resulting balanced rescheduling
+	d1.updateService(c, d1.getService(c, id), func(s *swarm.Service) {
+		s.Spec.TaskTemplate.ContainerSpec.Env = []string{"FOO=BAR"}
+		s.Spec.UpdateConfig = &swarm.UpdateConfig{
+			Parallelism: 2,
+			Delay:       250 * time.Millisecond,
+		}
+	})
+
+	// drained node first so we don't get any old containers
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.GreaterThan, 0)
+	waitAndAssert(c, defaultReconciliationTimeout*2, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances)
+
+	d2ContainerCount := len(d2.activeContainers())
+
+	// set d2 to paused, scale service up, only d1 gets new tasks
+	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+		n.Spec.Availability = swarm.NodeAvailabilityPause
+	})
+
+	instances = 14
+	d1.updateService(c, d1.getService(c, id), setInstances(instances))
+
+	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances-d2ContainerCount)
+	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, d2ContainerCount)
+
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmLeaveRemovesContainer(c *check.C) {
+	d := s.AddDaemon(c, true, true)
+
+	instances := 2
+	d.createService(c, simpleTestService, setInstances(instances))
+
+	id, err := d.Cmd("run", "-d", "busybox", "top")
+	c.Assert(err, checker.IsNil)
+	id = strings.TrimSpace(id)
+
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, instances+1)
+
+	c.Assert(d.Leave(false), checker.NotNil)
+	c.Assert(d.Leave(true), checker.IsNil)
+
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, 1)
+
+	id2, err := d.Cmd("ps", "-q")
+	c.Assert(err, checker.IsNil)
+	c.Assert(id, checker.HasPrefix, strings.TrimSpace(id2))
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmManagerRestore(c *check.C) {
+	d1 := s.AddDaemon(c, true, true)
+
+	instances := 2
+	id := d1.createService(c, simpleTestService, setInstances(instances))
+
+	d1.getService(c, id)
+	d1.Stop()
+	d1.Start()
+	d1.getService(c, id)
+
+	d2 := s.AddDaemon(c, true, true)
+	d2.getService(c, id)
+	d2.Stop()
+	d2.Start()
+	d2.getService(c, id)
+
+	d3 := s.AddDaemon(c, true, true)
+	d3.getService(c, id)
+	d3.Stop()
+	d3.Start()
+	d3.getService(c, id)
+
+	d3.Kill()
+	time.Sleep(1 * time.Second) // time to handle signal
+	d3.Start()
+	d3.getService(c, id)
+}
+
+func (s *DockerSwarmSuite) TestApiSwarmScaleNoRollingUpdate(c *check.C) {
+	d := s.AddDaemon(c, true, true)
+
+	instances := 2
+	id := d.createService(c, simpleTestService, setInstances(instances))
+
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, instances)
+	containers := d.activeContainers()
+	instances = 4
+	d.updateService(c, d.getService(c, id), setInstances(instances))
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, instances)
+	containers2 := d.activeContainers()
+
+loop0:
+	for _, c1 := range containers {
+		for _, c2 := range containers2 {
+			if c1 == c2 {
+				continue loop0
+			}
+		}
+		c.Errorf("container %v not found in new set %#v", c1, containers2)
+	}
+}
+
+func simpleTestService(s *swarm.Service) {
+	var ureplicas uint64
+	ureplicas = 1
+	s.Spec = swarm.ServiceSpec{
+		TaskTemplate: swarm.TaskSpec{
+			ContainerSpec: swarm.ContainerSpec{
+				Image:   "busybox:latest",
+				Command: []string{"/bin/top"},
+			},
+		},
+		Mode: swarm.ServiceMode{
+			Replicated: &swarm.ReplicatedService{
+				Replicas: &ureplicas,
+			},
+		},
+	}
+	s.Spec.Name = "top"
+}
+
+func setInstances(replicas int) serviceConstructor {
+	ureplicas := uint64(replicas)
+	return func(s *swarm.Service) {
+		s.Spec.Mode = swarm.ServiceMode{
+			Replicated: &swarm.ReplicatedService{
+				Replicas: &ureplicas,
+			},
+		}
+	}
+}
+
+func setGlobalMode(s *swarm.Service) {
+	s.Spec.Mode = swarm.ServiceMode{
+		Global: &swarm.GlobalService{},
+	}
+}

+ 1 - 1
integration-cli/docker_api_test.go

@@ -34,7 +34,7 @@ func (s *DockerSuite) TestApiGetEnabledCors(c *check.C) {
 }
 
 func (s *DockerSuite) TestApiVersionStatusCode(c *check.C) {
-	conn, err := sockConn(time.Duration(10 * time.Second))
+	conn, err := sockConn(time.Duration(10*time.Second), "")
 	c.Assert(err, checker.IsNil)
 
 	client := httputil.NewClientConn(conn, nil)

+ 59 - 6
integration-cli/docker_utils.go

@@ -124,8 +124,10 @@ func getTLSConfig() (*tls.Config, error) {
 	return tlsConfig, nil
 }
 
-func sockConn(timeout time.Duration) (net.Conn, error) {
-	daemon := daemonHost()
+func sockConn(timeout time.Duration, daemon string) (net.Conn, error) {
+	if daemon == "" {
+		daemon = daemonHost()
+	}
 	daemonURL, err := url.Parse(daemon)
 	if err != nil {
 		return nil, fmt.Errorf("could not parse url %q: %v", daemon, err)
@@ -168,7 +170,11 @@ func sockRequest(method, endpoint string, data interface{}) (int, []byte, error)
 }
 
 func sockRequestRaw(method, endpoint string, data io.Reader, ct string) (*http.Response, io.ReadCloser, error) {
-	req, client, err := newRequestClient(method, endpoint, data, ct)
+	return sockRequestRawToDaemon(method, endpoint, data, ct, "")
+}
+
+func sockRequestRawToDaemon(method, endpoint string, data io.Reader, ct, daemon string) (*http.Response, io.ReadCloser, error) {
+	req, client, err := newRequestClient(method, endpoint, data, ct, daemon)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -187,7 +193,7 @@ func sockRequestRaw(method, endpoint string, data io.Reader, ct string) (*http.R
 }
 
 func sockRequestHijack(method, endpoint string, data io.Reader, ct string) (net.Conn, *bufio.Reader, error) {
-	req, client, err := newRequestClient(method, endpoint, data, ct)
+	req, client, err := newRequestClient(method, endpoint, data, ct, "")
 	if err != nil {
 		return nil, nil, err
 	}
@@ -197,8 +203,8 @@ func sockRequestHijack(method, endpoint string, data io.Reader, ct string) (net.
 	return conn, br, nil
 }
 
-func newRequestClient(method, endpoint string, data io.Reader, ct string) (*http.Request, *httputil.ClientConn, error) {
-	c, err := sockConn(time.Duration(10 * time.Second))
+func newRequestClient(method, endpoint string, data io.Reader, ct, daemon string) (*http.Request, *httputil.ClientConn, error) {
+	c, err := sockConn(time.Duration(10*time.Second), daemon)
 	if err != nil {
 		return nil, nil, fmt.Errorf("could not dial docker daemon: %v", err)
 	}
@@ -1514,3 +1520,50 @@ func getErrorMessage(c *check.C, body []byte) string {
 	c.Assert(json.Unmarshal(body, &resp), check.IsNil)
 	return strings.TrimSpace(resp.Message)
 }
+
+func waitAndAssert(c *check.C, timeout time.Duration, f checkF, checker check.Checker, args ...interface{}) {
+	after := time.After(timeout)
+	for {
+		v, comment := f(c)
+		assert, _ := checker.Check(append([]interface{}{v}, args...), checker.Info().Params)
+		select {
+		case <-after:
+			assert = true
+		default:
+		}
+		if assert {
+			if comment != nil {
+				args = append(args, comment)
+			}
+			c.Assert(v, checker, args...)
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
+type checkF func(*check.C) (interface{}, check.CommentInterface)
+type reducer func(...interface{}) interface{}
+
+func reducedCheck(r reducer, funcs ...checkF) checkF {
+	return func(c *check.C) (interface{}, check.CommentInterface) {
+		var values []interface{}
+		var comments []string
+		for _, f := range funcs {
+			v, comment := f(c)
+			values = append(values, v)
+			if comment != nil {
+				comments = append(comments, comment.CheckCommentString())
+			}
+		}
+		return r(values...), check.Commentf("%v", strings.Join(comments, ", "))
+	}
+}
+
+func sumAsIntegers(vals ...interface{}) interface{} {
+	var s int
+	for _, v := range vals {
+		s += v.(int)
+	}
+	return s
+}