cluster.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715
  1. package cluster
import (
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	apierrors "github.com/docker/docker/api/errors"
	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/cluster/executor/container"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/signal"
	"github.com/docker/docker/runconfig"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)
// swarmDirName is the directory (under Config.Root) holding swarm state.
const swarmDirName = "swarm"

// controlSocket is the name of the local manager control socket.
const controlSocket = "control.sock"

// swarmConnectTimeout bounds how long Init/Join/New wait for the node
// to come up before giving up (the attempt continues in background).
const swarmConnectTimeout = 20 * time.Second

// swarmRequestTimeout bounds individual control-API requests.
const swarmRequestTimeout = 20 * time.Second

// stateFile is the JSON file (under the swarm dir) where nodeStartConfig
// is persisted between daemon restarts.
const stateFile = "docker-state.json"

// defaultAddr is the default listen address for the cluster API.
const defaultAddr = "0.0.0.0:2377"

const (
	// initialReconnectDelay is the first backoff used by reconnectOnFailure.
	initialReconnectDelay = 100 * time.Millisecond
	// maxReconnectDelay caps the exponential reconnect backoff.
	maxReconnectDelay = 30 * time.Second
)

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets of managed networks.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets of managed networks.
	V6Subnets() []net.IPNet
}

// Config provides values for Cluster.
type Config struct {
	// Root is the daemon root directory; swarm state lives under it.
	Root string
	// Name is used as the swarmkit node hostname.
	Name string
	// Backend is the daemon-side executor backend.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider supplies subnets of managed networks.
	NetworkSubnetsProvider NetworkSubnetsProvider
	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string
	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager. Mutable fields are guarded by the embedded RWMutex.
type Cluster struct {
	sync.RWMutex
	*node // currently running node, nil when inactive
	root        string
	runtimeRoot string
	config      Config
	configEvent chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string    // after resolution, not persisted
	stop        bool          // set by stopNode to halt reconnect attempts
	err         error         // last node exit error
	cancelDelay func()        // cancels a pending reconnect backoff, if any
	attachers   map[string]*attacher
	locked      bool            // true when the swarm needs an unlock key
	lastNodeConfig *nodeStartConfig // config to retry with once unlocked
}
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID string
	config *network.NetworkingConfig
	// attachWaitCh delivers the attachment config once the manager replies.
	attachWaitCh chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh chan struct{}
}

// node wraps a running swarmkit node together with the bookkeeping the
// Cluster needs around it.
type node struct {
	*swarmnode.Node
	// done is closed once the node has exited (see startNewNode).
	done chan struct{}
	// ready is set after swarmkit reports Ready.
	ready bool
	// conn/client expose the manager control API, nil while unavailable.
	conn   *grpc.ClientConn
	client swarmapi.ControlClient
	// reconnectDelay is the current backoff used by reconnectOnFailure.
	reconnectDelay time.Duration
	// config is the nodeStartConfig this node was started with.
	config nodeStartConfig
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
}
// New creates a new Cluster instance using provided config. If persisted
// state exists on disk, the previous swarm node is restarted; a locked
// swarm is remembered so it can later be unlocked via UnlockSwarm.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}
	nodeConfig, err := c.loadState()
	if err != nil {
		// No state file means this daemon was never part of a swarm.
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	n, err := c.startNewNode(*nodeConfig)
	if err != nil {
		if errors.Cause(err) == ErrSwarmLocked {
			// Start up in the locked state; the user must unlock.
			logrus.Warnf("swarm component could not be started: %v", err)
			c.locked = true
			c.lastNodeConfig = nodeConfig
			return c, nil
		}
		return nil, err
	}
	// Wait (bounded) for the restored node to become ready; on timeout
	// the startup continues in the background.
	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}
  173. func (c *Cluster) loadState() (*nodeStartConfig, error) {
  174. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  175. if err != nil {
  176. return nil, err
  177. }
  178. // missing certificate means no actual state to restore from
  179. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  180. if os.IsNotExist(err) {
  181. c.clearState()
  182. }
  183. return nil, err
  184. }
  185. var st nodeStartConfig
  186. if err := json.Unmarshal(dt, &st); err != nil {
  187. return nil, err
  188. }
  189. return &st, nil
  190. }
  191. func (c *Cluster) saveState(config nodeStartConfig) error {
  192. dt, err := json.Marshal(config)
  193. if err != nil {
  194. return err
  195. }
  196. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  197. }
  198. func (c *Cluster) reconnectOnFailure(n *node) {
  199. for {
  200. <-n.done
  201. c.Lock()
  202. if c.stop || c.node != nil {
  203. c.Unlock()
  204. return
  205. }
  206. n.reconnectDelay *= 2
  207. if n.reconnectDelay > maxReconnectDelay {
  208. n.reconnectDelay = maxReconnectDelay
  209. }
  210. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  211. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  212. c.cancelDelay = cancel
  213. c.Unlock()
  214. <-delayCtx.Done()
  215. if delayCtx.Err() != context.DeadlineExceeded {
  216. return
  217. }
  218. c.Lock()
  219. if c.node != nil {
  220. c.Unlock()
  221. return
  222. }
  223. var err error
  224. config := n.config
  225. config.RemoteAddr = c.getRemoteAddress()
  226. config.joinAddr = config.RemoteAddr
  227. n, err = c.startNewNode(config)
  228. if err != nil {
  229. c.err = err
  230. close(n.done)
  231. }
  232. c.Unlock()
  233. }
  234. }
// startNewNode starts a new swarmkit node from conf, wires it into c,
// and persists conf to disk. It returns the wrapped node, whose done
// channel is closed once the node exits. Callers must hold c's write
// lock; the spawned goroutines take it themselves as needed.
func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}
	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}
		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			// Dialing UDP sends no packets; it only selects the local
			// interface that routes toward RemoteAddr.
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}
	// Control socket: named pipe on Windows, unix socket elsewhere.
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(c.runtimeRoot, controlSocket)
	}
	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmnode.New(&swarmnode.Config{
		Hostname:           c.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           c.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(c.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
	})
	if err != nil {
		err = detectLockedError(err)
		if errors.Cause(err) == ErrSwarmLocked {
			// Remember the config so UnlockSwarm can retry with a key.
			c.locked = true
			confClone := conf
			c.lastNodeConfig = &confClone
		}
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
		config:         conf,
	}
	c.node = node
	c.actualLocalAddr = actualLocalAddr // not saved
	// NOTE(review): the write error from saveState is ignored here —
	// confirm a failed persist is acceptable (state is only needed to
	// rejoin after a daemon restart).
	c.saveState(conf)
	c.config.Backend.SetClusterProvider(c)
	go func() {
		// Wait for the node to exit, record its error, and signal done.
		err := n.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		c.Unlock()
		close(node.done)
	}()
	go func() {
		// Flip the ready flag once swarmkit reports readiness, then
		// notify configEvent listeners.
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()
	go func() {
		// Track the control socket connection, rebuilding the control
		// client whenever the connection changes (nil means lost).
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()
	return node, nil
}
// Init initializes new cluster from user provided request. It returns
// the new node's ID on success. An existing (or locked) swarm is only
// replaced when ForceNewCluster is set.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil || c.locked {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", ErrSwarmExists
		}
		// --force-new-cluster: stop the running node before re-init.
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}
	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}
	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return "", err
	}
	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		c.Unlock()
		return "", err
	}
	localAddr := listenHost
	// If the advertise address is not one of the system's
	// addresses, we also require a listen address.
	listenAddrIP := net.ParseIP(listenHost)
	if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)
		if advertiseIP == nil {
			// not an IP
			c.Unlock()
			return "", errMustSpecifyListenAddr
		}
		// Listening on a wildcard address: the advertise IP must be
		// one of this machine's own addresses.
		systemIPs := listSystemIPs()
		found := false
		for _, systemIP := range systemIPs {
			if systemIP.Equal(advertiseIP) {
				found = true
				break
			}
		}
		if !found {
			c.Unlock()
			return "", errMustSpecifyListenAddr
		}
		localAddr = advertiseIP.String()
	}
	var key []byte
	if len(req.LockKey) > 0 {
		key = []byte(req.LockKey)
	}
	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
		lockKey:         key,
	})
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()
	select {
	case <-n.Ready():
		// Push the user-supplied cluster spec once the manager is up.
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}
// Join makes current Cluster part of an existing swarm cluster. On
// timeout the join keeps running in the background and
// ErrSwarmJoinTimeoutReached is returned.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil || c.locked {
		c.Unlock()
		return ErrSwarmExists
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}
	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return err
	}
	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}
	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
	})
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	select {
	case <-time.After(swarmConnectTimeout):
		// attempt to connect will continue in background, but reconnect only if it didn't fail
		go func() {
			select {
			case <-n.Ready():
				c.reconnectOnFailure(n)
			case <-n.done:
				logrus.Errorf("failed to join the cluster: %+v", c.err)
			}
		}()
		return ErrSwarmJoinTimeoutReached
	case <-n.Ready():
		go c.reconnectOnFailure(n)
		return nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		return c.err
	}
}
// GetUnlockKey returns the unlock key for the swarm in human-readable
// form. The node must be an active manager.
func (c *Cluster) GetUnlockKey() (string, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return "", c.errNoManager()
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	// The unlock key is served by the CA service, not the control API.
	client := swarmapi.NewCAClient(c.conn)
	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
	if err != nil {
		return "", err
	}
	return encryption.HumanReadableKey(r.UnlockKey), nil
}
  513. // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
  514. func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
  515. if len(req.LockKey) == 0 {
  516. return errors.New("unlock key can't be empty")
  517. }
  518. c.Lock()
  519. if c.node != nil || c.locked != true {
  520. c.Unlock()
  521. return errors.New("swarm is not locked")
  522. }
  523. config := *c.lastNodeConfig
  524. config.lockKey = []byte(req.LockKey)
  525. n, err := c.startNewNode(config)
  526. if err != nil {
  527. c.Unlock()
  528. if errors.Cause(err) == ErrSwarmLocked {
  529. return errors.New("swarm could not be unlocked: invalid key provided")
  530. }
  531. return err
  532. }
  533. c.Unlock()
  534. select {
  535. case <-n.Ready():
  536. case <-n.done:
  537. return fmt.Errorf("swarm component could not be started: %v", c.err)
  538. }
  539. go c.reconnectOnFailure(n)
  540. return nil
  541. }
// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	// Prevent reconnectOnFailure from restarting the node, and cancel
	// any backoff delay that is currently pending.
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	// "context canceled" from Stop is expected when the timeout fires;
	// treat it as a clean shutdown.
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	// Wait for the exit goroutine in startNewNode to finish bookkeeping.
	<-node.done
	return nil
}
  565. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  566. return reachable-2 <= unreachable
  567. }
  568. func isLastManager(reachable, unreachable int) bool {
  569. return reachable == 1 && unreachable == 0
  570. }
  571. // Leave shuts down Cluster and removes current state.
  572. func (c *Cluster) Leave(force bool) error {
  573. c.Lock()
  574. node := c.node
  575. if node == nil {
  576. if c.locked {
  577. c.locked = false
  578. c.lastNodeConfig = nil
  579. c.Unlock()
  580. } else {
  581. c.Unlock()
  582. return ErrNoSwarm
  583. }
  584. } else {
  585. if node.Manager() != nil && !force {
  586. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  587. if c.isActiveManager() {
  588. active, reachable, unreachable, err := c.managerStats()
  589. if err == nil {
  590. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  591. if isLastManager(reachable, unreachable) {
  592. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  593. c.Unlock()
  594. return fmt.Errorf(msg)
  595. }
  596. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  597. }
  598. }
  599. } else {
  600. msg += "Doing so may lose the consensus of your cluster. "
  601. }
  602. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  603. c.Unlock()
  604. return fmt.Errorf(msg)
  605. }
  606. if err := c.stopNode(); err != nil {
  607. logrus.Errorf("failed to shut down cluster node: %v", err)
  608. signal.DumpStacks("")
  609. c.Unlock()
  610. return err
  611. }
  612. c.Unlock()
  613. if nodeID := node.NodeID(); nodeID != "" {
  614. nodeContainers, err := c.listContainerForNode(nodeID)
  615. if err != nil {
  616. return err
  617. }
  618. for _, id := range nodeContainers {
  619. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  620. logrus.Errorf("error removing %v: %v", id, err)
  621. }
  622. }
  623. }
  624. }
  625. c.configEvent <- struct{}{}
  626. // todo: cleanup optional?
  627. if err := c.clearState(); err != nil {
  628. return err
  629. }
  630. return nil
  631. }
  632. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  633. var ids []string
  634. filters := filters.NewArgs()
  635. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  636. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  637. Filters: filters,
  638. })
  639. if err != nil {
  640. return []string{}, err
  641. }
  642. for _, c := range containers {
  643. ids = append(ids, c.ID)
  644. }
  645. return ids, nil
  646. }
// clearState removes all on-disk swarm state, recreates an empty state
// directory, and detaches the cluster provider from the backend.
func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}
// getRequestContext returns a context with the standard control-API
// request timeout, plus its cancel func which the caller must invoke.
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}
  661. // Inspect retrieves the configuration properties of a managed swarm cluster.
  662. func (c *Cluster) Inspect() (types.Swarm, error) {
  663. c.RLock()
  664. defer c.RUnlock()
  665. if !c.isActiveManager() {
  666. return types.Swarm{}, c.errNoManager()
  667. }
  668. ctx, cancel := c.getRequestContext()
  669. defer cancel()
  670. swarm, err := getSwarm(ctx, c.client)
  671. if err != nil {
  672. return types.Swarm{}, err
  673. }
  674. return convert.SwarmFromGRPC(*swarm), nil
  675. }
// Update updates configuration of a managed swarm cluster. version is an
// optimistic-concurrency check against the cluster's current version;
// flags may additionally request join-token rotation.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return c.errNoManager()
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	// Fetch the current cluster object to obtain its ID.
	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return err
	}
	// In update, client should provide the complete spec of the swarm, including
	// Name and Labels. If a field is specified with 0 or nil, then the default value
	// will be used to swarmkit.
	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateCluster(
		ctx,
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &clusterSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
			Rotation: swarmapi.JoinTokenRotation{
				RotateWorkerToken:  flags.RotateWorkerToken,
				RotateManagerToken: flags.RotateManagerToken,
			},
		},
	)
	return err
}
// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	// ready is set once swarmkit reports the node Ready (startNewNode).
	return c.node != nil && c.ready
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.RLock()
	defer c.RUnlock()
	// Resolved at node start; never persisted to disk.
	return c.actualLocalAddr
}

// GetListenAddress returns the listen address, or "" when no node is
// running.
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.node != nil {
		return c.node.config.ListenAddr
	}
	return ""
}
  739. // GetAdvertiseAddress returns the remotely reachable address of this node.
  740. func (c *Cluster) GetAdvertiseAddress() string {
  741. c.RLock()
  742. defer c.RUnlock()
  743. if c.node != nil && c.node.config.AdvertiseAddr != "" {
  744. advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
  745. return advertiseHost
  746. }
  747. return c.actualLocalAddr
  748. }
// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}
  757. func (c *Cluster) getRemoteAddress() string {
  758. if c.node == nil {
  759. return ""
  760. }
  761. nodeID := c.node.NodeID()
  762. for _, r := range c.node.Remotes() {
  763. if r.NodeID != nodeID {
  764. return r.Addr
  765. }
  766. }
  767. return ""
  768. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}
  775. // Info returns information about the current cluster state.
  776. func (c *Cluster) Info() types.Info {
  777. info := types.Info{
  778. NodeAddr: c.GetAdvertiseAddress(),
  779. }
  780. c.RLock()
  781. defer c.RUnlock()
  782. if c.node == nil {
  783. info.LocalNodeState = types.LocalNodeStateInactive
  784. if c.cancelDelay != nil {
  785. info.LocalNodeState = types.LocalNodeStateError
  786. }
  787. if c.locked {
  788. info.LocalNodeState = types.LocalNodeStateLocked
  789. }
  790. } else {
  791. info.LocalNodeState = types.LocalNodeStatePending
  792. if c.ready == true {
  793. info.LocalNodeState = types.LocalNodeStateActive
  794. }
  795. }
  796. if c.err != nil {
  797. info.Error = c.err.Error()
  798. }
  799. ctx, cancel := c.getRequestContext()
  800. defer cancel()
  801. if c.isActiveManager() {
  802. info.ControlAvailable = true
  803. swarm, err := c.Inspect()
  804. if err != nil {
  805. info.Error = err.Error()
  806. }
  807. // Strip JoinTokens
  808. info.Cluster = swarm.ClusterInfo
  809. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  810. info.Nodes = len(r.Nodes)
  811. for _, n := range r.Nodes {
  812. if n.ManagerStatus != nil {
  813. info.Managers = info.Managers + 1
  814. }
  815. }
  816. }
  817. }
  818. if c.node != nil {
  819. for _, r := range c.node.Remotes() {
  820. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  821. }
  822. info.NodeID = c.node.NodeID()
  823. }
  824. return info
  825. }
  826. // isActiveManager should not be called without a read lock
  827. func (c *Cluster) isActiveManager() bool {
  828. return c.node != nil && c.conn != nil
  829. }
  830. // errNoManager returns error describing why manager commands can't be used.
  831. // Call with read lock.
  832. func (c *Cluster) errNoManager() error {
  833. if c.node == nil {
  834. if c.locked {
  835. return ErrSwarmLocked
  836. }
  837. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  838. }
  839. if c.node.Manager() != nil {
  840. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  841. }
  842. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  843. }
  844. // GetServices returns all services of a managed swarm cluster.
  845. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  846. c.RLock()
  847. defer c.RUnlock()
  848. if !c.isActiveManager() {
  849. return nil, c.errNoManager()
  850. }
  851. filters, err := newListServicesFilters(options.Filters)
  852. if err != nil {
  853. return nil, err
  854. }
  855. ctx, cancel := c.getRequestContext()
  856. defer cancel()
  857. r, err := c.client.ListServices(
  858. ctx,
  859. &swarmapi.ListServicesRequest{Filters: filters})
  860. if err != nil {
  861. return nil, err
  862. }
  863. services := []types.Service{}
  864. for _, service := range r.Services {
  865. services = append(services, convert.ServiceFromGRPC(*service))
  866. }
  867. return services, nil
  868. }
  869. // CreateService creates a new service in a managed swarm cluster.
  870. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
  871. c.RLock()
  872. defer c.RUnlock()
  873. if !c.isActiveManager() {
  874. return "", c.errNoManager()
  875. }
  876. ctx, cancel := c.getRequestContext()
  877. defer cancel()
  878. err := c.populateNetworkID(ctx, c.client, &s)
  879. if err != nil {
  880. return "", err
  881. }
  882. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  883. if err != nil {
  884. return "", err
  885. }
  886. if encodedAuth != "" {
  887. ctnr := serviceSpec.Task.GetContainer()
  888. if ctnr == nil {
  889. return "", fmt.Errorf("service does not use container tasks")
  890. }
  891. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  892. }
  893. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  894. if err != nil {
  895. return "", err
  896. }
  897. return r.Service.ID, nil
  898. }
  899. // GetService returns a service based on an ID or name.
  900. func (c *Cluster) GetService(input string) (types.Service, error) {
  901. c.RLock()
  902. defer c.RUnlock()
  903. if !c.isActiveManager() {
  904. return types.Service{}, c.errNoManager()
  905. }
  906. ctx, cancel := c.getRequestContext()
  907. defer cancel()
  908. service, err := getService(ctx, c.client, input)
  909. if err != nil {
  910. return types.Service{}, err
  911. }
  912. return convert.ServiceFromGRPC(*service), nil
  913. }
// UpdateService updates existing service to match new properties.
//
// The update is optimistic: `version` must match the service's current
// version index or swarmkit rejects it. When encodedAuth is empty the
// previously stored registry auth is carried over (selected by
// registryAuthFrom) instead of being dropped.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return c.errNoManager()
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	// Resolve network names/IDs referenced by the spec to swarm network IDs.
	err := c.populateNetworkID(ctx, c.client, &spec)
	if err != nil {
		return err
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return err
	}
	// Fetch the current service: its ID is needed for the update request and
	// its stored spec(s) supply the fallback registry auth below.
	currentService, err := getService(ctx, c.client, serviceIDOrName)
	if err != nil {
		return err
	}
	if encodedAuth != "" {
		ctnr := serviceSpec.Task.GetContainer()
		if ctnr == nil {
			return fmt.Errorf("service does not use container tasks")
		}
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			if currentService.PreviousSpec == nil {
				return fmt.Errorf("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return fmt.Errorf("unsupported registryAuthFromValue")
		}
		if ctnr == nil {
			return fmt.Errorf("service does not use container tasks")
		}
		serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
	}
	_, err = c.client.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}
  973. // RemoveService removes a service from a managed swarm cluster.
  974. func (c *Cluster) RemoveService(input string) error {
  975. c.RLock()
  976. defer c.RUnlock()
  977. if !c.isActiveManager() {
  978. return c.errNoManager()
  979. }
  980. ctx, cancel := c.getRequestContext()
  981. defer cancel()
  982. service, err := getService(ctx, c.client, input)
  983. if err != nil {
  984. return err
  985. }
  986. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  987. return err
  988. }
  989. return nil
  990. }
  991. // GetNodes returns a list of all nodes known to a cluster.
  992. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  993. c.RLock()
  994. defer c.RUnlock()
  995. if !c.isActiveManager() {
  996. return nil, c.errNoManager()
  997. }
  998. filters, err := newListNodesFilters(options.Filters)
  999. if err != nil {
  1000. return nil, err
  1001. }
  1002. ctx, cancel := c.getRequestContext()
  1003. defer cancel()
  1004. r, err := c.client.ListNodes(
  1005. ctx,
  1006. &swarmapi.ListNodesRequest{Filters: filters})
  1007. if err != nil {
  1008. return nil, err
  1009. }
  1010. nodes := []types.Node{}
  1011. for _, node := range r.Nodes {
  1012. nodes = append(nodes, convert.NodeFromGRPC(*node))
  1013. }
  1014. return nodes, nil
  1015. }
  1016. // GetNode returns a node based on an ID or name.
  1017. func (c *Cluster) GetNode(input string) (types.Node, error) {
  1018. c.RLock()
  1019. defer c.RUnlock()
  1020. if !c.isActiveManager() {
  1021. return types.Node{}, c.errNoManager()
  1022. }
  1023. ctx, cancel := c.getRequestContext()
  1024. defer cancel()
  1025. node, err := getNode(ctx, c.client, input)
  1026. if err != nil {
  1027. return types.Node{}, err
  1028. }
  1029. return convert.NodeFromGRPC(*node), nil
  1030. }
  1031. // UpdateNode updates existing nodes properties.
  1032. func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
  1033. c.RLock()
  1034. defer c.RUnlock()
  1035. if !c.isActiveManager() {
  1036. return c.errNoManager()
  1037. }
  1038. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  1039. if err != nil {
  1040. return err
  1041. }
  1042. ctx, cancel := c.getRequestContext()
  1043. defer cancel()
  1044. _, err = c.client.UpdateNode(
  1045. ctx,
  1046. &swarmapi.UpdateNodeRequest{
  1047. NodeID: nodeID,
  1048. Spec: &nodeSpec,
  1049. NodeVersion: &swarmapi.Version{
  1050. Index: version,
  1051. },
  1052. },
  1053. )
  1054. return err
  1055. }
  1056. // RemoveNode removes a node from a cluster
  1057. func (c *Cluster) RemoveNode(input string, force bool) error {
  1058. c.RLock()
  1059. defer c.RUnlock()
  1060. if !c.isActiveManager() {
  1061. return c.errNoManager()
  1062. }
  1063. ctx, cancel := c.getRequestContext()
  1064. defer cancel()
  1065. node, err := getNode(ctx, c.client, input)
  1066. if err != nil {
  1067. return err
  1068. }
  1069. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  1070. return err
  1071. }
  1072. return nil
  1073. }
// GetTasks returns a list of tasks matching the filter options.
//
// Only container-based tasks are returned; "service" and "node" filter values
// given as names (or partial IDs) are resolved to exact IDs before the
// swarmkit query is made.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}
	// byName rewrites the "service" and "node" filters in place, replacing
	// each user-supplied value with the corresponding object's exact ID.
	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}
	filters, err := newListTasksFilters(options.Filters, byName)
	if err != nil {
		return nil, err
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	r, err := c.client.ListTasks(
		ctx,
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	tasks := []types.Task{}
	for _, task := range r.Tasks {
		// Skip non-container tasks; only container tasks are exposed via the API.
		if task.Spec.GetContainer() != nil {
			tasks = append(tasks, convert.TaskFromGRPC(*task))
		}
	}
	return tasks, nil
}
  1126. // GetTask returns a task by an ID.
  1127. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1128. c.RLock()
  1129. defer c.RUnlock()
  1130. if !c.isActiveManager() {
  1131. return types.Task{}, c.errNoManager()
  1132. }
  1133. ctx, cancel := c.getRequestContext()
  1134. defer cancel()
  1135. task, err := getTask(ctx, c.client, input)
  1136. if err != nil {
  1137. return types.Task{}, err
  1138. }
  1139. return convert.TaskFromGRPC(*task), nil
  1140. }
  1141. // GetNetwork returns a cluster network by an ID.
  1142. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1143. c.RLock()
  1144. defer c.RUnlock()
  1145. if !c.isActiveManager() {
  1146. return apitypes.NetworkResource{}, c.errNoManager()
  1147. }
  1148. ctx, cancel := c.getRequestContext()
  1149. defer cancel()
  1150. network, err := getNetwork(ctx, c.client, input)
  1151. if err != nil {
  1152. return apitypes.NetworkResource{}, err
  1153. }
  1154. return convert.BasicNetworkFromGRPC(*network), nil
  1155. }
  1156. // GetNetworks returns all current cluster managed networks.
  1157. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1158. c.RLock()
  1159. defer c.RUnlock()
  1160. if !c.isActiveManager() {
  1161. return nil, c.errNoManager()
  1162. }
  1163. ctx, cancel := c.getRequestContext()
  1164. defer cancel()
  1165. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1166. if err != nil {
  1167. return nil, err
  1168. }
  1169. var networks []apitypes.NetworkResource
  1170. for _, network := range r.Networks {
  1171. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1172. }
  1173. return networks, nil
  1174. }
  1175. func attacherKey(target, containerID string) string {
  1176. return containerID + ":" + target
  1177. }
// UpdateAttachment signals the attachment config to the attachment
// waiter who is trying to start or attach the container to the
// network.
//
// Returns an error if no attacher was registered (via AttachNetwork) for the
// given container/network pair.
func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(target, containerID)]
	c.RUnlock()
	if !ok || attacher == nil {
		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
	}
	// Hand the config to the waiter blocked in AttachNetwork, then close the
	// channel so the delivery is one-shot.
	// NOTE(review): the send blocks until the waiter receives — presumably
	// the manager only calls this while an attach is pending; confirm.
	attacher.attachWaitCh <- config
	close(attacher.attachWaitCh)
	return nil
}
// WaitForDetachment waits for the container to stop or detach from
// the network.
//
// If an attacher is registered for this container it first waits for the
// attach to complete, then (when the task matches) for DetachNetwork to
// release it, before asking the agent to detach the task. Honors ctx
// cancellation at each wait point.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.RLock()
	// The attacher may be keyed by network name or by network ID; try both.
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	if c.node == nil || c.node.Agent() == nil {
		c.RUnlock()
		return fmt.Errorf("invalid cluster node while waiting for detachment")
	}
	// Capture the agent before dropping the lock; it is used after unlock.
	agent := c.node.Agent()
	c.RUnlock()
	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may be in progress still so wait for
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}
		// Only wait for the detach signal when this attacher owns the task;
		// DetachNetwork closes detachWaitCh to release us.
		if attacher.taskID == taskID {
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}
// AttachNetwork generates an attachment request towards the manager.
//
// It registers an attacher for the (target, containerID) pair, asks the
// agent's resource allocator to attach, and then blocks until the manager
// delivers the resulting NetworkingConfig via UpdateAttachment (or the
// request context expires). If an attacher already exists for the pair its
// cached config is returned immediately.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.Lock()
	if c.node == nil || c.node.Agent() == nil {
		c.Unlock()
		return nil, fmt.Errorf("invalid cluster node while attaching to network")
	}
	// Reuse an existing attachment for the same pair.
	if attacher, ok := c.attachers[aKey]; ok {
		c.Unlock()
		return attacher.config, nil
	}
	agent := c.node.Agent()
	// attachWaitCh: receives the config from UpdateAttachment.
	// detachWaitCh: closed by DetachNetwork to release WaitForDetachment.
	// attachCompleteCh: closed below once the taskID is known.
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.Unlock()
	ctx, cancel := c.getRequestContext()
	defer cancel()
	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		// Roll back the registration on failure.
		c.Lock()
		delete(c.attachers, aKey)
		c.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}
	c.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.Unlock()
	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
	// Block until the manager pushes the config (UpdateAttachment) or the
	// request times out.
	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}
	c.Lock()
	c.attachers[aKey].config = config
	c.Unlock()
	return config, nil
}
  1273. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1274. // that a request to detach can be generated towards the manager.
  1275. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1276. aKey := attacherKey(target, containerID)
  1277. c.Lock()
  1278. attacher, ok := c.attachers[aKey]
  1279. delete(c.attachers, aKey)
  1280. c.Unlock()
  1281. if !ok {
  1282. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1283. }
  1284. close(attacher.detachWaitCh)
  1285. return nil
  1286. }
  1287. // CreateNetwork creates a new cluster managed network.
  1288. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1289. c.RLock()
  1290. defer c.RUnlock()
  1291. if !c.isActiveManager() {
  1292. return "", c.errNoManager()
  1293. }
  1294. if runconfig.IsPreDefinedNetwork(s.Name) {
  1295. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1296. return "", apierrors.NewRequestForbiddenError(err)
  1297. }
  1298. ctx, cancel := c.getRequestContext()
  1299. defer cancel()
  1300. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1301. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1302. if err != nil {
  1303. return "", err
  1304. }
  1305. return r.Network.ID, nil
  1306. }
  1307. // RemoveNetwork removes a cluster network.
  1308. func (c *Cluster) RemoveNetwork(input string) error {
  1309. c.RLock()
  1310. defer c.RUnlock()
  1311. if !c.isActiveManager() {
  1312. return c.errNoManager()
  1313. }
  1314. ctx, cancel := c.getRequestContext()
  1315. defer cancel()
  1316. network, err := getNetwork(ctx, c.client, input)
  1317. if err != nil {
  1318. return err
  1319. }
  1320. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1321. return err
  1322. }
  1323. return nil
  1324. }
// populateNetworkID resolves each network reference in the service spec from
// a name or partial ID to the exact swarm network ID, mutating the spec's
// network attachment configs in place. A reference to an existing local
// (non-swarm) network is rejected with a forbidden error.
func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate
	// but fallback to service spec for backward compatibility
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}
	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			// Not a swarm network; if it matches a non-dynamic local network,
			// report that it cannot be used for services.
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		// networks aliases the spec's slice, so this rewrites the spec in place.
		networks[i].Target = apiNetwork.ID
	}
	return nil
}
  1345. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1346. // GetNetwork to match via full ID.
  1347. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1348. if err != nil {
  1349. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1350. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1351. if err != nil || len(rl.Networks) == 0 {
  1352. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1353. }
  1354. if err != nil {
  1355. return nil, err
  1356. }
  1357. if len(rl.Networks) == 0 {
  1358. return nil, fmt.Errorf("network %s not found", input)
  1359. }
  1360. if l := len(rl.Networks); l > 1 {
  1361. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1362. }
  1363. return rl.Networks[0], nil
  1364. }
  1365. return rg.Network, nil
  1366. }
// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		// Nothing to stop.
		c.Unlock()
		return
	}
	// Hold the lock for the rest of the shutdown so stopNode runs under it.
	defer c.Unlock()
	if c.isActiveManager() {
		// Best-effort warning: if stats are unavailable we still shut down.
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			// Warn when removing this manager would cost the cluster its
			// raft quorum (unless it is the only node anyway).
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}
  1387. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1388. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1389. defer cancel()
  1390. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1391. if err != nil {
  1392. return false, 0, 0, err
  1393. }
  1394. for _, n := range nodes.Nodes {
  1395. if n.ManagerStatus != nil {
  1396. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1397. reachable++
  1398. if n.ID == c.node.NodeID() {
  1399. current = true
  1400. }
  1401. }
  1402. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1403. unreachable++
  1404. }
  1405. }
  1406. }
  1407. return
  1408. }
  1409. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1410. var err error
  1411. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1412. if err != nil {
  1413. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1414. }
  1415. return nil
  1416. }
  1417. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1418. var err error
  1419. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1420. if err != nil {
  1421. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1422. }
  1423. if len(req.RemoteAddrs) == 0 {
  1424. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1425. }
  1426. for i := range req.RemoteAddrs {
  1427. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1428. if err != nil {
  1429. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1430. }
  1431. }
  1432. return nil
  1433. }
// validateAddr normalizes addr to host:port form using the swarm default
// address, stripping any "tcp://" scheme prefix. Only an empty address is
// rejected.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// NOTE(review): the parse error is discarded and the input returned
		// unchanged — presumably a deliberate fallback for address forms that
		// ParseTCPAddr cannot handle. Returning `nil` here looks like a bug at
		// first glance; confirm this behavior is intended before changing it.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
  1444. func initClusterSpec(node *node, spec types.Spec) error {
  1445. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1446. for conn := range node.ListenControlSocket(ctx) {
  1447. if ctx.Err() != nil {
  1448. return ctx.Err()
  1449. }
  1450. if conn != nil {
  1451. client := swarmapi.NewControlClient(conn)
  1452. var cluster *swarmapi.Cluster
  1453. for i := 0; ; i++ {
  1454. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1455. if err != nil {
  1456. return fmt.Errorf("error on listing clusters: %v", err)
  1457. }
  1458. if len(lcr.Clusters) == 0 {
  1459. if i < 10 {
  1460. time.Sleep(200 * time.Millisecond)
  1461. continue
  1462. }
  1463. return fmt.Errorf("empty list of clusters was returned")
  1464. }
  1465. cluster = lcr.Clusters[0]
  1466. break
  1467. }
  1468. // In init, we take the initial default values from swarmkit, and merge
  1469. // any non nil or 0 value from spec to GRPC spec. This will leave the
  1470. // default value alone.
  1471. // Note that this is different from Update(), as in Update() we expect
  1472. // user to specify the complete spec of the cluster (as they already know
  1473. // the existing one and knows which field to update)
  1474. clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
  1475. if err != nil {
  1476. return fmt.Errorf("error updating cluster settings: %v", err)
  1477. }
  1478. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1479. ClusterID: cluster.ID,
  1480. ClusterVersion: &cluster.Meta.Version,
  1481. Spec: &clusterSpec,
  1482. })
  1483. if err != nil {
  1484. return fmt.Errorf("error updating cluster settings: %v", err)
  1485. }
  1486. return nil
  1487. }
  1488. }
  1489. return ctx.Err()
  1490. }
  1491. func detectLockedError(err error) error {
  1492. if errors.Cause(err) == x509.IncorrectPasswordError || errors.Cause(err).Error() == "tls: failed to parse private key" { // todo: better to export typed error
  1493. return errors.WithStack(ErrSwarmLocked)
  1494. }
  1495. return err
  1496. }