cluster.go 50 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511
66116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919
  1. package cluster
  2. import (
  3. "crypto/x509"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/Sirupsen/logrus"
  17. distreference "github.com/docker/distribution/reference"
  18. apierrors "github.com/docker/docker/api/errors"
  19. apitypes "github.com/docker/docker/api/types"
  20. "github.com/docker/docker/api/types/backend"
  21. "github.com/docker/docker/api/types/filters"
  22. "github.com/docker/docker/api/types/network"
  23. types "github.com/docker/docker/api/types/swarm"
  24. "github.com/docker/docker/daemon/cluster/convert"
  25. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  26. "github.com/docker/docker/daemon/cluster/executor/container"
  27. "github.com/docker/docker/daemon/logger"
  28. "github.com/docker/docker/opts"
  29. "github.com/docker/docker/pkg/ioutils"
  30. "github.com/docker/docker/pkg/signal"
  31. "github.com/docker/docker/pkg/stdcopy"
  32. "github.com/docker/docker/reference"
  33. "github.com/docker/docker/runconfig"
  34. swarmapi "github.com/docker/swarmkit/api"
  35. "github.com/docker/swarmkit/manager/encryption"
  36. swarmnode "github.com/docker/swarmkit/node"
  37. "github.com/docker/swarmkit/protobuf/ptypes"
  38. "github.com/pkg/errors"
  39. "golang.org/x/net/context"
  40. "google.golang.org/grpc"
  41. )
  42. const swarmDirName = "swarm"
  43. const controlSocket = "control.sock"
  44. const swarmConnectTimeout = 20 * time.Second
  45. const swarmRequestTimeout = 20 * time.Second
  46. const stateFile = "docker-state.json"
  47. const defaultAddr = "0.0.0.0:2377"
  48. const (
  49. initialReconnectDelay = 100 * time.Millisecond
  50. maxReconnectDelay = 30 * time.Second
  51. contextPrefix = "com.docker.swarm"
  52. )
  53. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  54. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  55. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  56. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  57. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  58. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  59. // ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
  60. var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")
  61. // ErrSwarmCertificatesExpired is returned if docker was not started for the whole validity period and they had no chance to renew automatically.
  62. var ErrSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
  63. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  64. // of networks managed by Docker, so they can be filtered.
  65. type NetworkSubnetsProvider interface {
  66. V4Subnets() []net.IPNet
  67. V6Subnets() []net.IPNet
  68. }
  69. // Config provides values for Cluster.
  70. type Config struct {
  71. Root string
  72. Name string
  73. Backend executorpkg.Backend
  74. NetworkSubnetsProvider NetworkSubnetsProvider
  75. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  76. // if no AdvertiseAddr value is specified.
  77. DefaultAdvertiseAddr string
  78. // path to store runtime state, such as the swarm control socket
  79. RuntimeRoot string
  80. }
  81. // Cluster provides capabilities to participate in a cluster as a worker or a
  82. // manager.
  83. type Cluster struct {
  84. sync.RWMutex
  85. *node
  86. root string
  87. runtimeRoot string
  88. config Config
  89. configEvent chan struct{} // todo: make this array and goroutine safe
  90. actualLocalAddr string // after resolution, not persisted
  91. stop bool
  92. err error
  93. cancelDelay func()
  94. attachers map[string]*attacher
  95. locked bool
  96. lastNodeConfig *nodeStartConfig
  97. }
  98. // attacher manages the in-memory attachment state of a container
  99. // attachment to a global scope network managed by swarm manager. It
  100. // helps in identifying the attachment ID via the taskID and the
  101. // corresponding attachment configuration obtained from the manager.
  102. type attacher struct {
  103. taskID string
  104. config *network.NetworkingConfig
  105. attachWaitCh chan *network.NetworkingConfig
  106. attachCompleteCh chan struct{}
  107. detachWaitCh chan struct{}
  108. }
  109. type node struct {
  110. *swarmnode.Node
  111. done chan struct{}
  112. ready bool
  113. conn *grpc.ClientConn
  114. client swarmapi.ControlClient
  115. logs swarmapi.LogsClient
  116. reconnectDelay time.Duration
  117. config nodeStartConfig
  118. }
  119. // nodeStartConfig holds configuration needed to start a new node. Exported
  120. // fields of this structure are saved to disk in json. Unexported fields
  121. // contain data that shouldn't be persisted between daemon reloads.
  122. type nodeStartConfig struct {
  123. // LocalAddr is this machine's local IP or hostname, if specified.
  124. LocalAddr string
  125. // RemoteAddr is the address that was given to "swarm join". It is used
  126. // to find LocalAddr if necessary.
  127. RemoteAddr string
  128. // ListenAddr is the address we bind to, including a port.
  129. ListenAddr string
  130. // AdvertiseAddr is the address other nodes should connect to,
  131. // including a port.
  132. AdvertiseAddr string
  133. joinAddr string
  134. forceNewCluster bool
  135. joinToken string
  136. lockKey []byte
  137. autolock bool
  138. }
  139. // New creates a new Cluster instance using provided config.
  140. func New(config Config) (*Cluster, error) {
  141. root := filepath.Join(config.Root, swarmDirName)
  142. if err := os.MkdirAll(root, 0700); err != nil {
  143. return nil, err
  144. }
  145. if config.RuntimeRoot == "" {
  146. config.RuntimeRoot = root
  147. }
  148. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  149. return nil, err
  150. }
  151. c := &Cluster{
  152. root: root,
  153. config: config,
  154. configEvent: make(chan struct{}, 10),
  155. runtimeRoot: config.RuntimeRoot,
  156. attachers: make(map[string]*attacher),
  157. }
  158. nodeConfig, err := c.loadState()
  159. if err != nil {
  160. if os.IsNotExist(err) {
  161. return c, nil
  162. }
  163. return nil, err
  164. }
  165. n, err := c.startNewNode(*nodeConfig)
  166. if err != nil {
  167. return nil, err
  168. }
  169. select {
  170. case <-time.After(swarmConnectTimeout):
  171. logrus.Error("swarm component could not be started before timeout was reached")
  172. case <-n.Ready():
  173. case <-n.done:
  174. if errors.Cause(c.err) == ErrSwarmLocked {
  175. return c, nil
  176. }
  177. if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
  178. c.err = ErrSwarmCertificatesExpired
  179. return c, nil
  180. }
  181. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  182. }
  183. go c.reconnectOnFailure(n)
  184. return c, nil
  185. }
  186. func (c *Cluster) loadState() (*nodeStartConfig, error) {
  187. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  188. if err != nil {
  189. return nil, err
  190. }
  191. // missing certificate means no actual state to restore from
  192. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  193. if os.IsNotExist(err) {
  194. c.clearState()
  195. }
  196. return nil, err
  197. }
  198. var st nodeStartConfig
  199. if err := json.Unmarshal(dt, &st); err != nil {
  200. return nil, err
  201. }
  202. return &st, nil
  203. }
  204. func (c *Cluster) saveState(config nodeStartConfig) error {
  205. dt, err := json.Marshal(config)
  206. if err != nil {
  207. return err
  208. }
  209. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  210. }
  211. func (c *Cluster) reconnectOnFailure(n *node) {
  212. for {
  213. <-n.done
  214. c.Lock()
  215. if c.stop || c.node != nil {
  216. c.Unlock()
  217. return
  218. }
  219. n.reconnectDelay *= 2
  220. if n.reconnectDelay > maxReconnectDelay {
  221. n.reconnectDelay = maxReconnectDelay
  222. }
  223. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  224. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  225. c.cancelDelay = cancel
  226. c.Unlock()
  227. <-delayCtx.Done()
  228. if delayCtx.Err() != context.DeadlineExceeded {
  229. return
  230. }
  231. c.Lock()
  232. if c.node != nil {
  233. c.Unlock()
  234. return
  235. }
  236. var err error
  237. config := n.config
  238. config.RemoteAddr = c.getRemoteAddress()
  239. config.joinAddr = config.RemoteAddr
  240. n, err = c.startNewNode(config)
  241. if err != nil {
  242. c.err = err
  243. close(n.done)
  244. }
  245. c.Unlock()
  246. }
  247. }
  248. func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
  249. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  250. return nil, err
  251. }
  252. actualLocalAddr := conf.LocalAddr
  253. if actualLocalAddr == "" {
  254. // If localAddr was not specified, resolve it automatically
  255. // based on the route to joinAddr. localAddr can only be left
  256. // empty on "join".
  257. listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
  258. if err != nil {
  259. return nil, fmt.Errorf("could not parse listen address: %v", err)
  260. }
  261. listenAddrIP := net.ParseIP(listenHost)
  262. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  263. actualLocalAddr = listenHost
  264. } else {
  265. if conf.RemoteAddr == "" {
  266. // Should never happen except using swarms created by
  267. // old versions that didn't save remoteAddr.
  268. conf.RemoteAddr = "8.8.8.8:53"
  269. }
  270. conn, err := net.Dial("udp", conf.RemoteAddr)
  271. if err != nil {
  272. return nil, fmt.Errorf("could not find local IP address: %v", err)
  273. }
  274. localHostPort := conn.LocalAddr().String()
  275. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  276. conn.Close()
  277. }
  278. }
  279. var control string
  280. if runtime.GOOS == "windows" {
  281. control = `\\.\pipe\` + controlSocket
  282. } else {
  283. control = filepath.Join(c.runtimeRoot, controlSocket)
  284. }
  285. c.node = nil
  286. c.cancelDelay = nil
  287. c.stop = false
  288. n, err := swarmnode.New(&swarmnode.Config{
  289. Hostname: c.config.Name,
  290. ForceNewCluster: conf.forceNewCluster,
  291. ListenControlAPI: control,
  292. ListenRemoteAPI: conf.ListenAddr,
  293. AdvertiseRemoteAPI: conf.AdvertiseAddr,
  294. JoinAddr: conf.joinAddr,
  295. StateDir: c.root,
  296. JoinToken: conf.joinToken,
  297. Executor: container.NewExecutor(c.config.Backend),
  298. HeartbeatTick: 1,
  299. ElectionTick: 3,
  300. UnlockKey: conf.lockKey,
  301. AutoLockManagers: conf.autolock,
  302. })
  303. if err != nil {
  304. return nil, err
  305. }
  306. ctx := context.Background()
  307. if err := n.Start(ctx); err != nil {
  308. return nil, err
  309. }
  310. node := &node{
  311. Node: n,
  312. done: make(chan struct{}),
  313. reconnectDelay: initialReconnectDelay,
  314. config: conf,
  315. }
  316. c.node = node
  317. c.actualLocalAddr = actualLocalAddr // not saved
  318. c.saveState(conf)
  319. c.config.Backend.SetClusterProvider(c)
  320. go func() {
  321. err := detectLockedError(n.Err(ctx))
  322. if err != nil {
  323. logrus.Errorf("cluster exited with error: %v", err)
  324. }
  325. c.Lock()
  326. c.node = nil
  327. c.err = err
  328. if errors.Cause(err) == ErrSwarmLocked {
  329. c.locked = true
  330. confClone := conf
  331. c.lastNodeConfig = &confClone
  332. }
  333. c.Unlock()
  334. close(node.done)
  335. }()
  336. go func() {
  337. select {
  338. case <-n.Ready():
  339. c.Lock()
  340. node.ready = true
  341. c.err = nil
  342. c.Unlock()
  343. case <-ctx.Done():
  344. }
  345. c.configEvent <- struct{}{}
  346. }()
  347. go func() {
  348. for conn := range n.ListenControlSocket(ctx) {
  349. c.Lock()
  350. if node.conn != conn {
  351. if conn == nil {
  352. node.client = nil
  353. node.logs = nil
  354. } else {
  355. node.client = swarmapi.NewControlClient(conn)
  356. node.logs = swarmapi.NewLogsClient(conn)
  357. }
  358. }
  359. node.conn = conn
  360. c.Unlock()
  361. c.configEvent <- struct{}{}
  362. }
  363. }()
  364. return node, nil
  365. }
  366. // Init initializes new cluster from user provided request.
  367. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  368. c.Lock()
  369. if c.swarmExists() {
  370. if !req.ForceNewCluster {
  371. c.Unlock()
  372. return "", ErrSwarmExists
  373. }
  374. if err := c.stopNode(); err != nil {
  375. c.Unlock()
  376. return "", err
  377. }
  378. }
  379. if err := validateAndSanitizeInitRequest(&req); err != nil {
  380. c.Unlock()
  381. return "", err
  382. }
  383. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  384. if err != nil {
  385. c.Unlock()
  386. return "", err
  387. }
  388. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  389. if err != nil {
  390. c.Unlock()
  391. return "", err
  392. }
  393. localAddr := listenHost
  394. // If the local address is undetermined, the advertise address
  395. // will be used as local address, if it belongs to this system.
  396. // If the advertise address is not local, then we try to find
  397. // a system address to use as local address. If this fails,
  398. // we give up and ask user to pass the listen address.
  399. if net.ParseIP(localAddr).IsUnspecified() {
  400. advertiseIP := net.ParseIP(advertiseHost)
  401. found := false
  402. for _, systemIP := range listSystemIPs() {
  403. if systemIP.Equal(advertiseIP) {
  404. localAddr = advertiseIP.String()
  405. found = true
  406. break
  407. }
  408. }
  409. if !found {
  410. ip, err := c.resolveSystemAddr()
  411. if err != nil {
  412. c.Unlock()
  413. logrus.Warnf("Could not find a local address: %v", err)
  414. return "", errMustSpecifyListenAddr
  415. }
  416. localAddr = ip.String()
  417. }
  418. }
  419. // todo: check current state existing
  420. n, err := c.startNewNode(nodeStartConfig{
  421. forceNewCluster: req.ForceNewCluster,
  422. autolock: req.AutoLockManagers,
  423. LocalAddr: localAddr,
  424. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  425. AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
  426. })
  427. if err != nil {
  428. c.Unlock()
  429. return "", err
  430. }
  431. c.Unlock()
  432. select {
  433. case <-n.Ready():
  434. if err := initClusterSpec(n, req.Spec); err != nil {
  435. return "", err
  436. }
  437. go c.reconnectOnFailure(n)
  438. return n.NodeID(), nil
  439. case <-n.done:
  440. c.RLock()
  441. defer c.RUnlock()
  442. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  443. if err := c.clearState(); err != nil {
  444. return "", err
  445. }
  446. }
  447. return "", c.err
  448. }
  449. }
  450. // Join makes current Cluster part of an existing swarm cluster.
  451. func (c *Cluster) Join(req types.JoinRequest) error {
  452. c.Lock()
  453. if c.swarmExists() {
  454. c.Unlock()
  455. return ErrSwarmExists
  456. }
  457. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  458. c.Unlock()
  459. return err
  460. }
  461. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  462. if err != nil {
  463. c.Unlock()
  464. return err
  465. }
  466. var advertiseAddr string
  467. if req.AdvertiseAddr != "" {
  468. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  469. // For joining, we don't need to provide an advertise address,
  470. // since the remote side can detect it.
  471. if err == nil {
  472. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  473. }
  474. }
  475. // todo: check current state existing
  476. n, err := c.startNewNode(nodeStartConfig{
  477. RemoteAddr: req.RemoteAddrs[0],
  478. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  479. AdvertiseAddr: advertiseAddr,
  480. joinAddr: req.RemoteAddrs[0],
  481. joinToken: req.JoinToken,
  482. })
  483. if err != nil {
  484. c.Unlock()
  485. return err
  486. }
  487. c.Unlock()
  488. select {
  489. case <-time.After(swarmConnectTimeout):
  490. // attempt to connect will continue in background, but reconnect only if it didn't fail
  491. go func() {
  492. select {
  493. case <-n.Ready():
  494. c.reconnectOnFailure(n)
  495. case <-n.done:
  496. logrus.Errorf("failed to join the cluster: %+v", c.err)
  497. }
  498. }()
  499. return ErrSwarmJoinTimeoutReached
  500. case <-n.Ready():
  501. go c.reconnectOnFailure(n)
  502. return nil
  503. case <-n.done:
  504. c.RLock()
  505. defer c.RUnlock()
  506. return c.err
  507. }
  508. }
  509. // GetUnlockKey returns the unlock key for the swarm.
  510. func (c *Cluster) GetUnlockKey() (string, error) {
  511. c.RLock()
  512. defer c.RUnlock()
  513. if !c.isActiveManager() {
  514. return "", c.errNoManager()
  515. }
  516. ctx, cancel := c.getRequestContext()
  517. defer cancel()
  518. client := swarmapi.NewCAClient(c.conn)
  519. r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
  520. if err != nil {
  521. return "", err
  522. }
  523. if len(r.UnlockKey) == 0 {
  524. // no key
  525. return "", nil
  526. }
  527. return encryption.HumanReadableKey(r.UnlockKey), nil
  528. }
  529. // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
  530. func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
  531. c.RLock()
  532. if !c.isActiveManager() {
  533. if err := c.errNoManager(); err != ErrSwarmLocked {
  534. c.RUnlock()
  535. return err
  536. }
  537. }
  538. if c.node != nil || c.locked != true {
  539. c.RUnlock()
  540. return errors.New("swarm is not locked")
  541. }
  542. c.RUnlock()
  543. key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
  544. if err != nil {
  545. return err
  546. }
  547. c.Lock()
  548. config := *c.lastNodeConfig
  549. config.lockKey = key
  550. n, err := c.startNewNode(config)
  551. if err != nil {
  552. c.Unlock()
  553. return err
  554. }
  555. c.Unlock()
  556. select {
  557. case <-n.Ready():
  558. case <-n.done:
  559. if errors.Cause(c.err) == ErrSwarmLocked {
  560. return errors.New("swarm could not be unlocked: invalid key provided")
  561. }
  562. return fmt.Errorf("swarm component could not be started: %v", c.err)
  563. }
  564. go c.reconnectOnFailure(n)
  565. return nil
  566. }
  567. // stopNode is a helper that stops the active c.node and waits until it has
  568. // shut down. Call while keeping the cluster lock.
  569. func (c *Cluster) stopNode() error {
  570. if c.node == nil {
  571. return nil
  572. }
  573. c.stop = true
  574. if c.cancelDelay != nil {
  575. c.cancelDelay()
  576. c.cancelDelay = nil
  577. }
  578. node := c.node
  579. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  580. defer cancel()
  581. // TODO: can't hold lock on stop because it calls back to network
  582. c.Unlock()
  583. defer c.Lock()
  584. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  585. return err
  586. }
  587. <-node.done
  588. return nil
  589. }
  590. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  591. return reachable-2 <= unreachable
  592. }
  593. func isLastManager(reachable, unreachable int) bool {
  594. return reachable == 1 && unreachable == 0
  595. }
  596. // Leave shuts down Cluster and removes current state.
  597. func (c *Cluster) Leave(force bool) error {
  598. c.Lock()
  599. node := c.node
  600. if node == nil {
  601. if c.locked {
  602. c.locked = false
  603. c.lastNodeConfig = nil
  604. c.Unlock()
  605. } else if c.err == ErrSwarmCertificatesExpired {
  606. c.err = nil
  607. c.Unlock()
  608. } else {
  609. c.Unlock()
  610. return ErrNoSwarm
  611. }
  612. } else {
  613. if node.Manager() != nil && !force {
  614. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  615. if c.isActiveManager() {
  616. active, reachable, unreachable, err := c.managerStats()
  617. if err == nil {
  618. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  619. if isLastManager(reachable, unreachable) {
  620. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  621. c.Unlock()
  622. return fmt.Errorf(msg)
  623. }
  624. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  625. }
  626. }
  627. } else {
  628. msg += "Doing so may lose the consensus of your cluster. "
  629. }
  630. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  631. c.Unlock()
  632. return fmt.Errorf(msg)
  633. }
  634. if err := c.stopNode(); err != nil {
  635. logrus.Errorf("failed to shut down cluster node: %v", err)
  636. signal.DumpStacks("")
  637. c.Unlock()
  638. return err
  639. }
  640. c.Unlock()
  641. if nodeID := node.NodeID(); nodeID != "" {
  642. nodeContainers, err := c.listContainerForNode(nodeID)
  643. if err != nil {
  644. return err
  645. }
  646. for _, id := range nodeContainers {
  647. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  648. logrus.Errorf("error removing %v: %v", id, err)
  649. }
  650. }
  651. }
  652. }
  653. c.configEvent <- struct{}{}
  654. // todo: cleanup optional?
  655. if err := c.clearState(); err != nil {
  656. return err
  657. }
  658. return nil
  659. }
  660. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  661. var ids []string
  662. filters := filters.NewArgs()
  663. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  664. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  665. Filters: filters,
  666. })
  667. if err != nil {
  668. return []string{}, err
  669. }
  670. for _, c := range containers {
  671. ids = append(ids, c.ID)
  672. }
  673. return ids, nil
  674. }
  675. func (c *Cluster) clearState() error {
  676. // todo: backup this data instead of removing?
  677. if err := os.RemoveAll(c.root); err != nil {
  678. return err
  679. }
  680. if err := os.MkdirAll(c.root, 0700); err != nil {
  681. return err
  682. }
  683. c.config.Backend.SetClusterProvider(nil)
  684. return nil
  685. }
  686. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  687. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  688. }
  689. // Inspect retrieves the configuration properties of a managed swarm cluster.
  690. func (c *Cluster) Inspect() (types.Swarm, error) {
  691. c.RLock()
  692. defer c.RUnlock()
  693. if !c.isActiveManager() {
  694. return types.Swarm{}, c.errNoManager()
  695. }
  696. ctx, cancel := c.getRequestContext()
  697. defer cancel()
  698. swarm, err := getSwarm(ctx, c.client)
  699. if err != nil {
  700. return types.Swarm{}, err
  701. }
  702. return convert.SwarmFromGRPC(*swarm), nil
  703. }
  704. // Update updates configuration of a managed swarm cluster.
  705. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  706. c.RLock()
  707. defer c.RUnlock()
  708. if !c.isActiveManager() {
  709. return c.errNoManager()
  710. }
  711. ctx, cancel := c.getRequestContext()
  712. defer cancel()
  713. swarm, err := getSwarm(ctx, c.client)
  714. if err != nil {
  715. return err
  716. }
  717. // In update, client should provide the complete spec of the swarm, including
  718. // Name and Labels. If a field is specified with 0 or nil, then the default value
  719. // will be used to swarmkit.
  720. clusterSpec, err := convert.SwarmSpecToGRPC(spec)
  721. if err != nil {
  722. return err
  723. }
  724. _, err = c.client.UpdateCluster(
  725. ctx,
  726. &swarmapi.UpdateClusterRequest{
  727. ClusterID: swarm.ID,
  728. Spec: &clusterSpec,
  729. ClusterVersion: &swarmapi.Version{
  730. Index: version,
  731. },
  732. Rotation: swarmapi.KeyRotation{
  733. WorkerJoinToken: flags.RotateWorkerToken,
  734. ManagerJoinToken: flags.RotateManagerToken,
  735. ManagerUnlockKey: flags.RotateManagerUnlockKey,
  736. },
  737. },
  738. )
  739. return err
  740. }
  741. // IsManager returns true if Cluster is participating as a manager.
  742. func (c *Cluster) IsManager() bool {
  743. c.RLock()
  744. defer c.RUnlock()
  745. return c.isActiveManager()
  746. }
  747. // IsAgent returns true if Cluster is participating as a worker/agent.
  748. func (c *Cluster) IsAgent() bool {
  749. c.RLock()
  750. defer c.RUnlock()
  751. return c.node != nil && c.ready
  752. }
  753. // GetLocalAddress returns the local address.
  754. func (c *Cluster) GetLocalAddress() string {
  755. c.RLock()
  756. defer c.RUnlock()
  757. return c.actualLocalAddr
  758. }
  759. // GetListenAddress returns the listen address.
  760. func (c *Cluster) GetListenAddress() string {
  761. c.RLock()
  762. defer c.RUnlock()
  763. if c.node != nil {
  764. return c.node.config.ListenAddr
  765. }
  766. return ""
  767. }
  768. // GetAdvertiseAddress returns the remotely reachable address of this node.
  769. func (c *Cluster) GetAdvertiseAddress() string {
  770. c.RLock()
  771. defer c.RUnlock()
  772. if c.node != nil && c.node.config.AdvertiseAddr != "" {
  773. advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
  774. return advertiseHost
  775. }
  776. return c.actualLocalAddr
  777. }
  778. // GetRemoteAddress returns a known advertise address of a remote manager if
  779. // available.
  780. // todo: change to array/connect with info
  781. func (c *Cluster) GetRemoteAddress() string {
  782. c.RLock()
  783. defer c.RUnlock()
  784. return c.getRemoteAddress()
  785. }
  786. func (c *Cluster) getRemoteAddress() string {
  787. if c.node == nil {
  788. return ""
  789. }
  790. nodeID := c.node.NodeID()
  791. for _, r := range c.node.Remotes() {
  792. if r.NodeID != nodeID {
  793. return r.Addr
  794. }
  795. }
  796. return ""
  797. }
  798. // ListenClusterEvents returns a channel that receives messages on cluster
  799. // participation changes.
  800. // todo: make cancelable and accessible to multiple callers
  801. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  802. return c.configEvent
  803. }
  804. // Info returns information about the current cluster state.
  805. func (c *Cluster) Info() types.Info {
  806. info := types.Info{
  807. NodeAddr: c.GetAdvertiseAddress(),
  808. }
  809. c.RLock()
  810. defer c.RUnlock()
  811. if c.node == nil {
  812. info.LocalNodeState = types.LocalNodeStateInactive
  813. if c.cancelDelay != nil {
  814. info.LocalNodeState = types.LocalNodeStateError
  815. }
  816. if c.locked {
  817. info.LocalNodeState = types.LocalNodeStateLocked
  818. } else if c.err == ErrSwarmCertificatesExpired {
  819. info.LocalNodeState = types.LocalNodeStateError
  820. }
  821. } else {
  822. info.LocalNodeState = types.LocalNodeStatePending
  823. if c.ready == true {
  824. info.LocalNodeState = types.LocalNodeStateActive
  825. } else if c.locked {
  826. info.LocalNodeState = types.LocalNodeStateLocked
  827. }
  828. }
  829. if c.err != nil {
  830. info.Error = c.err.Error()
  831. }
  832. ctx, cancel := c.getRequestContext()
  833. defer cancel()
  834. if c.isActiveManager() {
  835. info.ControlAvailable = true
  836. swarm, err := c.Inspect()
  837. if err != nil {
  838. info.Error = err.Error()
  839. }
  840. // Strip JoinTokens
  841. info.Cluster = swarm.ClusterInfo
  842. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  843. info.Nodes = len(r.Nodes)
  844. for _, n := range r.Nodes {
  845. if n.ManagerStatus != nil {
  846. info.Managers = info.Managers + 1
  847. }
  848. }
  849. }
  850. }
  851. if c.node != nil {
  852. for _, r := range c.node.Remotes() {
  853. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  854. }
  855. info.NodeID = c.node.NodeID()
  856. }
  857. return info
  858. }
  859. // isActiveManager should not be called without a read lock
  860. func (c *Cluster) isActiveManager() bool {
  861. return c.node != nil && c.conn != nil
  862. }
  863. // swarmExists should not be called without a read lock
  864. func (c *Cluster) swarmExists() bool {
  865. return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
  866. }
  867. // errNoManager returns error describing why manager commands can't be used.
  868. // Call with read lock.
  869. func (c *Cluster) errNoManager() error {
  870. if c.node == nil {
  871. if c.locked {
  872. return ErrSwarmLocked
  873. }
  874. if c.err == ErrSwarmCertificatesExpired {
  875. return ErrSwarmCertificatesExpired
  876. }
  877. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  878. }
  879. if c.node.Manager() != nil {
  880. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  881. }
  882. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  883. }
  884. // GetServices returns all services of a managed swarm cluster.
  885. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  886. c.RLock()
  887. defer c.RUnlock()
  888. if !c.isActiveManager() {
  889. return nil, c.errNoManager()
  890. }
  891. filters, err := newListServicesFilters(options.Filters)
  892. if err != nil {
  893. return nil, err
  894. }
  895. ctx, cancel := c.getRequestContext()
  896. defer cancel()
  897. r, err := c.client.ListServices(
  898. ctx,
  899. &swarmapi.ListServicesRequest{Filters: filters})
  900. if err != nil {
  901. return nil, err
  902. }
  903. services := []types.Service{}
  904. for _, service := range r.Services {
  905. services = append(services, convert.ServiceFromGRPC(*service))
  906. }
  907. return services, nil
  908. }
  909. // imageWithDigestString takes an image such as name or name:tag
  910. // and returns the image pinned to a digest, such as name@sha256:34234...
  911. // Due to the difference between the docker/docker/reference, and the
  912. // docker/distribution/reference packages, we're parsing the image twice.
  913. // As the two packages converge, this function should be simplified.
  914. // TODO(nishanttotla): After the packages converge, the function must
  915. // convert distreference.Named -> distreference.Canonical, and the logic simplified.
  916. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  917. ref, err := distreference.ParseNamed(image)
  918. if err != nil {
  919. return "", err
  920. }
  921. // only query registry if not a canonical reference (i.e. with digest)
  922. if _, ok := ref.(distreference.Canonical); !ok {
  923. // create a docker/docker/reference Named object because GetRepository needs it
  924. dockerRef, err := reference.ParseNamed(image)
  925. if err != nil {
  926. return "", err
  927. }
  928. dockerRef = reference.WithDefaultTag(dockerRef)
  929. namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
  930. if !ok {
  931. return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
  932. }
  933. repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
  934. if err != nil {
  935. return "", err
  936. }
  937. dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
  938. if err != nil {
  939. return "", err
  940. }
  941. namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
  942. if err != nil {
  943. return "", err
  944. }
  945. return namedDigestedRef.String(), nil
  946. }
  947. // reference already contains a digest, so just return it
  948. return ref.String(), nil
  949. }
  950. // CreateService creates a new service in a managed swarm cluster.
  951. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
  952. c.RLock()
  953. defer c.RUnlock()
  954. if !c.isActiveManager() {
  955. return nil, c.errNoManager()
  956. }
  957. ctx, cancel := c.getRequestContext()
  958. defer cancel()
  959. err := c.populateNetworkID(ctx, c.client, &s)
  960. if err != nil {
  961. return nil, err
  962. }
  963. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  964. if err != nil {
  965. return nil, err
  966. }
  967. ctnr := serviceSpec.Task.GetContainer()
  968. if ctnr == nil {
  969. return nil, fmt.Errorf("service does not use container tasks")
  970. }
  971. if encodedAuth != "" {
  972. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  973. }
  974. // retrieve auth config from encoded auth
  975. authConfig := &apitypes.AuthConfig{}
  976. if encodedAuth != "" {
  977. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  978. logrus.Warnf("invalid authconfig: %v", err)
  979. }
  980. }
  981. resp := &apitypes.ServiceCreateResponse{}
  982. // pin image by digest
  983. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  984. digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
  985. if err != nil {
  986. logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
  987. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
  988. } else {
  989. logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
  990. ctnr.Image = digestImage
  991. }
  992. }
  993. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  994. if err != nil {
  995. return nil, err
  996. }
  997. resp.ID = r.Service.ID
  998. return resp, nil
  999. }
  1000. // GetService returns a service based on an ID or name.
  1001. func (c *Cluster) GetService(input string) (types.Service, error) {
  1002. c.RLock()
  1003. defer c.RUnlock()
  1004. if !c.isActiveManager() {
  1005. return types.Service{}, c.errNoManager()
  1006. }
  1007. ctx, cancel := c.getRequestContext()
  1008. defer cancel()
  1009. service, err := getService(ctx, c.client, input)
  1010. if err != nil {
  1011. return types.Service{}, err
  1012. }
  1013. return convert.ServiceFromGRPC(*service), nil
  1014. }
  1015. // UpdateService updates existing service to match new properties.
  1016. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
  1017. c.RLock()
  1018. defer c.RUnlock()
  1019. if !c.isActiveManager() {
  1020. return nil, c.errNoManager()
  1021. }
  1022. ctx, cancel := c.getRequestContext()
  1023. defer cancel()
  1024. err := c.populateNetworkID(ctx, c.client, &spec)
  1025. if err != nil {
  1026. return nil, err
  1027. }
  1028. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  1029. if err != nil {
  1030. return nil, err
  1031. }
  1032. currentService, err := getService(ctx, c.client, serviceIDOrName)
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. newCtnr := serviceSpec.Task.GetContainer()
  1037. if newCtnr == nil {
  1038. return nil, fmt.Errorf("service does not use container tasks")
  1039. }
  1040. if encodedAuth != "" {
  1041. newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  1042. } else {
  1043. // this is needed because if the encodedAuth isn't being updated then we
  1044. // shouldn't lose it, and continue to use the one that was already present
  1045. var ctnr *swarmapi.ContainerSpec
  1046. switch registryAuthFrom {
  1047. case apitypes.RegistryAuthFromSpec, "":
  1048. ctnr = currentService.Spec.Task.GetContainer()
  1049. case apitypes.RegistryAuthFromPreviousSpec:
  1050. if currentService.PreviousSpec == nil {
  1051. return nil, fmt.Errorf("service does not have a previous spec")
  1052. }
  1053. ctnr = currentService.PreviousSpec.Task.GetContainer()
  1054. default:
  1055. return nil, fmt.Errorf("unsupported registryAuthFromValue")
  1056. }
  1057. if ctnr == nil {
  1058. return nil, fmt.Errorf("service does not use container tasks")
  1059. }
  1060. newCtnr.PullOptions = ctnr.PullOptions
  1061. // update encodedAuth so it can be used to pin image by digest
  1062. if ctnr.PullOptions != nil {
  1063. encodedAuth = ctnr.PullOptions.RegistryAuth
  1064. }
  1065. }
  1066. // retrieve auth config from encoded auth
  1067. authConfig := &apitypes.AuthConfig{}
  1068. if encodedAuth != "" {
  1069. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  1070. logrus.Warnf("invalid authconfig: %v", err)
  1071. }
  1072. }
  1073. resp := &apitypes.ServiceUpdateResponse{}
  1074. // pin image by digest
  1075. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  1076. digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
  1077. if err != nil {
  1078. logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
  1079. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
  1080. } else if newCtnr.Image != digestImage {
  1081. logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
  1082. newCtnr.Image = digestImage
  1083. }
  1084. }
  1085. _, err = c.client.UpdateService(
  1086. ctx,
  1087. &swarmapi.UpdateServiceRequest{
  1088. ServiceID: currentService.ID,
  1089. Spec: &serviceSpec,
  1090. ServiceVersion: &swarmapi.Version{
  1091. Index: version,
  1092. },
  1093. },
  1094. )
  1095. return resp, err
  1096. }
  1097. // RemoveService removes a service from a managed swarm cluster.
  1098. func (c *Cluster) RemoveService(input string) error {
  1099. c.RLock()
  1100. defer c.RUnlock()
  1101. if !c.isActiveManager() {
  1102. return c.errNoManager()
  1103. }
  1104. ctx, cancel := c.getRequestContext()
  1105. defer cancel()
  1106. service, err := getService(ctx, c.client, input)
  1107. if err != nil {
  1108. return err
  1109. }
  1110. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  1111. return err
  1112. }
  1113. return nil
  1114. }
  1115. // ServiceLogs collects service logs and writes them back to `config.OutStream`
  1116. func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
  1117. c.RLock()
  1118. if !c.isActiveManager() {
  1119. c.RUnlock()
  1120. return c.errNoManager()
  1121. }
  1122. service, err := getService(ctx, c.client, input)
  1123. if err != nil {
  1124. c.RUnlock()
  1125. return err
  1126. }
  1127. stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
  1128. Selector: &swarmapi.LogSelector{
  1129. ServiceIDs: []string{service.ID},
  1130. },
  1131. Options: &swarmapi.LogSubscriptionOptions{
  1132. Follow: config.Follow,
  1133. },
  1134. })
  1135. if err != nil {
  1136. c.RUnlock()
  1137. return err
  1138. }
  1139. wf := ioutils.NewWriteFlusher(config.OutStream)
  1140. defer wf.Close()
  1141. close(started)
  1142. wf.Flush()
  1143. outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
  1144. errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
  1145. // Release the lock before starting the stream.
  1146. c.RUnlock()
  1147. for {
  1148. // Check the context before doing anything.
  1149. select {
  1150. case <-ctx.Done():
  1151. return ctx.Err()
  1152. default:
  1153. }
  1154. subscribeMsg, err := stream.Recv()
  1155. if err == io.EOF {
  1156. return nil
  1157. }
  1158. if err != nil {
  1159. return err
  1160. }
  1161. for _, msg := range subscribeMsg.Messages {
  1162. data := []byte{}
  1163. if config.Timestamps {
  1164. ts, err := ptypes.Timestamp(msg.Timestamp)
  1165. if err != nil {
  1166. return err
  1167. }
  1168. data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
  1169. }
  1170. data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
  1171. contextPrefix, msg.Context.NodeID,
  1172. contextPrefix, msg.Context.ServiceID,
  1173. contextPrefix, msg.Context.TaskID,
  1174. ))...)
  1175. data = append(data, msg.Data...)
  1176. switch msg.Stream {
  1177. case swarmapi.LogStreamStdout:
  1178. outStream.Write(data)
  1179. case swarmapi.LogStreamStderr:
  1180. errStream.Write(data)
  1181. }
  1182. }
  1183. }
  1184. }
  1185. // GetNodes returns a list of all nodes known to a cluster.
  1186. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  1187. c.RLock()
  1188. defer c.RUnlock()
  1189. if !c.isActiveManager() {
  1190. return nil, c.errNoManager()
  1191. }
  1192. filters, err := newListNodesFilters(options.Filters)
  1193. if err != nil {
  1194. return nil, err
  1195. }
  1196. ctx, cancel := c.getRequestContext()
  1197. defer cancel()
  1198. r, err := c.client.ListNodes(
  1199. ctx,
  1200. &swarmapi.ListNodesRequest{Filters: filters})
  1201. if err != nil {
  1202. return nil, err
  1203. }
  1204. nodes := []types.Node{}
  1205. for _, node := range r.Nodes {
  1206. nodes = append(nodes, convert.NodeFromGRPC(*node))
  1207. }
  1208. return nodes, nil
  1209. }
  1210. // GetNode returns a node based on an ID or name.
  1211. func (c *Cluster) GetNode(input string) (types.Node, error) {
  1212. c.RLock()
  1213. defer c.RUnlock()
  1214. if !c.isActiveManager() {
  1215. return types.Node{}, c.errNoManager()
  1216. }
  1217. ctx, cancel := c.getRequestContext()
  1218. defer cancel()
  1219. node, err := getNode(ctx, c.client, input)
  1220. if err != nil {
  1221. return types.Node{}, err
  1222. }
  1223. return convert.NodeFromGRPC(*node), nil
  1224. }
  1225. // UpdateNode updates existing nodes properties.
  1226. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
  1227. c.RLock()
  1228. defer c.RUnlock()
  1229. if !c.isActiveManager() {
  1230. return c.errNoManager()
  1231. }
  1232. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  1233. if err != nil {
  1234. return err
  1235. }
  1236. ctx, cancel := c.getRequestContext()
  1237. defer cancel()
  1238. currentNode, err := getNode(ctx, c.client, input)
  1239. if err != nil {
  1240. return err
  1241. }
  1242. _, err = c.client.UpdateNode(
  1243. ctx,
  1244. &swarmapi.UpdateNodeRequest{
  1245. NodeID: currentNode.ID,
  1246. Spec: &nodeSpec,
  1247. NodeVersion: &swarmapi.Version{
  1248. Index: version,
  1249. },
  1250. },
  1251. )
  1252. return err
  1253. }
  1254. // RemoveNode removes a node from a cluster
  1255. func (c *Cluster) RemoveNode(input string, force bool) error {
  1256. c.RLock()
  1257. defer c.RUnlock()
  1258. if !c.isActiveManager() {
  1259. return c.errNoManager()
  1260. }
  1261. ctx, cancel := c.getRequestContext()
  1262. defer cancel()
  1263. node, err := getNode(ctx, c.client, input)
  1264. if err != nil {
  1265. return err
  1266. }
  1267. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  1268. return err
  1269. }
  1270. return nil
  1271. }
  1272. // GetTasks returns a list of tasks matching the filter options.
  1273. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  1274. c.RLock()
  1275. defer c.RUnlock()
  1276. if !c.isActiveManager() {
  1277. return nil, c.errNoManager()
  1278. }
  1279. byName := func(filter filters.Args) error {
  1280. if filter.Include("service") {
  1281. serviceFilters := filter.Get("service")
  1282. for _, serviceFilter := range serviceFilters {
  1283. service, err := c.GetService(serviceFilter)
  1284. if err != nil {
  1285. return err
  1286. }
  1287. filter.Del("service", serviceFilter)
  1288. filter.Add("service", service.ID)
  1289. }
  1290. }
  1291. if filter.Include("node") {
  1292. nodeFilters := filter.Get("node")
  1293. for _, nodeFilter := range nodeFilters {
  1294. node, err := c.GetNode(nodeFilter)
  1295. if err != nil {
  1296. return err
  1297. }
  1298. filter.Del("node", nodeFilter)
  1299. filter.Add("node", node.ID)
  1300. }
  1301. }
  1302. return nil
  1303. }
  1304. filters, err := newListTasksFilters(options.Filters, byName)
  1305. if err != nil {
  1306. return nil, err
  1307. }
  1308. ctx, cancel := c.getRequestContext()
  1309. defer cancel()
  1310. r, err := c.client.ListTasks(
  1311. ctx,
  1312. &swarmapi.ListTasksRequest{Filters: filters})
  1313. if err != nil {
  1314. return nil, err
  1315. }
  1316. tasks := []types.Task{}
  1317. for _, task := range r.Tasks {
  1318. if task.Spec.GetContainer() != nil {
  1319. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1320. }
  1321. }
  1322. return tasks, nil
  1323. }
  1324. // GetTask returns a task by an ID.
  1325. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1326. c.RLock()
  1327. defer c.RUnlock()
  1328. if !c.isActiveManager() {
  1329. return types.Task{}, c.errNoManager()
  1330. }
  1331. ctx, cancel := c.getRequestContext()
  1332. defer cancel()
  1333. task, err := getTask(ctx, c.client, input)
  1334. if err != nil {
  1335. return types.Task{}, err
  1336. }
  1337. return convert.TaskFromGRPC(*task), nil
  1338. }
  1339. // GetNetwork returns a cluster network by an ID.
  1340. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1341. c.RLock()
  1342. defer c.RUnlock()
  1343. if !c.isActiveManager() {
  1344. return apitypes.NetworkResource{}, c.errNoManager()
  1345. }
  1346. ctx, cancel := c.getRequestContext()
  1347. defer cancel()
  1348. network, err := getNetwork(ctx, c.client, input)
  1349. if err != nil {
  1350. return apitypes.NetworkResource{}, err
  1351. }
  1352. return convert.BasicNetworkFromGRPC(*network), nil
  1353. }
  1354. // GetNetworks returns all current cluster managed networks.
  1355. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1356. c.RLock()
  1357. defer c.RUnlock()
  1358. if !c.isActiveManager() {
  1359. return nil, c.errNoManager()
  1360. }
  1361. ctx, cancel := c.getRequestContext()
  1362. defer cancel()
  1363. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1364. if err != nil {
  1365. return nil, err
  1366. }
  1367. var networks []apitypes.NetworkResource
  1368. for _, network := range r.Networks {
  1369. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1370. }
  1371. return networks, nil
  1372. }
  1373. func attacherKey(target, containerID string) string {
  1374. return containerID + ":" + target
  1375. }
  1376. // UpdateAttachment signals the attachment config to the attachment
  1377. // waiter who is trying to start or attach the container to the
  1378. // network.
  1379. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1380. c.RLock()
  1381. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1382. c.RUnlock()
  1383. if !ok || attacher == nil {
  1384. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1385. }
  1386. attacher.attachWaitCh <- config
  1387. close(attacher.attachWaitCh)
  1388. return nil
  1389. }
  1390. // WaitForDetachment waits for the container to stop or detach from
  1391. // the network.
  1392. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
  1393. c.RLock()
  1394. attacher, ok := c.attachers[attacherKey(networkName, containerID)]
  1395. if !ok {
  1396. attacher, ok = c.attachers[attacherKey(networkID, containerID)]
  1397. }
  1398. if c.node == nil || c.node.Agent() == nil {
  1399. c.RUnlock()
  1400. return fmt.Errorf("invalid cluster node while waiting for detachment")
  1401. }
  1402. agent := c.node.Agent()
  1403. c.RUnlock()
  1404. if ok && attacher != nil &&
  1405. attacher.detachWaitCh != nil &&
  1406. attacher.attachCompleteCh != nil {
  1407. // Attachment may be in progress still so wait for
  1408. // attachment to complete.
  1409. select {
  1410. case <-attacher.attachCompleteCh:
  1411. case <-ctx.Done():
  1412. return ctx.Err()
  1413. }
  1414. if attacher.taskID == taskID {
  1415. select {
  1416. case <-attacher.detachWaitCh:
  1417. case <-ctx.Done():
  1418. return ctx.Err()
  1419. }
  1420. }
  1421. }
  1422. return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
  1423. }

// AttachNetwork generates an attachment request towards the manager.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.Lock()
	if c.node == nil || c.node.Agent() == nil {
		c.Unlock()
		return nil, fmt.Errorf("invalid cluster node while attaching to network")
	}
	if attacher, ok := c.attachers[aKey]; ok {
		c.Unlock()
		return attacher.config, nil
	}

	agent := c.node.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.Unlock()

	ctx, cancel := c.getRequestContext()
	defer cancel()

	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		c.Lock()
		delete(c.attachers, aKey)
		c.Unlock()
		return nil, fmt.Errorf("could not attach to network %s: %v", target, err)
	}

	c.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.Unlock()

	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)

	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}

	c.Lock()
	c.attachers[aKey].config = config
	c.Unlock()
	return config, nil
}
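
// The attach/detach handshake works roughly as follows. The sketch below is
// illustrative only: "my-overlay", containerID and allocatedConfig are
// placeholders, not identifiers from this package; only the Cluster methods
// are real.
//
//	// Agent side (container start): block until the attachment config arrives.
//	config, err := c.AttachNetwork("my-overlay", containerID, nil)
//
//	// Executor side: once the manager has allocated the attachment, hand the
//	// resulting config to the waiter above via UpdateAttachment.
//	err = c.UpdateAttachment("my-overlay", containerID, allocatedConfig)
//
//	// Container stop: DetachNetwork closes detachWaitCh, unblocking any
//	// WaitForDetachment waiter so the agent can send the detach request.
//	err = c.DetachNetwork("my-overlay", containerID)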

// DetachNetwork unblocks the waiters waiting on WaitForDetachment so
// that a request to detach can be generated towards the manager.
func (c *Cluster) DetachNetwork(target string, containerID string) error {
	aKey := attacherKey(target, containerID)

	c.Lock()
	attacher, ok := c.attachers[aKey]
	delete(c.attachers, aKey)
	c.Unlock()

	if !ok {
		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
	}

	close(attacher.detachWaitCh)
	return nil
}

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", apierrors.NewRequestForbiddenError(err)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}

	return r.Network.ID, nil
}

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
		return err
	}
	return nil
}

func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate,
	// but fall back to the service spec for backward compatibility.
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}

	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("The network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver.", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		networks[i].Target = apiNetwork.ID
	}
	return nil
}
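
// For illustration (the network name below is made up): a service spec whose
// TaskTemplate.Networks contains an entry {Target: "my-overlay"} leaves this
// function with that entry rewritten in place to {Target: "<network ID>"},
// using the ID resolved by the manager. Entries in s.Networks are only
// consulted when TaskTemplate.Networks is empty.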

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	defer c.Unlock()

	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}
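
// Rough arithmetic behind the warning above: a raft cluster of n managers
// needs a quorum of n/2+1 (integer division). For example, with 2 reachable
// and 1 unreachable manager there are 3 members and the quorum is 2; once this
// node leaves, only 1 reachable manager remains, so quorum is lost. The
// precise check is delegated to removingManagerCausesLossOfQuorum.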

func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return fmt.Errorf("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// Addresses that cannot be parsed as TCP are returned unchanged
		// rather than rejected; the caller decides how to use them.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
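
// Example of the normalization performed by validateAddr, assuming defaultAddr
// supplies the standard swarm listen port (the concrete value is defined
// elsewhere in this file):
//
//	validateAddr("192.168.1.1")      // "192.168.1.1:<default port>", nil
//	validateAddr("192.168.1.1:4242") // "192.168.1.1:4242", nil
//	validateAddr("")                 // "", error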

func initClusterSpec(node *node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and merge
			// any non-nil or non-zero value from spec into the GRPC spec. This
			// leaves the defaults alone.
			// Note that this is different from Update(), where we expect the user
			// to specify the complete spec of the cluster (they already know the
			// existing one and which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}
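
// Sketch of the merge-on-init behavior described above; the commentary is
// illustrative and the variables are assumed to be in scope as in
// initClusterSpec:
//
//	// spec carries only the fields the user set explicitly; everything else
//	// is the zero value.
//	merged, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
//	// merged equals cluster.Spec (the swarmkit defaults) with only the
//	// user-set fields overridden. Update(), in contrast, replaces the whole
//	// spec with what the user supplied.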

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(ErrSwarmLocked)
	}
	return err
}