cluster.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "google.golang.org/grpc"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/docker/docker/daemon/cluster/convert"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/docker/daemon/cluster/executor/container"
  17. "github.com/docker/docker/errors"
  18. "github.com/docker/docker/opts"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/runconfig"
  21. apitypes "github.com/docker/engine-api/types"
  22. "github.com/docker/engine-api/types/filters"
  23. types "github.com/docker/engine-api/types/swarm"
  24. swarmagent "github.com/docker/swarmkit/agent"
  25. swarmapi "github.com/docker/swarmkit/api"
  26. "golang.org/x/net/context"
  27. )
  28. const swarmDirName = "swarm"
  29. const controlSocket = "control.sock"
  30. const swarmConnectTimeout = 20 * time.Second
  31. const swarmRequestTimeout = 20 * time.Second
  32. const stateFile = "docker-state.json"
  33. const defaultAddr = "0.0.0.0:2377"
  34. const (
  35. initialReconnectDelay = 100 * time.Millisecond
  36. maxReconnectDelay = 30 * time.Second
  37. )
  38. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  39. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  40. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  41. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  42. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  43. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  44. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  45. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
// defaultSpec contains some sane defaults if cluster options are missing on
// init.
var defaultSpec = types.Spec{
	Raft: types.RaftConfig{
		SnapshotInterval:           10000,
		KeepOldSnapshots:           0,
		LogEntriesForSlowFollowers: 500,
		// The same tick values are passed to the swarm node in startNewNode.
		HeartbeatTick: 1,
		ElectionTick:  3,
	},
	CAConfig: types.CAConfig{
		// 90 days.
		NodeCertExpiry: 90 * 24 * time.Hour,
	},
	Dispatcher: types.DispatcherConfig{
		// Five seconds, expressed in nanoseconds.
		HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
	},
	Orchestration: types.OrchestrationConfig{
		TaskHistoryRetentionLimit: 10,
	},
}
// state is the set of addresses persisted as JSON in the state file by
// saveState and read back by loadState.
type state struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
}
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets.
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	// Root is the directory under which New creates the swarm state
	// directory.
	Root string
	// Name is passed to the swarm node as its hostname.
	Name string
	// Backend is the executor backend; it is used for the swarm
	// compatibility check, for creating the container executor and for
	// container cleanup on leave.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider supplies the subnets of Docker-managed networks.
	NetworkSubnetsProvider NetworkSubnetsProvider
	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	// The embedded RWMutex guards the fields below.
	sync.RWMutex
	// node is the currently running swarm node; nil when none is running.
	*node
	// root is the swarm state directory.
	root   string
	config Config
	// configEvent receives a value whenever cluster participation changes.
	configEvent     chan struct{} // todo: make this array and goroutine safe
	localAddr       string
	actualLocalAddr string // after resolution, not persisted
	remoteAddr      string
	listenAddr      string
	advertiseAddr   string
	// stop is set by stopNode so reconnectOnFailure does not restart the node.
	stop bool
	// err holds the error the last node exited with, or a restart failure.
	err error
	// cancelDelay aborts a pending reconnect delay; nil when none is pending.
	cancelDelay func()
}
// node wraps a running swarm agent node together with the bookkeeping the
// Cluster keeps for it.
type node struct {
	*swarmagent.Node
	// done is closed once the swarm node has exited.
	done chan struct{}
	// ready is set once the swarm node has signalled readiness.
	ready bool
	// conn is the latest connection received from the control socket
	// listener; client is the control client built on top of it.
	conn   *grpc.ClientConn
	client swarmapi.ControlClient
	// reconnectDelay is the current restart backoff.
	reconnectDelay time.Duration
}
  118. // New creates a new Cluster instance using provided config.
  119. func New(config Config) (*Cluster, error) {
  120. root := filepath.Join(config.Root, swarmDirName)
  121. if err := os.MkdirAll(root, 0700); err != nil {
  122. return nil, err
  123. }
  124. c := &Cluster{
  125. root: root,
  126. config: config,
  127. configEvent: make(chan struct{}, 10),
  128. }
  129. st, err := c.loadState()
  130. if err != nil {
  131. if os.IsNotExist(err) {
  132. return c, nil
  133. }
  134. return nil, err
  135. }
  136. n, err := c.startNewNode(false, st.LocalAddr, st.RemoteAddr, st.ListenAddr, st.AdvertiseAddr, "", "")
  137. if err != nil {
  138. return nil, err
  139. }
  140. select {
  141. case <-time.After(swarmConnectTimeout):
  142. logrus.Errorf("swarm component could not be started before timeout was reached")
  143. case <-n.Ready():
  144. case <-n.done:
  145. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  146. }
  147. go c.reconnectOnFailure(n)
  148. return c, nil
  149. }
  150. func (c *Cluster) loadState() (*state, error) {
  151. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  152. if err != nil {
  153. return nil, err
  154. }
  155. // missing certificate means no actual state to restore from
  156. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  157. if os.IsNotExist(err) {
  158. c.clearState()
  159. }
  160. return nil, err
  161. }
  162. var st state
  163. if err := json.Unmarshal(dt, &st); err != nil {
  164. return nil, err
  165. }
  166. return &st, nil
  167. }
  168. func (c *Cluster) saveState() error {
  169. dt, err := json.Marshal(state{
  170. LocalAddr: c.localAddr,
  171. RemoteAddr: c.remoteAddr,
  172. ListenAddr: c.listenAddr,
  173. AdvertiseAddr: c.advertiseAddr,
  174. })
  175. if err != nil {
  176. return err
  177. }
  178. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  179. }
  180. func (c *Cluster) reconnectOnFailure(n *node) {
  181. for {
  182. <-n.done
  183. c.Lock()
  184. if c.stop || c.node != nil {
  185. c.Unlock()
  186. return
  187. }
  188. n.reconnectDelay *= 2
  189. if n.reconnectDelay > maxReconnectDelay {
  190. n.reconnectDelay = maxReconnectDelay
  191. }
  192. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  193. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  194. c.cancelDelay = cancel
  195. c.Unlock()
  196. <-delayCtx.Done()
  197. if delayCtx.Err() != context.DeadlineExceeded {
  198. return
  199. }
  200. c.Lock()
  201. if c.node != nil {
  202. c.Unlock()
  203. return
  204. }
  205. var err error
  206. n, err = c.startNewNode(false, c.localAddr, c.getRemoteAddress(), c.listenAddr, c.advertiseAddr, c.getRemoteAddress(), "")
  207. if err != nil {
  208. c.err = err
  209. close(n.done)
  210. }
  211. c.Unlock()
  212. }
  213. }
  214. func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, listenAddr, advertiseAddr, joinAddr, joinToken string) (*node, error) {
  215. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  216. return nil, err
  217. }
  218. actualLocalAddr := localAddr
  219. if actualLocalAddr == "" {
  220. // If localAddr was not specified, resolve it automatically
  221. // based on the route to joinAddr. localAddr can only be left
  222. // empty on "join".
  223. listenHost, _, err := net.SplitHostPort(listenAddr)
  224. if err != nil {
  225. return nil, fmt.Errorf("could not parse listen address: %v", err)
  226. }
  227. listenAddrIP := net.ParseIP(listenHost)
  228. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  229. actualLocalAddr = listenHost
  230. } else {
  231. if remoteAddr == "" {
  232. // Should never happen except using swarms created by
  233. // old versions that didn't save remoteAddr.
  234. remoteAddr = "8.8.8.8:53"
  235. }
  236. conn, err := net.Dial("udp", remoteAddr)
  237. if err != nil {
  238. return nil, fmt.Errorf("could not find local IP address: %v", err)
  239. }
  240. localHostPort := conn.LocalAddr().String()
  241. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  242. conn.Close()
  243. }
  244. }
  245. c.node = nil
  246. c.cancelDelay = nil
  247. c.stop = false
  248. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  249. Hostname: c.config.Name,
  250. ForceNewCluster: forceNewCluster,
  251. ListenControlAPI: filepath.Join(c.root, controlSocket),
  252. ListenRemoteAPI: listenAddr,
  253. AdvertiseRemoteAPI: advertiseAddr,
  254. JoinAddr: joinAddr,
  255. StateDir: c.root,
  256. JoinToken: joinToken,
  257. Executor: container.NewExecutor(c.config.Backend),
  258. HeartbeatTick: 1,
  259. ElectionTick: 3,
  260. })
  261. if err != nil {
  262. return nil, err
  263. }
  264. ctx := context.Background()
  265. if err := n.Start(ctx); err != nil {
  266. return nil, err
  267. }
  268. node := &node{
  269. Node: n,
  270. done: make(chan struct{}),
  271. reconnectDelay: initialReconnectDelay,
  272. }
  273. c.node = node
  274. c.localAddr = localAddr
  275. c.actualLocalAddr = actualLocalAddr // not saved
  276. c.remoteAddr = remoteAddr
  277. c.listenAddr = listenAddr
  278. c.advertiseAddr = advertiseAddr
  279. c.saveState()
  280. c.config.Backend.SetClusterProvider(c)
  281. go func() {
  282. err := n.Err(ctx)
  283. if err != nil {
  284. logrus.Errorf("cluster exited with error: %v", err)
  285. }
  286. c.Lock()
  287. c.node = nil
  288. c.err = err
  289. c.Unlock()
  290. close(node.done)
  291. }()
  292. go func() {
  293. select {
  294. case <-n.Ready():
  295. c.Lock()
  296. node.ready = true
  297. c.err = nil
  298. c.Unlock()
  299. case <-ctx.Done():
  300. }
  301. c.configEvent <- struct{}{}
  302. }()
  303. go func() {
  304. for conn := range n.ListenControlSocket(ctx) {
  305. c.Lock()
  306. if node.conn != conn {
  307. if conn == nil {
  308. node.client = nil
  309. } else {
  310. node.client = swarmapi.NewControlClient(conn)
  311. }
  312. }
  313. node.conn = conn
  314. c.Unlock()
  315. c.configEvent <- struct{}{}
  316. }
  317. }()
  318. return node, nil
  319. }
  320. // Init initializes new cluster from user provided request.
  321. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  322. c.Lock()
  323. if node := c.node; node != nil {
  324. if !req.ForceNewCluster {
  325. c.Unlock()
  326. return "", ErrSwarmExists
  327. }
  328. if err := c.stopNode(); err != nil {
  329. c.Unlock()
  330. return "", err
  331. }
  332. }
  333. if err := validateAndSanitizeInitRequest(&req); err != nil {
  334. c.Unlock()
  335. return "", err
  336. }
  337. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  338. if err != nil {
  339. c.Unlock()
  340. return "", err
  341. }
  342. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  343. if err != nil {
  344. c.Unlock()
  345. return "", err
  346. }
  347. localAddr := listenHost
  348. // If the advertise address is not one of the system's
  349. // addresses, we also require a listen address.
  350. listenAddrIP := net.ParseIP(listenHost)
  351. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  352. advertiseIP := net.ParseIP(advertiseHost)
  353. if advertiseIP == nil {
  354. // not an IP
  355. c.Unlock()
  356. return "", errMustSpecifyListenAddr
  357. }
  358. systemIPs := listSystemIPs()
  359. found := false
  360. for _, systemIP := range systemIPs {
  361. if systemIP.Equal(advertiseIP) {
  362. found = true
  363. break
  364. }
  365. }
  366. if !found {
  367. c.Unlock()
  368. return "", errMustSpecifyListenAddr
  369. }
  370. localAddr = advertiseIP.String()
  371. }
  372. // todo: check current state existing
  373. n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
  374. if err != nil {
  375. c.Unlock()
  376. return "", err
  377. }
  378. c.Unlock()
  379. select {
  380. case <-n.Ready():
  381. if err := initClusterSpec(n, req.Spec); err != nil {
  382. return "", err
  383. }
  384. go c.reconnectOnFailure(n)
  385. return n.NodeID(), nil
  386. case <-n.done:
  387. c.RLock()
  388. defer c.RUnlock()
  389. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  390. if err := c.clearState(); err != nil {
  391. return "", err
  392. }
  393. }
  394. return "", c.err
  395. }
  396. }
  397. // Join makes current Cluster part of an existing swarm cluster.
  398. func (c *Cluster) Join(req types.JoinRequest) error {
  399. c.Lock()
  400. if node := c.node; node != nil {
  401. c.Unlock()
  402. return ErrSwarmExists
  403. }
  404. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  405. c.Unlock()
  406. return err
  407. }
  408. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  409. if err != nil {
  410. c.Unlock()
  411. return err
  412. }
  413. var advertiseAddr string
  414. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  415. // For joining, we don't need to provide an advertise address,
  416. // since the remote side can detect it.
  417. if err == nil {
  418. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  419. }
  420. // todo: check current state existing
  421. n, err := c.startNewNode(false, "", req.RemoteAddrs[0], net.JoinHostPort(listenHost, listenPort), advertiseAddr, req.RemoteAddrs[0], req.JoinToken)
  422. if err != nil {
  423. c.Unlock()
  424. return err
  425. }
  426. c.Unlock()
  427. select {
  428. case <-time.After(swarmConnectTimeout):
  429. // attempt to connect will continue in background, also reconnecting
  430. go c.reconnectOnFailure(n)
  431. return ErrSwarmJoinTimeoutReached
  432. case <-n.Ready():
  433. go c.reconnectOnFailure(n)
  434. return nil
  435. case <-n.done:
  436. c.RLock()
  437. defer c.RUnlock()
  438. return c.err
  439. }
  440. }
  441. // stopNode is a helper that stops the active c.node and waits until it has
  442. // shut down. Call while keeping the cluster lock.
  443. func (c *Cluster) stopNode() error {
  444. if c.node == nil {
  445. return nil
  446. }
  447. c.stop = true
  448. if c.cancelDelay != nil {
  449. c.cancelDelay()
  450. c.cancelDelay = nil
  451. }
  452. node := c.node
  453. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  454. defer cancel()
  455. // TODO: can't hold lock on stop because it calls back to network
  456. c.Unlock()
  457. defer c.Lock()
  458. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  459. return err
  460. }
  461. <-node.done
  462. return nil
  463. }
  464. // Leave shuts down Cluster and removes current state.
  465. func (c *Cluster) Leave(force bool) error {
  466. c.Lock()
  467. node := c.node
  468. if node == nil {
  469. c.Unlock()
  470. return ErrNoSwarm
  471. }
  472. if node.Manager() != nil && !force {
  473. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  474. if c.isActiveManager() {
  475. active, reachable, unreachable, err := c.managerStats()
  476. if err == nil {
  477. if active && reachable-2 <= unreachable {
  478. if reachable == 1 && unreachable == 0 {
  479. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  480. c.Unlock()
  481. return fmt.Errorf(msg)
  482. }
  483. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  484. }
  485. }
  486. } else {
  487. msg += "Doing so may lose the consensus of your cluster. "
  488. }
  489. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  490. c.Unlock()
  491. return fmt.Errorf(msg)
  492. }
  493. if err := c.stopNode(); err != nil {
  494. c.Unlock()
  495. return err
  496. }
  497. c.Unlock()
  498. if nodeID := node.NodeID(); nodeID != "" {
  499. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  500. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  501. logrus.Errorf("error removing %v: %v", id, err)
  502. }
  503. }
  504. }
  505. c.configEvent <- struct{}{}
  506. // todo: cleanup optional?
  507. if err := c.clearState(); err != nil {
  508. return err
  509. }
  510. return nil
  511. }
  512. func (c *Cluster) clearState() error {
  513. // todo: backup this data instead of removing?
  514. if err := os.RemoveAll(c.root); err != nil {
  515. return err
  516. }
  517. if err := os.MkdirAll(c.root, 0700); err != nil {
  518. return err
  519. }
  520. c.config.Backend.SetClusterProvider(nil)
  521. return nil
  522. }
  523. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  524. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  525. }
  526. // Inspect retrieves the configuration properties of a managed swarm cluster.
  527. func (c *Cluster) Inspect() (types.Swarm, error) {
  528. c.RLock()
  529. defer c.RUnlock()
  530. if !c.isActiveManager() {
  531. return types.Swarm{}, c.errNoManager()
  532. }
  533. ctx, cancel := c.getRequestContext()
  534. defer cancel()
  535. swarm, err := getSwarm(ctx, c.client)
  536. if err != nil {
  537. return types.Swarm{}, err
  538. }
  539. if err != nil {
  540. return types.Swarm{}, err
  541. }
  542. return convert.SwarmFromGRPC(*swarm), nil
  543. }
  544. // Update updates configuration of a managed swarm cluster.
  545. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  546. c.RLock()
  547. defer c.RUnlock()
  548. if !c.isActiveManager() {
  549. return c.errNoManager()
  550. }
  551. ctx, cancel := c.getRequestContext()
  552. defer cancel()
  553. swarm, err := getSwarm(ctx, c.client)
  554. if err != nil {
  555. return err
  556. }
  557. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  558. if err != nil {
  559. return err
  560. }
  561. _, err = c.client.UpdateCluster(
  562. ctx,
  563. &swarmapi.UpdateClusterRequest{
  564. ClusterID: swarm.ID,
  565. Spec: &swarmSpec,
  566. ClusterVersion: &swarmapi.Version{
  567. Index: version,
  568. },
  569. Rotation: swarmapi.JoinTokenRotation{
  570. RotateWorkerToken: flags.RotateWorkerToken,
  571. RotateManagerToken: flags.RotateManagerToken,
  572. },
  573. },
  574. )
  575. return err
  576. }
  577. // IsManager returns true if Cluster is participating as a manager.
  578. func (c *Cluster) IsManager() bool {
  579. c.RLock()
  580. defer c.RUnlock()
  581. return c.isActiveManager()
  582. }
  583. // IsAgent returns true if Cluster is participating as a worker/agent.
  584. func (c *Cluster) IsAgent() bool {
  585. c.RLock()
  586. defer c.RUnlock()
  587. return c.node != nil && c.ready
  588. }
  589. // GetLocalAddress returns the local address.
  590. func (c *Cluster) GetLocalAddress() string {
  591. c.RLock()
  592. defer c.RUnlock()
  593. return c.actualLocalAddr
  594. }
  595. // GetAdvertiseAddress returns the remotely reachable address of this node.
  596. func (c *Cluster) GetAdvertiseAddress() string {
  597. c.RLock()
  598. defer c.RUnlock()
  599. if c.advertiseAddr != "" {
  600. advertiseHost, _, _ := net.SplitHostPort(c.advertiseAddr)
  601. return advertiseHost
  602. }
  603. return c.actualLocalAddr
  604. }
  605. // GetRemoteAddress returns a known advertise address of a remote manager if
  606. // available.
  607. // todo: change to array/connect with info
  608. func (c *Cluster) GetRemoteAddress() string {
  609. c.RLock()
  610. defer c.RUnlock()
  611. return c.getRemoteAddress()
  612. }
  613. func (c *Cluster) getRemoteAddress() string {
  614. if c.node == nil {
  615. return ""
  616. }
  617. nodeID := c.node.NodeID()
  618. for _, r := range c.node.Remotes() {
  619. if r.NodeID != nodeID {
  620. return r.Addr
  621. }
  622. }
  623. return ""
  624. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
//
// Every call returns the same underlying channel, so a message is delivered
// to only one of several receivers.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}
  631. // Info returns information about the current cluster state.
  632. func (c *Cluster) Info() types.Info {
  633. info := types.Info{
  634. NodeAddr: c.GetAdvertiseAddress(),
  635. }
  636. c.RLock()
  637. defer c.RUnlock()
  638. if c.node == nil {
  639. info.LocalNodeState = types.LocalNodeStateInactive
  640. if c.cancelDelay != nil {
  641. info.LocalNodeState = types.LocalNodeStateError
  642. }
  643. } else {
  644. info.LocalNodeState = types.LocalNodeStatePending
  645. if c.ready == true {
  646. info.LocalNodeState = types.LocalNodeStateActive
  647. }
  648. }
  649. if c.err != nil {
  650. info.Error = c.err.Error()
  651. }
  652. ctx, cancel := c.getRequestContext()
  653. defer cancel()
  654. if c.isActiveManager() {
  655. info.ControlAvailable = true
  656. swarm, err := c.Inspect()
  657. if err != nil {
  658. info.Error = err.Error()
  659. }
  660. // Strip JoinTokens
  661. info.Cluster = swarm.ClusterInfo
  662. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  663. info.Nodes = len(r.Nodes)
  664. for _, n := range r.Nodes {
  665. if n.ManagerStatus != nil {
  666. info.Managers = info.Managers + 1
  667. }
  668. }
  669. }
  670. }
  671. if c.node != nil {
  672. for _, r := range c.node.Remotes() {
  673. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  674. }
  675. info.NodeID = c.node.NodeID()
  676. }
  677. return info
  678. }
  679. // isActiveManager should not be called without a read lock
  680. func (c *Cluster) isActiveManager() bool {
  681. return c.node != nil && c.conn != nil
  682. }
  683. // errNoManager returns error describing why manager commands can't be used.
  684. // Call with read lock.
  685. func (c *Cluster) errNoManager() error {
  686. if c.node == nil {
  687. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  688. }
  689. if c.node.Manager() != nil {
  690. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  691. }
  692. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  693. }
  694. // GetServices returns all services of a managed swarm cluster.
  695. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  696. c.RLock()
  697. defer c.RUnlock()
  698. if !c.isActiveManager() {
  699. return nil, c.errNoManager()
  700. }
  701. filters, err := newListServicesFilters(options.Filter)
  702. if err != nil {
  703. return nil, err
  704. }
  705. ctx, cancel := c.getRequestContext()
  706. defer cancel()
  707. r, err := c.client.ListServices(
  708. ctx,
  709. &swarmapi.ListServicesRequest{Filters: filters})
  710. if err != nil {
  711. return nil, err
  712. }
  713. services := []types.Service{}
  714. for _, service := range r.Services {
  715. services = append(services, convert.ServiceFromGRPC(*service))
  716. }
  717. return services, nil
  718. }
  719. // CreateService creates a new service in a managed swarm cluster.
  720. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  721. c.RLock()
  722. defer c.RUnlock()
  723. if !c.isActiveManager() {
  724. return "", c.errNoManager()
  725. }
  726. ctx, cancel := c.getRequestContext()
  727. defer cancel()
  728. err := c.populateNetworkID(ctx, c.client, &s)
  729. if err != nil {
  730. return "", err
  731. }
  732. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  733. if err != nil {
  734. return "", err
  735. }
  736. if encodedAuth != "" {
  737. ctnr := serviceSpec.Task.GetContainer()
  738. if ctnr == nil {
  739. return "", fmt.Errorf("service does not use container tasks")
  740. }
  741. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  742. }
  743. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  744. if err != nil {
  745. return "", err
  746. }
  747. return r.Service.ID, nil
  748. }
  749. // GetService returns a service based on an ID or name.
  750. func (c *Cluster) GetService(input string) (types.Service, error) {
  751. c.RLock()
  752. defer c.RUnlock()
  753. if !c.isActiveManager() {
  754. return types.Service{}, c.errNoManager()
  755. }
  756. ctx, cancel := c.getRequestContext()
  757. defer cancel()
  758. service, err := getService(ctx, c.client, input)
  759. if err != nil {
  760. return types.Service{}, err
  761. }
  762. return convert.ServiceFromGRPC(*service), nil
  763. }
  764. // UpdateService updates existing service to match new properties.
  765. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  766. c.RLock()
  767. defer c.RUnlock()
  768. if !c.isActiveManager() {
  769. return c.errNoManager()
  770. }
  771. ctx, cancel := c.getRequestContext()
  772. defer cancel()
  773. err := c.populateNetworkID(ctx, c.client, &spec)
  774. if err != nil {
  775. return err
  776. }
  777. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  778. if err != nil {
  779. return err
  780. }
  781. if encodedAuth != "" {
  782. ctnr := serviceSpec.Task.GetContainer()
  783. if ctnr == nil {
  784. return fmt.Errorf("service does not use container tasks")
  785. }
  786. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  787. } else {
  788. // this is needed because if the encodedAuth isn't being updated then we
  789. // shouldn't lose it, and continue to use the one that was already present
  790. currentService, err := getService(ctx, c.client, serviceID)
  791. if err != nil {
  792. return err
  793. }
  794. ctnr := currentService.Spec.Task.GetContainer()
  795. if ctnr == nil {
  796. return fmt.Errorf("service does not use container tasks")
  797. }
  798. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  799. }
  800. _, err = c.client.UpdateService(
  801. ctx,
  802. &swarmapi.UpdateServiceRequest{
  803. ServiceID: serviceID,
  804. Spec: &serviceSpec,
  805. ServiceVersion: &swarmapi.Version{
  806. Index: version,
  807. },
  808. },
  809. )
  810. return err
  811. }
  812. // RemoveService removes a service from a managed swarm cluster.
  813. func (c *Cluster) RemoveService(input string) error {
  814. c.RLock()
  815. defer c.RUnlock()
  816. if !c.isActiveManager() {
  817. return c.errNoManager()
  818. }
  819. ctx, cancel := c.getRequestContext()
  820. defer cancel()
  821. service, err := getService(ctx, c.client, input)
  822. if err != nil {
  823. return err
  824. }
  825. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  826. return err
  827. }
  828. return nil
  829. }
  830. // GetNodes returns a list of all nodes known to a cluster.
  831. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  832. c.RLock()
  833. defer c.RUnlock()
  834. if !c.isActiveManager() {
  835. return nil, c.errNoManager()
  836. }
  837. filters, err := newListNodesFilters(options.Filter)
  838. if err != nil {
  839. return nil, err
  840. }
  841. ctx, cancel := c.getRequestContext()
  842. defer cancel()
  843. r, err := c.client.ListNodes(
  844. ctx,
  845. &swarmapi.ListNodesRequest{Filters: filters})
  846. if err != nil {
  847. return nil, err
  848. }
  849. nodes := []types.Node{}
  850. for _, node := range r.Nodes {
  851. nodes = append(nodes, convert.NodeFromGRPC(*node))
  852. }
  853. return nodes, nil
  854. }
  855. // GetNode returns a node based on an ID or name.
  856. func (c *Cluster) GetNode(input string) (types.Node, error) {
  857. c.RLock()
  858. defer c.RUnlock()
  859. if !c.isActiveManager() {
  860. return types.Node{}, c.errNoManager()
  861. }
  862. ctx, cancel := c.getRequestContext()
  863. defer cancel()
  864. node, err := getNode(ctx, c.client, input)
  865. if err != nil {
  866. return types.Node{}, err
  867. }
  868. return convert.NodeFromGRPC(*node), nil
  869. }
  870. // UpdateNode updates existing nodes properties.
  871. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  872. c.RLock()
  873. defer c.RUnlock()
  874. if !c.isActiveManager() {
  875. return c.errNoManager()
  876. }
  877. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  878. if err != nil {
  879. return err
  880. }
  881. ctx, cancel := c.getRequestContext()
  882. defer cancel()
  883. _, err = c.client.UpdateNode(
  884. ctx,
  885. &swarmapi.UpdateNodeRequest{
  886. NodeID: nodeID,
  887. Spec: &nodeSpec,
  888. NodeVersion: &swarmapi.Version{
  889. Index: version,
  890. },
  891. },
  892. )
  893. return err
  894. }
  895. // RemoveNode removes a node from a cluster
  896. func (c *Cluster) RemoveNode(input string, force bool) error {
  897. c.RLock()
  898. defer c.RUnlock()
  899. if !c.isActiveManager() {
  900. return c.errNoManager()
  901. }
  902. ctx, cancel := c.getRequestContext()
  903. defer cancel()
  904. node, err := getNode(ctx, c.client, input)
  905. if err != nil {
  906. return err
  907. }
  908. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  909. return err
  910. }
  911. return nil
  912. }
  913. // GetTasks returns a list of tasks matching the filter options.
  914. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  915. c.RLock()
  916. defer c.RUnlock()
  917. if !c.isActiveManager() {
  918. return nil, c.errNoManager()
  919. }
  920. byName := func(filter filters.Args) error {
  921. if filter.Include("service") {
  922. serviceFilters := filter.Get("service")
  923. for _, serviceFilter := range serviceFilters {
  924. service, err := c.GetService(serviceFilter)
  925. if err != nil {
  926. return err
  927. }
  928. filter.Del("service", serviceFilter)
  929. filter.Add("service", service.ID)
  930. }
  931. }
  932. if filter.Include("node") {
  933. nodeFilters := filter.Get("node")
  934. for _, nodeFilter := range nodeFilters {
  935. node, err := c.GetNode(nodeFilter)
  936. if err != nil {
  937. return err
  938. }
  939. filter.Del("node", nodeFilter)
  940. filter.Add("node", node.ID)
  941. }
  942. }
  943. return nil
  944. }
  945. filters, err := newListTasksFilters(options.Filter, byName)
  946. if err != nil {
  947. return nil, err
  948. }
  949. ctx, cancel := c.getRequestContext()
  950. defer cancel()
  951. r, err := c.client.ListTasks(
  952. ctx,
  953. &swarmapi.ListTasksRequest{Filters: filters})
  954. if err != nil {
  955. return nil, err
  956. }
  957. tasks := []types.Task{}
  958. for _, task := range r.Tasks {
  959. tasks = append(tasks, convert.TaskFromGRPC(*task))
  960. }
  961. return tasks, nil
  962. }
  963. // GetTask returns a task by an ID.
  964. func (c *Cluster) GetTask(input string) (types.Task, error) {
  965. c.RLock()
  966. defer c.RUnlock()
  967. if !c.isActiveManager() {
  968. return types.Task{}, c.errNoManager()
  969. }
  970. ctx, cancel := c.getRequestContext()
  971. defer cancel()
  972. task, err := getTask(ctx, c.client, input)
  973. if err != nil {
  974. return types.Task{}, err
  975. }
  976. return convert.TaskFromGRPC(*task), nil
  977. }
  978. // GetNetwork returns a cluster network by an ID.
  979. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  980. c.RLock()
  981. defer c.RUnlock()
  982. if !c.isActiveManager() {
  983. return apitypes.NetworkResource{}, c.errNoManager()
  984. }
  985. ctx, cancel := c.getRequestContext()
  986. defer cancel()
  987. network, err := getNetwork(ctx, c.client, input)
  988. if err != nil {
  989. return apitypes.NetworkResource{}, err
  990. }
  991. return convert.BasicNetworkFromGRPC(*network), nil
  992. }
  993. // GetNetworks returns all current cluster managed networks.
  994. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  995. c.RLock()
  996. defer c.RUnlock()
  997. if !c.isActiveManager() {
  998. return nil, c.errNoManager()
  999. }
  1000. ctx, cancel := c.getRequestContext()
  1001. defer cancel()
  1002. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1003. if err != nil {
  1004. return nil, err
  1005. }
  1006. var networks []apitypes.NetworkResource
  1007. for _, network := range r.Networks {
  1008. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1009. }
  1010. return networks, nil
  1011. }
  1012. // CreateNetwork creates a new cluster managed network.
  1013. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1014. c.RLock()
  1015. defer c.RUnlock()
  1016. if !c.isActiveManager() {
  1017. return "", c.errNoManager()
  1018. }
  1019. if runconfig.IsPreDefinedNetwork(s.Name) {
  1020. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1021. return "", errors.NewRequestForbiddenError(err)
  1022. }
  1023. ctx, cancel := c.getRequestContext()
  1024. defer cancel()
  1025. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1026. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1027. if err != nil {
  1028. return "", err
  1029. }
  1030. return r.Network.ID, nil
  1031. }
  1032. // RemoveNetwork removes a cluster network.
  1033. func (c *Cluster) RemoveNetwork(input string) error {
  1034. c.RLock()
  1035. defer c.RUnlock()
  1036. if !c.isActiveManager() {
  1037. return c.errNoManager()
  1038. }
  1039. ctx, cancel := c.getRequestContext()
  1040. defer cancel()
  1041. network, err := getNetwork(ctx, c.client, input)
  1042. if err != nil {
  1043. return err
  1044. }
  1045. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1046. return err
  1047. }
  1048. return nil
  1049. }
  1050. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1051. for i, n := range s.Networks {
  1052. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1053. if err != nil {
  1054. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1055. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1056. return errors.NewRequestForbiddenError(err)
  1057. }
  1058. return err
  1059. }
  1060. s.Networks[i].Target = apiNetwork.ID
  1061. }
  1062. return nil
  1063. }
  1064. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1065. // GetNetwork to match via full ID.
  1066. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1067. if err != nil {
  1068. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1069. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1070. if err != nil || len(rl.Networks) == 0 {
  1071. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1072. }
  1073. if err != nil {
  1074. return nil, err
  1075. }
  1076. if len(rl.Networks) == 0 {
  1077. return nil, fmt.Errorf("network %s not found", input)
  1078. }
  1079. if l := len(rl.Networks); l > 1 {
  1080. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1081. }
  1082. return rl.Networks[0], nil
  1083. }
  1084. return rg.Network, nil
  1085. }
  1086. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1087. func (c *Cluster) Cleanup() {
  1088. c.Lock()
  1089. node := c.node
  1090. if node == nil {
  1091. c.Unlock()
  1092. return
  1093. }
  1094. defer c.Unlock()
  1095. if c.isActiveManager() {
  1096. active, reachable, unreachable, err := c.managerStats()
  1097. if err == nil {
  1098. singlenode := active && reachable == 1 && unreachable == 0
  1099. if active && !singlenode && reachable-2 <= unreachable {
  1100. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1101. }
  1102. }
  1103. }
  1104. c.stopNode()
  1105. }
  1106. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1107. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1108. defer cancel()
  1109. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1110. if err != nil {
  1111. return false, 0, 0, err
  1112. }
  1113. for _, n := range nodes.Nodes {
  1114. if n.ManagerStatus != nil {
  1115. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1116. reachable++
  1117. if n.ID == c.node.NodeID() {
  1118. current = true
  1119. }
  1120. }
  1121. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1122. unreachable++
  1123. }
  1124. }
  1125. }
  1126. return
  1127. }
  1128. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1129. var err error
  1130. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1131. if err != nil {
  1132. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1133. }
  1134. spec := &req.Spec
  1135. // provide sane defaults instead of erroring
  1136. if spec.Name == "" {
  1137. spec.Name = "default"
  1138. }
  1139. if spec.Raft.SnapshotInterval == 0 {
  1140. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1141. }
  1142. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1143. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1144. }
  1145. if spec.Raft.ElectionTick == 0 {
  1146. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1147. }
  1148. if spec.Raft.HeartbeatTick == 0 {
  1149. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1150. }
  1151. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1152. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1153. }
  1154. if spec.CAConfig.NodeCertExpiry == 0 {
  1155. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1156. }
  1157. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1158. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1159. }
  1160. return nil
  1161. }
  1162. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1163. var err error
  1164. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1165. if err != nil {
  1166. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1167. }
  1168. if len(req.RemoteAddrs) == 0 {
  1169. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1170. }
  1171. for i := range req.RemoteAddrs {
  1172. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1173. if err != nil {
  1174. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1175. }
  1176. }
  1177. return nil
  1178. }
  1179. func validateAddr(addr string) (string, error) {
  1180. if addr == "" {
  1181. return addr, fmt.Errorf("invalid empty address")
  1182. }
  1183. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1184. if err != nil {
  1185. return addr, nil
  1186. }
  1187. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1188. }
  1189. func initClusterSpec(node *node, spec types.Spec) error {
  1190. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1191. for conn := range node.ListenControlSocket(ctx) {
  1192. if ctx.Err() != nil {
  1193. return ctx.Err()
  1194. }
  1195. if conn != nil {
  1196. client := swarmapi.NewControlClient(conn)
  1197. var cluster *swarmapi.Cluster
  1198. for i := 0; ; i++ {
  1199. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1200. if err != nil {
  1201. return fmt.Errorf("error on listing clusters: %v", err)
  1202. }
  1203. if len(lcr.Clusters) == 0 {
  1204. if i < 10 {
  1205. time.Sleep(200 * time.Millisecond)
  1206. continue
  1207. }
  1208. return fmt.Errorf("empty list of clusters was returned")
  1209. }
  1210. cluster = lcr.Clusters[0]
  1211. break
  1212. }
  1213. newspec, err := convert.SwarmSpecToGRPC(spec)
  1214. if err != nil {
  1215. return fmt.Errorf("error updating cluster settings: %v", err)
  1216. }
  1217. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1218. ClusterID: cluster.ID,
  1219. ClusterVersion: &cluster.Meta.Version,
  1220. Spec: &newspec,
  1221. })
  1222. if err != nil {
  1223. return fmt.Errorf("error updating cluster settings: %v", err)
  1224. }
  1225. return nil
  1226. }
  1227. }
  1228. return ctx.Err()
  1229. }