package cluster

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm related.
// Contains methods for connecting and controlling the cluster. Exists always,
// even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. Is present if, and
// only if, swarm mode is enabled. Implements a backoff restart loop in case
// of errors.
//
// NodeState - Information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// needed, take the read lock of `cluster.mu`. If you are writing an API
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that the node will not
// be shut down until the handler has finished.
//
// NodeRunner implements its own internal locks that should not be used
// outside of the struct. Instead, just call the `nodeRunner.State()` method to
// get the current state of the cluster (you still need `cluster.mu.RLock()` to
// access the `cluster.nr` reference itself). Most of the changes in NodeRunner
// happen because of an external event (network problem, unexpected swarmkit
// error) and Docker shouldn't take any locks that delay these changes from
// happening.
//
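// As an illustrative sketch only (Info is a hypothetical synchronous API
// responder, not code from this file), the convention above looks roughly
// like:
//
//	func (c *Cluster) Info() types.Info {
//		c.mu.RLock()
//		defer c.mu.RUnlock() // hold the read lock for the whole handler
//		// ... read c.currentNodeState() and build the response ...
//	}
//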

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// errNoSwarm is returned on leaving a cluster that was never initialized
var errNoSwarm = errors.New("This node is not part of a swarm")

// errSwarmExists is returned on an initialize or join request for a cluster that has already been activated
var errSwarmExists = errors.New("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// errSwarmJoinTimeoutReached is returned when a cluster join could not complete before the timeout was reached.
var errSwarmJoinTimeoutReached = errors.New("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// errSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var errSwarmLocked = errors.New("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// errSwarmCertificatesExpired is returned if docker was not running for the whole validity period of the certificates, so they had no chance to renew automatically.
var errSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan struct{} // todo: make this array and goroutine safe
	attachers    map[string]*attacher
}

// attacher manages the in-memory attachment state of a container's
// attachment to a global scope network managed by the swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using provided config.
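//
// As an illustrative sketch only (the field values below are assumptions, not
// taken from this file), a caller would construct a Cluster roughly like:
//
//	c, err := cluster.New(cluster.Config{
//		Root:                   "/var/lib/docker", // hypothetical daemon root
//		Backend:                daemonBackend,     // implements executorpkg.Backend
//		NetworkSubnetsProvider: netController,     // implements NetworkSubnetsProvider
//		RuntimeRoot:            "/var/run/docker", // hypothetical runtime state dir
//	})
//	if err != nil {
//		logrus.Fatal(err)
//	}
//	defer c.Cleanup()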
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return nil, err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			logrus.WithError(err).Error("swarm component could not be started")
			return c, nil
		}
	}
	return c, nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return ""
	}
	nodeID := state.swarmNode.NodeID()
	for _, r := range state.swarmNode.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
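//
// As an illustrative sketch only (the consumer below is an assumption, not
// code from this file), a daemon-side goroutine could watch for participation
// changes like this:
//
//	go func() {
//		for range c.ListenClusterEvents() {
//			// re-read the cluster state under c.mu.RLock() and react to it
//			logrus.Debug("cluster configuration changed")
//		}
//	}()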
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Cause(st.err) == errSwarmLocked {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
	}
	if st.swarmNode.Manager() != nil {
		return errors.New("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
	}
	return errors.New("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}
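
// managerStats counts, via the given control client, how many managers in the
// swarm are reachable and unreachable, and whether the current node is one of
// the reachable ones. Cleanup uses these counts to warn when leaving would
// break Raft quorum: for example (standard Raft majority math, not a formula
// from this file), leaving a swarm with 3 reachable managers keeps 2 of 3,
// which still meets the majority of 2, while leaving when only 2 of 3 are
// reachable drops to 1 of 3 and quorum is lost.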
func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}
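
// lockedManagerAction verifies, under the read lock, that this node is an
// active manager and, if so, runs fn with a request-scoped context and the
// current node state. As an illustrative sketch only (GetSomething is a
// hypothetical wrapper, not part of this file), manager-only operations are
// typically written as:
//
//	func (c *Cluster) GetSomething(id string) error {
//		return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
//			_, err := state.controlClient.GetNode(ctx, &swarmapi.GetNodeRequest{NodeID: id})
//			return err
//		})
//	}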
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	return fn(ctx, state)
}