cluster.go

package cluster

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution/digest"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/cluster/executor/container"
	"github.com/docker/docker/errors"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/runconfig"
	apitypes "github.com/docker/engine-api/types"
	types "github.com/docker/engine-api/types/swarm"
	swarmagent "github.com/docker/swarmkit/agent"
	swarmapi "github.com/docker/swarmkit/api"
	"golang.org/x/net/context"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay = 30 * time.Second
)

// ErrNoManager is returned when a manager-only function is called on a non-manager node.
var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager")

// ErrNoSwarm is returned on leaving a cluster that was never initialized.
var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")

// ErrSwarmExists is returned on an initialize or join request for a cluster that has already been activated.
var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")

// ErrPendingSwarmExists is returned on an initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when a cluster join could not complete before the timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")

// defaultSpec contains some sane defaults if cluster options are missing on init
var defaultSpec = types.Spec{
	Raft: types.RaftConfig{
		SnapshotInterval: 10000,
		KeepOldSnapshots: 0,
		LogEntriesForSlowFollowers: 500,
		HeartbeatTick: 1,
		ElectionTick: 3,
	},
	CAConfig: types.CAConfig{
		NodeCertExpiry: 90 * 24 * time.Hour,
	},
	Dispatcher: types.DispatcherConfig{
		HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
	},
	Orchestration: types.OrchestrationConfig{
		TaskHistoryRetentionLimit: 10,
	},
}

type state struct {
	ListenAddr string
}

// Config provides values for Cluster.
type Config struct {
	Root string
	Name string
	Backend executorpkg.Backend
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	sync.RWMutex
	root string
	config Config
	configEvent chan struct{} // todo: make this array and goroutine safe
	node *swarmagent.Node
	conn *grpc.ClientConn
	client swarmapi.ControlClient
	ready bool
	listenAddr string
	err error
	reconnectDelay time.Duration
	stop bool
	cancelDelay func()
}

// New creates a new Cluster instance using provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root: root,
		config: config,
		configEvent: make(chan struct{}, 10),
		reconnectDelay: initialReconnectDelay,
	}
	st, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
	if err != nil {
		return nil, err
	}
	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Errorf("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-ctx.Done():
	}
	if ctx.Err() != nil {
		return nil, fmt.Errorf("swarm component could not be started")
	}
	go c.reconnectOnFailure(ctx)
	return c, nil
}
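
// loadState restores persisted swarm state (currently just the listen
// address) from docker-state.json. If the node certificate is missing there
// is nothing to restore from, so any stale state is cleared.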
func (c *Cluster) loadState() (*state, error) {
	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
	if err != nil {
		return nil, err
	}
	// missing certificate means no actual state to restore from
	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
		if os.IsNotExist(err) {
			c.clearState()
		}
		return nil, err
	}
	var st state
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	return &st, nil
}
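
// saveState persists the current listen address to docker-state.json so the
// node can be restarted with the same configuration after a daemon restart.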
func (c *Cluster) saveState() error {
	dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
	if err != nil {
		return err
	}
	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
}
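
// reconnectOnFailure waits for the node context to be cancelled and then
// restarts the swarm node, doubling the reconnect delay on every attempt up
// to maxReconnectDelay, until a node is running again or the cluster is
// stopped.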
func (c *Cluster) reconnectOnFailure(ctx context.Context) {
	for {
		<-ctx.Done()
		c.Lock()
		if c.stop || c.node != nil {
			c.Unlock()
			return
		}
		c.reconnectDelay *= 2
		if c.reconnectDelay > maxReconnectDelay {
			c.reconnectDelay = maxReconnectDelay
		}
		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
		c.cancelDelay = cancel
		c.Unlock()
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		c.Lock()
		if c.node != nil {
			c.Unlock()
			return
		}
		var err error
		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
		if err != nil {
			c.err = err
			ctx = delayCtx
		}
		c.Unlock()
	}
}
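
// startNewNode creates and starts a swarmkit node with the given options,
// persists the listen address, and launches goroutines that propagate node
// errors, readiness and control-socket connections back into the Cluster
// state.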
func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, nil, err
	}
	c.node = nil
	c.cancelDelay = nil
	node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
		Hostname: c.config.Name,
		ForceNewCluster: forceNewCluster,
		ListenControlAPI: filepath.Join(c.root, controlSocket),
		ListenRemoteAPI: listenAddr,
		JoinAddr: joinAddr,
		StateDir: c.root,
		CAHash: cahash,
		Secret: secret,
		Executor: container.NewExecutor(c.config.Backend),
		HeartbeatTick: 1,
		ElectionTick: 3,
		IsManager: ismanager,
	})
	if err != nil {
		return nil, nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	if err := node.Start(ctx); err != nil {
		return nil, nil, err
	}
	c.node = node
	c.listenAddr = listenAddr
	c.saveState()
	c.config.Backend.SetClusterProvider(c)

	go func() {
		err := node.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.conn = nil
		c.client = nil
		c.node = nil
		c.ready = false
		c.err = err
		c.Unlock()
		cancel()
	}()

	go func() {
		select {
		case <-node.Ready():
			c.Lock()
			c.reconnectDelay = initialReconnectDelay
			c.Unlock()
		case <-ctx.Done():
		}
		if ctx.Err() == nil {
			c.Lock()
			c.ready = true
			c.err = nil
			c.Unlock()
		}
		c.configEvent <- struct{}{}
	}()

	go func() {
		for conn := range node.ListenControlSocket(ctx) {
			c.Lock()
			if c.conn != conn {
				c.client = swarmapi.NewControlClient(conn)
			}
			if c.conn != nil {
				c.client = nil
			}
			c.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()

	return node, ctx, nil
}

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		if !req.ForceNewCluster {
			return "", errSwarmExists(node)
		}
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		c.cancelReconnect()
		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
			return "", err
		}
		c.Lock()
		c.node = nil
		c.conn = nil
		c.ready = false
	}
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}
	// todo: check current state existing
	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()

	select {
	case <-n.Ready():
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(ctx)
		return n.NodeID(), nil
	case <-ctx.Done():
		c.RLock()
		defer c.RUnlock()
		if c.err != nil {
			if !req.ForceNewCluster { // if failure on first attempt don't keep state
				if err := c.clearState(); err != nil {
					return "", err
				}
			}
			return "", c.err
		}
		return "", ctx.Err()
	}
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		return errSwarmExists(node)
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}
	// todo: check current state existing
	n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()

	certificateRequested := n.CertificateRequested()
	for {
		select {
		case <-certificateRequested:
			if n.NodeMembership() == swarmapi.NodeMembershipPending {
				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by an existing cluster member.\nTo accept this node into the cluster, run \"docker node accept %v\" on an existing cluster manager. Use the \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
			}
			certificateRequested = nil
		case <-time.After(swarmConnectTimeout):
			// the attempt to connect continues in the background, as does reconnection on failure
			go c.reconnectOnFailure(ctx)
			return ErrSwarmJoinTimeoutReached
		case <-n.Ready():
			go c.reconnectOnFailure(ctx)
			return nil
		case <-ctx.Done():
			c.RLock()
			defer c.RUnlock()
			if c.err != nil {
				return c.err
			}
			return ctx.Err()
		}
	}
}
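
// cancelReconnect marks the cluster as stopped and cancels a pending
// reconnect delay, if any, so that reconnectOnFailure exits instead of
// restarting the node.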
func (c *Cluster) cancelReconnect() {
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return ErrNoSwarm
	}
	if node.Manager() != nil && !force {
		msg := "You are attempting to leave the cluster on a node that is participating as a manager. "
		if c.isActiveManager() {
			active, reachable, unreachable, err := c.managerStats()
			if err == nil {
				if active && reachable-2 <= unreachable {
					if reachable == 1 && unreachable == 0 {
						msg += "Leaving the last manager will remove all current state of the cluster. Use `--force` to ignore this message. "
						c.Unlock()
						return fmt.Errorf(msg)
					}
					msg += fmt.Sprintf("Leaving the cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}
		msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
		c.Unlock()
		return fmt.Errorf(msg)
	}
	c.cancelReconnect()
	c.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	if nodeID := node.NodeID(); nodeID != "" {
		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}
	c.Lock()
	defer c.Unlock()
	c.node = nil
	c.conn = nil
	c.ready = false
	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := c.clearState(); err != nil {
		return err
	}
	return nil
}

func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}
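
// getRequestContext returns a context with a 5-second timeout that is used
// for control API requests so that callers do not block indefinitely.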
func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on quorum lost
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Swarm{}, ErrNoManager
	}
	swarm, err := getSwarm(c.getRequestContext(), c.client)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*swarm), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	swarm, err := getSwarm(c.getRequestContext(), c.client)
	if err != nil {
		return err
	}
	swarmSpec, err := convert.SwarmSpecToGRPCandMerge(spec, &swarm.Spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateCluster(
		c.getRequestContext(),
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec: &swarmSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	return c.ready
}

// GetListenAddress returns the listening address for the current manager's
// consensus and dispatcher APIs.
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.conn != nil {
		return c.listenAddr
	}
	return ""
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	if c.node == nil {
		return ""
	}
	nodeID := c.node.NodeID()
	for _, r := range c.node.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	var info types.Info
	c.RLock()
	defer c.RUnlock()

	if c.node == nil {
		info.LocalNodeState = types.LocalNodeStateInactive
		if c.cancelDelay != nil {
			info.LocalNodeState = types.LocalNodeStateError
		}
	} else {
		info.LocalNodeState = types.LocalNodeStatePending
		if c.ready {
			info.LocalNodeState = types.LocalNodeStateActive
		}
	}
	if c.err != nil {
		info.Error = c.err.Error()
	}
	if c.isActiveManager() {
		info.ControlAvailable = true
		if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
		if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
			info.CACertHash = swarm.RootCA.CACertHash
		}
	}
	if c.node != nil {
		for _, r := range c.node.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = c.node.NodeID()
	}

	return info
}

// isActiveManager should not be called without a read lock
func (c *Cluster) isActiveManager() bool {
	return c.conn != nil
}

// GetServices returns all services of a managed swarm cluster.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListServicesFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListServices(
		c.getRequestContext(),
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	var services []types.Service
	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}
	return services, nil
}

// CreateService creates a new service in a managed swarm cluster.
func (c *Cluster) CreateService(s types.ServiceSpec) (string, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return "", ErrNoManager
	}
	ctx := c.getRequestContext()
	err := populateNetworkID(ctx, c.client, &s)
	if err != nil {
		return "", err
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return "", err
	}
	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return "", err
	}
	return r.Service.ID, nil
}

// GetService returns a service based on an ID or name.
func (c *Cluster) GetService(input string) (types.Service, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Service{}, ErrNoManager
	}
	service, err := getService(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Service{}, err
	}
	return convert.ServiceFromGRPC(*service), nil
}

// UpdateService updates an existing service to match new properties.
func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateService(
		c.getRequestContext(),
		&swarmapi.UpdateServiceRequest{
			ServiceID: serviceID,
			Spec: &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveService removes a service from a managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	service, err := getService(c.getRequestContext(), c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
		return err
	}
	return nil
}

// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListNodesFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListNodes(
		c.getRequestContext(),
		&swarmapi.ListNodesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	nodes := []types.Node{}
	for _, node := range r.Nodes {
		nodes = append(nodes, convert.NodeFromGRPC(*node))
	}
	return nodes, nil
}

// GetNode returns a node based on an ID or name.
func (c *Cluster) GetNode(input string) (types.Node, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Node{}, ErrNoManager
	}
	node, err := getNode(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Node{}, err
	}
	return convert.NodeFromGRPC(*node), nil
}

// UpdateNode updates an existing node's properties.
func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	nodeSpec, err := convert.NodeSpecToGRPC(spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateNode(
		c.getRequestContext(),
		&swarmapi.UpdateNodeRequest{
			NodeID: nodeID,
			Spec: &nodeSpec,
			NodeVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveNode removes a node from a cluster.
func (c *Cluster) RemoveNode(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	ctx := c.getRequestContext()
	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
		return err
	}
	return nil
}

// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListTasksFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListTasks(
		c.getRequestContext(),
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	tasks := []types.Task{}
	for _, task := range r.Tasks {
		tasks = append(tasks, convert.TaskFromGRPC(*task))
	}
	return tasks, nil
}

// GetTask returns a task by an ID.
func (c *Cluster) GetTask(input string) (types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Task{}, ErrNoManager
	}
	task, err := getTask(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Task{}, err
	}
	return convert.TaskFromGRPC(*task), nil
}

// GetNetwork returns a cluster network by an ID.
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return apitypes.NetworkResource{}, ErrNoManager
	}
	network, err := getNetwork(c.getRequestContext(), c.client, input)
	if err != nil {
		return apitypes.NetworkResource{}, err
	}
	return convert.BasicNetworkFromGRPC(*network), nil
}

// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
	if err != nil {
		return nil, err
	}
	var networks []apitypes.NetworkResource
	for _, network := range r.Networks {
		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
	}
	return networks, nil
}

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return "", ErrNoManager
	}
	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", errors.NewRequestForbiddenError(err)
	}
	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}
	return r.Network.ID, nil
}

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	network, err := getNetwork(c.getRequestContext(), c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
		return err
	}
	return nil
}

func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
	for i, n := range s.Networks {
		apiNetwork, err := getNetwork(ctx, c, n.Target)
		if err != nil {
			return err
		}
		s.Networks[i].Target = apiNetwork.ID
	}
	return nil
}
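
// getNetwork resolves a network by full ID first, then falls back to matching
// by name and finally by ID prefix; ambiguous prefix matches are rejected.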
func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
	// GetNetwork to match via full ID.
	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
	if err != nil {
		// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Networks) == 0 {
			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
		}
		if err != nil {
			return nil, err
		}
		if len(rl.Networks) == 0 {
			return nil, fmt.Errorf("network %s not found", input)
		}
		if l := len(rl.Networks); l > 1 {
			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
		}
		return rl.Networks[0], nil
	}
	return rg.Network, nil
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && reachable == 1 && unreachable == 0
			if active && !singlenode && reachable-2 <= unreachable {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.cancelReconnect()
	c.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := node.Stop(ctx); err != nil {
		logrus.Errorf("error cleaning up cluster: %v", err)
	}
	c.Lock()
	c.node = nil
	c.ready = false
	c.conn = nil
	c.Unlock()
}
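
// managerStats reports whether the current node is counted among the
// reachable managers, along with the number of reachable and unreachable
// managers in the cluster.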
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}
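
// validateAndSanitizeInitRequest normalizes the listen address and fills in
// missing spec fields from defaultSpec instead of returning an error.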
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	spec := &req.Spec
	// provide sane defaults instead of erroring
	if spec.Name == "" {
		spec.Name = "default"
	}
	if spec.Raft.SnapshotInterval == 0 {
		spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
	}
	if spec.Raft.LogEntriesForSlowFollowers == 0 {
		spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
	}
	if spec.Raft.ElectionTick == 0 {
		spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
	}
	if spec.Raft.HeartbeatTick == 0 {
		spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
	}
	if spec.Dispatcher.HeartbeatPeriod == 0 {
		spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
	}
	if spec.CAConfig.NodeCertExpiry == 0 {
		spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
	}
	if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
		spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
	}
	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return fmt.Errorf("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	if req.CACertHash != "" {
		if _, err := digest.ParseDigest(req.CACertHash); err != nil {
			return fmt.Errorf("invalid CACertHash %q, %v", req.CACertHash, err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// addresses that do not parse as TCP are passed through unchanged
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
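
// errSwarmExists returns ErrPendingSwarmExists if the node's membership has
// not been accepted yet, and ErrSwarmExists otherwise.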
func errSwarmExists(node *swarmagent.Node) error {
	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
		return ErrPendingSwarmExists
	}
	return ErrSwarmExists
}
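
// initClusterSpec waits for the control socket of a freshly started node,
// retries listing clusters until the cluster object appears, and then applies
// the user-provided spec on top of it via UpdateCluster.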
func initClusterSpec(node *swarmagent.Node, spec types.Spec) error {
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			newspec, err := convert.SwarmSpecToGRPCandMerge(spec, &cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID: cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec: &newspec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}