cluster.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package cluster
  2. //
  3. // ## Swarmkit integration
  4. //
  5. // Cluster - static configurable object for accessing everything swarm related.
  6. // Contains methods for connecting and controlling the cluster. Exists always,
  7. // even if swarm mode is not enabled.
  8. //
// NodeRunner - Manager for starting the swarmkit node. Present if and only if
// swarm mode is enabled. Implements a backoff restart loop in case of
// errors.
  12. //
  13. // NodeState - Information about the current node status including access to
  14. // gRPC clients if a manager is active.
  15. //
  16. // ### Locking
  17. //
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
  21. //
  22. // `cluster.mu` - taken when the actual changes in cluster configurations
  23. // happen. Different from `controlMutex` because in some cases we need to
  24. // access current cluster state even if the long-running reconfiguration is
  25. // going on. For example network stack may ask for the current cluster state in
  26. // the middle of the shutdown. Any time current cluster state is asked you
  27. // should take the read lock of `cluster.mu`. If you are writing an API
  28. // responder that returns synchronously, hold `cluster.mu.RLock()` for the
  29. // duration of the whole handler function. That ensures that node will not be
  30. // shut down until the handler has finished.
  31. //
// NodeRunner implements its own internal locks, which should not be used
// outside of the struct. Instead, call the `nodeRunner.State()` method to get
// the current state of the cluster (`cluster.mu.RLock()` is still needed to
// access the `cluster.nr` reference itself). Most of the changes in NodeRunner
// happen because of an external event (network problem, unexpected swarmkit
// error), and Docker shouldn't take any locks that delay these changes.
  38. //
  39. import (
  40. "fmt"
  41. "net"
  42. "os"
  43. "path/filepath"
  44. "sync"
  45. "time"
  46. "github.com/Sirupsen/logrus"
  47. "github.com/docker/docker/api/types/network"
  48. types "github.com/docker/docker/api/types/swarm"
  49. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  50. "github.com/docker/docker/pkg/signal"
  51. lncluster "github.com/docker/libnetwork/cluster"
  52. swarmapi "github.com/docker/swarmkit/api"
  53. swarmnode "github.com/docker/swarmkit/node"
  54. "github.com/pkg/errors"
  55. "golang.org/x/net/context"
  56. )
  57. const swarmDirName = "swarm"
  58. const controlSocket = "control.sock"
  59. const swarmConnectTimeout = 20 * time.Second
  60. const swarmRequestTimeout = 20 * time.Second
  61. const stateFile = "docker-state.json"
  62. const defaultAddr = "0.0.0.0:2377"
  63. const (
  64. initialReconnectDelay = 100 * time.Millisecond
  65. maxReconnectDelay = 30 * time.Second
  66. contextPrefix = "com.docker.swarm"
  67. )
  68. // errNoSwarm is returned on leaving a cluster that was never initialized
  69. var errNoSwarm = errors.New("This node is not part of a swarm")
  70. // errSwarmExists is returned on initialize or join request for a cluster that has already been activated
  71. var errSwarmExists = errors.New("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  72. // errSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  73. var errSwarmJoinTimeoutReached = errors.New("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  74. // errSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
  75. var errSwarmLocked = errors.New("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")
  76. // errSwarmCertificatesExpired is returned if docker was not started for the whole validity period and they had no chance to renew automatically.
  77. var errSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
  78. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  79. // of networks managed by Docker, so they can be filtered.
  80. type NetworkSubnetsProvider interface {
  81. Subnets() ([]net.IPNet, []net.IPNet)
  82. }
  83. // Config provides values for Cluster.
  84. type Config struct {
  85. Root string
  86. Name string
  87. Backend executorpkg.Backend
  88. NetworkSubnetsProvider NetworkSubnetsProvider
  89. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  90. // if no AdvertiseAddr value is specified.
  91. DefaultAdvertiseAddr string
  92. // path to store runtime state, such as the swarm control socket
  93. RuntimeRoot string
  94. // WatchStream is a channel to pass watch API notifications to daemon
  95. WatchStream chan *swarmapi.WatchMessage
  96. }
  97. // Cluster provides capabilities to participate in a cluster as a worker or a
  98. // manager.
  99. type Cluster struct {
  100. mu sync.RWMutex
  101. controlMutex sync.RWMutex // protect init/join/leave user operations
  102. nr *nodeRunner
  103. root string
  104. runtimeRoot string
  105. config Config
  106. configEvent chan lncluster.ConfigEventType // todo: make this array and goroutine safe
  107. attachers map[string]*attacher
  108. watchStream chan *swarmapi.WatchMessage
  109. }
  110. // attacher manages the in-memory attachment state of a container
  111. // attachment to a global scope network managed by swarm manager. It
  112. // helps in identifying the attachment ID via the taskID and the
  113. // corresponding attachment configuration obtained from the manager.
  114. type attacher struct {
  115. taskID string
  116. config *network.NetworkingConfig
  117. inProgress bool
  118. attachWaitCh chan *network.NetworkingConfig
  119. attachCompleteCh chan struct{}
  120. detachWaitCh chan struct{}
  121. }
  122. // New creates a new Cluster instance using provided config.
  123. func New(config Config) (*Cluster, error) {
  124. root := filepath.Join(config.Root, swarmDirName)
  125. if err := os.MkdirAll(root, 0700); err != nil {
  126. return nil, err
  127. }
  128. if config.RuntimeRoot == "" {
  129. config.RuntimeRoot = root
  130. }
  131. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  132. return nil, err
  133. }
  134. c := &Cluster{
  135. root: root,
  136. config: config,
  137. configEvent: make(chan lncluster.ConfigEventType, 10),
  138. runtimeRoot: config.RuntimeRoot,
  139. attachers: make(map[string]*attacher),
  140. watchStream: config.WatchStream,
  141. }
  142. return c, nil
  143. }
  144. // Start the Cluster instance
  145. // TODO The split between New and Start can be join again when the SendClusterEvent
  146. // method is no longer required
  147. func (c *Cluster) Start() error {
  148. root := filepath.Join(c.config.Root, swarmDirName)
  149. nodeConfig, err := loadPersistentState(root)
  150. if err != nil {
  151. if os.IsNotExist(err) {
  152. return nil
  153. }
  154. return err
  155. }
  156. nr, err := c.newNodeRunner(*nodeConfig)
  157. if err != nil {
  158. return err
  159. }
  160. c.nr = nr
  161. select {
  162. case <-time.After(swarmConnectTimeout):
  163. logrus.Error("swarm component could not be started before timeout was reached")
  164. case err := <-nr.Ready():
  165. if err != nil {
  166. logrus.WithError(err).Error("swarm component could not be started")
  167. return nil
  168. }
  169. }
  170. return nil
  171. }
  172. func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
  173. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  174. return nil, err
  175. }
  176. actualLocalAddr := conf.LocalAddr
  177. if actualLocalAddr == "" {
  178. // If localAddr was not specified, resolve it automatically
  179. // based on the route to joinAddr. localAddr can only be left
  180. // empty on "join".
  181. listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
  182. if err != nil {
  183. return nil, fmt.Errorf("could not parse listen address: %v", err)
  184. }
  185. listenAddrIP := net.ParseIP(listenHost)
  186. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  187. actualLocalAddr = listenHost
  188. } else {
  189. if conf.RemoteAddr == "" {
  190. // Should never happen except using swarms created by
  191. // old versions that didn't save remoteAddr.
  192. conf.RemoteAddr = "8.8.8.8:53"
  193. }
  194. conn, err := net.Dial("udp", conf.RemoteAddr)
  195. if err != nil {
  196. return nil, fmt.Errorf("could not find local IP address: %v", err)
  197. }
  198. localHostPort := conn.LocalAddr().String()
  199. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  200. conn.Close()
  201. }
  202. }
  203. nr := &nodeRunner{cluster: c}
  204. nr.actualLocalAddr = actualLocalAddr
  205. if err := nr.Start(conf); err != nil {
  206. return nil, err
  207. }
  208. c.config.Backend.DaemonJoinsCluster(c)
  209. return nr, nil
  210. }
  211. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  212. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  213. }
  214. // IsManager returns true if Cluster is participating as a manager.
  215. func (c *Cluster) IsManager() bool {
  216. c.mu.RLock()
  217. defer c.mu.RUnlock()
  218. return c.currentNodeState().IsActiveManager()
  219. }
  220. // IsAgent returns true if Cluster is participating as a worker/agent.
  221. func (c *Cluster) IsAgent() bool {
  222. c.mu.RLock()
  223. defer c.mu.RUnlock()
  224. return c.currentNodeState().status == types.LocalNodeStateActive
  225. }
  226. // GetLocalAddress returns the local address.
  227. func (c *Cluster) GetLocalAddress() string {
  228. c.mu.RLock()
  229. defer c.mu.RUnlock()
  230. return c.currentNodeState().actualLocalAddr
  231. }
  232. // GetListenAddress returns the listen address.
  233. func (c *Cluster) GetListenAddress() string {
  234. c.mu.RLock()
  235. defer c.mu.RUnlock()
  236. if c.nr != nil {
  237. return c.nr.config.ListenAddr
  238. }
  239. return ""
  240. }
  241. // GetAdvertiseAddress returns the remotely reachable address of this node.
  242. func (c *Cluster) GetAdvertiseAddress() string {
  243. c.mu.RLock()
  244. defer c.mu.RUnlock()
  245. if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
  246. advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
  247. return advertiseHost
  248. }
  249. return c.currentNodeState().actualLocalAddr
  250. }
  251. // GetDataPathAddress returns the address to be used for the data path traffic, if specified.
  252. func (c *Cluster) GetDataPathAddress() string {
  253. c.mu.RLock()
  254. defer c.mu.RUnlock()
  255. if c.nr != nil {
  256. return c.nr.config.DataPathAddr
  257. }
  258. return ""
  259. }
  260. // GetRemoteAddressList returns the advertise address for each of the remote managers if
  261. // available.
  262. func (c *Cluster) GetRemoteAddressList() []string {
  263. c.mu.RLock()
  264. defer c.mu.RUnlock()
  265. return c.getRemoteAddressList()
  266. }
  267. func (c *Cluster) getRemoteAddressList() []string {
  268. state := c.currentNodeState()
  269. if state.swarmNode == nil {
  270. return []string{}
  271. }
  272. nodeID := state.swarmNode.NodeID()
  273. remotes := state.swarmNode.Remotes()
  274. addressList := make([]string, 0, len(remotes))
  275. for _, r := range remotes {
  276. if r.NodeID != nodeID {
  277. addressList = append(addressList, r.Addr)
  278. }
  279. }
  280. return addressList
  281. }
  282. // ListenClusterEvents returns a channel that receives messages on cluster
  283. // participation changes.
  284. // todo: make cancelable and accessible to multiple callers
  285. func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
  286. return c.configEvent
  287. }
  288. // currentNodeState should not be called without a read lock
  289. func (c *Cluster) currentNodeState() nodeState {
  290. return c.nr.State()
  291. }
  292. // errNoManager returns error describing why manager commands can't be used.
  293. // Call with read lock.
  294. func (c *Cluster) errNoManager(st nodeState) error {
  295. if st.swarmNode == nil {
  296. if errors.Cause(st.err) == errSwarmLocked {
  297. return errSwarmLocked
  298. }
  299. if st.err == errSwarmCertificatesExpired {
  300. return errSwarmCertificatesExpired
  301. }
  302. return errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  303. }
  304. if st.swarmNode.Manager() != nil {
  305. return errors.New("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  306. }
  307. return errors.New("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  308. }
  309. // Cleanup stops active swarm node. This is run before daemon shutdown.
  310. func (c *Cluster) Cleanup() {
  311. c.controlMutex.Lock()
  312. defer c.controlMutex.Unlock()
  313. c.mu.Lock()
  314. node := c.nr
  315. if node == nil {
  316. c.mu.Unlock()
  317. return
  318. }
  319. state := c.currentNodeState()
  320. c.mu.Unlock()
  321. if state.IsActiveManager() {
  322. active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
  323. if err == nil {
  324. singlenode := active && isLastManager(reachable, unreachable)
  325. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  326. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  327. }
  328. }
  329. }
  330. if err := node.Stop(); err != nil {
  331. logrus.Errorf("failed to shut down cluster node: %v", err)
  332. signal.DumpStacks("")
  333. }
  334. c.mu.Lock()
  335. c.nr = nil
  336. c.mu.Unlock()
  337. }
  338. func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
  339. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  340. defer cancel()
  341. nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  342. if err != nil {
  343. return false, 0, 0, err
  344. }
  345. for _, n := range nodes.Nodes {
  346. if n.ManagerStatus != nil {
  347. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  348. reachable++
  349. if n.ID == currentNodeID {
  350. current = true
  351. }
  352. }
  353. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  354. unreachable++
  355. }
  356. }
  357. }
  358. return
  359. }
  360. func detectLockedError(err error) error {
  361. if err == swarmnode.ErrInvalidUnlockKey {
  362. return errors.WithStack(errSwarmLocked)
  363. }
  364. return err
  365. }
  366. func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
  367. c.mu.RLock()
  368. defer c.mu.RUnlock()
  369. state := c.currentNodeState()
  370. if !state.IsActiveManager() {
  371. return c.errNoManager(state)
  372. }
  373. ctx, cancel := c.getRequestContext()
  374. defer cancel()
  375. return fn(ctx, state)
  376. }
  377. // SendClusterEvent allows to send cluster events on the configEvent channel
  378. // TODO This method should not be exposed.
  379. // Currently it is used to notify the network controller that the keys are
  380. // available
  381. func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
  382. c.mu.RLock()
  383. defer c.mu.RUnlock()
  384. c.configEvent <- event
  385. }