package cluster // import "github.com/docker/docker/daemon/cluster"

//
// ## Swarmkit integration
//
// Cluster - a static, configurable object for accessing everything
// swarm-related. Contains methods for connecting to and controlling the
// cluster. Always exists, even if swarm mode is not enabled.
//
// NodeRunner - manager for starting the swarmkit node. Present if and only if
// swarm mode is enabled. Implements a backoff restart loop in case of errors
// (see the sketch below).
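//
// The restart delay grows exponentially from initialReconnectDelay up to
// maxReconnectDelay (both defined below). A minimal sketch of that policy,
// assuming a hypothetical blocking runNode helper; the exact loop lives in
// the nodeRunner implementation:
//
//	delay := initialReconnectDelay
//	for {
//		if err := runNode(); err == nil {
//			delay = initialReconnectDelay // a healthy run resets the backoff
//			continue
//		}
//		time.Sleep(delay)
//		delay *= 2
//		if delay > maxReconnectDelay {
//			delay = maxReconnectDelay
//		}
//	}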
//
// NodeState - information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in the cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// needed, you should take the read lock of `cluster.mu`. If you are writing an
// API responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that the node will not
// be shut down until the handler has finished (see the example below).
//
// NodeRunner implements its own internal locks that should not be used outside
// of the struct. Instead, you should just call the `nodeRunner.State()` method
// to get the current state of the cluster (you still need `cluster.mu.RLock()`
// to access the `cluster.nr` reference itself). Most of the changes in
// NodeRunner happen because of an external event (network problem, unexpected
// swarmkit error) and Docker shouldn't take any locks that delay these changes
// from happening.
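//
// For example, a synchronous responder that reads cluster state would follow
// this pattern (a minimal sketch; Info here is a hypothetical handler, not a
// method defined in this file):
//
//	func (c *Cluster) Info() types.Info {
//		c.mu.RLock()
//		defer c.mu.RUnlock() // held for the whole handler
//
//		// currentNodeState reads c.nr and therefore requires c.mu.RLock.
//		state := c.currentNodeState()
//		return types.Info{NodeID: state.NodeID()}
//	}
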
import (
	"context"
	"fmt"
	"math"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	"github.com/containerd/log"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/controllers/plugin"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	lncluster "github.com/docker/docker/libnetwork/cluster"
	"github.com/docker/docker/pkg/stack"
	swarmapi "github.com/moby/swarmkit/v2/api"
	swarmnode "github.com/moby/swarmkit/v2/node"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const (
	swarmDirName                   = "swarm"
	controlSocket                  = "control.sock"
	swarmConnectTimeout            = 20 * time.Second
	swarmRequestTimeout            = 20 * time.Second
	stateFile                      = "docker-state.json"
	defaultAddr                    = "tcp://0.0.0.0:2377"
	isWindows                      = runtime.GOOS == "windows"
	initialReconnectDelay          = 100 * time.Millisecond
	maxReconnectDelay              = 30 * time.Second
	contextPrefix                  = "com.docker.swarm"
	defaultRecvSizeForListResponse = math.MaxInt32 // the max recv limit in grpc <1.4.0
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	ImageBackend           executorpkg.ImageBackend
	PluginBackend          plugin.Backend
	VolumeBackend          executorpkg.VolumeBackend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// RuntimeRoot is the path to store runtime state, such as the swarm
	// control socket.
	RuntimeRoot string

	// WatchStream is a channel to pass watch API notifications to the daemon.
	WatchStream chan *swarmapi.WatchMessage

	// RaftHeartbeatTick is the number of ticks between heartbeats of quorum
	// members.
	RaftHeartbeatTick uint32

	// RaftElectionTick is the number of ticks to elapse before followers
	// propose a new round of leader election. This value should be 10x that
	// of RaftHeartbeatTick.
	RaftElectionTick uint32
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
	attachers    map[string]*attacher
	watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container's attachment
// to a global-scope network managed by the swarm manager. It helps in
// identifying the attachment ID via the taskID and the corresponding
// attachment configuration obtained from the manager.
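//
// A consumer typically parks on attachWaitCh until the manager delivers the
// attachment configuration (a minimal sketch of that handshake, assuming an
// attacher a and a request context ctx; the real flow lives in this package's
// network attachment code):
//
//	select {
//	case config := <-a.attachWaitCh:
//		return config, nil // manager delivered the attachment configuration
//	case <-ctx.Done():
//		return nil, ctx.Err()
//	}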
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using the provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0o700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if config.RaftHeartbeatTick == 0 {
		config.RaftHeartbeatTick = 1
	}
	if config.RaftElectionTick == 0 {
		// 10X heartbeat tick is the recommended ratio according to etcd docs.
		config.RaftElectionTick = 10 * config.RaftHeartbeatTick
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0o700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan lncluster.ConfigEventType, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
		watchStream: config.WatchStream,
	}
	return c, nil
}

// Start starts the Cluster instance.
//
// TODO: the split between New and Start can be joined again once the
// SendClusterEvent method is no longer required.
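//
// A typical caller wires New and Start together during daemon startup (a
// minimal sketch; the surrounding daemon code is assumed, not shown here):
//
//	c, err := cluster.New(cluster.Config{
//		Root:    daemonRoot,    // hypothetical daemon root directory
//		Backend: daemonBackend, // hypothetical executorpkg.Backend implementation
//	})
//	if err != nil {
//		return err
//	}
//	if err := c.Start(); err != nil {
//		return err
//	}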
func (c *Cluster) Start() error {
	root := filepath.Join(c.config.Root, swarmDirName)

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return err
	}
	c.nr = nr

	timer := time.NewTimer(swarmConnectTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		log.G(context.TODO()).Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			log.G(context.TODO()).WithError(err).Error("swarm component could not be started")
			return nil
		}
	}
	return nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except with swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}

func (c *Cluster) getRequestContext(ctx context.Context) (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(ctx, swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote
// managers, if available.
func (c *Cluster) GetRemoteAddressList() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddressList()
}

// GetWatchStream returns the channel to pass changes from the store watch API.
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.watchStream
}

func (c *Cluster) getRemoteAddressList() []string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return []string{}
	}

	nodeID := state.swarmNode.NodeID()
	remotes := state.swarmNode.Remotes()
	addressList := make([]string, 0, len(remotes))
	for _, r := range remotes {
		if r.NodeID != nodeID {
			addressList = append(addressList, r.Addr)
		}
	}
	return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
	return c.configEvent
}

// currentNodeState should not be called without a read lock on c.mu.
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Is(st.err, errSwarmLocked) {
			return errSwarmLocked
		}
		if errors.Is(st.err, errSwarmCertificatesExpired) {
			return errSwarmCertificatesExpired
		}
		return errors.WithStack(notAvailableError(`This node is not a swarm manager. Use "docker swarm init" or "docker swarm join" to connect this node to swarm and try again.`))
	}
	if st.swarmNode.Manager() != nil {
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
	}
	return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		// Warn if removing this manager would leave fewer reachable managers
		// than raft quorum requires.
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				log.G(context.TODO()).Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		log.G(context.TODO()).Errorf("failed to shut down cluster node: %v", err)
		stack.Dump()
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}

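// managerStats reports whether the current node is among the reachable
// managers, plus the counts of reachable and unreachable managers. A worked
// example of the quorum arithmetic used by the caller (illustrative numbers,
// not from the source): with reachable=3 and unreachable=1 there are 4 raft
// members, so quorum is 4/2+1 = 3 reachable managers; if this node leaves,
// only 2 remain reachable and quorum is lost.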
func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(
		ctx, &swarmapi.ListNodesRequest{},
		grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
	)
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}

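// lockedManagerAction verifies that this node is an active manager and, if so,
// runs fn with a request-scoped context while holding the cluster read lock.
// Manager API methods typically wrap a control-client call in it like this (a
// minimal sketch; RemoveNode below only illustrates the pattern):
//
//	func (c *Cluster) RemoveNode(input string, force bool) error {
//		return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
//			_, err := state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{
//				NodeID: input,
//				Force:  force,
//			})
//			return err
//		})
//	}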
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx := context.TODO()
	ctx, cancel := c.getRequestContext(ctx)
	defer cancel()

	return fn(ctx, state)
}

// SendClusterEvent sends cluster events on the configEvent channel.
//
// TODO: this method should not be exposed. Currently it is used to notify the
// network controller that the keys are available.
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.configEvent <- event
}