cluster.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615
  1. package cluster
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/Sirupsen/logrus"
  15. "github.com/docker/docker/api/errors"
  16. apitypes "github.com/docker/docker/api/types"
  17. "github.com/docker/docker/api/types/filters"
  18. "github.com/docker/docker/api/types/network"
  19. types "github.com/docker/docker/api/types/swarm"
  20. "github.com/docker/docker/daemon/cluster/convert"
  21. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  22. "github.com/docker/docker/daemon/cluster/executor/container"
  23. "github.com/docker/docker/opts"
  24. "github.com/docker/docker/pkg/ioutils"
  25. "github.com/docker/docker/pkg/signal"
  26. "github.com/docker/docker/runconfig"
  27. swarmapi "github.com/docker/swarmkit/api"
  28. swarmnode "github.com/docker/swarmkit/node"
  29. "golang.org/x/net/context"
  30. )
  31. const swarmDirName = "swarm"
  32. const controlSocket = "control.sock"
  33. const swarmConnectTimeout = 20 * time.Second
  34. const swarmRequestTimeout = 20 * time.Second
  35. const stateFile = "docker-state.json"
  36. const defaultAddr = "0.0.0.0:2377"
  37. const (
  38. initialReconnectDelay = 100 * time.Millisecond
  39. maxReconnectDelay = 30 * time.Second
  40. )
  41. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  42. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  43. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  44. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  45. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  46. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  47. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  48. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets of the managed networks.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets of the managed networks.
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	// Root is the base directory; New stores swarm state in the
	// swarmDirName subdirectory of it.
	Root string
	// Name is passed to the swarm node as its hostname.
	Name string
	// Backend is the daemon backend: it is checked for swarm compatibility,
	// handed to the container executor and used to list/remove containers.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider supplies the subnets of Docker-managed networks.
	// NOTE(review): its consumers are not visible in this part of the file.
	NetworkSubnetsProvider NetworkSubnetsProvider
	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string
	// path to store runtime state, such as the swarm control socket.
	// New falls back to the swarm state directory when this is empty.
	RuntimeRoot string
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	// The embedded lock guards the mutable fields below; methods acquire it
	// through c.Lock / c.RLock.
	sync.RWMutex
	// node is the currently running swarm node, or nil when none is
	// running. Being embedded, its fields (ready, conn, client, ...) are
	// reachable directly on Cluster and must not be touched while it is nil.
	*node
	// root is the swarm state directory (Config.Root joined with swarmDirName).
	root string
	// runtimeRoot is the directory in which the control socket is created.
	runtimeRoot string
	// config is the configuration New was called with.
	config Config
	// configEvent receives a value whenever cluster participation changes;
	// it is handed out by ListenClusterEvents.
	configEvent chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string // after resolution, not persisted
	// stop is set by stopNode and reset by startNewNode; while set,
	// reconnectOnFailure gives up instead of restarting the node.
	stop bool
	// err is the most recent node start/exit error; it is cleared once a
	// node reports ready.
	err error
	// cancelDelay cancels the reconnect delay currently being waited on, if
	// any.
	cancelDelay func()
	// attachers holds in-memory network attachment state.
	// NOTE(review): the key semantics are not visible in this part of the file.
	attachers map[string]*attacher
}
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
//
// NOTE(review): the code that drives these channels is not visible in this
// part of the file; the per-field notes below are inferred from names only
// and should be confirmed against that code.
type attacher struct {
	// taskID identifies the task backing the attachment.
	taskID string
	// config is the attachment's networking configuration.
	config *network.NetworkingConfig
	// attachWaitCh presumably delivers the networking config once the
	// attachment is available — TODO confirm.
	attachWaitCh chan *network.NetworkingConfig
	// attachCompleteCh presumably signals that attaching has finished —
	// TODO confirm.
	attachCompleteCh chan struct{}
	// detachWaitCh presumably signals detachment — TODO confirm.
	detachWaitCh chan struct{}
}
// node wraps a running swarmkit node together with the bookkeeping Cluster
// keeps for it.
type node struct {
	*swarmnode.Node
	// done is closed by startNewNode's watcher goroutine after the
	// underlying node has exited.
	done chan struct{}
	// ready is set once the underlying node's Ready channel fires.
	ready bool
	// conn is the latest control socket connection reported by
	// ListenControlSocket; nil while no connection is available.
	conn *grpc.ClientConn
	// client is the control API client built on conn; nil whenever conn is nil.
	client swarmapi.ControlClient
	// reconnectDelay is the current restart backoff; reconnectOnFailure
	// doubles it up to maxReconnectDelay.
	reconnectDelay time.Duration
	// config is the configuration this node was started with.
	config nodeStartConfig
}
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	// joinAddr is the address of the node to join; passed to swarmkit as
	// JoinAddr. Not persisted.
	joinAddr string
	// forceNewCluster is passed to swarmkit as ForceNewCluster. Not
	// persisted.
	forceNewCluster bool
	// joinToken is the token presented when joining; passed to swarmkit as
	// JoinToken. Not persisted.
	joinToken string
}
  120. // New creates a new Cluster instance using provided config.
  121. func New(config Config) (*Cluster, error) {
  122. root := filepath.Join(config.Root, swarmDirName)
  123. if err := os.MkdirAll(root, 0700); err != nil {
  124. return nil, err
  125. }
  126. if config.RuntimeRoot == "" {
  127. config.RuntimeRoot = root
  128. }
  129. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  130. return nil, err
  131. }
  132. c := &Cluster{
  133. root: root,
  134. config: config,
  135. configEvent: make(chan struct{}, 10),
  136. runtimeRoot: config.RuntimeRoot,
  137. attachers: make(map[string]*attacher),
  138. }
  139. nodeConfig, err := c.loadState()
  140. if err != nil {
  141. if os.IsNotExist(err) {
  142. return c, nil
  143. }
  144. return nil, err
  145. }
  146. n, err := c.startNewNode(*nodeConfig)
  147. if err != nil {
  148. return nil, err
  149. }
  150. select {
  151. case <-time.After(swarmConnectTimeout):
  152. logrus.Error("swarm component could not be started before timeout was reached")
  153. case <-n.Ready():
  154. case <-n.done:
  155. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  156. }
  157. go c.reconnectOnFailure(n)
  158. return c, nil
  159. }
  160. func (c *Cluster) loadState() (*nodeStartConfig, error) {
  161. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  162. if err != nil {
  163. return nil, err
  164. }
  165. // missing certificate means no actual state to restore from
  166. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  167. if os.IsNotExist(err) {
  168. c.clearState()
  169. }
  170. return nil, err
  171. }
  172. var st nodeStartConfig
  173. if err := json.Unmarshal(dt, &st); err != nil {
  174. return nil, err
  175. }
  176. return &st, nil
  177. }
  178. func (c *Cluster) saveState(config nodeStartConfig) error {
  179. dt, err := json.Marshal(config)
  180. if err != nil {
  181. return err
  182. }
  183. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  184. }
  185. func (c *Cluster) reconnectOnFailure(n *node) {
  186. for {
  187. <-n.done
  188. c.Lock()
  189. if c.stop || c.node != nil {
  190. c.Unlock()
  191. return
  192. }
  193. n.reconnectDelay *= 2
  194. if n.reconnectDelay > maxReconnectDelay {
  195. n.reconnectDelay = maxReconnectDelay
  196. }
  197. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  198. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  199. c.cancelDelay = cancel
  200. c.Unlock()
  201. <-delayCtx.Done()
  202. if delayCtx.Err() != context.DeadlineExceeded {
  203. return
  204. }
  205. c.Lock()
  206. if c.node != nil {
  207. c.Unlock()
  208. return
  209. }
  210. var err error
  211. config := n.config
  212. config.RemoteAddr = c.getRemoteAddress()
  213. config.joinAddr = config.RemoteAddr
  214. n, err = c.startNewNode(config)
  215. if err != nil {
  216. c.err = err
  217. close(n.done)
  218. }
  219. c.Unlock()
  220. }
  221. }
// startNewNode creates and starts a swarmkit node from conf, installs it as
// c.node, persists conf and spawns the goroutines that track the node's
// lifetime, readiness and control socket connection.
//
// It mutates c.node, c.stop, c.cancelDelay and c.actualLocalAddr without
// locking; Init, Join and reconnectOnFailure call it with the cluster lock
// held, while New calls it before the Cluster is shared.
//
// On failure a nil node and the error are returned.
func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}
	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}
		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			// A hostname or a concrete IP was given: use it as is.
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			// Dial over UDP only to learn which local address is used
			// to reach the remote; the connection is closed right away.
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}
	// The control API listens on a named pipe on Windows and on a socket
	// file inside the runtime directory everywhere else.
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(c.runtimeRoot, controlSocket)
	}
	// Reset the per-node cluster state before starting the new node.
	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmnode.New(&swarmnode.Config{
		Hostname:           c.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           c.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(c.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
	})
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
		config:         conf,
	}
	c.node = node
	c.actualLocalAddr = actualLocalAddr // not saved
	// NOTE(review): a failure to persist the state is ignored here.
	c.saveState(conf)
	c.config.Backend.SetClusterProvider(c)
	// Watcher: once the node exits, record its error, detach it from the
	// cluster and close done so that waiters are released.
	go func() {
		err := n.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		c.Unlock()
		close(node.done)
	}()
	// Readiness: mark the node ready, clear any earlier error and notify
	// cluster event listeners.
	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()
	// Control socket: follow connection changes, rebuilding (or dropping)
	// the control client whenever the connection changes, and notify
	// cluster event listeners after every update.
	go func() {
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()
	return node, nil
}
  331. // Init initializes new cluster from user provided request.
  332. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  333. c.Lock()
  334. if node := c.node; node != nil {
  335. if !req.ForceNewCluster {
  336. c.Unlock()
  337. return "", ErrSwarmExists
  338. }
  339. if err := c.stopNode(); err != nil {
  340. c.Unlock()
  341. return "", err
  342. }
  343. }
  344. if err := validateAndSanitizeInitRequest(&req); err != nil {
  345. c.Unlock()
  346. return "", err
  347. }
  348. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  349. if err != nil {
  350. c.Unlock()
  351. return "", err
  352. }
  353. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  354. if err != nil {
  355. c.Unlock()
  356. return "", err
  357. }
  358. localAddr := listenHost
  359. // If the advertise address is not one of the system's
  360. // addresses, we also require a listen address.
  361. listenAddrIP := net.ParseIP(listenHost)
  362. if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
  363. advertiseIP := net.ParseIP(advertiseHost)
  364. if advertiseIP == nil {
  365. // not an IP
  366. c.Unlock()
  367. return "", errMustSpecifyListenAddr
  368. }
  369. systemIPs := listSystemIPs()
  370. found := false
  371. for _, systemIP := range systemIPs {
  372. if systemIP.Equal(advertiseIP) {
  373. found = true
  374. break
  375. }
  376. }
  377. if !found {
  378. c.Unlock()
  379. return "", errMustSpecifyListenAddr
  380. }
  381. localAddr = advertiseIP.String()
  382. }
  383. // todo: check current state existing
  384. n, err := c.startNewNode(nodeStartConfig{
  385. forceNewCluster: req.ForceNewCluster,
  386. LocalAddr: localAddr,
  387. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  388. AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
  389. })
  390. if err != nil {
  391. c.Unlock()
  392. return "", err
  393. }
  394. c.Unlock()
  395. select {
  396. case <-n.Ready():
  397. if err := initClusterSpec(n, req.Spec); err != nil {
  398. return "", err
  399. }
  400. go c.reconnectOnFailure(n)
  401. return n.NodeID(), nil
  402. case <-n.done:
  403. c.RLock()
  404. defer c.RUnlock()
  405. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  406. if err := c.clearState(); err != nil {
  407. return "", err
  408. }
  409. }
  410. return "", c.err
  411. }
  412. }
  413. // Join makes current Cluster part of an existing swarm cluster.
  414. func (c *Cluster) Join(req types.JoinRequest) error {
  415. c.Lock()
  416. if node := c.node; node != nil {
  417. c.Unlock()
  418. return ErrSwarmExists
  419. }
  420. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  421. c.Unlock()
  422. return err
  423. }
  424. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  425. if err != nil {
  426. c.Unlock()
  427. return err
  428. }
  429. var advertiseAddr string
  430. if req.AdvertiseAddr != "" {
  431. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  432. // For joining, we don't need to provide an advertise address,
  433. // since the remote side can detect it.
  434. if err == nil {
  435. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  436. }
  437. }
  438. // todo: check current state existing
  439. n, err := c.startNewNode(nodeStartConfig{
  440. RemoteAddr: req.RemoteAddrs[0],
  441. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  442. AdvertiseAddr: advertiseAddr,
  443. joinAddr: req.RemoteAddrs[0],
  444. joinToken: req.JoinToken,
  445. })
  446. if err != nil {
  447. c.Unlock()
  448. return err
  449. }
  450. c.Unlock()
  451. select {
  452. case <-time.After(swarmConnectTimeout):
  453. // attempt to connect will continue in background, but reconnect only if it didn't fail
  454. go func() {
  455. select {
  456. case <-n.Ready():
  457. c.reconnectOnFailure(n)
  458. case <-n.done:
  459. logrus.Errorf("failed to join the cluster: %+v", c.err)
  460. }
  461. }()
  462. return ErrSwarmJoinTimeoutReached
  463. case <-n.Ready():
  464. go c.reconnectOnFailure(n)
  465. return nil
  466. case <-n.done:
  467. c.RLock()
  468. defer c.RUnlock()
  469. return c.err
  470. }
  471. }
  472. // stopNode is a helper that stops the active c.node and waits until it has
  473. // shut down. Call while keeping the cluster lock.
  474. func (c *Cluster) stopNode() error {
  475. if c.node == nil {
  476. return nil
  477. }
  478. c.stop = true
  479. if c.cancelDelay != nil {
  480. c.cancelDelay()
  481. c.cancelDelay = nil
  482. }
  483. node := c.node
  484. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  485. defer cancel()
  486. // TODO: can't hold lock on stop because it calls back to network
  487. c.Unlock()
  488. defer c.Lock()
  489. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  490. return err
  491. }
  492. <-node.done
  493. return nil
  494. }
  495. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  496. return reachable-2 <= unreachable
  497. }
  498. func isLastManager(reachable, unreachable int) bool {
  499. return reachable == 1 && unreachable == 0
  500. }
  501. // Leave shuts down Cluster and removes current state.
  502. func (c *Cluster) Leave(force bool) error {
  503. c.Lock()
  504. node := c.node
  505. if node == nil {
  506. c.Unlock()
  507. return ErrNoSwarm
  508. }
  509. if node.Manager() != nil && !force {
  510. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  511. if c.isActiveManager() {
  512. active, reachable, unreachable, err := c.managerStats()
  513. if err == nil {
  514. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  515. if isLastManager(reachable, unreachable) {
  516. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  517. c.Unlock()
  518. return fmt.Errorf(msg)
  519. }
  520. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  521. }
  522. }
  523. } else {
  524. msg += "Doing so may lose the consensus of your cluster. "
  525. }
  526. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  527. c.Unlock()
  528. return fmt.Errorf(msg)
  529. }
  530. if err := c.stopNode(); err != nil {
  531. logrus.Errorf("failed to shut down cluster node: %v", err)
  532. signal.DumpStacks("")
  533. c.Unlock()
  534. return err
  535. }
  536. c.Unlock()
  537. if nodeID := node.NodeID(); nodeID != "" {
  538. nodeContainers, err := c.listContainerForNode(nodeID)
  539. if err != nil {
  540. return err
  541. }
  542. for _, id := range nodeContainers {
  543. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  544. logrus.Errorf("error removing %v: %v", id, err)
  545. }
  546. }
  547. }
  548. c.configEvent <- struct{}{}
  549. // todo: cleanup optional?
  550. if err := c.clearState(); err != nil {
  551. return err
  552. }
  553. return nil
  554. }
  555. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  556. var ids []string
  557. filters := filters.NewArgs()
  558. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  559. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  560. Filters: filters,
  561. })
  562. if err != nil {
  563. return []string{}, err
  564. }
  565. for _, c := range containers {
  566. ids = append(ids, c.ID)
  567. }
  568. return ids, nil
  569. }
  570. func (c *Cluster) clearState() error {
  571. // todo: backup this data instead of removing?
  572. if err := os.RemoveAll(c.root); err != nil {
  573. return err
  574. }
  575. if err := os.MkdirAll(c.root, 0700); err != nil {
  576. return err
  577. }
  578. c.config.Backend.SetClusterProvider(nil)
  579. return nil
  580. }
  581. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  582. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  583. }
  584. // Inspect retrieves the configuration properties of a managed swarm cluster.
  585. func (c *Cluster) Inspect() (types.Swarm, error) {
  586. c.RLock()
  587. defer c.RUnlock()
  588. if !c.isActiveManager() {
  589. return types.Swarm{}, c.errNoManager()
  590. }
  591. ctx, cancel := c.getRequestContext()
  592. defer cancel()
  593. swarm, err := getSwarm(ctx, c.client)
  594. if err != nil {
  595. return types.Swarm{}, err
  596. }
  597. return convert.SwarmFromGRPC(*swarm), nil
  598. }
  599. // Update updates configuration of a managed swarm cluster.
  600. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  601. c.RLock()
  602. defer c.RUnlock()
  603. if !c.isActiveManager() {
  604. return c.errNoManager()
  605. }
  606. ctx, cancel := c.getRequestContext()
  607. defer cancel()
  608. swarm, err := getSwarm(ctx, c.client)
  609. if err != nil {
  610. return err
  611. }
  612. // In update, client should provide the complete spec of the swarm, including
  613. // Name and Labels. If a field is specified with 0 or nil, then the default value
  614. // will be used to swarmkit.
  615. clusterSpec, err := convert.SwarmSpecToGRPC(spec)
  616. if err != nil {
  617. return err
  618. }
  619. _, err = c.client.UpdateCluster(
  620. ctx,
  621. &swarmapi.UpdateClusterRequest{
  622. ClusterID: swarm.ID,
  623. Spec: &clusterSpec,
  624. ClusterVersion: &swarmapi.Version{
  625. Index: version,
  626. },
  627. Rotation: swarmapi.JoinTokenRotation{
  628. RotateWorkerToken: flags.RotateWorkerToken,
  629. RotateManagerToken: flags.RotateManagerToken,
  630. },
  631. },
  632. )
  633. return err
  634. }
  635. // IsManager returns true if Cluster is participating as a manager.
  636. func (c *Cluster) IsManager() bool {
  637. c.RLock()
  638. defer c.RUnlock()
  639. return c.isActiveManager()
  640. }
  641. // IsAgent returns true if Cluster is participating as a worker/agent.
  642. func (c *Cluster) IsAgent() bool {
  643. c.RLock()
  644. defer c.RUnlock()
  645. return c.node != nil && c.ready
  646. }
  647. // GetLocalAddress returns the local address.
  648. func (c *Cluster) GetLocalAddress() string {
  649. c.RLock()
  650. defer c.RUnlock()
  651. return c.actualLocalAddr
  652. }
  653. // GetListenAddress returns the listen address.
  654. func (c *Cluster) GetListenAddress() string {
  655. c.RLock()
  656. defer c.RUnlock()
  657. if c.node != nil {
  658. return c.node.config.ListenAddr
  659. }
  660. return ""
  661. }
  662. // GetAdvertiseAddress returns the remotely reachable address of this node.
  663. func (c *Cluster) GetAdvertiseAddress() string {
  664. c.RLock()
  665. defer c.RUnlock()
  666. if c.node != nil && c.node.config.AdvertiseAddr != "" {
  667. advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
  668. return advertiseHost
  669. }
  670. return c.actualLocalAddr
  671. }
  672. // GetRemoteAddress returns a known advertise address of a remote manager if
  673. // available.
  674. // todo: change to array/connect with info
  675. func (c *Cluster) GetRemoteAddress() string {
  676. c.RLock()
  677. defer c.RUnlock()
  678. return c.getRemoteAddress()
  679. }
  680. func (c *Cluster) getRemoteAddress() string {
  681. if c.node == nil {
  682. return ""
  683. }
  684. nodeID := c.node.NodeID()
  685. for _, r := range c.node.Remotes() {
  686. if r.NodeID != nodeID {
  687. return r.Addr
  688. }
  689. }
  690. return ""
  691. }
  692. // ListenClusterEvents returns a channel that receives messages on cluster
  693. // participation changes.
  694. // todo: make cancelable and accessible to multiple callers
  695. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  696. return c.configEvent
  697. }
  698. // Info returns information about the current cluster state.
  699. func (c *Cluster) Info() types.Info {
  700. info := types.Info{
  701. NodeAddr: c.GetAdvertiseAddress(),
  702. }
  703. c.RLock()
  704. defer c.RUnlock()
  705. if c.node == nil {
  706. info.LocalNodeState = types.LocalNodeStateInactive
  707. if c.cancelDelay != nil {
  708. info.LocalNodeState = types.LocalNodeStateError
  709. }
  710. } else {
  711. info.LocalNodeState = types.LocalNodeStatePending
  712. if c.ready == true {
  713. info.LocalNodeState = types.LocalNodeStateActive
  714. }
  715. }
  716. if c.err != nil {
  717. info.Error = c.err.Error()
  718. }
  719. ctx, cancel := c.getRequestContext()
  720. defer cancel()
  721. if c.isActiveManager() {
  722. info.ControlAvailable = true
  723. swarm, err := c.Inspect()
  724. if err != nil {
  725. info.Error = err.Error()
  726. }
  727. // Strip JoinTokens
  728. info.Cluster = swarm.ClusterInfo
  729. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  730. info.Nodes = len(r.Nodes)
  731. for _, n := range r.Nodes {
  732. if n.ManagerStatus != nil {
  733. info.Managers = info.Managers + 1
  734. }
  735. }
  736. }
  737. }
  738. if c.node != nil {
  739. for _, r := range c.node.Remotes() {
  740. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  741. }
  742. info.NodeID = c.node.NodeID()
  743. }
  744. return info
  745. }
  746. // isActiveManager should not be called without a read lock
  747. func (c *Cluster) isActiveManager() bool {
  748. return c.node != nil && c.conn != nil
  749. }
  750. // errNoManager returns error describing why manager commands can't be used.
  751. // Call with read lock.
  752. func (c *Cluster) errNoManager() error {
  753. if c.node == nil {
  754. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  755. }
  756. if c.node.Manager() != nil {
  757. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  758. }
  759. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  760. }
  761. // GetServices returns all services of a managed swarm cluster.
  762. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  763. c.RLock()
  764. defer c.RUnlock()
  765. if !c.isActiveManager() {
  766. return nil, c.errNoManager()
  767. }
  768. filters, err := newListServicesFilters(options.Filters)
  769. if err != nil {
  770. return nil, err
  771. }
  772. ctx, cancel := c.getRequestContext()
  773. defer cancel()
  774. r, err := c.client.ListServices(
  775. ctx,
  776. &swarmapi.ListServicesRequest{Filters: filters})
  777. if err != nil {
  778. return nil, err
  779. }
  780. services := []types.Service{}
  781. for _, service := range r.Services {
  782. services = append(services, convert.ServiceFromGRPC(*service))
  783. }
  784. return services, nil
  785. }
  786. // CreateService creates a new service in a managed swarm cluster.
  787. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  788. c.RLock()
  789. defer c.RUnlock()
  790. if !c.isActiveManager() {
  791. return "", c.errNoManager()
  792. }
  793. ctx, cancel := c.getRequestContext()
  794. defer cancel()
  795. err := c.populateNetworkID(ctx, c.client, &s)
  796. if err != nil {
  797. return "", err
  798. }
  799. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  800. if err != nil {
  801. return "", err
  802. }
  803. if encodedAuth != "" {
  804. ctnr := serviceSpec.Task.GetContainer()
  805. if ctnr == nil {
  806. return "", fmt.Errorf("service does not use container tasks")
  807. }
  808. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  809. }
  810. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  811. if err != nil {
  812. return "", err
  813. }
  814. return r.Service.ID, nil
  815. }
  816. // GetService returns a service based on an ID or name.
  817. func (c *Cluster) GetService(input string) (types.Service, error) {
  818. c.RLock()
  819. defer c.RUnlock()
  820. if !c.isActiveManager() {
  821. return types.Service{}, c.errNoManager()
  822. }
  823. ctx, cancel := c.getRequestContext()
  824. defer cancel()
  825. service, err := getService(ctx, c.client, input)
  826. if err != nil {
  827. return types.Service{}, err
  828. }
  829. return convert.ServiceFromGRPC(*service), nil
  830. }
  831. // UpdateService updates existing service to match new properties.
  832. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) error {
  833. c.RLock()
  834. defer c.RUnlock()
  835. if !c.isActiveManager() {
  836. return c.errNoManager()
  837. }
  838. ctx, cancel := c.getRequestContext()
  839. defer cancel()
  840. err := c.populateNetworkID(ctx, c.client, &spec)
  841. if err != nil {
  842. return err
  843. }
  844. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  845. if err != nil {
  846. return err
  847. }
  848. currentService, err := getService(ctx, c.client, serviceIDOrName)
  849. if err != nil {
  850. return err
  851. }
  852. if encodedAuth != "" {
  853. ctnr := serviceSpec.Task.GetContainer()
  854. if ctnr == nil {
  855. return fmt.Errorf("service does not use container tasks")
  856. }
  857. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  858. } else {
  859. // this is needed because if the encodedAuth isn't being updated then we
  860. // shouldn't lose it, and continue to use the one that was already present
  861. var ctnr *swarmapi.ContainerSpec
  862. switch registryAuthFrom {
  863. case apitypes.RegistryAuthFromSpec, "":
  864. ctnr = currentService.Spec.Task.GetContainer()
  865. case apitypes.RegistryAuthFromPreviousSpec:
  866. if currentService.PreviousSpec == nil {
  867. return fmt.Errorf("service does not have a previous spec")
  868. }
  869. ctnr = currentService.PreviousSpec.Task.GetContainer()
  870. default:
  871. return fmt.Errorf("unsupported registryAuthFromValue")
  872. }
  873. if ctnr == nil {
  874. return fmt.Errorf("service does not use container tasks")
  875. }
  876. serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
  877. }
  878. _, err = c.client.UpdateService(
  879. ctx,
  880. &swarmapi.UpdateServiceRequest{
  881. ServiceID: currentService.ID,
  882. Spec: &serviceSpec,
  883. ServiceVersion: &swarmapi.Version{
  884. Index: version,
  885. },
  886. },
  887. )
  888. return err
  889. }
  890. // RemoveService removes a service from a managed swarm cluster.
  891. func (c *Cluster) RemoveService(input string) error {
  892. c.RLock()
  893. defer c.RUnlock()
  894. if !c.isActiveManager() {
  895. return c.errNoManager()
  896. }
  897. ctx, cancel := c.getRequestContext()
  898. defer cancel()
  899. service, err := getService(ctx, c.client, input)
  900. if err != nil {
  901. return err
  902. }
  903. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  904. return err
  905. }
  906. return nil
  907. }
  908. // GetNodes returns a list of all nodes known to a cluster.
  909. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  910. c.RLock()
  911. defer c.RUnlock()
  912. if !c.isActiveManager() {
  913. return nil, c.errNoManager()
  914. }
  915. filters, err := newListNodesFilters(options.Filters)
  916. if err != nil {
  917. return nil, err
  918. }
  919. ctx, cancel := c.getRequestContext()
  920. defer cancel()
  921. r, err := c.client.ListNodes(
  922. ctx,
  923. &swarmapi.ListNodesRequest{Filters: filters})
  924. if err != nil {
  925. return nil, err
  926. }
  927. nodes := []types.Node{}
  928. for _, node := range r.Nodes {
  929. nodes = append(nodes, convert.NodeFromGRPC(*node))
  930. }
  931. return nodes, nil
  932. }
  933. // GetNode returns a node based on an ID or name.
  934. func (c *Cluster) GetNode(input string) (types.Node, error) {
  935. c.RLock()
  936. defer c.RUnlock()
  937. if !c.isActiveManager() {
  938. return types.Node{}, c.errNoManager()
  939. }
  940. ctx, cancel := c.getRequestContext()
  941. defer cancel()
  942. node, err := getNode(ctx, c.client, input)
  943. if err != nil {
  944. return types.Node{}, err
  945. }
  946. return convert.NodeFromGRPC(*node), nil
  947. }
  948. // UpdateNode updates existing nodes properties.
  949. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  950. c.RLock()
  951. defer c.RUnlock()
  952. if !c.isActiveManager() {
  953. return c.errNoManager()
  954. }
  955. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  956. if err != nil {
  957. return err
  958. }
  959. ctx, cancel := c.getRequestContext()
  960. defer cancel()
  961. _, err = c.client.UpdateNode(
  962. ctx,
  963. &swarmapi.UpdateNodeRequest{
  964. NodeID: nodeID,
  965. Spec: &nodeSpec,
  966. NodeVersion: &swarmapi.Version{
  967. Index: version,
  968. },
  969. },
  970. )
  971. return err
  972. }
  973. // RemoveNode removes a node from a cluster
  974. func (c *Cluster) RemoveNode(input string, force bool) error {
  975. c.RLock()
  976. defer c.RUnlock()
  977. if !c.isActiveManager() {
  978. return c.errNoManager()
  979. }
  980. ctx, cancel := c.getRequestContext()
  981. defer cancel()
  982. node, err := getNode(ctx, c.client, input)
  983. if err != nil {
  984. return err
  985. }
  986. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  987. return err
  988. }
  989. return nil
  990. }
  991. // GetTasks returns a list of tasks matching the filter options.
  992. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  993. c.RLock()
  994. defer c.RUnlock()
  995. if !c.isActiveManager() {
  996. return nil, c.errNoManager()
  997. }
  998. byName := func(filter filters.Args) error {
  999. if filter.Include("service") {
  1000. serviceFilters := filter.Get("service")
  1001. for _, serviceFilter := range serviceFilters {
  1002. service, err := c.GetService(serviceFilter)
  1003. if err != nil {
  1004. return err
  1005. }
  1006. filter.Del("service", serviceFilter)
  1007. filter.Add("service", service.ID)
  1008. }
  1009. }
  1010. if filter.Include("node") {
  1011. nodeFilters := filter.Get("node")
  1012. for _, nodeFilter := range nodeFilters {
  1013. node, err := c.GetNode(nodeFilter)
  1014. if err != nil {
  1015. return err
  1016. }
  1017. filter.Del("node", nodeFilter)
  1018. filter.Add("node", node.ID)
  1019. }
  1020. }
  1021. return nil
  1022. }
  1023. filters, err := newListTasksFilters(options.Filters, byName)
  1024. if err != nil {
  1025. return nil, err
  1026. }
  1027. ctx, cancel := c.getRequestContext()
  1028. defer cancel()
  1029. r, err := c.client.ListTasks(
  1030. ctx,
  1031. &swarmapi.ListTasksRequest{Filters: filters})
  1032. if err != nil {
  1033. return nil, err
  1034. }
  1035. tasks := []types.Task{}
  1036. for _, task := range r.Tasks {
  1037. if task.Spec.GetContainer() != nil {
  1038. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1039. }
  1040. }
  1041. return tasks, nil
  1042. }
  1043. // GetTask returns a task by an ID.
  1044. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1045. c.RLock()
  1046. defer c.RUnlock()
  1047. if !c.isActiveManager() {
  1048. return types.Task{}, c.errNoManager()
  1049. }
  1050. ctx, cancel := c.getRequestContext()
  1051. defer cancel()
  1052. task, err := getTask(ctx, c.client, input)
  1053. if err != nil {
  1054. return types.Task{}, err
  1055. }
  1056. return convert.TaskFromGRPC(*task), nil
  1057. }
  1058. // GetNetwork returns a cluster network by an ID.
  1059. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1060. c.RLock()
  1061. defer c.RUnlock()
  1062. if !c.isActiveManager() {
  1063. return apitypes.NetworkResource{}, c.errNoManager()
  1064. }
  1065. ctx, cancel := c.getRequestContext()
  1066. defer cancel()
  1067. network, err := getNetwork(ctx, c.client, input)
  1068. if err != nil {
  1069. return apitypes.NetworkResource{}, err
  1070. }
  1071. return convert.BasicNetworkFromGRPC(*network), nil
  1072. }
  1073. // GetNetworks returns all current cluster managed networks.
  1074. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1075. c.RLock()
  1076. defer c.RUnlock()
  1077. if !c.isActiveManager() {
  1078. return nil, c.errNoManager()
  1079. }
  1080. ctx, cancel := c.getRequestContext()
  1081. defer cancel()
  1082. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1083. if err != nil {
  1084. return nil, err
  1085. }
  1086. var networks []apitypes.NetworkResource
  1087. for _, network := range r.Networks {
  1088. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1089. }
  1090. return networks, nil
  1091. }
  1092. func attacherKey(target, containerID string) string {
  1093. return containerID + ":" + target
  1094. }
  1095. // UpdateAttachment signals the attachment config to the attachment
  1096. // waiter who is trying to start or attach the container to the
  1097. // network.
  1098. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1099. c.RLock()
  1100. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1101. c.RUnlock()
  1102. if !ok || attacher == nil {
  1103. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1104. }
  1105. attacher.attachWaitCh <- config
  1106. close(attacher.attachWaitCh)
  1107. return nil
  1108. }
  1109. // WaitForDetachment waits for the container to stop or detach from
  1110. // the network.
  1111. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
  1112. c.RLock()
  1113. attacher, ok := c.attachers[attacherKey(networkName, containerID)]
  1114. if !ok {
  1115. attacher, ok = c.attachers[attacherKey(networkID, containerID)]
  1116. }
  1117. if c.node == nil || c.node.Agent() == nil {
  1118. c.RUnlock()
  1119. return fmt.Errorf("invalid cluster node while waiting for detachment")
  1120. }
  1121. agent := c.node.Agent()
  1122. c.RUnlock()
  1123. if ok && attacher != nil &&
  1124. attacher.detachWaitCh != nil &&
  1125. attacher.attachCompleteCh != nil {
  1126. // Attachment may be in progress still so wait for
  1127. // attachment to complete.
  1128. select {
  1129. case <-attacher.attachCompleteCh:
  1130. case <-ctx.Done():
  1131. return ctx.Err()
  1132. }
  1133. if attacher.taskID == taskID {
  1134. select {
  1135. case <-attacher.detachWaitCh:
  1136. case <-ctx.Done():
  1137. return ctx.Err()
  1138. }
  1139. }
  1140. }
  1141. return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
  1142. }
  1143. // AttachNetwork generates an attachment request towards the manager.
  1144. func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
  1145. aKey := attacherKey(target, containerID)
  1146. c.Lock()
  1147. if c.node == nil || c.node.Agent() == nil {
  1148. c.Unlock()
  1149. return nil, fmt.Errorf("invalid cluster node while attaching to network")
  1150. }
  1151. if attacher, ok := c.attachers[aKey]; ok {
  1152. c.Unlock()
  1153. return attacher.config, nil
  1154. }
  1155. agent := c.node.Agent()
  1156. attachWaitCh := make(chan *network.NetworkingConfig)
  1157. detachWaitCh := make(chan struct{})
  1158. attachCompleteCh := make(chan struct{})
  1159. c.attachers[aKey] = &attacher{
  1160. attachWaitCh: attachWaitCh,
  1161. attachCompleteCh: attachCompleteCh,
  1162. detachWaitCh: detachWaitCh,
  1163. }
  1164. c.Unlock()
  1165. ctx, cancel := c.getRequestContext()
  1166. defer cancel()
  1167. taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
  1168. if err != nil {
  1169. c.Lock()
  1170. delete(c.attachers, aKey)
  1171. c.Unlock()
  1172. return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
  1173. }
  1174. c.Lock()
  1175. c.attachers[aKey].taskID = taskID
  1176. close(attachCompleteCh)
  1177. c.Unlock()
  1178. logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
  1179. var config *network.NetworkingConfig
  1180. select {
  1181. case config = <-attachWaitCh:
  1182. case <-ctx.Done():
  1183. return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
  1184. }
  1185. c.Lock()
  1186. c.attachers[aKey].config = config
  1187. c.Unlock()
  1188. return config, nil
  1189. }
  1190. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1191. // that a request to detach can be generated towards the manager.
  1192. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1193. aKey := attacherKey(target, containerID)
  1194. c.Lock()
  1195. attacher, ok := c.attachers[aKey]
  1196. delete(c.attachers, aKey)
  1197. c.Unlock()
  1198. if !ok {
  1199. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1200. }
  1201. close(attacher.detachWaitCh)
  1202. return nil
  1203. }
  1204. // CreateNetwork creates a new cluster managed network.
  1205. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1206. c.RLock()
  1207. defer c.RUnlock()
  1208. if !c.isActiveManager() {
  1209. return "", c.errNoManager()
  1210. }
  1211. if runconfig.IsPreDefinedNetwork(s.Name) {
  1212. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1213. return "", errors.NewRequestForbiddenError(err)
  1214. }
  1215. ctx, cancel := c.getRequestContext()
  1216. defer cancel()
  1217. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1218. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1219. if err != nil {
  1220. return "", err
  1221. }
  1222. return r.Network.ID, nil
  1223. }
  1224. // RemoveNetwork removes a cluster network.
  1225. func (c *Cluster) RemoveNetwork(input string) error {
  1226. c.RLock()
  1227. defer c.RUnlock()
  1228. if !c.isActiveManager() {
  1229. return c.errNoManager()
  1230. }
  1231. ctx, cancel := c.getRequestContext()
  1232. defer cancel()
  1233. network, err := getNetwork(ctx, c.client, input)
  1234. if err != nil {
  1235. return err
  1236. }
  1237. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1238. return err
  1239. }
  1240. return nil
  1241. }
  1242. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1243. // Always prefer NetworkAttachmentConfigs from TaskTemplate
  1244. // but fallback to service spec for backward compatibility
  1245. networks := s.TaskTemplate.Networks
  1246. if len(networks) == 0 {
  1247. networks = s.Networks
  1248. }
  1249. for i, n := range networks {
  1250. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1251. if err != nil {
  1252. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1253. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1254. return errors.NewRequestForbiddenError(err)
  1255. }
  1256. return err
  1257. }
  1258. networks[i].Target = apiNetwork.ID
  1259. }
  1260. return nil
  1261. }
  1262. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1263. // GetNetwork to match via full ID.
  1264. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1265. if err != nil {
  1266. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1267. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1268. if err != nil || len(rl.Networks) == 0 {
  1269. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1270. }
  1271. if err != nil {
  1272. return nil, err
  1273. }
  1274. if len(rl.Networks) == 0 {
  1275. return nil, fmt.Errorf("network %s not found", input)
  1276. }
  1277. if l := len(rl.Networks); l > 1 {
  1278. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1279. }
  1280. return rl.Networks[0], nil
  1281. }
  1282. return rg.Network, nil
  1283. }
  1284. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1285. func (c *Cluster) Cleanup() {
  1286. c.Lock()
  1287. node := c.node
  1288. if node == nil {
  1289. c.Unlock()
  1290. return
  1291. }
  1292. defer c.Unlock()
  1293. if c.isActiveManager() {
  1294. active, reachable, unreachable, err := c.managerStats()
  1295. if err == nil {
  1296. singlenode := active && isLastManager(reachable, unreachable)
  1297. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  1298. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1299. }
  1300. }
  1301. }
  1302. c.stopNode()
  1303. }
  1304. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1305. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1306. defer cancel()
  1307. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1308. if err != nil {
  1309. return false, 0, 0, err
  1310. }
  1311. for _, n := range nodes.Nodes {
  1312. if n.ManagerStatus != nil {
  1313. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1314. reachable++
  1315. if n.ID == c.node.NodeID() {
  1316. current = true
  1317. }
  1318. }
  1319. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1320. unreachable++
  1321. }
  1322. }
  1323. }
  1324. return
  1325. }
  1326. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1327. var err error
  1328. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1329. if err != nil {
  1330. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1331. }
  1332. return nil
  1333. }
  1334. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1335. var err error
  1336. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1337. if err != nil {
  1338. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1339. }
  1340. if len(req.RemoteAddrs) == 0 {
  1341. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1342. }
  1343. for i := range req.RemoteAddrs {
  1344. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1345. if err != nil {
  1346. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1347. }
  1348. }
  1349. return nil
  1350. }
  1351. func validateAddr(addr string) (string, error) {
  1352. if addr == "" {
  1353. return addr, fmt.Errorf("invalid empty address")
  1354. }
  1355. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1356. if err != nil {
  1357. return addr, nil
  1358. }
  1359. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1360. }
  1361. func initClusterSpec(node *node, spec types.Spec) error {
  1362. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1363. for conn := range node.ListenControlSocket(ctx) {
  1364. if ctx.Err() != nil {
  1365. return ctx.Err()
  1366. }
  1367. if conn != nil {
  1368. client := swarmapi.NewControlClient(conn)
  1369. var cluster *swarmapi.Cluster
  1370. for i := 0; ; i++ {
  1371. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1372. if err != nil {
  1373. return fmt.Errorf("error on listing clusters: %v", err)
  1374. }
  1375. if len(lcr.Clusters) == 0 {
  1376. if i < 10 {
  1377. time.Sleep(200 * time.Millisecond)
  1378. continue
  1379. }
  1380. return fmt.Errorf("empty list of clusters was returned")
  1381. }
  1382. cluster = lcr.Clusters[0]
  1383. break
  1384. }
  1385. // In init, we take the initial default values from swarmkit, and merge
  1386. // any non nil or 0 value from spec to GRPC spec. This will leave the
  1387. // default value alone.
  1388. // Note that this is different from Update(), as in Update() we expect
  1389. // user to specify the complete spec of the cluster (as they already know
  1390. // the existing one and knows which field to update)
  1391. clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
  1392. if err != nil {
  1393. return fmt.Errorf("error updating cluster settings: %v", err)
  1394. }
  1395. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1396. ClusterID: cluster.ID,
  1397. ClusterVersion: &cluster.Meta.Version,
  1398. Spec: &clusterSpec,
  1399. })
  1400. if err != nil {
  1401. return fmt.Errorf("error updating cluster settings: %v", err)
  1402. }
  1403. return nil
  1404. }
  1405. }
  1406. return ctx.Err()
  1407. }