cluster.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "google.golang.org/grpc"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/docker/docker/daemon/cluster/convert"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/docker/daemon/cluster/executor/container"
  17. "github.com/docker/docker/errors"
  18. "github.com/docker/docker/opts"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/runconfig"
  21. apitypes "github.com/docker/engine-api/types"
  22. "github.com/docker/engine-api/types/filters"
  23. types "github.com/docker/engine-api/types/swarm"
  24. swarmagent "github.com/docker/swarmkit/agent"
  25. swarmapi "github.com/docker/swarmkit/api"
  26. "golang.org/x/net/context"
  27. )
  28. const swarmDirName = "swarm"
  29. const controlSocket = "control.sock"
  30. const swarmConnectTimeout = 20 * time.Second
  31. const swarmRequestTimeout = 20 * time.Second
  32. const stateFile = "docker-state.json"
  33. const defaultAddr = "0.0.0.0:2377"
  34. const (
  35. initialReconnectDelay = 100 * time.Millisecond
  36. maxReconnectDelay = 30 * time.Second
  37. )
  38. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  39. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  40. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  41. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  42. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  43. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  44. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  45. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
// defaultSpec contains some sane defaults if cluster options are missing on
// init.
var defaultSpec = types.Spec{
	Raft: types.RaftConfig{
		SnapshotInterval:           10000,
		KeepOldSnapshots:           0,
		LogEntriesForSlowFollowers: 500,
		// The tick values match the ones handed to the swarm node
		// configuration in startNewNode.
		HeartbeatTick: 1,
		ElectionTick:  3,
	},
	CAConfig: types.CAConfig{
		// 90 days.
		NodeCertExpiry: 90 * 24 * time.Hour,
	},
	Dispatcher: types.DispatcherConfig{
		// Five seconds, expressed as a nanosecond count.
		HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
	},
	Orchestration: types.OrchestrationConfig{
		TaskHistoryRetentionLimit: 10,
	},
}
// state is the set of addresses persisted as JSON in the state file (see
// saveState and loadState) so that the node can be restarted with the same
// configuration.
type state struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
}
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets.
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	// Root is the directory under which the swarm state directory is created.
	Root string
	// Name is passed to the swarm node as its hostname.
	Name string
	// Backend is the daemon backend; it is handed to the container executor
	// and notified about the active cluster provider.
	Backend                executorpkg.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider
	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string
	// RuntimeRoot is the path to store runtime state, such as the swarm
	// control socket. New falls back to the swarm state directory when it is
	// empty.
	RuntimeRoot string
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
//
// The embedded RWMutex guards the fields below; the embedded *node is the
// currently running swarm node, or nil when none is running.
type Cluster struct {
	sync.RWMutex
	*node
	// root is the swarm state directory (Config.Root joined with the swarm
	// directory name).
	root string
	// runtimeRoot is the directory holding the control socket.
	runtimeRoot string
	config      Config
	// configEvent receives a value whenever node readiness or the control
	// connection changes, and when the node leaves the swarm.
	configEvent chan struct{} // todo: make this array and goroutine safe
	localAddr       string
	actualLocalAddr string // after resolution, not persisted
	remoteAddr      string
	listenAddr      string
	advertiseAddr   string
	// stop is set by stopNode and tells reconnectOnFailure not to restart.
	stop bool
	// err is the last error reported by the node, cleared once it is ready.
	err error
	// cancelDelay cancels a pending reconnect delay, if one is in progress.
	cancelDelay func()
}
// node wraps a running swarm agent node together with the bookkeeping the
// Cluster keeps for it.
type node struct {
	*swarmagent.Node
	// done is closed once the underlying node has exited.
	done chan struct{}
	// ready is set once the underlying node has signalled readiness.
	ready bool
	// conn is the latest control socket connection; nil when none is
	// available.
	conn *grpc.ClientConn
	// client is the control API client built on conn; nil when conn is nil.
	client swarmapi.ControlClient
	// reconnectDelay is the current restart backoff; reconnectOnFailure
	// doubles it up to maxReconnectDelay.
	reconnectDelay time.Duration
}
  121. // New creates a new Cluster instance using provided config.
  122. func New(config Config) (*Cluster, error) {
  123. root := filepath.Join(config.Root, swarmDirName)
  124. if err := os.MkdirAll(root, 0700); err != nil {
  125. return nil, err
  126. }
  127. if config.RuntimeRoot == "" {
  128. config.RuntimeRoot = root
  129. }
  130. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  131. return nil, err
  132. }
  133. c := &Cluster{
  134. root: root,
  135. config: config,
  136. configEvent: make(chan struct{}, 10),
  137. runtimeRoot: config.RuntimeRoot,
  138. }
  139. st, err := c.loadState()
  140. if err != nil {
  141. if os.IsNotExist(err) {
  142. return c, nil
  143. }
  144. return nil, err
  145. }
  146. n, err := c.startNewNode(false, st.LocalAddr, st.RemoteAddr, st.ListenAddr, st.AdvertiseAddr, "", "")
  147. if err != nil {
  148. return nil, err
  149. }
  150. select {
  151. case <-time.After(swarmConnectTimeout):
  152. logrus.Errorf("swarm component could not be started before timeout was reached")
  153. case <-n.Ready():
  154. case <-n.done:
  155. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  156. }
  157. go c.reconnectOnFailure(n)
  158. return c, nil
  159. }
  160. func (c *Cluster) loadState() (*state, error) {
  161. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  162. if err != nil {
  163. return nil, err
  164. }
  165. // missing certificate means no actual state to restore from
  166. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  167. if os.IsNotExist(err) {
  168. c.clearState()
  169. }
  170. return nil, err
  171. }
  172. var st state
  173. if err := json.Unmarshal(dt, &st); err != nil {
  174. return nil, err
  175. }
  176. return &st, nil
  177. }
  178. func (c *Cluster) saveState() error {
  179. dt, err := json.Marshal(state{
  180. LocalAddr: c.localAddr,
  181. RemoteAddr: c.remoteAddr,
  182. ListenAddr: c.listenAddr,
  183. AdvertiseAddr: c.advertiseAddr,
  184. })
  185. if err != nil {
  186. return err
  187. }
  188. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  189. }
  190. func (c *Cluster) reconnectOnFailure(n *node) {
  191. for {
  192. <-n.done
  193. c.Lock()
  194. if c.stop || c.node != nil {
  195. c.Unlock()
  196. return
  197. }
  198. n.reconnectDelay *= 2
  199. if n.reconnectDelay > maxReconnectDelay {
  200. n.reconnectDelay = maxReconnectDelay
  201. }
  202. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  203. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  204. c.cancelDelay = cancel
  205. c.Unlock()
  206. <-delayCtx.Done()
  207. if delayCtx.Err() != context.DeadlineExceeded {
  208. return
  209. }
  210. c.Lock()
  211. if c.node != nil {
  212. c.Unlock()
  213. return
  214. }
  215. var err error
  216. n, err = c.startNewNode(false, c.localAddr, c.getRemoteAddress(), c.listenAddr, c.advertiseAddr, c.getRemoteAddress(), "")
  217. if err != nil {
  218. c.err = err
  219. close(n.done)
  220. }
  221. c.Unlock()
  222. }
  223. }
  224. func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, listenAddr, advertiseAddr, joinAddr, joinToken string) (*node, error) {
  225. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  226. return nil, err
  227. }
  228. actualLocalAddr := localAddr
  229. if actualLocalAddr == "" {
  230. // If localAddr was not specified, resolve it automatically
  231. // based on the route to joinAddr. localAddr can only be left
  232. // empty on "join".
  233. listenHost, _, err := net.SplitHostPort(listenAddr)
  234. if err != nil {
  235. return nil, fmt.Errorf("could not parse listen address: %v", err)
  236. }
  237. listenAddrIP := net.ParseIP(listenHost)
  238. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  239. actualLocalAddr = listenHost
  240. } else {
  241. if remoteAddr == "" {
  242. // Should never happen except using swarms created by
  243. // old versions that didn't save remoteAddr.
  244. remoteAddr = "8.8.8.8:53"
  245. }
  246. conn, err := net.Dial("udp", remoteAddr)
  247. if err != nil {
  248. return nil, fmt.Errorf("could not find local IP address: %v", err)
  249. }
  250. localHostPort := conn.LocalAddr().String()
  251. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  252. conn.Close()
  253. }
  254. }
  255. c.node = nil
  256. c.cancelDelay = nil
  257. c.stop = false
  258. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  259. Hostname: c.config.Name,
  260. ForceNewCluster: forceNewCluster,
  261. ListenControlAPI: filepath.Join(c.runtimeRoot, controlSocket),
  262. ListenRemoteAPI: listenAddr,
  263. AdvertiseRemoteAPI: advertiseAddr,
  264. JoinAddr: joinAddr,
  265. StateDir: c.root,
  266. JoinToken: joinToken,
  267. Executor: container.NewExecutor(c.config.Backend),
  268. HeartbeatTick: 1,
  269. ElectionTick: 3,
  270. })
  271. if err != nil {
  272. return nil, err
  273. }
  274. ctx := context.Background()
  275. if err := n.Start(ctx); err != nil {
  276. return nil, err
  277. }
  278. node := &node{
  279. Node: n,
  280. done: make(chan struct{}),
  281. reconnectDelay: initialReconnectDelay,
  282. }
  283. c.node = node
  284. c.localAddr = localAddr
  285. c.actualLocalAddr = actualLocalAddr // not saved
  286. c.remoteAddr = remoteAddr
  287. c.listenAddr = listenAddr
  288. c.advertiseAddr = advertiseAddr
  289. c.saveState()
  290. c.config.Backend.SetClusterProvider(c)
  291. go func() {
  292. err := n.Err(ctx)
  293. if err != nil {
  294. logrus.Errorf("cluster exited with error: %v", err)
  295. }
  296. c.Lock()
  297. c.node = nil
  298. c.err = err
  299. c.Unlock()
  300. close(node.done)
  301. }()
  302. go func() {
  303. select {
  304. case <-n.Ready():
  305. c.Lock()
  306. node.ready = true
  307. c.err = nil
  308. c.Unlock()
  309. case <-ctx.Done():
  310. }
  311. c.configEvent <- struct{}{}
  312. }()
  313. go func() {
  314. for conn := range n.ListenControlSocket(ctx) {
  315. c.Lock()
  316. if node.conn != conn {
  317. if conn == nil {
  318. node.client = nil
  319. } else {
  320. node.client = swarmapi.NewControlClient(conn)
  321. }
  322. }
  323. node.conn = conn
  324. c.Unlock()
  325. c.configEvent <- struct{}{}
  326. }
  327. }()
  328. return node, nil
  329. }
  330. // Init initializes new cluster from user provided request.
  331. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  332. c.Lock()
  333. if node := c.node; node != nil {
  334. if !req.ForceNewCluster {
  335. c.Unlock()
  336. return "", ErrSwarmExists
  337. }
  338. if err := c.stopNode(); err != nil {
  339. c.Unlock()
  340. return "", err
  341. }
  342. }
  343. if err := validateAndSanitizeInitRequest(&req); err != nil {
  344. c.Unlock()
  345. return "", err
  346. }
  347. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  348. if err != nil {
  349. c.Unlock()
  350. return "", err
  351. }
  352. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  353. if err != nil {
  354. c.Unlock()
  355. return "", err
  356. }
  357. localAddr := listenHost
  358. // If the advertise address is not one of the system's
  359. // addresses, we also require a listen address.
  360. listenAddrIP := net.ParseIP(listenHost)
  361. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  362. advertiseIP := net.ParseIP(advertiseHost)
  363. if advertiseIP == nil {
  364. // not an IP
  365. c.Unlock()
  366. return "", errMustSpecifyListenAddr
  367. }
  368. systemIPs := listSystemIPs()
  369. found := false
  370. for _, systemIP := range systemIPs {
  371. if systemIP.Equal(advertiseIP) {
  372. found = true
  373. break
  374. }
  375. }
  376. if !found {
  377. c.Unlock()
  378. return "", errMustSpecifyListenAddr
  379. }
  380. localAddr = advertiseIP.String()
  381. }
  382. // todo: check current state existing
  383. n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
  384. if err != nil {
  385. c.Unlock()
  386. return "", err
  387. }
  388. c.Unlock()
  389. select {
  390. case <-n.Ready():
  391. if err := initClusterSpec(n, req.Spec); err != nil {
  392. return "", err
  393. }
  394. go c.reconnectOnFailure(n)
  395. return n.NodeID(), nil
  396. case <-n.done:
  397. c.RLock()
  398. defer c.RUnlock()
  399. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  400. if err := c.clearState(); err != nil {
  401. return "", err
  402. }
  403. }
  404. return "", c.err
  405. }
  406. }
  407. // Join makes current Cluster part of an existing swarm cluster.
  408. func (c *Cluster) Join(req types.JoinRequest) error {
  409. c.Lock()
  410. if node := c.node; node != nil {
  411. c.Unlock()
  412. return ErrSwarmExists
  413. }
  414. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  415. c.Unlock()
  416. return err
  417. }
  418. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  419. if err != nil {
  420. c.Unlock()
  421. return err
  422. }
  423. var advertiseAddr string
  424. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  425. // For joining, we don't need to provide an advertise address,
  426. // since the remote side can detect it.
  427. if err == nil {
  428. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  429. }
  430. // todo: check current state existing
  431. n, err := c.startNewNode(false, "", req.RemoteAddrs[0], net.JoinHostPort(listenHost, listenPort), advertiseAddr, req.RemoteAddrs[0], req.JoinToken)
  432. if err != nil {
  433. c.Unlock()
  434. return err
  435. }
  436. c.Unlock()
  437. select {
  438. case <-time.After(swarmConnectTimeout):
  439. // attempt to connect will continue in background, also reconnecting
  440. go c.reconnectOnFailure(n)
  441. return ErrSwarmJoinTimeoutReached
  442. case <-n.Ready():
  443. go c.reconnectOnFailure(n)
  444. return nil
  445. case <-n.done:
  446. c.RLock()
  447. defer c.RUnlock()
  448. return c.err
  449. }
  450. }
  451. // stopNode is a helper that stops the active c.node and waits until it has
  452. // shut down. Call while keeping the cluster lock.
  453. func (c *Cluster) stopNode() error {
  454. if c.node == nil {
  455. return nil
  456. }
  457. c.stop = true
  458. if c.cancelDelay != nil {
  459. c.cancelDelay()
  460. c.cancelDelay = nil
  461. }
  462. node := c.node
  463. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  464. defer cancel()
  465. // TODO: can't hold lock on stop because it calls back to network
  466. c.Unlock()
  467. defer c.Lock()
  468. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  469. return err
  470. }
  471. <-node.done
  472. return nil
  473. }
  474. // Leave shuts down Cluster and removes current state.
  475. func (c *Cluster) Leave(force bool) error {
  476. c.Lock()
  477. node := c.node
  478. if node == nil {
  479. c.Unlock()
  480. return ErrNoSwarm
  481. }
  482. if node.Manager() != nil && !force {
  483. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  484. if c.isActiveManager() {
  485. active, reachable, unreachable, err := c.managerStats()
  486. if err == nil {
  487. if active && reachable-2 <= unreachable {
  488. if reachable == 1 && unreachable == 0 {
  489. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  490. c.Unlock()
  491. return fmt.Errorf(msg)
  492. }
  493. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  494. }
  495. }
  496. } else {
  497. msg += "Doing so may lose the consensus of your cluster. "
  498. }
  499. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  500. c.Unlock()
  501. return fmt.Errorf(msg)
  502. }
  503. if err := c.stopNode(); err != nil {
  504. c.Unlock()
  505. return err
  506. }
  507. c.Unlock()
  508. if nodeID := node.NodeID(); nodeID != "" {
  509. for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
  510. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  511. logrus.Errorf("error removing %v: %v", id, err)
  512. }
  513. }
  514. }
  515. c.configEvent <- struct{}{}
  516. // todo: cleanup optional?
  517. if err := c.clearState(); err != nil {
  518. return err
  519. }
  520. return nil
  521. }
  522. func (c *Cluster) clearState() error {
  523. // todo: backup this data instead of removing?
  524. if err := os.RemoveAll(c.root); err != nil {
  525. return err
  526. }
  527. if err := os.MkdirAll(c.root, 0700); err != nil {
  528. return err
  529. }
  530. c.config.Backend.SetClusterProvider(nil)
  531. return nil
  532. }
  533. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  534. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  535. }
  536. // Inspect retrieves the configuration properties of a managed swarm cluster.
  537. func (c *Cluster) Inspect() (types.Swarm, error) {
  538. c.RLock()
  539. defer c.RUnlock()
  540. if !c.isActiveManager() {
  541. return types.Swarm{}, c.errNoManager()
  542. }
  543. ctx, cancel := c.getRequestContext()
  544. defer cancel()
  545. swarm, err := getSwarm(ctx, c.client)
  546. if err != nil {
  547. return types.Swarm{}, err
  548. }
  549. if err != nil {
  550. return types.Swarm{}, err
  551. }
  552. return convert.SwarmFromGRPC(*swarm), nil
  553. }
  554. // Update updates configuration of a managed swarm cluster.
  555. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  556. c.RLock()
  557. defer c.RUnlock()
  558. if !c.isActiveManager() {
  559. return c.errNoManager()
  560. }
  561. ctx, cancel := c.getRequestContext()
  562. defer cancel()
  563. swarm, err := getSwarm(ctx, c.client)
  564. if err != nil {
  565. return err
  566. }
  567. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  568. if err != nil {
  569. return err
  570. }
  571. _, err = c.client.UpdateCluster(
  572. ctx,
  573. &swarmapi.UpdateClusterRequest{
  574. ClusterID: swarm.ID,
  575. Spec: &swarmSpec,
  576. ClusterVersion: &swarmapi.Version{
  577. Index: version,
  578. },
  579. Rotation: swarmapi.JoinTokenRotation{
  580. RotateWorkerToken: flags.RotateWorkerToken,
  581. RotateManagerToken: flags.RotateManagerToken,
  582. },
  583. },
  584. )
  585. return err
  586. }
  587. // IsManager returns true if Cluster is participating as a manager.
  588. func (c *Cluster) IsManager() bool {
  589. c.RLock()
  590. defer c.RUnlock()
  591. return c.isActiveManager()
  592. }
  593. // IsAgent returns true if Cluster is participating as a worker/agent.
  594. func (c *Cluster) IsAgent() bool {
  595. c.RLock()
  596. defer c.RUnlock()
  597. return c.node != nil && c.ready
  598. }
  599. // GetLocalAddress returns the local address.
  600. func (c *Cluster) GetLocalAddress() string {
  601. c.RLock()
  602. defer c.RUnlock()
  603. return c.actualLocalAddr
  604. }
  605. // GetAdvertiseAddress returns the remotely reachable address of this node.
  606. func (c *Cluster) GetAdvertiseAddress() string {
  607. c.RLock()
  608. defer c.RUnlock()
  609. if c.advertiseAddr != "" {
  610. advertiseHost, _, _ := net.SplitHostPort(c.advertiseAddr)
  611. return advertiseHost
  612. }
  613. return c.actualLocalAddr
  614. }
  615. // GetRemoteAddress returns a known advertise address of a remote manager if
  616. // available.
  617. // todo: change to array/connect with info
  618. func (c *Cluster) GetRemoteAddress() string {
  619. c.RLock()
  620. defer c.RUnlock()
  621. return c.getRemoteAddress()
  622. }
  623. func (c *Cluster) getRemoteAddress() string {
  624. if c.node == nil {
  625. return ""
  626. }
  627. nodeID := c.node.NodeID()
  628. for _, r := range c.node.Remotes() {
  629. if r.NodeID != nodeID {
  630. return r.Addr
  631. }
  632. }
  633. return ""
  634. }
  635. // ListenClusterEvents returns a channel that receives messages on cluster
  636. // participation changes.
  637. // todo: make cancelable and accessible to multiple callers
  638. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  639. return c.configEvent
  640. }
  641. // Info returns information about the current cluster state.
  642. func (c *Cluster) Info() types.Info {
  643. info := types.Info{
  644. NodeAddr: c.GetAdvertiseAddress(),
  645. }
  646. c.RLock()
  647. defer c.RUnlock()
  648. if c.node == nil {
  649. info.LocalNodeState = types.LocalNodeStateInactive
  650. if c.cancelDelay != nil {
  651. info.LocalNodeState = types.LocalNodeStateError
  652. }
  653. } else {
  654. info.LocalNodeState = types.LocalNodeStatePending
  655. if c.ready == true {
  656. info.LocalNodeState = types.LocalNodeStateActive
  657. }
  658. }
  659. if c.err != nil {
  660. info.Error = c.err.Error()
  661. }
  662. ctx, cancel := c.getRequestContext()
  663. defer cancel()
  664. if c.isActiveManager() {
  665. info.ControlAvailable = true
  666. swarm, err := c.Inspect()
  667. if err != nil {
  668. info.Error = err.Error()
  669. }
  670. // Strip JoinTokens
  671. info.Cluster = swarm.ClusterInfo
  672. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  673. info.Nodes = len(r.Nodes)
  674. for _, n := range r.Nodes {
  675. if n.ManagerStatus != nil {
  676. info.Managers = info.Managers + 1
  677. }
  678. }
  679. }
  680. }
  681. if c.node != nil {
  682. for _, r := range c.node.Remotes() {
  683. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  684. }
  685. info.NodeID = c.node.NodeID()
  686. }
  687. return info
  688. }
  689. // isActiveManager should not be called without a read lock
  690. func (c *Cluster) isActiveManager() bool {
  691. return c.node != nil && c.conn != nil
  692. }
  693. // errNoManager returns error describing why manager commands can't be used.
  694. // Call with read lock.
  695. func (c *Cluster) errNoManager() error {
  696. if c.node == nil {
  697. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  698. }
  699. if c.node.Manager() != nil {
  700. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  701. }
  702. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  703. }
  704. // GetServices returns all services of a managed swarm cluster.
  705. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  706. c.RLock()
  707. defer c.RUnlock()
  708. if !c.isActiveManager() {
  709. return nil, c.errNoManager()
  710. }
  711. filters, err := newListServicesFilters(options.Filter)
  712. if err != nil {
  713. return nil, err
  714. }
  715. ctx, cancel := c.getRequestContext()
  716. defer cancel()
  717. r, err := c.client.ListServices(
  718. ctx,
  719. &swarmapi.ListServicesRequest{Filters: filters})
  720. if err != nil {
  721. return nil, err
  722. }
  723. services := []types.Service{}
  724. for _, service := range r.Services {
  725. services = append(services, convert.ServiceFromGRPC(*service))
  726. }
  727. return services, nil
  728. }
  729. // CreateService creates a new service in a managed swarm cluster.
  730. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  731. c.RLock()
  732. defer c.RUnlock()
  733. if !c.isActiveManager() {
  734. return "", c.errNoManager()
  735. }
  736. ctx, cancel := c.getRequestContext()
  737. defer cancel()
  738. err := c.populateNetworkID(ctx, c.client, &s)
  739. if err != nil {
  740. return "", err
  741. }
  742. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  743. if err != nil {
  744. return "", err
  745. }
  746. if encodedAuth != "" {
  747. ctnr := serviceSpec.Task.GetContainer()
  748. if ctnr == nil {
  749. return "", fmt.Errorf("service does not use container tasks")
  750. }
  751. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  752. }
  753. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  754. if err != nil {
  755. return "", err
  756. }
  757. return r.Service.ID, nil
  758. }
  759. // GetService returns a service based on an ID or name.
  760. func (c *Cluster) GetService(input string) (types.Service, error) {
  761. c.RLock()
  762. defer c.RUnlock()
  763. if !c.isActiveManager() {
  764. return types.Service{}, c.errNoManager()
  765. }
  766. ctx, cancel := c.getRequestContext()
  767. defer cancel()
  768. service, err := getService(ctx, c.client, input)
  769. if err != nil {
  770. return types.Service{}, err
  771. }
  772. return convert.ServiceFromGRPC(*service), nil
  773. }
  774. // UpdateService updates existing service to match new properties.
  775. func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  776. c.RLock()
  777. defer c.RUnlock()
  778. if !c.isActiveManager() {
  779. return c.errNoManager()
  780. }
  781. ctx, cancel := c.getRequestContext()
  782. defer cancel()
  783. err := c.populateNetworkID(ctx, c.client, &spec)
  784. if err != nil {
  785. return err
  786. }
  787. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  788. if err != nil {
  789. return err
  790. }
  791. if encodedAuth != "" {
  792. ctnr := serviceSpec.Task.GetContainer()
  793. if ctnr == nil {
  794. return fmt.Errorf("service does not use container tasks")
  795. }
  796. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  797. } else {
  798. // this is needed because if the encodedAuth isn't being updated then we
  799. // shouldn't lose it, and continue to use the one that was already present
  800. currentService, err := getService(ctx, c.client, serviceID)
  801. if err != nil {
  802. return err
  803. }
  804. ctnr := currentService.Spec.Task.GetContainer()
  805. if ctnr == nil {
  806. return fmt.Errorf("service does not use container tasks")
  807. }
  808. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  809. }
  810. _, err = c.client.UpdateService(
  811. ctx,
  812. &swarmapi.UpdateServiceRequest{
  813. ServiceID: serviceID,
  814. Spec: &serviceSpec,
  815. ServiceVersion: &swarmapi.Version{
  816. Index: version,
  817. },
  818. },
  819. )
  820. return err
  821. }
  822. // RemoveService removes a service from a managed swarm cluster.
  823. func (c *Cluster) RemoveService(input string) error {
  824. c.RLock()
  825. defer c.RUnlock()
  826. if !c.isActiveManager() {
  827. return c.errNoManager()
  828. }
  829. ctx, cancel := c.getRequestContext()
  830. defer cancel()
  831. service, err := getService(ctx, c.client, input)
  832. if err != nil {
  833. return err
  834. }
  835. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  836. return err
  837. }
  838. return nil
  839. }
  840. // GetNodes returns a list of all nodes known to a cluster.
  841. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  842. c.RLock()
  843. defer c.RUnlock()
  844. if !c.isActiveManager() {
  845. return nil, c.errNoManager()
  846. }
  847. filters, err := newListNodesFilters(options.Filter)
  848. if err != nil {
  849. return nil, err
  850. }
  851. ctx, cancel := c.getRequestContext()
  852. defer cancel()
  853. r, err := c.client.ListNodes(
  854. ctx,
  855. &swarmapi.ListNodesRequest{Filters: filters})
  856. if err != nil {
  857. return nil, err
  858. }
  859. nodes := []types.Node{}
  860. for _, node := range r.Nodes {
  861. nodes = append(nodes, convert.NodeFromGRPC(*node))
  862. }
  863. return nodes, nil
  864. }
  865. // GetNode returns a node based on an ID or name.
  866. func (c *Cluster) GetNode(input string) (types.Node, error) {
  867. c.RLock()
  868. defer c.RUnlock()
  869. if !c.isActiveManager() {
  870. return types.Node{}, c.errNoManager()
  871. }
  872. ctx, cancel := c.getRequestContext()
  873. defer cancel()
  874. node, err := getNode(ctx, c.client, input)
  875. if err != nil {
  876. return types.Node{}, err
  877. }
  878. return convert.NodeFromGRPC(*node), nil
  879. }
  880. // UpdateNode updates existing nodes properties.
  881. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  882. c.RLock()
  883. defer c.RUnlock()
  884. if !c.isActiveManager() {
  885. return c.errNoManager()
  886. }
  887. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  888. if err != nil {
  889. return err
  890. }
  891. ctx, cancel := c.getRequestContext()
  892. defer cancel()
  893. _, err = c.client.UpdateNode(
  894. ctx,
  895. &swarmapi.UpdateNodeRequest{
  896. NodeID: nodeID,
  897. Spec: &nodeSpec,
  898. NodeVersion: &swarmapi.Version{
  899. Index: version,
  900. },
  901. },
  902. )
  903. return err
  904. }
  905. // RemoveNode removes a node from a cluster
  906. func (c *Cluster) RemoveNode(input string, force bool) error {
  907. c.RLock()
  908. defer c.RUnlock()
  909. if !c.isActiveManager() {
  910. return c.errNoManager()
  911. }
  912. ctx, cancel := c.getRequestContext()
  913. defer cancel()
  914. node, err := getNode(ctx, c.client, input)
  915. if err != nil {
  916. return err
  917. }
  918. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  919. return err
  920. }
  921. return nil
  922. }
  923. // GetTasks returns a list of tasks matching the filter options.
  924. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  925. c.RLock()
  926. defer c.RUnlock()
  927. if !c.isActiveManager() {
  928. return nil, c.errNoManager()
  929. }
  930. byName := func(filter filters.Args) error {
  931. if filter.Include("service") {
  932. serviceFilters := filter.Get("service")
  933. for _, serviceFilter := range serviceFilters {
  934. service, err := c.GetService(serviceFilter)
  935. if err != nil {
  936. return err
  937. }
  938. filter.Del("service", serviceFilter)
  939. filter.Add("service", service.ID)
  940. }
  941. }
  942. if filter.Include("node") {
  943. nodeFilters := filter.Get("node")
  944. for _, nodeFilter := range nodeFilters {
  945. node, err := c.GetNode(nodeFilter)
  946. if err != nil {
  947. return err
  948. }
  949. filter.Del("node", nodeFilter)
  950. filter.Add("node", node.ID)
  951. }
  952. }
  953. return nil
  954. }
  955. filters, err := newListTasksFilters(options.Filter, byName)
  956. if err != nil {
  957. return nil, err
  958. }
  959. ctx, cancel := c.getRequestContext()
  960. defer cancel()
  961. r, err := c.client.ListTasks(
  962. ctx,
  963. &swarmapi.ListTasksRequest{Filters: filters})
  964. if err != nil {
  965. return nil, err
  966. }
  967. tasks := []types.Task{}
  968. for _, task := range r.Tasks {
  969. tasks = append(tasks, convert.TaskFromGRPC(*task))
  970. }
  971. return tasks, nil
  972. }
  973. // GetTask returns a task by an ID.
  974. func (c *Cluster) GetTask(input string) (types.Task, error) {
  975. c.RLock()
  976. defer c.RUnlock()
  977. if !c.isActiveManager() {
  978. return types.Task{}, c.errNoManager()
  979. }
  980. ctx, cancel := c.getRequestContext()
  981. defer cancel()
  982. task, err := getTask(ctx, c.client, input)
  983. if err != nil {
  984. return types.Task{}, err
  985. }
  986. return convert.TaskFromGRPC(*task), nil
  987. }
  988. // GetNetwork returns a cluster network by an ID.
  989. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  990. c.RLock()
  991. defer c.RUnlock()
  992. if !c.isActiveManager() {
  993. return apitypes.NetworkResource{}, c.errNoManager()
  994. }
  995. ctx, cancel := c.getRequestContext()
  996. defer cancel()
  997. network, err := getNetwork(ctx, c.client, input)
  998. if err != nil {
  999. return apitypes.NetworkResource{}, err
  1000. }
  1001. return convert.BasicNetworkFromGRPC(*network), nil
  1002. }
  1003. // GetNetworks returns all current cluster managed networks.
  1004. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1005. c.RLock()
  1006. defer c.RUnlock()
  1007. if !c.isActiveManager() {
  1008. return nil, c.errNoManager()
  1009. }
  1010. ctx, cancel := c.getRequestContext()
  1011. defer cancel()
  1012. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1013. if err != nil {
  1014. return nil, err
  1015. }
  1016. var networks []apitypes.NetworkResource
  1017. for _, network := range r.Networks {
  1018. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1019. }
  1020. return networks, nil
  1021. }
  1022. // CreateNetwork creates a new cluster managed network.
  1023. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1024. c.RLock()
  1025. defer c.RUnlock()
  1026. if !c.isActiveManager() {
  1027. return "", c.errNoManager()
  1028. }
  1029. if runconfig.IsPreDefinedNetwork(s.Name) {
  1030. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1031. return "", errors.NewRequestForbiddenError(err)
  1032. }
  1033. ctx, cancel := c.getRequestContext()
  1034. defer cancel()
  1035. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1036. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1037. if err != nil {
  1038. return "", err
  1039. }
  1040. return r.Network.ID, nil
  1041. }
  1042. // RemoveNetwork removes a cluster network.
  1043. func (c *Cluster) RemoveNetwork(input string) error {
  1044. c.RLock()
  1045. defer c.RUnlock()
  1046. if !c.isActiveManager() {
  1047. return c.errNoManager()
  1048. }
  1049. ctx, cancel := c.getRequestContext()
  1050. defer cancel()
  1051. network, err := getNetwork(ctx, c.client, input)
  1052. if err != nil {
  1053. return err
  1054. }
  1055. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1056. return err
  1057. }
  1058. return nil
  1059. }
  1060. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1061. for i, n := range s.Networks {
  1062. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1063. if err != nil {
  1064. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1065. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1066. return errors.NewRequestForbiddenError(err)
  1067. }
  1068. return err
  1069. }
  1070. s.Networks[i].Target = apiNetwork.ID
  1071. }
  1072. return nil
  1073. }
  1074. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1075. // GetNetwork to match via full ID.
  1076. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1077. if err != nil {
  1078. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1079. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1080. if err != nil || len(rl.Networks) == 0 {
  1081. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1082. }
  1083. if err != nil {
  1084. return nil, err
  1085. }
  1086. if len(rl.Networks) == 0 {
  1087. return nil, fmt.Errorf("network %s not found", input)
  1088. }
  1089. if l := len(rl.Networks); l > 1 {
  1090. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1091. }
  1092. return rl.Networks[0], nil
  1093. }
  1094. return rg.Network, nil
  1095. }
  1096. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1097. func (c *Cluster) Cleanup() {
  1098. c.Lock()
  1099. node := c.node
  1100. if node == nil {
  1101. c.Unlock()
  1102. return
  1103. }
  1104. defer c.Unlock()
  1105. if c.isActiveManager() {
  1106. active, reachable, unreachable, err := c.managerStats()
  1107. if err == nil {
  1108. singlenode := active && reachable == 1 && unreachable == 0
  1109. if active && !singlenode && reachable-2 <= unreachable {
  1110. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1111. }
  1112. }
  1113. }
  1114. c.stopNode()
  1115. }
  1116. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1117. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1118. defer cancel()
  1119. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1120. if err != nil {
  1121. return false, 0, 0, err
  1122. }
  1123. for _, n := range nodes.Nodes {
  1124. if n.ManagerStatus != nil {
  1125. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1126. reachable++
  1127. if n.ID == c.node.NodeID() {
  1128. current = true
  1129. }
  1130. }
  1131. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1132. unreachable++
  1133. }
  1134. }
  1135. }
  1136. return
  1137. }
  1138. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1139. var err error
  1140. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1141. if err != nil {
  1142. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1143. }
  1144. spec := &req.Spec
  1145. // provide sane defaults instead of erroring
  1146. if spec.Name == "" {
  1147. spec.Name = "default"
  1148. }
  1149. if spec.Raft.SnapshotInterval == 0 {
  1150. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1151. }
  1152. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1153. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1154. }
  1155. if spec.Raft.ElectionTick == 0 {
  1156. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1157. }
  1158. if spec.Raft.HeartbeatTick == 0 {
  1159. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1160. }
  1161. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1162. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1163. }
  1164. if spec.CAConfig.NodeCertExpiry == 0 {
  1165. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1166. }
  1167. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1168. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1169. }
  1170. return nil
  1171. }
  1172. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1173. var err error
  1174. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1175. if err != nil {
  1176. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1177. }
  1178. if len(req.RemoteAddrs) == 0 {
  1179. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1180. }
  1181. for i := range req.RemoteAddrs {
  1182. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1183. if err != nil {
  1184. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1185. }
  1186. }
  1187. return nil
  1188. }
  1189. func validateAddr(addr string) (string, error) {
  1190. if addr == "" {
  1191. return addr, fmt.Errorf("invalid empty address")
  1192. }
  1193. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1194. if err != nil {
  1195. return addr, nil
  1196. }
  1197. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1198. }
  1199. func initClusterSpec(node *node, spec types.Spec) error {
  1200. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1201. for conn := range node.ListenControlSocket(ctx) {
  1202. if ctx.Err() != nil {
  1203. return ctx.Err()
  1204. }
  1205. if conn != nil {
  1206. client := swarmapi.NewControlClient(conn)
  1207. var cluster *swarmapi.Cluster
  1208. for i := 0; ; i++ {
  1209. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1210. if err != nil {
  1211. return fmt.Errorf("error on listing clusters: %v", err)
  1212. }
  1213. if len(lcr.Clusters) == 0 {
  1214. if i < 10 {
  1215. time.Sleep(200 * time.Millisecond)
  1216. continue
  1217. }
  1218. return fmt.Errorf("empty list of clusters was returned")
  1219. }
  1220. cluster = lcr.Clusters[0]
  1221. break
  1222. }
  1223. newspec, err := convert.SwarmSpecToGRPC(spec)
  1224. if err != nil {
  1225. return fmt.Errorf("error updating cluster settings: %v", err)
  1226. }
  1227. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1228. ClusterID: cluster.ID,
  1229. ClusterVersion: &cluster.Meta.Version,
  1230. Spec: &newspec,
  1231. })
  1232. if err != nil {
  1233. return fmt.Errorf("error updating cluster settings: %v", err)
  1234. }
  1235. return nil
  1236. }
  1237. }
  1238. return ctx.Err()
  1239. }