484e6b784c
The `ContainerCreateConfig` and `ContainerRmConfig` structs are used for options to be passed to the backend, and are not used in client code. These structs are currently intended for internal use only (for example, `AdjustCPUShares` is an internal implementation detail used to adjust the container's config when older API versions are used). Somewhat ironically, the signature of the Backend has a nicer UX than that of the client's `ContainerCreate` signature (which expects all options to be passed as separate arguments), so we may want to update that signature to be closer to what the backend is using, but that can be left as a future exercise. This patch moves the `ContainerCreateConfig` and `ContainerRmConfig` structs to the backend package to prevent them from being imported in the client, and to make it clearer that they are part of internal APIs, and not public-facing. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
620 lines
17 KiB
Go
620 lines
17 KiB
Go
package cluster // import "github.com/docker/docker/daemon/cluster"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containerd/log"
|
|
"github.com/docker/docker/api/types/backend"
|
|
"github.com/docker/docker/api/types/container"
|
|
"github.com/docker/docker/api/types/filters"
|
|
types "github.com/docker/docker/api/types/swarm"
|
|
"github.com/docker/docker/daemon/cluster/convert"
|
|
"github.com/docker/docker/errdefs"
|
|
"github.com/docker/docker/opts"
|
|
"github.com/docker/docker/pkg/stack"
|
|
swarmapi "github.com/moby/swarmkit/v2/api"
|
|
"github.com/moby/swarmkit/v2/manager/encryption"
|
|
swarmnode "github.com/moby/swarmkit/v2/node"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Init initializes new cluster from user provided request.
//
// It starts a new swarm node runner configured from req, waits for it to
// become ready, applies req.Spec to the new cluster via initClusterSpec and
// returns the local node ID.
//
// If a node runner already exists, Init fails with errSwarmExists unless
// req.ForceNewCluster is set. In that case the existing node must be a
// manager (otherwise errSwarmNotManager is returned) and it is stopped
// before the new runner is started. Request-validation and listen-address
// resolution failures are returned as errdefs.InvalidParameter errors.
//
// c.controlMutex is held for the whole call; c.mu is only taken briefly
// around reads/writes of c.nr.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	// Validates ListenAddr and defaults/validates the spec name.
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	// The listen port is passed in for use when the advertise address
	// does not carry its own port.
	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				log.G(context.TODO()).Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil {
		return "", err
	}

	port, err := getDataPathPort(req.DataPathPort)
	if err != nil {
		return "", err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster:    req.ForceNewCluster,
		autolock:           req.AutoLockManagers,
		LocalAddr:          localAddr,
		ListenAddr:         net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:       dataPathAddr,
		DefaultAddressPool: req.DefaultAddrPool,
		SubnetSize:         req.SubnetSize,
		availability:       req.Availability,
		DataPathPort:       port,
	})
	if err != nil {
		return "", err
	}
	// Publish the new runner under c.mu before waiting for it to be ready.
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		// Startup failed: forget the runner again.
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	// Merge the user-supplied spec into the newly created cluster.
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}
|
|
|
|
// Join makes current Cluster part of an existing swarm cluster.
|
|
func (c *Cluster) Join(req types.JoinRequest) error {
|
|
c.controlMutex.Lock()
|
|
defer c.controlMutex.Unlock()
|
|
c.mu.Lock()
|
|
if c.nr != nil {
|
|
c.mu.Unlock()
|
|
return errors.WithStack(errSwarmExists)
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
if err := validateAndSanitizeJoinRequest(&req); err != nil {
|
|
return errdefs.InvalidParameter(err)
|
|
}
|
|
|
|
listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var advertiseAddr string
|
|
if req.AdvertiseAddr != "" {
|
|
advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
|
|
// For joining, we don't need to provide an advertise address,
|
|
// since the remote side can detect it.
|
|
if err == nil {
|
|
advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
|
|
}
|
|
}
|
|
|
|
dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
nr, err := c.newNodeRunner(nodeStartConfig{
|
|
RemoteAddr: req.RemoteAddrs[0],
|
|
ListenAddr: net.JoinHostPort(listenHost, listenPort),
|
|
AdvertiseAddr: advertiseAddr,
|
|
DataPathAddr: dataPathAddr,
|
|
joinAddr: req.RemoteAddrs[0],
|
|
joinToken: req.JoinToken,
|
|
availability: req.Availability,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.nr = nr
|
|
c.mu.Unlock()
|
|
|
|
timeout := time.NewTimer(swarmConnectTimeout)
|
|
defer timeout.Stop()
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return errSwarmJoinTimeoutReached
|
|
case err := <-nr.Ready():
|
|
if err != nil {
|
|
c.mu.Lock()
|
|
c.nr = nil
|
|
c.mu.Unlock()
|
|
if err := clearPersistentState(c.root); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Inspect retrieves the configuration properties of a managed swarm cluster.
|
|
func (c *Cluster) Inspect() (types.Swarm, error) {
|
|
var swarm types.Swarm
|
|
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
s, err := c.inspect(ctx, state)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
swarm = s
|
|
return nil
|
|
}); err != nil {
|
|
return types.Swarm{}, err
|
|
}
|
|
return swarm, nil
|
|
}
|
|
|
|
func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
|
|
s, err := getSwarm(ctx, state.controlClient)
|
|
if err != nil {
|
|
return types.Swarm{}, err
|
|
}
|
|
return convert.SwarmFromGRPC(*s), nil
|
|
}
|
|
|
|
// Update updates configuration of a managed swarm cluster.
|
|
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
|
|
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
swarm, err := getSwarm(ctx, state.controlClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Validate spec name.
|
|
if spec.Annotations.Name == "" {
|
|
spec.Annotations.Name = "default"
|
|
} else if spec.Annotations.Name != "default" {
|
|
return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
|
|
}
|
|
|
|
// In update, client should provide the complete spec of the swarm, including
|
|
// Name and Labels. If a field is specified with 0 or nil, then the default value
|
|
// will be used to swarmkit.
|
|
clusterSpec, err := convert.SwarmSpecToGRPC(spec)
|
|
if err != nil {
|
|
return errdefs.InvalidParameter(err)
|
|
}
|
|
|
|
_, err = state.controlClient.UpdateCluster(
|
|
ctx,
|
|
&swarmapi.UpdateClusterRequest{
|
|
ClusterID: swarm.ID,
|
|
Spec: &clusterSpec,
|
|
ClusterVersion: &swarmapi.Version{
|
|
Index: version,
|
|
},
|
|
Rotation: swarmapi.KeyRotation{
|
|
WorkerJoinToken: flags.RotateWorkerToken,
|
|
ManagerJoinToken: flags.RotateManagerToken,
|
|
ManagerUnlockKey: flags.RotateManagerUnlockKey,
|
|
},
|
|
},
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
// GetUnlockKey returns the unlock key for the swarm.
|
|
func (c *Cluster) GetUnlockKey() (string, error) {
|
|
var resp *swarmapi.GetUnlockKeyResponse
|
|
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
client := swarmapi.NewCAClient(state.grpcConn)
|
|
|
|
r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp = r
|
|
return nil
|
|
}); err != nil {
|
|
return "", err
|
|
}
|
|
if len(resp.UnlockKey) == 0 {
|
|
// no key
|
|
return "", nil
|
|
}
|
|
return encryption.HumanReadableKey(resp.UnlockKey), nil
|
|
}
|
|
|
|
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
|
|
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
|
|
c.controlMutex.Lock()
|
|
defer c.controlMutex.Unlock()
|
|
|
|
c.mu.RLock()
|
|
state := c.currentNodeState()
|
|
|
|
if !state.IsActiveManager() {
|
|
// when manager is not active,
|
|
// unless it is locked, otherwise return error.
|
|
if err := c.errNoManager(state); err != errSwarmLocked {
|
|
c.mu.RUnlock()
|
|
return err
|
|
}
|
|
} else {
|
|
// when manager is active, return an error of "not locked"
|
|
c.mu.RUnlock()
|
|
return notLockedError{}
|
|
}
|
|
|
|
// only when swarm is locked, code running reaches here
|
|
nr := c.nr
|
|
c.mu.RUnlock()
|
|
|
|
key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
|
|
if err != nil {
|
|
return errdefs.InvalidParameter(err)
|
|
}
|
|
|
|
config := nr.config
|
|
config.lockKey = key
|
|
if err := nr.Stop(); err != nil {
|
|
return err
|
|
}
|
|
nr, err = c.newNodeRunner(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.nr = nr
|
|
c.mu.Unlock()
|
|
|
|
if err := <-nr.Ready(); err != nil {
|
|
if errors.Is(err, errSwarmLocked) {
|
|
return invalidUnlockKey{}
|
|
}
|
|
return errors.Errorf("swarm component could not be started: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Leave shuts down Cluster and removes current state.
//
// Unless force is set, Leave refuses with a notAvailableError when the
// swarm is locked, and always refuses on a node that is a manager. In the
// manager case the message describes the quorum impact when it can be
// computed. Otherwise it stops the node runner, force-removes the local
// containers returned by listContainerForNode for this node's ID (removal
// failures are only logged), calls clearPersistentState(c.root), and
// finally invokes the backend's DaemonLeavesCluster hook.
//
// errNoSwarm is returned when no node runner exists.
func (c *Cluster) Leave(ctx context.Context, force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	// Snapshot the node state while holding c.mu; it is used after unlock.
	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Is(state.err, errSwarmLocked) && !force {
		// leave a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	// A manager can only leave with force; every path in this branch
	// returns an error whose message explains the risk.
	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			// If the stats cannot be fetched, fall through to the generic message.
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		log.G(ctx).Errorf("failed to shut down cluster node: %v", err)
		stack.Dump()
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(ctx, nodeID)
		if err != nil {
			// NOTE(review): returning here leaves c.nr cleared while the
			// persistent state below has not been removed — confirm intended.
			return err
		}
		for _, id := range nodeContainers {
			// Best effort: a failed removal is logged and does not abort Leave.
			if err := c.config.Backend.ContainerRm(id, &backend.ContainerRmConfig{ForceRemove: true}); err != nil {
				log.G(ctx).Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}
|
|
|
|
// Info returns information about the current cluster state.
|
|
func (c *Cluster) Info(ctx context.Context) types.Info {
|
|
info := types.Info{
|
|
NodeAddr: c.GetAdvertiseAddress(),
|
|
}
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
state := c.currentNodeState()
|
|
info.LocalNodeState = state.status
|
|
if state.err != nil {
|
|
info.Error = state.err.Error()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext(ctx)
|
|
defer cancel()
|
|
|
|
if state.IsActiveManager() {
|
|
info.ControlAvailable = true
|
|
swarm, err := c.inspect(ctx, state)
|
|
if err != nil {
|
|
info.Error = err.Error()
|
|
}
|
|
|
|
info.Cluster = &swarm.ClusterInfo
|
|
|
|
if r, err := state.controlClient.ListNodes(
|
|
ctx, &swarmapi.ListNodesRequest{},
|
|
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
|
); err != nil {
|
|
info.Error = err.Error()
|
|
} else {
|
|
info.Nodes = len(r.Nodes)
|
|
for _, n := range r.Nodes {
|
|
if n.ManagerStatus != nil {
|
|
info.Managers = info.Managers + 1
|
|
}
|
|
}
|
|
}
|
|
|
|
switch info.LocalNodeState {
|
|
case types.LocalNodeStateInactive, types.LocalNodeStateLocked, types.LocalNodeStateError:
|
|
// nothing to do
|
|
default:
|
|
if info.Managers == 2 {
|
|
const warn string = `WARNING: Running Swarm in a two-manager configuration. This configuration provides
|
|
no fault tolerance, and poses a high risk to lose control over the cluster.
|
|
Refer to https://docs.docker.com/engine/swarm/admin_guide/ to configure the
|
|
Swarm for fault-tolerance.`
|
|
|
|
info.Warnings = append(info.Warnings, warn)
|
|
}
|
|
}
|
|
}
|
|
|
|
if state.swarmNode != nil {
|
|
for _, r := range state.swarmNode.Remotes() {
|
|
info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
|
|
}
|
|
info.NodeID = state.swarmNode.NodeID()
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// Status returns a textual representation of the node's swarm status and role (manager/worker)
|
|
func (c *Cluster) Status() string {
|
|
c.mu.RLock()
|
|
s := c.currentNodeState()
|
|
c.mu.RUnlock()
|
|
|
|
state := string(s.status)
|
|
if s.status == types.LocalNodeStateActive {
|
|
if s.IsActiveManager() || s.IsManager() {
|
|
state += "/manager"
|
|
} else {
|
|
state += "/worker"
|
|
}
|
|
}
|
|
return state
|
|
}
|
|
|
|
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
|
|
var err error
|
|
req.ListenAddr, err = validateAddr(req.ListenAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
|
|
}
|
|
|
|
if req.Spec.Annotations.Name == "" {
|
|
req.Spec.Annotations.Name = "default"
|
|
} else if req.Spec.Annotations.Name != "default" {
|
|
return errors.New(`swarm spec must be named "default"`)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
|
|
var err error
|
|
req.ListenAddr, err = validateAddr(req.ListenAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
|
|
}
|
|
if len(req.RemoteAddrs) == 0 {
|
|
return errors.New("at least 1 RemoteAddr is required to join")
|
|
}
|
|
for i := range req.RemoteAddrs {
|
|
req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
|
|
if err != nil {
|
|
return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func validateAddr(addr string) (string, error) {
|
|
if addr == "" {
|
|
return addr, errors.New("invalid empty address")
|
|
}
|
|
newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
|
|
if err != nil {
|
|
// TODO(thaJeztah) why are we ignoring the error here? Is this to allow "non-tcp" addresses?
|
|
return addr, nil
|
|
}
|
|
return strings.TrimPrefix(newaddr, "tcp://"), nil
|
|
}
|
|
|
|
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
for conn := range node.ListenControlSocket(ctx) {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if conn != nil {
|
|
client := swarmapi.NewControlClient(conn)
|
|
var cluster *swarmapi.Cluster
|
|
for i := 0; ; i++ {
|
|
lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("error on listing clusters: %v", err)
|
|
}
|
|
if len(lcr.Clusters) == 0 {
|
|
if i < 10 {
|
|
time.Sleep(200 * time.Millisecond)
|
|
continue
|
|
}
|
|
return errors.New("empty list of clusters was returned")
|
|
}
|
|
cluster = lcr.Clusters[0]
|
|
break
|
|
}
|
|
// In init, we take the initial default values from swarmkit, and merge
|
|
// any non nil or 0 value from spec to GRPC spec. This will leave the
|
|
// default value alone.
|
|
// Note that this is different from Update(), as in Update() we expect
|
|
// user to specify the complete spec of the cluster (as they already know
|
|
// the existing one and knows which field to update)
|
|
clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
|
|
if err != nil {
|
|
return fmt.Errorf("error updating cluster settings: %v", err)
|
|
}
|
|
_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
|
|
ClusterID: cluster.ID,
|
|
ClusterVersion: &cluster.Meta.Version,
|
|
Spec: &clusterSpec,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error updating cluster settings: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
|
|
func (c *Cluster) listContainerForNode(ctx context.Context, nodeID string) ([]string, error) {
|
|
var ids []string
|
|
containers, err := c.config.Backend.Containers(ctx, &container.ListOptions{
|
|
Filters: filters.NewArgs(filters.Arg("label", "com.docker.swarm.node.id="+nodeID)),
|
|
})
|
|
if err != nil {
|
|
return []string{}, err
|
|
}
|
|
for _, c := range containers {
|
|
ids = append(ids, c.ID)
|
|
}
|
|
return ids, nil
|
|
}
|