cluster.go 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422
  1. package cluster
  2. import (
  3. "encoding/json"
  4. stdliberrors "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/Sirupsen/logrus"
  15. "github.com/docker/docker/daemon/cluster/convert"
  16. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  17. "github.com/docker/docker/daemon/cluster/executor/container"
  18. "github.com/docker/docker/errors"
  19. "github.com/docker/docker/opts"
  20. "github.com/docker/docker/pkg/ioutils"
  21. "github.com/docker/docker/runconfig"
  22. apitypes "github.com/docker/engine-api/types"
  23. "github.com/docker/engine-api/types/filters"
  24. types "github.com/docker/engine-api/types/swarm"
  25. swarmagent "github.com/docker/swarmkit/agent"
  26. swarmapi "github.com/docker/swarmkit/api"
  27. "golang.org/x/net/context"
  28. )
  29. const swarmDirName = "swarm"
  30. const controlSocket = "control.sock"
  31. const swarmConnectTimeout = 20 * time.Second
  32. const swarmRequestTimeout = 20 * time.Second
  33. const stateFile = "docker-state.json"
  34. const defaultAddr = "0.0.0.0:2377"
  35. const (
  36. initialReconnectDelay = 100 * time.Millisecond
  37. maxReconnectDelay = 30 * time.Second
  38. )
  39. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  40. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  41. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  42. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  43. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  44. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  45. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  46. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  47. // defaultSpec contains some sane defaults if cluster options are missing on init
  48. var defaultSpec = types.Spec{
  49. Raft: types.RaftConfig{
  50. SnapshotInterval: 10000,
  51. KeepOldSnapshots: 0,
  52. LogEntriesForSlowFollowers: 500,
  53. HeartbeatTick: 1,
  54. ElectionTick: 3,
  55. },
  56. CAConfig: types.CAConfig{
  57. NodeCertExpiry: 90 * 24 * time.Hour,
  58. },
  59. Dispatcher: types.DispatcherConfig{
  60. HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
  61. },
  62. Orchestration: types.OrchestrationConfig{
  63. TaskHistoryRetentionLimit: 10,
  64. },
  65. }
  66. type state struct {
  67. // LocalAddr is this machine's local IP or hostname, if specified.
  68. LocalAddr string
  69. // RemoteAddr is the address that was given to "swarm join. It is used
  70. // to find LocalAddr if necessary.
  71. RemoteAddr string
  72. // ListenAddr is the address we bind to, including a port.
  73. ListenAddr string
  74. // AdvertiseAddr is the address other nodes should connect to,
  75. // including a port.
  76. AdvertiseAddr string
  77. }
  78. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  79. // of networks managed by Docker, so they can be filtered.
  80. type NetworkSubnetsProvider interface {
  81. V4Subnets() []net.IPNet
  82. V6Subnets() []net.IPNet
  83. }
  84. // Config provides values for Cluster.
  85. type Config struct {
  86. Root string
  87. Name string
  88. Backend executorpkg.Backend
  89. NetworkSubnetsProvider NetworkSubnetsProvider
  90. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  91. // if no AdvertiseAddr value is specified.
  92. DefaultAdvertiseAddr string
  93. }
  94. // Cluster provides capabilities to participate in a cluster as a worker or a
  95. // manager.
  96. type Cluster struct {
  97. sync.RWMutex
  98. *node
  99. root string
  100. config Config
  101. configEvent chan struct{} // todo: make this array and goroutine safe
  102. localAddr string
  103. actualLocalAddr string // after resolution, not persisted
  104. remoteAddr string
  105. listenAddr string
  106. advertiseAddr string
  107. stop bool
  108. err error
  109. cancelDelay func()
  110. }
  111. type node struct {
  112. *swarmagent.Node
  113. done chan struct{}
  114. ready bool
  115. conn *grpc.ClientConn
  116. client swarmapi.ControlClient
  117. reconnectDelay time.Duration
  118. }
  119. // New creates a new Cluster instance using provided config.
  120. func New(config Config) (*Cluster, error) {
  121. root := filepath.Join(config.Root, swarmDirName)
  122. if err := os.MkdirAll(root, 0700); err != nil {
  123. return nil, err
  124. }
  125. c := &Cluster{
  126. root: root,
  127. config: config,
  128. configEvent: make(chan struct{}, 10),
  129. }
  130. st, err := c.loadState()
  131. if err != nil {
  132. if os.IsNotExist(err) {
  133. return c, nil
  134. }
  135. return nil, err
  136. }
  137. n, err := c.startNewNode(false, st.LocalAddr, st.RemoteAddr, st.ListenAddr, st.AdvertiseAddr, "", "")
  138. if err != nil {
  139. return nil, err
  140. }
  141. select {
  142. case <-time.After(swarmConnectTimeout):
  143. logrus.Errorf("swarm component could not be started before timeout was reached")
  144. case <-n.Ready():
  145. case <-n.done:
  146. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  147. }
  148. go c.reconnectOnFailure(n)
  149. return c, nil
  150. }
  151. func (c *Cluster) loadState() (*state, error) {
  152. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  153. if err != nil {
  154. return nil, err
  155. }
  156. // missing certificate means no actual state to restore from
  157. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  158. if os.IsNotExist(err) {
  159. c.clearState()
  160. }
  161. return nil, err
  162. }
  163. var st state
  164. if err := json.Unmarshal(dt, &st); err != nil {
  165. return nil, err
  166. }
  167. return &st, nil
  168. }
  169. func (c *Cluster) saveState() error {
  170. dt, err := json.Marshal(state{
  171. LocalAddr: c.localAddr,
  172. RemoteAddr: c.remoteAddr,
  173. ListenAddr: c.listenAddr,
  174. AdvertiseAddr: c.advertiseAddr,
  175. })
  176. if err != nil {
  177. return err
  178. }
  179. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  180. }
  181. func (c *Cluster) reconnectOnFailure(n *node) {
  182. for {
  183. <-n.done
  184. c.Lock()
  185. if c.stop || c.node != nil {
  186. c.Unlock()
  187. return
  188. }
  189. n.reconnectDelay *= 2
  190. if n.reconnectDelay > maxReconnectDelay {
  191. n.reconnectDelay = maxReconnectDelay
  192. }
  193. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  194. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  195. c.cancelDelay = cancel
  196. c.Unlock()
  197. <-delayCtx.Done()
  198. if delayCtx.Err() != context.DeadlineExceeded {
  199. return
  200. }
  201. c.Lock()
  202. if c.node != nil {
  203. c.Unlock()
  204. return
  205. }
  206. var err error
  207. n, err = c.startNewNode(false, c.localAddr, c.getRemoteAddress(), c.listenAddr, c.advertiseAddr, c.getRemoteAddress(), "")
  208. if err != nil {
  209. c.err = err
  210. close(n.done)
  211. }
  212. c.Unlock()
  213. }
  214. }
  215. func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, listenAddr, advertiseAddr, joinAddr, joinToken string) (*node, error) {
  216. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  217. return nil, err
  218. }
  219. actualLocalAddr := localAddr
  220. if actualLocalAddr == "" {
  221. // If localAddr was not specified, resolve it automatically
  222. // based on the route to joinAddr. localAddr can only be left
  223. // empty on "join".
  224. listenHost, _, err := net.SplitHostPort(listenAddr)
  225. if err != nil {
  226. return nil, fmt.Errorf("could not parse listen address: %v", err)
  227. }
  228. listenAddrIP := net.ParseIP(listenHost)
  229. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  230. actualLocalAddr = listenHost
  231. } else {
  232. if remoteAddr == "" {
  233. // Should never happen except using swarms created by
  234. // old versions that didn't save remoteAddr.
  235. remoteAddr = "8.8.8.8:53"
  236. }
  237. conn, err := net.Dial("udp", remoteAddr)
  238. if err != nil {
  239. return nil, fmt.Errorf("could not find local IP address: %v", err)
  240. }
  241. localHostPort := conn.LocalAddr().String()
  242. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  243. conn.Close()
  244. }
  245. }
  246. c.node = nil
  247. c.cancelDelay = nil
  248. c.stop = false
  249. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  250. Hostname: c.config.Name,
  251. ForceNewCluster: forceNewCluster,
  252. ListenControlAPI: filepath.Join(c.root, controlSocket),
  253. ListenRemoteAPI: listenAddr,
  254. AdvertiseRemoteAPI: advertiseAddr,
  255. JoinAddr: joinAddr,
  256. StateDir: c.root,
  257. JoinToken: joinToken,
  258. Executor: container.NewExecutor(c.config.Backend),
  259. HeartbeatTick: 1,
  260. ElectionTick: 3,
  261. })
  262. if err != nil {
  263. return nil, err
  264. }
  265. ctx := context.Background()
  266. if err := n.Start(ctx); err != nil {
  267. return nil, err
  268. }
  269. node := &node{
  270. Node: n,
  271. done: make(chan struct{}),
  272. reconnectDelay: initialReconnectDelay,
  273. }
  274. c.node = node
  275. c.localAddr = localAddr
  276. c.actualLocalAddr = actualLocalAddr // not saved
  277. c.remoteAddr = remoteAddr
  278. c.listenAddr = listenAddr
  279. c.advertiseAddr = advertiseAddr
  280. c.saveState()
  281. c.config.Backend.SetClusterProvider(c)
  282. go func() {
  283. err := n.Err(ctx)
  284. if err != nil {
  285. logrus.Errorf("cluster exited with error: %v", err)
  286. }
  287. c.Lock()
  288. c.node = nil
  289. c.err = err
  290. c.Unlock()
  291. close(node.done)
  292. }()
  293. go func() {
  294. select {
  295. case <-n.Ready():
  296. c.Lock()
  297. node.ready = true
  298. c.err = nil
  299. c.Unlock()
  300. case <-ctx.Done():
  301. }
  302. c.configEvent <- struct{}{}
  303. }()
  304. go func() {
  305. for conn := range n.ListenControlSocket(ctx) {
  306. c.Lock()
  307. if node.conn != conn {
  308. if conn == nil {
  309. node.client = nil
  310. } else {
  311. node.client = swarmapi.NewControlClient(conn)
  312. }
  313. }
  314. node.conn = conn
  315. c.Unlock()
  316. c.configEvent <- struct{}{}
  317. }
  318. }()
  319. return node, nil
  320. }
  321. // Init initializes new cluster from user provided request.
  322. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  323. c.Lock()
  324. if node := c.node; node != nil {
  325. if !req.ForceNewCluster {
  326. c.Unlock()
  327. return "", ErrSwarmExists
  328. }
  329. if err := c.stopNode(); err != nil {
  330. c.Unlock()
  331. return "", err
  332. }
  333. }
  334. if err := validateAndSanitizeInitRequest(&req); err != nil {
  335. c.Unlock()
  336. return "", err
  337. }
  338. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  339. if err != nil {
  340. c.Unlock()
  341. return "", err
  342. }
  343. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  344. if err != nil {
  345. c.Unlock()
  346. return "", err
  347. }
  348. localAddr := listenHost
  349. // If the advertise address is not one of the system's
  350. // addresses, we also require a listen address.
  351. listenAddrIP := net.ParseIP(listenHost)
  352. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  353. advertiseIP := net.ParseIP(advertiseHost)
  354. if advertiseIP == nil {
  355. // not an IP
  356. c.Unlock()
  357. return "", errMustSpecifyListenAddr
  358. }
  359. systemIPs := listSystemIPs()
  360. found := false
  361. for _, systemIP := range systemIPs {
  362. if systemIP.Equal(advertiseIP) {
  363. found = true
  364. break
  365. }
  366. }
  367. if !found {
  368. c.Unlock()
  369. return "", errMustSpecifyListenAddr
  370. }
  371. localAddr = advertiseIP.String()
  372. }
  373. // todo: check current state existing
  374. n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
  375. if err != nil {
  376. c.Unlock()
  377. return "", err
  378. }
  379. c.Unlock()
  380. select {
  381. case <-n.Ready():
  382. if err := initClusterSpec(n, req.Spec); err != nil {
  383. return "", err
  384. }
  385. go c.reconnectOnFailure(n)
  386. return n.NodeID(), nil
  387. case <-n.done:
  388. c.RLock()
  389. defer c.RUnlock()
  390. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  391. if err := c.clearState(); err != nil {
  392. return "", err
  393. }
  394. }
  395. return "", c.err
  396. }
  397. }
  398. // Join makes current Cluster part of an existing swarm cluster.
  399. func (c *Cluster) Join(req types.JoinRequest) error {
  400. c.Lock()
  401. if node := c.node; node != nil {
  402. c.Unlock()
  403. return ErrSwarmExists
  404. }
  405. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  406. c.Unlock()
  407. return err
  408. }
  409. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  410. if err != nil {
  411. c.Unlock()
  412. return err
  413. }
  414. var advertiseAddr string
  415. if req.AdvertiseAddr != "" {
  416. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  417. // For joining, we don't need to provide an advertise address,
  418. // since the remote side can detect it.
  419. if err == nil {
  420. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  421. }
  422. }
  423. // todo: check current state existing
  424. n, err := c.startNewNode(false, "", req.RemoteAddrs[0], net.JoinHostPort(listenHost, listenPort), advertiseAddr, req.RemoteAddrs[0], req.JoinToken)
  425. if err != nil {
  426. c.Unlock()
  427. return err
  428. }
  429. c.Unlock()
  430. select {
  431. case <-time.After(swarmConnectTimeout):
  432. // attempt to connect will continue in background, also reconnecting
  433. go c.reconnectOnFailure(n)
  434. return ErrSwarmJoinTimeoutReached
  435. case <-n.Ready():
  436. go c.reconnectOnFailure(n)
  437. return nil
  438. case <-n.done:
  439. c.RLock()
  440. defer c.RUnlock()
  441. return c.err
  442. }
  443. }
  444. // stopNode is a helper that stops the active c.node and waits until it has
  445. // shut down. Call while keeping the cluster lock.
  446. func (c *Cluster) stopNode() error {
  447. if c.node == nil {
  448. return nil
  449. }
  450. c.stop = true
  451. if c.cancelDelay != nil {
  452. c.cancelDelay()
  453. c.cancelDelay = nil
  454. }
  455. node := c.node
  456. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  457. defer cancel()
  458. // TODO: can't hold lock on stop because it calls back to network
  459. c.Unlock()
  460. defer c.Lock()
  461. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  462. return err
  463. }
  464. <-node.done
  465. return nil
  466. }
  467. // Leave shuts down Cluster and removes current state.
  468. func (c *Cluster) Leave(force bool) error {
  469. c.Lock()
  470. node := c.node
  471. if node == nil {
  472. c.Unlock()
  473. return ErrNoSwarm
  474. }
  475. if node.Manager() != nil && !force {
  476. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  477. if c.isActiveManager() {
  478. active, reachable, unreachable, err := c.managerStats()
  479. if err == nil {
  480. if active && reachable-2 <= unreachable {
  481. if reachable == 1 && unreachable == 0 {
  482. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  483. c.Unlock()
  484. return fmt.Errorf(msg)
  485. }
  486. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  487. }
  488. }
  489. } else {
  490. msg += "Doing so may lose the consensus of your cluster. "
  491. }
  492. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  493. c.Unlock()
  494. return fmt.Errorf(msg)
  495. }
  496. if err := c.stopNode(); err != nil {
  497. c.Unlock()
  498. return err
  499. }
  500. c.Unlock()
  501. if nodeID := node.NodeID(); nodeID != "" {
  502. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  503. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  504. logrus.Errorf("error removing %v: %v", id, err)
  505. }
  506. }
  507. }
  508. c.configEvent <- struct{}{}
  509. // todo: cleanup optional?
  510. if err := c.clearState(); err != nil {
  511. return err
  512. }
  513. return nil
  514. }
  515. func (c *Cluster) clearState() error {
  516. // todo: backup this data instead of removing?
  517. if err := os.RemoveAll(c.root); err != nil {
  518. return err
  519. }
  520. if err := os.MkdirAll(c.root, 0700); err != nil {
  521. return err
  522. }
  523. c.config.Backend.SetClusterProvider(nil)
  524. return nil
  525. }
  526. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  527. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  528. }
  529. // Inspect retrieves the configuration properties of a managed swarm cluster.
  530. func (c *Cluster) Inspect() (types.Swarm, error) {
  531. c.RLock()
  532. defer c.RUnlock()
  533. if !c.isActiveManager() {
  534. return types.Swarm{}, c.errNoManager()
  535. }
  536. ctx, cancel := c.getRequestContext()
  537. defer cancel()
  538. swarm, err := getSwarm(ctx, c.client)
  539. if err != nil {
  540. return types.Swarm{}, err
  541. }
  542. if err != nil {
  543. return types.Swarm{}, err
  544. }
  545. return convert.SwarmFromGRPC(*swarm), nil
  546. }
  547. // Update updates configuration of a managed swarm cluster.
  548. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  549. c.RLock()
  550. defer c.RUnlock()
  551. if !c.isActiveManager() {
  552. return c.errNoManager()
  553. }
  554. ctx, cancel := c.getRequestContext()
  555. defer cancel()
  556. swarm, err := getSwarm(ctx, c.client)
  557. if err != nil {
  558. return err
  559. }
  560. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  561. if err != nil {
  562. return err
  563. }
  564. _, err = c.client.UpdateCluster(
  565. ctx,
  566. &swarmapi.UpdateClusterRequest{
  567. ClusterID: swarm.ID,
  568. Spec: &swarmSpec,
  569. ClusterVersion: &swarmapi.Version{
  570. Index: version,
  571. },
  572. Rotation: swarmapi.JoinTokenRotation{
  573. RotateWorkerToken: flags.RotateWorkerToken,
  574. RotateManagerToken: flags.RotateManagerToken,
  575. },
  576. },
  577. )
  578. return err
  579. }
  580. // IsManager returns true if Cluster is participating as a manager.
  581. func (c *Cluster) IsManager() bool {
  582. c.RLock()
  583. defer c.RUnlock()
  584. return c.isActiveManager()
  585. }
  586. // IsAgent returns true if Cluster is participating as a worker/agent.
  587. func (c *Cluster) IsAgent() bool {
  588. c.RLock()
  589. defer c.RUnlock()
  590. return c.node != nil && c.ready
  591. }
  592. // GetLocalAddress returns the local address.
  593. func (c *Cluster) GetLocalAddress() string {
  594. c.RLock()
  595. defer c.RUnlock()
  596. return c.actualLocalAddr
  597. }
  598. // GetListenAddress returns the listen address.
  599. func (c *Cluster) GetListenAddress() string {
  600. c.RLock()
  601. defer c.RUnlock()
  602. return c.listenAddr
  603. }
  604. // GetAdvertiseAddress returns the remotely reachable address of this node.
  605. func (c *Cluster) GetAdvertiseAddress() string {
  606. c.RLock()
  607. defer c.RUnlock()
  608. if c.advertiseAddr != "" {
  609. advertiseHost, _, _ := net.SplitHostPort(c.advertiseAddr)
  610. return advertiseHost
  611. }
  612. return c.actualLocalAddr
  613. }
  614. // GetRemoteAddress returns a known advertise address of a remote manager if
  615. // available.
  616. // todo: change to array/connect with info
  617. func (c *Cluster) GetRemoteAddress() string {
  618. c.RLock()
  619. defer c.RUnlock()
  620. return c.getRemoteAddress()
  621. }
  622. func (c *Cluster) getRemoteAddress() string {
  623. if c.node == nil {
  624. return ""
  625. }
  626. nodeID := c.node.NodeID()
  627. for _, r := range c.node.Remotes() {
  628. if r.NodeID != nodeID {
  629. return r.Addr
  630. }
  631. }
  632. return ""
  633. }
  634. // ListenClusterEvents returns a channel that receives messages on cluster
  635. // participation changes.
  636. // todo: make cancelable and accessible to multiple callers
  637. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  638. return c.configEvent
  639. }
  640. // Info returns information about the current cluster state.
  641. func (c *Cluster) Info() types.Info {
  642. info := types.Info{
  643. NodeAddr: c.GetAdvertiseAddress(),
  644. }
  645. c.RLock()
  646. defer c.RUnlock()
  647. if c.node == nil {
  648. info.LocalNodeState = types.LocalNodeStateInactive
  649. if c.cancelDelay != nil {
  650. info.LocalNodeState = types.LocalNodeStateError
  651. }
  652. } else {
  653. info.LocalNodeState = types.LocalNodeStatePending
  654. if c.ready == true {
  655. info.LocalNodeState = types.LocalNodeStateActive
  656. }
  657. }
  658. if c.err != nil {
  659. info.Error = c.err.Error()
  660. }
  661. ctx, cancel := c.getRequestContext()
  662. defer cancel()
  663. if c.isActiveManager() {
  664. info.ControlAvailable = true
  665. swarm, err := c.Inspect()
  666. if err != nil {
  667. info.Error = err.Error()
  668. }
  669. // Strip JoinTokens
  670. info.Cluster = swarm.ClusterInfo
  671. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  672. info.Nodes = len(r.Nodes)
  673. for _, n := range r.Nodes {
  674. if n.ManagerStatus != nil {
  675. info.Managers = info.Managers + 1
  676. }
  677. }
  678. }
  679. }
  680. if c.node != nil {
  681. for _, r := range c.node.Remotes() {
  682. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  683. }
  684. info.NodeID = c.node.NodeID()
  685. }
  686. return info
  687. }
  688. // isActiveManager should not be called without a read lock
  689. func (c *Cluster) isActiveManager() bool {
  690. return c.node != nil && c.conn != nil
  691. }
  692. // errNoManager returns error describing why manager commands can't be used.
  693. // Call with read lock.
  694. func (c *Cluster) errNoManager() error {
  695. if c.node == nil {
  696. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  697. }
  698. if c.node.Manager() != nil {
  699. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  700. }
  701. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  702. }
  703. // GetServices returns all services of a managed swarm cluster.
  704. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  705. c.RLock()
  706. defer c.RUnlock()
  707. if !c.isActiveManager() {
  708. return nil, c.errNoManager()
  709. }
  710. filters, err := newListServicesFilters(options.Filter)
  711. if err != nil {
  712. return nil, err
  713. }
  714. ctx, cancel := c.getRequestContext()
  715. defer cancel()
  716. r, err := c.client.ListServices(
  717. ctx,
  718. &swarmapi.ListServicesRequest{Filters: filters})
  719. if err != nil {
  720. return nil, err
  721. }
  722. services := []types.Service{}
  723. for _, service := range r.Services {
  724. services = append(services, convert.ServiceFromGRPC(*service))
  725. }
  726. return services, nil
  727. }
  728. // CreateService creates a new service in a managed swarm cluster.
  729. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  730. c.RLock()
  731. defer c.RUnlock()
  732. if !c.isActiveManager() {
  733. return "", c.errNoManager()
  734. }
  735. ctx, cancel := c.getRequestContext()
  736. defer cancel()
  737. err := c.populateNetworkID(ctx, c.client, &s)
  738. if err != nil {
  739. return "", err
  740. }
  741. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  742. if err != nil {
  743. return "", err
  744. }
  745. if encodedAuth != "" {
  746. ctnr := serviceSpec.Task.GetContainer()
  747. if ctnr == nil {
  748. return "", fmt.Errorf("service does not use container tasks")
  749. }
  750. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  751. }
  752. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  753. if err != nil {
  754. return "", err
  755. }
  756. return r.Service.ID, nil
  757. }
  758. // GetService returns a service based on an ID or name.
  759. func (c *Cluster) GetService(input string) (types.Service, error) {
  760. c.RLock()
  761. defer c.RUnlock()
  762. if !c.isActiveManager() {
  763. return types.Service{}, c.errNoManager()
  764. }
  765. ctx, cancel := c.getRequestContext()
  766. defer cancel()
  767. service, err := getService(ctx, c.client, input)
  768. if err != nil {
  769. return types.Service{}, err
  770. }
  771. return convert.ServiceFromGRPC(*service), nil
  772. }
  773. // UpdateService updates existing service to match new properties.
  774. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  775. c.RLock()
  776. defer c.RUnlock()
  777. if !c.isActiveManager() {
  778. return c.errNoManager()
  779. }
  780. ctx, cancel := c.getRequestContext()
  781. defer cancel()
  782. err := c.populateNetworkID(ctx, c.client, &spec)
  783. if err != nil {
  784. return err
  785. }
  786. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  787. if err != nil {
  788. return err
  789. }
  790. if encodedAuth != "" {
  791. ctnr := serviceSpec.Task.GetContainer()
  792. if ctnr == nil {
  793. return fmt.Errorf("service does not use container tasks")
  794. }
  795. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  796. } else {
  797. // this is needed because if the encodedAuth isn't being updated then we
  798. // shouldn't lose it, and continue to use the one that was already present
  799. currentService, err := getService(ctx, c.client, serviceID)
  800. if err != nil {
  801. return err
  802. }
  803. ctnr := currentService.Spec.Task.GetContainer()
  804. if ctnr == nil {
  805. return fmt.Errorf("service does not use container tasks")
  806. }
  807. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  808. }
  809. _, err = c.client.UpdateService(
  810. ctx,
  811. &swarmapi.UpdateServiceRequest{
  812. ServiceID: serviceID,
  813. Spec: &serviceSpec,
  814. ServiceVersion: &swarmapi.Version{
  815. Index: version,
  816. },
  817. },
  818. )
  819. return err
  820. }
  821. // RemoveService removes a service from a managed swarm cluster.
  822. func (c *Cluster) RemoveService(input string) error {
  823. c.RLock()
  824. defer c.RUnlock()
  825. if !c.isActiveManager() {
  826. return c.errNoManager()
  827. }
  828. ctx, cancel := c.getRequestContext()
  829. defer cancel()
  830. service, err := getService(ctx, c.client, input)
  831. if err != nil {
  832. return err
  833. }
  834. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  835. return err
  836. }
  837. return nil
  838. }
  839. // GetNodes returns a list of all nodes known to a cluster.
  840. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  841. c.RLock()
  842. defer c.RUnlock()
  843. if !c.isActiveManager() {
  844. return nil, c.errNoManager()
  845. }
  846. filters, err := newListNodesFilters(options.Filter)
  847. if err != nil {
  848. return nil, err
  849. }
  850. ctx, cancel := c.getRequestContext()
  851. defer cancel()
  852. r, err := c.client.ListNodes(
  853. ctx,
  854. &swarmapi.ListNodesRequest{Filters: filters})
  855. if err != nil {
  856. return nil, err
  857. }
  858. nodes := []types.Node{}
  859. for _, node := range r.Nodes {
  860. nodes = append(nodes, convert.NodeFromGRPC(*node))
  861. }
  862. return nodes, nil
  863. }
  864. // GetNode returns a node based on an ID or name.
  865. func (c *Cluster) GetNode(input string) (types.Node, error) {
  866. c.RLock()
  867. defer c.RUnlock()
  868. if !c.isActiveManager() {
  869. return types.Node{}, c.errNoManager()
  870. }
  871. ctx, cancel := c.getRequestContext()
  872. defer cancel()
  873. node, err := getNode(ctx, c.client, input)
  874. if err != nil {
  875. return types.Node{}, err
  876. }
  877. return convert.NodeFromGRPC(*node), nil
  878. }
  879. // UpdateNode updates existing nodes properties.
  880. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  881. c.RLock()
  882. defer c.RUnlock()
  883. if !c.isActiveManager() {
  884. return c.errNoManager()
  885. }
  886. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  887. if err != nil {
  888. return err
  889. }
  890. ctx, cancel := c.getRequestContext()
  891. defer cancel()
  892. _, err = c.client.UpdateNode(
  893. ctx,
  894. &swarmapi.UpdateNodeRequest{
  895. NodeID: nodeID,
  896. Spec: &nodeSpec,
  897. NodeVersion: &swarmapi.Version{
  898. Index: version,
  899. },
  900. },
  901. )
  902. return err
  903. }
  904. // RemoveNode removes a node from a cluster
  905. func (c *Cluster) RemoveNode(input string, force bool) error {
  906. c.RLock()
  907. defer c.RUnlock()
  908. if !c.isActiveManager() {
  909. return c.errNoManager()
  910. }
  911. ctx, cancel := c.getRequestContext()
  912. defer cancel()
  913. node, err := getNode(ctx, c.client, input)
  914. if err != nil {
  915. return err
  916. }
  917. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  918. return err
  919. }
  920. return nil
  921. }
  922. // GetTasks returns a list of tasks matching the filter options.
  923. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  924. c.RLock()
  925. defer c.RUnlock()
  926. if !c.isActiveManager() {
  927. return nil, c.errNoManager()
  928. }
  929. byName := func(filter filters.Args) error {
  930. if filter.Include("service") {
  931. serviceFilters := filter.Get("service")
  932. for _, serviceFilter := range serviceFilters {
  933. service, err := c.GetService(serviceFilter)
  934. if err != nil {
  935. return err
  936. }
  937. filter.Del("service", serviceFilter)
  938. filter.Add("service", service.ID)
  939. }
  940. }
  941. if filter.Include("node") {
  942. nodeFilters := filter.Get("node")
  943. for _, nodeFilter := range nodeFilters {
  944. node, err := c.GetNode(nodeFilter)
  945. if err != nil {
  946. return err
  947. }
  948. filter.Del("node", nodeFilter)
  949. filter.Add("node", node.ID)
  950. }
  951. }
  952. return nil
  953. }
  954. filters, err := newListTasksFilters(options.Filter, byName)
  955. if err != nil {
  956. return nil, err
  957. }
  958. ctx, cancel := c.getRequestContext()
  959. defer cancel()
  960. r, err := c.client.ListTasks(
  961. ctx,
  962. &swarmapi.ListTasksRequest{Filters: filters})
  963. if err != nil {
  964. return nil, err
  965. }
  966. tasks := []types.Task{}
  967. for _, task := range r.Tasks {
  968. tasks = append(tasks, convert.TaskFromGRPC(*task))
  969. }
  970. return tasks, nil
  971. }
  972. // GetTask returns a task by an ID.
  973. func (c *Cluster) GetTask(input string) (types.Task, error) {
  974. c.RLock()
  975. defer c.RUnlock()
  976. if !c.isActiveManager() {
  977. return types.Task{}, c.errNoManager()
  978. }
  979. ctx, cancel := c.getRequestContext()
  980. defer cancel()
  981. task, err := getTask(ctx, c.client, input)
  982. if err != nil {
  983. return types.Task{}, err
  984. }
  985. return convert.TaskFromGRPC(*task), nil
  986. }
  987. // GetNetwork returns a cluster network by an ID.
  988. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  989. c.RLock()
  990. defer c.RUnlock()
  991. if !c.isActiveManager() {
  992. return apitypes.NetworkResource{}, c.errNoManager()
  993. }
  994. ctx, cancel := c.getRequestContext()
  995. defer cancel()
  996. network, err := getNetwork(ctx, c.client, input)
  997. if err != nil {
  998. return apitypes.NetworkResource{}, err
  999. }
  1000. return convert.BasicNetworkFromGRPC(*network), nil
  1001. }
  1002. // GetNetworks returns all current cluster managed networks.
  1003. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1004. c.RLock()
  1005. defer c.RUnlock()
  1006. if !c.isActiveManager() {
  1007. return nil, c.errNoManager()
  1008. }
  1009. ctx, cancel := c.getRequestContext()
  1010. defer cancel()
  1011. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1012. if err != nil {
  1013. return nil, err
  1014. }
  1015. var networks []apitypes.NetworkResource
  1016. for _, network := range r.Networks {
  1017. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1018. }
  1019. return networks, nil
  1020. }
  1021. // CreateNetwork creates a new cluster managed network.
  1022. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1023. c.RLock()
  1024. defer c.RUnlock()
  1025. if !c.isActiveManager() {
  1026. return "", c.errNoManager()
  1027. }
  1028. if runconfig.IsPreDefinedNetwork(s.Name) {
  1029. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1030. return "", errors.NewRequestForbiddenError(err)
  1031. }
  1032. ctx, cancel := c.getRequestContext()
  1033. defer cancel()
  1034. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1035. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1036. if err != nil {
  1037. return "", err
  1038. }
  1039. return r.Network.ID, nil
  1040. }
  1041. // RemoveNetwork removes a cluster network.
  1042. func (c *Cluster) RemoveNetwork(input string) error {
  1043. c.RLock()
  1044. defer c.RUnlock()
  1045. if !c.isActiveManager() {
  1046. return c.errNoManager()
  1047. }
  1048. ctx, cancel := c.getRequestContext()
  1049. defer cancel()
  1050. network, err := getNetwork(ctx, c.client, input)
  1051. if err != nil {
  1052. return err
  1053. }
  1054. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1055. return err
  1056. }
  1057. return nil
  1058. }
  1059. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1060. for i, n := range s.Networks {
  1061. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1062. if err != nil {
  1063. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1064. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1065. return errors.NewRequestForbiddenError(err)
  1066. }
  1067. return err
  1068. }
  1069. s.Networks[i].Target = apiNetwork.ID
  1070. }
  1071. return nil
  1072. }
  1073. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1074. // GetNetwork to match via full ID.
  1075. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1076. if err != nil {
  1077. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1078. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1079. if err != nil || len(rl.Networks) == 0 {
  1080. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1081. }
  1082. if err != nil {
  1083. return nil, err
  1084. }
  1085. if len(rl.Networks) == 0 {
  1086. return nil, fmt.Errorf("network %s not found", input)
  1087. }
  1088. if l := len(rl.Networks); l > 1 {
  1089. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1090. }
  1091. return rl.Networks[0], nil
  1092. }
  1093. return rg.Network, nil
  1094. }
  1095. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1096. func (c *Cluster) Cleanup() {
  1097. c.Lock()
  1098. node := c.node
  1099. if node == nil {
  1100. c.Unlock()
  1101. return
  1102. }
  1103. defer c.Unlock()
  1104. if c.isActiveManager() {
  1105. active, reachable, unreachable, err := c.managerStats()
  1106. if err == nil {
  1107. singlenode := active && reachable == 1 && unreachable == 0
  1108. if active && !singlenode && reachable-2 <= unreachable {
  1109. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1110. }
  1111. }
  1112. }
  1113. c.stopNode()
  1114. }
  1115. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1116. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1117. defer cancel()
  1118. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1119. if err != nil {
  1120. return false, 0, 0, err
  1121. }
  1122. for _, n := range nodes.Nodes {
  1123. if n.ManagerStatus != nil {
  1124. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1125. reachable++
  1126. if n.ID == c.node.NodeID() {
  1127. current = true
  1128. }
  1129. }
  1130. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1131. unreachable++
  1132. }
  1133. }
  1134. }
  1135. return
  1136. }
  1137. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1138. var err error
  1139. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1140. if err != nil {
  1141. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1142. }
  1143. spec := &req.Spec
  1144. // provide sane defaults instead of erroring
  1145. if spec.Name == "" {
  1146. spec.Name = "default"
  1147. } else if spec.Name != "default" {
  1148. return stdliberrors.New(`swarm spec must be named "default"`)
  1149. }
  1150. if spec.Raft.SnapshotInterval == 0 {
  1151. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1152. }
  1153. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1154. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1155. }
  1156. if spec.Raft.ElectionTick == 0 {
  1157. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1158. }
  1159. if spec.Raft.HeartbeatTick == 0 {
  1160. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1161. }
  1162. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1163. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1164. }
  1165. if spec.CAConfig.NodeCertExpiry == 0 {
  1166. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1167. }
  1168. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1169. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1170. }
  1171. return nil
  1172. }
  1173. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1174. var err error
  1175. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1176. if err != nil {
  1177. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1178. }
  1179. if len(req.RemoteAddrs) == 0 {
  1180. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1181. }
  1182. for i := range req.RemoteAddrs {
  1183. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1184. if err != nil {
  1185. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1186. }
  1187. }
  1188. return nil
  1189. }
  1190. func validateAddr(addr string) (string, error) {
  1191. if addr == "" {
  1192. return addr, fmt.Errorf("invalid empty address")
  1193. }
  1194. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1195. if err != nil {
  1196. return addr, nil
  1197. }
  1198. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1199. }
  1200. func initClusterSpec(node *node, spec types.Spec) error {
  1201. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1202. for conn := range node.ListenControlSocket(ctx) {
  1203. if ctx.Err() != nil {
  1204. return ctx.Err()
  1205. }
  1206. if conn != nil {
  1207. client := swarmapi.NewControlClient(conn)
  1208. var cluster *swarmapi.Cluster
  1209. for i := 0; ; i++ {
  1210. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1211. if err != nil {
  1212. return fmt.Errorf("error on listing clusters: %v", err)
  1213. }
  1214. if len(lcr.Clusters) == 0 {
  1215. if i < 10 {
  1216. time.Sleep(200 * time.Millisecond)
  1217. continue
  1218. }
  1219. return fmt.Errorf("empty list of clusters was returned")
  1220. }
  1221. cluster = lcr.Clusters[0]
  1222. break
  1223. }
  1224. newspec, err := convert.SwarmSpecToGRPC(spec)
  1225. if err != nil {
  1226. return fmt.Errorf("error updating cluster settings: %v", err)
  1227. }
  1228. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1229. ClusterID: cluster.ID,
  1230. ClusterVersion: &cluster.Meta.Version,
  1231. Spec: &newspec,
  1232. })
  1233. if err != nil {
  1234. return fmt.Errorf("error updating cluster settings: %v", err)
  1235. }
  1236. return nil
  1237. }
  1238. }
  1239. return ctx.Err()
  1240. }