a3ae9a5956
Make sure we validate the default address given before using it, and combine the parsing/validation logic so that it can be reused. This patch also makes the errors more consistent, and uses pkg/errors for generating them. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
460 lines
14 KiB
Go
460 lines
14 KiB
Go
package cluster // import "github.com/docker/docker/daemon/cluster"
|
|
|
|
//
|
|
// ## Swarmkit integration
|
|
//
|
|
// Cluster - static configurable object for accessing everything swarm related.
|
|
// Contains methods for connecting and controlling the cluster. Exists always,
|
|
// even if swarm mode is not enabled.
|
|
//
|
|
// NodeRunner - Manager for starting the swarmkit node. Is present only and
|
|
// always if swarm mode is enabled. Implements backoff restart loop in case of
|
|
// errors.
|
|
//
|
|
// NodeState - Information about the current node status including access to
|
|
// gRPC clients if a manager is active.
|
|
//
|
|
// ### Locking
|
|
//
|
|
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
|
|
// can reconfigure cluster(init/join/leave etc). Protects that one
|
|
// reconfiguration action has fully completed before another can start.
|
|
//
|
|
// `cluster.mu` - taken when the actual changes in cluster configurations
|
|
// happen. Different from `controlMutex` because in some cases we need to
|
|
// access current cluster state even if the long-running reconfiguration is
|
|
// going on. For example network stack may ask for the current cluster state in
|
|
// the middle of the shutdown. Any time current cluster state is asked you
|
|
// should take the read lock of `cluster.mu`. If you are writing an API
|
|
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
|
|
// duration of the whole handler function. That ensures that node will not be
|
|
// shut down until the handler has finished.
|
|
//
|
|
// NodeRunner implements its internal locks that should not be used outside of
|
|
// the struct. Instead, you should just call `nodeRunner.State()` method to get
|
|
// the current state of the cluster(still need `cluster.mu.RLock()` to access
|
|
// `cluster.nr` reference itself). Most of the changes in NodeRunner happen
|
|
// because of an external event(network problem, unexpected swarmkit error) and
|
|
// Docker shouldn't take any locks that delay these changes from happening.
|
|
//
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/docker/docker/api/types/network"
|
|
types "github.com/docker/docker/api/types/swarm"
|
|
"github.com/docker/docker/daemon/cluster/controllers/plugin"
|
|
executorpkg "github.com/docker/docker/daemon/cluster/executor"
|
|
lncluster "github.com/docker/docker/libnetwork/cluster"
|
|
"github.com/docker/docker/pkg/stack"
|
|
swarmapi "github.com/moby/swarmkit/v2/api"
|
|
swarmnode "github.com/moby/swarmkit/v2/node"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const (
|
|
swarmDirName = "swarm"
|
|
controlSocket = "control.sock"
|
|
swarmConnectTimeout = 20 * time.Second
|
|
swarmRequestTimeout = 20 * time.Second
|
|
stateFile = "docker-state.json"
|
|
defaultAddr = "tcp://0.0.0.0:2377"
|
|
isWindows = runtime.GOOS == "windows"
|
|
initialReconnectDelay = 100 * time.Millisecond
|
|
maxReconnectDelay = 30 * time.Second
|
|
contextPrefix = "com.docker.swarm"
|
|
defaultRecvSizeForListResponse = math.MaxInt32 // the max recv limit grpc <1.4.0
|
|
)
|
|
|
|
// NetworkSubnetsProvider exposes functions for retrieving the subnets
|
|
// of networks managed by Docker, so they can be filtered.
|
|
type NetworkSubnetsProvider interface {
|
|
Subnets() ([]net.IPNet, []net.IPNet)
|
|
}
|
|
|
|
// Config provides values for Cluster.
|
|
type Config struct {
|
|
Root string
|
|
Name string
|
|
Backend executorpkg.Backend
|
|
ImageBackend executorpkg.ImageBackend
|
|
PluginBackend plugin.Backend
|
|
VolumeBackend executorpkg.VolumeBackend
|
|
NetworkSubnetsProvider NetworkSubnetsProvider
|
|
|
|
// DefaultAdvertiseAddr is the default host/IP or network interface to use
|
|
// if no AdvertiseAddr value is specified.
|
|
DefaultAdvertiseAddr string
|
|
|
|
// path to store runtime state, such as the swarm control socket
|
|
RuntimeRoot string
|
|
|
|
// WatchStream is a channel to pass watch API notifications to daemon
|
|
WatchStream chan *swarmapi.WatchMessage
|
|
|
|
// RaftHeartbeatTick is the number of ticks for heartbeat of quorum members
|
|
RaftHeartbeatTick uint32
|
|
|
|
// RaftElectionTick is the number of ticks to elapse before followers propose a new round of leader election
|
|
// This value should be 10x that of RaftHeartbeatTick
|
|
RaftElectionTick uint32
|
|
}
|
|
|
|
// Cluster provides capabilities to participate in a cluster as a worker or a
|
|
// manager.
|
|
type Cluster struct {
|
|
mu sync.RWMutex
|
|
controlMutex sync.RWMutex // protect init/join/leave user operations
|
|
nr *nodeRunner
|
|
root string
|
|
runtimeRoot string
|
|
config Config
|
|
configEvent chan lncluster.ConfigEventType // todo: make this array and goroutine safe
|
|
attachers map[string]*attacher
|
|
watchStream chan *swarmapi.WatchMessage
|
|
}
|
|
|
|
// attacher manages the in-memory attachment state of a container
|
|
// attachment to a global scope network managed by swarm manager. It
|
|
// helps in identifying the attachment ID via the taskID and the
|
|
// corresponding attachment configuration obtained from the manager.
|
|
type attacher struct {
|
|
taskID string
|
|
config *network.NetworkingConfig
|
|
inProgress bool
|
|
attachWaitCh chan *network.NetworkingConfig
|
|
attachCompleteCh chan struct{}
|
|
detachWaitCh chan struct{}
|
|
}
|
|
|
|
// New creates a new Cluster instance using provided config.
|
|
func New(config Config) (*Cluster, error) {
|
|
root := filepath.Join(config.Root, swarmDirName)
|
|
if err := os.MkdirAll(root, 0700); err != nil {
|
|
return nil, err
|
|
}
|
|
if config.RuntimeRoot == "" {
|
|
config.RuntimeRoot = root
|
|
}
|
|
if config.RaftHeartbeatTick == 0 {
|
|
config.RaftHeartbeatTick = 1
|
|
}
|
|
if config.RaftElectionTick == 0 {
|
|
// 10X heartbeat tick is the recommended ratio according to etcd docs.
|
|
config.RaftElectionTick = 10 * config.RaftHeartbeatTick
|
|
}
|
|
|
|
if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
|
|
return nil, err
|
|
}
|
|
c := &Cluster{
|
|
root: root,
|
|
config: config,
|
|
configEvent: make(chan lncluster.ConfigEventType, 10),
|
|
runtimeRoot: config.RuntimeRoot,
|
|
attachers: make(map[string]*attacher),
|
|
watchStream: config.WatchStream,
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// Start the Cluster instance
|
|
// TODO The split between New and Start can be join again when the SendClusterEvent
|
|
// method is no longer required
|
|
func (c *Cluster) Start() error {
|
|
root := filepath.Join(c.config.Root, swarmDirName)
|
|
|
|
nodeConfig, err := loadPersistentState(root)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
nr, err := c.newNodeRunner(*nodeConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.nr = nr
|
|
|
|
timer := time.NewTimer(swarmConnectTimeout)
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
case <-timer.C:
|
|
logrus.Error("swarm component could not be started before timeout was reached")
|
|
case err := <-nr.Ready():
|
|
if err != nil {
|
|
logrus.WithError(err).Error("swarm component could not be started")
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
|
|
if err := c.config.Backend.IsSwarmCompatible(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
actualLocalAddr := conf.LocalAddr
|
|
if actualLocalAddr == "" {
|
|
// If localAddr was not specified, resolve it automatically
|
|
// based on the route to joinAddr. localAddr can only be left
|
|
// empty on "join".
|
|
listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse listen address: %v", err)
|
|
}
|
|
|
|
listenAddrIP := net.ParseIP(listenHost)
|
|
if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
|
|
actualLocalAddr = listenHost
|
|
} else {
|
|
if conf.RemoteAddr == "" {
|
|
// Should never happen except using swarms created by
|
|
// old versions that didn't save remoteAddr.
|
|
conf.RemoteAddr = "8.8.8.8:53"
|
|
}
|
|
conn, err := net.Dial("udp", conf.RemoteAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not find local IP address: %v", err)
|
|
}
|
|
localHostPort := conn.LocalAddr().String()
|
|
actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
nr := &nodeRunner{cluster: c}
|
|
nr.actualLocalAddr = actualLocalAddr
|
|
|
|
if err := nr.Start(conf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.config.Backend.DaemonJoinsCluster(c)
|
|
|
|
return nr, nil
|
|
}
|
|
|
|
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
|
|
return context.WithTimeout(context.Background(), swarmRequestTimeout)
|
|
}
|
|
|
|
// IsManager returns true if Cluster is participating as a manager.
|
|
func (c *Cluster) IsManager() bool {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentNodeState().IsActiveManager()
|
|
}
|
|
|
|
// IsAgent returns true if Cluster is participating as a worker/agent.
|
|
func (c *Cluster) IsAgent() bool {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentNodeState().status == types.LocalNodeStateActive
|
|
}
|
|
|
|
// GetLocalAddress returns the local address.
|
|
func (c *Cluster) GetLocalAddress() string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentNodeState().actualLocalAddr
|
|
}
|
|
|
|
// GetListenAddress returns the listen address.
|
|
func (c *Cluster) GetListenAddress() string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
if c.nr != nil {
|
|
return c.nr.config.ListenAddr
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// GetAdvertiseAddress returns the remotely reachable address of this node.
|
|
func (c *Cluster) GetAdvertiseAddress() string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
|
|
advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
|
|
return advertiseHost
|
|
}
|
|
return c.currentNodeState().actualLocalAddr
|
|
}
|
|
|
|
// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
|
|
func (c *Cluster) GetDataPathAddress() string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
if c.nr != nil {
|
|
return c.nr.config.DataPathAddr
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// GetRemoteAddressList returns the advertise address for each of the remote managers if
|
|
// available.
|
|
func (c *Cluster) GetRemoteAddressList() []string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.getRemoteAddressList()
|
|
}
|
|
|
|
// GetWatchStream returns the channel to pass changes from store watch API
|
|
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.watchStream
|
|
}
|
|
|
|
func (c *Cluster) getRemoteAddressList() []string {
|
|
state := c.currentNodeState()
|
|
if state.swarmNode == nil {
|
|
return []string{}
|
|
}
|
|
|
|
nodeID := state.swarmNode.NodeID()
|
|
remotes := state.swarmNode.Remotes()
|
|
addressList := make([]string, 0, len(remotes))
|
|
for _, r := range remotes {
|
|
if r.NodeID != nodeID {
|
|
addressList = append(addressList, r.Addr)
|
|
}
|
|
}
|
|
return addressList
|
|
}
|
|
|
|
// ListenClusterEvents returns a channel that receives messages on cluster
|
|
// participation changes.
|
|
// todo: make cancelable and accessible to multiple callers
|
|
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
|
|
return c.configEvent
|
|
}
|
|
|
|
// currentNodeState should not be called without a read lock
|
|
func (c *Cluster) currentNodeState() nodeState {
|
|
return c.nr.State()
|
|
}
|
|
|
|
// errNoManager returns error describing why manager commands can't be used.
|
|
// Call with read lock.
|
|
func (c *Cluster) errNoManager(st nodeState) error {
|
|
if st.swarmNode == nil {
|
|
if errors.Is(st.err, errSwarmLocked) {
|
|
return errSwarmLocked
|
|
}
|
|
if st.err == errSwarmCertificatesExpired {
|
|
return errSwarmCertificatesExpired
|
|
}
|
|
return errors.WithStack(notAvailableError("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again."))
|
|
}
|
|
if st.swarmNode.Manager() != nil {
|
|
return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
|
|
}
|
|
return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
|
|
}
|
|
|
|
// Cleanup stops active swarm node. This is run before daemon shutdown.
|
|
func (c *Cluster) Cleanup() {
|
|
c.controlMutex.Lock()
|
|
defer c.controlMutex.Unlock()
|
|
|
|
c.mu.Lock()
|
|
node := c.nr
|
|
if node == nil {
|
|
c.mu.Unlock()
|
|
return
|
|
}
|
|
state := c.currentNodeState()
|
|
c.mu.Unlock()
|
|
|
|
if state.IsActiveManager() {
|
|
active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
|
|
if err == nil {
|
|
singlenode := active && isLastManager(reachable, unreachable)
|
|
if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
|
|
logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := node.Stop(); err != nil {
|
|
logrus.Errorf("failed to shut down cluster node: %v", err)
|
|
stack.Dump()
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.nr = nil
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
nodes, err := client.ListNodes(
|
|
ctx, &swarmapi.ListNodesRequest{},
|
|
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
|
)
|
|
if err != nil {
|
|
return false, 0, 0, err
|
|
}
|
|
for _, n := range nodes.Nodes {
|
|
if n.ManagerStatus != nil {
|
|
if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
|
|
reachable++
|
|
if n.ID == currentNodeID {
|
|
current = true
|
|
}
|
|
}
|
|
if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
|
|
unreachable++
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func detectLockedError(err error) error {
|
|
if err == swarmnode.ErrInvalidUnlockKey {
|
|
return errors.WithStack(errSwarmLocked)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
state := c.currentNodeState()
|
|
if !state.IsActiveManager() {
|
|
return c.errNoManager(state)
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
return fn(ctx, state)
|
|
}
|
|
|
|
// SendClusterEvent allows to send cluster events on the configEvent channel
|
|
// TODO This method should not be exposed.
|
|
// Currently it is used to notify the network controller that the keys are
|
|
// available
|
|
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
c.configEvent <- event
|
|
}
|