cluster.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/cluster/convert"
  14. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  15. "github.com/docker/docker/daemon/cluster/executor/container"
  16. "github.com/docker/docker/errors"
  17. "github.com/docker/docker/pkg/ioutils"
  18. "github.com/docker/docker/runconfig"
  19. apitypes "github.com/docker/engine-api/types"
  20. types "github.com/docker/engine-api/types/swarm"
  21. swarmagent "github.com/docker/swarmkit/agent"
  22. swarmapi "github.com/docker/swarmkit/api"
  23. "golang.org/x/net/context"
  24. )
// swarmDirName is the subdirectory of config.Root that holds all swarm state.
const swarmDirName = "swarm"

// controlSocket is the unix socket file for the local manager control API.
const controlSocket = "control.sock"

// swarmConnectTimeout bounds how long init/join waits for the node to be ready.
const swarmConnectTimeout = 20 * time.Second

// stateFile persists the listen address between daemon restarts (see type state).
const stateFile = "docker-state.json"

const (
	// initialReconnectDelay is the starting backoff used by reconnectOnFailure;
	// it doubles on each failure up to maxReconnectDelay.
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
)
// ErrNoManager is returned when a manager-only function is called on non-manager
var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager")

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")
// state is the JSON document written to docker-state.json so the daemon can
// restart the swarm node with the same configuration after a restart.
type state struct {
	// ListenAddr is the address the node's remote API last listened on.
	ListenAddr string
}
// Config provides values for Cluster.
type Config struct {
	// Root is the daemon root directory; swarm state is kept in Root/swarm.
	Root string
	// Name is used as the node's hostname when starting the swarm node.
	Name string
	// Backend executes cluster tasks and receives cluster-provider callbacks.
	Backend executorpkg.Backend
}
// Cluster provides capabilities to participate in a cluster as worker or a
// manager and a worker.
type Cluster struct {
	sync.RWMutex                 // guards all mutable fields below
	root         string          // swarm state directory (config.Root/swarm)
	config       Config
	configEvent  chan struct{} // todo: make this array and goroutine safe
	node         *swarmagent.Node
	conn         *grpc.ClientConn // control-socket connection; non-nil only on an active manager
	client       swarmapi.ControlClient
	ready        bool   // true once the running node has reported Ready()
	listenAddr   string // persisted via saveState for restart-rejoin
	err          error  // last node error, surfaced through Info()
	reconnectDelay time.Duration // current backoff used by reconnectOnFailure
	stop         bool   // set by cancelReconnect to stop background reconnects
	cancelDelay  func() // cancels a pending reconnect backoff wait, if any
}
// New creates a new Cluster instance using provided config.
// It ensures the swarm state directory exists and, when a previous
// docker-state.json is found, restarts the swarm node with the saved
// listen address so the daemon rejoins its cluster on startup.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:           root,
		config:         config,
		configEvent:    make(chan struct{}, 10),
		reconnectDelay: initialReconnectDelay,
	}
	// No state file means this node was never part of a swarm: return an
	// inactive cluster.
	dt, err := ioutil.ReadFile(filepath.Join(root, stateFile))
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	var st state
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
	if err != nil {
		return nil, err
	}
	// Wait (bounded) for the node to come up. A timeout is only logged:
	// startup continues and reconnection keeps running in the background.
	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Errorf("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-ctx.Done():
	}
	// ctx is cancelled when the node stops (see startNewNode); a non-nil
	// ctx.Err() here means the node already died during startup.
	if ctx.Err() != nil {
		return nil, fmt.Errorf("swarm component could not be started")
	}
	go c.reconnectOnFailure(ctx)
	return c, nil
}
  108. func (c *Cluster) saveState() error {
  109. dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
  110. if err != nil {
  111. return err
  112. }
  113. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  114. }
// reconnectOnFailure restarts the swarm node with exponential backoff
// (doubling, capped at maxReconnectDelay) each time ctx is cancelled,
// i.e. whenever the running node exits. It returns once the cluster is
// stopped or another node is already running.
func (c *Cluster) reconnectOnFailure(ctx context.Context) {
	for {
		<-ctx.Done() // wait for the current node to die
		c.Lock()
		if c.stop || c.node != nil {
			c.Unlock()
			return
		}
		c.reconnectDelay *= 2
		if c.reconnectDelay > maxReconnectDelay {
			c.reconnectDelay = maxReconnectDelay
		}
		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
		// Expose the cancel func so cancelReconnect can abort the wait.
		c.cancelDelay = cancel
		c.Unlock()
		<-delayCtx.Done()
		// Anything other than DeadlineExceeded means the delay was cancelled
		// through cancelDelay, so stop retrying.
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		c.Lock()
		if c.node != nil {
			c.Unlock()
			return
		}
		var err error
		// Replace ctx with the new node's context so the next iteration waits
		// on the new node. On failure, fall back to the (already expired)
		// delay context so the loop retries immediately with a larger delay.
		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
		if err != nil {
			c.err = err
			ctx = delayCtx
		}
		c.Unlock()
	}
}
  149. func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
  150. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  151. return nil, nil, err
  152. }
  153. c.node = nil
  154. c.cancelDelay = nil
  155. node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  156. Hostname: c.config.Name,
  157. ForceNewCluster: forceNewCluster,
  158. ListenControlAPI: filepath.Join(c.root, controlSocket),
  159. ListenRemoteAPI: listenAddr,
  160. JoinAddr: joinAddr,
  161. StateDir: c.root,
  162. CAHash: cahash,
  163. Secret: secret,
  164. Executor: container.NewExecutor(c.config.Backend),
  165. HeartbeatTick: 1,
  166. ElectionTick: 3,
  167. IsManager: ismanager,
  168. })
  169. if err != nil {
  170. return nil, nil, err
  171. }
  172. ctx, cancel := context.WithCancel(context.Background())
  173. if err := node.Start(ctx); err != nil {
  174. return nil, nil, err
  175. }
  176. c.node = node
  177. c.listenAddr = listenAddr
  178. c.saveState()
  179. c.config.Backend.SetClusterProvider(c)
  180. go func() {
  181. err := node.Err(ctx)
  182. if err != nil {
  183. logrus.Errorf("cluster exited with error: %v", err)
  184. }
  185. c.Lock()
  186. c.conn = nil
  187. c.client = nil
  188. c.node = nil
  189. c.ready = false
  190. c.err = err
  191. c.Unlock()
  192. cancel()
  193. }()
  194. go func() {
  195. select {
  196. case <-node.Ready():
  197. c.Lock()
  198. c.reconnectDelay = initialReconnectDelay
  199. c.Unlock()
  200. case <-ctx.Done():
  201. }
  202. if ctx.Err() == nil {
  203. c.Lock()
  204. c.ready = true
  205. c.err = nil
  206. c.Unlock()
  207. }
  208. c.configEvent <- struct{}{}
  209. }()
  210. go func() {
  211. for conn := range node.ListenControlSocket(ctx) {
  212. c.Lock()
  213. if c.conn != conn {
  214. c.client = swarmapi.NewControlClient(conn)
  215. }
  216. if c.conn != nil {
  217. c.client = nil
  218. }
  219. c.conn = conn
  220. c.Unlock()
  221. c.configEvent <- struct{}{}
  222. }
  223. }()
  224. return node, ctx, nil
  225. }
// Init initializes new cluster from user provided request.
// On success it returns the new node's ID.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		if !req.ForceNewCluster {
			return "", errSwarmExists(node)
		}
		// --force-new-cluster: stop the existing node first, then re-acquire
		// the lock and clear its state before starting a replacement below.
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		// NOTE(review): cancelReconnect mutates c.stop/c.cancelDelay without
		// holding c.Lock here, while Leave calls it with the lock held —
		// confirm whether this unlocked call is intentional.
		c.cancelReconnect()
		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
			return "", err
		}
		c.Lock()
		c.node = nil
		c.conn = nil
		c.ready = false
	}
	// todo: check current state existing
	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()
	select {
	case <-n.Ready():
		// Node is up: apply the requested acceptance policy, then keep the
		// cluster alive with background reconnects.
		if err := initAcceptancePolicy(n, req.Spec.AcceptancePolicy); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(ctx)
		return n.NodeID(), nil
	case <-ctx.Done():
		c.RLock()
		defer c.RUnlock()
		if c.err != nil {
			if !req.ForceNewCluster { // if failure on first attempt don't keep state
				if err := c.clearState(); err != nil {
					return "", err
				}
			}
			return "", c.err
		}
		return "", ctx.Err()
	}
}
  273. // Join makes current Cluster part of an existing swarm cluster.
  274. func (c *Cluster) Join(req types.JoinRequest) error {
  275. c.Lock()
  276. if node := c.node; node != nil {
  277. c.Unlock()
  278. return errSwarmExists(node)
  279. }
  280. // todo: check current state existing
  281. if len(req.RemoteAddrs) == 0 {
  282. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  283. }
  284. n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
  285. if err != nil {
  286. c.Unlock()
  287. return err
  288. }
  289. c.Unlock()
  290. certificateRequested := n.CertificateRequested()
  291. for {
  292. select {
  293. case <-certificateRequested:
  294. if n.NodeMembership() == swarmapi.NodeMembershipPending {
  295. return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
  296. }
  297. certificateRequested = nil
  298. case <-time.After(swarmConnectTimeout):
  299. // attempt to connect will continue in background, also reconnecting
  300. go c.reconnectOnFailure(ctx)
  301. return ErrSwarmJoinTimeoutReached
  302. case <-n.Ready():
  303. go c.reconnectOnFailure(ctx)
  304. return nil
  305. case <-ctx.Done():
  306. c.RLock()
  307. defer c.RUnlock()
  308. if c.err != nil {
  309. return c.err
  310. }
  311. return ctx.Err()
  312. }
  313. }
  314. }
  315. func (c *Cluster) cancelReconnect() {
  316. c.stop = true
  317. if c.cancelDelay != nil {
  318. c.cancelDelay()
  319. c.cancelDelay = nil
  320. }
  321. }
  322. // Leave shuts down Cluster and removes current state.
  323. func (c *Cluster) Leave(force bool) error {
  324. c.Lock()
  325. node := c.node
  326. if node == nil {
  327. c.Unlock()
  328. return ErrNoSwarm
  329. }
  330. if node.Manager() != nil && !force {
  331. msg := "You are attempting to leave cluster on a node that is participating as a manager. "
  332. if c.isActiveManager() {
  333. active, reachable, unreachable, err := c.managerStats()
  334. if err == nil {
  335. if active && reachable-2 <= unreachable {
  336. if reachable == 1 && unreachable == 0 {
  337. msg += "Leaving last manager will remove all current state of the cluster. Use `--force` to ignore this message. "
  338. c.Unlock()
  339. return fmt.Errorf(msg)
  340. }
  341. msg += fmt.Sprintf("Leaving cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
  342. }
  343. }
  344. } else {
  345. msg += "Doing so may lose the consenus of your cluster. "
  346. }
  347. msg += "Only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
  348. c.Unlock()
  349. return fmt.Errorf(msg)
  350. }
  351. c.cancelReconnect()
  352. c.Unlock()
  353. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  354. defer cancel()
  355. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  356. return err
  357. }
  358. nodeID := node.NodeID()
  359. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  360. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  361. logrus.Errorf("error removing %v: %v", id, err)
  362. }
  363. }
  364. c.Lock()
  365. defer c.Unlock()
  366. c.node = nil
  367. c.conn = nil
  368. c.ready = false
  369. c.configEvent <- struct{}{}
  370. // todo: cleanup optional?
  371. if err := c.clearState(); err != nil {
  372. return err
  373. }
  374. return nil
  375. }
  376. func (c *Cluster) clearState() error {
  377. if err := os.RemoveAll(c.root); err != nil {
  378. return err
  379. }
  380. if err := os.MkdirAll(c.root, 0700); err != nil {
  381. return err
  382. }
  383. c.config.Backend.SetClusterProvider(nil)
  384. return nil
  385. }
// getRequestContext returns a context with a 5 second timeout used for all
// manager API requests.
func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on quorum lost
	// NOTE(review): the cancel func from WithTimeout is discarded (go vet
	// "lostcancel"); the context still expires after 5s but its timer cannot
	// be released early. Fixing this would require a signature change.
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}
  390. // Inspect retrives the confuguration properties of managed swarm cluster.
  391. func (c *Cluster) Inspect() (types.Swarm, error) {
  392. c.RLock()
  393. defer c.RUnlock()
  394. if !c.isActiveManager() {
  395. return types.Swarm{}, ErrNoManager
  396. }
  397. swarm, err := getSwarm(c.getRequestContext(), c.client)
  398. if err != nil {
  399. return types.Swarm{}, err
  400. }
  401. if err != nil {
  402. return types.Swarm{}, err
  403. }
  404. return convert.SwarmFromGRPC(*swarm), nil
  405. }
  406. // Update updates configuration of a managed swarm cluster.
  407. func (c *Cluster) Update(version uint64, spec types.Spec) error {
  408. c.RLock()
  409. defer c.RUnlock()
  410. if !c.isActiveManager() {
  411. return ErrNoManager
  412. }
  413. swarm, err := getSwarm(c.getRequestContext(), c.client)
  414. if err != nil {
  415. return err
  416. }
  417. swarmSpec, err := convert.SwarmSpecToGRPCandMerge(spec, &swarm.Spec)
  418. if err != nil {
  419. return err
  420. }
  421. _, err = c.client.UpdateCluster(
  422. c.getRequestContext(),
  423. &swarmapi.UpdateClusterRequest{
  424. ClusterID: swarm.ID,
  425. Spec: &swarmSpec,
  426. ClusterVersion: &swarmapi.Version{
  427. Index: version,
  428. },
  429. },
  430. )
  431. return err
  432. }
// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}
// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	return c.ready
}
// GetListenAddress returns the listening address for current manager's
// consensus and dispatcher APIs.
// An empty string is returned when this node is not an active manager
// (no control-socket connection).
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.conn != nil {
		return c.listenAddr
	}
	return ""
}
// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}
  463. func (c *Cluster) getRemoteAddress() string {
  464. if c.node == nil {
  465. return ""
  466. }
  467. nodeID := c.node.NodeID()
  468. for _, r := range c.node.Remotes() {
  469. if r.NodeID != nodeID {
  470. return r.Addr
  471. }
  472. }
  473. return ""
  474. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}
  481. // Info returns information about the current cluster state.
  482. func (c *Cluster) Info() types.Info {
  483. var info types.Info
  484. c.RLock()
  485. defer c.RUnlock()
  486. if c.node == nil {
  487. info.LocalNodeState = types.LocalNodeStateInactive
  488. if c.cancelDelay != nil {
  489. info.LocalNodeState = types.LocalNodeStateError
  490. }
  491. } else {
  492. info.LocalNodeState = types.LocalNodeStatePending
  493. if c.ready == true {
  494. info.LocalNodeState = types.LocalNodeStateActive
  495. }
  496. }
  497. if c.err != nil {
  498. info.Error = c.err.Error()
  499. }
  500. if c.isActiveManager() {
  501. info.ControlAvailable = true
  502. if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
  503. info.Nodes = len(r.Nodes)
  504. for _, n := range r.Nodes {
  505. if n.ManagerStatus != nil {
  506. info.Managers = info.Managers + 1
  507. }
  508. }
  509. }
  510. if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
  511. info.CACertHash = swarm.RootCA.CACertHash
  512. }
  513. }
  514. if c.node != nil {
  515. for _, r := range c.node.Remotes() {
  516. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  517. }
  518. info.NodeID = c.node.NodeID()
  519. }
  520. return info
  521. }
// isActiveManager should not be called without a read lock.
// A non-nil control-socket connection is what defines "active manager".
func (c *Cluster) isActiveManager() bool {
	return c.conn != nil
}
  526. // GetServices returns all services of a managed swarm cluster.
  527. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  528. c.RLock()
  529. defer c.RUnlock()
  530. if !c.isActiveManager() {
  531. return nil, ErrNoManager
  532. }
  533. filters, err := newListServicesFilters(options.Filter)
  534. if err != nil {
  535. return nil, err
  536. }
  537. r, err := c.client.ListServices(
  538. c.getRequestContext(),
  539. &swarmapi.ListServicesRequest{Filters: filters})
  540. if err != nil {
  541. return nil, err
  542. }
  543. var services []types.Service
  544. for _, service := range r.Services {
  545. services = append(services, convert.ServiceFromGRPC(*service))
  546. }
  547. return services, nil
  548. }
  549. // CreateService creates a new service in a managed swarm cluster.
  550. func (c *Cluster) CreateService(s types.ServiceSpec) (string, error) {
  551. c.RLock()
  552. defer c.RUnlock()
  553. if !c.isActiveManager() {
  554. return "", ErrNoManager
  555. }
  556. ctx := c.getRequestContext()
  557. err := populateNetworkID(ctx, c.client, &s)
  558. if err != nil {
  559. return "", err
  560. }
  561. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  562. if err != nil {
  563. return "", err
  564. }
  565. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  566. if err != nil {
  567. return "", err
  568. }
  569. return r.Service.ID, nil
  570. }
  571. // GetService returns a service based on a ID or name.
  572. func (c *Cluster) GetService(input string) (types.Service, error) {
  573. c.RLock()
  574. defer c.RUnlock()
  575. if !c.isActiveManager() {
  576. return types.Service{}, ErrNoManager
  577. }
  578. service, err := getService(c.getRequestContext(), c.client, input)
  579. if err != nil {
  580. return types.Service{}, err
  581. }
  582. return convert.ServiceFromGRPC(*service), nil
  583. }
  584. // UpdateService updates existing service to match new properties.
  585. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec) error {
  586. c.RLock()
  587. defer c.RUnlock()
  588. if !c.isActiveManager() {
  589. return ErrNoManager
  590. }
  591. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  592. if err != nil {
  593. return err
  594. }
  595. _, err = c.client.UpdateService(
  596. c.getRequestContext(),
  597. &swarmapi.UpdateServiceRequest{
  598. ServiceID: serviceID,
  599. Spec: &serviceSpec,
  600. ServiceVersion: &swarmapi.Version{
  601. Index: version,
  602. },
  603. },
  604. )
  605. return err
  606. }
  607. // RemoveService removes a service from a managed swarm cluster.
  608. func (c *Cluster) RemoveService(input string) error {
  609. c.RLock()
  610. defer c.RUnlock()
  611. if !c.isActiveManager() {
  612. return ErrNoManager
  613. }
  614. service, err := getService(c.getRequestContext(), c.client, input)
  615. if err != nil {
  616. return err
  617. }
  618. if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  619. return err
  620. }
  621. return nil
  622. }
  623. // GetNodes returns a list of all nodes known to a cluster.
  624. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  625. c.RLock()
  626. defer c.RUnlock()
  627. if !c.isActiveManager() {
  628. return nil, ErrNoManager
  629. }
  630. filters, err := newListNodesFilters(options.Filter)
  631. if err != nil {
  632. return nil, err
  633. }
  634. r, err := c.client.ListNodes(
  635. c.getRequestContext(),
  636. &swarmapi.ListNodesRequest{Filters: filters})
  637. if err != nil {
  638. return nil, err
  639. }
  640. nodes := []types.Node{}
  641. for _, node := range r.Nodes {
  642. nodes = append(nodes, convert.NodeFromGRPC(*node))
  643. }
  644. return nodes, nil
  645. }
  646. // GetNode returns a node based on a ID or name.
  647. func (c *Cluster) GetNode(input string) (types.Node, error) {
  648. c.RLock()
  649. defer c.RUnlock()
  650. if !c.isActiveManager() {
  651. return types.Node{}, ErrNoManager
  652. }
  653. node, err := getNode(c.getRequestContext(), c.client, input)
  654. if err != nil {
  655. return types.Node{}, err
  656. }
  657. return convert.NodeFromGRPC(*node), nil
  658. }
  659. // UpdateNode updates existing nodes properties.
  660. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  661. c.RLock()
  662. defer c.RUnlock()
  663. if !c.isActiveManager() {
  664. return ErrNoManager
  665. }
  666. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  667. if err != nil {
  668. return err
  669. }
  670. _, err = c.client.UpdateNode(
  671. c.getRequestContext(),
  672. &swarmapi.UpdateNodeRequest{
  673. NodeID: nodeID,
  674. Spec: &nodeSpec,
  675. NodeVersion: &swarmapi.Version{
  676. Index: version,
  677. },
  678. },
  679. )
  680. return err
  681. }
  682. // RemoveNode removes a node from a cluster
  683. func (c *Cluster) RemoveNode(input string) error {
  684. c.RLock()
  685. defer c.RUnlock()
  686. if !c.isActiveManager() {
  687. return ErrNoManager
  688. }
  689. ctx := c.getRequestContext()
  690. node, err := getNode(ctx, c.client, input)
  691. if err != nil {
  692. return err
  693. }
  694. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
  695. return err
  696. }
  697. return nil
  698. }
  699. // GetTasks returns a list of tasks matching the filter options.
  700. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  701. c.RLock()
  702. defer c.RUnlock()
  703. if !c.isActiveManager() {
  704. return nil, ErrNoManager
  705. }
  706. filters, err := newListTasksFilters(options.Filter)
  707. if err != nil {
  708. return nil, err
  709. }
  710. r, err := c.client.ListTasks(
  711. c.getRequestContext(),
  712. &swarmapi.ListTasksRequest{Filters: filters})
  713. if err != nil {
  714. return nil, err
  715. }
  716. tasks := []types.Task{}
  717. for _, task := range r.Tasks {
  718. tasks = append(tasks, convert.TaskFromGRPC(*task))
  719. }
  720. return tasks, nil
  721. }
  722. // GetTask returns a task by an ID.
  723. func (c *Cluster) GetTask(input string) (types.Task, error) {
  724. c.RLock()
  725. defer c.RUnlock()
  726. if !c.isActiveManager() {
  727. return types.Task{}, ErrNoManager
  728. }
  729. task, err := getTask(c.getRequestContext(), c.client, input)
  730. if err != nil {
  731. return types.Task{}, err
  732. }
  733. return convert.TaskFromGRPC(*task), nil
  734. }
  735. // GetNetwork returns a cluster network by ID.
  736. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  737. c.RLock()
  738. defer c.RUnlock()
  739. if !c.isActiveManager() {
  740. return apitypes.NetworkResource{}, ErrNoManager
  741. }
  742. network, err := getNetwork(c.getRequestContext(), c.client, input)
  743. if err != nil {
  744. return apitypes.NetworkResource{}, err
  745. }
  746. return convert.BasicNetworkFromGRPC(*network), nil
  747. }
  748. // GetNetworks returns all current cluster managed networks.
  749. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  750. c.RLock()
  751. defer c.RUnlock()
  752. if !c.isActiveManager() {
  753. return nil, ErrNoManager
  754. }
  755. r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
  756. if err != nil {
  757. return nil, err
  758. }
  759. var networks []apitypes.NetworkResource
  760. for _, network := range r.Networks {
  761. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  762. }
  763. return networks, nil
  764. }
  765. // CreateNetwork creates a new cluster managed network.
  766. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  767. c.RLock()
  768. defer c.RUnlock()
  769. if !c.isActiveManager() {
  770. return "", ErrNoManager
  771. }
  772. if runconfig.IsPreDefinedNetwork(s.Name) {
  773. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  774. return "", errors.NewRequestForbiddenError(err)
  775. }
  776. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  777. r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  778. if err != nil {
  779. return "", err
  780. }
  781. return r.Network.ID, nil
  782. }
  783. // RemoveNetwork removes a cluster network.
  784. func (c *Cluster) RemoveNetwork(input string) error {
  785. c.RLock()
  786. defer c.RUnlock()
  787. if !c.isActiveManager() {
  788. return ErrNoManager
  789. }
  790. network, err := getNetwork(c.getRequestContext(), c.client, input)
  791. if err != nil {
  792. return err
  793. }
  794. if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  795. return err
  796. }
  797. return nil
  798. }
  799. func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
  800. for i, n := range s.Networks {
  801. apiNetwork, err := getNetwork(ctx, c, n.Target)
  802. if err != nil {
  803. return err
  804. }
  805. s.Networks[i].Target = apiNetwork.ID
  806. }
  807. return nil
  808. }
  809. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  810. // GetNetwork to match via full ID.
  811. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  812. if err != nil {
  813. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  814. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  815. if err != nil || len(rl.Networks) == 0 {
  816. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  817. }
  818. if err != nil {
  819. return nil, err
  820. }
  821. if len(rl.Networks) == 0 {
  822. return nil, fmt.Errorf("network %s not found", input)
  823. }
  824. if l := len(rl.Networks); l > 1 {
  825. return nil, fmt.Errorf("network %s is ambigious (%d matches found)", input, l)
  826. }
  827. return rl.Networks[0], nil
  828. }
  829. return rg.Network, nil
  830. }
// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	// Warn (but do not block shutdown) when stopping this manager will cost
	// the cluster its raft quorum.
	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && reachable == 1 && unreachable == 0
			if active && !singlenode && reachable-2 <= unreachable {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.cancelReconnect()
	c.Unlock()
	// Stop the node outside the lock; bound shutdown to 10 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := node.Stop(ctx); err != nil {
		logrus.Errorf("error cleaning up cluster: %v", err)
	}
	c.Lock()
	c.node = nil
	c.ready = false
	c.conn = nil
	c.Unlock()
}
// managerStats counts reachable and unreachable managers in the swarm and
// reports whether this node itself is currently a reachable manager.
// It must be called with at least a read lock held (it reads c.client and
// c.node).
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	// NOTE(review): the cancel func from WithTimeout is discarded (go vet
	// "lostcancel"); the context still expires after 3s but its timer is
	// not released early.
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}
  882. func errSwarmExists(node *swarmagent.Node) error {
  883. if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
  884. return ErrPendingSwarmExists
  885. }
  886. return ErrSwarmExists
  887. }
// initAcceptancePolicy applies acceptancePolicy to the freshly initialized
// cluster by connecting to the node's control socket, polling until the
// cluster object appears (up to 10 retries, 200ms apart), and issuing an
// UpdateCluster with the merged spec. The whole operation is bounded by a
// 5 second context.
func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
	// NOTE(review): the cancel func from WithTimeout is discarded (go vet
	// "lostcancel"); the context still expires after 5s.
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			// The cluster object may not exist immediately after init; retry
			// briefly before giving up.
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			spec := &cluster.Spec
			if err := convert.SwarmSpecUpdateAcceptancePolicy(spec, acceptancePolicy, nil); err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err := client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           spec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	// Socket channel closed without a usable connection; report why.
	return ctx.Err()
}