cluster.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943
  1. package cluster
  2. import (
  3. "crypto/x509"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/Sirupsen/logrus"
  17. distreference "github.com/docker/distribution/reference"
  18. apierrors "github.com/docker/docker/api/errors"
  19. apitypes "github.com/docker/docker/api/types"
  20. "github.com/docker/docker/api/types/backend"
  21. "github.com/docker/docker/api/types/filters"
  22. "github.com/docker/docker/api/types/network"
  23. types "github.com/docker/docker/api/types/swarm"
  24. "github.com/docker/docker/daemon/cluster/convert"
  25. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  26. "github.com/docker/docker/daemon/cluster/executor/container"
  27. "github.com/docker/docker/daemon/logger"
  28. "github.com/docker/docker/opts"
  29. "github.com/docker/docker/pkg/ioutils"
  30. "github.com/docker/docker/pkg/signal"
  31. "github.com/docker/docker/pkg/stdcopy"
  32. "github.com/docker/docker/reference"
  33. "github.com/docker/docker/runconfig"
  34. swarmapi "github.com/docker/swarmkit/api"
  35. "github.com/docker/swarmkit/manager/encryption"
  36. swarmnode "github.com/docker/swarmkit/node"
  37. "github.com/docker/swarmkit/protobuf/ptypes"
  38. "github.com/pkg/errors"
  39. "golang.org/x/net/context"
  40. "google.golang.org/grpc"
  41. )
  42. const swarmDirName = "swarm"
  43. const controlSocket = "control.sock"
  44. const swarmConnectTimeout = 20 * time.Second
  45. const swarmRequestTimeout = 20 * time.Second
  46. const stateFile = "docker-state.json"
  47. const defaultAddr = "0.0.0.0:2377"
  48. const (
  49. initialReconnectDelay = 100 * time.Millisecond
  50. maxReconnectDelay = 30 * time.Second
  51. contextPrefix = "com.docker.swarm"
  52. )
  53. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  54. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  55. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  56. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  57. // ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
  58. var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
  59. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  60. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  61. // ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
  62. var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")
  63. // ErrSwarmCertificatesExpired is returned if docker was not started for the whole validity period and they had no chance to renew automatically.
  64. var ErrSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
  65. // NetworkSubnetsProvider exposes functions for retrieving the subnets
  66. // of networks managed by Docker, so they can be filtered.
  67. type NetworkSubnetsProvider interface {
  68. V4Subnets() []net.IPNet
  69. V6Subnets() []net.IPNet
  70. }
  71. // Config provides values for Cluster.
  72. type Config struct {
  73. Root string
  74. Name string
  75. Backend executorpkg.Backend
  76. NetworkSubnetsProvider NetworkSubnetsProvider
  77. // DefaultAdvertiseAddr is the default host/IP or network interface to use
  78. // if no AdvertiseAddr value is specified.
  79. DefaultAdvertiseAddr string
  80. // path to store runtime state, such as the swarm control socket
  81. RuntimeRoot string
  82. }
  83. // Cluster provides capabilities to participate in a cluster as a worker or a
  84. // manager.
  85. type Cluster struct {
  86. sync.RWMutex
  87. *node
  88. root string
  89. runtimeRoot string
  90. config Config
  91. configEvent chan struct{} // todo: make this array and goroutine safe
  92. actualLocalAddr string // after resolution, not persisted
  93. stop bool
  94. err error
  95. cancelDelay func()
  96. attachers map[string]*attacher
  97. locked bool
  98. lastNodeConfig *nodeStartConfig
  99. }
  100. // attacher manages the in-memory attachment state of a container
  101. // attachment to a global scope network managed by swarm manager. It
  102. // helps in identifying the attachment ID via the taskID and the
  103. // corresponding attachment configuration obtained from the manager.
  104. type attacher struct {
  105. taskID string
  106. config *network.NetworkingConfig
  107. attachWaitCh chan *network.NetworkingConfig
  108. attachCompleteCh chan struct{}
  109. detachWaitCh chan struct{}
  110. }
  111. type node struct {
  112. *swarmnode.Node
  113. done chan struct{}
  114. ready bool
  115. conn *grpc.ClientConn
  116. client swarmapi.ControlClient
  117. logs swarmapi.LogsClient
  118. reconnectDelay time.Duration
  119. config nodeStartConfig
  120. }
  121. // nodeStartConfig holds configuration needed to start a new node. Exported
  122. // fields of this structure are saved to disk in json. Unexported fields
  123. // contain data that shouldn't be persisted between daemon reloads.
  124. type nodeStartConfig struct {
  125. // LocalAddr is this machine's local IP or hostname, if specified.
  126. LocalAddr string
  127. // RemoteAddr is the address that was given to "swarm join". It is used
  128. // to find LocalAddr if necessary.
  129. RemoteAddr string
  130. // ListenAddr is the address we bind to, including a port.
  131. ListenAddr string
  132. // AdvertiseAddr is the address other nodes should connect to,
  133. // including a port.
  134. AdvertiseAddr string
  135. joinAddr string
  136. forceNewCluster bool
  137. joinToken string
  138. lockKey []byte
  139. autolock bool
  140. }
  141. // New creates a new Cluster instance using provided config.
  142. func New(config Config) (*Cluster, error) {
  143. root := filepath.Join(config.Root, swarmDirName)
  144. if err := os.MkdirAll(root, 0700); err != nil {
  145. return nil, err
  146. }
  147. if config.RuntimeRoot == "" {
  148. config.RuntimeRoot = root
  149. }
  150. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  151. return nil, err
  152. }
  153. c := &Cluster{
  154. root: root,
  155. config: config,
  156. configEvent: make(chan struct{}, 10),
  157. runtimeRoot: config.RuntimeRoot,
  158. attachers: make(map[string]*attacher),
  159. }
  160. nodeConfig, err := c.loadState()
  161. if err != nil {
  162. if os.IsNotExist(err) {
  163. return c, nil
  164. }
  165. return nil, err
  166. }
  167. n, err := c.startNewNode(*nodeConfig)
  168. if err != nil {
  169. return nil, err
  170. }
  171. select {
  172. case <-time.After(swarmConnectTimeout):
  173. logrus.Error("swarm component could not be started before timeout was reached")
  174. case <-n.Ready():
  175. case <-n.done:
  176. if errors.Cause(c.err) == ErrSwarmLocked {
  177. return c, nil
  178. }
  179. if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
  180. c.err = ErrSwarmCertificatesExpired
  181. return c, nil
  182. }
  183. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  184. }
  185. go c.reconnectOnFailure(n)
  186. return c, nil
  187. }
  188. func (c *Cluster) loadState() (*nodeStartConfig, error) {
  189. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  190. if err != nil {
  191. return nil, err
  192. }
  193. // missing certificate means no actual state to restore from
  194. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  195. if os.IsNotExist(err) {
  196. c.clearState()
  197. }
  198. return nil, err
  199. }
  200. var st nodeStartConfig
  201. if err := json.Unmarshal(dt, &st); err != nil {
  202. return nil, err
  203. }
  204. return &st, nil
  205. }
  206. func (c *Cluster) saveState(config nodeStartConfig) error {
  207. dt, err := json.Marshal(config)
  208. if err != nil {
  209. return err
  210. }
  211. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  212. }
  213. func (c *Cluster) reconnectOnFailure(n *node) {
  214. for {
  215. <-n.done
  216. c.Lock()
  217. if c.stop || c.node != nil {
  218. c.Unlock()
  219. return
  220. }
  221. n.reconnectDelay *= 2
  222. if n.reconnectDelay > maxReconnectDelay {
  223. n.reconnectDelay = maxReconnectDelay
  224. }
  225. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  226. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  227. c.cancelDelay = cancel
  228. c.Unlock()
  229. <-delayCtx.Done()
  230. if delayCtx.Err() != context.DeadlineExceeded {
  231. return
  232. }
  233. c.Lock()
  234. if c.node != nil {
  235. c.Unlock()
  236. return
  237. }
  238. var err error
  239. config := n.config
  240. config.RemoteAddr = c.getRemoteAddress()
  241. config.joinAddr = config.RemoteAddr
  242. n, err = c.startNewNode(config)
  243. if err != nil {
  244. c.err = err
  245. close(n.done)
  246. }
  247. c.Unlock()
  248. }
  249. }
  250. func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
  251. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  252. return nil, err
  253. }
  254. actualLocalAddr := conf.LocalAddr
  255. if actualLocalAddr == "" {
  256. // If localAddr was not specified, resolve it automatically
  257. // based on the route to joinAddr. localAddr can only be left
  258. // empty on "join".
  259. listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
  260. if err != nil {
  261. return nil, fmt.Errorf("could not parse listen address: %v", err)
  262. }
  263. listenAddrIP := net.ParseIP(listenHost)
  264. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  265. actualLocalAddr = listenHost
  266. } else {
  267. if conf.RemoteAddr == "" {
  268. // Should never happen except using swarms created by
  269. // old versions that didn't save remoteAddr.
  270. conf.RemoteAddr = "8.8.8.8:53"
  271. }
  272. conn, err := net.Dial("udp", conf.RemoteAddr)
  273. if err != nil {
  274. return nil, fmt.Errorf("could not find local IP address: %v", err)
  275. }
  276. localHostPort := conn.LocalAddr().String()
  277. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  278. conn.Close()
  279. }
  280. }
  281. var control string
  282. if runtime.GOOS == "windows" {
  283. control = `\\.\pipe\` + controlSocket
  284. } else {
  285. control = filepath.Join(c.runtimeRoot, controlSocket)
  286. }
  287. c.node = nil
  288. c.cancelDelay = nil
  289. c.stop = false
  290. n, err := swarmnode.New(&swarmnode.Config{
  291. Hostname: c.config.Name,
  292. ForceNewCluster: conf.forceNewCluster,
  293. ListenControlAPI: control,
  294. ListenRemoteAPI: conf.ListenAddr,
  295. AdvertiseRemoteAPI: conf.AdvertiseAddr,
  296. JoinAddr: conf.joinAddr,
  297. StateDir: c.root,
  298. JoinToken: conf.joinToken,
  299. Executor: container.NewExecutor(c.config.Backend),
  300. HeartbeatTick: 1,
  301. ElectionTick: 3,
  302. UnlockKey: conf.lockKey,
  303. AutoLockManagers: conf.autolock,
  304. })
  305. if err != nil {
  306. return nil, err
  307. }
  308. ctx := context.Background()
  309. if err := n.Start(ctx); err != nil {
  310. return nil, err
  311. }
  312. node := &node{
  313. Node: n,
  314. done: make(chan struct{}),
  315. reconnectDelay: initialReconnectDelay,
  316. config: conf,
  317. }
  318. c.node = node
  319. c.actualLocalAddr = actualLocalAddr // not saved
  320. c.saveState(conf)
  321. c.config.Backend.SetClusterProvider(c)
  322. go func() {
  323. err := detectLockedError(n.Err(ctx))
  324. if err != nil {
  325. logrus.Errorf("cluster exited with error: %v", err)
  326. }
  327. c.Lock()
  328. c.node = nil
  329. c.err = err
  330. if errors.Cause(err) == ErrSwarmLocked {
  331. c.locked = true
  332. confClone := conf
  333. c.lastNodeConfig = &confClone
  334. }
  335. c.Unlock()
  336. close(node.done)
  337. }()
  338. go func() {
  339. select {
  340. case <-n.Ready():
  341. c.Lock()
  342. node.ready = true
  343. c.err = nil
  344. c.Unlock()
  345. case <-ctx.Done():
  346. }
  347. c.configEvent <- struct{}{}
  348. }()
  349. go func() {
  350. for conn := range n.ListenControlSocket(ctx) {
  351. c.Lock()
  352. if node.conn != conn {
  353. if conn == nil {
  354. node.client = nil
  355. node.logs = nil
  356. } else {
  357. node.client = swarmapi.NewControlClient(conn)
  358. node.logs = swarmapi.NewLogsClient(conn)
  359. }
  360. }
  361. node.conn = conn
  362. c.Unlock()
  363. c.configEvent <- struct{}{}
  364. }
  365. }()
  366. return node, nil
  367. }
  368. // Init initializes new cluster from user provided request.
  369. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  370. c.Lock()
  371. if c.swarmExists() {
  372. if !req.ForceNewCluster {
  373. c.Unlock()
  374. return "", ErrSwarmExists
  375. }
  376. if err := c.stopNode(); err != nil {
  377. c.Unlock()
  378. return "", err
  379. }
  380. }
  381. if err := validateAndSanitizeInitRequest(&req); err != nil {
  382. c.Unlock()
  383. return "", err
  384. }
  385. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  386. if err != nil {
  387. c.Unlock()
  388. return "", err
  389. }
  390. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  391. if err != nil {
  392. c.Unlock()
  393. return "", err
  394. }
  395. localAddr := listenHost
  396. // If the local address is undetermined, the advertise address
  397. // will be used as local address, if it belongs to this system.
  398. // If the advertise address is not local, then we try to find
  399. // a system address to use as local address. If this fails,
  400. // we give up and ask user to pass the listen address.
  401. if net.ParseIP(localAddr).IsUnspecified() {
  402. advertiseIP := net.ParseIP(advertiseHost)
  403. found := false
  404. for _, systemIP := range listSystemIPs() {
  405. if systemIP.Equal(advertiseIP) {
  406. localAddr = advertiseIP.String()
  407. found = true
  408. break
  409. }
  410. }
  411. if !found {
  412. ip, err := c.resolveSystemAddr()
  413. if err != nil {
  414. c.Unlock()
  415. logrus.Warnf("Could not find a local address: %v", err)
  416. return "", errMustSpecifyListenAddr
  417. }
  418. localAddr = ip.String()
  419. }
  420. }
  421. // todo: check current state existing
  422. n, err := c.startNewNode(nodeStartConfig{
  423. forceNewCluster: req.ForceNewCluster,
  424. autolock: req.AutoLockManagers,
  425. LocalAddr: localAddr,
  426. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  427. AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
  428. })
  429. if err != nil {
  430. c.Unlock()
  431. return "", err
  432. }
  433. c.Unlock()
  434. select {
  435. case <-n.Ready():
  436. if err := initClusterSpec(n, req.Spec); err != nil {
  437. return "", err
  438. }
  439. go c.reconnectOnFailure(n)
  440. return n.NodeID(), nil
  441. case <-n.done:
  442. c.RLock()
  443. defer c.RUnlock()
  444. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  445. if err := c.clearState(); err != nil {
  446. return "", err
  447. }
  448. }
  449. return "", c.err
  450. }
  451. }
  452. // Join makes current Cluster part of an existing swarm cluster.
  453. func (c *Cluster) Join(req types.JoinRequest) error {
  454. c.Lock()
  455. if c.swarmExists() {
  456. c.Unlock()
  457. return ErrSwarmExists
  458. }
  459. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  460. c.Unlock()
  461. return err
  462. }
  463. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  464. if err != nil {
  465. c.Unlock()
  466. return err
  467. }
  468. var advertiseAddr string
  469. if req.AdvertiseAddr != "" {
  470. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  471. // For joining, we don't need to provide an advertise address,
  472. // since the remote side can detect it.
  473. if err == nil {
  474. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  475. }
  476. }
  477. // todo: check current state existing
  478. n, err := c.startNewNode(nodeStartConfig{
  479. RemoteAddr: req.RemoteAddrs[0],
  480. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  481. AdvertiseAddr: advertiseAddr,
  482. joinAddr: req.RemoteAddrs[0],
  483. joinToken: req.JoinToken,
  484. })
  485. if err != nil {
  486. c.Unlock()
  487. return err
  488. }
  489. c.Unlock()
  490. select {
  491. case <-time.After(swarmConnectTimeout):
  492. // attempt to connect will continue in background, but reconnect only if it didn't fail
  493. go func() {
  494. select {
  495. case <-n.Ready():
  496. c.reconnectOnFailure(n)
  497. case <-n.done:
  498. logrus.Errorf("failed to join the cluster: %+v", c.err)
  499. }
  500. }()
  501. return ErrSwarmJoinTimeoutReached
  502. case <-n.Ready():
  503. go c.reconnectOnFailure(n)
  504. return nil
  505. case <-n.done:
  506. c.RLock()
  507. defer c.RUnlock()
  508. return c.err
  509. }
  510. }
  511. // GetUnlockKey returns the unlock key for the swarm.
  512. func (c *Cluster) GetUnlockKey() (string, error) {
  513. c.RLock()
  514. defer c.RUnlock()
  515. if !c.isActiveManager() {
  516. return "", c.errNoManager()
  517. }
  518. ctx, cancel := c.getRequestContext()
  519. defer cancel()
  520. client := swarmapi.NewCAClient(c.conn)
  521. r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
  522. if err != nil {
  523. return "", err
  524. }
  525. if len(r.UnlockKey) == 0 {
  526. // no key
  527. return "", nil
  528. }
  529. return encryption.HumanReadableKey(r.UnlockKey), nil
  530. }
  531. // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
  532. func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
  533. c.RLock()
  534. if !c.isActiveManager() {
  535. if err := c.errNoManager(); err != ErrSwarmLocked {
  536. c.RUnlock()
  537. return err
  538. }
  539. }
  540. c.RUnlock()
  541. key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
  542. if err != nil {
  543. return err
  544. }
  545. c.Lock()
  546. if c.node != nil || c.locked != true {
  547. c.Unlock()
  548. return errors.New("swarm is not locked")
  549. }
  550. config := *c.lastNodeConfig
  551. config.lockKey = key
  552. n, err := c.startNewNode(config)
  553. if err != nil {
  554. c.Unlock()
  555. return err
  556. }
  557. c.Unlock()
  558. select {
  559. case <-n.Ready():
  560. case <-n.done:
  561. if errors.Cause(c.err) == ErrSwarmLocked {
  562. return errors.New("swarm could not be unlocked: invalid key provided")
  563. }
  564. return fmt.Errorf("swarm component could not be started: %v", c.err)
  565. }
  566. go c.reconnectOnFailure(n)
  567. return nil
  568. }
  569. // stopNode is a helper that stops the active c.node and waits until it has
  570. // shut down. Call while keeping the cluster lock.
  571. func (c *Cluster) stopNode() error {
  572. if c.node == nil {
  573. return nil
  574. }
  575. c.stop = true
  576. if c.cancelDelay != nil {
  577. c.cancelDelay()
  578. c.cancelDelay = nil
  579. }
  580. node := c.node
  581. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  582. defer cancel()
  583. // TODO: can't hold lock on stop because it calls back to network
  584. c.Unlock()
  585. defer c.Lock()
  586. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  587. return err
  588. }
  589. <-node.done
  590. return nil
  591. }
  592. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  593. return reachable-2 <= unreachable
  594. }
  595. func isLastManager(reachable, unreachable int) bool {
  596. return reachable == 1 && unreachable == 0
  597. }
  598. // Leave shuts down Cluster and removes current state.
  599. func (c *Cluster) Leave(force bool) error {
  600. c.Lock()
  601. node := c.node
  602. if node == nil {
  603. if c.locked {
  604. c.locked = false
  605. c.lastNodeConfig = nil
  606. c.Unlock()
  607. } else if c.err == ErrSwarmCertificatesExpired {
  608. c.err = nil
  609. c.Unlock()
  610. } else {
  611. c.Unlock()
  612. return ErrNoSwarm
  613. }
  614. } else {
  615. if node.Manager() != nil && !force {
  616. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  617. if c.isActiveManager() {
  618. active, reachable, unreachable, err := c.managerStats()
  619. if err == nil {
  620. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  621. if isLastManager(reachable, unreachable) {
  622. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  623. c.Unlock()
  624. return fmt.Errorf(msg)
  625. }
  626. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  627. }
  628. }
  629. } else {
  630. msg += "Doing so may lose the consensus of your cluster. "
  631. }
  632. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  633. c.Unlock()
  634. return fmt.Errorf(msg)
  635. }
  636. if err := c.stopNode(); err != nil {
  637. logrus.Errorf("failed to shut down cluster node: %v", err)
  638. signal.DumpStacks("")
  639. c.Unlock()
  640. return err
  641. }
  642. c.Unlock()
  643. if nodeID := node.NodeID(); nodeID != "" {
  644. nodeContainers, err := c.listContainerForNode(nodeID)
  645. if err != nil {
  646. return err
  647. }
  648. for _, id := range nodeContainers {
  649. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  650. logrus.Errorf("error removing %v: %v", id, err)
  651. }
  652. }
  653. }
  654. }
  655. c.configEvent <- struct{}{}
  656. // todo: cleanup optional?
  657. if err := c.clearState(); err != nil {
  658. return err
  659. }
  660. return nil
  661. }
  662. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  663. var ids []string
  664. filters := filters.NewArgs()
  665. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  666. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  667. Filters: filters,
  668. })
  669. if err != nil {
  670. return []string{}, err
  671. }
  672. for _, c := range containers {
  673. ids = append(ids, c.ID)
  674. }
  675. return ids, nil
  676. }
  677. func (c *Cluster) clearState() error {
  678. // todo: backup this data instead of removing?
  679. if err := os.RemoveAll(c.root); err != nil {
  680. return err
  681. }
  682. if err := os.MkdirAll(c.root, 0700); err != nil {
  683. return err
  684. }
  685. c.config.Backend.SetClusterProvider(nil)
  686. return nil
  687. }
  688. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  689. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  690. }
  691. // Inspect retrieves the configuration properties of a managed swarm cluster.
  692. func (c *Cluster) Inspect() (types.Swarm, error) {
  693. c.RLock()
  694. defer c.RUnlock()
  695. if !c.isActiveManager() {
  696. return types.Swarm{}, c.errNoManager()
  697. }
  698. ctx, cancel := c.getRequestContext()
  699. defer cancel()
  700. swarm, err := getSwarm(ctx, c.client)
  701. if err != nil {
  702. return types.Swarm{}, err
  703. }
  704. return convert.SwarmFromGRPC(*swarm), nil
  705. }
  706. // Update updates configuration of a managed swarm cluster.
  707. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  708. c.RLock()
  709. defer c.RUnlock()
  710. if !c.isActiveManager() {
  711. return c.errNoManager()
  712. }
  713. ctx, cancel := c.getRequestContext()
  714. defer cancel()
  715. swarm, err := getSwarm(ctx, c.client)
  716. if err != nil {
  717. return err
  718. }
  719. // In update, client should provide the complete spec of the swarm, including
  720. // Name and Labels. If a field is specified with 0 or nil, then the default value
  721. // will be used to swarmkit.
  722. clusterSpec, err := convert.SwarmSpecToGRPC(spec)
  723. if err != nil {
  724. return err
  725. }
  726. _, err = c.client.UpdateCluster(
  727. ctx,
  728. &swarmapi.UpdateClusterRequest{
  729. ClusterID: swarm.ID,
  730. Spec: &clusterSpec,
  731. ClusterVersion: &swarmapi.Version{
  732. Index: version,
  733. },
  734. Rotation: swarmapi.KeyRotation{
  735. WorkerJoinToken: flags.RotateWorkerToken,
  736. ManagerJoinToken: flags.RotateManagerToken,
  737. ManagerUnlockKey: flags.RotateManagerUnlockKey,
  738. },
  739. },
  740. )
  741. return err
  742. }
  743. // IsManager returns true if Cluster is participating as a manager.
  744. func (c *Cluster) IsManager() bool {
  745. c.RLock()
  746. defer c.RUnlock()
  747. return c.isActiveManager()
  748. }
  749. // IsAgent returns true if Cluster is participating as a worker/agent.
  750. func (c *Cluster) IsAgent() bool {
  751. c.RLock()
  752. defer c.RUnlock()
  753. return c.node != nil && c.ready
  754. }
  755. // GetLocalAddress returns the local address.
  756. func (c *Cluster) GetLocalAddress() string {
  757. c.RLock()
  758. defer c.RUnlock()
  759. return c.actualLocalAddr
  760. }
  761. // GetListenAddress returns the listen address.
  762. func (c *Cluster) GetListenAddress() string {
  763. c.RLock()
  764. defer c.RUnlock()
  765. if c.node != nil {
  766. return c.node.config.ListenAddr
  767. }
  768. return ""
  769. }
  770. // GetAdvertiseAddress returns the remotely reachable address of this node.
  771. func (c *Cluster) GetAdvertiseAddress() string {
  772. c.RLock()
  773. defer c.RUnlock()
  774. if c.node != nil && c.node.config.AdvertiseAddr != "" {
  775. advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
  776. return advertiseHost
  777. }
  778. return c.actualLocalAddr
  779. }
  780. // GetRemoteAddress returns a known advertise address of a remote manager if
  781. // available.
  782. // todo: change to array/connect with info
  783. func (c *Cluster) GetRemoteAddress() string {
  784. c.RLock()
  785. defer c.RUnlock()
  786. return c.getRemoteAddress()
  787. }
  788. func (c *Cluster) getRemoteAddress() string {
  789. if c.node == nil {
  790. return ""
  791. }
  792. nodeID := c.node.NodeID()
  793. for _, r := range c.node.Remotes() {
  794. if r.NodeID != nodeID {
  795. return r.Addr
  796. }
  797. }
  798. return ""
  799. }
  800. // ListenClusterEvents returns a channel that receives messages on cluster
  801. // participation changes.
  802. // todo: make cancelable and accessible to multiple callers
  803. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  804. return c.configEvent
  805. }
  806. // Info returns information about the current cluster state.
  807. func (c *Cluster) Info() types.Info {
  808. info := types.Info{
  809. NodeAddr: c.GetAdvertiseAddress(),
  810. }
  811. c.RLock()
  812. defer c.RUnlock()
  813. if c.node == nil {
  814. info.LocalNodeState = types.LocalNodeStateInactive
  815. if c.cancelDelay != nil {
  816. info.LocalNodeState = types.LocalNodeStateError
  817. }
  818. if c.locked {
  819. info.LocalNodeState = types.LocalNodeStateLocked
  820. } else if c.err == ErrSwarmCertificatesExpired {
  821. info.LocalNodeState = types.LocalNodeStateError
  822. }
  823. } else {
  824. info.LocalNodeState = types.LocalNodeStatePending
  825. if c.ready == true {
  826. info.LocalNodeState = types.LocalNodeStateActive
  827. } else if c.locked {
  828. info.LocalNodeState = types.LocalNodeStateLocked
  829. }
  830. }
  831. if c.err != nil {
  832. info.Error = c.err.Error()
  833. }
  834. ctx, cancel := c.getRequestContext()
  835. defer cancel()
  836. if c.isActiveManager() {
  837. info.ControlAvailable = true
  838. swarm, err := c.Inspect()
  839. if err != nil {
  840. info.Error = err.Error()
  841. }
  842. // Strip JoinTokens
  843. info.Cluster = swarm.ClusterInfo
  844. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  845. info.Nodes = len(r.Nodes)
  846. for _, n := range r.Nodes {
  847. if n.ManagerStatus != nil {
  848. info.Managers = info.Managers + 1
  849. }
  850. }
  851. }
  852. }
  853. if c.node != nil {
  854. for _, r := range c.node.Remotes() {
  855. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  856. }
  857. info.NodeID = c.node.NodeID()
  858. }
  859. return info
  860. }
  861. // isActiveManager should not be called without a read lock
  862. func (c *Cluster) isActiveManager() bool {
  863. return c.node != nil && c.conn != nil
  864. }
  865. // swarmExists should not be called without a read lock
  866. func (c *Cluster) swarmExists() bool {
  867. return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
  868. }
  869. // errNoManager returns error describing why manager commands can't be used.
  870. // Call with read lock.
  871. func (c *Cluster) errNoManager() error {
  872. if c.node == nil {
  873. if c.locked {
  874. return ErrSwarmLocked
  875. }
  876. if c.err == ErrSwarmCertificatesExpired {
  877. return ErrSwarmCertificatesExpired
  878. }
  879. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  880. }
  881. if c.node.Manager() != nil {
  882. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  883. }
  884. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  885. }
  886. // GetServices returns all services of a managed swarm cluster.
  887. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  888. c.RLock()
  889. defer c.RUnlock()
  890. if !c.isActiveManager() {
  891. return nil, c.errNoManager()
  892. }
  893. filters, err := newListServicesFilters(options.Filters)
  894. if err != nil {
  895. return nil, err
  896. }
  897. ctx, cancel := c.getRequestContext()
  898. defer cancel()
  899. r, err := c.client.ListServices(
  900. ctx,
  901. &swarmapi.ListServicesRequest{Filters: filters})
  902. if err != nil {
  903. return nil, err
  904. }
  905. services := []types.Service{}
  906. for _, service := range r.Services {
  907. services = append(services, convert.ServiceFromGRPC(*service))
  908. }
  909. return services, nil
  910. }
  911. // imageWithDigestString takes an image such as name or name:tag
  912. // and returns the image pinned to a digest, such as name@sha256:34234...
  913. // Due to the difference between the docker/docker/reference, and the
  914. // docker/distribution/reference packages, we're parsing the image twice.
  915. // As the two packages converge, this function should be simplified.
  916. // TODO(nishanttotla): After the packages converge, the function must
  917. // convert distreference.Named -> distreference.Canonical, and the logic simplified.
  918. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  919. ref, err := distreference.ParseNamed(image)
  920. if err != nil {
  921. return "", err
  922. }
  923. // only query registry if not a canonical reference (i.e. with digest)
  924. if _, ok := ref.(distreference.Canonical); !ok {
  925. // create a docker/docker/reference Named object because GetRepository needs it
  926. dockerRef, err := reference.ParseNamed(image)
  927. if err != nil {
  928. return "", err
  929. }
  930. dockerRef = reference.WithDefaultTag(dockerRef)
  931. namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
  932. if !ok {
  933. return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
  934. }
  935. repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
  936. if err != nil {
  937. return "", err
  938. }
  939. dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
  940. if err != nil {
  941. return "", err
  942. }
  943. namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
  944. if err != nil {
  945. return "", err
  946. }
  947. return namedDigestedRef.String(), nil
  948. }
  949. // reference already contains a digest, so just return it
  950. return ref.String(), nil
  951. }
  952. // CreateService creates a new service in a managed swarm cluster.
  953. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
  954. c.RLock()
  955. defer c.RUnlock()
  956. if !c.isActiveManager() {
  957. return nil, c.errNoManager()
  958. }
  959. ctx, cancel := c.getRequestContext()
  960. defer cancel()
  961. err := c.populateNetworkID(ctx, c.client, &s)
  962. if err != nil {
  963. return nil, err
  964. }
  965. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  966. if err != nil {
  967. return nil, err
  968. }
  969. ctnr := serviceSpec.Task.GetContainer()
  970. if ctnr == nil {
  971. return nil, fmt.Errorf("service does not use container tasks")
  972. }
  973. if encodedAuth != "" {
  974. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  975. }
  976. // retrieve auth config from encoded auth
  977. authConfig := &apitypes.AuthConfig{}
  978. if encodedAuth != "" {
  979. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  980. logrus.Warnf("invalid authconfig: %v", err)
  981. }
  982. }
  983. resp := &apitypes.ServiceCreateResponse{}
  984. // pin image by digest
  985. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  986. digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
  987. if err != nil {
  988. logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
  989. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
  990. } else {
  991. logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
  992. ctnr.Image = digestImage
  993. }
  994. }
  995. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  996. if err != nil {
  997. return nil, err
  998. }
  999. resp.ID = r.Service.ID
  1000. return resp, nil
  1001. }
  1002. // GetService returns a service based on an ID or name.
  1003. func (c *Cluster) GetService(input string) (types.Service, error) {
  1004. c.RLock()
  1005. defer c.RUnlock()
  1006. if !c.isActiveManager() {
  1007. return types.Service{}, c.errNoManager()
  1008. }
  1009. ctx, cancel := c.getRequestContext()
  1010. defer cancel()
  1011. service, err := getService(ctx, c.client, input)
  1012. if err != nil {
  1013. return types.Service{}, err
  1014. }
  1015. return convert.ServiceFromGRPC(*service), nil
  1016. }
  1017. // UpdateService updates existing service to match new properties.
  1018. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
  1019. c.RLock()
  1020. defer c.RUnlock()
  1021. if !c.isActiveManager() {
  1022. return nil, c.errNoManager()
  1023. }
  1024. ctx, cancel := c.getRequestContext()
  1025. defer cancel()
  1026. err := c.populateNetworkID(ctx, c.client, &spec)
  1027. if err != nil {
  1028. return nil, err
  1029. }
  1030. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  1031. if err != nil {
  1032. return nil, err
  1033. }
  1034. currentService, err := getService(ctx, c.client, serviceIDOrName)
  1035. if err != nil {
  1036. return nil, err
  1037. }
  1038. newCtnr := serviceSpec.Task.GetContainer()
  1039. if newCtnr == nil {
  1040. return nil, fmt.Errorf("service does not use container tasks")
  1041. }
  1042. if encodedAuth != "" {
  1043. newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  1044. } else {
  1045. // this is needed because if the encodedAuth isn't being updated then we
  1046. // shouldn't lose it, and continue to use the one that was already present
  1047. var ctnr *swarmapi.ContainerSpec
  1048. switch registryAuthFrom {
  1049. case apitypes.RegistryAuthFromSpec, "":
  1050. ctnr = currentService.Spec.Task.GetContainer()
  1051. case apitypes.RegistryAuthFromPreviousSpec:
  1052. if currentService.PreviousSpec == nil {
  1053. return nil, fmt.Errorf("service does not have a previous spec")
  1054. }
  1055. ctnr = currentService.PreviousSpec.Task.GetContainer()
  1056. default:
  1057. return nil, fmt.Errorf("unsupported registryAuthFromValue")
  1058. }
  1059. if ctnr == nil {
  1060. return nil, fmt.Errorf("service does not use container tasks")
  1061. }
  1062. newCtnr.PullOptions = ctnr.PullOptions
  1063. // update encodedAuth so it can be used to pin image by digest
  1064. if ctnr.PullOptions != nil {
  1065. encodedAuth = ctnr.PullOptions.RegistryAuth
  1066. }
  1067. }
  1068. // retrieve auth config from encoded auth
  1069. authConfig := &apitypes.AuthConfig{}
  1070. if encodedAuth != "" {
  1071. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  1072. logrus.Warnf("invalid authconfig: %v", err)
  1073. }
  1074. }
  1075. resp := &apitypes.ServiceUpdateResponse{}
  1076. // pin image by digest
  1077. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  1078. digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
  1079. if err != nil {
  1080. logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
  1081. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
  1082. } else if newCtnr.Image != digestImage {
  1083. logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
  1084. newCtnr.Image = digestImage
  1085. }
  1086. }
  1087. _, err = c.client.UpdateService(
  1088. ctx,
  1089. &swarmapi.UpdateServiceRequest{
  1090. ServiceID: currentService.ID,
  1091. Spec: &serviceSpec,
  1092. ServiceVersion: &swarmapi.Version{
  1093. Index: version,
  1094. },
  1095. },
  1096. )
  1097. return resp, err
  1098. }
  1099. // RemoveService removes a service from a managed swarm cluster.
  1100. func (c *Cluster) RemoveService(input string) error {
  1101. c.RLock()
  1102. defer c.RUnlock()
  1103. if !c.isActiveManager() {
  1104. return c.errNoManager()
  1105. }
  1106. ctx, cancel := c.getRequestContext()
  1107. defer cancel()
  1108. service, err := getService(ctx, c.client, input)
  1109. if err != nil {
  1110. return err
  1111. }
  1112. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  1113. return err
  1114. }
  1115. return nil
  1116. }
  1117. // ServiceLogs collects service logs and writes them back to `config.OutStream`
  1118. func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
  1119. c.RLock()
  1120. if !c.isActiveManager() {
  1121. c.RUnlock()
  1122. return c.errNoManager()
  1123. }
  1124. service, err := getService(ctx, c.client, input)
  1125. if err != nil {
  1126. c.RUnlock()
  1127. return err
  1128. }
  1129. stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
  1130. Selector: &swarmapi.LogSelector{
  1131. ServiceIDs: []string{service.ID},
  1132. },
  1133. Options: &swarmapi.LogSubscriptionOptions{
  1134. Follow: true,
  1135. },
  1136. })
  1137. if err != nil {
  1138. c.RUnlock()
  1139. return err
  1140. }
  1141. wf := ioutils.NewWriteFlusher(config.OutStream)
  1142. defer wf.Close()
  1143. close(started)
  1144. wf.Flush()
  1145. outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
  1146. errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
  1147. // Release the lock before starting the stream.
  1148. c.RUnlock()
  1149. for {
  1150. // Check the context before doing anything.
  1151. select {
  1152. case <-ctx.Done():
  1153. return ctx.Err()
  1154. default:
  1155. }
  1156. subscribeMsg, err := stream.Recv()
  1157. if err == io.EOF {
  1158. return nil
  1159. }
  1160. if err != nil {
  1161. return err
  1162. }
  1163. for _, msg := range subscribeMsg.Messages {
  1164. data := []byte{}
  1165. if config.Timestamps {
  1166. ts, err := ptypes.Timestamp(msg.Timestamp)
  1167. if err != nil {
  1168. return err
  1169. }
  1170. data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
  1171. }
  1172. data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
  1173. contextPrefix, msg.Context.NodeID,
  1174. contextPrefix, msg.Context.ServiceID,
  1175. contextPrefix, msg.Context.TaskID,
  1176. ))...)
  1177. data = append(data, msg.Data...)
  1178. switch msg.Stream {
  1179. case swarmapi.LogStreamStdout:
  1180. outStream.Write(data)
  1181. case swarmapi.LogStreamStderr:
  1182. errStream.Write(data)
  1183. }
  1184. }
  1185. }
  1186. }
  1187. // GetNodes returns a list of all nodes known to a cluster.
  1188. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  1189. c.RLock()
  1190. defer c.RUnlock()
  1191. if !c.isActiveManager() {
  1192. return nil, c.errNoManager()
  1193. }
  1194. filters, err := newListNodesFilters(options.Filters)
  1195. if err != nil {
  1196. return nil, err
  1197. }
  1198. ctx, cancel := c.getRequestContext()
  1199. defer cancel()
  1200. r, err := c.client.ListNodes(
  1201. ctx,
  1202. &swarmapi.ListNodesRequest{Filters: filters})
  1203. if err != nil {
  1204. return nil, err
  1205. }
  1206. nodes := []types.Node{}
  1207. for _, node := range r.Nodes {
  1208. nodes = append(nodes, convert.NodeFromGRPC(*node))
  1209. }
  1210. return nodes, nil
  1211. }
  1212. // GetNode returns a node based on an ID or name.
  1213. func (c *Cluster) GetNode(input string) (types.Node, error) {
  1214. c.RLock()
  1215. defer c.RUnlock()
  1216. if !c.isActiveManager() {
  1217. return types.Node{}, c.errNoManager()
  1218. }
  1219. ctx, cancel := c.getRequestContext()
  1220. defer cancel()
  1221. node, err := getNode(ctx, c.client, input)
  1222. if err != nil {
  1223. return types.Node{}, err
  1224. }
  1225. return convert.NodeFromGRPC(*node), nil
  1226. }
  1227. // UpdateNode updates existing nodes properties.
  1228. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
  1229. c.RLock()
  1230. defer c.RUnlock()
  1231. if !c.isActiveManager() {
  1232. return c.errNoManager()
  1233. }
  1234. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  1235. if err != nil {
  1236. return err
  1237. }
  1238. ctx, cancel := c.getRequestContext()
  1239. defer cancel()
  1240. currentNode, err := getNode(ctx, c.client, input)
  1241. if err != nil {
  1242. return err
  1243. }
  1244. _, err = c.client.UpdateNode(
  1245. ctx,
  1246. &swarmapi.UpdateNodeRequest{
  1247. NodeID: currentNode.ID,
  1248. Spec: &nodeSpec,
  1249. NodeVersion: &swarmapi.Version{
  1250. Index: version,
  1251. },
  1252. },
  1253. )
  1254. return err
  1255. }
  1256. // RemoveNode removes a node from a cluster
  1257. func (c *Cluster) RemoveNode(input string, force bool) error {
  1258. c.RLock()
  1259. defer c.RUnlock()
  1260. if !c.isActiveManager() {
  1261. return c.errNoManager()
  1262. }
  1263. ctx, cancel := c.getRequestContext()
  1264. defer cancel()
  1265. node, err := getNode(ctx, c.client, input)
  1266. if err != nil {
  1267. return err
  1268. }
  1269. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  1270. return err
  1271. }
  1272. return nil
  1273. }
  1274. // GetTasks returns a list of tasks matching the filter options.
  1275. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  1276. c.RLock()
  1277. defer c.RUnlock()
  1278. if !c.isActiveManager() {
  1279. return nil, c.errNoManager()
  1280. }
  1281. byName := func(filter filters.Args) error {
  1282. if filter.Include("service") {
  1283. serviceFilters := filter.Get("service")
  1284. for _, serviceFilter := range serviceFilters {
  1285. service, err := c.GetService(serviceFilter)
  1286. if err != nil {
  1287. return err
  1288. }
  1289. filter.Del("service", serviceFilter)
  1290. filter.Add("service", service.ID)
  1291. }
  1292. }
  1293. if filter.Include("node") {
  1294. nodeFilters := filter.Get("node")
  1295. for _, nodeFilter := range nodeFilters {
  1296. node, err := c.GetNode(nodeFilter)
  1297. if err != nil {
  1298. return err
  1299. }
  1300. filter.Del("node", nodeFilter)
  1301. filter.Add("node", node.ID)
  1302. }
  1303. }
  1304. return nil
  1305. }
  1306. filters, err := newListTasksFilters(options.Filters, byName)
  1307. if err != nil {
  1308. return nil, err
  1309. }
  1310. ctx, cancel := c.getRequestContext()
  1311. defer cancel()
  1312. r, err := c.client.ListTasks(
  1313. ctx,
  1314. &swarmapi.ListTasksRequest{Filters: filters})
  1315. if err != nil {
  1316. return nil, err
  1317. }
  1318. tasks := []types.Task{}
  1319. for _, task := range r.Tasks {
  1320. if task.Spec.GetContainer() != nil {
  1321. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1322. }
  1323. }
  1324. return tasks, nil
  1325. }
  1326. // GetTask returns a task by an ID.
  1327. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1328. c.RLock()
  1329. defer c.RUnlock()
  1330. if !c.isActiveManager() {
  1331. return types.Task{}, c.errNoManager()
  1332. }
  1333. ctx, cancel := c.getRequestContext()
  1334. defer cancel()
  1335. task, err := getTask(ctx, c.client, input)
  1336. if err != nil {
  1337. return types.Task{}, err
  1338. }
  1339. return convert.TaskFromGRPC(*task), nil
  1340. }
  1341. // GetNetwork returns a cluster network by an ID.
  1342. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1343. c.RLock()
  1344. defer c.RUnlock()
  1345. if !c.isActiveManager() {
  1346. return apitypes.NetworkResource{}, c.errNoManager()
  1347. }
  1348. ctx, cancel := c.getRequestContext()
  1349. defer cancel()
  1350. network, err := getNetwork(ctx, c.client, input)
  1351. if err != nil {
  1352. return apitypes.NetworkResource{}, err
  1353. }
  1354. return convert.BasicNetworkFromGRPC(*network), nil
  1355. }
  1356. // GetNetworks returns all current cluster managed networks.
  1357. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1358. c.RLock()
  1359. defer c.RUnlock()
  1360. if !c.isActiveManager() {
  1361. return nil, c.errNoManager()
  1362. }
  1363. ctx, cancel := c.getRequestContext()
  1364. defer cancel()
  1365. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1366. if err != nil {
  1367. return nil, err
  1368. }
  1369. var networks []apitypes.NetworkResource
  1370. for _, network := range r.Networks {
  1371. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1372. }
  1373. return networks, nil
  1374. }
  1375. func attacherKey(target, containerID string) string {
  1376. return containerID + ":" + target
  1377. }
  1378. // UpdateAttachment signals the attachment config to the attachment
  1379. // waiter who is trying to start or attach the container to the
  1380. // network.
  1381. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1382. c.RLock()
  1383. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1384. c.RUnlock()
  1385. if !ok || attacher == nil {
  1386. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1387. }
  1388. attacher.attachWaitCh <- config
  1389. close(attacher.attachWaitCh)
  1390. return nil
  1391. }
  1392. // WaitForDetachment waits for the container to stop or detach from
  1393. // the network.
  1394. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
  1395. c.RLock()
  1396. attacher, ok := c.attachers[attacherKey(networkName, containerID)]
  1397. if !ok {
  1398. attacher, ok = c.attachers[attacherKey(networkID, containerID)]
  1399. }
  1400. if c.node == nil || c.node.Agent() == nil {
  1401. c.RUnlock()
  1402. return fmt.Errorf("invalid cluster node while waiting for detachment")
  1403. }
  1404. agent := c.node.Agent()
  1405. c.RUnlock()
  1406. if ok && attacher != nil &&
  1407. attacher.detachWaitCh != nil &&
  1408. attacher.attachCompleteCh != nil {
  1409. // Attachment may be in progress still so wait for
  1410. // attachment to complete.
  1411. select {
  1412. case <-attacher.attachCompleteCh:
  1413. case <-ctx.Done():
  1414. return ctx.Err()
  1415. }
  1416. if attacher.taskID == taskID {
  1417. select {
  1418. case <-attacher.detachWaitCh:
  1419. case <-ctx.Done():
  1420. return ctx.Err()
  1421. }
  1422. }
  1423. }
  1424. return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
  1425. }
  1426. // AttachNetwork generates an attachment request towards the manager.
  1427. func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
  1428. aKey := attacherKey(target, containerID)
  1429. c.Lock()
  1430. if c.node == nil || c.node.Agent() == nil {
  1431. c.Unlock()
  1432. return nil, fmt.Errorf("invalid cluster node while attaching to network")
  1433. }
  1434. if attacher, ok := c.attachers[aKey]; ok {
  1435. c.Unlock()
  1436. return attacher.config, nil
  1437. }
  1438. agent := c.node.Agent()
  1439. attachWaitCh := make(chan *network.NetworkingConfig)
  1440. detachWaitCh := make(chan struct{})
  1441. attachCompleteCh := make(chan struct{})
  1442. c.attachers[aKey] = &attacher{
  1443. attachWaitCh: attachWaitCh,
  1444. attachCompleteCh: attachCompleteCh,
  1445. detachWaitCh: detachWaitCh,
  1446. }
  1447. c.Unlock()
  1448. ctx, cancel := c.getRequestContext()
  1449. defer cancel()
  1450. taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
  1451. if err != nil {
  1452. c.Lock()
  1453. delete(c.attachers, aKey)
  1454. c.Unlock()
  1455. return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
  1456. }
  1457. c.Lock()
  1458. c.attachers[aKey].taskID = taskID
  1459. close(attachCompleteCh)
  1460. c.Unlock()
  1461. logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
  1462. var config *network.NetworkingConfig
  1463. select {
  1464. case config = <-attachWaitCh:
  1465. case <-ctx.Done():
  1466. return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
  1467. }
  1468. c.Lock()
  1469. c.attachers[aKey].config = config
  1470. c.Unlock()
  1471. return config, nil
  1472. }
  1473. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1474. // that a request to detach can be generated towards the manager.
  1475. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1476. aKey := attacherKey(target, containerID)
  1477. c.Lock()
  1478. attacher, ok := c.attachers[aKey]
  1479. delete(c.attachers, aKey)
  1480. c.Unlock()
  1481. if !ok {
  1482. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1483. }
  1484. close(attacher.detachWaitCh)
  1485. return nil
  1486. }
  1487. // CreateNetwork creates a new cluster managed network.
  1488. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1489. c.RLock()
  1490. defer c.RUnlock()
  1491. if !c.isActiveManager() {
  1492. return "", c.errNoManager()
  1493. }
  1494. if runconfig.IsPreDefinedNetwork(s.Name) {
  1495. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1496. return "", apierrors.NewRequestForbiddenError(err)
  1497. }
  1498. ctx, cancel := c.getRequestContext()
  1499. defer cancel()
  1500. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1501. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1502. if err != nil {
  1503. return "", err
  1504. }
  1505. return r.Network.ID, nil
  1506. }
  1507. // RemoveNetwork removes a cluster network.
  1508. func (c *Cluster) RemoveNetwork(input string) error {
  1509. c.RLock()
  1510. defer c.RUnlock()
  1511. if !c.isActiveManager() {
  1512. return c.errNoManager()
  1513. }
  1514. ctx, cancel := c.getRequestContext()
  1515. defer cancel()
  1516. network, err := getNetwork(ctx, c.client, input)
  1517. if err != nil {
  1518. return err
  1519. }
  1520. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1521. return err
  1522. }
  1523. return nil
  1524. }
  1525. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1526. // Always prefer NetworkAttachmentConfigs from TaskTemplate
  1527. // but fallback to service spec for backward compatibility
  1528. networks := s.TaskTemplate.Networks
  1529. if len(networks) == 0 {
  1530. networks = s.Networks
  1531. }
  1532. for i, n := range networks {
  1533. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1534. if err != nil {
  1535. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1536. err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
  1537. return apierrors.NewRequestForbiddenError(err)
  1538. }
  1539. return err
  1540. }
  1541. networks[i].Target = apiNetwork.ID
  1542. }
  1543. return nil
  1544. }
  1545. func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
  1546. // GetNetwork to match via full ID.
  1547. rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
  1548. if err != nil {
  1549. // If any error (including NotFound), ListNetworks to match via ID prefix and full name.
  1550. rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
  1551. if err != nil || len(rl.Networks) == 0 {
  1552. rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
  1553. }
  1554. if err != nil {
  1555. return nil, err
  1556. }
  1557. if len(rl.Networks) == 0 {
  1558. return nil, fmt.Errorf("network %s not found", input)
  1559. }
  1560. if l := len(rl.Networks); l > 1 {
  1561. return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
  1562. }
  1563. return rl.Networks[0], nil
  1564. }
  1565. return rg.Network, nil
  1566. }
  1567. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1568. func (c *Cluster) Cleanup() {
  1569. c.Lock()
  1570. node := c.node
  1571. if node == nil {
  1572. c.Unlock()
  1573. return
  1574. }
  1575. defer c.Unlock()
  1576. if c.isActiveManager() {
  1577. active, reachable, unreachable, err := c.managerStats()
  1578. if err == nil {
  1579. singlenode := active && isLastManager(reachable, unreachable)
  1580. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  1581. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1582. }
  1583. }
  1584. }
  1585. c.stopNode()
  1586. }
  1587. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1588. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1589. defer cancel()
  1590. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1591. if err != nil {
  1592. return false, 0, 0, err
  1593. }
  1594. for _, n := range nodes.Nodes {
  1595. if n.ManagerStatus != nil {
  1596. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1597. reachable++
  1598. if n.ID == c.node.NodeID() {
  1599. current = true
  1600. }
  1601. }
  1602. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1603. unreachable++
  1604. }
  1605. }
  1606. }
  1607. return
  1608. }
  1609. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1610. var err error
  1611. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1612. if err != nil {
  1613. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1614. }
  1615. return nil
  1616. }
  1617. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1618. var err error
  1619. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1620. if err != nil {
  1621. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1622. }
  1623. if len(req.RemoteAddrs) == 0 {
  1624. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1625. }
  1626. for i := range req.RemoteAddrs {
  1627. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1628. if err != nil {
  1629. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1630. }
  1631. }
  1632. return nil
  1633. }
  1634. func validateAddr(addr string) (string, error) {
  1635. if addr == "" {
  1636. return addr, fmt.Errorf("invalid empty address")
  1637. }
  1638. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1639. if err != nil {
  1640. return addr, nil
  1641. }
  1642. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1643. }
  1644. func initClusterSpec(node *node, spec types.Spec) error {
  1645. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1646. for conn := range node.ListenControlSocket(ctx) {
  1647. if ctx.Err() != nil {
  1648. return ctx.Err()
  1649. }
  1650. if conn != nil {
  1651. client := swarmapi.NewControlClient(conn)
  1652. var cluster *swarmapi.Cluster
  1653. for i := 0; ; i++ {
  1654. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1655. if err != nil {
  1656. return fmt.Errorf("error on listing clusters: %v", err)
  1657. }
  1658. if len(lcr.Clusters) == 0 {
  1659. if i < 10 {
  1660. time.Sleep(200 * time.Millisecond)
  1661. continue
  1662. }
  1663. return fmt.Errorf("empty list of clusters was returned")
  1664. }
  1665. cluster = lcr.Clusters[0]
  1666. break
  1667. }
  1668. // In init, we take the initial default values from swarmkit, and merge
  1669. // any non nil or 0 value from spec to GRPC spec. This will leave the
  1670. // default value alone.
  1671. // Note that this is different from Update(), as in Update() we expect
  1672. // user to specify the complete spec of the cluster (as they already know
  1673. // the existing one and knows which field to update)
  1674. clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
  1675. if err != nil {
  1676. return fmt.Errorf("error updating cluster settings: %v", err)
  1677. }
  1678. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1679. ClusterID: cluster.ID,
  1680. ClusterVersion: &cluster.Meta.Version,
  1681. Spec: &clusterSpec,
  1682. })
  1683. if err != nil {
  1684. return fmt.Errorf("error updating cluster settings: %v", err)
  1685. }
  1686. return nil
  1687. }
  1688. }
  1689. return ctx.Err()
  1690. }
  1691. func detectLockedError(err error) error {
  1692. if err == swarmnode.ErrInvalidUnlockKey {
  1693. return errors.WithStack(ErrSwarmLocked)
  1694. }
  1695. return err
  1696. }