cluster.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "google.golang.org/grpc"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/docker/docker/daemon/cluster/convert"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/docker/daemon/cluster/executor/container"
  17. "github.com/docker/docker/errors"
  18. "github.com/docker/docker/opts"
  19. "github.com/docker/docker/pkg/ioutils"
  20. "github.com/docker/docker/pkg/signal"
  21. "github.com/docker/docker/runconfig"
  22. apitypes "github.com/docker/engine-api/types"
  23. "github.com/docker/engine-api/types/filters"
  24. types "github.com/docker/engine-api/types/swarm"
  25. swarmagent "github.com/docker/swarmkit/agent"
  26. swarmapi "github.com/docker/swarmkit/api"
  27. "golang.org/x/net/context"
  28. )
  29. const swarmDirName = "swarm"
  30. const controlSocket = "control.sock"
  31. const swarmConnectTimeout = 20 * time.Second
  32. const swarmRequestTimeout = 20 * time.Second
  33. const stateFile = "docker-state.json"
  34. const defaultAddr = "0.0.0.0:2377"
  35. const (
  36. initialReconnectDelay = 100 * time.Millisecond
  37. maxReconnectDelay = 30 * time.Second
  38. )
  39. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  40. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  41. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  42. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  43. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  44. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  45. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  46. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  47. // defaultSpec contains some sane defaults if cluster options are missing on init
  48. var defaultSpec = types.Spec{
  49. Raft: types.RaftConfig{
  50. SnapshotInterval: 10000,
  51. KeepOldSnapshots: 0,
  52. LogEntriesForSlowFollowers: 500,
  53. HeartbeatTick: 1,
  54. ElectionTick: 3,
  55. },
  56. CAConfig: types.CAConfig{
  57. NodeCertExpiry: 90 * 24 * time.Hour,
  58. },
  59. Dispatcher: types.DispatcherConfig{
  60. HeartbeatPeriod: 5 * time.Second,
  61. },
  62. Orchestration: types.OrchestrationConfig{
  63. TaskHistoryRetentionLimit: 10,
  64. },
  65. }
  66. type state struct {
  67. // LocalAddr is this machine's local IP or hostname, if specified.
  68. LocalAddr string
  69. // RemoteAddr is the address that was given to "swarm join. It is used
  70. // to find LocalAddr if necessary.
  71. RemoteAddr string
  72. // ListenAddr is the address we bind to, including a port.
  73. ListenAddr string
  74. // AdvertiseAddr is the address other nodes should connect to,
  75. // including a port.
  76. AdvertiseAddr string
  77. }
  78. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  79. // of networks managed by Docker, so they can be filtered.
  80. type NetworkSubnetsProvider interface {
  81. V4Subnets() []net.IPNet
  82. V6Subnets() []net.IPNet
  83. }
  84. // Config provides values for Cluster.
  85. type Config struct {
  86. Root string
  87. Name string
  88. Backend executorpkg.Backend
  89. NetworkSubnetsProvider NetworkSubnetsProvider
  90. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  91. // if no AdvertiseAddr value is specified.
  92. DefaultAdvertiseAddr string
  93. // path to store runtime state, such as the swarm control socket
  94. RuntimeRoot string
  95. }
  96. // Cluster provides capabilities to participate in a cluster as a worker or a
  97. // manager.
  98. type Cluster struct {
  99. sync.RWMutex
  100. *node
  101. root string
  102. runtimeRoot string
  103. config Config
  104. configEvent chan struct{} // todo: make this array and goroutine safe
  105. localAddr string
  106. actualLocalAddr string // after resolution, not persisted
  107. remoteAddr string
  108. listenAddr string
  109. advertiseAddr string
  110. stop bool
  111. err error
  112. cancelDelay func()
  113. }
  114. type node struct {
  115. *swarmagent.Node
  116. done chan struct{}
  117. ready bool
  118. conn *grpc.ClientConn
  119. client swarmapi.ControlClient
  120. reconnectDelay time.Duration
  121. }
  122. // New creates a new Cluster instance using provided config.
  123. func New(config Config) (*Cluster, error) {
  124. root := filepath.Join(config.Root, swarmDirName)
  125. if err := os.MkdirAll(root, 0700); err != nil {
  126. return nil, err
  127. }
  128. if config.RuntimeRoot == "" {
  129. config.RuntimeRoot = root
  130. }
  131. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  132. return nil, err
  133. }
  134. c := &Cluster{
  135. root: root,
  136. config: config,
  137. configEvent: make(chan struct{}, 10),
  138. runtimeRoot: config.RuntimeRoot,
  139. }
  140. st, err := c.loadState()
  141. if err != nil {
  142. if os.IsNotExist(err) {
  143. return c, nil
  144. }
  145. return nil, err
  146. }
  147. n, err := c.startNewNode(false, st.LocalAddr, st.RemoteAddr, st.ListenAddr, st.AdvertiseAddr, "", "")
  148. if err != nil {
  149. return nil, err
  150. }
  151. select {
  152. case <-time.After(swarmConnectTimeout):
  153. logrus.Errorf("swarm component could not be started before timeout was reached")
  154. case <-n.Ready():
  155. case <-n.done:
  156. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  157. }
  158. go c.reconnectOnFailure(n)
  159. return c, nil
  160. }
  161. func (c *Cluster) loadState() (*state, error) {
  162. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  163. if err != nil {
  164. return nil, err
  165. }
  166. // missing certificate means no actual state to restore from
  167. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  168. if os.IsNotExist(err) {
  169. c.clearState()
  170. }
  171. return nil, err
  172. }
  173. var st state
  174. if err := json.Unmarshal(dt, &st); err != nil {
  175. return nil, err
  176. }
  177. return &st, nil
  178. }
  179. func (c *Cluster) saveState() error {
  180. dt, err := json.Marshal(state{
  181. LocalAddr: c.localAddr,
  182. RemoteAddr: c.remoteAddr,
  183. ListenAddr: c.listenAddr,
  184. AdvertiseAddr: c.advertiseAddr,
  185. })
  186. if err != nil {
  187. return err
  188. }
  189. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  190. }
  191. func (c *Cluster) reconnectOnFailure(n *node) {
  192. for {
  193. <-n.done
  194. c.Lock()
  195. if c.stop || c.node != nil {
  196. c.Unlock()
  197. return
  198. }
  199. n.reconnectDelay *= 2
  200. if n.reconnectDelay > maxReconnectDelay {
  201. n.reconnectDelay = maxReconnectDelay
  202. }
  203. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  204. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  205. c.cancelDelay = cancel
  206. c.Unlock()
  207. <-delayCtx.Done()
  208. if delayCtx.Err() != context.DeadlineExceeded {
  209. return
  210. }
  211. c.Lock()
  212. if c.node != nil {
  213. c.Unlock()
  214. return
  215. }
  216. var err error
  217. n, err = c.startNewNode(false, c.localAddr, c.getRemoteAddress(), c.listenAddr, c.advertiseAddr, c.getRemoteAddress(), "")
  218. if err != nil {
  219. c.err = err
  220. close(n.done)
  221. }
  222. c.Unlock()
  223. }
  224. }
  225. func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, listenAddr, advertiseAddr, joinAddr, joinToken string) (*node, error) {
  226. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  227. return nil, err
  228. }
  229. actualLocalAddr := localAddr
  230. if actualLocalAddr == "" {
  231. // If localAddr was not specified, resolve it automatically
  232. // based on the route to joinAddr. localAddr can only be left
  233. // empty on "join".
  234. listenHost, _, err := net.SplitHostPort(listenAddr)
  235. if err != nil {
  236. return nil, fmt.Errorf("could not parse listen address: %v", err)
  237. }
  238. listenAddrIP := net.ParseIP(listenHost)
  239. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  240. actualLocalAddr = listenHost
  241. } else {
  242. if remoteAddr == "" {
  243. // Should never happen except using swarms created by
  244. // old versions that didn't save remoteAddr.
  245. remoteAddr = "8.8.8.8:53"
  246. }
  247. conn, err := net.Dial("udp", remoteAddr)
  248. if err != nil {
  249. return nil, fmt.Errorf("could not find local IP address: %v", err)
  250. }
  251. localHostPort := conn.LocalAddr().String()
  252. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  253. conn.Close()
  254. }
  255. }
  256. c.node = nil
  257. c.cancelDelay = nil
  258. c.stop = false
  259. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  260. Hostname: c.config.Name,
  261. ForceNewCluster: forceNewCluster,
  262. ListenControlAPI: filepath.Join(c.runtimeRoot, controlSocket),
  263. ListenRemoteAPI: listenAddr,
  264. AdvertiseRemoteAPI: advertiseAddr,
  265. JoinAddr: joinAddr,
  266. StateDir: c.root,
  267. JoinToken: joinToken,
  268. Executor: container.NewExecutor(c.config.Backend),
  269. HeartbeatTick: 1,
  270. ElectionTick: 3,
  271. })
  272. if err != nil {
  273. return nil, err
  274. }
  275. ctx := context.Background()
  276. if err := n.Start(ctx); err != nil {
  277. return nil, err
  278. }
  279. node := &node{
  280. Node: n,
  281. done: make(chan struct{}),
  282. reconnectDelay: initialReconnectDelay,
  283. }
  284. c.node = node
  285. c.localAddr = localAddr
  286. c.actualLocalAddr = actualLocalAddr // not saved
  287. c.remoteAddr = remoteAddr
  288. c.listenAddr = listenAddr
  289. c.advertiseAddr = advertiseAddr
  290. c.saveState()
  291. c.config.Backend.SetClusterProvider(c)
  292. go func() {
  293. err := n.Err(ctx)
  294. if err != nil {
  295. logrus.Errorf("cluster exited with error: %v", err)
  296. }
  297. c.Lock()
  298. c.node = nil
  299. c.err = err
  300. c.Unlock()
  301. close(node.done)
  302. }()
  303. go func() {
  304. select {
  305. case <-n.Ready():
  306. c.Lock()
  307. node.ready = true
  308. c.err = nil
  309. c.Unlock()
  310. case <-ctx.Done():
  311. }
  312. c.configEvent <- struct{}{}
  313. }()
  314. go func() {
  315. for conn := range n.ListenControlSocket(ctx) {
  316. c.Lock()
  317. if node.conn != conn {
  318. if conn == nil {
  319. node.client = nil
  320. } else {
  321. node.client = swarmapi.NewControlClient(conn)
  322. }
  323. }
  324. node.conn = conn
  325. c.Unlock()
  326. c.configEvent <- struct{}{}
  327. }
  328. }()
  329. return node, nil
  330. }
  331. // Init initializes new cluster from user provided request.
  332. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  333. c.Lock()
  334. if node := c.node; node != nil {
  335. if !req.ForceNewCluster {
  336. c.Unlock()
  337. return "", ErrSwarmExists
  338. }
  339. if err := c.stopNode(); err != nil {
  340. c.Unlock()
  341. return "", err
  342. }
  343. }
  344. if err := validateAndSanitizeInitRequest(&req); err != nil {
  345. c.Unlock()
  346. return "", err
  347. }
  348. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  349. if err != nil {
  350. c.Unlock()
  351. return "", err
  352. }
  353. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  354. if err != nil {
  355. c.Unlock()
  356. return "", err
  357. }
  358. localAddr := listenHost
  359. // If the advertise address is not one of the system's
  360. // addresses, we also require a listen address.
  361. listenAddrIP := net.ParseIP(listenHost)
  362. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  363. advertiseIP := net.ParseIP(advertiseHost)
  364. if advertiseIP == nil {
  365. // not an IP
  366. c.Unlock()
  367. return "", errMustSpecifyListenAddr
  368. }
  369. systemIPs := listSystemIPs()
  370. found := false
  371. for _, systemIP := range systemIPs {
  372. if systemIP.Equal(advertiseIP) {
  373. found = true
  374. break
  375. }
  376. }
  377. if !found {
  378. c.Unlock()
  379. return "", errMustSpecifyListenAddr
  380. }
  381. localAddr = advertiseIP.String()
  382. }
  383. // todo: check current state existing
  384. n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
  385. if err != nil {
  386. c.Unlock()
  387. return "", err
  388. }
  389. c.Unlock()
  390. select {
  391. case <-n.Ready():
  392. if err := initClusterSpec(n, req.Spec); err != nil {
  393. return "", err
  394. }
  395. go c.reconnectOnFailure(n)
  396. return n.NodeID(), nil
  397. case <-n.done:
  398. c.RLock()
  399. defer c.RUnlock()
  400. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  401. if err := c.clearState(); err != nil {
  402. return "", err
  403. }
  404. }
  405. return "", c.err
  406. }
  407. }
  408. // Join makes current Cluster part of an existing swarm cluster.
  409. func (c *Cluster) Join(req types.JoinRequest) error {
  410. c.Lock()
  411. if node := c.node; node != nil {
  412. c.Unlock()
  413. return ErrSwarmExists
  414. }
  415. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  416. c.Unlock()
  417. return err
  418. }
  419. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  420. if err != nil {
  421. c.Unlock()
  422. return err
  423. }
  424. var advertiseAddr string
  425. if req.AdvertiseAddr != "" {
  426. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  427. // For joining, we don't need to provide an advertise address,
  428. // since the remote side can detect it.
  429. if err == nil {
  430. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  431. }
  432. }
  433. // todo: check current state existing
  434. n, err := c.startNewNode(false, "", req.RemoteAddrs[0], net.JoinHostPort(listenHost, listenPort), advertiseAddr, req.RemoteAddrs[0], req.JoinToken)
  435. if err != nil {
  436. c.Unlock()
  437. return err
  438. }
  439. c.Unlock()
  440. select {
  441. case <-time.After(swarmConnectTimeout):
  442. // attempt to connect will continue in background, also reconnecting
  443. go c.reconnectOnFailure(n)
  444. return ErrSwarmJoinTimeoutReached
  445. case <-n.Ready():
  446. go c.reconnectOnFailure(n)
  447. return nil
  448. case <-n.done:
  449. c.RLock()
  450. defer c.RUnlock()
  451. return c.err
  452. }
  453. }
  454. // stopNode is a helper that stops the active c.node and waits until it has
  455. // shut down. Call while keeping the cluster lock.
  456. func (c *Cluster) stopNode() error {
  457. if c.node == nil {
  458. return nil
  459. }
  460. c.stop = true
  461. if c.cancelDelay != nil {
  462. c.cancelDelay()
  463. c.cancelDelay = nil
  464. }
  465. node := c.node
  466. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  467. defer cancel()
  468. // TODO: can't hold lock on stop because it calls back to network
  469. c.Unlock()
  470. defer c.Lock()
  471. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  472. return err
  473. }
  474. <-node.done
  475. return nil
  476. }
  477. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  478. return reachable-2 <= unreachable
  479. }
  480. func isLastManager(reachable, unreachable int) bool {
  481. return reachable == 1 && unreachable == 0
  482. }
  483. // Leave shuts down Cluster and removes current state.
  484. func (c *Cluster) Leave(force bool) error {
  485. c.Lock()
  486. node := c.node
  487. if node == nil {
  488. c.Unlock()
  489. return ErrNoSwarm
  490. }
  491. if node.Manager() != nil && !force {
  492. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  493. if c.isActiveManager() {
  494. active, reachable, unreachable, err := c.managerStats()
  495. if err == nil {
  496. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  497. if isLastManager(reachable, unreachable) {
  498. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  499. c.Unlock()
  500. return fmt.Errorf(msg)
  501. }
  502. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  503. }
  504. }
  505. } else {
  506. msg += "Doing so may lose the consensus of your cluster. "
  507. }
  508. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  509. c.Unlock()
  510. return fmt.Errorf(msg)
  511. }
  512. if err := c.stopNode(); err != nil {
  513. logrus.Errorf("failed to shut down cluster node: %v", err)
  514. signal.DumpStacks("")
  515. c.Unlock()
  516. return err
  517. }
  518. c.Unlock()
  519. if nodeID := node.NodeID(); nodeID != "" {
  520. nodeContainers, err := c.listContainerForNode(nodeID)
  521. if err != nil {
  522. return err
  523. }
  524. for _, id := range nodeContainers {
  525. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  526. logrus.Errorf("error removing %v: %v", id, err)
  527. }
  528. }
  529. }
  530. c.configEvent <- struct{}{}
  531. // todo: cleanup optional?
  532. if err := c.clearState(); err != nil {
  533. return err
  534. }
  535. return nil
  536. }
  537. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  538. var ids []string
  539. filters := filters.NewArgs()
  540. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  541. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  542. Filter: filters,
  543. })
  544. if err != nil {
  545. return []string{}, err
  546. }
  547. for _, c := range containers {
  548. ids = append(ids, c.ID)
  549. }
  550. return ids, nil
  551. }
  552. func (c *Cluster) clearState() error {
  553. // todo: backup this data instead of removing?
  554. if err := os.RemoveAll(c.root); err != nil {
  555. return err
  556. }
  557. if err := os.MkdirAll(c.root, 0700); err != nil {
  558. return err
  559. }
  560. c.config.Backend.SetClusterProvider(nil)
  561. return nil
  562. }
  563. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  564. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  565. }
  566. // Inspect retrieves the configuration properties of a managed swarm cluster.
  567. func (c *Cluster) Inspect() (types.Swarm, error) {
  568. c.RLock()
  569. defer c.RUnlock()
  570. if !c.isActiveManager() {
  571. return types.Swarm{}, c.errNoManager()
  572. }
  573. ctx, cancel := c.getRequestContext()
  574. defer cancel()
  575. swarm, err := getSwarm(ctx, c.client)
  576. if err != nil {
  577. return types.Swarm{}, err
  578. }
  579. if err != nil {
  580. return types.Swarm{}, err
  581. }
  582. return convert.SwarmFromGRPC(*swarm), nil
  583. }
  584. // Update updates configuration of a managed swarm cluster.
  585. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  586. c.RLock()
  587. defer c.RUnlock()
  588. if !c.isActiveManager() {
  589. return c.errNoManager()
  590. }
  591. ctx, cancel := c.getRequestContext()
  592. defer cancel()
  593. swarm, err := getSwarm(ctx, c.client)
  594. if err != nil {
  595. return err
  596. }
  597. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  598. if err != nil {
  599. return err
  600. }
  601. _, err = c.client.UpdateCluster(
  602. ctx,
  603. &swarmapi.UpdateClusterRequest{
  604. ClusterID: swarm.ID,
  605. Spec: &swarmSpec,
  606. ClusterVersion: &swarmapi.Version{
  607. Index: version,
  608. },
  609. Rotation: swarmapi.JoinTokenRotation{
  610. RotateWorkerToken: flags.RotateWorkerToken,
  611. RotateManagerToken: flags.RotateManagerToken,
  612. },
  613. },
  614. )
  615. return err
  616. }
  617. // IsManager returns true if Cluster is participating as a manager.
  618. func (c *Cluster) IsManager() bool {
  619. c.RLock()
  620. defer c.RUnlock()
  621. return c.isActiveManager()
  622. }
  623. // IsAgent returns true if Cluster is participating as a worker/agent.
  624. func (c *Cluster) IsAgent() bool {
  625. c.RLock()
  626. defer c.RUnlock()
  627. return c.node != nil && c.ready
  628. }
  629. // GetLocalAddress returns the local address.
  630. func (c *Cluster) GetLocalAddress() string {
  631. c.RLock()
  632. defer c.RUnlock()
  633. return c.actualLocalAddr
  634. }
  635. // GetAdvertiseAddress returns the remotely reachable address of this node.
  636. func (c *Cluster) GetAdvertiseAddress() string {
  637. c.RLock()
  638. defer c.RUnlock()
  639. if c.advertiseAddr != "" {
  640. advertiseHost, _, _ := net.SplitHostPort(c.advertiseAddr)
  641. return advertiseHost
  642. }
  643. return c.actualLocalAddr
  644. }
  645. // GetRemoteAddress returns a known advertise address of a remote manager if
  646. // available.
  647. // todo: change to array/connect with info
  648. func (c *Cluster) GetRemoteAddress() string {
  649. c.RLock()
  650. defer c.RUnlock()
  651. return c.getRemoteAddress()
  652. }
  653. func (c *Cluster) getRemoteAddress() string {
  654. if c.node == nil {
  655. return ""
  656. }
  657. nodeID := c.node.NodeID()
  658. for _, r := range c.node.Remotes() {
  659. if r.NodeID != nodeID {
  660. return r.Addr
  661. }
  662. }
  663. return ""
  664. }
  665. // ListenClusterEvents returns a channel that receives messages on cluster
  666. // participation changes.
  667. // todo: make cancelable and accessible to multiple callers
  668. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  669. return c.configEvent
  670. }
  671. // Info returns information about the current cluster state.
  672. func (c *Cluster) Info() types.Info {
  673. info := types.Info{
  674. NodeAddr: c.GetAdvertiseAddress(),
  675. }
  676. c.RLock()
  677. defer c.RUnlock()
  678. if c.node == nil {
  679. info.LocalNodeState = types.LocalNodeStateInactive
  680. if c.cancelDelay != nil {
  681. info.LocalNodeState = types.LocalNodeStateError
  682. }
  683. } else {
  684. info.LocalNodeState = types.LocalNodeStatePending
  685. if c.ready == true {
  686. info.LocalNodeState = types.LocalNodeStateActive
  687. }
  688. }
  689. if c.err != nil {
  690. info.Error = c.err.Error()
  691. }
  692. ctx, cancel := c.getRequestContext()
  693. defer cancel()
  694. if c.isActiveManager() {
  695. info.ControlAvailable = true
  696. swarm, err := c.Inspect()
  697. if err != nil {
  698. info.Error = err.Error()
  699. }
  700. // Strip JoinTokens
  701. info.Cluster = swarm.ClusterInfo
  702. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  703. info.Nodes = len(r.Nodes)
  704. for _, n := range r.Nodes {
  705. if n.ManagerStatus != nil {
  706. info.Managers = info.Managers + 1
  707. }
  708. }
  709. }
  710. }
  711. if c.node != nil {
  712. for _, r := range c.node.Remotes() {
  713. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  714. }
  715. info.NodeID = c.node.NodeID()
  716. }
  717. return info
  718. }
  719. // isActiveManager should not be called without a read lock
  720. func (c *Cluster) isActiveManager() bool {
  721. return c.node != nil && c.conn != nil
  722. }
  723. // errNoManager returns error describing why manager commands can't be used.
  724. // Call with read lock.
  725. func (c *Cluster) errNoManager() error {
  726. if c.node == nil {
  727. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  728. }
  729. if c.node.Manager() != nil {
  730. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  731. }
  732. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  733. }
  734. // GetServices returns all services of a managed swarm cluster.
  735. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  736. c.RLock()
  737. defer c.RUnlock()
  738. if !c.isActiveManager() {
  739. return nil, c.errNoManager()
  740. }
  741. filters, err := newListServicesFilters(options.Filter)
  742. if err != nil {
  743. return nil, err
  744. }
  745. ctx, cancel := c.getRequestContext()
  746. defer cancel()
  747. r, err := c.client.ListServices(
  748. ctx,
  749. &swarmapi.ListServicesRequest{Filters: filters})
  750. if err != nil {
  751. return nil, err
  752. }
  753. services := []types.Service{}
  754. for _, service := range r.Services {
  755. services = append(services, convert.ServiceFromGRPC(*service))
  756. }
  757. return services, nil
  758. }
  759. // CreateService creates a new service in a managed swarm cluster.
  760. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  761. c.RLock()
  762. defer c.RUnlock()
  763. if !c.isActiveManager() {
  764. return "", c.errNoManager()
  765. }
  766. ctx, cancel := c.getRequestContext()
  767. defer cancel()
  768. err := c.populateNetworkID(ctx, c.client, &s)
  769. if err != nil {
  770. return "", err
  771. }
  772. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  773. if err != nil {
  774. return "", err
  775. }
  776. if encodedAuth != "" {
  777. ctnr := serviceSpec.Task.GetContainer()
  778. if ctnr == nil {
  779. return "", fmt.Errorf("service does not use container tasks")
  780. }
  781. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  782. }
  783. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  784. if err != nil {
  785. return "", err
  786. }
  787. return r.Service.ID, nil
  788. }
  789. // GetService returns a service based on an ID or name.
  790. func (c *Cluster) GetService(input string) (types.Service, error) {
  791. c.RLock()
  792. defer c.RUnlock()
  793. if !c.isActiveManager() {
  794. return types.Service{}, c.errNoManager()
  795. }
  796. ctx, cancel := c.getRequestContext()
  797. defer cancel()
  798. service, err := getService(ctx, c.client, input)
  799. if err != nil {
  800. return types.Service{}, err
  801. }
  802. return convert.ServiceFromGRPC(*service), nil
  803. }
  804. // UpdateService updates existing service to match new properties.
  805. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  806. c.RLock()
  807. defer c.RUnlock()
  808. if !c.isActiveManager() {
  809. return c.errNoManager()
  810. }
  811. ctx, cancel := c.getRequestContext()
  812. defer cancel()
  813. err := c.populateNetworkID(ctx, c.client, &spec)
  814. if err != nil {
  815. return err
  816. }
  817. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  818. if err != nil {
  819. return err
  820. }
  821. currentService, err := getService(ctx, c.client, serviceIDOrName)
  822. if err != nil {
  823. return err
  824. }
  825. if encodedAuth != "" {
  826. ctnr := serviceSpec.Task.GetContainer()
  827. if ctnr == nil {
  828. return fmt.Errorf("service does not use container tasks")
  829. }
  830. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  831. } else {
  832. // this is needed because if the encodedAuth isn't being updated then we
  833. // shouldn't lose it, and continue to use the one that was already present
  834. ctnr := currentService.Spec.Task.GetContainer()
  835. if ctnr == nil {
  836. return fmt.Errorf("service does not use container tasks")
  837. }
  838. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  839. }
  840. _, err = c.client.UpdateService(
  841. ctx,
  842. &swarmapi.UpdateServiceRequest{
  843. ServiceID: currentService.ID,
  844. Spec: &serviceSpec,
  845. ServiceVersion: &swarmapi.Version{
  846. Index: version,
  847. },
  848. },
  849. )
  850. return err
  851. }
  852. // RemoveService removes a service from a managed swarm cluster.
  853. func (c *Cluster) RemoveService(input string) error {
  854. c.RLock()
  855. defer c.RUnlock()
  856. if !c.isActiveManager() {
  857. return c.errNoManager()
  858. }
  859. ctx, cancel := c.getRequestContext()
  860. defer cancel()
  861. service, err := getService(ctx, c.client, input)
  862. if err != nil {
  863. return err
  864. }
  865. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  866. return err
  867. }
  868. return nil
  869. }
  870. // GetNodes returns a list of all nodes known to a cluster.
  871. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  872. c.RLock()
  873. defer c.RUnlock()
  874. if !c.isActiveManager() {
  875. return nil, c.errNoManager()
  876. }
  877. filters, err := newListNodesFilters(options.Filter)
  878. if err != nil {
  879. return nil, err
  880. }
  881. ctx, cancel := c.getRequestContext()
  882. defer cancel()
  883. r, err := c.client.ListNodes(
  884. ctx,
  885. &swarmapi.ListNodesRequest{Filters: filters})
  886. if err != nil {
  887. return nil, err
  888. }
  889. nodes := []types.Node{}
  890. for _, node := range r.Nodes {
  891. nodes = append(nodes, convert.NodeFromGRPC(*node))
  892. }
  893. return nodes, nil
  894. }
  895. // GetNode returns a node based on an ID or name.
  896. func (c *Cluster) GetNode(input string) (types.Node, error) {
  897. c.RLock()
  898. defer c.RUnlock()
  899. if !c.isActiveManager() {
  900. return types.Node{}, c.errNoManager()
  901. }
  902. ctx, cancel := c.getRequestContext()
  903. defer cancel()
  904. node, err := getNode(ctx, c.client, input)
  905. if err != nil {
  906. return types.Node{}, err
  907. }
  908. return convert.NodeFromGRPC(*node), nil
  909. }
  910. // UpdateNode updates existing nodes properties.
  911. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  912. c.RLock()
  913. defer c.RUnlock()
  914. if !c.isActiveManager() {
  915. return c.errNoManager()
  916. }
  917. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  918. if err != nil {
  919. return err
  920. }
  921. ctx, cancel := c.getRequestContext()
  922. defer cancel()
  923. _, err = c.client.UpdateNode(
  924. ctx,
  925. &swarmapi.UpdateNodeRequest{
  926. NodeID: nodeID,
  927. Spec: &nodeSpec,
  928. NodeVersion: &swarmapi.Version{
  929. Index: version,
  930. },
  931. },
  932. )
  933. return err
  934. }
  935. // RemoveNode removes a node from a cluster
  936. func (c *Cluster) RemoveNode(input string, force bool) error {
  937. c.RLock()
  938. defer c.RUnlock()
  939. if !c.isActiveManager() {
  940. return c.errNoManager()
  941. }
  942. ctx, cancel := c.getRequestContext()
  943. defer cancel()
  944. node, err := getNode(ctx, c.client, input)
  945. if err != nil {
  946. return err
  947. }
  948. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  949. return err
  950. }
  951. return nil
  952. }
  953. // GetTasks returns a list of tasks matching the filter options.
  954. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  955. c.RLock()
  956. defer c.RUnlock()
  957. if !c.isActiveManager() {
  958. return nil, c.errNoManager()
  959. }
  960. byName := func(filter filters.Args) error {
  961. if filter.Include("service") {
  962. serviceFilters := filter.Get("service")
  963. for _, serviceFilter := range serviceFilters {
  964. service, err := c.GetService(serviceFilter)
  965. if err != nil {
  966. return err
  967. }
  968. filter.Del("service", serviceFilter)
  969. filter.Add("service", service.ID)
  970. }
  971. }
  972. if filter.Include("node") {
  973. nodeFilters := filter.Get("node")
  974. for _, nodeFilter := range nodeFilters {
  975. node, err := c.GetNode(nodeFilter)
  976. if err != nil {
  977. return err
  978. }
  979. filter.Del("node", nodeFilter)
  980. filter.Add("node", node.ID)
  981. }
  982. }
  983. return nil
  984. }
  985. filters, err := newListTasksFilters(options.Filter, byName)
  986. if err != nil {
  987. return nil, err
  988. }
  989. ctx, cancel := c.getRequestContext()
  990. defer cancel()
  991. r, err := c.client.ListTasks(
  992. ctx,
  993. &swarmapi.ListTasksRequest{Filters: filters})
  994. if err != nil {
  995. return nil, err
  996. }
  997. tasks := []types.Task{}
  998. for _, task := range r.Tasks {
  999. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1000. }
  1001. return tasks, nil
  1002. }
  1003. // GetTask returns a task by an ID.
  1004. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1005. c.RLock()
  1006. defer c.RUnlock()
  1007. if !c.isActiveManager() {
  1008. return types.Task{}, c.errNoManager()
  1009. }
  1010. ctx, cancel := c.getRequestContext()
  1011. defer cancel()
  1012. task, err := getTask(ctx, c.client, input)
  1013. if err != nil {
  1014. return types.Task{}, err
  1015. }
  1016. return convert.TaskFromGRPC(*task), nil
  1017. }
  1018. // GetNetwork returns a cluster network by an ID.
  1019. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1020. c.RLock()
  1021. defer c.RUnlock()
  1022. if !c.isActiveManager() {
  1023. return apitypes.NetworkResource{}, c.errNoManager()
  1024. }
  1025. ctx, cancel := c.getRequestContext()
  1026. defer cancel()
  1027. network, err := getNetwork(ctx, c.client, input)
  1028. if err != nil {
  1029. return apitypes.NetworkResource{}, err
  1030. }
  1031. return convert.BasicNetworkFromGRPC(*network), nil
  1032. }
  1033. // GetNetworks returns all current cluster managed networks.
  1034. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1035. c.RLock()
  1036. defer c.RUnlock()
  1037. if !c.isActiveManager() {
  1038. return nil, c.errNoManager()
  1039. }
  1040. ctx, cancel := c.getRequestContext()
  1041. defer cancel()
  1042. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1043. if err != nil {
  1044. return nil, err
  1045. }
  1046. var networks []apitypes.NetworkResource
  1047. for _, network := range r.Networks {
  1048. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1049. }
  1050. return networks, nil
  1051. }
  1052. // CreateNetwork creates a new cluster managed network.
  1053. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1054. c.RLock()
  1055. defer c.RUnlock()
  1056. if !c.isActiveManager() {
  1057. return "", c.errNoManager()
  1058. }
  1059. if runconfig.IsPreDefinedNetwork(s.Name) {
  1060. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1061. return "", errors.NewRequestForbiddenError(err)
  1062. }
  1063. ctx, cancel := c.getRequestContext()
  1064. defer cancel()
  1065. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1066. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1067. if err != nil {
  1068. return "", err
  1069. }
  1070. return r.Network.ID, nil
  1071. }
  1072. // RemoveNetwork removes a cluster network.
  1073. func (c *Cluster) RemoveNetwork(input string) error {
  1074. c.RLock()
  1075. defer c.RUnlock()
  1076. if !c.isActiveManager() {
  1077. return c.errNoManager()
  1078. }
  1079. ctx, cancel := c.getRequestContext()
  1080. defer cancel()
  1081. network, err := getNetwork(ctx, c.client, input)
  1082. if err != nil {
  1083. return err
  1084. }
  1085. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1086. return err
  1087. }
  1088. return nil
  1089. }
  1090. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1091. for i, n := range s.Networks {
  1092. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1093. if err != nil {
  1094. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1095. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1096. return errors.NewRequestForbiddenError(err)
  1097. }
  1098. return err
  1099. }
  1100. s.Networks[i].Target = apiNetwork.ID
  1101. }
  1102. return nil
  1103. }
  1104. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1105. // GetNetwork to match via full ID.
  1106. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1107. if err != nil {
  1108. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1109. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1110. if err != nil || len(rl.Networks) == 0 {
  1111. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1112. }
  1113. if err != nil {
  1114. return nil, err
  1115. }
  1116. if len(rl.Networks) == 0 {
  1117. return nil, fmt.Errorf("network %s not found", input)
  1118. }
  1119. if l := len(rl.Networks); l > 1 {
  1120. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1121. }
  1122. return rl.Networks[0], nil
  1123. }
  1124. return rg.Network, nil
  1125. }
  1126. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1127. func (c *Cluster) Cleanup() {
  1128. c.Lock()
  1129. node := c.node
  1130. if node == nil {
  1131. c.Unlock()
  1132. return
  1133. }
  1134. defer c.Unlock()
  1135. if c.isActiveManager() {
  1136. active, reachable, unreachable, err := c.managerStats()
  1137. if err == nil {
  1138. singlenode := active && isLastManager(reachable, unreachable)
  1139. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  1140. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1141. }
  1142. }
  1143. }
  1144. c.stopNode()
  1145. }
  1146. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1147. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1148. defer cancel()
  1149. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1150. if err != nil {
  1151. return false, 0, 0, err
  1152. }
  1153. for _, n := range nodes.Nodes {
  1154. if n.ManagerStatus != nil {
  1155. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1156. reachable++
  1157. if n.ID == c.node.NodeID() {
  1158. current = true
  1159. }
  1160. }
  1161. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1162. unreachable++
  1163. }
  1164. }
  1165. }
  1166. return
  1167. }
  1168. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1169. var err error
  1170. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1171. if err != nil {
  1172. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1173. }
  1174. spec := &req.Spec
  1175. // provide sane defaults instead of erroring
  1176. if spec.Name == "" {
  1177. spec.Name = "default"
  1178. }
  1179. if spec.Raft.SnapshotInterval == 0 {
  1180. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1181. }
  1182. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1183. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1184. }
  1185. if spec.Raft.ElectionTick == 0 {
  1186. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1187. }
  1188. if spec.Raft.HeartbeatTick == 0 {
  1189. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1190. }
  1191. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1192. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1193. }
  1194. if spec.CAConfig.NodeCertExpiry == 0 {
  1195. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1196. }
  1197. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1198. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1199. }
  1200. return nil
  1201. }
  1202. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1203. var err error
  1204. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1205. if err != nil {
  1206. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1207. }
  1208. if len(req.RemoteAddrs) == 0 {
  1209. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1210. }
  1211. for i := range req.RemoteAddrs {
  1212. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1213. if err != nil {
  1214. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1215. }
  1216. }
  1217. return nil
  1218. }
  1219. func validateAddr(addr string) (string, error) {
  1220. if addr == "" {
  1221. return addr, fmt.Errorf("invalid empty address")
  1222. }
  1223. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1224. if err != nil {
  1225. return addr, nil
  1226. }
  1227. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1228. }
  1229. func initClusterSpec(node *node, spec types.Spec) error {
  1230. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1231. for conn := range node.ListenControlSocket(ctx) {
  1232. if ctx.Err() != nil {
  1233. return ctx.Err()
  1234. }
  1235. if conn != nil {
  1236. client := swarmapi.NewControlClient(conn)
  1237. var cluster *swarmapi.Cluster
  1238. for i := 0; ; i++ {
  1239. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1240. if err != nil {
  1241. return fmt.Errorf("error on listing clusters: %v", err)
  1242. }
  1243. if len(lcr.Clusters) == 0 {
  1244. if i < 10 {
  1245. time.Sleep(200 * time.Millisecond)
  1246. continue
  1247. }
  1248. return fmt.Errorf("empty list of clusters was returned")
  1249. }
  1250. cluster = lcr.Clusters[0]
  1251. break
  1252. }
  1253. newspec, err := convert.SwarmSpecToGRPC(spec)
  1254. if err != nil {
  1255. return fmt.Errorf("error updating cluster settings: %v", err)
  1256. }
  1257. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1258. ClusterID: cluster.ID,
  1259. ClusterVersion: &cluster.Meta.Version,
  1260. Spec: &newspec,
  1261. })
  1262. if err != nil {
  1263. return fmt.Errorf("error updating cluster settings: %v", err)
  1264. }
  1265. return nil
  1266. }
  1267. }
  1268. return ctx.Err()
  1269. }