Unify swarm init and update options

Add api side validation and defaults for init and
join requests.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit fb3eb1c27e)
This commit is contained in:
Tonis Tiigi 2016-06-21 14:27:04 -07:00 committed by Tibor Vass
parent 8691607ade
commit 3d06cd4910
9 changed files with 255 additions and 77 deletions

View file

@ -13,16 +13,17 @@ import (
)
type initOptions struct {
swarmOptions
listenAddr NodeAddrOption
autoAccept AutoAcceptOption
forceNewCluster bool
secret string
}
func newInitCommand(dockerCli *client.DockerCli) *cobra.Command {
opts := initOptions{
listenAddr: NewListenAddrOption(),
swarmOptions: swarmOptions{
autoAccept: NewAutoAcceptOption(),
},
}
cmd := &cobra.Command{
@ -36,9 +37,8 @@ func newInitCommand(dockerCli *client.DockerCli) *cobra.Command {
flags := cmd.Flags()
flags.Var(&opts.listenAddr, "listen-addr", "Listen address")
flags.Var(&opts.autoAccept, flagAutoAccept, "Auto acceptance policy (worker, manager, or none)")
flags.StringVar(&opts.secret, flagSecret, "", "Set secret value needed to accept nodes into cluster")
flags.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state.")
addSwarmFlags(flags, &opts.swarmOptions)
return cmd
}
@ -49,13 +49,9 @@ func runInit(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts initOptions
req := swarm.InitRequest{
ListenAddr: opts.listenAddr.String(),
ForceNewCluster: opts.forceNewCluster,
Spec: opts.swarmOptions.ToSpec(),
}
if flags.Changed(flagSecret) {
req.Spec.AcceptancePolicy.Policies = opts.autoAccept.Policies(&opts.secret)
} else {
req.Spec.AcceptancePolicy.Policies = opts.autoAccept.Policies(nil)
}
nodeID, err := client.SwarmInit(ctx, req)
if err != nil {
return err

View file

@ -2,16 +2,16 @@ package swarm
import (
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/docker/docker/opts"
"github.com/docker/engine-api/types/swarm"
"github.com/spf13/pflag"
)
const (
defaultListenAddr = "0.0.0.0"
defaultListenPort uint16 = 2377
defaultListenAddr = "0.0.0.0:2377"
// WORKER constant for worker name
WORKER = "WORKER"
// MANAGER constant for manager name
@ -32,10 +32,17 @@ var (
}
)
type swarmOptions struct {
autoAccept AutoAcceptOption
secret string
taskHistoryLimit int64
dispatcherHeartbeat time.Duration
nodeCertExpiry time.Duration
}
// NodeAddrOption is a pflag.Value for listen and remote addresses
type NodeAddrOption struct {
addr string
port uint16
}
// String prints the representation of this flag
@ -45,25 +52,11 @@ func (a *NodeAddrOption) String() string {
// Set the value for this flag
func (a *NodeAddrOption) Set(value string) error {
if !strings.Contains(value, ":") {
a.addr = value
return nil
}
host, port, err := net.SplitHostPort(value)
addr, err := opts.ParseTCPAddr(value, a.addr)
if err != nil {
return fmt.Errorf("Invalid url, %v", err)
}
portInt, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return fmt.Errorf("invalid url, %v", err)
}
a.port = uint16(portInt)
if host != "" {
a.addr = host
return err
}
a.addr = addr
return nil
}
@ -74,17 +67,17 @@ func (a *NodeAddrOption) Type() string {
// Value returns the value of this option as addr:port
func (a *NodeAddrOption) Value() string {
return net.JoinHostPort(a.addr, strconv.Itoa(int(a.port)))
return strings.TrimPrefix(a.addr, "tcp://")
}
// NewNodeAddrOption returns a new node address option
func NewNodeAddrOption(host string, port uint16) NodeAddrOption {
return NodeAddrOption{addr: host, port: port}
func NewNodeAddrOption(addr string) NodeAddrOption {
return NodeAddrOption{addr}
}
// NewListenAddrOption returns a NodeAddrOption with default values
func NewListenAddrOption() NodeAddrOption {
return NewNodeAddrOption(defaultListenAddr, defaultListenPort)
return NewNodeAddrOption(defaultListenAddr)
}
// AutoAcceptOption is a value type for auto-accept policy
@ -148,3 +141,24 @@ func (o *AutoAcceptOption) Policies(secret *string) []swarm.Policy {
func NewAutoAcceptOption() AutoAcceptOption {
return AutoAcceptOption{values: make(map[string]bool)}
}
func addSwarmFlags(flags *pflag.FlagSet, opts *swarmOptions) {
flags.Var(&opts.autoAccept, flagAutoAccept, "Auto acceptance policy (worker, manager or none)")
flags.StringVar(&opts.secret, flagSecret, "", "Set secret value needed to accept nodes into cluster")
flags.Int64Var(&opts.taskHistoryLimit, flagTaskHistoryLimit, 10, "Task history retention limit")
flags.DurationVar(&opts.dispatcherHeartbeat, flagDispatcherHeartbeat, time.Duration(5*time.Second), "Dispatcher heartbeat period")
flags.DurationVar(&opts.nodeCertExpiry, flagCertExpiry, time.Duration(90*24*time.Hour), "Validity period for node certificates")
}
func (opts *swarmOptions) ToSpec() swarm.Spec {
spec := swarm.Spec{}
if opts.secret != "" {
spec.AcceptancePolicy.Policies = opts.autoAccept.Policies(&opts.secret)
} else {
spec.AcceptancePolicy.Policies = opts.autoAccept.Policies(nil)
}
spec.Orchestration.TaskHistoryRetentionLimit = opts.taskHistoryLimit
spec.Dispatcher.HeartbeatPeriod = uint64(opts.dispatcherHeartbeat.Nanoseconds())
spec.CAConfig.NodeCertExpiry = opts.nodeCertExpiry
return spec
}

View file

@ -8,31 +8,33 @@ import (
)
func TestNodeAddrOptionSetHostAndPort(t *testing.T) {
opt := NewNodeAddrOption("old", 123)
opt := NewNodeAddrOption("old:123")
addr := "newhost:5555"
assert.NilError(t, opt.Set(addr))
assert.Equal(t, opt.addr, "newhost")
assert.Equal(t, opt.port, uint16(5555))
assert.Equal(t, opt.Value(), addr)
}
func TestNodeAddrOptionSetHostOnly(t *testing.T) {
opt := NewListenAddrOption()
assert.NilError(t, opt.Set("newhost"))
assert.Equal(t, opt.addr, "newhost")
assert.Equal(t, opt.port, defaultListenPort)
assert.Equal(t, opt.Value(), "newhost:2377")
}
func TestNodeAddrOptionSetHostOnlyIPv6(t *testing.T) {
opt := NewListenAddrOption()
assert.NilError(t, opt.Set("::1"))
assert.Equal(t, opt.Value(), "[::1]:2377")
}
func TestNodeAddrOptionSetPortOnly(t *testing.T) {
opt := NewListenAddrOption()
assert.NilError(t, opt.Set(":4545"))
assert.Equal(t, opt.addr, defaultListenAddr)
assert.Equal(t, opt.port, uint16(4545))
assert.Equal(t, opt.Value(), "0.0.0.0:4545")
}
func TestNodeAddrOptionSetInvalidFormat(t *testing.T) {
opt := NewListenAddrOption()
assert.Error(t, opt.Set("http://localhost:4545"), "Invalid url")
assert.Error(t, opt.Set("http://localhost:4545"), "Invalid")
}
func TestAutoAcceptOptionSetWorker(t *testing.T) {

View file

@ -2,7 +2,6 @@ package swarm
import (
"fmt"
"time"
"golang.org/x/net/context"
@ -13,16 +12,8 @@ import (
"github.com/spf13/pflag"
)
type updateOptions struct {
autoAccept AutoAcceptOption
secret string
taskHistoryLimit int64
dispatcherHeartbeat time.Duration
nodeCertExpiry time.Duration
}
func newUpdateCommand(dockerCli *client.DockerCli) *cobra.Command {
opts := updateOptions{autoAccept: NewAutoAcceptOption()}
opts := swarmOptions{autoAccept: NewAutoAcceptOption()}
cmd := &cobra.Command{
Use: "update",
@ -33,16 +24,11 @@ func newUpdateCommand(dockerCli *client.DockerCli) *cobra.Command {
},
}
flags := cmd.Flags()
flags.Var(&opts.autoAccept, flagAutoAccept, "Auto acceptance policy (worker, manager or none)")
flags.StringVar(&opts.secret, flagSecret, "", "Set secret value needed to accept nodes into cluster")
flags.Int64Var(&opts.taskHistoryLimit, flagTaskHistoryLimit, 10, "Task history retention limit")
flags.DurationVar(&opts.dispatcherHeartbeat, flagDispatcherHeartbeat, time.Duration(5*time.Second), "Dispatcher heartbeat period")
flags.DurationVar(&opts.nodeCertExpiry, flagCertExpiry, time.Duration(90*24*time.Hour), "Validity period for node certificates")
addSwarmFlags(cmd.Flags(), &opts)
return cmd
}
func runUpdate(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts updateOptions) error {
func runUpdate(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts swarmOptions) error {
client := dockerCli.Client()
ctx := context.Background()

View file

@ -13,10 +13,12 @@ import (
"google.golang.org/grpc"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/docker/daemon/cluster/convert"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/docker/daemon/cluster/executor/container"
"github.com/docker/docker/errors"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/runconfig"
apitypes "github.com/docker/engine-api/types"
@ -30,6 +32,7 @@ const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"
const (
initialReconnectDelay = 100 * time.Millisecond
@ -51,6 +54,26 @@ var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join
// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")
// defaultSpec contains some sane defaults if cluster options are missing on init
var defaultSpec = types.Spec{
Raft: types.RaftConfig{
SnapshotInterval: 10000,
KeepOldSnapshots: 0,
LogEntriesForSlowFollowers: 500,
HeartbeatTick: 1,
ElectionTick: 3,
},
CAConfig: types.CAConfig{
NodeCertExpiry: 90 * 24 * time.Hour,
},
Dispatcher: types.DispatcherConfig{
HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
},
Orchestration: types.OrchestrationConfig{
TaskHistoryRetentionLimit: 10,
},
}
type state struct {
ListenAddr string
}
@ -282,6 +305,12 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
c.conn = nil
c.ready = false
}
if err := validateAndSanitizeInitRequest(&req); err != nil {
c.Unlock()
return "", err
}
// todo: check current state existing
n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
if err != nil {
@ -292,7 +321,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
select {
case <-n.Ready():
if err := initAcceptancePolicy(n, req.Spec.AcceptancePolicy); err != nil {
if err := initClusterSpec(n, req.Spec); err != nil {
return "", err
}
go c.reconnectOnFailure(ctx)
@ -319,10 +348,11 @@ func (c *Cluster) Join(req types.JoinRequest) error {
c.Unlock()
return errSwarmExists(node)
}
// todo: check current state existing
if len(req.RemoteAddrs) == 0 {
return fmt.Errorf("at least 1 RemoteAddr is required to join")
if err := validateAndSanitizeJoinRequest(&req); err != nil {
c.Unlock()
return err
}
// todo: check current state existing
n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
if err != nil {
c.Unlock()
@ -1030,6 +1060,76 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int,
return
}
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
var err error
req.ListenAddr, err = validateAddr(req.ListenAddr)
if err != nil {
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
}
spec := &req.Spec
// provide sane defaults instead of erroring
if spec.Name == "" {
spec.Name = "default"
}
if spec.Raft.SnapshotInterval == 0 {
spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
}
if spec.Raft.LogEntriesForSlowFollowers == 0 {
spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
}
if spec.Raft.ElectionTick == 0 {
spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
}
if spec.Raft.HeartbeatTick == 0 {
spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
}
if spec.Dispatcher.HeartbeatPeriod == 0 {
spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
}
if spec.CAConfig.NodeCertExpiry == 0 {
spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
}
if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
}
return nil
}
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
var err error
req.ListenAddr, err = validateAddr(req.ListenAddr)
if err != nil {
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
}
if len(req.RemoteAddrs) == 0 {
return fmt.Errorf("at least 1 RemoteAddr is required to join")
}
for i := range req.RemoteAddrs {
req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
if err != nil {
return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
}
}
if req.CACertHash != "" {
if _, err := digest.ParseDigest(req.CACertHash); err != nil {
return fmt.Errorf("invalid CACertHash %q, %v", req.CACertHash, err)
}
}
return nil
}
func validateAddr(addr string) (string, error) {
if addr == "" {
return addr, fmt.Errorf("invalid empty address")
}
newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
if err != nil {
return addr, nil
}
return strings.TrimPrefix(newaddr, "tcp://"), nil
}
func errSwarmExists(node *swarmagent.Node) error {
if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
return ErrPendingSwarmExists
@ -1037,7 +1137,7 @@ func errSwarmExists(node *swarmagent.Node) error {
return ErrSwarmExists
}
func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
func initClusterSpec(node *swarmagent.Node, spec types.Spec) error {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
for conn := range node.ListenControlSocket(ctx) {
if ctx.Err() != nil {
@ -1061,15 +1161,14 @@ func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.Acceptan
cluster = lcr.Clusters[0]
break
}
spec := &cluster.Spec
if err := convert.SwarmSpecUpdateAcceptancePolicy(spec, acceptancePolicy, nil); err != nil {
newspec, err := convert.SwarmSpecToGRPCandMerge(spec, &cluster.Spec)
if err != nil {
return fmt.Errorf("error updating cluster settings: %v", err)
}
_, err := client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
ClusterID: cluster.ID,
ClusterVersion: &cluster.Meta.Version,
Spec: spec,
Spec: &newspec,
})
if err != nil {
return fmt.Errorf("error updating cluster settings: %v", err)

View file

@ -656,6 +656,24 @@ loop0:
}
}
func (s *DockerSwarmSuite) TestApiSwarmInvalidAddress(c *check.C) {
d := s.AddDaemon(c, false, false)
req := swarm.InitRequest{
ListenAddr: "",
}
status, _, err := d.SockRequest("POST", "/swarm/init", req)
c.Assert(err, checker.IsNil)
c.Assert(status, checker.Equals, http.StatusInternalServerError)
req2 := swarm.JoinRequest{
ListenAddr: "0.0.0.0:2377",
RemoteAddrs: []string{""},
}
status, _, err = d.SockRequest("POST", "/swarm/join", req2)
c.Assert(err, checker.IsNil)
c.Assert(status, checker.Equals, http.StatusInternalServerError)
}
func simpleTestService(s *swarm.Service) {
var ureplicas uint64
ureplicas = 1

View file

@ -74,3 +74,63 @@ func (s *DockerSwarmSuite) TestSwarmUpdate(c *check.C) {
spec = getSpec()
c.Assert(spec.CAConfig.NodeCertExpiry, checker.Equals, 30*time.Hour)
}
func (s *DockerSwarmSuite) TestSwarmInit(c *check.C) {
d := s.AddDaemon(c, false, false)
getSpec := func() swarm.Spec {
out, err := d.Cmd("swarm", "inspect")
c.Assert(err, checker.IsNil)
var sw []swarm.Swarm
c.Assert(json.Unmarshal([]byte(out), &sw), checker.IsNil)
c.Assert(len(sw), checker.Equals, 1)
return sw[0].Spec
}
out, err := d.Cmd("swarm", "init", "--cert-expiry", "30h", "--dispatcher-heartbeat", "11s", "--auto-accept", "manager", "--auto-accept", "worker", "--secret", "foo")
c.Assert(err, checker.IsNil, check.Commentf("out: %v", out))
spec := getSpec()
c.Assert(spec.CAConfig.NodeCertExpiry, checker.Equals, 30*time.Hour)
c.Assert(spec.Dispatcher.HeartbeatPeriod, checker.Equals, uint64(11*time.Second))
c.Assert(spec.AcceptancePolicy.Policies, checker.HasLen, 2)
for _, p := range spec.AcceptancePolicy.Policies {
c.Assert(p.Autoaccept, checker.Equals, true)
c.Assert(p.Secret, checker.NotNil)
c.Assert(*p.Secret, checker.Not(checker.Equals), "")
}
c.Assert(d.Leave(true), checker.IsNil)
out, err = d.Cmd("swarm", "init", "--auto-accept", "none")
c.Assert(err, checker.IsNil, check.Commentf("out: %v", out))
spec = getSpec()
c.Assert(spec.CAConfig.NodeCertExpiry, checker.Equals, 90*24*time.Hour)
c.Assert(spec.Dispatcher.HeartbeatPeriod, checker.Equals, uint64(5*time.Second))
c.Assert(spec.AcceptancePolicy.Policies, checker.HasLen, 2)
for _, p := range spec.AcceptancePolicy.Policies {
c.Assert(p.Autoaccept, checker.Equals, false)
c.Assert(p.Secret, checker.IsNil)
}
}
func (s *DockerSwarmSuite) TestSwarmInitIPv6(c *check.C) {
testRequires(c, IPv6)
d1 := s.AddDaemon(c, false, false)
out, err := d1.Cmd("swarm", "init", "--listen-addr", "::1")
c.Assert(err, checker.IsNil, check.Commentf("out: %v", out))
d2 := s.AddDaemon(c, false, false)
out, err = d2.Cmd("swarm", "join", "::1")
c.Assert(err, checker.IsNil, check.Commentf("out: %v", out))
out, err = d2.Cmd("info")
c.Assert(err, checker.IsNil, check.Commentf("out: %v", out))
c.Assert(out, checker.Contains, "Swarm: active")
}

View file

@ -70,7 +70,7 @@ func parseDockerDaemonHost(addr string) (string, error) {
switch addrParts[0] {
case "tcp":
return parseTCPAddr(addrParts[1], DefaultTCPHost)
return ParseTCPAddr(addrParts[1], DefaultTCPHost)
case "unix":
return parseSimpleProtoAddr("unix", addrParts[1], DefaultUnixSocket)
case "npipe":
@ -97,12 +97,12 @@ func parseSimpleProtoAddr(proto, addr, defaultAddr string) (string, error) {
return fmt.Sprintf("%s://%s", proto, addr), nil
}
// parseTCPAddr parses and validates that the specified address is a valid TCP
// ParseTCPAddr parses and validates that the specified address is a valid TCP
// address. It returns a formatted TCP address, either using the address parsed
// from tryAddr, or the contents of defaultAddr if tryAddr is a blank string.
// tryAddr is expected to have already been Trim()'d
// defaultAddr must be in the full `tcp://host:port` form
func parseTCPAddr(tryAddr string, defaultAddr string) (string, error) {
func ParseTCPAddr(tryAddr string, defaultAddr string) (string, error) {
if tryAddr == "" || tryAddr == "tcp://" {
return defaultAddr, nil
}
@ -127,8 +127,11 @@ func parseTCPAddr(tryAddr string, defaultAddr string) (string, error) {
if err != nil {
return "", err
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
// try port addition once
host, port, err = net.SplitHostPort(net.JoinHostPort(u.Host, defaultPort))
}
if err != nil {
return "", fmt.Errorf("Invalid bind address format: %s", tryAddr)
}

View file

@ -130,12 +130,12 @@ func TestParseTCP(t *testing.T) {
"localhost:5555/path": "tcp://localhost:5555/path",
}
for invalidAddr, expectedError := range invalids {
if addr, err := parseTCPAddr(invalidAddr, defaultHTTPHost); err == nil || err.Error() != expectedError {
if addr, err := ParseTCPAddr(invalidAddr, defaultHTTPHost); err == nil || err.Error() != expectedError {
t.Errorf("tcp %v address expected error %v return, got %s and addr %v", invalidAddr, expectedError, err, addr)
}
}
for validAddr, expectedAddr := range valids {
if addr, err := parseTCPAddr(validAddr, defaultHTTPHost); err != nil || addr != expectedAddr {
if addr, err := ParseTCPAddr(validAddr, defaultHTTPHost); err != nil || addr != expectedAddr {
t.Errorf("%v -> expected %v, got %v and addr %v", validAddr, expectedAddr, err, addr)
}
}