moby/daemon/cluster/swarm.go
Sebastiaan van Stijn 484e6b784c
api/types: move ContainerCreateConfig, ContainerRmConfig to api/types/backend
The `ContainerCreateConfig` and `ContainerRmConfig` structs are used for
options to be passed to the backend, and are not used in client code.

These structs are currently intended for internal use only (for example,
`AdjustCPUShares` is an internal implementation detail to adjust the
container's config when older API versions are used).

Somewhat ironically, the Backend's signature has a nicer UX than the
client's `ContainerCreate` signature (which expects all options to be
passed as separate arguments), so we may want to update the client to be
closer to what the backend is using, but that can be left as a future
exercise.

This patch moves the `ContainerCreateConfig` and `ContainerRmConfig` structs
to the backend package to prevent them from being imported in the client,
and to make it clearer that they are part of internal APIs, not
public-facing ones.
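As a rough illustration (not part of this change), backend callers now take
the options struct from the backend package; a minimal, hypothetical sketch
(the `daemon` variable stands in for any Backend implementation):

    import "github.com/docker/docker/api/types/backend"

    // Force-remove a container, also removing its anonymous volumes.
    err := daemon.ContainerRm("some-container", &backend.ContainerRmConfig{
        ForceRemove:  true,
        RemoveVolume: true,
    })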

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2023-12-05 16:41:36 +01:00


package cluster // import "github.com/docker/docker/daemon/cluster"
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/containerd/log"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
types "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/daemon/cluster/convert"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/stack"
swarmapi "github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/manager/encryption"
swarmnode "github.com/moby/swarmkit/v2/node"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
c.controlMutex.Lock()
defer c.controlMutex.Unlock()
if c.nr != nil {
if req.ForceNewCluster {
// Take c.mu temporarily to wait for presently running
// API handlers to finish before shutting down the node.
c.mu.Lock()
if !c.nr.nodeState.IsManager() {
c.mu.Unlock()
return "", errSwarmNotManager
}
c.mu.Unlock()
if err := c.nr.Stop(); err != nil {
return "", err
}
} else {
return "", errSwarmExists
}
}
if err := validateAndSanitizeInitRequest(&req); err != nil {
return "", errdefs.InvalidParameter(err)
}
listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
if err != nil {
return "", errdefs.InvalidParameter(err)
}
advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
if err != nil {
return "", err
}
dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
if err != nil {
return "", err
}
localAddr := listenHost
// If the local address is undetermined, the advertise address
// will be used as the local address, if it belongs to this system.
// If the advertise address is not local, then we try to find
// a system address to use as the local address. If this fails,
// we give up and ask the user to pass the listen address.
if net.ParseIP(localAddr).IsUnspecified() {
advertiseIP := net.ParseIP(advertiseHost)
found := false
for _, systemIP := range listSystemIPs() {
if systemIP.Equal(advertiseIP) {
localAddr = advertiseIP.String()
found = true
break
}
}
if !found {
ip, err := c.resolveSystemAddr()
if err != nil {
log.G(context.TODO()).Warnf("Could not find a local address: %v", err)
return "", errMustSpecifyListenAddr
}
localAddr = ip.String()
}
}
if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil {
return "", err
}
port, err := getDataPathPort(req.DataPathPort)
if err != nil {
return "", err
}
nr, err := c.newNodeRunner(nodeStartConfig{
forceNewCluster: req.ForceNewCluster,
autolock: req.AutoLockManagers,
LocalAddr: localAddr,
ListenAddr: net.JoinHostPort(listenHost, listenPort),
AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
DataPathAddr: dataPathAddr,
DefaultAddressPool: req.DefaultAddrPool,
SubnetSize: req.SubnetSize,
availability: req.Availability,
DataPathPort: port,
})
if err != nil {
return "", err
}
c.mu.Lock()
c.nr = nr
c.mu.Unlock()
if err := <-nr.Ready(); err != nil {
c.mu.Lock()
c.nr = nil
c.mu.Unlock()
if !req.ForceNewCluster { // if the first attempt fails, don't keep state
if err := clearPersistentState(c.root); err != nil {
return "", err
}
}
return "", err
}
state := nr.State()
if state.swarmNode == nil { // should never happen but protect from panic
return "", errors.New("invalid cluster state for spec initialization")
}
if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
return "", err
}
return state.NodeID(), nil
}
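// A minimal sketch of an Init call from a caller's perspective; the
// addresses below are illustrative placeholders, not defaults:
//
//	nodeID, err := c.Init(types.InitRequest{
//		ListenAddr:    "0.0.0.0:2377",
//		AdvertiseAddr: "192.168.1.10:2377",
//	})
//
// On success, the returned string is this manager's swarm node ID.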
// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
c.controlMutex.Lock()
defer c.controlMutex.Unlock()
c.mu.Lock()
if c.nr != nil {
c.mu.Unlock()
return errors.WithStack(errSwarmExists)
}
c.mu.Unlock()
if err := validateAndSanitizeJoinRequest(&req); err != nil {
return errdefs.InvalidParameter(err)
}
listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
if err != nil {
return err
}
var advertiseAddr string
if req.AdvertiseAddr != "" {
advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
// For joining, we don't need to provide an advertise address,
// since the remote side can detect it.
if err == nil {
advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
}
}
dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
if err != nil {
return err
}
nr, err := c.newNodeRunner(nodeStartConfig{
RemoteAddr: req.RemoteAddrs[0],
ListenAddr: net.JoinHostPort(listenHost, listenPort),
AdvertiseAddr: advertiseAddr,
DataPathAddr: dataPathAddr,
joinAddr: req.RemoteAddrs[0],
joinToken: req.JoinToken,
availability: req.Availability,
})
if err != nil {
return err
}
c.mu.Lock()
c.nr = nr
c.mu.Unlock()
timeout := time.NewTimer(swarmConnectTimeout)
defer timeout.Stop()
select {
case <-timeout.C:
return errSwarmJoinTimeoutReached
case err := <-nr.Ready():
if err != nil {
c.mu.Lock()
c.nr = nil
c.mu.Unlock()
if err := clearPersistentState(c.root); err != nil {
return err
}
}
return err
}
}
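// A minimal sketch of a Join call; the token and addresses are
// placeholders:
//
//	err := c.Join(types.JoinRequest{
//		RemoteAddrs: []string{"192.168.1.10:2377"},
//		JoinToken:   "SWMTKN-1-...",
//		ListenAddr:  "0.0.0.0:2377",
//	})
//
// Unlike Init, Join tolerates a missing advertise address, since the
// remote side can detect it.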
// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
var swarm types.Swarm
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
s, err := c.inspect(ctx, state)
if err != nil {
return err
}
swarm = s
return nil
}); err != nil {
return types.Swarm{}, err
}
return swarm, nil
}
func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
s, err := getSwarm(ctx, state.controlClient)
if err != nil {
return types.Swarm{}, err
}
return convert.SwarmFromGRPC(*s), nil
}
// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
swarm, err := getSwarm(ctx, state.controlClient)
if err != nil {
return err
}
// Validate spec name.
if spec.Annotations.Name == "" {
spec.Annotations.Name = "default"
} else if spec.Annotations.Name != "default" {
return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
}
// In Update, the client should provide the complete spec of the swarm,
// including Name and Labels. If a field is specified as 0 or nil, then
// the default value will be used by swarmkit.
clusterSpec, err := convert.SwarmSpecToGRPC(spec)
if err != nil {
return errdefs.InvalidParameter(err)
}
_, err = state.controlClient.UpdateCluster(
ctx,
&swarmapi.UpdateClusterRequest{
ClusterID: swarm.ID,
Spec: &clusterSpec,
ClusterVersion: &swarmapi.Version{
Index: version,
},
Rotation: swarmapi.KeyRotation{
WorkerJoinToken: flags.RotateWorkerToken,
ManagerJoinToken: flags.RotateManagerToken,
ManagerUnlockKey: flags.RotateManagerUnlockKey,
},
},
)
return err
})
}
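// Update is an optimistic read-modify-write: callers read the current
// swarm, edit the complete spec, and pass back the version they read.
// A hypothetical sketch:
//
//	sw, err := c.Inspect()
//	if err == nil {
//		sw.Spec.Raft.SnapshotInterval = 20000
//		err = c.Update(sw.Version.Index, sw.Spec, types.UpdateFlags{})
//	}
//
// If the cluster changed in between, swarmkit rejects the stale version
// and the cycle must be retried.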
// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
var resp *swarmapi.GetUnlockKeyResponse
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
client := swarmapi.NewCAClient(state.grpcConn)
r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
if err != nil {
return err
}
resp = r
return nil
}); err != nil {
return "", err
}
if len(resp.UnlockKey) == 0 {
// no key
return "", nil
}
return encryption.HumanReadableKey(resp.UnlockKey), nil
}
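// The key swarmkit returns is raw bytes; HumanReadableKey renders it in
// the "SWMKEY-1-<base64>" form that users later paste into UnlockSwarm,
// where ParseHumanReadableKey performs the inverse conversion.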
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
c.controlMutex.Lock()
defer c.controlMutex.Unlock()
c.mu.RLock()
state := c.currentNodeState()
if !state.IsActiveManager() {
// when the manager is not active, return an error
// unless the swarm is locked
if err := c.errNoManager(state); err != errSwarmLocked {
c.mu.RUnlock()
return err
}
} else {
// when the manager is active, return a "not locked" error
c.mu.RUnlock()
return notLockedError{}
}
// execution only reaches this point when the swarm is locked
nr := c.nr
c.mu.RUnlock()
key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
if err != nil {
return errdefs.InvalidParameter(err)
}
config := nr.config
config.lockKey = key
if err := nr.Stop(); err != nil {
return err
}
nr, err = c.newNodeRunner(config)
if err != nil {
return err
}
c.mu.Lock()
c.nr = nr
c.mu.Unlock()
if err := <-nr.Ready(); err != nil {
if errors.Is(err, errSwarmLocked) {
return invalidUnlockKey{}
}
return errors.Errorf("swarm component could not be started: %v", err)
}
return nil
}
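// A sketch of the unlock flow from the API side (the key value is a
// placeholder):
//
//	err := c.UnlockSwarm(types.UnlockRequest{UnlockKey: "SWMKEY-1-..."})
//
// Internally this restarts the node runner with the parsed key; a wrong
// key surfaces as invalidUnlockKey once the restarted node reports
// errSwarmLocked again.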
// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(ctx context.Context, force bool) error {
c.controlMutex.Lock()
defer c.controlMutex.Unlock()
c.mu.Lock()
nr := c.nr
if nr == nil {
c.mu.Unlock()
return errors.WithStack(errNoSwarm)
}
state := c.currentNodeState()
c.mu.Unlock()
if errors.Is(state.err, errSwarmLocked) && !force {
// leaving a locked swarm without --force is not allowed
return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
}
if state.IsManager() && !force {
msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
if state.IsActiveManager() {
active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
if err == nil {
if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
if isLastManager(reachable, unreachable) {
msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
return errors.WithStack(notAvailableError(msg))
}
msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
}
}
} else {
msg += "Doing so may lose the consensus of your cluster. "
}
msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
return errors.WithStack(notAvailableError(msg))
}
// release readers in here
if err := nr.Stop(); err != nil {
log.G(ctx).Errorf("failed to shut down cluster node: %v", err)
stack.Dump()
return err
}
c.mu.Lock()
c.nr = nil
c.mu.Unlock()
if nodeID := state.NodeID(); nodeID != "" {
nodeContainers, err := c.listContainerForNode(ctx, nodeID)
if err != nil {
return err
}
for _, id := range nodeContainers {
if err := c.config.Backend.ContainerRm(id, &backend.ContainerRmConfig{ForceRemove: true}); err != nil {
log.G(ctx).Errorf("error removing %v: %v", id, err)
}
}
}
// todo: cleanup optional?
if err := clearPersistentState(c.root); err != nil {
return err
}
c.config.Backend.DaemonLeavesCluster()
return nil
}
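// Worked example of the quorum check above: with reachable=2 and
// unreachable=0, leaving drops the Raft group to 1 reachable manager out
// of 2 members. A majority of 2 is 2, so the survivor cannot form a
// quorum and the swarm becomes inaccessible; that is why leaving is
// refused without --force.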
// Info returns information about the current cluster state.
func (c *Cluster) Info(ctx context.Context) types.Info {
info := types.Info{
NodeAddr: c.GetAdvertiseAddress(),
}
c.mu.RLock()
defer c.mu.RUnlock()
state := c.currentNodeState()
info.LocalNodeState = state.status
if state.err != nil {
info.Error = state.err.Error()
}
ctx, cancel := c.getRequestContext(ctx)
defer cancel()
if state.IsActiveManager() {
info.ControlAvailable = true
swarm, err := c.inspect(ctx, state)
if err != nil {
info.Error = err.Error()
}
info.Cluster = &swarm.ClusterInfo
if r, err := state.controlClient.ListNodes(
ctx, &swarmapi.ListNodesRequest{},
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
); err != nil {
info.Error = err.Error()
} else {
info.Nodes = len(r.Nodes)
for _, n := range r.Nodes {
if n.ManagerStatus != nil {
info.Managers = info.Managers + 1
}
}
}
switch info.LocalNodeState {
case types.LocalNodeStateInactive, types.LocalNodeStateLocked, types.LocalNodeStateError:
// nothing to do
default:
if info.Managers == 2 {
const warn string = `WARNING: Running Swarm in a two-manager configuration. This configuration provides
no fault tolerance, and poses a high risk to lose control over the cluster.
Refer to https://docs.docker.com/engine/swarm/admin_guide/ to configure the
Swarm for fault-tolerance.`
info.Warnings = append(info.Warnings, warn)
}
}
}
if state.swarmNode != nil {
for _, r := range state.swarmNode.Remotes() {
info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
}
info.NodeID = state.swarmNode.NodeID()
}
return info
}
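// The two-manager warning above follows from Raft fault tolerance: N
// managers tolerate (N-1)/2 failures, so N=2 tolerates none, the same as
// a single manager but with twice the chance of losing one. Odd manager
// counts (1, 3, 5, ...) are the recommended configurations.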
// Status returns a textual representation of the node's swarm status and role (manager/worker)
func (c *Cluster) Status() string {
c.mu.RLock()
s := c.currentNodeState()
c.mu.RUnlock()
state := string(s.status)
if s.status == types.LocalNodeStateActive {
if s.IsActiveManager() || s.IsManager() {
state += "/manager"
} else {
state += "/worker"
}
}
return state
}
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
var err error
req.ListenAddr, err = validateAddr(req.ListenAddr)
if err != nil {
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
}
if req.Spec.Annotations.Name == "" {
req.Spec.Annotations.Name = "default"
} else if req.Spec.Annotations.Name != "default" {
return errors.New(`swarm spec must be named "default"`)
}
return nil
}
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
var err error
req.ListenAddr, err = validateAddr(req.ListenAddr)
if err != nil {
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
}
if len(req.RemoteAddrs) == 0 {
return errors.New("at least 1 RemoteAddr is required to join")
}
for i := range req.RemoteAddrs {
req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
if err != nil {
return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
}
}
return nil
}
func validateAddr(addr string) (string, error) {
if addr == "" {
return addr, errors.New("invalid empty address")
}
newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
if err != nil {
// TODO(thaJeztah) why are we ignoring the error here? Is this to allow "non-tcp" addresses?
return addr, nil
}
return strings.TrimPrefix(newaddr, "tcp://"), nil
}
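// For reference, opts.ParseTCPAddr fills in any missing host or port from
// the default; a rough sketch of the expected results, assuming defaultAddr
// is "0.0.0.0:2377":
//
//	validateAddr("192.168.1.10")      // "192.168.1.10:2377"
//	validateAddr("192.168.1.10:4242") // "192.168.1.10:4242"
//	validateAddr(":4242")             // "0.0.0.0:4242"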
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for conn := range node.ListenControlSocket(ctx) {
if ctx.Err() != nil {
return ctx.Err()
}
if conn != nil {
client := swarmapi.NewControlClient(conn)
var cluster *swarmapi.Cluster
for i := 0; ; i++ {
lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
if err != nil {
return fmt.Errorf("error on listing clusters: %v", err)
}
if len(lcr.Clusters) == 0 {
if i < 10 {
time.Sleep(200 * time.Millisecond)
continue
}
return errors.New("empty list of clusters was returned")
}
cluster = lcr.Clusters[0]
break
}
// In Init, we take the initial default values from swarmkit and merge
// any non-nil or non-zero values from spec into the GRPC spec. This
// leaves the default values alone.
// Note that this is different from Update(), where we expect the user
// to specify the complete spec of the cluster (as they already know the
// existing one and know which fields to update).
clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
if err != nil {
return fmt.Errorf("error updating cluster settings: %v", err)
}
_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
ClusterID: cluster.ID,
ClusterVersion: &cluster.Meta.Version,
Spec: &clusterSpec,
})
if err != nil {
return fmt.Errorf("error updating cluster settings: %v", err)
}
return nil
}
}
return ctx.Err()
}
func (c *Cluster) listContainerForNode(ctx context.Context, nodeID string) ([]string, error) {
var ids []string
containers, err := c.config.Backend.Containers(ctx, &container.ListOptions{
Filters: filters.NewArgs(filters.Arg("label", "com.docker.swarm.node.id="+nodeID)),
})
if err != nil {
return []string{}, err
}
for _, c := range containers {
ids = append(ids, c.ID)
}
return ids, nil
}
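// Swarm labels each container it manages with the ID of the node running
// it; the filter above matches that label. Roughly the CLI equivalent:
//
//	docker ps --filter label=com.docker.swarm.node.id=<nodeID>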