cluster.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/cluster/convert"
  14. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  15. "github.com/docker/docker/daemon/cluster/executor/container"
  16. "github.com/docker/docker/errors"
  17. "github.com/docker/docker/opts"
  18. "github.com/docker/docker/pkg/ioutils"
  19. "github.com/docker/docker/runconfig"
  20. apitypes "github.com/docker/engine-api/types"
  21. "github.com/docker/engine-api/types/filters"
  22. types "github.com/docker/engine-api/types/swarm"
  23. swarmagent "github.com/docker/swarmkit/agent"
  24. swarmapi "github.com/docker/swarmkit/api"
  25. "golang.org/x/net/context"
  26. )
  27. const swarmDirName = "swarm"
  28. const controlSocket = "control.sock"
  29. const swarmConnectTimeout = 20 * time.Second
  30. const swarmRequestTimeout = 20 * time.Second
  31. const stateFile = "docker-state.json"
  32. const defaultAddr = "0.0.0.0:2377"
  33. const (
  34. initialReconnectDelay = 100 * time.Millisecond
  35. maxReconnectDelay = 30 * time.Second
  36. )
  37. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  38. var ErrNoSwarm = fmt.Errorf("This node is not part of swarm")
  39. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  40. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")
  41. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  42. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  43. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  44. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current swarm status of your node.")
  45. // defaultSpec contains some sane defaults if cluster options are missing on init
  46. var defaultSpec = types.Spec{
  47. Raft: types.RaftConfig{
  48. SnapshotInterval: 10000,
  49. KeepOldSnapshots: 0,
  50. LogEntriesForSlowFollowers: 500,
  51. HeartbeatTick: 1,
  52. ElectionTick: 3,
  53. },
  54. CAConfig: types.CAConfig{
  55. NodeCertExpiry: 90 * 24 * time.Hour,
  56. },
  57. Dispatcher: types.DispatcherConfig{
  58. HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
  59. },
  60. Orchestration: types.OrchestrationConfig{
  61. TaskHistoryRetentionLimit: 10,
  62. },
  63. }
  64. type state struct {
  65. ListenAddr string
  66. }
  67. // Config provides values for Cluster.
  68. type Config struct {
  69. Root string
  70. Name string
  71. Backend executorpkg.Backend
  72. }
  73. // Cluster provides capabilities to participate in a cluster as a worker or a
  74. // manager.
  75. type Cluster struct {
  76. sync.RWMutex
  77. *node
  78. root string
  79. config Config
  80. configEvent chan struct{} // todo: make this array and goroutine safe
  81. listenAddr string
  82. stop bool
  83. err error
  84. cancelDelay func()
  85. }
  86. type node struct {
  87. *swarmagent.Node
  88. done chan struct{}
  89. ready bool
  90. conn *grpc.ClientConn
  91. client swarmapi.ControlClient
  92. reconnectDelay time.Duration
  93. }
  94. // New creates a new Cluster instance using provided config.
  95. func New(config Config) (*Cluster, error) {
  96. root := filepath.Join(config.Root, swarmDirName)
  97. if err := os.MkdirAll(root, 0700); err != nil {
  98. return nil, err
  99. }
  100. c := &Cluster{
  101. root: root,
  102. config: config,
  103. configEvent: make(chan struct{}, 10),
  104. }
  105. st, err := c.loadState()
  106. if err != nil {
  107. if os.IsNotExist(err) {
  108. return c, nil
  109. }
  110. return nil, err
  111. }
  112. n, err := c.startNewNode(false, st.ListenAddr, "", "")
  113. if err != nil {
  114. return nil, err
  115. }
  116. select {
  117. case <-time.After(swarmConnectTimeout):
  118. logrus.Errorf("swarm component could not be started before timeout was reached")
  119. case <-n.Ready():
  120. case <-n.done:
  121. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  122. }
  123. go c.reconnectOnFailure(n)
  124. return c, nil
  125. }
  126. func (c *Cluster) loadState() (*state, error) {
  127. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  128. if err != nil {
  129. return nil, err
  130. }
  131. // missing certificate means no actual state to restore from
  132. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  133. if os.IsNotExist(err) {
  134. c.clearState()
  135. }
  136. return nil, err
  137. }
  138. var st state
  139. if err := json.Unmarshal(dt, &st); err != nil {
  140. return nil, err
  141. }
  142. return &st, nil
  143. }
  144. func (c *Cluster) saveState() error {
  145. dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
  146. if err != nil {
  147. return err
  148. }
  149. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  150. }
  151. func (c *Cluster) reconnectOnFailure(n *node) {
  152. for {
  153. <-n.done
  154. c.Lock()
  155. if c.stop || c.node != nil {
  156. c.Unlock()
  157. return
  158. }
  159. n.reconnectDelay *= 2
  160. if n.reconnectDelay > maxReconnectDelay {
  161. n.reconnectDelay = maxReconnectDelay
  162. }
  163. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  164. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  165. c.cancelDelay = cancel
  166. c.Unlock()
  167. <-delayCtx.Done()
  168. if delayCtx.Err() != context.DeadlineExceeded {
  169. return
  170. }
  171. c.Lock()
  172. if c.node != nil {
  173. c.Unlock()
  174. return
  175. }
  176. var err error
  177. n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "")
  178. if err != nil {
  179. c.err = err
  180. close(n.done)
  181. }
  182. c.Unlock()
  183. }
  184. }
  185. func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, joinToken string) (*node, error) {
  186. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  187. return nil, err
  188. }
  189. c.node = nil
  190. c.cancelDelay = nil
  191. c.stop = false
  192. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  193. Hostname: c.config.Name,
  194. ForceNewCluster: forceNewCluster,
  195. ListenControlAPI: filepath.Join(c.root, controlSocket),
  196. ListenRemoteAPI: listenAddr,
  197. JoinAddr: joinAddr,
  198. StateDir: c.root,
  199. JoinToken: joinToken,
  200. Executor: container.NewExecutor(c.config.Backend),
  201. HeartbeatTick: 1,
  202. ElectionTick: 3,
  203. })
  204. if err != nil {
  205. return nil, err
  206. }
  207. ctx := context.Background()
  208. if err := n.Start(ctx); err != nil {
  209. return nil, err
  210. }
  211. node := &node{
  212. Node: n,
  213. done: make(chan struct{}),
  214. reconnectDelay: initialReconnectDelay,
  215. }
  216. c.node = node
  217. c.listenAddr = listenAddr
  218. c.saveState()
  219. c.config.Backend.SetClusterProvider(c)
  220. go func() {
  221. err := n.Err(ctx)
  222. if err != nil {
  223. logrus.Errorf("cluster exited with error: %v", err)
  224. }
  225. c.Lock()
  226. c.node = nil
  227. c.err = err
  228. c.Unlock()
  229. close(node.done)
  230. }()
  231. go func() {
  232. select {
  233. case <-n.Ready():
  234. c.Lock()
  235. node.ready = true
  236. c.err = nil
  237. c.Unlock()
  238. case <-ctx.Done():
  239. }
  240. c.configEvent <- struct{}{}
  241. }()
  242. go func() {
  243. for conn := range n.ListenControlSocket(ctx) {
  244. c.Lock()
  245. if node.conn != conn {
  246. if conn == nil {
  247. node.client = nil
  248. } else {
  249. node.client = swarmapi.NewControlClient(conn)
  250. }
  251. }
  252. node.conn = conn
  253. c.Unlock()
  254. c.configEvent <- struct{}{}
  255. }
  256. }()
  257. return node, nil
  258. }
  259. // Init initializes new cluster from user provided request.
  260. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  261. c.Lock()
  262. if node := c.node; node != nil {
  263. if !req.ForceNewCluster {
  264. c.Unlock()
  265. return "", ErrSwarmExists
  266. }
  267. if err := c.stopNode(); err != nil {
  268. c.Unlock()
  269. return "", err
  270. }
  271. }
  272. if err := validateAndSanitizeInitRequest(&req); err != nil {
  273. c.Unlock()
  274. return "", err
  275. }
  276. // todo: check current state existing
  277. n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "")
  278. if err != nil {
  279. c.Unlock()
  280. return "", err
  281. }
  282. c.Unlock()
  283. select {
  284. case <-n.Ready():
  285. if err := initClusterSpec(n, req.Spec); err != nil {
  286. return "", err
  287. }
  288. go c.reconnectOnFailure(n)
  289. return n.NodeID(), nil
  290. case <-n.done:
  291. c.RLock()
  292. defer c.RUnlock()
  293. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  294. if err := c.clearState(); err != nil {
  295. return "", err
  296. }
  297. }
  298. return "", c.err
  299. }
  300. }
  301. // Join makes current Cluster part of an existing swarm cluster.
  302. func (c *Cluster) Join(req types.JoinRequest) error {
  303. c.Lock()
  304. if node := c.node; node != nil {
  305. c.Unlock()
  306. return ErrSwarmExists
  307. }
  308. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  309. c.Unlock()
  310. return err
  311. }
  312. // todo: check current state existing
  313. n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.JoinToken)
  314. if err != nil {
  315. c.Unlock()
  316. return err
  317. }
  318. c.Unlock()
  319. select {
  320. case <-time.After(swarmConnectTimeout):
  321. // attempt to connect will continue in background, also reconnecting
  322. go c.reconnectOnFailure(n)
  323. return ErrSwarmJoinTimeoutReached
  324. case <-n.Ready():
  325. go c.reconnectOnFailure(n)
  326. return nil
  327. case <-n.done:
  328. c.RLock()
  329. defer c.RUnlock()
  330. return c.err
  331. }
  332. }
  333. // stopNode is a helper that stops the active c.node and waits until it has
  334. // shut down. Call while keeping the cluster lock.
  335. func (c *Cluster) stopNode() error {
  336. if c.node == nil {
  337. return nil
  338. }
  339. c.stop = true
  340. if c.cancelDelay != nil {
  341. c.cancelDelay()
  342. c.cancelDelay = nil
  343. }
  344. node := c.node
  345. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  346. defer cancel()
  347. // TODO: can't hold lock on stop because it calls back to network
  348. c.Unlock()
  349. defer c.Lock()
  350. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  351. return err
  352. }
  353. <-node.done
  354. return nil
  355. }
  356. // Leave shuts down Cluster and removes current state.
  357. func (c *Cluster) Leave(force bool) error {
  358. c.Lock()
  359. node := c.node
  360. if node == nil {
  361. c.Unlock()
  362. return ErrNoSwarm
  363. }
  364. if node.Manager() != nil && !force {
  365. msg := "You are attempting to leave cluster on a node that is participating as a manager. "
  366. if c.isActiveManager() {
  367. active, reachable, unreachable, err := c.managerStats()
  368. if err == nil {
  369. if active && reachable-2 <= unreachable {
  370. if reachable == 1 && unreachable == 0 {
  371. msg += "Removing the last manager will erase all current state of the cluster. Use `--force` to ignore this message. "
  372. c.Unlock()
  373. return fmt.Errorf(msg)
  374. }
  375. msg += fmt.Sprintf("Leaving the cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
  376. }
  377. }
  378. } else {
  379. msg += "Doing so may lose the consensus of your cluster. "
  380. }
  381. msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
  382. c.Unlock()
  383. return fmt.Errorf(msg)
  384. }
  385. if err := c.stopNode(); err != nil {
  386. c.Unlock()
  387. return err
  388. }
  389. c.Unlock()
  390. if nodeID := node.NodeID(); nodeID != "" {
  391. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  392. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  393. logrus.Errorf("error removing %v: %v", id, err)
  394. }
  395. }
  396. }
  397. c.configEvent <- struct{}{}
  398. // todo: cleanup optional?
  399. if err := c.clearState(); err != nil {
  400. return err
  401. }
  402. return nil
  403. }
  404. func (c *Cluster) clearState() error {
  405. // todo: backup this data instead of removing?
  406. if err := os.RemoveAll(c.root); err != nil {
  407. return err
  408. }
  409. if err := os.MkdirAll(c.root, 0700); err != nil {
  410. return err
  411. }
  412. c.config.Backend.SetClusterProvider(nil)
  413. return nil
  414. }
  415. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  416. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  417. }
  418. // Inspect retrieves the configuration properties of a managed swarm cluster.
  419. func (c *Cluster) Inspect() (types.Swarm, error) {
  420. c.RLock()
  421. defer c.RUnlock()
  422. if !c.isActiveManager() {
  423. return types.Swarm{}, c.errNoManager()
  424. }
  425. ctx, cancel := c.getRequestContext()
  426. defer cancel()
  427. swarm, err := getSwarm(ctx, c.client)
  428. if err != nil {
  429. return types.Swarm{}, err
  430. }
  431. if err != nil {
  432. return types.Swarm{}, err
  433. }
  434. return convert.SwarmFromGRPC(*swarm), nil
  435. }
  436. // Update updates configuration of a managed swarm cluster.
  437. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  438. c.RLock()
  439. defer c.RUnlock()
  440. if !c.isActiveManager() {
  441. return c.errNoManager()
  442. }
  443. ctx, cancel := c.getRequestContext()
  444. defer cancel()
  445. swarm, err := getSwarm(ctx, c.client)
  446. if err != nil {
  447. return err
  448. }
  449. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  450. if err != nil {
  451. return err
  452. }
  453. _, err = c.client.UpdateCluster(
  454. ctx,
  455. &swarmapi.UpdateClusterRequest{
  456. ClusterID: swarm.ID,
  457. Spec: &swarmSpec,
  458. ClusterVersion: &swarmapi.Version{
  459. Index: version,
  460. },
  461. Rotation: swarmapi.JoinTokenRotation{
  462. RotateWorkerToken: flags.RotateWorkerToken,
  463. RotateManagerToken: flags.RotateManagerToken,
  464. },
  465. },
  466. )
  467. return err
  468. }
  469. // IsManager returns true if Cluster is participating as a manager.
  470. func (c *Cluster) IsManager() bool {
  471. c.RLock()
  472. defer c.RUnlock()
  473. return c.isActiveManager()
  474. }
  475. // IsAgent returns true if Cluster is participating as a worker/agent.
  476. func (c *Cluster) IsAgent() bool {
  477. c.RLock()
  478. defer c.RUnlock()
  479. return c.node != nil && c.ready
  480. }
  481. // GetListenAddress returns the listening address for current manager's
  482. // consensus and dispatcher APIs.
  483. func (c *Cluster) GetListenAddress() string {
  484. c.RLock()
  485. defer c.RUnlock()
  486. if c.isActiveManager() {
  487. return c.listenAddr
  488. }
  489. return ""
  490. }
  491. // GetRemoteAddress returns a known advertise address of a remote manager if
  492. // available.
  493. // todo: change to array/connect with info
  494. func (c *Cluster) GetRemoteAddress() string {
  495. c.RLock()
  496. defer c.RUnlock()
  497. return c.getRemoteAddress()
  498. }
  499. func (c *Cluster) getRemoteAddress() string {
  500. if c.node == nil {
  501. return ""
  502. }
  503. nodeID := c.node.NodeID()
  504. for _, r := range c.node.Remotes() {
  505. if r.NodeID != nodeID {
  506. return r.Addr
  507. }
  508. }
  509. return ""
  510. }
  511. // ListenClusterEvents returns a channel that receives messages on cluster
  512. // participation changes.
  513. // todo: make cancelable and accessible to multiple callers
  514. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  515. return c.configEvent
  516. }
  517. // Info returns information about the current cluster state.
  518. func (c *Cluster) Info() types.Info {
  519. var info types.Info
  520. c.RLock()
  521. defer c.RUnlock()
  522. if c.node == nil {
  523. info.LocalNodeState = types.LocalNodeStateInactive
  524. if c.cancelDelay != nil {
  525. info.LocalNodeState = types.LocalNodeStateError
  526. }
  527. } else {
  528. info.LocalNodeState = types.LocalNodeStatePending
  529. if c.ready == true {
  530. info.LocalNodeState = types.LocalNodeStateActive
  531. }
  532. }
  533. if c.err != nil {
  534. info.Error = c.err.Error()
  535. }
  536. ctx, cancel := c.getRequestContext()
  537. defer cancel()
  538. if c.isActiveManager() {
  539. info.ControlAvailable = true
  540. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  541. info.Nodes = len(r.Nodes)
  542. for _, n := range r.Nodes {
  543. if n.ManagerStatus != nil {
  544. info.Managers = info.Managers + 1
  545. }
  546. }
  547. }
  548. }
  549. if c.node != nil {
  550. for _, r := range c.node.Remotes() {
  551. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  552. }
  553. info.NodeID = c.node.NodeID()
  554. }
  555. return info
  556. }
  557. // isActiveManager should not be called without a read lock
  558. func (c *Cluster) isActiveManager() bool {
  559. return c.node != nil && c.conn != nil
  560. }
  561. // errNoManager returns error describing why manager commands can't be used.
  562. // Call with read lock.
  563. func (c *Cluster) errNoManager() error {
  564. if c.node == nil {
  565. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  566. }
  567. if c.node.Manager() != nil {
  568. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  569. }
  570. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  571. }
  572. // GetServices returns all services of a managed swarm cluster.
  573. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  574. c.RLock()
  575. defer c.RUnlock()
  576. if !c.isActiveManager() {
  577. return nil, c.errNoManager()
  578. }
  579. filters, err := newListServicesFilters(options.Filter)
  580. if err != nil {
  581. return nil, err
  582. }
  583. ctx, cancel := c.getRequestContext()
  584. defer cancel()
  585. r, err := c.client.ListServices(
  586. ctx,
  587. &swarmapi.ListServicesRequest{Filters: filters})
  588. if err != nil {
  589. return nil, err
  590. }
  591. services := []types.Service{}
  592. for _, service := range r.Services {
  593. services = append(services, convert.ServiceFromGRPC(*service))
  594. }
  595. return services, nil
  596. }
  597. // CreateService creates a new service in a managed swarm cluster.
  598. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  599. c.RLock()
  600. defer c.RUnlock()
  601. if !c.isActiveManager() {
  602. return "", c.errNoManager()
  603. }
  604. ctx, cancel := c.getRequestContext()
  605. defer cancel()
  606. err := populateNetworkID(ctx, c.client, &s)
  607. if err != nil {
  608. return "", err
  609. }
  610. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  611. if err != nil {
  612. return "", err
  613. }
  614. if encodedAuth != "" {
  615. ctnr := serviceSpec.Task.GetContainer()
  616. if ctnr == nil {
  617. return "", fmt.Errorf("service does not use container tasks")
  618. }
  619. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  620. }
  621. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  622. if err != nil {
  623. return "", err
  624. }
  625. return r.Service.ID, nil
  626. }
  627. // GetService returns a service based on an ID or name.
  628. func (c *Cluster) GetService(input string) (types.Service, error) {
  629. c.RLock()
  630. defer c.RUnlock()
  631. if !c.isActiveManager() {
  632. return types.Service{}, c.errNoManager()
  633. }
  634. ctx, cancel := c.getRequestContext()
  635. defer cancel()
  636. service, err := getService(ctx, c.client, input)
  637. if err != nil {
  638. return types.Service{}, err
  639. }
  640. return convert.ServiceFromGRPC(*service), nil
  641. }
  642. // UpdateService updates existing service to match new properties.
  643. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  644. c.RLock()
  645. defer c.RUnlock()
  646. if !c.isActiveManager() {
  647. return c.errNoManager()
  648. }
  649. ctx, cancel := c.getRequestContext()
  650. defer cancel()
  651. err := populateNetworkID(ctx, c.client, &spec)
  652. if err != nil {
  653. return err
  654. }
  655. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  656. if err != nil {
  657. return err
  658. }
  659. if encodedAuth != "" {
  660. ctnr := serviceSpec.Task.GetContainer()
  661. if ctnr == nil {
  662. return fmt.Errorf("service does not use container tasks")
  663. }
  664. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  665. } else {
  666. // this is needed because if the encodedAuth isn't being updated then we
  667. // shouldn't lose it, and continue to use the one that was already present
  668. currentService, err := getService(ctx, c.client, serviceID)
  669. if err != nil {
  670. return err
  671. }
  672. ctnr := currentService.Spec.Task.GetContainer()
  673. if ctnr == nil {
  674. return fmt.Errorf("service does not use container tasks")
  675. }
  676. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  677. }
  678. _, err = c.client.UpdateService(
  679. ctx,
  680. &swarmapi.UpdateServiceRequest{
  681. ServiceID: serviceID,
  682. Spec: &serviceSpec,
  683. ServiceVersion: &swarmapi.Version{
  684. Index: version,
  685. },
  686. },
  687. )
  688. return err
  689. }
  690. // RemoveService removes a service from a managed swarm cluster.
  691. func (c *Cluster) RemoveService(input string) error {
  692. c.RLock()
  693. defer c.RUnlock()
  694. if !c.isActiveManager() {
  695. return c.errNoManager()
  696. }
  697. ctx, cancel := c.getRequestContext()
  698. defer cancel()
  699. service, err := getService(ctx, c.client, input)
  700. if err != nil {
  701. return err
  702. }
  703. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  704. return err
  705. }
  706. return nil
  707. }
  708. // GetNodes returns a list of all nodes known to a cluster.
  709. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  710. c.RLock()
  711. defer c.RUnlock()
  712. if !c.isActiveManager() {
  713. return nil, c.errNoManager()
  714. }
  715. filters, err := newListNodesFilters(options.Filter)
  716. if err != nil {
  717. return nil, err
  718. }
  719. ctx, cancel := c.getRequestContext()
  720. defer cancel()
  721. r, err := c.client.ListNodes(
  722. ctx,
  723. &swarmapi.ListNodesRequest{Filters: filters})
  724. if err != nil {
  725. return nil, err
  726. }
  727. nodes := []types.Node{}
  728. for _, node := range r.Nodes {
  729. nodes = append(nodes, convert.NodeFromGRPC(*node))
  730. }
  731. return nodes, nil
  732. }
  733. // GetNode returns a node based on an ID or name.
  734. func (c *Cluster) GetNode(input string) (types.Node, error) {
  735. c.RLock()
  736. defer c.RUnlock()
  737. if !c.isActiveManager() {
  738. return types.Node{}, c.errNoManager()
  739. }
  740. ctx, cancel := c.getRequestContext()
  741. defer cancel()
  742. node, err := getNode(ctx, c.client, input)
  743. if err != nil {
  744. return types.Node{}, err
  745. }
  746. return convert.NodeFromGRPC(*node), nil
  747. }
  748. // UpdateNode updates existing nodes properties.
  749. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  750. c.RLock()
  751. defer c.RUnlock()
  752. if !c.isActiveManager() {
  753. return c.errNoManager()
  754. }
  755. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  756. if err != nil {
  757. return err
  758. }
  759. ctx, cancel := c.getRequestContext()
  760. defer cancel()
  761. _, err = c.client.UpdateNode(
  762. ctx,
  763. &swarmapi.UpdateNodeRequest{
  764. NodeID: nodeID,
  765. Spec: &nodeSpec,
  766. NodeVersion: &swarmapi.Version{
  767. Index: version,
  768. },
  769. },
  770. )
  771. return err
  772. }
  773. // RemoveNode removes a node from a cluster
  774. func (c *Cluster) RemoveNode(input string) error {
  775. c.RLock()
  776. defer c.RUnlock()
  777. if !c.isActiveManager() {
  778. return c.errNoManager()
  779. }
  780. ctx, cancel := c.getRequestContext()
  781. defer cancel()
  782. node, err := getNode(ctx, c.client, input)
  783. if err != nil {
  784. return err
  785. }
  786. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
  787. return err
  788. }
  789. return nil
  790. }
  791. // GetTasks returns a list of tasks matching the filter options.
  792. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  793. c.RLock()
  794. defer c.RUnlock()
  795. if !c.isActiveManager() {
  796. return nil, c.errNoManager()
  797. }
  798. byName := func(filter filters.Args) error {
  799. if filter.Include("service") {
  800. serviceFilters := filter.Get("service")
  801. for _, serviceFilter := range serviceFilters {
  802. service, err := c.GetService(serviceFilter)
  803. if err != nil {
  804. return err
  805. }
  806. filter.Del("service", serviceFilter)
  807. filter.Add("service", service.ID)
  808. }
  809. }
  810. if filter.Include("node") {
  811. nodeFilters := filter.Get("node")
  812. for _, nodeFilter := range nodeFilters {
  813. node, err := c.GetNode(nodeFilter)
  814. if err != nil {
  815. return err
  816. }
  817. filter.Del("node", nodeFilter)
  818. filter.Add("node", node.ID)
  819. }
  820. }
  821. return nil
  822. }
  823. filters, err := newListTasksFilters(options.Filter, byName)
  824. if err != nil {
  825. return nil, err
  826. }
  827. ctx, cancel := c.getRequestContext()
  828. defer cancel()
  829. r, err := c.client.ListTasks(
  830. ctx,
  831. &swarmapi.ListTasksRequest{Filters: filters})
  832. if err != nil {
  833. return nil, err
  834. }
  835. tasks := []types.Task{}
  836. for _, task := range r.Tasks {
  837. tasks = append(tasks, convert.TaskFromGRPC(*task))
  838. }
  839. return tasks, nil
  840. }
  841. // GetTask returns a task by an ID.
  842. func (c *Cluster) GetTask(input string) (types.Task, error) {
  843. c.RLock()
  844. defer c.RUnlock()
  845. if !c.isActiveManager() {
  846. return types.Task{}, c.errNoManager()
  847. }
  848. ctx, cancel := c.getRequestContext()
  849. defer cancel()
  850. task, err := getTask(ctx, c.client, input)
  851. if err != nil {
  852. return types.Task{}, err
  853. }
  854. return convert.TaskFromGRPC(*task), nil
  855. }
  856. // GetNetwork returns a cluster network by an ID.
  857. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  858. c.RLock()
  859. defer c.RUnlock()
  860. if !c.isActiveManager() {
  861. return apitypes.NetworkResource{}, c.errNoManager()
  862. }
  863. ctx, cancel := c.getRequestContext()
  864. defer cancel()
  865. network, err := getNetwork(ctx, c.client, input)
  866. if err != nil {
  867. return apitypes.NetworkResource{}, err
  868. }
  869. return convert.BasicNetworkFromGRPC(*network), nil
  870. }
  871. // GetNetworks returns all current cluster managed networks.
  872. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  873. c.RLock()
  874. defer c.RUnlock()
  875. if !c.isActiveManager() {
  876. return nil, c.errNoManager()
  877. }
  878. ctx, cancel := c.getRequestContext()
  879. defer cancel()
  880. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  881. if err != nil {
  882. return nil, err
  883. }
  884. var networks []apitypes.NetworkResource
  885. for _, network := range r.Networks {
  886. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  887. }
  888. return networks, nil
  889. }
  890. // CreateNetwork creates a new cluster managed network.
  891. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  892. c.RLock()
  893. defer c.RUnlock()
  894. if !c.isActiveManager() {
  895. return "", c.errNoManager()
  896. }
  897. if runconfig.IsPreDefinedNetwork(s.Name) {
  898. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  899. return "", errors.NewRequestForbiddenError(err)
  900. }
  901. ctx, cancel := c.getRequestContext()
  902. defer cancel()
  903. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  904. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  905. if err != nil {
  906. return "", err
  907. }
  908. return r.Network.ID, nil
  909. }
  910. // RemoveNetwork removes a cluster network.
  911. func (c *Cluster) RemoveNetwork(input string) error {
  912. c.RLock()
  913. defer c.RUnlock()
  914. if !c.isActiveManager() {
  915. return c.errNoManager()
  916. }
  917. ctx, cancel := c.getRequestContext()
  918. defer cancel()
  919. network, err := getNetwork(ctx, c.client, input)
  920. if err != nil {
  921. return err
  922. }
  923. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  924. return err
  925. }
  926. return nil
  927. }
  928. func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
  929. for i, n := range s.Networks {
  930. apiNetwork, err := getNetwork(ctx, c, n.Target)
  931. if err != nil {
  932. return err
  933. }
  934. s.Networks[i].Target = apiNetwork.ID
  935. }
  936. return nil
  937. }
  938. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  939. // GetNetwork to match via full ID.
  940. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  941. if err != nil {
  942. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  943. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  944. if err != nil || len(rl.Networks) == 0 {
  945. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  946. }
  947. if err != nil {
  948. return nil, err
  949. }
  950. if len(rl.Networks) == 0 {
  951. return nil, fmt.Errorf("network %s not found", input)
  952. }
  953. if l := len(rl.Networks); l > 1 {
  954. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  955. }
  956. return rl.Networks[0], nil
  957. }
  958. return rg.Network, nil
  959. }
  960. // Cleanup stops active swarm node. This is run before daemon shutdown.
  961. func (c *Cluster) Cleanup() {
  962. c.Lock()
  963. node := c.node
  964. if node == nil {
  965. c.Unlock()
  966. return
  967. }
  968. defer c.Unlock()
  969. if c.isActiveManager() {
  970. active, reachable, unreachable, err := c.managerStats()
  971. if err == nil {
  972. singlenode := active && reachable == 1 && unreachable == 0
  973. if active && !singlenode && reachable-2 <= unreachable {
  974. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  975. }
  976. }
  977. }
  978. c.stopNode()
  979. }
  980. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  981. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  982. defer cancel()
  983. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  984. if err != nil {
  985. return false, 0, 0, err
  986. }
  987. for _, n := range nodes.Nodes {
  988. if n.ManagerStatus != nil {
  989. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  990. reachable++
  991. if n.ID == c.node.NodeID() {
  992. current = true
  993. }
  994. }
  995. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  996. unreachable++
  997. }
  998. }
  999. }
  1000. return
  1001. }
  1002. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1003. var err error
  1004. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1005. if err != nil {
  1006. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1007. }
  1008. spec := &req.Spec
  1009. // provide sane defaults instead of erroring
  1010. if spec.Name == "" {
  1011. spec.Name = "default"
  1012. }
  1013. if spec.Raft.SnapshotInterval == 0 {
  1014. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1015. }
  1016. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1017. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1018. }
  1019. if spec.Raft.ElectionTick == 0 {
  1020. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1021. }
  1022. if spec.Raft.HeartbeatTick == 0 {
  1023. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1024. }
  1025. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1026. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1027. }
  1028. if spec.CAConfig.NodeCertExpiry == 0 {
  1029. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1030. }
  1031. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1032. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1033. }
  1034. return nil
  1035. }
  1036. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1037. var err error
  1038. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1039. if err != nil {
  1040. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1041. }
  1042. if len(req.RemoteAddrs) == 0 {
  1043. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1044. }
  1045. for i := range req.RemoteAddrs {
  1046. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1047. if err != nil {
  1048. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1049. }
  1050. }
  1051. return nil
  1052. }
  1053. func validateAddr(addr string) (string, error) {
  1054. if addr == "" {
  1055. return addr, fmt.Errorf("invalid empty address")
  1056. }
  1057. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1058. if err != nil {
  1059. return addr, nil
  1060. }
  1061. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1062. }
  1063. func initClusterSpec(node *node, spec types.Spec) error {
  1064. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1065. for conn := range node.ListenControlSocket(ctx) {
  1066. if ctx.Err() != nil {
  1067. return ctx.Err()
  1068. }
  1069. if conn != nil {
  1070. client := swarmapi.NewControlClient(conn)
  1071. var cluster *swarmapi.Cluster
  1072. for i := 0; ; i++ {
  1073. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1074. if err != nil {
  1075. return fmt.Errorf("error on listing clusters: %v", err)
  1076. }
  1077. if len(lcr.Clusters) == 0 {
  1078. if i < 10 {
  1079. time.Sleep(200 * time.Millisecond)
  1080. continue
  1081. }
  1082. return fmt.Errorf("empty list of clusters was returned")
  1083. }
  1084. cluster = lcr.Clusters[0]
  1085. break
  1086. }
  1087. newspec, err := convert.SwarmSpecToGRPC(spec)
  1088. if err != nil {
  1089. return fmt.Errorf("error updating cluster settings: %v", err)
  1090. }
  1091. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1092. ClusterID: cluster.ID,
  1093. ClusterVersion: &cluster.Meta.Version,
  1094. Spec: &newspec,
  1095. })
  1096. if err != nil {
  1097. return fmt.Errorf("error updating cluster settings: %v", err)
  1098. }
  1099. return nil
  1100. }
  1101. }
  1102. return ctx.Err()
  1103. }