swarm.go

package cluster

import (
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)
// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {
			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}
	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:    dataPathAddr,
		availability:    req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()
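
	// Block until the swarm node reports ready or fails to start.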
	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}
// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}
	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()
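
	// Wait for the join to complete or time out. If the node runner fails to
	// become ready, clear any partially written swarm state so a later join
	// starts clean.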
	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}
// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}
// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
		}
		// In Update, the client should provide the complete spec of the swarm,
		// including Name and Labels. For any field left at 0 or nil, swarmkit
		// falls back to its default value.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}
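
		// The caller-supplied version enables optimistic concurrency: swarmkit
		// rejects the update if the cluster has changed since that version was read.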
		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}
// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key set; the managers are not auto-locked
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// The manager is not active: that is only acceptable here if the
		// reason is that the swarm is locked; any other error is returned.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// The manager is already active, so there is nothing to unlock.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// Only reached when the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()
	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return errdefs.InvalidParameter(err)
	}
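
	// Unlocking works by restarting the swarm node with the parsed key in its
	// start config, so that swarmkit can decrypt its encrypted on-disk state.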
	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}
// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()
	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}
	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
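
	// Remove containers that were created for swarm tasks on this node; they
	// are found via the com.docker.swarm.node.id label (see listContainerForNode).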
	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}
// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()
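
	// Cluster-wide details (cluster spec, node and manager counts) are only
	// available when this node is an active manager.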
	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
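	// A parse failure is not treated as fatal here: the address is returned
	// unchanged, and the resolve* helpers handle other address forms later.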
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
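			// The cluster object may not be committed to the store immediately
			// after the control socket comes up, so poll briefly before giving up.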
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and merge
			// any non-nil, non-zero value from spec into the GRPC spec. This leaves
			// the default values alone.
			// Note that this is different from Update(), where we expect the user to
			// specify the complete spec of the cluster (as they already know the
			// existing one and know which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}
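
// listContainerForNode returns the IDs of the containers on this daemon that
// carry the swarm node-ID label (com.docker.swarm.node.id), i.e. the containers
// created for tasks scheduled on the given node.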
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}