cluster.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/distribution/digest"
  14. "github.com/docker/docker/daemon/cluster/convert"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/docker/daemon/cluster/executor/container"
  17. "github.com/docker/docker/errors"
  18. "github.com/docker/docker/opts"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/runconfig"
  21. apitypes "github.com/docker/engine-api/types"
  22. types "github.com/docker/engine-api/types/swarm"
  23. swarmagent "github.com/docker/swarmkit/agent"
  24. swarmapi "github.com/docker/swarmkit/api"
  25. "golang.org/x/net/context"
  26. )
// Filesystem layout and timing constants for the swarm state kept under
// the daemon root directory.
const swarmDirName = "swarm"                 // subdirectory of Config.Root holding all swarm state
const controlSocket = "control.sock"         // unix socket exposing the swarmkit control API
const swarmConnectTimeout = 20 * time.Second // how long startup/join waits before continuing in background
const stateFile = "docker-state.json"        // persisted listen address (see type state)
const defaultAddr = "0.0.0.0:2377"           // NOTE(review): not referenced in this chunk — presumably the default listen address; confirm against callers

// Exponential backoff bounds used by reconnectOnFailure.
const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
)

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")
// defaultSpec contains some sane defaults if cluster options are missing on init
var defaultSpec = types.Spec{
	Raft: types.RaftConfig{
		SnapshotInterval:           10000,
		KeepOldSnapshots:           0,
		LogEntriesForSlowFollowers: 500,
		HeartbeatTick:              1,
		ElectionTick:               3,
	},
	CAConfig: types.CAConfig{
		// rotate node certificates every 90 days by default
		NodeCertExpiry: 90 * 24 * time.Hour,
	},
	Dispatcher: types.DispatcherConfig{
		HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
	},
	Orchestration: types.OrchestrationConfig{
		// keep at most 10 historic tasks per slot
		TaskHistoryRetentionLimit: 10,
	},
}
// state is the on-disk representation persisted to stateFile so the node
// can be restarted with the same parameters after a daemon restart.
type state struct {
	ListenAddr string
}

// Config provides values for Cluster.
type Config struct {
	Root    string // daemon root directory; swarm state lives under Root/swarm
	Name    string // hostname advertised to the cluster
	Backend executorpkg.Backend
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	sync.RWMutex
	*node                     // currently running swarmkit node; nil when inactive
	root        string        // on-disk state directory (Config.Root/swarm)
	config      Config
	configEvent chan struct{} // todo: make this array and goroutine safe
	listenAddr  string
	stop        bool   // set while the node is being stopped deliberately (suppresses reconnect)
	err         error  // last node exit error, surfaced via Info
	cancelDelay func() // cancels a pending reconnect backoff; nil when none pending
}

// node wraps a running swarmkit node together with the connection and client
// for its control socket.
type node struct {
	*swarmagent.Node
	done           chan struct{} // closed once the node has exited
	ready          bool          // set once the node has reported Ready
	conn           *grpc.ClientConn
	client         swarmapi.ControlClient
	reconnectDelay time.Duration // current backoff used by reconnectOnFailure
}
// New creates a new Cluster instance using provided config. If swarm state
// from a previous run exists on disk, the node is restarted with that state;
// otherwise the Cluster starts inactive until Init or Join is called.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
	}
	st, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			// no saved state: start as a standalone (non-swarm) daemon
			return c, nil
		}
		return nil, err
	}
	n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
	if err != nil {
		return nil, err
	}
	select {
	case <-time.After(swarmConnectTimeout):
		// startup continues in the background; Ready may still fire later
		logrus.Errorf("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}
  125. func (c *Cluster) loadState() (*state, error) {
  126. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  127. if err != nil {
  128. return nil, err
  129. }
  130. // missing certificate means no actual state to restore from
  131. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  132. if os.IsNotExist(err) {
  133. c.clearState()
  134. }
  135. return nil, err
  136. }
  137. var st state
  138. if err := json.Unmarshal(dt, &st); err != nil {
  139. return nil, err
  140. }
  141. return &st, nil
  142. }
  143. func (c *Cluster) saveState() error {
  144. dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
  145. if err != nil {
  146. return err
  147. }
  148. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  149. }
  150. func (c *Cluster) reconnectOnFailure(n *node) {
  151. for {
  152. <-n.done
  153. c.Lock()
  154. if c.stop || c.node != nil {
  155. c.Unlock()
  156. return
  157. }
  158. n.reconnectDelay *= 2
  159. if n.reconnectDelay > maxReconnectDelay {
  160. n.reconnectDelay = maxReconnectDelay
  161. }
  162. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  163. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  164. c.cancelDelay = cancel
  165. c.Unlock()
  166. <-delayCtx.Done()
  167. if delayCtx.Err() != context.DeadlineExceeded {
  168. return
  169. }
  170. c.Lock()
  171. if c.node != nil {
  172. c.Unlock()
  173. return
  174. }
  175. var err error
  176. n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
  177. if err != nil {
  178. c.err = err
  179. close(n.done)
  180. }
  181. c.Unlock()
  182. }
  183. }
// startNewNode creates and starts a swarmkit node with the given parameters
// and wires it into the Cluster, spawning the goroutines that track its exit,
// readiness, and control-socket connection. Must be called with the cluster
// lock held. The returned node's done channel is closed once the node exits.
func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}
	// reset any previous node state before starting a fresh one
	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
		Hostname:         c.config.Name,
		ForceNewCluster:  forceNewCluster,
		ListenControlAPI: filepath.Join(c.root, controlSocket),
		ListenRemoteAPI:  listenAddr,
		JoinAddr:         joinAddr,
		StateDir:         c.root,
		CAHash:           cahash,
		Secret:           secret,
		Executor:         container.NewExecutor(c.config.Backend),
		HeartbeatTick:    1,
		ElectionTick:     3,
		IsManager:        ismanager,
	})
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
	}
	c.node = node
	c.listenAddr = listenAddr
	// NOTE(review): the saveState error is silently ignored here — confirm
	// this best-effort persistence is intentional.
	c.saveState()
	c.config.Backend.SetClusterProvider(c)
	// watch for node termination: record the exit error and signal done
	go func() {
		err := n.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		c.Unlock()
		close(node.done)
	}()
	// publish a config event once the node becomes ready (or ctx ends)
	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()
	// track the control socket connection, (re)creating the control client
	// whenever the connection changes
	go func() {
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()
	return node, nil
}
// Init initializes new cluster from user provided request. It returns the
// new node's ID on success. With ForceNewCluster set, any running node is
// stopped and restarted from the existing state as a single-manager cluster.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", errSwarmExists(node)
		}
		// --force-new-cluster: stop the running node before re-initializing
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}
	// todo: check current state existing
	n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()
	select {
	case <-n.Ready():
		// apply the user-provided spec to the freshly created cluster
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}
// Join makes current Cluster part of an existing swarm cluster. It blocks
// until the node is ready, the join times out (the attempt then continues in
// the background), or the node requires manual acceptance by a manager.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		return errSwarmExists(node)
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}
	// todo: check current state existing
	n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	certificateRequested := n.CertificateRequested()
	for {
		select {
		case <-certificateRequested:
			if n.NodeMembership() == swarmapi.NodeMembershipPending {
				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
			}
			// certificate issued: disable this case and keep waiting
			certificateRequested = nil
		case <-time.After(swarmConnectTimeout):
			// attempt to connect will continue in background, also reconnecting
			go c.reconnectOnFailure(n)
			return ErrSwarmJoinTimeoutReached
		case <-n.Ready():
			go c.reconnectOnFailure(n)
			return nil
		case <-n.done:
			c.RLock()
			defer c.RUnlock()
			return c.err
		}
	}
}
// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock. Note that the lock is
// temporarily released while Stop runs and re-acquired before returning.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	c.stop = true
	if c.cancelDelay != nil {
		// abort any pending reconnect backoff
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	// "context canceled" is expected when the node stops before the deadline
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	<-node.done
	return nil
}
  365. // Leave shuts down Cluster and removes current state.
  366. func (c *Cluster) Leave(force bool) error {
  367. c.Lock()
  368. node := c.node
  369. if node == nil {
  370. c.Unlock()
  371. return ErrNoSwarm
  372. }
  373. if node.Manager() != nil && !force {
  374. msg := "You are attempting to leave cluster on a node that is participating as a manager. "
  375. if c.isActiveManager() {
  376. active, reachable, unreachable, err := c.managerStats()
  377. if err == nil {
  378. if active && reachable-2 <= unreachable {
  379. if reachable == 1 && unreachable == 0 {
  380. msg += "Removing the last manager will erase all current state of the cluster. Use `--force` to ignore this message. "
  381. c.Unlock()
  382. return fmt.Errorf(msg)
  383. }
  384. msg += fmt.Sprintf("Leaving the cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
  385. }
  386. }
  387. } else {
  388. msg += "Doing so may lose the consensus of your cluster. "
  389. }
  390. msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
  391. c.Unlock()
  392. return fmt.Errorf(msg)
  393. }
  394. if err := c.stopNode(); err != nil {
  395. c.Unlock()
  396. return err
  397. }
  398. c.Unlock()
  399. if nodeID := node.NodeID(); nodeID != "" {
  400. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  401. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  402. logrus.Errorf("error removing %v: %v", id, err)
  403. }
  404. }
  405. }
  406. c.configEvent <- struct{}{}
  407. // todo: cleanup optional?
  408. if err := c.clearState(); err != nil {
  409. return err
  410. }
  411. return nil
  412. }
// clearState removes all on-disk swarm state, recreates the empty state
// directory, and detaches the backend from the cluster provider.
func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}
// getRequestContext returns a context with a 5-second deadline used to bound
// control-API calls.
func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on quorum lost
	// NOTE(review): the CancelFunc is discarded, so the timer is released
	// only when the deadline fires; go vet flags this. Fixing it requires a
	// signature change since callers expect a bare Context — confirm first.
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}
  428. // Inspect retrieves the configuration properties of a managed swarm cluster.
  429. func (c *Cluster) Inspect() (types.Swarm, error) {
  430. c.RLock()
  431. defer c.RUnlock()
  432. if !c.isActiveManager() {
  433. return types.Swarm{}, c.errNoManager()
  434. }
  435. swarm, err := getSwarm(c.getRequestContext(), c.client)
  436. if err != nil {
  437. return types.Swarm{}, err
  438. }
  439. if err != nil {
  440. return types.Swarm{}, err
  441. }
  442. return convert.SwarmFromGRPC(*swarm), nil
  443. }
  444. // Update updates configuration of a managed swarm cluster.
  445. func (c *Cluster) Update(version uint64, spec types.Spec) error {
  446. c.RLock()
  447. defer c.RUnlock()
  448. if !c.isActiveManager() {
  449. return c.errNoManager()
  450. }
  451. swarm, err := getSwarm(c.getRequestContext(), c.client)
  452. if err != nil {
  453. return err
  454. }
  455. swarmSpec, err := convert.SwarmSpecToGRPCandMerge(spec, &swarm.Spec)
  456. if err != nil {
  457. return err
  458. }
  459. _, err = c.client.UpdateCluster(
  460. c.getRequestContext(),
  461. &swarmapi.UpdateClusterRequest{
  462. ClusterID: swarm.ID,
  463. Spec: &swarmSpec,
  464. ClusterVersion: &swarmapi.Version{
  465. Index: version,
  466. },
  467. },
  468. )
  469. return err
  470. }
  471. // IsManager returns true if Cluster is participating as a manager.
  472. func (c *Cluster) IsManager() bool {
  473. c.RLock()
  474. defer c.RUnlock()
  475. return c.isActiveManager()
  476. }
  477. // IsAgent returns true if Cluster is participating as a worker/agent.
  478. func (c *Cluster) IsAgent() bool {
  479. c.RLock()
  480. defer c.RUnlock()
  481. return c.node != nil && c.ready
  482. }
  483. // GetListenAddress returns the listening address for current manager's
  484. // consensus and dispatcher APIs.
  485. func (c *Cluster) GetListenAddress() string {
  486. c.RLock()
  487. defer c.RUnlock()
  488. if c.isActiveManager() {
  489. return c.listenAddr
  490. }
  491. return ""
  492. }
  493. // GetRemoteAddress returns a known advertise address of a remote manager if
  494. // available.
  495. // todo: change to array/connect with info
  496. func (c *Cluster) GetRemoteAddress() string {
  497. c.RLock()
  498. defer c.RUnlock()
  499. return c.getRemoteAddress()
  500. }
  501. func (c *Cluster) getRemoteAddress() string {
  502. if c.node == nil {
  503. return ""
  504. }
  505. nodeID := c.node.NodeID()
  506. for _, r := range c.node.Remotes() {
  507. if r.NodeID != nodeID {
  508. return r.Addr
  509. }
  510. }
  511. return ""
  512. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	// configEvent is a single buffered channel shared by all callers; see
	// the field's todo about making it goroutine safe.
	return c.configEvent
}
  519. // Info returns information about the current cluster state.
  520. func (c *Cluster) Info() types.Info {
  521. var info types.Info
  522. c.RLock()
  523. defer c.RUnlock()
  524. if c.node == nil {
  525. info.LocalNodeState = types.LocalNodeStateInactive
  526. if c.cancelDelay != nil {
  527. info.LocalNodeState = types.LocalNodeStateError
  528. }
  529. } else {
  530. info.LocalNodeState = types.LocalNodeStatePending
  531. if c.ready == true {
  532. info.LocalNodeState = types.LocalNodeStateActive
  533. }
  534. }
  535. if c.err != nil {
  536. info.Error = c.err.Error()
  537. }
  538. if c.isActiveManager() {
  539. info.ControlAvailable = true
  540. if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
  541. info.Nodes = len(r.Nodes)
  542. for _, n := range r.Nodes {
  543. if n.ManagerStatus != nil {
  544. info.Managers = info.Managers + 1
  545. }
  546. }
  547. }
  548. if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
  549. info.CACertHash = swarm.RootCA.CACertHash
  550. }
  551. }
  552. if c.node != nil {
  553. for _, r := range c.node.Remotes() {
  554. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  555. }
  556. info.NodeID = c.node.NodeID()
  557. }
  558. return info
  559. }
// isActiveManager should not be called without a read lock
func (c *Cluster) isActiveManager() bool {
	// the node is an active manager iff it is running and the control
	// socket connection has been established
	return c.node != nil && c.conn != nil
}
  564. // errNoManager returns error describing why manager commands can't be used.
  565. // Call with read lock.
  566. func (c *Cluster) errNoManager() error {
  567. if c.node == nil {
  568. return fmt.Errorf("This node is not a Swarm manager. Use \"docker swarm init\" or \"docker swarm join --manager\" to connect this node to Swarm and try again.")
  569. }
  570. if c.node.Manager() != nil {
  571. return fmt.Errorf("This node is not a Swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  572. }
  573. return fmt.Errorf("This node is not a Swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  574. }
  575. // GetServices returns all services of a managed swarm cluster.
  576. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  577. c.RLock()
  578. defer c.RUnlock()
  579. if !c.isActiveManager() {
  580. return nil, c.errNoManager()
  581. }
  582. filters, err := newListServicesFilters(options.Filter)
  583. if err != nil {
  584. return nil, err
  585. }
  586. r, err := c.client.ListServices(
  587. c.getRequestContext(),
  588. &swarmapi.ListServicesRequest{Filters: filters})
  589. if err != nil {
  590. return nil, err
  591. }
  592. var services []types.Service
  593. for _, service := range r.Services {
  594. services = append(services, convert.ServiceFromGRPC(*service))
  595. }
  596. return services, nil
  597. }
  598. // CreateService creates a new service in a managed swarm cluster.
  599. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  600. c.RLock()
  601. defer c.RUnlock()
  602. if !c.isActiveManager() {
  603. return "", c.errNoManager()
  604. }
  605. ctx := c.getRequestContext()
  606. err := populateNetworkID(ctx, c.client, &s)
  607. if err != nil {
  608. return "", err
  609. }
  610. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  611. if err != nil {
  612. return "", err
  613. }
  614. if encodedAuth != "" {
  615. ctnr := serviceSpec.Task.GetContainer()
  616. if ctnr == nil {
  617. return "", fmt.Errorf("service does not use container tasks")
  618. }
  619. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  620. }
  621. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  622. if err != nil {
  623. return "", err
  624. }
  625. return r.Service.ID, nil
  626. }
  627. // GetService returns a service based on an ID or name.
  628. func (c *Cluster) GetService(input string) (types.Service, error) {
  629. c.RLock()
  630. defer c.RUnlock()
  631. if !c.isActiveManager() {
  632. return types.Service{}, c.errNoManager()
  633. }
  634. service, err := getService(c.getRequestContext(), c.client, input)
  635. if err != nil {
  636. return types.Service{}, err
  637. }
  638. return convert.ServiceFromGRPC(*service), nil
  639. }
// UpdateService updates existing service to match new properties. The given
// version must match the service's current version (optimistic concurrency).
// A non-empty encodedAuth replaces the registry credentials; otherwise the
// credentials already stored on the service are carried over.
func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return c.errNoManager()
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return err
	}
	if encodedAuth != "" {
		ctnr := serviceSpec.Task.GetContainer()
		if ctnr == nil {
			return fmt.Errorf("service does not use container tasks")
		}
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		currentService, err := getService(c.getRequestContext(), c.client, serviceID)
		if err != nil {
			return err
		}
		ctnr := currentService.Spec.Task.GetContainer()
		if ctnr == nil {
			return fmt.Errorf("service does not use container tasks")
		}
		serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
	}
	_, err = c.client.UpdateService(
		c.getRequestContext(),
		&swarmapi.UpdateServiceRequest{
			ServiceID: serviceID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}
  682. // RemoveService removes a service from a managed swarm cluster.
  683. func (c *Cluster) RemoveService(input string) error {
  684. c.RLock()
  685. defer c.RUnlock()
  686. if !c.isActiveManager() {
  687. return c.errNoManager()
  688. }
  689. service, err := getService(c.getRequestContext(), c.client, input)
  690. if err != nil {
  691. return err
  692. }
  693. if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  694. return err
  695. }
  696. return nil
  697. }
  698. // GetNodes returns a list of all nodes known to a cluster.
  699. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  700. c.RLock()
  701. defer c.RUnlock()
  702. if !c.isActiveManager() {
  703. return nil, c.errNoManager()
  704. }
  705. filters, err := newListNodesFilters(options.Filter)
  706. if err != nil {
  707. return nil, err
  708. }
  709. r, err := c.client.ListNodes(
  710. c.getRequestContext(),
  711. &swarmapi.ListNodesRequest{Filters: filters})
  712. if err != nil {
  713. return nil, err
  714. }
  715. nodes := []types.Node{}
  716. for _, node := range r.Nodes {
  717. nodes = append(nodes, convert.NodeFromGRPC(*node))
  718. }
  719. return nodes, nil
  720. }
  721. // GetNode returns a node based on an ID or name.
  722. func (c *Cluster) GetNode(input string) (types.Node, error) {
  723. c.RLock()
  724. defer c.RUnlock()
  725. if !c.isActiveManager() {
  726. return types.Node{}, c.errNoManager()
  727. }
  728. node, err := getNode(c.getRequestContext(), c.client, input)
  729. if err != nil {
  730. return types.Node{}, err
  731. }
  732. return convert.NodeFromGRPC(*node), nil
  733. }
  734. // UpdateNode updates existing nodes properties.
  735. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  736. c.RLock()
  737. defer c.RUnlock()
  738. if !c.isActiveManager() {
  739. return c.errNoManager()
  740. }
  741. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  742. if err != nil {
  743. return err
  744. }
  745. _, err = c.client.UpdateNode(
  746. c.getRequestContext(),
  747. &swarmapi.UpdateNodeRequest{
  748. NodeID: nodeID,
  749. Spec: &nodeSpec,
  750. NodeVersion: &swarmapi.Version{
  751. Index: version,
  752. },
  753. },
  754. )
  755. return err
  756. }
  757. // RemoveNode removes a node from a cluster
  758. func (c *Cluster) RemoveNode(input string) error {
  759. c.RLock()
  760. defer c.RUnlock()
  761. if !c.isActiveManager() {
  762. return c.errNoManager()
  763. }
  764. ctx := c.getRequestContext()
  765. node, err := getNode(ctx, c.client, input)
  766. if err != nil {
  767. return err
  768. }
  769. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
  770. return err
  771. }
  772. return nil
  773. }
  774. // GetTasks returns a list of tasks matching the filter options.
  775. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  776. c.RLock()
  777. defer c.RUnlock()
  778. if !c.isActiveManager() {
  779. return nil, c.errNoManager()
  780. }
  781. filters, err := newListTasksFilters(options.Filter)
  782. if err != nil {
  783. return nil, err
  784. }
  785. r, err := c.client.ListTasks(
  786. c.getRequestContext(),
  787. &swarmapi.ListTasksRequest{Filters: filters})
  788. if err != nil {
  789. return nil, err
  790. }
  791. tasks := []types.Task{}
  792. for _, task := range r.Tasks {
  793. tasks = append(tasks, convert.TaskFromGRPC(*task))
  794. }
  795. return tasks, nil
  796. }
  797. // GetTask returns a task by an ID.
  798. func (c *Cluster) GetTask(input string) (types.Task, error) {
  799. c.RLock()
  800. defer c.RUnlock()
  801. if !c.isActiveManager() {
  802. return types.Task{}, c.errNoManager()
  803. }
  804. task, err := getTask(c.getRequestContext(), c.client, input)
  805. if err != nil {
  806. return types.Task{}, err
  807. }
  808. return convert.TaskFromGRPC(*task), nil
  809. }
  810. // GetNetwork returns a cluster network by an ID.
  811. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  812. c.RLock()
  813. defer c.RUnlock()
  814. if !c.isActiveManager() {
  815. return apitypes.NetworkResource{}, c.errNoManager()
  816. }
  817. network, err := getNetwork(c.getRequestContext(), c.client, input)
  818. if err != nil {
  819. return apitypes.NetworkResource{}, err
  820. }
  821. return convert.BasicNetworkFromGRPC(*network), nil
  822. }
  823. // GetNetworks returns all current cluster managed networks.
  824. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  825. c.RLock()
  826. defer c.RUnlock()
  827. if !c.isActiveManager() {
  828. return nil, c.errNoManager()
  829. }
  830. r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
  831. if err != nil {
  832. return nil, err
  833. }
  834. var networks []apitypes.NetworkResource
  835. for _, network := range r.Networks {
  836. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  837. }
  838. return networks, nil
  839. }
  840. // CreateNetwork creates a new cluster managed network.
  841. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  842. c.RLock()
  843. defer c.RUnlock()
  844. if !c.isActiveManager() {
  845. return "", c.errNoManager()
  846. }
  847. if runconfig.IsPreDefinedNetwork(s.Name) {
  848. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  849. return "", errors.NewRequestForbiddenError(err)
  850. }
  851. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  852. r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  853. if err != nil {
  854. return "", err
  855. }
  856. return r.Network.ID, nil
  857. }
  858. // RemoveNetwork removes a cluster network.
  859. func (c *Cluster) RemoveNetwork(input string) error {
  860. c.RLock()
  861. defer c.RUnlock()
  862. if !c.isActiveManager() {
  863. return c.errNoManager()
  864. }
  865. network, err := getNetwork(c.getRequestContext(), c.client, input)
  866. if err != nil {
  867. return err
  868. }
  869. if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  870. return err
  871. }
  872. return nil
  873. }
  874. func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
  875. for i, n := range s.Networks {
  876. apiNetwork, err := getNetwork(ctx, c, n.Target)
  877. if err != nil {
  878. return err
  879. }
  880. s.Networks[i].Target = apiNetwork.ID
  881. }
  882. return nil
  883. }
  884. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  885. // GetNetwork to match via full ID.
  886. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  887. if err != nil {
  888. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  889. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  890. if err != nil || len(rl.Networks) == 0 {
  891. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  892. }
  893. if err != nil {
  894. return nil, err
  895. }
  896. if len(rl.Networks) == 0 {
  897. return nil, fmt.Errorf("network %s not found", input)
  898. }
  899. if l := len(rl.Networks); l > 1 {
  900. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  901. }
  902. return rl.Networks[0], nil
  903. }
  904. return rg.Network, nil
  905. }
// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		// No swarm node is running; nothing to stop.
		c.Unlock()
		return
	}
	// Hold the lock for the rest of shutdown so no concurrent swarm
	// operation races with stopNode.
	defer c.Unlock()
	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			// A single-manager cluster loses quorum trivially; don't warn then.
			singlenode := active && reachable == 1 && unreachable == 0
			// After this node leaves, reachable-1 managers remain of a total
			// reachable+unreachable; warn when the remainder cannot form a
			// raft majority (reachable-2 <= unreachable).
			if active && !singlenode && reachable-2 <= unreachable {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}
  926. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  927. ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
  928. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  929. if err != nil {
  930. return false, 0, 0, err
  931. }
  932. for _, n := range nodes.Nodes {
  933. if n.ManagerStatus != nil {
  934. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  935. reachable++
  936. if n.ID == c.node.NodeID() {
  937. current = true
  938. }
  939. }
  940. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  941. unreachable++
  942. }
  943. }
  944. }
  945. return
  946. }
  947. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  948. var err error
  949. req.ListenAddr, err = validateAddr(req.ListenAddr)
  950. if err != nil {
  951. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  952. }
  953. spec := &req.Spec
  954. // provide sane defaults instead of erroring
  955. if spec.Name == "" {
  956. spec.Name = "default"
  957. }
  958. if spec.Raft.SnapshotInterval == 0 {
  959. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  960. }
  961. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  962. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  963. }
  964. if spec.Raft.ElectionTick == 0 {
  965. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  966. }
  967. if spec.Raft.HeartbeatTick == 0 {
  968. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  969. }
  970. if spec.Dispatcher.HeartbeatPeriod == 0 {
  971. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  972. }
  973. if spec.CAConfig.NodeCertExpiry == 0 {
  974. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  975. }
  976. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  977. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  978. }
  979. return nil
  980. }
  981. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  982. var err error
  983. req.ListenAddr, err = validateAddr(req.ListenAddr)
  984. if err != nil {
  985. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  986. }
  987. if len(req.RemoteAddrs) == 0 {
  988. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  989. }
  990. for i := range req.RemoteAddrs {
  991. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  992. if err != nil {
  993. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  994. }
  995. }
  996. if req.CACertHash != "" {
  997. if _, err := digest.ParseDigest(req.CACertHash); err != nil {
  998. return fmt.Errorf("invalid CACertHash %q, %v", req.CACertHash, err)
  999. }
  1000. }
  1001. return nil
  1002. }
// validateAddr normalizes addr for swarm use: the empty string is rejected,
// otherwise the address is resolved against defaultAddr via
// opts.ParseTCPAddr and returned with any "tcp://" scheme prefix stripped.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// NOTE(review): the parse error is swallowed here and the address is
		// passed through unchanged with a nil error. If this is deliberate
		// best-effort normalization (accept non-TCP-style addresses as-is),
		// a comment should say so; otherwise this likely should be
		// `return addr, err` — confirm intent with the callers that wrap
		// this error ("invalid ListenAddr ...").
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
  1013. func errSwarmExists(node *node) error {
  1014. if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
  1015. return ErrPendingSwarmExists
  1016. }
  1017. return ErrSwarmExists
  1018. }
  1019. func initClusterSpec(node *node, spec types.Spec) error {
  1020. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1021. for conn := range node.ListenControlSocket(ctx) {
  1022. if ctx.Err() != nil {
  1023. return ctx.Err()
  1024. }
  1025. if conn != nil {
  1026. client := swarmapi.NewControlClient(conn)
  1027. var cluster *swarmapi.Cluster
  1028. for i := 0; ; i++ {
  1029. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1030. if err != nil {
  1031. return fmt.Errorf("error on listing clusters: %v", err)
  1032. }
  1033. if len(lcr.Clusters) == 0 {
  1034. if i < 10 {
  1035. time.Sleep(200 * time.Millisecond)
  1036. continue
  1037. }
  1038. return fmt.Errorf("empty list of clusters was returned")
  1039. }
  1040. cluster = lcr.Clusters[0]
  1041. break
  1042. }
  1043. newspec, err := convert.SwarmSpecToGRPCandMerge(spec, &cluster.Spec)
  1044. if err != nil {
  1045. return fmt.Errorf("error updating cluster settings: %v", err)
  1046. }
  1047. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1048. ClusterID: cluster.ID,
  1049. ClusterVersion: &cluster.Meta.Version,
  1050. Spec: &newspec,
  1051. })
  1052. if err != nil {
  1053. return fmt.Errorf("error updating cluster settings: %v", err)
  1054. }
  1055. return nil
  1056. }
  1057. }
  1058. return ctx.Err()
  1059. }