package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/containerd/log"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/executor/container"
	lncluster "github.com/docker/docker/libnetwork/cluster"
	swarmapi "github.com/moby/swarmkit/v2/api"
	swarmallocator "github.com/moby/swarmkit/v2/manager/allocator/cnmallocator"
	swarmnode "github.com/moby/swarmkit/v2/node"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu             sync.RWMutex
	done           chan struct{} // closed when swarmNode exits
	ready          chan struct{} // closed when swarmNode becomes active
	reconnectDelay time.Duration
	config         nodeStartConfig

	repeatedRun     bool
	cancelReconnect func()
	stopping        bool
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}

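// A rough sketch of how the owning Cluster is expected to drive a nodeRunner
// (the surrounding variable names are illustrative, not part of this file):
//
//	nr := &nodeRunner{cluster: c}          // c is the owning *Cluster
//	if err := nr.Start(conf); err != nil { // launch the swarmkit node
//		return err
//	}
//	if err := <-nr.Ready(); err != nil {   // wait until active or failed
//		return err
//	}
//	// ... later ...
//	_ = nr.Stop()
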
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk as JSON. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	// DataPathAddr is the address to use for the data path.
	DataPathAddr string
	// DefaultAddressPool contains the list of subnets used as the default
	// address pool.
	DefaultAddressPool []string
	// SubnetSize is the subnet size of the networks allocated from
	// DefaultAddressPool.
	SubnetSize uint32
	// DataPathPort is the data path (VXLAN UDP) port number used for data
	// traffic.
	DataPathPort uint32
	// JoinInProgress is set to true if a join operation has started, but
	// not completed yet.
	JoinInProgress bool

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
	availability    types.NodeAvailability
}

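// Ready returns a channel that reports the outcome of the current start
// attempt: it is closed once the swarmkit node becomes active, and if the
// node exits before becoming active, the exit error is sent on the channel
// first.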
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		select {
		case <-ready:
		case <-done:
		}
		select {
		case <-ready:
		default:
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}

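// Start resets the reconnect backoff to its initial delay and starts a new
// swarmkit node with the given configuration.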
func (n *nodeRunner) Start(conf nodeStartConfig) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.reconnectDelay = initialReconnectDelay
	return n.start(conf)
}

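// start launches the swarmkit node and spawns the goroutines that track its
// lifecycle: exit handling, readiness signalling, and control-socket changes.
// Callers must hold n.mu.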
func (n *nodeRunner) start(conf nodeStartConfig) error {
	var control string
	if isWindows {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
	}

	joinAddr := conf.joinAddr
	if joinAddr == "" && conf.JoinInProgress {
		// We must have been restarted while trying to join a cluster.
		// Continue trying to join instead of forming our own cluster.
		joinAddr = conf.RemoteAddr
	}

	// Hostname is not set here. Instead, it is obtained from
	// the node description that is reported periodically.
	swarmnodeConfig := swarmnode.Config{
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		NetworkConfig: &swarmallocator.NetworkConfig{
			DefaultAddrPool: conf.DefaultAddressPool,
			SubnetSize:      conf.SubnetSize,
			VXLANUDPPort:    conf.DataPathPort,
		},
		JoinAddr:  joinAddr,
		StateDir:  n.cluster.root,
		JoinToken: conf.joinToken,
		Executor: container.NewExecutor(
			n.cluster.config.Backend,
			n.cluster.config.PluginBackend,
			n.cluster.config.ImageBackend,
			n.cluster.config.VolumeBackend,
		),
		HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
		// Recommended value in etcd/raft is 10 x (HeartbeatTick).
		// Lower values were seen to have caused instability because of
		// frequent leader elections when running on flaky networks.
		ElectionTick:     n.cluster.config.RaftElectionTick,
		UnlockKey:        conf.lockKey,
		AutoLockManagers: conf.autolock,
		PluginGetter:     n.cluster.config.Backend.PluginGetter(),
	}
	if conf.availability != "" {
		avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]
		if !ok {
			return fmt.Errorf("invalid Availability: %q", conf.availability)
		}
		swarmnodeConfig.Availability = swarmapi.NodeSpec_Availability(avail)
	}
	node, err := swarmnode.New(&swarmnodeConfig)
	if err != nil {
		return err
	}
	if err := node.Start(context.Background()); err != nil {
		return err
	}

	n.done = make(chan struct{})
	n.ready = make(chan struct{})
	n.swarmNode = node
	if conf.joinAddr != "" {
		conf.JoinInProgress = true
	}
	n.config = conf
	savePersistentState(n.cluster.root, conf)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		n.handleNodeExit(node)
		cancel()
	}()

	go n.handleReadyEvent(ctx, node, n.ready)
	go n.handleControlSocketChange(ctx, node)

	return nil
}

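// handleControlSocketChange refreshes the cached gRPC clients whenever the
// node's control socket connection changes, starts watching cluster events on
// new connections, and notifies libnetwork of the change.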
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
	for conn := range node.ListenControlSocket(ctx) {
		n.mu.Lock()
		if n.grpcConn != conn {
			if conn == nil {
				n.controlClient = nil
				n.logsClient = nil
			} else {
				n.controlClient = swarmapi.NewControlClient(conn)
				n.logsClient = swarmapi.NewLogsClient(conn)
				// push store changes to daemon
				go n.watchClusterEvents(ctx, conn)
			}
		}
		n.grpcConn = conn
		n.mu.Unlock()
		n.cluster.SendClusterEvent(lncluster.EventSocketChange)
	}
}

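// watchClusterEvents subscribes to the swarmkit watch API and forwards node,
// service, network, secret, and config changes to the cluster's watch stream
// until the watch breaks or the context is cancelled.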
func (n *nodeRunner) watchClusterEvents(ctx context.Context, conn *grpc.ClientConn) {
	client := swarmapi.NewWatchClient(conn)
	watch, err := client.Watch(ctx, &swarmapi.WatchRequest{
		Entries: []*swarmapi.WatchRequest_WatchEntry{
			{
				Kind:   "node",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "service",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "network",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "secret",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "config",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
		},
		IncludeOldObject: true,
	})
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to watch cluster store")
		return
	}
	for {
		msg, err := watch.Recv()
		if err != nil {
			// store watch is broken
			errStatus, ok := status.FromError(err)
			if !ok || errStatus.Code() != codes.Canceled {
				log.G(ctx).WithError(err).Error("failed to receive changes from store watch API")
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		case n.cluster.watchStream <- msg:
		}
	}
}

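// handleReadyEvent waits for the swarmkit node to become ready, clears any
// recorded error, persists the completed join, and closes the ready channel
// before emitting a node-ready cluster event (the event is also emitted if
// the context is cancelled first).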
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		if n.config.JoinInProgress {
			n.config.JoinInProgress = false
			savePersistentState(n.cluster.root, n.config)
		}
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeReady)
}

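// handleNodeExit records the node's exit error, closes the done channel, and
// schedules a reconnect with backoff if the node had previously become ready
// or this was already a repeated run.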
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		log.G(context.TODO()).Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
	select {
	case <-n.ready:
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}

// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		// even though the swarm node is nil we still may need
		// to send a node leave event to perform any cleanup required.
		if n.cluster != nil {
			n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
		}
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	n.mu.Unlock()
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
	<-n.done
	return nil
}

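// State returns a snapshot of the node's state, deriving the local node
// status (inactive, pending, active, error, or locked) from the recorded
// error and readiness.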
func (n *nodeRunner) State() nodeState {
	if n == nil {
		return nodeState{status: types.LocalNodeStateInactive}
	}
	n.mu.RLock()
	defer n.mu.RUnlock()

	ns := n.nodeState

	if ns.err != nil || n.cancelReconnect != nil {
		if errors.Is(ns.err, errSwarmLocked) {
			ns.status = types.LocalNodeStateLocked
		} else {
			ns.status = types.LocalNodeStateError
		}
	} else {
		select {
		case <-n.ready:
			ns.status = types.LocalNodeStateActive
		default:
			ns.status = types.LocalNodeStatePending
		}
	}

	return ns
}

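// enableReconnectWatcher schedules a restart of the swarm node after the
// current reconnect delay, doubling the delay up to maxReconnectDelay.
// It is a no-op while the runner is stopping. Callers must hold n.mu.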
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	log.G(context.TODO()).Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	n.cancelReconnect = cancel

	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}
		if err := n.start(n.config); err != nil {
			n.err = err
		}
	}()
}

// nodeState represents information about the current state of the cluster and
// provides access to the grpc clients.
type nodeState struct {
	swarmNode       *swarmnode.Node
	grpcConn        *grpc.ClientConn
	controlClient   swarmapi.ControlClient
	logsClient      swarmapi.LogsClient
	status          types.LocalNodeState
	actualLocalAddr string
	err             error
}

// IsActiveManager returns true if node is a manager ready to accept control
// requests. It is safe to access the client properties if this returns true.
func (ns nodeState) IsActiveManager() bool {
	return ns.controlClient != nil
}

// IsManager returns true if node is a manager.
func (ns nodeState) IsManager() bool {
	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}

// NodeID returns node's ID or empty string if node is inactive.
func (ns nodeState) NodeID() string {
	if ns.swarmNode != nil {
		return ns.swarmNode.NodeID()
	}
	return ""
}