cluster.go

package cluster

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm related.
// Contains methods for connecting and controlling the cluster. Exists always,
// even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. It is present if and
// only if swarm mode is enabled, and implements a backoff restart loop in case
// of errors.
//
// NodeState - Information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes to the cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// queried, take the read lock on `cluster.mu`. If you are writing an API
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that the node will not
// be shut down until the handler has finished.
//
// NodeRunner implements its own internal locks that should not be used outside
// of the struct. Instead, call the `nodeRunner.State()` method to get the
// current state of the cluster (you still need `cluster.mu.RLock()` to access
// the `cluster.nr` reference itself). Most of the changes in NodeRunner happen
// because of an external event (network problem, unexpected swarmkit error)
// and Docker shouldn't take any locks that delay these changes from happening.
//
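// As an illustration of the read-lock rule above, a synchronous read-only
// accessor would look roughly like the sketch below (the name
// exampleLocalAddr is hypothetical; GetLocalAddress further down is the real
// thing):
//
//	func (c *Cluster) exampleLocalAddr() string {
//		c.mu.RLock()
//		defer c.mu.RUnlock()
//		// Safe to reach c.nr (via currentNodeState) while the read lock is
//		// held; the node cannot be shut down before this returns.
//		return c.currentNodeState().actualLocalAddr
//	}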

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/controllers/plugin"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/pkg/signal"
	lncluster "github.com/docker/libnetwork/cluster"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	PluginBackend          plugin.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string

	// WatchStream is a channel to pass watch API notifications to daemon
	WatchStream chan *swarmapi.WatchMessage
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
	attachers    map[string]*attacher
	watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using provided config.
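//
// A minimal construction sketch, assuming hypothetical daemonBackend and
// pluginBackend values that implement executorpkg.Backend and plugin.Backend:
//
//	c, err := New(Config{
//		Root:          "/var/lib/docker",
//		Backend:       daemonBackend,
//		PluginBackend: pluginBackend,
//		WatchStream:   make(chan *swarmapi.WatchMessage, 32),
//	})
//	if err != nil {
//		return err
//	}
//	if err := c.Start(); err != nil {
//		return err
//	}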
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}

	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan lncluster.ConfigEventType, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
		watchStream: config.WatchStream,
	}
	return c, nil
}

// Start the Cluster instance
// TODO The split between New and Start can be joined again when the
// SendClusterEvent method is no longer required
func (c *Cluster) Start() error {
	root := filepath.Join(c.config.Root, swarmDirName)

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			logrus.WithError(err).Error("swarm component could not be started")
			return nil
		}
	}
	return nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote
// managers if available.
func (c *Cluster) GetRemoteAddressList() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddressList()
}

func (c *Cluster) getRemoteAddressList() []string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return []string{}
	}

	nodeID := state.swarmNode.NodeID()
	remotes := state.swarmNode.Remotes()
	addressList := make([]string, 0, len(remotes))
	for _, r := range remotes {
		if r.NodeID != nodeID {
			addressList = append(addressList, r.Addr)
		}
	}
	return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
	return c.configEvent
}
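
// A single hypothetical consumer could drain the channel like this (sketch
// only; the events signal participation changes such as keys becoming
// available):
//
//	go func() {
//		for ev := range c.ListenClusterEvents() {
//			_ = ev // react to the cluster participation change
//		}
//	}()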

// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Cause(st.err) == errSwarmLocked {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again."))
	}
	if st.swarmNode.Manager() != nil {
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
	}
	return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}

func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}
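
// lockedManagerAction takes the read lock, verifies that this node is an
// active manager, and runs fn with a request-scoped context. A caller-side
// sketch (the ListNodes call is only illustrative):
//
//	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
//		_, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{})
//		return err
//	})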
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	return fn(ctx, state)
}

// SendClusterEvent allows sending cluster events on the configEvent channel.
// TODO This method should not be exposed.
// Currently it is used to notify the network controller that the keys are
// available.
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.configEvent <- event
}