cluster.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/cluster/convert"
  14. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  15. "github.com/docker/docker/daemon/cluster/executor/container"
  16. "github.com/docker/docker/errors"
  17. "github.com/docker/docker/pkg/ioutils"
  18. "github.com/docker/docker/runconfig"
  19. apitypes "github.com/docker/engine-api/types"
  20. types "github.com/docker/engine-api/types/swarm"
  21. swarmagent "github.com/docker/swarmkit/agent"
  22. swarmapi "github.com/docker/swarmkit/api"
  23. "golang.org/x/net/context"
  24. )
  25. const swarmDirName = "swarm"
  26. const controlSocket = "control.sock"
  27. const swarmConnectTimeout = 5 * time.Second
  28. const stateFile = "docker-state.json"
  29. const (
  30. initialReconnectDelay = 100 * time.Millisecond
  31. maxReconnectDelay = 10 * time.Second
  32. )
  33. // ErrNoManager is returned then a manager-only function is called on non-manager
  34. var ErrNoManager = fmt.Errorf("this node is not participating as a Swarm manager")
  35. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  36. var ErrNoSwarm = fmt.Errorf("this node is not part of Swarm")
  37. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  38. var ErrSwarmExists = fmt.Errorf("this node is already part of a Swarm")
  39. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  40. var ErrSwarmJoinTimeoutReached = fmt.Errorf("timeout reached before node was joined")
// state is the document that saveState serializes as JSON into stateFile and
// that New reads back when a Cluster is created.
type state struct {
	// ListenAddr is the listen address the node is (re)started with.
	ListenAddr string
}
// Config provides values for Cluster.
type Config struct {
	// Root is the directory under which the swarm state directory is created.
	Root string
	// Name is passed to the swarm node as its hostname.
	Name string
	// Backend is the daemon backend handed to the container executor; it is
	// also queried for system info and told about the cluster provider.
	Backend executorpkg.Backend
}
// Cluster provides capabilities to participate in a cluster as worker or a
// manager and a worker.
type Cluster struct {
	// The embedded lock guards the mutable fields below.
	sync.RWMutex
	// root is the swarm state directory (Config.Root joined with swarmDirName).
	root string
	// config is the configuration passed to New.
	config Config
	// configEvent is signalled when cluster participation changes.
	configEvent chan struct{} // todo: make this array and goroutine safe
	// node is the running swarm node, or nil when none is running.
	node *swarmagent.Node
	// conn is the latest control-socket connection reported by the node; a
	// non-nil value is what isActiveManager tests for.
	conn *grpc.ClientConn
	// client is the control API client used for manager requests.
	client swarmapi.ControlClient
	// ready is set once the node has reported readiness.
	ready bool
	// listenAddr is the listen address the current node was started with.
	listenAddr string
	// err holds the last error from a node exit or a failed restart.
	err error
	// reconnectDelay is the current wait between restart attempts.
	reconnectDelay time.Duration
	// stop is set by cancelReconnect to end the reconnect loop.
	stop bool
	// cancelDelay cancels a pending reconnect wait, when one is in progress.
	cancelDelay func()
}
  67. // New creates a new Cluster instance using provided config.
  68. func New(config Config) (*Cluster, error) {
  69. root := filepath.Join(config.Root, swarmDirName)
  70. if err := os.MkdirAll(root, 0700); err != nil {
  71. return nil, err
  72. }
  73. c := &Cluster{
  74. root: root,
  75. config: config,
  76. configEvent: make(chan struct{}, 10),
  77. reconnectDelay: initialReconnectDelay,
  78. }
  79. dt, err := ioutil.ReadFile(filepath.Join(root, stateFile))
  80. if err != nil {
  81. if os.IsNotExist(err) {
  82. return c, nil
  83. }
  84. return nil, err
  85. }
  86. var st state
  87. if err := json.Unmarshal(dt, &st); err != nil {
  88. return nil, err
  89. }
  90. n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
  91. if err != nil {
  92. return nil, err
  93. }
  94. select {
  95. case <-time.After(swarmConnectTimeout):
  96. logrus.Errorf("swarm component could not be started before timeout was reached")
  97. case <-n.Ready(context.Background()):
  98. case <-ctx.Done():
  99. }
  100. if ctx.Err() != nil {
  101. return nil, fmt.Errorf("swarm component could not be started")
  102. }
  103. go c.reconnectOnFailure(ctx)
  104. return c, nil
  105. }
  106. func (c *Cluster) checkCompatibility() error {
  107. info, _ := c.config.Backend.SystemInfo()
  108. if info != nil && (info.ClusterStore != "" || info.ClusterAdvertise != "") {
  109. return fmt.Errorf("swarm mode is incompatible with `--cluster-store` and `--cluster-advertise daemon configuration")
  110. }
  111. return nil
  112. }
  113. func (c *Cluster) saveState() error {
  114. dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
  115. if err != nil {
  116. return err
  117. }
  118. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  119. }
  120. func (c *Cluster) reconnectOnFailure(ctx context.Context) {
  121. for {
  122. <-ctx.Done()
  123. c.Lock()
  124. if c.stop || c.node != nil {
  125. c.Unlock()
  126. return
  127. }
  128. c.reconnectDelay *= 2
  129. if c.reconnectDelay > maxReconnectDelay {
  130. c.reconnectDelay = maxReconnectDelay
  131. }
  132. logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
  133. delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
  134. c.cancelDelay = cancel
  135. c.Unlock()
  136. <-delayCtx.Done()
  137. if delayCtx.Err() != context.DeadlineExceeded {
  138. return
  139. }
  140. c.Lock()
  141. if c.node != nil {
  142. c.Unlock()
  143. return
  144. }
  145. var err error
  146. _, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
  147. if err != nil {
  148. c.err = err
  149. ctx = delayCtx
  150. }
  151. c.Unlock()
  152. }
  153. }
  154. func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
  155. if err := c.checkCompatibility(); err != nil {
  156. return nil, nil, err
  157. }
  158. c.node = nil
  159. c.cancelDelay = nil
  160. node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  161. Hostname: c.config.Name,
  162. ForceNewCluster: forceNewCluster,
  163. ListenControlAPI: filepath.Join(c.root, controlSocket),
  164. ListenRemoteAPI: listenAddr,
  165. JoinAddr: joinAddr,
  166. StateDir: c.root,
  167. CAHash: cahash,
  168. Secret: secret,
  169. Executor: container.NewExecutor(c.config.Backend),
  170. HeartbeatTick: 1,
  171. ElectionTick: 3,
  172. IsManager: ismanager,
  173. })
  174. if err != nil {
  175. return nil, nil, err
  176. }
  177. ctx, cancel := context.WithCancel(context.Background())
  178. if err := node.Start(ctx); err != nil {
  179. return nil, nil, err
  180. }
  181. c.node = node
  182. c.listenAddr = listenAddr
  183. c.saveState()
  184. c.config.Backend.SetClusterProvider(c)
  185. go func() {
  186. err := node.Err(ctx)
  187. if err != nil {
  188. logrus.Errorf("cluster exited with error: %v", err)
  189. }
  190. c.Lock()
  191. c.conn = nil
  192. c.client = nil
  193. c.node = nil
  194. c.ready = false
  195. c.err = err
  196. c.Unlock()
  197. cancel()
  198. }()
  199. go func() {
  200. select {
  201. case <-node.Ready(context.Background()):
  202. c.Lock()
  203. c.reconnectDelay = initialReconnectDelay
  204. c.Unlock()
  205. case <-ctx.Done():
  206. }
  207. if ctx.Err() == nil {
  208. c.Lock()
  209. c.ready = true
  210. c.err = nil
  211. c.Unlock()
  212. }
  213. c.configEvent <- struct{}{}
  214. }()
  215. go func() {
  216. for conn := range node.ListenControlSocket(ctx) {
  217. c.Lock()
  218. if c.conn != conn {
  219. c.client = swarmapi.NewControlClient(conn)
  220. }
  221. if c.conn != nil {
  222. c.client = nil
  223. }
  224. c.conn = conn
  225. c.Unlock()
  226. c.configEvent <- struct{}{}
  227. }
  228. }()
  229. return node, ctx, nil
  230. }
  231. // Init initializes new cluster from user provided request.
  232. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  233. c.Lock()
  234. if c.node != nil {
  235. c.Unlock()
  236. if !req.ForceNewCluster {
  237. return "", ErrSwarmExists
  238. }
  239. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  240. defer cancel()
  241. if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  242. return "", err
  243. }
  244. c.Lock()
  245. c.node = nil
  246. c.conn = nil
  247. c.ready = false
  248. }
  249. // todo: check current state existing
  250. n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
  251. if err != nil {
  252. c.Unlock()
  253. return "", err
  254. }
  255. c.Unlock()
  256. select {
  257. case <-n.Ready(context.Background()):
  258. if err := initAcceptancePolicy(n, req.Spec.AcceptancePolicy); err != nil {
  259. return "", err
  260. }
  261. go c.reconnectOnFailure(ctx)
  262. return n.NodeID(), nil
  263. case <-ctx.Done():
  264. c.RLock()
  265. defer c.RUnlock()
  266. if c.err != nil {
  267. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  268. if err := c.clearState(); err != nil {
  269. return "", err
  270. }
  271. }
  272. return "", c.err
  273. }
  274. return "", ctx.Err()
  275. }
  276. }
  277. // Join makes current Cluster part of an existing swarm cluster.
  278. func (c *Cluster) Join(req types.JoinRequest) error {
  279. c.Lock()
  280. if c.node != nil {
  281. c.Unlock()
  282. return ErrSwarmExists
  283. }
  284. // todo: check current state existing
  285. if len(req.RemoteAddrs) == 0 {
  286. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  287. }
  288. n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
  289. if err != nil {
  290. c.Unlock()
  291. return err
  292. }
  293. c.Unlock()
  294. select {
  295. case <-time.After(swarmConnectTimeout):
  296. go c.reconnectOnFailure(ctx)
  297. if nodeid := n.NodeID(); nodeid != "" {
  298. return fmt.Errorf("Timeout reached before node was joined. Your cluster settings may be preventing this node from automatically joining. To accept this node into cluster run `docker node accept %v` in an existing cluster manager", nodeid)
  299. }
  300. return ErrSwarmJoinTimeoutReached
  301. case <-n.Ready(context.Background()):
  302. go c.reconnectOnFailure(ctx)
  303. return nil
  304. case <-ctx.Done():
  305. c.RLock()
  306. defer c.RUnlock()
  307. if c.err != nil {
  308. return c.err
  309. }
  310. return ctx.Err()
  311. }
  312. }
  313. func (c *Cluster) cancelReconnect() {
  314. c.stop = true
  315. if c.cancelDelay != nil {
  316. c.cancelDelay()
  317. c.cancelDelay = nil
  318. }
  319. }
  320. // Leave shuts down Cluster and removes current state.
  321. func (c *Cluster) Leave(force bool) error {
  322. c.Lock()
  323. node := c.node
  324. if node == nil {
  325. c.Unlock()
  326. return ErrNoSwarm
  327. }
  328. if node.Manager() != nil && !force {
  329. msg := "You are attempting to leave cluster on a node that is participating as a manager. "
  330. if c.isActiveManager() {
  331. active, reachable, unreachable, err := c.managerStats()
  332. if err == nil {
  333. if active && reachable-2 <= unreachable {
  334. if reachable == 1 && unreachable == 0 {
  335. msg += "Leaving last manager will remove all current state of the cluster. Use `--force` to ignore this message. "
  336. c.Unlock()
  337. return fmt.Errorf(msg)
  338. }
  339. msg += fmt.Sprintf("Leaving cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
  340. }
  341. }
  342. } else {
  343. msg += "Doing so may lose the consenus of your cluster. "
  344. }
  345. msg += "Only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
  346. c.Unlock()
  347. return fmt.Errorf(msg)
  348. }
  349. c.cancelReconnect()
  350. c.Unlock()
  351. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  352. defer cancel()
  353. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  354. return err
  355. }
  356. nodeID := node.NodeID()
  357. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  358. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  359. logrus.Errorf("error removing %v: %v", id, err)
  360. }
  361. }
  362. c.Lock()
  363. defer c.Unlock()
  364. c.node = nil
  365. c.conn = nil
  366. c.ready = false
  367. c.configEvent <- struct{}{}
  368. // todo: cleanup optional?
  369. if err := c.clearState(); err != nil {
  370. return err
  371. }
  372. return nil
  373. }
  374. func (c *Cluster) clearState() error {
  375. if err := os.RemoveAll(c.root); err != nil {
  376. return err
  377. }
  378. if err := os.MkdirAll(c.root, 0700); err != nil {
  379. return err
  380. }
  381. c.config.Backend.SetClusterProvider(nil)
  382. return nil
  383. }
  384. func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on qourum lost
  385. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  386. return ctx
  387. }
  388. // Inspect retrives the confuguration properties of managed swarm cluster.
  389. func (c *Cluster) Inspect() (types.Swarm, error) {
  390. c.RLock()
  391. defer c.RUnlock()
  392. if !c.isActiveManager() {
  393. return types.Swarm{}, ErrNoManager
  394. }
  395. swarm, err := getSwarm(c.getRequestContext(), c.client)
  396. if err != nil {
  397. return types.Swarm{}, err
  398. }
  399. if err != nil {
  400. return types.Swarm{}, err
  401. }
  402. return convert.SwarmFromGRPC(*swarm), nil
  403. }
  404. // Update updates configuration of a managed swarm cluster.
  405. func (c *Cluster) Update(version uint64, spec types.Spec) error {
  406. c.RLock()
  407. defer c.RUnlock()
  408. if !c.isActiveManager() {
  409. return ErrNoManager
  410. }
  411. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  412. if err != nil {
  413. return err
  414. }
  415. swarm, err := getSwarm(c.getRequestContext(), c.client)
  416. if err != nil {
  417. return err
  418. }
  419. _, err = c.client.UpdateCluster(
  420. c.getRequestContext(),
  421. &swarmapi.UpdateClusterRequest{
  422. ClusterID: swarm.ID,
  423. Spec: &swarmSpec,
  424. ClusterVersion: &swarmapi.Version{
  425. Index: version,
  426. },
  427. },
  428. )
  429. return err
  430. }
  431. // IsManager returns true is Cluster is participating as a manager.
  432. func (c *Cluster) IsManager() bool {
  433. c.RLock()
  434. defer c.RUnlock()
  435. return c.isActiveManager()
  436. }
  437. // IsAgent returns true is Cluster is participating as a worker/agent.
  438. func (c *Cluster) IsAgent() bool {
  439. c.RLock()
  440. defer c.RUnlock()
  441. return c.ready
  442. }
  443. // GetListenAddress returns the listening address for current maanger's
  444. // consensus and dispatcher APIs.
  445. func (c *Cluster) GetListenAddress() string {
  446. c.RLock()
  447. defer c.RUnlock()
  448. if c.conn != nil {
  449. return c.listenAddr
  450. }
  451. return ""
  452. }
  453. // GetRemoteAddress returns a known advertise address of a remote maanger if
  454. // available.
  455. // todo: change to array/connect with info
  456. func (c *Cluster) GetRemoteAddress() string {
  457. c.RLock()
  458. defer c.RUnlock()
  459. return c.getRemoteAddress()
  460. }
  461. func (c *Cluster) getRemoteAddress() string {
  462. if c.node == nil {
  463. return ""
  464. }
  465. nodeID := c.node.NodeID()
  466. for _, r := range c.node.Remotes() {
  467. if r.NodeID != nodeID {
  468. return r.Addr
  469. }
  470. }
  471. return ""
  472. }
  473. // ListenClusterEvents returns a channel that receives messages on cluster
  474. // participation changes.
  475. // todo: make cancelable and accessible to multiple callers
  476. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  477. return c.configEvent
  478. }
  479. // Info returns information about the current cluster state.
  480. func (c *Cluster) Info() types.Info {
  481. var info types.Info
  482. c.RLock()
  483. defer c.RUnlock()
  484. if c.node == nil {
  485. info.LocalNodeState = types.LocalNodeStateInactive
  486. if c.cancelDelay != nil {
  487. info.LocalNodeState = types.LocalNodeStateError
  488. }
  489. } else {
  490. info.LocalNodeState = types.LocalNodeStatePending
  491. if c.ready == true {
  492. info.LocalNodeState = types.LocalNodeStateActive
  493. }
  494. }
  495. if c.err != nil {
  496. info.Error = c.err.Error()
  497. }
  498. if c.isActiveManager() {
  499. info.ControlAvailable = true
  500. if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
  501. info.Nodes = len(r.Nodes)
  502. for _, n := range r.Nodes {
  503. if n.ManagerStatus != nil {
  504. info.Managers = info.Managers + 1
  505. }
  506. }
  507. }
  508. if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
  509. info.CACertHash = swarm.RootCA.CACertHash
  510. }
  511. }
  512. if c.node != nil {
  513. for _, r := range c.node.Remotes() {
  514. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  515. }
  516. info.NodeID = c.node.NodeID()
  517. }
  518. return info
  519. }
  520. // isActiveManager should not be called without a read lock
  521. func (c *Cluster) isActiveManager() bool {
  522. return c.conn != nil
  523. }
  524. // GetServices returns all services of a managed swarm cluster.
  525. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  526. c.RLock()
  527. defer c.RUnlock()
  528. if !c.isActiveManager() {
  529. return nil, ErrNoManager
  530. }
  531. filters, err := newListServicesFilters(options.Filter)
  532. if err != nil {
  533. return nil, err
  534. }
  535. r, err := c.client.ListServices(
  536. c.getRequestContext(),
  537. &swarmapi.ListServicesRequest{Filters: filters})
  538. if err != nil {
  539. return nil, err
  540. }
  541. var services []types.Service
  542. for _, service := range r.Services {
  543. services = append(services, convert.ServiceFromGRPC(*service))
  544. }
  545. return services, nil
  546. }
  547. // CreateService creates a new service in a managed swarm cluster.
  548. func (c *Cluster) CreateService(s types.ServiceSpec) (string, error) {
  549. c.RLock()
  550. defer c.RUnlock()
  551. if !c.isActiveManager() {
  552. return "", ErrNoManager
  553. }
  554. ctx := c.getRequestContext()
  555. err := populateNetworkID(ctx, c.client, &s)
  556. if err != nil {
  557. return "", err
  558. }
  559. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  560. if err != nil {
  561. return "", err
  562. }
  563. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  564. if err != nil {
  565. return "", err
  566. }
  567. return r.Service.ID, nil
  568. }
  569. // GetService returns a service based on a ID or name.
  570. func (c *Cluster) GetService(input string) (types.Service, error) {
  571. c.RLock()
  572. defer c.RUnlock()
  573. if !c.isActiveManager() {
  574. return types.Service{}, ErrNoManager
  575. }
  576. service, err := getService(c.getRequestContext(), c.client, input)
  577. if err != nil {
  578. return types.Service{}, err
  579. }
  580. return convert.ServiceFromGRPC(*service), nil
  581. }
  582. // UpdateService updates existing service to match new properties.
  583. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec) error {
  584. c.RLock()
  585. defer c.RUnlock()
  586. if !c.isActiveManager() {
  587. return ErrNoManager
  588. }
  589. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  590. if err != nil {
  591. return err
  592. }
  593. _, err = c.client.UpdateService(
  594. c.getRequestContext(),
  595. &swarmapi.UpdateServiceRequest{
  596. ServiceID: serviceID,
  597. Spec: &serviceSpec,
  598. ServiceVersion: &swarmapi.Version{
  599. Index: version,
  600. },
  601. },
  602. )
  603. return err
  604. }
  605. // RemoveService removes a service from a managed swarm cluster.
  606. func (c *Cluster) RemoveService(input string) error {
  607. c.RLock()
  608. defer c.RUnlock()
  609. if !c.isActiveManager() {
  610. return ErrNoManager
  611. }
  612. service, err := getService(c.getRequestContext(), c.client, input)
  613. if err != nil {
  614. return err
  615. }
  616. if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  617. return err
  618. }
  619. return nil
  620. }
  621. // GetNodes returns a list of all nodes known to a cluster.
  622. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  623. c.RLock()
  624. defer c.RUnlock()
  625. if !c.isActiveManager() {
  626. return nil, ErrNoManager
  627. }
  628. filters, err := newListNodesFilters(options.Filter)
  629. if err != nil {
  630. return nil, err
  631. }
  632. r, err := c.client.ListNodes(
  633. c.getRequestContext(),
  634. &swarmapi.ListNodesRequest{Filters: filters})
  635. if err != nil {
  636. return nil, err
  637. }
  638. nodes := []types.Node{}
  639. for _, node := range r.Nodes {
  640. nodes = append(nodes, convert.NodeFromGRPC(*node))
  641. }
  642. return nodes, nil
  643. }
  644. // GetNode returns a node based on a ID or name.
  645. func (c *Cluster) GetNode(input string) (types.Node, error) {
  646. c.RLock()
  647. defer c.RUnlock()
  648. if !c.isActiveManager() {
  649. return types.Node{}, ErrNoManager
  650. }
  651. node, err := getNode(c.getRequestContext(), c.client, input)
  652. if err != nil {
  653. return types.Node{}, err
  654. }
  655. return convert.NodeFromGRPC(*node), nil
  656. }
  657. // UpdateNode updates existing nodes properties.
  658. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  659. c.RLock()
  660. defer c.RUnlock()
  661. if !c.isActiveManager() {
  662. return ErrNoManager
  663. }
  664. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  665. if err != nil {
  666. return err
  667. }
  668. _, err = c.client.UpdateNode(
  669. c.getRequestContext(),
  670. &swarmapi.UpdateNodeRequest{
  671. NodeID: nodeID,
  672. Spec: &nodeSpec,
  673. NodeVersion: &swarmapi.Version{
  674. Index: version,
  675. },
  676. },
  677. )
  678. return err
  679. }
  680. // RemoveNode removes a node from a cluster
  681. func (c *Cluster) RemoveNode(input string) error {
  682. c.RLock()
  683. defer c.RUnlock()
  684. if !c.isActiveManager() {
  685. return ErrNoManager
  686. }
  687. ctx := c.getRequestContext()
  688. node, err := getNode(ctx, c.client, input)
  689. if err != nil {
  690. return err
  691. }
  692. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
  693. return err
  694. }
  695. return nil
  696. }
  697. // GetTasks returns a list of tasks matching the filter options.
  698. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  699. c.RLock()
  700. defer c.RUnlock()
  701. if !c.isActiveManager() {
  702. return nil, ErrNoManager
  703. }
  704. filters, err := newListTasksFilters(options.Filter)
  705. if err != nil {
  706. return nil, err
  707. }
  708. r, err := c.client.ListTasks(
  709. c.getRequestContext(),
  710. &swarmapi.ListTasksRequest{Filters: filters})
  711. if err != nil {
  712. return nil, err
  713. }
  714. tasks := []types.Task{}
  715. for _, task := range r.Tasks {
  716. tasks = append(tasks, convert.TaskFromGRPC(*task))
  717. }
  718. return tasks, nil
  719. }
  720. // GetTask returns a task by an ID.
  721. func (c *Cluster) GetTask(input string) (types.Task, error) {
  722. c.RLock()
  723. defer c.RUnlock()
  724. if !c.isActiveManager() {
  725. return types.Task{}, ErrNoManager
  726. }
  727. task, err := getTask(c.getRequestContext(), c.client, input)
  728. if err != nil {
  729. return types.Task{}, err
  730. }
  731. return convert.TaskFromGRPC(*task), nil
  732. }
  733. // GetNetwork returns a cluster network by ID.
  734. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  735. c.RLock()
  736. defer c.RUnlock()
  737. if !c.isActiveManager() {
  738. return apitypes.NetworkResource{}, ErrNoManager
  739. }
  740. network, err := getNetwork(c.getRequestContext(), c.client, input)
  741. if err != nil {
  742. return apitypes.NetworkResource{}, err
  743. }
  744. return convert.BasicNetworkFromGRPC(*network), nil
  745. }
  746. // GetNetworks returns all current cluster managed networks.
  747. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  748. c.RLock()
  749. defer c.RUnlock()
  750. if !c.isActiveManager() {
  751. return nil, ErrNoManager
  752. }
  753. r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
  754. if err != nil {
  755. return nil, err
  756. }
  757. var networks []apitypes.NetworkResource
  758. for _, network := range r.Networks {
  759. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  760. }
  761. return networks, nil
  762. }
  763. // CreateNetwork creates a new cluster managed network.
  764. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  765. c.RLock()
  766. defer c.RUnlock()
  767. if !c.isActiveManager() {
  768. return "", ErrNoManager
  769. }
  770. if runconfig.IsPreDefinedNetwork(s.Name) {
  771. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  772. return "", errors.NewRequestForbiddenError(err)
  773. }
  774. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  775. r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  776. if err != nil {
  777. return "", err
  778. }
  779. return r.Network.ID, nil
  780. }
  781. // RemoveNetwork removes a cluster network.
  782. func (c *Cluster) RemoveNetwork(input string) error {
  783. c.RLock()
  784. defer c.RUnlock()
  785. if !c.isActiveManager() {
  786. return ErrNoManager
  787. }
  788. network, err := getNetwork(c.getRequestContext(), c.client, input)
  789. if err != nil {
  790. return err
  791. }
  792. if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  793. return err
  794. }
  795. return nil
  796. }
  797. func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
  798. for i, n := range s.Networks {
  799. apiNetwork, err := getNetwork(ctx, c, n.Target)
  800. if err != nil {
  801. return err
  802. }
  803. s.Networks[i] = types.NetworkAttachmentConfig{Target: apiNetwork.ID}
  804. }
  805. return nil
  806. }
  807. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  808. // GetNetwork to match via full ID.
  809. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  810. if err != nil {
  811. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  812. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  813. if err != nil || len(rl.Networks) == 0 {
  814. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  815. }
  816. if err != nil {
  817. return nil, err
  818. }
  819. if len(rl.Networks) == 0 {
  820. return nil, fmt.Errorf("network %s not found", input)
  821. }
  822. if l := len(rl.Networks); l > 1 {
  823. return nil, fmt.Errorf("network %s is ambigious (%d matches found)", input, l)
  824. }
  825. return rl.Networks[0], nil
  826. }
  827. return rg.Network, nil
  828. }
  829. // Cleanup stops active swarm node. This is run before daemon shutdown.
  830. func (c *Cluster) Cleanup() {
  831. c.Lock()
  832. node := c.node
  833. if node == nil {
  834. c.Unlock()
  835. return
  836. }
  837. if c.isActiveManager() {
  838. active, reachable, unreachable, err := c.managerStats()
  839. if err == nil {
  840. singlenode := active && reachable == 1 && unreachable == 0
  841. if active && !singlenode && reachable-2 <= unreachable {
  842. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  843. }
  844. }
  845. }
  846. c.cancelReconnect()
  847. c.Unlock()
  848. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  849. defer cancel()
  850. if err := node.Stop(ctx); err != nil {
  851. logrus.Errorf("error cleaning up cluster: %v", err)
  852. }
  853. c.Lock()
  854. c.node = nil
  855. c.ready = false
  856. c.conn = nil
  857. c.Unlock()
  858. }
  859. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  860. ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
  861. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  862. if err != nil {
  863. return false, 0, 0, err
  864. }
  865. for _, n := range nodes.Nodes {
  866. if n.ManagerStatus != nil {
  867. if n.ManagerStatus.Raft.Status.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  868. reachable++
  869. if n.ID == c.node.NodeID() {
  870. current = true
  871. }
  872. }
  873. if n.ManagerStatus.Raft.Status.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  874. unreachable++
  875. }
  876. }
  877. }
  878. return
  879. }
  880. func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
  881. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  882. for conn := range node.ListenControlSocket(ctx) {
  883. if ctx.Err() != nil {
  884. return ctx.Err()
  885. }
  886. if conn != nil {
  887. client := swarmapi.NewControlClient(conn)
  888. var cluster *swarmapi.Cluster
  889. for i := 0; ; i++ {
  890. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  891. if err != nil {
  892. return fmt.Errorf("error on listing clusters: %v", err)
  893. }
  894. if len(lcr.Clusters) == 0 {
  895. if i < 10 {
  896. time.Sleep(200 * time.Millisecond)
  897. continue
  898. }
  899. return fmt.Errorf("empty list of clusters was returned")
  900. }
  901. cluster = lcr.Clusters[0]
  902. break
  903. }
  904. spec := &cluster.Spec
  905. if err := convert.SwarmSpecUpdateAcceptancePolicy(spec, acceptancePolicy); err != nil {
  906. return fmt.Errorf("error updating cluster settings: %v", err)
  907. }
  908. _, err := client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  909. ClusterID: cluster.ID,
  910. ClusterVersion: &cluster.Meta.Version,
  911. Spec: spec,
  912. })
  913. if err != nil {
  914. return fmt.Errorf("error updating cluster settings: %v", err)
  915. }
  916. return nil
  917. }
  918. }
  919. return ctx.Err()
  920. }