package cluster // import "github.com/docker/docker/daemon/cluster"

//
// ## Swarmkit integration
//
// Cluster - a static, configurable object for accessing everything swarm
// related. Contains methods for connecting to and controlling the cluster.
// Always exists, even if swarm mode is not enabled.
//
// NodeRunner - manager for starting the swarmkit node. Present if and only if
// swarm mode is enabled. Implements a backoff restart loop in case of errors.
//
// NodeState - information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes to the cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// queried, take the read lock of `cluster.mu`. If you are writing an API
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that the node will not
// be shut down until the handler has finished.
//
// NodeRunner has its own internal locks that should not be used outside of
// the struct. Instead, call the `nodeRunner.State()` method to get the current
// state of the cluster (you still need `cluster.mu.RLock()` to access the
// `cluster.nr` reference itself). Most of the changes in NodeRunner happen
// because of an external event (network problem, unexpected swarmkit error),
// and Docker shouldn't take any locks that delay these changes from happening.
//
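// As an illustration of the pattern above, a synchronous read-only responder
// holds the read lock for its whole body. This is only a sketch mirroring
// GetLocalAddress below; exampleLocalAddr itself is not part of the package:
//
//	func (c *Cluster) exampleLocalAddr() string {
//		c.mu.RLock()
//		defer c.mu.RUnlock() // held until the handler returns
//
//		// nodeRunner has its own locking; the read lock on cluster.mu is
//		// only needed to reach the c.nr reference safely.
//		return c.currentNodeState().actualLocalAddr
//	}
//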

import (
	"context"
	"fmt"
	"math"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/controllers/plugin"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	lncluster "github.com/docker/docker/libnetwork/cluster"
	"github.com/docker/docker/pkg/stack"
	swarmapi "github.com/moby/swarmkit/v2/api"
	swarmnode "github.com/moby/swarmkit/v2/node"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const (
	swarmDirName                   = "swarm"
	controlSocket                  = "control.sock"
	swarmConnectTimeout            = 20 * time.Second
	swarmRequestTimeout            = 20 * time.Second
	stateFile                      = "docker-state.json"
	defaultAddr                    = "tcp://0.0.0.0:2377"
	isWindows                      = runtime.GOOS == "windows"
	initialReconnectDelay          = 100 * time.Millisecond
	maxReconnectDelay              = 30 * time.Second
	contextPrefix                  = "com.docker.swarm"
	defaultRecvSizeForListResponse = math.MaxInt32 // the max recv limit grpc <1.4.0
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	ImageBackend           executorpkg.ImageBackend
	PluginBackend          plugin.Backend
	VolumeBackend          executorpkg.VolumeBackend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string

	// WatchStream is a channel to pass watch API notifications to daemon
	WatchStream chan *swarmapi.WatchMessage

	// RaftHeartbeatTick is the number of ticks for heartbeat of quorum members.
	RaftHeartbeatTick uint32

	// RaftElectionTick is the number of ticks to elapse before followers propose a new round of leader election.
	// This value should be 10x that of RaftHeartbeatTick.
	RaftElectionTick uint32
}
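
// A caller tuning the Raft timing keeps the recommended 10x ratio between the
// two tick values. A minimal sketch from the caller's side; the field values
// are illustrative and the other required fields (Backend, etc.) are omitted:
//
//	cfg := cluster.Config{
//		Root:              "/var/lib/docker",
//		RaftHeartbeatTick: 1,
//		RaftElectionTick:  10, // ~10x RaftHeartbeatTick, per the etcd guidance
//	}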

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
	attachers    map[string]*attacher
	watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container attached to
// a global scope network managed by the swarm manager. It helps identify the
// attachment ID via the taskID and the corresponding attachment configuration
// obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using the provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0o700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if config.RaftHeartbeatTick == 0 {
		config.RaftHeartbeatTick = 1
	}
	if config.RaftElectionTick == 0 {
		// 10X heartbeat tick is the recommended ratio according to etcd docs.
		config.RaftElectionTick = 10 * config.RaftHeartbeatTick
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0o700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan lncluster.ConfigEventType, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
		watchStream: config.WatchStream,
	}
	return c, nil
}

// Start the Cluster instance.
// TODO The split between New and Start can be joined again when the
// SendClusterEvent method is no longer required.
func (c *Cluster) Start() error {
	root := filepath.Join(c.config.Root, swarmDirName)

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return err
	}
	c.nr = nr

	timer := time.NewTimer(swarmConnectTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		log.G(context.TODO()).Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			log.G(context.TODO()).WithError(err).Error("swarm component could not be started")
			return nil
		}
	}
	return nil
}
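
// A typical caller wires the two phases together. A minimal sketch, assuming
// cfg is a populated Config:
//
//	c, err := cluster.New(cfg)
//	if err != nil {
//		return err
//	}
//	if err := c.Start(); err != nil {
//		return err
//	}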

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}
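
// The local-address resolution above relies on net.Dial with "udp" not
// sending any packets: it only asks the kernel which source address would be
// used to reach the remote. The same technique in isolation, as a
// hypothetical helper that is not part of this package:
//
//	func localIPTowards(remote string) (string, error) {
//		conn, err := net.Dial("udp", remote) // no packets are sent
//		if err != nil {
//			return "", err
//		}
//		defer conn.Close()
//		host, _, err := net.SplitHostPort(conn.LocalAddr().String())
//		return host, err
//	}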

func (c *Cluster) getRequestContext(ctx context.Context) (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(ctx, swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote
// managers if available.
func (c *Cluster) GetRemoteAddressList() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddressList()
}

// GetWatchStream returns the channel to pass changes from store watch API.
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.watchStream
}

func (c *Cluster) getRemoteAddressList() []string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return []string{}
	}

	nodeID := state.swarmNode.NodeID()
	remotes := state.swarmNode.Remotes()
	addressList := make([]string, 0, len(remotes))
	for _, r := range remotes {
		if r.NodeID != nodeID {
			addressList = append(addressList, r.Addr)
		}
	}
	return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
	return c.configEvent
}
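
// A consumer typically drains this channel from its own goroutine. A sketch,
// assuming handleConfigEvent is supplied by the caller; the channel is not
// closed by this package, so the loop runs for the daemon's lifetime:
//
//	go func() {
//		for ev := range c.ListenClusterEvents() {
//			handleConfigEvent(ev)
//		}
//	}()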

// currentNodeState should not be called without a read lock.
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Is(st.err, errSwarmLocked) {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.WithStack(notAvailableError(`This node is not a swarm manager. Use "docker swarm init" or "docker swarm join" to connect this node to swarm and try again.`))
	}
	if st.swarmNode.Manager() != nil {
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
	}
	return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				log.G(context.TODO()).Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		log.G(context.TODO()).Errorf("failed to shut down cluster node: %v", err)
		stack.Dump()
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}

func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(
		ctx, &swarmapi.ListNodesRequest{},
		grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
	)
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}

func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx := context.TODO()
	ctx, cancel := c.getRequestContext(ctx)
	defer cancel()

	return fn(ctx, state)
}
- }
- // SendClusterEvent allows to send cluster events on the configEvent channel
- // TODO This method should not be exposed.
- // Currently it is used to notify the network controller that the keys are
- // available
- func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- c.configEvent <- event
- }