cluster.go 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "google.golang.org/grpc"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/docker/docker/api/errors"
  15. apitypes "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/api/types/filters"
  17. "github.com/docker/docker/api/types/network"
  18. types "github.com/docker/docker/api/types/swarm"
  19. "github.com/docker/docker/daemon/cluster/convert"
  20. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  21. "github.com/docker/docker/daemon/cluster/executor/container"
  22. "github.com/docker/docker/opts"
  23. "github.com/docker/docker/pkg/ioutils"
  24. "github.com/docker/docker/pkg/signal"
  25. "github.com/docker/docker/runconfig"
  26. swarmagent "github.com/docker/swarmkit/agent"
  27. swarmapi "github.com/docker/swarmkit/api"
  28. "golang.org/x/net/context"
  29. )
  30. const swarmDirName = "swarm"
  31. const controlSocket = "control.sock"
  32. const swarmConnectTimeout = 20 * time.Second
  33. const swarmRequestTimeout = 20 * time.Second
  34. const stateFile = "docker-state.json"
  35. const defaultAddr = "0.0.0.0:2377"
  36. const (
  37. initialReconnectDelay = 100 * time.Millisecond
  38. maxReconnectDelay = 30 * time.Second
  39. )
  40. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  41. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  42. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  43. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  44. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  45. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  46. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  47. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  48. // defaultSpec contains some sane defaults if cluster options are missing on init
  49. var defaultSpec = types.Spec{
  50. Raft: types.RaftConfig{
  51. SnapshotInterval: 10000,
  52. KeepOldSnapshots: 0,
  53. LogEntriesForSlowFollowers: 500,
  54. HeartbeatTick: 1,
  55. ElectionTick: 3,
  56. },
  57. CAConfig: types.CAConfig{
  58. NodeCertExpiry: 90 * 24 * time.Hour,
  59. },
  60. Dispatcher: types.DispatcherConfig{
  61. HeartbeatPeriod: 5 * time.Second,
  62. },
  63. Orchestration: types.OrchestrationConfig{
  64. TaskHistoryRetentionLimit: 10,
  65. },
  66. }
  67. type state struct {
  68. // LocalAddr is this machine's local IP or hostname, if specified.
  69. LocalAddr string
  70. // RemoteAddr is the address that was given to "swarm join. It is used
  71. // to find LocalAddr if necessary.
  72. RemoteAddr string
  73. // ListenAddr is the address we bind to, including a port.
  74. ListenAddr string
  75. // AdvertiseAddr is the address other nodes should connect to,
  76. // including a port.
  77. AdvertiseAddr string
  78. }
  79. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  80. // of networks managed by Docker, so they can be filtered.
  81. type NetworkSubnetsProvider interface {
  82. V4Subnets() []net.IPNet
  83. V6Subnets() []net.IPNet
  84. }
  85. // Config provides values for Cluster.
  86. type Config struct {
  87. Root string
  88. Name string
  89. Backend executorpkg.Backend
  90. NetworkSubnetsProvider NetworkSubnetsProvider
  91. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  92. // if no AdvertiseAddr value is specified.
  93. DefaultAdvertiseAddr string
  94. // path to store runtime state, such as the swarm control socket
  95. RuntimeRoot string
  96. }
  97. // Cluster provides capabilities to participate in a cluster as a worker or a
  98. // manager.
  99. type Cluster struct {
  100. sync.RWMutex
  101. *node
  102. root string
  103. runtimeRoot string
  104. config Config
  105. configEvent chan struct{} // todo: make this array and goroutine safe
  106. localAddr string
  107. actualLocalAddr string // after resolution, not persisted
  108. remoteAddr string
  109. listenAddr string
  110. advertiseAddr string
  111. stop bool
  112. err error
  113. cancelDelay func()
  114. attachers map[string]*attacher
  115. }
  116. // attacher manages the in-memory attachment state of a container
  117. // attachment to a global scope network managed by swarm manager. It
  118. // helps in identifying the attachment ID via the taskID and the
  119. // corresponding attachment configuration obtained from the manager.
  120. type attacher struct {
  121. taskID string
  122. config *network.NetworkingConfig
  123. attachWaitCh chan *network.NetworkingConfig
  124. attachCompleteCh chan struct{}
  125. detachWaitCh chan struct{}
  126. }
  127. type node struct {
  128. *swarmagent.Node
  129. done chan struct{}
  130. ready bool
  131. conn *grpc.ClientConn
  132. client swarmapi.ControlClient
  133. reconnectDelay time.Duration
  134. }
  135. // New creates a new Cluster instance using provided config.
  136. func New(config Config) (*Cluster, error) {
  137. root := filepath.Join(config.Root, swarmDirName)
  138. if err := os.MkdirAll(root, 0700); err != nil {
  139. return nil, err
  140. }
  141. if config.RuntimeRoot == "" {
  142. config.RuntimeRoot = root
  143. }
  144. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  145. return nil, err
  146. }
  147. c := &Cluster{
  148. root: root,
  149. config: config,
  150. configEvent: make(chan struct{}, 10),
  151. runtimeRoot: config.RuntimeRoot,
  152. attachers: make(map[string]*attacher),
  153. }
  154. st, err := c.loadState()
  155. if err != nil {
  156. if os.IsNotExist(err) {
  157. return c, nil
  158. }
  159. return nil, err
  160. }
  161. n, err := c.startNewNode(false, st.LocalAddr, st.RemoteAddr, st.ListenAddr, st.AdvertiseAddr, "", "")
  162. if err != nil {
  163. return nil, err
  164. }
  165. select {
  166. case <-time.After(swarmConnectTimeout):
  167. logrus.Errorf("swarm component could not be started before timeout was reached")
  168. case <-n.Ready():
  169. case <-n.done:
  170. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  171. }
  172. go c.reconnectOnFailure(n)
  173. return c, nil
  174. }
  175. func (c *Cluster) loadState() (*state, error) {
  176. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  177. if err != nil {
  178. return nil, err
  179. }
  180. // missing certificate means no actual state to restore from
  181. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  182. if os.IsNotExist(err) {
  183. c.clearState()
  184. }
  185. return nil, err
  186. }
  187. var st state
  188. if err := json.Unmarshal(dt, &st); err != nil {
  189. return nil, err
  190. }
  191. return &st, nil
  192. }
  193. func (c *Cluster) saveState() error {
  194. dt, err := json.Marshal(state{
  195. LocalAddr: c.localAddr,
  196. RemoteAddr: c.remoteAddr,
  197. ListenAddr: c.listenAddr,
  198. AdvertiseAddr: c.advertiseAddr,
  199. })
  200. if err != nil {
  201. return err
  202. }
  203. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  204. }
  205. func (c *Cluster) reconnectOnFailure(n *node) {
  206. for {
  207. <-n.done
  208. c.Lock()
  209. if c.stop || c.node != nil {
  210. c.Unlock()
  211. return
  212. }
  213. n.reconnectDelay *= 2
  214. if n.reconnectDelay > maxReconnectDelay {
  215. n.reconnectDelay = maxReconnectDelay
  216. }
  217. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  218. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  219. c.cancelDelay = cancel
  220. c.Unlock()
  221. <-delayCtx.Done()
  222. if delayCtx.Err() != context.DeadlineExceeded {
  223. return
  224. }
  225. c.Lock()
  226. if c.node != nil {
  227. c.Unlock()
  228. return
  229. }
  230. var err error
  231. n, err = c.startNewNode(false, c.localAddr, c.getRemoteAddress(), c.listenAddr, c.advertiseAddr, c.getRemoteAddress(), "")
  232. if err != nil {
  233. c.err = err
  234. close(n.done)
  235. }
  236. c.Unlock()
  237. }
  238. }
  239. func (c *Cluster) startNewNode(forceNewCluster bool, localAddr, remoteAddr, listenAddr, advertiseAddr, joinAddr, joinToken string) (*node, error) {
  240. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  241. return nil, err
  242. }
  243. actualLocalAddr := localAddr
  244. if actualLocalAddr == "" {
  245. // If localAddr was not specified, resolve it automatically
  246. // based on the route to joinAddr. localAddr can only be left
  247. // empty on "join".
  248. listenHost, _, err := net.SplitHostPort(listenAddr)
  249. if err != nil {
  250. return nil, fmt.Errorf("could not parse listen address: %v", err)
  251. }
  252. listenAddrIP := net.ParseIP(listenHost)
  253. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  254. actualLocalAddr = listenHost
  255. } else {
  256. if remoteAddr == "" {
  257. // Should never happen except using swarms created by
  258. // old versions that didn't save remoteAddr.
  259. remoteAddr = "8.8.8.8:53"
  260. }
  261. conn, err := net.Dial("udp", remoteAddr)
  262. if err != nil {
  263. return nil, fmt.Errorf("could not find local IP address: %v", err)
  264. }
  265. localHostPort := conn.LocalAddr().String()
  266. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  267. conn.Close()
  268. }
  269. }
  270. c.node = nil
  271. c.cancelDelay = nil
  272. c.stop = false
  273. n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
  274. Hostname: c.config.Name,
  275. ForceNewCluster: forceNewCluster,
  276. ListenControlAPI: filepath.Join(c.runtimeRoot, controlSocket),
  277. ListenRemoteAPI: listenAddr,
  278. AdvertiseRemoteAPI: advertiseAddr,
  279. JoinAddr: joinAddr,
  280. StateDir: c.root,
  281. JoinToken: joinToken,
  282. Executor: container.NewExecutor(c.config.Backend),
  283. HeartbeatTick: 1,
  284. ElectionTick: 3,
  285. })
  286. if err != nil {
  287. return nil, err
  288. }
  289. ctx := context.Background()
  290. if err := n.Start(ctx); err != nil {
  291. return nil, err
  292. }
  293. node := &node{
  294. Node: n,
  295. done: make(chan struct{}),
  296. reconnectDelay: initialReconnectDelay,
  297. }
  298. c.node = node
  299. c.localAddr = localAddr
  300. c.actualLocalAddr = actualLocalAddr // not saved
  301. c.remoteAddr = remoteAddr
  302. c.listenAddr = listenAddr
  303. c.advertiseAddr = advertiseAddr
  304. c.saveState()
  305. c.config.Backend.SetClusterProvider(c)
  306. go func() {
  307. err := n.Err(ctx)
  308. if err != nil {
  309. logrus.Errorf("cluster exited with error: %v", err)
  310. }
  311. c.Lock()
  312. c.node = nil
  313. c.err = err
  314. c.Unlock()
  315. close(node.done)
  316. }()
  317. go func() {
  318. select {
  319. case <-n.Ready():
  320. c.Lock()
  321. node.ready = true
  322. c.err = nil
  323. c.Unlock()
  324. case <-ctx.Done():
  325. }
  326. c.configEvent <- struct{}{}
  327. }()
  328. go func() {
  329. for conn := range n.ListenControlSocket(ctx) {
  330. c.Lock()
  331. if node.conn != conn {
  332. if conn == nil {
  333. node.client = nil
  334. } else {
  335. node.client = swarmapi.NewControlClient(conn)
  336. }
  337. }
  338. node.conn = conn
  339. c.Unlock()
  340. c.configEvent <- struct{}{}
  341. }
  342. }()
  343. return node, nil
  344. }
  345. // Init initializes new cluster from user provided request.
  346. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  347. c.Lock()
  348. if node := c.node; node != nil {
  349. if !req.ForceNewCluster {
  350. c.Unlock()
  351. return "", ErrSwarmExists
  352. }
  353. if err := c.stopNode(); err != nil {
  354. c.Unlock()
  355. return "", err
  356. }
  357. }
  358. if err := validateAndSanitizeInitRequest(&req); err != nil {
  359. c.Unlock()
  360. return "", err
  361. }
  362. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  363. if err != nil {
  364. c.Unlock()
  365. return "", err
  366. }
  367. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  368. if err != nil {
  369. c.Unlock()
  370. return "", err
  371. }
  372. localAddr := listenHost
  373. // If the advertise address is not one of the system's
  374. // addresses, we also require a listen address.
  375. listenAddrIP := net.ParseIP(listenHost)
  376. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  377. advertiseIP := net.ParseIP(advertiseHost)
  378. if advertiseIP == nil {
  379. // not an IP
  380. c.Unlock()
  381. return "", errMustSpecifyListenAddr
  382. }
  383. systemIPs := listSystemIPs()
  384. found := false
  385. for _, systemIP := range systemIPs {
  386. if systemIP.Equal(advertiseIP) {
  387. found = true
  388. break
  389. }
  390. }
  391. if !found {
  392. c.Unlock()
  393. return "", errMustSpecifyListenAddr
  394. }
  395. localAddr = advertiseIP.String()
  396. }
  397. // todo: check current state existing
  398. n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
  399. if err != nil {
  400. c.Unlock()
  401. return "", err
  402. }
  403. c.Unlock()
  404. select {
  405. case <-n.Ready():
  406. if err := initClusterSpec(n, req.Spec); err != nil {
  407. return "", err
  408. }
  409. go c.reconnectOnFailure(n)
  410. return n.NodeID(), nil
  411. case <-n.done:
  412. c.RLock()
  413. defer c.RUnlock()
  414. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  415. if err := c.clearState(); err != nil {
  416. return "", err
  417. }
  418. }
  419. return "", c.err
  420. }
  421. }
  422. // Join makes current Cluster part of an existing swarm cluster.
  423. func (c *Cluster) Join(req types.JoinRequest) error {
  424. c.Lock()
  425. if node := c.node; node != nil {
  426. c.Unlock()
  427. return ErrSwarmExists
  428. }
  429. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  430. c.Unlock()
  431. return err
  432. }
  433. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  434. if err != nil {
  435. c.Unlock()
  436. return err
  437. }
  438. var advertiseAddr string
  439. if req.AdvertiseAddr != "" {
  440. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  441. // For joining, we don't need to provide an advertise address,
  442. // since the remote side can detect it.
  443. if err == nil {
  444. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  445. }
  446. }
  447. // todo: check current state existing
  448. n, err := c.startNewNode(false, "", req.RemoteAddrs[0], net.JoinHostPort(listenHost, listenPort), advertiseAddr, req.RemoteAddrs[0], req.JoinToken)
  449. if err != nil {
  450. c.Unlock()
  451. return err
  452. }
  453. c.Unlock()
  454. select {
  455. case <-time.After(swarmConnectTimeout):
  456. // attempt to connect will continue in background, also reconnecting
  457. go c.reconnectOnFailure(n)
  458. return ErrSwarmJoinTimeoutReached
  459. case <-n.Ready():
  460. go c.reconnectOnFailure(n)
  461. return nil
  462. case <-n.done:
  463. c.RLock()
  464. defer c.RUnlock()
  465. return c.err
  466. }
  467. }
  468. // stopNode is a helper that stops the active c.node and waits until it has
  469. // shut down. Call while keeping the cluster lock.
  470. func (c *Cluster) stopNode() error {
  471. if c.node == nil {
  472. return nil
  473. }
  474. c.stop = true
  475. if c.cancelDelay != nil {
  476. c.cancelDelay()
  477. c.cancelDelay = nil
  478. }
  479. node := c.node
  480. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  481. defer cancel()
  482. // TODO: can't hold lock on stop because it calls back to network
  483. c.Unlock()
  484. defer c.Lock()
  485. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  486. return err
  487. }
  488. <-node.done
  489. return nil
  490. }
  491. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  492. return reachable-2 <= unreachable
  493. }
  494. func isLastManager(reachable, unreachable int) bool {
  495. return reachable == 1 && unreachable == 0
  496. }
  497. // Leave shuts down Cluster and removes current state.
  498. func (c *Cluster) Leave(force bool) error {
  499. c.Lock()
  500. node := c.node
  501. if node == nil {
  502. c.Unlock()
  503. return ErrNoSwarm
  504. }
  505. if node.Manager() != nil && !force {
  506. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  507. if c.isActiveManager() {
  508. active, reachable, unreachable, err := c.managerStats()
  509. if err == nil {
  510. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  511. if isLastManager(reachable, unreachable) {
  512. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  513. c.Unlock()
  514. return fmt.Errorf(msg)
  515. }
  516. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  517. }
  518. }
  519. } else {
  520. msg += "Doing so may lose the consensus of your cluster. "
  521. }
  522. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  523. c.Unlock()
  524. return fmt.Errorf(msg)
  525. }
  526. if err := c.stopNode(); err != nil {
  527. logrus.Errorf("failed to shut down cluster node: %v", err)
  528. signal.DumpStacks("")
  529. c.Unlock()
  530. return err
  531. }
  532. c.Unlock()
  533. if nodeID := node.NodeID(); nodeID != "" {
  534. nodeContainers, err := c.listContainerForNode(nodeID)
  535. if err != nil {
  536. return err
  537. }
  538. for _, id := range nodeContainers {
  539. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  540. logrus.Errorf("error removing %v: %v", id, err)
  541. }
  542. }
  543. }
  544. c.configEvent <- struct{}{}
  545. // todo: cleanup optional?
  546. if err := c.clearState(); err != nil {
  547. return err
  548. }
  549. return nil
  550. }
  551. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  552. var ids []string
  553. filters := filters.NewArgs()
  554. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  555. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  556. Filter: filters,
  557. })
  558. if err != nil {
  559. return []string{}, err
  560. }
  561. for _, c := range containers {
  562. ids = append(ids, c.ID)
  563. }
  564. return ids, nil
  565. }
  566. func (c *Cluster) clearState() error {
  567. // todo: backup this data instead of removing?
  568. if err := os.RemoveAll(c.root); err != nil {
  569. return err
  570. }
  571. if err := os.MkdirAll(c.root, 0700); err != nil {
  572. return err
  573. }
  574. c.config.Backend.SetClusterProvider(nil)
  575. return nil
  576. }
  577. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  578. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  579. }
  580. // Inspect retrieves the configuration properties of a managed swarm cluster.
  581. func (c *Cluster) Inspect() (types.Swarm, error) {
  582. c.RLock()
  583. defer c.RUnlock()
  584. if !c.isActiveManager() {
  585. return types.Swarm{}, c.errNoManager()
  586. }
  587. ctx, cancel := c.getRequestContext()
  588. defer cancel()
  589. swarm, err := getSwarm(ctx, c.client)
  590. if err != nil {
  591. return types.Swarm{}, err
  592. }
  593. if err != nil {
  594. return types.Swarm{}, err
  595. }
  596. return convert.SwarmFromGRPC(*swarm), nil
  597. }
  598. // Update updates configuration of a managed swarm cluster.
  599. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  600. c.RLock()
  601. defer c.RUnlock()
  602. if !c.isActiveManager() {
  603. return c.errNoManager()
  604. }
  605. ctx, cancel := c.getRequestContext()
  606. defer cancel()
  607. swarm, err := getSwarm(ctx, c.client)
  608. if err != nil {
  609. return err
  610. }
  611. swarmSpec, err := convert.SwarmSpecToGRPC(spec)
  612. if err != nil {
  613. return err
  614. }
  615. _, err = c.client.UpdateCluster(
  616. ctx,
  617. &swarmapi.UpdateClusterRequest{
  618. ClusterID: swarm.ID,
  619. Spec: &swarmSpec,
  620. ClusterVersion: &swarmapi.Version{
  621. Index: version,
  622. },
  623. Rotation: swarmapi.JoinTokenRotation{
  624. RotateWorkerToken: flags.RotateWorkerToken,
  625. RotateManagerToken: flags.RotateManagerToken,
  626. },
  627. },
  628. )
  629. return err
  630. }
  631. // IsManager returns true if Cluster is participating as a manager.
  632. func (c *Cluster) IsManager() bool {
  633. c.RLock()
  634. defer c.RUnlock()
  635. return c.isActiveManager()
  636. }
  637. // IsAgent returns true if Cluster is participating as a worker/agent.
  638. func (c *Cluster) IsAgent() bool {
  639. c.RLock()
  640. defer c.RUnlock()
  641. return c.node != nil && c.ready
  642. }
  643. // GetLocalAddress returns the local address.
  644. func (c *Cluster) GetLocalAddress() string {
  645. c.RLock()
  646. defer c.RUnlock()
  647. return c.actualLocalAddr
  648. }
  649. // GetListenAddress returns the listen address.
  650. func (c *Cluster) GetListenAddress() string {
  651. c.RLock()
  652. defer c.RUnlock()
  653. return c.listenAddr
  654. }
  655. // GetAdvertiseAddress returns the remotely reachable address of this node.
  656. func (c *Cluster) GetAdvertiseAddress() string {
  657. c.RLock()
  658. defer c.RUnlock()
  659. if c.advertiseAddr != "" {
  660. advertiseHost, _, _ := net.SplitHostPort(c.advertiseAddr)
  661. return advertiseHost
  662. }
  663. return c.actualLocalAddr
  664. }
  665. // GetRemoteAddress returns a known advertise address of a remote manager if
  666. // available.
  667. // todo: change to array/connect with info
  668. func (c *Cluster) GetRemoteAddress() string {
  669. c.RLock()
  670. defer c.RUnlock()
  671. return c.getRemoteAddress()
  672. }
  673. func (c *Cluster) getRemoteAddress() string {
  674. if c.node == nil {
  675. return ""
  676. }
  677. nodeID := c.node.NodeID()
  678. for _, r := range c.node.Remotes() {
  679. if r.NodeID != nodeID {
  680. return r.Addr
  681. }
  682. }
  683. return ""
  684. }
  685. // ListenClusterEvents returns a channel that receives messages on cluster
  686. // participation changes.
  687. // todo: make cancelable and accessible to multiple callers
  688. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  689. return c.configEvent
  690. }
  691. // Info returns information about the current cluster state.
  692. func (c *Cluster) Info() types.Info {
  693. info := types.Info{
  694. NodeAddr: c.GetAdvertiseAddress(),
  695. }
  696. c.RLock()
  697. defer c.RUnlock()
  698. if c.node == nil {
  699. info.LocalNodeState = types.LocalNodeStateInactive
  700. if c.cancelDelay != nil {
  701. info.LocalNodeState = types.LocalNodeStateError
  702. }
  703. } else {
  704. info.LocalNodeState = types.LocalNodeStatePending
  705. if c.ready == true {
  706. info.LocalNodeState = types.LocalNodeStateActive
  707. }
  708. }
  709. if c.err != nil {
  710. info.Error = c.err.Error()
  711. }
  712. ctx, cancel := c.getRequestContext()
  713. defer cancel()
  714. if c.isActiveManager() {
  715. info.ControlAvailable = true
  716. swarm, err := c.Inspect()
  717. if err != nil {
  718. info.Error = err.Error()
  719. }
  720. // Strip JoinTokens
  721. info.Cluster = swarm.ClusterInfo
  722. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  723. info.Nodes = len(r.Nodes)
  724. for _, n := range r.Nodes {
  725. if n.ManagerStatus != nil {
  726. info.Managers = info.Managers + 1
  727. }
  728. }
  729. }
  730. }
  731. if c.node != nil {
  732. for _, r := range c.node.Remotes() {
  733. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  734. }
  735. info.NodeID = c.node.NodeID()
  736. }
  737. return info
  738. }
  739. // isActiveManager should not be called without a read lock
  740. func (c *Cluster) isActiveManager() bool {
  741. return c.node != nil && c.conn != nil
  742. }
  743. // errNoManager returns error describing why manager commands can't be used.
  744. // Call with read lock.
  745. func (c *Cluster) errNoManager() error {
  746. if c.node == nil {
  747. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  748. }
  749. if c.node.Manager() != nil {
  750. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  751. }
  752. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  753. }
  754. // GetServices returns all services of a managed swarm cluster.
  755. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  756. c.RLock()
  757. defer c.RUnlock()
  758. if !c.isActiveManager() {
  759. return nil, c.errNoManager()
  760. }
  761. filters, err := newListServicesFilters(options.Filter)
  762. if err != nil {
  763. return nil, err
  764. }
  765. ctx, cancel := c.getRequestContext()
  766. defer cancel()
  767. r, err := c.client.ListServices(
  768. ctx,
  769. &swarmapi.ListServicesRequest{Filters: filters})
  770. if err != nil {
  771. return nil, err
  772. }
  773. services := []types.Service{}
  774. for _, service := range r.Services {
  775. services = append(services, convert.ServiceFromGRPC(*service))
  776. }
  777. return services, nil
  778. }
  779. // CreateService creates a new service in a managed swarm cluster.
  780. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  781. c.RLock()
  782. defer c.RUnlock()
  783. if !c.isActiveManager() {
  784. return "", c.errNoManager()
  785. }
  786. ctx, cancel := c.getRequestContext()
  787. defer cancel()
  788. err := c.populateNetworkID(ctx, c.client, &s)
  789. if err != nil {
  790. return "", err
  791. }
  792. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  793. if err != nil {
  794. return "", err
  795. }
  796. if encodedAuth != "" {
  797. ctnr := serviceSpec.Task.GetContainer()
  798. if ctnr == nil {
  799. return "", fmt.Errorf("service does not use container tasks")
  800. }
  801. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  802. }
  803. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  804. if err != nil {
  805. return "", err
  806. }
  807. return r.Service.ID, nil
  808. }
  809. // GetService returns a service based on an ID or name.
  810. func (c *Cluster) GetService(input string) (types.Service, error) {
  811. c.RLock()
  812. defer c.RUnlock()
  813. if !c.isActiveManager() {
  814. return types.Service{}, c.errNoManager()
  815. }
  816. ctx, cancel := c.getRequestContext()
  817. defer cancel()
  818. service, err := getService(ctx, c.client, input)
  819. if err != nil {
  820. return types.Service{}, err
  821. }
  822. return convert.ServiceFromGRPC(*service), nil
  823. }
  824. // UpdateService updates existing service to match new properties.
  825. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
  826. c.RLock()
  827. defer c.RUnlock()
  828. if !c.isActiveManager() {
  829. return c.errNoManager()
  830. }
  831. ctx, cancel := c.getRequestContext()
  832. defer cancel()
  833. err := c.populateNetworkID(ctx, c.client, &spec)
  834. if err != nil {
  835. return err
  836. }
  837. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  838. if err != nil {
  839. return err
  840. }
  841. currentService, err := getService(ctx, c.client, serviceIDOrName)
  842. if err != nil {
  843. return err
  844. }
  845. if encodedAuth != "" {
  846. ctnr := serviceSpec.Task.GetContainer()
  847. if ctnr == nil {
  848. return fmt.Errorf("service does not use container tasks")
  849. }
  850. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  851. } else {
  852. // this is needed because if the encodedAuth isn't being updated then we
  853. // shouldn't lose it, and continue to use the one that was already present
  854. ctnr := currentService.Spec.Task.GetContainer()
  855. if ctnr == nil {
  856. return fmt.Errorf("service does not use container tasks")
  857. }
  858. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  859. }
  860. _, err = c.client.UpdateService(
  861. ctx,
  862. &swarmapi.UpdateServiceRequest{
  863. ServiceID: currentService.ID,
  864. Spec: &serviceSpec,
  865. ServiceVersion: &swarmapi.Version{
  866. Index: version,
  867. },
  868. },
  869. )
  870. return err
  871. }
  872. // RemoveService removes a service from a managed swarm cluster.
  873. func (c *Cluster) RemoveService(input string) error {
  874. c.RLock()
  875. defer c.RUnlock()
  876. if !c.isActiveManager() {
  877. return c.errNoManager()
  878. }
  879. ctx, cancel := c.getRequestContext()
  880. defer cancel()
  881. service, err := getService(ctx, c.client, input)
  882. if err != nil {
  883. return err
  884. }
  885. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  886. return err
  887. }
  888. return nil
  889. }
  890. // GetNodes returns a list of all nodes known to a cluster.
  891. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  892. c.RLock()
  893. defer c.RUnlock()
  894. if !c.isActiveManager() {
  895. return nil, c.errNoManager()
  896. }
  897. filters, err := newListNodesFilters(options.Filter)
  898. if err != nil {
  899. return nil, err
  900. }
  901. ctx, cancel := c.getRequestContext()
  902. defer cancel()
  903. r, err := c.client.ListNodes(
  904. ctx,
  905. &swarmapi.ListNodesRequest{Filters: filters})
  906. if err != nil {
  907. return nil, err
  908. }
  909. nodes := []types.Node{}
  910. for _, node := range r.Nodes {
  911. nodes = append(nodes, convert.NodeFromGRPC(*node))
  912. }
  913. return nodes, nil
  914. }
  915. // GetNode returns a node based on an ID or name.
  916. func (c *Cluster) GetNode(input string) (types.Node, error) {
  917. c.RLock()
  918. defer c.RUnlock()
  919. if !c.isActiveManager() {
  920. return types.Node{}, c.errNoManager()
  921. }
  922. ctx, cancel := c.getRequestContext()
  923. defer cancel()
  924. node, err := getNode(ctx, c.client, input)
  925. if err != nil {
  926. return types.Node{}, err
  927. }
  928. return convert.NodeFromGRPC(*node), nil
  929. }
  930. // UpdateNode updates existing nodes properties.
  931. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  932. c.RLock()
  933. defer c.RUnlock()
  934. if !c.isActiveManager() {
  935. return c.errNoManager()
  936. }
  937. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  938. if err != nil {
  939. return err
  940. }
  941. ctx, cancel := c.getRequestContext()
  942. defer cancel()
  943. _, err = c.client.UpdateNode(
  944. ctx,
  945. &swarmapi.UpdateNodeRequest{
  946. NodeID: nodeID,
  947. Spec: &nodeSpec,
  948. NodeVersion: &swarmapi.Version{
  949. Index: version,
  950. },
  951. },
  952. )
  953. return err
  954. }
  955. // RemoveNode removes a node from a cluster
  956. func (c *Cluster) RemoveNode(input string, force bool) error {
  957. c.RLock()
  958. defer c.RUnlock()
  959. if !c.isActiveManager() {
  960. return c.errNoManager()
  961. }
  962. ctx, cancel := c.getRequestContext()
  963. defer cancel()
  964. node, err := getNode(ctx, c.client, input)
  965. if err != nil {
  966. return err
  967. }
  968. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  969. return err
  970. }
  971. return nil
  972. }
  973. // GetTasks returns a list of tasks matching the filter options.
  974. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  975. c.RLock()
  976. defer c.RUnlock()
  977. if !c.isActiveManager() {
  978. return nil, c.errNoManager()
  979. }
  980. byName := func(filter filters.Args) error {
  981. if filter.Include("service") {
  982. serviceFilters := filter.Get("service")
  983. for _, serviceFilter := range serviceFilters {
  984. service, err := c.GetService(serviceFilter)
  985. if err != nil {
  986. return err
  987. }
  988. filter.Del("service", serviceFilter)
  989. filter.Add("service", service.ID)
  990. }
  991. }
  992. if filter.Include("node") {
  993. nodeFilters := filter.Get("node")
  994. for _, nodeFilter := range nodeFilters {
  995. node, err := c.GetNode(nodeFilter)
  996. if err != nil {
  997. return err
  998. }
  999. filter.Del("node", nodeFilter)
  1000. filter.Add("node", node.ID)
  1001. }
  1002. }
  1003. return nil
  1004. }
  1005. filters, err := newListTasksFilters(options.Filter, byName)
  1006. if err != nil {
  1007. return nil, err
  1008. }
  1009. ctx, cancel := c.getRequestContext()
  1010. defer cancel()
  1011. r, err := c.client.ListTasks(
  1012. ctx,
  1013. &swarmapi.ListTasksRequest{Filters: filters})
  1014. if err != nil {
  1015. return nil, err
  1016. }
  1017. tasks := []types.Task{}
  1018. for _, task := range r.Tasks {
  1019. if task.Spec.GetContainer() != nil {
  1020. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1021. }
  1022. }
  1023. return tasks, nil
  1024. }
  1025. // GetTask returns a task by an ID.
  1026. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1027. c.RLock()
  1028. defer c.RUnlock()
  1029. if !c.isActiveManager() {
  1030. return types.Task{}, c.errNoManager()
  1031. }
  1032. ctx, cancel := c.getRequestContext()
  1033. defer cancel()
  1034. task, err := getTask(ctx, c.client, input)
  1035. if err != nil {
  1036. return types.Task{}, err
  1037. }
  1038. return convert.TaskFromGRPC(*task), nil
  1039. }
  1040. // GetNetwork returns a cluster network by an ID.
  1041. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1042. c.RLock()
  1043. defer c.RUnlock()
  1044. if !c.isActiveManager() {
  1045. return apitypes.NetworkResource{}, c.errNoManager()
  1046. }
  1047. ctx, cancel := c.getRequestContext()
  1048. defer cancel()
  1049. network, err := getNetwork(ctx, c.client, input)
  1050. if err != nil {
  1051. return apitypes.NetworkResource{}, err
  1052. }
  1053. return convert.BasicNetworkFromGRPC(*network), nil
  1054. }
  1055. // GetNetworks returns all current cluster managed networks.
  1056. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1057. c.RLock()
  1058. defer c.RUnlock()
  1059. if !c.isActiveManager() {
  1060. return nil, c.errNoManager()
  1061. }
  1062. ctx, cancel := c.getRequestContext()
  1063. defer cancel()
  1064. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1065. if err != nil {
  1066. return nil, err
  1067. }
  1068. var networks []apitypes.NetworkResource
  1069. for _, network := range r.Networks {
  1070. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1071. }
  1072. return networks, nil
  1073. }
  1074. func attacherKey(target, containerID string) string {
  1075. return containerID + ":" + target
  1076. }
  1077. // UpdateAttachment signals the attachment config to the attachment
  1078. // waiter who is trying to start or attach the container to the
  1079. // network.
  1080. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1081. c.RLock()
  1082. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1083. c.RUnlock()
  1084. if !ok || attacher == nil {
  1085. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1086. }
  1087. attacher.attachWaitCh <- config
  1088. close(attacher.attachWaitCh)
  1089. return nil
  1090. }
  1091. // WaitForDetachment waits for the container to stop or detach from
  1092. // the network.
  1093. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
  1094. c.RLock()
  1095. attacher, ok := c.attachers[attacherKey(networkName, containerID)]
  1096. if !ok {
  1097. attacher, ok = c.attachers[attacherKey(networkID, containerID)]
  1098. }
  1099. if c.node == nil || c.node.Agent() == nil {
  1100. c.RUnlock()
  1101. return fmt.Errorf("invalid cluster node while waiting for detachment")
  1102. }
  1103. agent := c.node.Agent()
  1104. c.RUnlock()
  1105. if ok && attacher != nil &&
  1106. attacher.detachWaitCh != nil &&
  1107. attacher.attachCompleteCh != nil {
  1108. // Attachment may be in progress still so wait for
  1109. // attachment to complete.
  1110. select {
  1111. case <-attacher.attachCompleteCh:
  1112. case <-ctx.Done():
  1113. return ctx.Err()
  1114. }
  1115. if attacher.taskID == taskID {
  1116. select {
  1117. case <-attacher.detachWaitCh:
  1118. case <-ctx.Done():
  1119. return ctx.Err()
  1120. }
  1121. }
  1122. }
  1123. return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
  1124. }
  1125. // AttachNetwork generates an attachment request towards the manager.
  1126. func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
  1127. aKey := attacherKey(target, containerID)
  1128. c.Lock()
  1129. if c.node == nil || c.node.Agent() == nil {
  1130. c.Unlock()
  1131. return nil, fmt.Errorf("invalid cluster node while attaching to network")
  1132. }
  1133. if attacher, ok := c.attachers[aKey]; ok {
  1134. c.Unlock()
  1135. return attacher.config, nil
  1136. }
  1137. agent := c.node.Agent()
  1138. attachWaitCh := make(chan *network.NetworkingConfig)
  1139. detachWaitCh := make(chan struct{})
  1140. attachCompleteCh := make(chan struct{})
  1141. c.attachers[aKey] = &attacher{
  1142. attachWaitCh: attachWaitCh,
  1143. attachCompleteCh: attachCompleteCh,
  1144. detachWaitCh: detachWaitCh,
  1145. }
  1146. c.Unlock()
  1147. ctx, cancel := c.getRequestContext()
  1148. defer cancel()
  1149. taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
  1150. if err != nil {
  1151. c.Lock()
  1152. delete(c.attachers, aKey)
  1153. c.Unlock()
  1154. return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
  1155. }
  1156. c.Lock()
  1157. c.attachers[aKey].taskID = taskID
  1158. close(attachCompleteCh)
  1159. c.Unlock()
  1160. logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
  1161. var config *network.NetworkingConfig
  1162. select {
  1163. case config = <-attachWaitCh:
  1164. case <-ctx.Done():
  1165. return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
  1166. }
  1167. c.Lock()
  1168. c.attachers[aKey].config = config
  1169. c.Unlock()
  1170. return config, nil
  1171. }
  1172. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1173. // that a request to detach can be generated towards the manager.
  1174. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1175. aKey := attacherKey(target, containerID)
  1176. c.Lock()
  1177. attacher, ok := c.attachers[aKey]
  1178. delete(c.attachers, aKey)
  1179. c.Unlock()
  1180. if !ok {
  1181. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1182. }
  1183. close(attacher.detachWaitCh)
  1184. return nil
  1185. }
  1186. // CreateNetwork creates a new cluster managed network.
  1187. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1188. c.RLock()
  1189. defer c.RUnlock()
  1190. if !c.isActiveManager() {
  1191. return "", c.errNoManager()
  1192. }
  1193. if runconfig.IsPreDefinedNetwork(s.Name) {
  1194. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1195. return "", errors.NewRequestForbiddenError(err)
  1196. }
  1197. ctx, cancel := c.getRequestContext()
  1198. defer cancel()
  1199. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1200. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1201. if err != nil {
  1202. return "", err
  1203. }
  1204. return r.Network.ID, nil
  1205. }
  1206. // RemoveNetwork removes a cluster network.
  1207. func (c *Cluster) RemoveNetwork(input string) error {
  1208. c.RLock()
  1209. defer c.RUnlock()
  1210. if !c.isActiveManager() {
  1211. return c.errNoManager()
  1212. }
  1213. ctx, cancel := c.getRequestContext()
  1214. defer cancel()
  1215. network, err := getNetwork(ctx, c.client, input)
  1216. if err != nil {
  1217. return err
  1218. }
  1219. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1220. return err
  1221. }
  1222. return nil
  1223. }
  1224. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1225. // Always prefer NetworkAttachmentConfigs from TaskTemplate
  1226. // but fallback to service spec for backward compatibility
  1227. networks := s.TaskTemplate.Networks
  1228. if len(networks) == 0 {
  1229. networks = s.Networks
  1230. }
  1231. for i, n := range networks {
  1232. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1233. if err != nil {
  1234. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1235. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1236. return errors.NewRequestForbiddenError(err)
  1237. }
  1238. return err
  1239. }
  1240. networks[i].Target = apiNetwork.ID
  1241. }
  1242. return nil
  1243. }
  1244. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1245. // GetNetwork to match via full ID.
  1246. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1247. if err != nil {
  1248. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1249. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1250. if err != nil || len(rl.Networks) == 0 {
  1251. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1252. }
  1253. if err != nil {
  1254. return nil, err
  1255. }
  1256. if len(rl.Networks) == 0 {
  1257. return nil, fmt.Errorf("network %s not found", input)
  1258. }
  1259. if l := len(rl.Networks); l > 1 {
  1260. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1261. }
  1262. return rl.Networks[0], nil
  1263. }
  1264. return rg.Network, nil
  1265. }
  1266. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1267. func (c *Cluster) Cleanup() {
  1268. c.Lock()
  1269. node := c.node
  1270. if node == nil {
  1271. c.Unlock()
  1272. return
  1273. }
  1274. defer c.Unlock()
  1275. if c.isActiveManager() {
  1276. active, reachable, unreachable, err := c.managerStats()
  1277. if err == nil {
  1278. singlenode := active && isLastManager(reachable, unreachable)
  1279. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  1280. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1281. }
  1282. }
  1283. }
  1284. c.stopNode()
  1285. }
  1286. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1287. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1288. defer cancel()
  1289. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1290. if err != nil {
  1291. return false, 0, 0, err
  1292. }
  1293. for _, n := range nodes.Nodes {
  1294. if n.ManagerStatus != nil {
  1295. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1296. reachable++
  1297. if n.ID == c.node.NodeID() {
  1298. current = true
  1299. }
  1300. }
  1301. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1302. unreachable++
  1303. }
  1304. }
  1305. }
  1306. return
  1307. }
  1308. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1309. var err error
  1310. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1311. if err != nil {
  1312. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1313. }
  1314. spec := &req.Spec
  1315. // provide sane defaults instead of erroring
  1316. if spec.Name == "" {
  1317. spec.Name = "default"
  1318. }
  1319. if spec.Raft.SnapshotInterval == 0 {
  1320. spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
  1321. }
  1322. if spec.Raft.LogEntriesForSlowFollowers == 0 {
  1323. spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
  1324. }
  1325. if spec.Raft.ElectionTick == 0 {
  1326. spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
  1327. }
  1328. if spec.Raft.HeartbeatTick == 0 {
  1329. spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
  1330. }
  1331. if spec.Dispatcher.HeartbeatPeriod == 0 {
  1332. spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
  1333. }
  1334. if spec.CAConfig.NodeCertExpiry == 0 {
  1335. spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
  1336. }
  1337. if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
  1338. spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
  1339. }
  1340. return nil
  1341. }
  1342. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1343. var err error
  1344. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1345. if err != nil {
  1346. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1347. }
  1348. if len(req.RemoteAddrs) == 0 {
  1349. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1350. }
  1351. for i := range req.RemoteAddrs {
  1352. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1353. if err != nil {
  1354. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1355. }
  1356. }
  1357. return nil
  1358. }
  1359. func validateAddr(addr string) (string, error) {
  1360. if addr == "" {
  1361. return addr, fmt.Errorf("invalid empty address")
  1362. }
  1363. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1364. if err != nil {
  1365. return addr, nil
  1366. }
  1367. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1368. }
  1369. func initClusterSpec(node *node, spec types.Spec) error {
  1370. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1371. for conn := range node.ListenControlSocket(ctx) {
  1372. if ctx.Err() != nil {
  1373. return ctx.Err()
  1374. }
  1375. if conn != nil {
  1376. client := swarmapi.NewControlClient(conn)
  1377. var cluster *swarmapi.Cluster
  1378. for i := 0; ; i++ {
  1379. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1380. if err != nil {
  1381. return fmt.Errorf("error on listing clusters: %v", err)
  1382. }
  1383. if len(lcr.Clusters) == 0 {
  1384. if i < 10 {
  1385. time.Sleep(200 * time.Millisecond)
  1386. continue
  1387. }
  1388. return fmt.Errorf("empty list of clusters was returned")
  1389. }
  1390. cluster = lcr.Clusters[0]
  1391. break
  1392. }
  1393. newspec, err := convert.SwarmSpecToGRPC(spec)
  1394. if err != nil {
  1395. return fmt.Errorf("error updating cluster settings: %v", err)
  1396. }
  1397. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1398. ClusterID: cluster.ID,
  1399. ClusterVersion: &cluster.Meta.Version,
  1400. Spec: &newspec,
  1401. })
  1402. if err != nil {
  1403. return fmt.Errorf("error updating cluster settings: %v", err)
  1404. }
  1405. return nil
  1406. }
  1407. }
  1408. return ctx.Err()
  1409. }