cluster.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/distribution/digest"
  14. "github.com/docker/docker/daemon/cluster/convert"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/docker/daemon/cluster/executor/container"
  17. "github.com/docker/docker/errors"
  18. "github.com/docker/docker/opts"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/runconfig"
  21. apitypes "github.com/docker/engine-api/types"
  22. "github.com/docker/engine-api/types/filters"
  23. types "github.com/docker/engine-api/types/swarm"
  24. swarmagent "github.com/docker/swarmkit/agent"
  25. swarmapi "github.com/docker/swarmkit/api"
  26. "golang.org/x/net/context"
  27. )
// Filenames and timing constants for the local swarm state kept under the
// daemon root directory.
const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	// initialReconnectDelay is the first backoff step used by
	// reconnectOnFailure; it doubles on every failure up to maxReconnectDelay.
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
)

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")

// defaultSpec contains some sane defaults if cluster options are missing on init
var defaultSpec = types.Spec{
	Raft: types.RaftConfig{
		SnapshotInterval:           10000,
		KeepOldSnapshots:           0,
		LogEntriesForSlowFollowers: 500,
		HeartbeatTick:              1,
		ElectionTick:               3,
	},
	CAConfig: types.CAConfig{
		NodeCertExpiry: 90 * 24 * time.Hour,
	},
	Dispatcher: types.DispatcherConfig{
		HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
	},
	Orchestration: types.OrchestrationConfig{
		TaskHistoryRetentionLimit: 10,
	},
}
// state is the JSON-serializable snapshot persisted to stateFile so the node
// can be restarted with the same listen address after a daemon restart.
type state struct {
	// ListenAddr is the address the remote (cluster) API listened on.
	ListenAddr string
}
// Config provides values for Cluster.
type Config struct {
	// Root is the daemon root directory; swarm state lives under Root/swarm.
	Root string
	// Name is the hostname passed to the swarmkit node.
	Name string
	// Backend is the daemon-side executor backend used to run cluster tasks.
	Backend executorpkg.Backend
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	sync.RWMutex
	*node
	root        string
	config      Config
	configEvent chan struct{} // todo: make this array and goroutine safe
	listenAddr  string
	stop        bool   // set by stopNode to suppress automatic reconnection
	err         error  // last fatal node error, surfaced via Info/Init/Join
	cancelDelay func() // cancels a pending reconnect backoff, if any
}
// node wraps a running swarmkit node together with its control-API
// connection and reconnection bookkeeping.
type node struct {
	*swarmagent.Node
	done           chan struct{} // closed once the node has fully exited
	ready          bool          // true once the node reported Ready()
	conn           *grpc.ClientConn
	client         swarmapi.ControlClient
	reconnectDelay time.Duration // current backoff used by reconnectOnFailure
}
// New creates a new Cluster instance using provided config.
// If saved state exists under config.Root/swarm, the node is restarted and
// New waits up to swarmConnectTimeout for it to become ready; on timeout the
// start continues in the background.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
	}
	st, err := c.loadState()
	if err != nil {
		// no saved state means a fresh, inactive cluster — not an error
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
	if err != nil {
		return nil, err
	}
	select {
	case <-time.After(swarmConnectTimeout):
		// keep trying in the background; reconnectOnFailure below takes over
		logrus.Errorf("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}
// loadState reads the persisted state file from c.root. A missing node
// certificate is treated as "no real state": the directory is wiped via
// clearState and the os.Stat error is returned so callers can test it with
// os.IsNotExist.
func (c *Cluster) loadState() (*state, error) {
	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
	if err != nil {
		return nil, err
	}
	// missing certificate means no actual state to restore from
	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
		if os.IsNotExist(err) {
			c.clearState()
		}
		return nil, err
	}
	var st state
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	return &st, nil
}
  144. func (c *Cluster) saveState() error {
  145. dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
  146. if err != nil {
  147. return err
  148. }
  149. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  150. }
  151. func (c *Cluster) reconnectOnFailure(n *node) {
  152. for {
  153. <-n.done
  154. c.Lock()
  155. if c.stop || c.node != nil {
  156. c.Unlock()
  157. return
  158. }
  159. n.reconnectDelay *= 2
  160. if n.reconnectDelay > maxReconnectDelay {
  161. n.reconnectDelay = maxReconnectDelay
  162. }
  163. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  164. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  165. c.cancelDelay = cancel
  166. c.Unlock()
  167. <-delayCtx.Done()
  168. if delayCtx.Err() != context.DeadlineExceeded {
  169. return
  170. }
  171. c.Lock()
  172. if c.node != nil {
  173. c.Unlock()
  174. return
  175. }
  176. var err error
  177. n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
  178. if err != nil {
  179. c.err = err
  180. close(n.done)
  181. }
  182. c.Unlock()
  183. }
  184. }
// startNewNode creates and starts a swarmkit node with the given options,
// installs it as c.node, and spawns three lifecycle goroutines: one waits for
// the node to exit (publishing the error and closing node.done), one waits
// for readiness, and one feeds control-socket connections into the control
// client. Caller must hold c.Lock.
func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}
	// reset node-tracking state before bringing up the replacement
	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
		Hostname:         c.config.Name,
		ForceNewCluster:  forceNewCluster,
		ListenControlAPI: filepath.Join(c.root, controlSocket),
		ListenRemoteAPI:  listenAddr,
		JoinAddr:         joinAddr,
		StateDir:         c.root,
		CAHash:           cahash,
		Secret:           secret,
		Executor:         container.NewExecutor(c.config.Backend),
		HeartbeatTick:    1,
		ElectionTick:     3,
		IsManager:        ismanager,
	})
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
	}
	c.node = node
	c.listenAddr = listenAddr
	// best-effort persistence; the error is intentionally ignored here
	c.saveState()
	c.config.Backend.SetClusterProvider(c)
	go func() {
		// block until the node exits, then publish the error and signal done
		err := n.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		c.Unlock()
		close(node.done)
	}()
	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		// notify listeners of the participation change
		c.configEvent <- struct{}{}
	}()
	go func() {
		// (re)build the control client whenever the control-socket
		// connection changes; a nil conn clears the client
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()
	return node, nil
}
// Init initializes new cluster from user provided request.
// Returns the new node's ID on success. With ForceNewCluster an existing
// node is stopped and replaced; otherwise an existing node is an error.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", errSwarmExists(node)
		}
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}
	// todo: check current state existing
	n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()
	select {
	case <-n.Ready():
		// node is up; push the user-supplied (or defaulted) cluster spec
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}
// Join makes current Cluster part of an existing swarm cluster.
// It blocks until the node is ready, the certificate is pending acceptance
// by a manager, or swarmConnectTimeout elapses (in which case joining
// continues in the background and ErrSwarmJoinTimeoutReached is returned).
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		return errSwarmExists(node)
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}
	// todo: check current state existing
	n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	certificateRequested := n.CertificateRequested()
	for {
		select {
		case <-certificateRequested:
			if n.NodeMembership() == swarmapi.NodeMembershipPending {
				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
			}
			// certificate was issued; disable this case and keep waiting
			certificateRequested = nil
		case <-time.After(swarmConnectTimeout):
			// attempt to connect will continue in background, also reconnecting
			go c.reconnectOnFailure(n)
			return ErrSwarmJoinTimeoutReached
		case <-n.Ready():
			go c.reconnectOnFailure(n)
			return nil
		case <-n.done:
			c.RLock()
			defer c.RUnlock()
			return c.err
		}
	}
}
// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	// suppress reconnectOnFailure and abort any pending backoff wait
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	// wait for the exit-watcher goroutine (see startNewNode) to finish
	<-node.done
	return nil
}
// Leave shuts down Cluster and removes current state.
// With force=false, leaving as a manager is refused whenever doing so could
// lose the raft quorum, or would erase the cluster entirely if this is the
// last manager.
func (c *Cluster) Leave(force bool) error {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return ErrNoSwarm
	}
	if node.Manager() != nil && !force {
		msg := "You are attempting to leave cluster on a node that is participating as a manager. "
		if c.isActiveManager() {
			active, reachable, unreachable, err := c.managerStats()
			if err == nil {
				// refuse when removing this manager drops the cluster to or
				// below the quorum threshold
				if active && reachable-2 <= unreachable {
					if reachable == 1 && unreachable == 0 {
						msg += "Removing the last manager will erase all current state of the cluster. Use `--force` to ignore this message. "
						c.Unlock()
						return fmt.Errorf(msg)
					}
					msg += fmt.Sprintf("Leaving the cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}
		msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
		c.Unlock()
		return fmt.Errorf(msg)
	}
	if err := c.stopNode(); err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	// remove this node's containers; failures are logged, not fatal
	if nodeID := node.NodeID(); nodeID != "" {
		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}
	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := c.clearState(); err != nil {
		return err
	}
	return nil
}
  414. func (c *Cluster) clearState() error {
  415. // todo: backup this data instead of removing?
  416. if err := os.RemoveAll(c.root); err != nil {
  417. return err
  418. }
  419. if err := os.MkdirAll(c.root, 0700); err != nil {
  420. return err
  421. }
  422. c.config.Backend.SetClusterProvider(nil)
  423. return nil
  424. }
// getRequestContext returns a context that expires after 5 seconds, used to
// bound every control-API request.
// TODO: not needed when requests don't block on qourum lost
// NOTE(review): the cancel function is discarded (go vet "lostcancel"), so
// the timer is only released at the deadline; presumably acceptable because
// the context outlives this call — confirm.
func (c *Cluster) getRequestContext() context.Context {
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}
  429. // Inspect retrieves the configuration properties of a managed swarm cluster.
  430. func (c *Cluster) Inspect() (types.Swarm, error) {
  431. c.RLock()
  432. defer c.RUnlock()
  433. if !c.isActiveManager() {
  434. return types.Swarm{}, c.errNoManager()
  435. }
  436. swarm, err := getSwarm(c.getRequestContext(), c.client)
  437. if err != nil {
  438. return types.Swarm{}, err
  439. }
  440. if err != nil {
  441. return types.Swarm{}, err
  442. }
  443. return convert.SwarmFromGRPC(*swarm), nil
  444. }
  445. // Update updates configuration of a managed swarm cluster.
  446. func (c *Cluster) Update(version uint64, spec types.Spec) error {
  447. c.RLock()
  448. defer c.RUnlock()
  449. if !c.isActiveManager() {
  450. return c.errNoManager()
  451. }
  452. swarm, err := getSwarm(c.getRequestContext(), c.client)
  453. if err != nil {
  454. return err
  455. }
  456. swarmSpec, err := convert.SwarmSpecToGRPCandMerge(spec, &swarm.Spec)
  457. if err != nil {
  458. return err
  459. }
  460. _, err = c.client.UpdateCluster(
  461. c.getRequestContext(),
  462. &swarmapi.UpdateClusterRequest{
  463. ClusterID: swarm.ID,
  464. Spec: &swarmSpec,
  465. ClusterVersion: &swarmapi.Version{
  466. Index: version,
  467. },
  468. },
  469. )
  470. return err
  471. }
  472. // IsManager returns true if Cluster is participating as a manager.
  473. func (c *Cluster) IsManager() bool {
  474. c.RLock()
  475. defer c.RUnlock()
  476. return c.isActiveManager()
  477. }
  478. // IsAgent returns true if Cluster is participating as a worker/agent.
  479. func (c *Cluster) IsAgent() bool {
  480. c.RLock()
  481. defer c.RUnlock()
  482. return c.node != nil && c.ready
  483. }
  484. // GetListenAddress returns the listening address for current manager's
  485. // consensus and dispatcher APIs.
  486. func (c *Cluster) GetListenAddress() string {
  487. c.RLock()
  488. defer c.RUnlock()
  489. if c.isActiveManager() {
  490. return c.listenAddr
  491. }
  492. return ""
  493. }
  494. // GetRemoteAddress returns a known advertise address of a remote manager if
  495. // available.
  496. // todo: change to array/connect with info
  497. func (c *Cluster) GetRemoteAddress() string {
  498. c.RLock()
  499. defer c.RUnlock()
  500. return c.getRemoteAddress()
  501. }
  502. func (c *Cluster) getRemoteAddress() string {
  503. if c.node == nil {
  504. return ""
  505. }
  506. nodeID := c.node.NodeID()
  507. for _, r := range c.node.Remotes() {
  508. if r.NodeID != nodeID {
  509. return r.Addr
  510. }
  511. }
  512. return ""
  513. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}
  520. // Info returns information about the current cluster state.
  521. func (c *Cluster) Info() types.Info {
  522. var info types.Info
  523. c.RLock()
  524. defer c.RUnlock()
  525. if c.node == nil {
  526. info.LocalNodeState = types.LocalNodeStateInactive
  527. if c.cancelDelay != nil {
  528. info.LocalNodeState = types.LocalNodeStateError
  529. }
  530. } else {
  531. info.LocalNodeState = types.LocalNodeStatePending
  532. if c.ready == true {
  533. info.LocalNodeState = types.LocalNodeStateActive
  534. }
  535. }
  536. if c.err != nil {
  537. info.Error = c.err.Error()
  538. }
  539. if c.isActiveManager() {
  540. info.ControlAvailable = true
  541. if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
  542. info.Nodes = len(r.Nodes)
  543. for _, n := range r.Nodes {
  544. if n.ManagerStatus != nil {
  545. info.Managers = info.Managers + 1
  546. }
  547. }
  548. }
  549. if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
  550. info.CACertHash = swarm.RootCA.CACertHash
  551. }
  552. }
  553. if c.node != nil {
  554. for _, r := range c.node.Remotes() {
  555. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  556. }
  557. info.NodeID = c.node.NodeID()
  558. }
  559. return info
  560. }
// isActiveManager should not be called without a read lock
// A node counts as an active manager when it exists and its control-socket
// connection has been established.
func (c *Cluster) isActiveManager() bool {
	return c.node != nil && c.conn != nil
}
  565. // errNoManager returns error describing why manager commands can't be used.
  566. // Call with read lock.
  567. func (c *Cluster) errNoManager() error {
  568. if c.node == nil {
  569. return fmt.Errorf("This node is not a Swarm manager. Use \"docker swarm init\" or \"docker swarm join --manager\" to connect this node to Swarm and try again.")
  570. }
  571. if c.node.Manager() != nil {
  572. return fmt.Errorf("This node is not a Swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  573. }
  574. return fmt.Errorf("This node is not a Swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  575. }
  576. // GetServices returns all services of a managed swarm cluster.
  577. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  578. c.RLock()
  579. defer c.RUnlock()
  580. if !c.isActiveManager() {
  581. return nil, c.errNoManager()
  582. }
  583. filters, err := newListServicesFilters(options.Filter)
  584. if err != nil {
  585. return nil, err
  586. }
  587. r, err := c.client.ListServices(
  588. c.getRequestContext(),
  589. &swarmapi.ListServicesRequest{Filters: filters})
  590. if err != nil {
  591. return nil, err
  592. }
  593. var services []types.Service
  594. for _, service := range r.Services {
  595. services = append(services, convert.ServiceFromGRPC(*service))
  596. }
  597. return services, nil
  598. }
  599. // CreateService creates a new service in a managed swarm cluster.
  600. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  601. c.RLock()
  602. defer c.RUnlock()
  603. if !c.isActiveManager() {
  604. return "", c.errNoManager()
  605. }
  606. ctx := c.getRequestContext()
  607. err := populateNetworkID(ctx, c.client, &s)
  608. if err != nil {
  609. return "", err
  610. }
  611. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  612. if err != nil {
  613. return "", err
  614. }
  615. if encodedAuth != "" {
  616. ctnr := serviceSpec.Task.GetContainer()
  617. if ctnr == nil {
  618. return "", fmt.Errorf("service does not use container tasks")
  619. }
  620. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  621. }
  622. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  623. if err != nil {
  624. return "", err
  625. }
  626. return r.Service.ID, nil
  627. }
  628. // GetService returns a service based on an ID or name.
  629. func (c *Cluster) GetService(input string) (types.Service, error) {
  630. c.RLock()
  631. defer c.RUnlock()
  632. if !c.isActiveManager() {
  633. return types.Service{}, c.errNoManager()
  634. }
  635. service, err := getService(c.getRequestContext(), c.client, input)
  636. if err != nil {
  637. return types.Service{}, err
  638. }
  639. return convert.ServiceFromGRPC(*service), nil
  640. }
  641. // UpdateService updates existing service to match new properties.
  642. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  643. c.RLock()
  644. defer c.RUnlock()
  645. if !c.isActiveManager() {
  646. return c.errNoManager()
  647. }
  648. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  649. if err != nil {
  650. return err
  651. }
  652. if encodedAuth != "" {
  653. ctnr := serviceSpec.Task.GetContainer()
  654. if ctnr == nil {
  655. return fmt.Errorf("service does not use container tasks")
  656. }
  657. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  658. } else {
  659. // this is needed because if the encodedAuth isn't being updated then we
  660. // shouldn't lose it, and continue to use the one that was already present
  661. currentService, err := getService(c.getRequestContext(), c.client, serviceID)
  662. if err != nil {
  663. return err
  664. }
  665. ctnr := currentService.Spec.Task.GetContainer()
  666. if ctnr == nil {
  667. return fmt.Errorf("service does not use container tasks")
  668. }
  669. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  670. }
  671. _, err = c.client.UpdateService(
  672. c.getRequestContext(),
  673. &swarmapi.UpdateServiceRequest{
  674. ServiceID: serviceID,
  675. Spec: &serviceSpec,
  676. ServiceVersion: &swarmapi.Version{
  677. Index: version,
  678. },
  679. },
  680. )
  681. return err
  682. }
  683. // RemoveService removes a service from a managed swarm cluster.
  684. func (c *Cluster) RemoveService(input string) error {
  685. c.RLock()
  686. defer c.RUnlock()
  687. if !c.isActiveManager() {
  688. return c.errNoManager()
  689. }
  690. service, err := getService(c.getRequestContext(), c.client, input)
  691. if err != nil {
  692. return err
  693. }
  694. if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  695. return err
  696. }
  697. return nil
  698. }
  699. // GetNodes returns a list of all nodes known to a cluster.
  700. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  701. c.RLock()
  702. defer c.RUnlock()
  703. if !c.isActiveManager() {
  704. return nil, c.errNoManager()
  705. }
  706. filters, err := newListNodesFilters(options.Filter)
  707. if err != nil {
  708. return nil, err
  709. }
  710. r, err := c.client.ListNodes(
  711. c.getRequestContext(),
  712. &swarmapi.ListNodesRequest{Filters: filters})
  713. if err != nil {
  714. return nil, err
  715. }
  716. nodes := []types.Node{}
  717. for _, node := range r.Nodes {
  718. nodes = append(nodes, convert.NodeFromGRPC(*node))
  719. }
  720. return nodes, nil
  721. }
  722. // GetNode returns a node based on an ID or name.
  723. func (c *Cluster) GetNode(input string) (types.Node, error) {
  724. c.RLock()
  725. defer c.RUnlock()
  726. if !c.isActiveManager() {
  727. return types.Node{}, c.errNoManager()
  728. }
  729. node, err := getNode(c.getRequestContext(), c.client, input)
  730. if err != nil {
  731. return types.Node{}, err
  732. }
  733. return convert.NodeFromGRPC(*node), nil
  734. }
  735. // UpdateNode updates existing nodes properties.
  736. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  737. c.RLock()
  738. defer c.RUnlock()
  739. if !c.isActiveManager() {
  740. return c.errNoManager()
  741. }
  742. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  743. if err != nil {
  744. return err
  745. }
  746. _, err = c.client.UpdateNode(
  747. c.getRequestContext(),
  748. &swarmapi.UpdateNodeRequest{
  749. NodeID: nodeID,
  750. Spec: &nodeSpec,
  751. NodeVersion: &swarmapi.Version{
  752. Index: version,
  753. },
  754. },
  755. )
  756. return err
  757. }
  758. // RemoveNode removes a node from a cluster
  759. func (c *Cluster) RemoveNode(input string) error {
  760. c.RLock()
  761. defer c.RUnlock()
  762. if !c.isActiveManager() {
  763. return c.errNoManager()
  764. }
  765. ctx := c.getRequestContext()
  766. node, err := getNode(ctx, c.client, input)
  767. if err != nil {
  768. return err
  769. }
  770. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
  771. return err
  772. }
  773. return nil
  774. }
// GetTasks returns a list of tasks matching the filter options.
// "service" and "node" filter values given by name are resolved to their
// canonical IDs before the ListTasks request is issued.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}
	// byName rewrites service/node filter entries from names to IDs.
	// NOTE(review): it calls c.GetService/c.GetNode, which re-acquire the
	// read lock already held here; recursive RLock can deadlock when a
	// writer is queued in between — confirm this is safe in this code path.
	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}
	filters, err := newListTasksFilters(options.Filter, byName)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListTasks(
		c.getRequestContext(),
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	tasks := []types.Task{}
	for _, task := range r.Tasks {
		tasks = append(tasks, convert.TaskFromGRPC(*task))
	}
	return tasks, nil
}
  823. // GetTask returns a task by an ID.
  824. func (c *Cluster) GetTask(input string) (types.Task, error) {
  825. c.RLock()
  826. defer c.RUnlock()
  827. if !c.isActiveManager() {
  828. return types.Task{}, c.errNoManager()
  829. }
  830. task, err := getTask(c.getRequestContext(), c.client, input)
  831. if err != nil {
  832. return types.Task{}, err
  833. }
  834. return convert.TaskFromGRPC(*task), nil
  835. }
  836. // GetNetwork returns a cluster network by an ID.
  837. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  838. c.RLock()
  839. defer c.RUnlock()
  840. if !c.isActiveManager() {
  841. return apitypes.NetworkResource{}, c.errNoManager()
  842. }
  843. network, err := getNetwork(c.getRequestContext(), c.client, input)
  844. if err != nil {
  845. return apitypes.NetworkResource{}, err
  846. }
  847. return convert.BasicNetworkFromGRPC(*network), nil
  848. }
  849. // GetNetworks returns all current cluster managed networks.
  850. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  851. c.RLock()
  852. defer c.RUnlock()
  853. if !c.isActiveManager() {
  854. return nil, c.errNoManager()
  855. }
  856. r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
  857. if err != nil {
  858. return nil, err
  859. }
  860. var networks []apitypes.NetworkResource
  861. for _, network := range r.Networks {
  862. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  863. }
  864. return networks, nil
  865. }
  866. // CreateNetwork creates a new cluster managed network.
  867. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  868. c.RLock()
  869. defer c.RUnlock()
  870. if !c.isActiveManager() {
  871. return "", c.errNoManager()
  872. }
  873. if runconfig.IsPreDefinedNetwork(s.Name) {
  874. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  875. return "", errors.NewRequestForbiddenError(err)
  876. }
  877. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  878. r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  879. if err != nil {
  880. return "", err
  881. }
  882. return r.Network.ID, nil
  883. }
  884. // RemoveNetwork removes a cluster network.
  885. func (c *Cluster) RemoveNetwork(input string) error {
  886. c.RLock()
  887. defer c.RUnlock()
  888. if !c.isActiveManager() {
  889. return c.errNoManager()
  890. }
  891. network, err := getNetwork(c.getRequestContext(), c.client, input)
  892. if err != nil {
  893. return err
  894. }
  895. if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  896. return err
  897. }
  898. return nil
  899. }
  900. func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
  901. for i, n := range s.Networks {
  902. apiNetwork, err := getNetwork(ctx, c, n.Target)
  903. if err != nil {
  904. return err
  905. }
  906. s.Networks[i].Target = apiNetwork.ID
  907. }
  908. return nil
  909. }
// getNetwork resolves input — a full ID, an ID prefix, or a name — to exactly
// one cluster network. It errors when nothing matches or the input is
// ambiguous (more than one match).
func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
	// GetNetwork to match via full ID.
	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
	if err != nil {
		// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
		// Name lookup failed or found nothing: fall back to ID-prefix matching.
		// Note the reassignment (=, not :=) so the prefix result replaces the
		// name result for the checks below.
		if err != nil || len(rl.Networks) == 0 {
			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
		}
		if err != nil {
			return nil, err
		}
		if len(rl.Networks) == 0 {
			return nil, fmt.Errorf("network %s not found", input)
		}
		if l := len(rl.Networks); l > 1 {
			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
		}
		return rl.Networks[0], nil
	}
	return rg.Network, nil
}
// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		// No active swarm node; nothing to stop.
		c.Unlock()
		return
	}
	defer c.Unlock()
	if c.isActiveManager() {
		// Best effort: warn when stopping this manager will cost the cluster
		// its raft quorum. Stats errors are deliberately ignored — shutdown
		// proceeds regardless.
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && reachable == 1 && unreachable == 0
			// After this node leaves, reachable-1 managers remain; if that is
			// no longer a majority over the unreachable ones, quorum is lost.
			if active && !singlenode && reachable-2 <= unreachable {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}
  952. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  953. ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
  954. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  955. if err != nil {
  956. return false, 0, 0, err
  957. }
  958. for _, n := range nodes.Nodes {
  959. if n.ManagerStatus != nil {
  960. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  961. reachable++
  962. if n.ID == c.node.NodeID() {
  963. current = true
  964. }
  965. }
  966. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  967. unreachable++
  968. }
  969. }
  970. }
  971. return
  972. }
  973. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  974. var err error
  975. req.ListenAddr, err = validateAddr(req.ListenAddr)
  976. if err != nil {
  977. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  978. }
  979. spec := &req.Spec
  980. // provide sane defaults instead of erroring
  981. if spec.Name == "" {
  982. spec.Name = "default"
  983. }
  984. if spec.Raft.SnapshotInterval == 0 {
  985. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  986. }
  987. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  988. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  989. }
  990. if spec.Raft.ElectionTick == 0 {
  991. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  992. }
  993. if spec.Raft.HeartbeatTick == 0 {
  994. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  995. }
  996. if spec.Dispatcher.HeartbeatPeriod == 0 {
  997. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  998. }
  999. if spec.CAConfig.NodeCertExpiry == 0 {
  1000. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1001. }
  1002. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1003. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1004. }
  1005. return nil
  1006. }
  1007. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1008. var err error
  1009. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1010. if err != nil {
  1011. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1012. }
  1013. if len(req.RemoteAddrs) == 0 {
  1014. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1015. }
  1016. for i := range req.RemoteAddrs {
  1017. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1018. if err != nil {
  1019. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1020. }
  1021. }
  1022. if req.CACertHash != "" {
  1023. if _, err := digest.ParseDigest(req.CACertHash); err != nil {
  1024. return fmt.Errorf("invalid CACertHash %q, %v", req.CACertHash, err)
  1025. }
  1026. }
  1027. return nil
  1028. }
// validateAddr normalizes addr to host:port form, filling missing components
// from defaultAddr and stripping any "tcp://" scheme. An empty address is the
// only rejected input.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// NOTE(review): the parse error is swallowed and the address returned
		// verbatim — presumably to tolerate address forms ParseTCPAddr cannot
		// handle. Confirm this is intentional and not a typo for
		// `return addr, err`.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
  1039. func errSwarmExists(node *node) error {
  1040. if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
  1041. return ErrPendingSwarmExists
  1042. }
  1043. return ErrSwarmExists
  1044. }
  1045. func initClusterSpec(node *node, spec types.Spec) error {
  1046. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1047. for conn := range node.ListenControlSocket(ctx) {
  1048. if ctx.Err() != nil {
  1049. return ctx.Err()
  1050. }
  1051. if conn != nil {
  1052. client := swarmapi.NewControlClient(conn)
  1053. var cluster *swarmapi.Cluster
  1054. for i := 0; ; i++ {
  1055. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1056. if err != nil {
  1057. return fmt.Errorf("error on listing clusters: %v", err)
  1058. }
  1059. if len(lcr.Clusters) == 0 {
  1060. if i < 10 {
  1061. time.Sleep(200 * time.Millisecond)
  1062. continue
  1063. }
  1064. return fmt.Errorf("empty list of clusters was returned")
  1065. }
  1066. cluster = lcr.Clusters[0]
  1067. break
  1068. }
  1069. newspec, err := convert.SwarmSpecToGRPCandMerge(spec, &cluster.Spec)
  1070. if err != nil {
  1071. return fmt.Errorf("error updating cluster settings: %v", err)
  1072. }
  1073. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1074. ClusterID: cluster.ID,
  1075. ClusterVersion: &cluster.Meta.Version,
  1076. Spec: &newspec,
  1077. })
  1078. if err != nil {
  1079. return fmt.Errorf("error updating cluster settings: %v", err)
  1080. }
  1081. return nil
  1082. }
  1083. }
  1084. return ctx.Err()
  1085. }