cluster.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767
  1. package cluster
  2. //
  3. // ## Swarmkit integration
  4. //
  5. // Cluster - static configurable object for accessing everything swarm related.
  6. // Contains methods for connecting and controlling the cluster. Exists always,
  7. // even if swarm mode is not enabled.
  8. //
  9. // NodeRunner - Manager for starting the swarmkit node. Is present only and
  10. // always if swarm mode is enabled. Implements backoff restart loop in case of
  11. // errors.
  12. //
  13. // NodeState - Information about the current node status including access to
  14. // gRPC clients if a manager is active.
  15. //
  16. // ### Locking
  17. //
  18. // `cluster.controlMutex` - taken for the whole lifecycle of the processes that
  19. // can reconfigure cluster(init/join/leave etc). Protects that one
  20. // reconfiguration action has fully completed before another can start.
  21. //
  22. // `cluster.mu` - taken when the actual changes in cluster configurations
  23. // happen. Different from `controlMutex` because in some cases we need to
  24. // access current cluster state even if the long-running reconfiguration is
  25. // going on. For example network stack may ask for the current cluster state in
  26. // the middle of the shutdown. Any time current cluster state is asked you
  27. // should take the read lock of `cluster.mu`. If you are writing an API
  28. // responder that returns synchronously, hold `cluster.mu.RLock()` for the
  29. // duration of the whole handler function. That ensures that node will not be
  30. // shut down until the handler has finished.
  31. //
  32. // NodeRunner implements its internal locks that should not be used outside of
  33. // the struct. Instead, you should just call `nodeRunner.State()` method to get
  34. // the current state of the cluster(still need `cluster.mu.RLock()` to access
  35. // `cluster.nr` reference itself). Most of the changes in NodeRunner happen
  36. // because of an external event(network problem, unexpected swarmkit error) and
  37. // Docker shouldn't take any locks that delay these changes from happening.
  38. //
  39. import (
  40. "crypto/x509"
  41. "encoding/base64"
  42. "encoding/json"
  43. "fmt"
  44. "io"
  45. "net"
  46. "os"
  47. "path/filepath"
  48. "strings"
  49. "sync"
  50. "time"
  51. "github.com/Sirupsen/logrus"
  52. "github.com/docker/distribution/reference"
  53. apierrors "github.com/docker/docker/api/errors"
  54. apitypes "github.com/docker/docker/api/types"
  55. "github.com/docker/docker/api/types/backend"
  56. "github.com/docker/docker/api/types/filters"
  57. "github.com/docker/docker/api/types/network"
  58. types "github.com/docker/docker/api/types/swarm"
  59. "github.com/docker/docker/daemon/cluster/convert"
  60. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  61. "github.com/docker/docker/daemon/logger"
  62. "github.com/docker/docker/opts"
  63. "github.com/docker/docker/pkg/ioutils"
  64. "github.com/docker/docker/pkg/signal"
  65. "github.com/docker/docker/pkg/stdcopy"
  66. "github.com/docker/docker/runconfig"
  67. swarmapi "github.com/docker/swarmkit/api"
  68. "github.com/docker/swarmkit/manager/encryption"
  69. swarmnode "github.com/docker/swarmkit/node"
  70. gogotypes "github.com/gogo/protobuf/types"
  71. "github.com/pkg/errors"
  72. "golang.org/x/net/context"
  73. )
// Filesystem / socket layout under the daemon root.
const swarmDirName = "swarm"
const controlSocket = "control.sock"

// Timeouts for connecting to a swarm and for individual manager requests.
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second

// stateFile is where the persisted node start configuration lives.
const stateFile = "docker-state.json"

// defaultAddr is the default listen address for swarm mode.
const defaultAddr = "0.0.0.0:2377"

const (
	// initialReconnectDelay / maxReconnectDelay bound the backoff restart
	// loop of the node runner.
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	// contextPrefix namespaces swarm-related labels on containers.
	contextPrefix = "com.docker.swarm"
)

// errNoSwarm is returned on leaving a cluster that was never initialized
var errNoSwarm = errors.New("This node is not part of a swarm")

// errSwarmExists is returned on initialize or join request for a cluster that has already been activated
var errSwarmExists = errors.New("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// errSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var errSwarmJoinTimeoutReached = errors.New("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// errSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var errSwarmLocked = errors.New("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// errSwarmCertificatesExpired is returned if docker was not started for the whole validity period and they had no chance to renew automatically.
var errSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets of Docker-managed networks.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets of Docker-managed networks.
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	// Root is the base directory under which swarm state is persisted
	// (a "swarm" subdirectory is created beneath it — see New).
	Root string
	// Name is presumably the daemon/node name; it is not referenced in
	// this part of the file — verify usage elsewhere.
	Name string
	// Backend is the daemon-side executor backend used by the cluster.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider exposes subnets of Docker-managed networks.
	NetworkSubnetsProvider NetworkSubnetsProvider
	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string
	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner  // current swarmkit node runner; nil when not in a swarm
	root         string       // on-disk swarm state directory (Config.Root + "/swarm")
	runtimeRoot  string       // runtime state directory (control socket etc.)
	config       Config
	configEvent  chan struct{} // todo: make this array and goroutine safe
	attachers    map[string]*attacher
}
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID string
	config *network.NetworkingConfig
	// attachWaitCh delivers the attachment configuration once obtained.
	attachWaitCh chan *network.NetworkingConfig
	// attachCompleteCh signals that the attachment has completed.
	attachCompleteCh chan struct{}
	// detachWaitCh signals that detachment has completed.
	detachWaitCh chan struct{}
}
// New creates a new Cluster instance using provided config.
// If persisted swarm state exists on disk, the swarm component is restarted
// and New waits (bounded by swarmConnectTimeout) for it to become ready.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}
	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		// No persisted state means this node is simply not part of a swarm;
		// return a usable (inactive) Cluster.
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return nil, err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		// Startup continues in the background; don't fail daemon startup.
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			// A locked swarm is a valid state; the user can unlock later.
			if errors.Cause(err) == errSwarmLocked {
				return c, nil
			}
			// NOTE(review): this inspects c.nr.err rather than the err just
			// received from Ready() — confirm they are intended to match.
			if err, ok := errors.Cause(c.nr.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
				return c, nil
			}
			return nil, errors.Wrap(err, "swarm component could not be started")
		}
	}
	return c, nil
}
// newNodeRunner creates and starts a nodeRunner for the given start config.
// If conf.LocalAddr is empty (only possible on "join"), the local address is
// resolved automatically from the listen address or from the route to the
// remote address. On success the backend is notified that the daemon joined
// a cluster.
func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			// A concrete listen host doubles as the local address.
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			// Dialing UDP does not send packets; it only selects the local
			// interface that would route to RemoteAddr.
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)
	return nr, nil
}
  222. // Init initializes new cluster from user provided request.
  223. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  224. c.controlMutex.Lock()
  225. defer c.controlMutex.Unlock()
  226. c.mu.Lock()
  227. if c.nr != nil {
  228. if req.ForceNewCluster {
  229. if err := c.nr.Stop(); err != nil {
  230. c.mu.Unlock()
  231. return "", err
  232. }
  233. } else {
  234. c.mu.Unlock()
  235. return "", errSwarmExists
  236. }
  237. }
  238. c.mu.Unlock()
  239. if err := validateAndSanitizeInitRequest(&req); err != nil {
  240. return "", apierrors.NewBadRequestError(err)
  241. }
  242. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  243. if err != nil {
  244. return "", err
  245. }
  246. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  247. if err != nil {
  248. return "", err
  249. }
  250. localAddr := listenHost
  251. // If the local address is undetermined, the advertise address
  252. // will be used as local address, if it belongs to this system.
  253. // If the advertise address is not local, then we try to find
  254. // a system address to use as local address. If this fails,
  255. // we give up and ask the user to pass the listen address.
  256. if net.ParseIP(localAddr).IsUnspecified() {
  257. advertiseIP := net.ParseIP(advertiseHost)
  258. found := false
  259. for _, systemIP := range listSystemIPs() {
  260. if systemIP.Equal(advertiseIP) {
  261. localAddr = advertiseIP.String()
  262. found = true
  263. break
  264. }
  265. }
  266. if !found {
  267. ip, err := c.resolveSystemAddr()
  268. if err != nil {
  269. logrus.Warnf("Could not find a local address: %v", err)
  270. return "", errMustSpecifyListenAddr
  271. }
  272. localAddr = ip.String()
  273. }
  274. }
  275. if !req.ForceNewCluster {
  276. clearPersistentState(c.root)
  277. }
  278. nr, err := c.newNodeRunner(nodeStartConfig{
  279. forceNewCluster: req.ForceNewCluster,
  280. autolock: req.AutoLockManagers,
  281. LocalAddr: localAddr,
  282. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  283. AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
  284. availability: req.Availability,
  285. })
  286. if err != nil {
  287. return "", err
  288. }
  289. c.mu.Lock()
  290. c.nr = nr
  291. c.mu.Unlock()
  292. if err := <-nr.Ready(); err != nil {
  293. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  294. if err := clearPersistentState(c.root); err != nil {
  295. return "", err
  296. }
  297. }
  298. if err != nil {
  299. c.mu.Lock()
  300. c.nr = nil
  301. c.mu.Unlock()
  302. }
  303. return "", err
  304. }
  305. state := nr.State()
  306. if state.swarmNode == nil { // should never happen but protect from panic
  307. return "", errors.New("invalid cluster state for spec initialization")
  308. }
  309. if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
  310. return "", err
  311. }
  312. return state.NodeID(), nil
  313. }
// Join makes current Cluster part of an existing swarm cluster.
// It starts a node runner pointed at the first remote address and waits up
// to swarmConnectTimeout for it to become ready; on timeout the join keeps
// going in the background and errSwarmJoinTimeoutReached is returned.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		// Already part of a swarm.
		c.mu.Unlock()
		return errSwarmExists
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return apierrors.NewBadRequestError(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	// Drop any stale on-disk state before joining a new swarm.
	clearPersistentState(c.root)

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		// The join attempt continues in the background.
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
		}
		return err
	}
}
  367. // GetUnlockKey returns the unlock key for the swarm.
  368. func (c *Cluster) GetUnlockKey() (string, error) {
  369. c.mu.RLock()
  370. defer c.mu.RUnlock()
  371. state := c.currentNodeState()
  372. if !state.IsActiveManager() {
  373. return "", c.errNoManager(state)
  374. }
  375. ctx, cancel := c.getRequestContext()
  376. defer cancel()
  377. client := swarmapi.NewCAClient(state.grpcConn)
  378. r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
  379. if err != nil {
  380. return "", err
  381. }
  382. if len(r.UnlockKey) == 0 {
  383. // no key
  384. return "", nil
  385. }
  386. return encryption.HumanReadableKey(r.UnlockKey), nil
  387. }
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
// It only proceeds when the node is in the locked state; the node runner is
// then restarted with the parsed key applied to its configuration.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// when manager is not active,
		// unless it is locked, otherwise return error.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// when manager is active, return an error of "not locked"
		c.mu.RUnlock()
		return errors.New("swarm is not locked")
	}

	// only when swarm is locked, code running reaches here
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return err
	}

	// Restart the node with the unlock key applied to its start config.
	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		// Still locked after restart means the provided key was wrong.
		if errors.Cause(err) == errSwarmLocked {
			return errors.New("swarm could not be unlocked: invalid key provided")
		}
		return fmt.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}
// Leave shuts down Cluster and removes current state.
// Unless force is set, leaving is refused when the swarm is locked or when
// this node is a manager whose removal could lose quorum. After a successful
// stop, swarm-managed containers on this node are force-removed and the
// persisted state is cleared.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errNoSwarm
	}

	state := c.currentNodeState()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leave a locked swarm without --force is not allowed
		c.mu.Unlock()
		return errors.New("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message.")
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				// Warn (or refuse) when removing this manager would break
				// the Raft quorum.
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						c.mu.Unlock()
						return errors.New(msg)
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		c.mu.Unlock()
		return errors.New(msg)
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		c.mu.Unlock()
		return err
	}
	c.nr = nil
	c.mu.Unlock()

	// Best-effort cleanup of swarm task containers left on this node.
	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}
  498. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  499. var ids []string
  500. filters := filters.NewArgs()
  501. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  502. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  503. Filters: filters,
  504. })
  505. if err != nil {
  506. return []string{}, err
  507. }
  508. for _, c := range containers {
  509. ids = append(ids, c.ID)
  510. }
  511. return ids, nil
  512. }
// getRequestContext returns a context bounded by swarmRequestTimeout, plus
// its cancel function, for manager gRPC requests.
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}
  516. // Inspect retrieves the configuration properties of a managed swarm cluster.
  517. func (c *Cluster) Inspect() (types.Swarm, error) {
  518. c.mu.RLock()
  519. defer c.mu.RUnlock()
  520. state := c.currentNodeState()
  521. if !state.IsActiveManager() {
  522. return types.Swarm{}, c.errNoManager(state)
  523. }
  524. ctx, cancel := c.getRequestContext()
  525. defer cancel()
  526. swarm, err := getSwarm(ctx, state.controlClient)
  527. if err != nil {
  528. return types.Swarm{}, err
  529. }
  530. return convert.SwarmFromGRPC(*swarm), nil
  531. }
  532. // Update updates configuration of a managed swarm cluster.
  533. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  534. c.mu.RLock()
  535. defer c.mu.RUnlock()
  536. state := c.currentNodeState()
  537. if !state.IsActiveManager() {
  538. return c.errNoManager(state)
  539. }
  540. ctx, cancel := c.getRequestContext()
  541. defer cancel()
  542. swarm, err := getSwarm(ctx, state.controlClient)
  543. if err != nil {
  544. return err
  545. }
  546. // In update, client should provide the complete spec of the swarm, including
  547. // Name and Labels. If a field is specified with 0 or nil, then the default value
  548. // will be used to swarmkit.
  549. clusterSpec, err := convert.SwarmSpecToGRPC(spec)
  550. if err != nil {
  551. return apierrors.NewBadRequestError(err)
  552. }
  553. _, err = state.controlClient.UpdateCluster(
  554. ctx,
  555. &swarmapi.UpdateClusterRequest{
  556. ClusterID: swarm.ID,
  557. Spec: &clusterSpec,
  558. ClusterVersion: &swarmapi.Version{
  559. Index: version,
  560. },
  561. Rotation: swarmapi.KeyRotation{
  562. WorkerJoinToken: flags.RotateWorkerToken,
  563. ManagerJoinToken: flags.RotateManagerToken,
  564. ManagerUnlockKey: flags.RotateManagerUnlockKey,
  565. },
  566. },
  567. )
  568. return err
  569. }
  570. // IsManager returns true if Cluster is participating as a manager.
  571. func (c *Cluster) IsManager() bool {
  572. c.mu.RLock()
  573. defer c.mu.RUnlock()
  574. return c.currentNodeState().IsActiveManager()
  575. }
  576. // IsAgent returns true if Cluster is participating as a worker/agent.
  577. func (c *Cluster) IsAgent() bool {
  578. c.mu.RLock()
  579. defer c.mu.RUnlock()
  580. return c.currentNodeState().status == types.LocalNodeStateActive
  581. }
  582. // GetLocalAddress returns the local address.
  583. func (c *Cluster) GetLocalAddress() string {
  584. c.mu.RLock()
  585. defer c.mu.RUnlock()
  586. return c.currentNodeState().actualLocalAddr
  587. }
  588. // GetListenAddress returns the listen address.
  589. func (c *Cluster) GetListenAddress() string {
  590. c.mu.RLock()
  591. defer c.mu.RUnlock()
  592. if c.nr != nil {
  593. return c.nr.config.ListenAddr
  594. }
  595. return ""
  596. }
  597. // GetAdvertiseAddress returns the remotely reachable address of this node.
  598. func (c *Cluster) GetAdvertiseAddress() string {
  599. c.mu.RLock()
  600. defer c.mu.RUnlock()
  601. if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
  602. advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
  603. return advertiseHost
  604. }
  605. return c.currentNodeState().actualLocalAddr
  606. }
// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	// Delegates to the lock-free helper; the read lock is held here.
	return c.getRemoteAddress()
}
  615. func (c *Cluster) getRemoteAddress() string {
  616. state := c.currentNodeState()
  617. if state.swarmNode == nil {
  618. return ""
  619. }
  620. nodeID := state.swarmNode.NodeID()
  621. for _, r := range state.swarmNode.Remotes() {
  622. if r.NodeID != nodeID {
  623. return r.Addr
  624. }
  625. }
  626. return ""
  627. }
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	// Single shared channel (buffered, size 10 — see New); not safe for
	// multiple concurrent consumers, per the todo above.
	return c.configEvent
}
// Info returns information about the current cluster state.
// Errors from manager queries are reported via info.Error rather than
// failing the whole call.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		// NOTE(review): c.Inspect() takes c.mu.RLock again while it is
		// already held here; Go's RWMutex can deadlock on recursive RLock
		// if a writer is queued in between — confirm this is safe.
		swarm, err := c.Inspect()
		if err != nil {
			info.Error = err.Error()
		}

		// Strip JoinTokens
		info.Cluster = swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}
// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
	// NOTE(review): c.nr may be nil when not in a swarm; this relies on
	// nodeRunner.State handling a nil receiver — confirm in nodeRunner.
	return c.nr.State()
}
  679. // errNoManager returns error describing why manager commands can't be used.
  680. // Call with read lock.
  681. func (c *Cluster) errNoManager(st nodeState) error {
  682. if st.swarmNode == nil {
  683. if errors.Cause(st.err) == errSwarmLocked {
  684. return errSwarmLocked
  685. }
  686. if st.err == errSwarmCertificatesExpired {
  687. return errSwarmCertificatesExpired
  688. }
  689. return errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  690. }
  691. if st.swarmNode.Manager() != nil {
  692. return errors.New("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  693. }
  694. return errors.New("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  695. }
  696. // GetServices returns all services of a managed swarm cluster.
  697. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  698. c.mu.RLock()
  699. defer c.mu.RUnlock()
  700. state := c.currentNodeState()
  701. if !state.IsActiveManager() {
  702. return nil, c.errNoManager(state)
  703. }
  704. filters, err := newListServicesFilters(options.Filters)
  705. if err != nil {
  706. return nil, err
  707. }
  708. ctx, cancel := c.getRequestContext()
  709. defer cancel()
  710. r, err := state.controlClient.ListServices(
  711. ctx,
  712. &swarmapi.ListServicesRequest{Filters: filters})
  713. if err != nil {
  714. return nil, err
  715. }
  716. services := []types.Service{}
  717. for _, service := range r.Services {
  718. services = append(services, convert.ServiceFromGRPC(*service))
  719. }
  720. return services, nil
  721. }
  722. // imageWithDigestString takes an image such as name or name:tag
  723. // and returns the image pinned to a digest, such as name@sha256:34234
  724. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  725. ref, err := reference.ParseAnyReference(image)
  726. if err != nil {
  727. return "", err
  728. }
  729. namedRef, ok := ref.(reference.Named)
  730. if !ok {
  731. if _, ok := ref.(reference.Digested); ok {
  732. return "", errors.New("image reference is an image ID")
  733. }
  734. return "", errors.Errorf("unknown image reference format: %s", image)
  735. }
  736. // only query registry if not a canonical reference (i.e. with digest)
  737. if _, ok := namedRef.(reference.Canonical); !ok {
  738. namedRef = reference.TagNameOnly(namedRef)
  739. taggedRef, ok := namedRef.(reference.NamedTagged)
  740. if !ok {
  741. return "", errors.Errorf("image reference not tagged: %s", image)
  742. }
  743. repo, _, err := c.config.Backend.GetRepository(ctx, taggedRef, authConfig)
  744. if err != nil {
  745. return "", err
  746. }
  747. dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
  748. if err != nil {
  749. return "", err
  750. }
  751. namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
  752. if err != nil {
  753. return "", err
  754. }
  755. // return familiar form until interface updated to return type
  756. return reference.FamiliarString(namedDigestedRef), nil
  757. }
  758. // reference already contains a digest, so just return it
  759. return reference.FamiliarString(ref), nil
  760. }
// CreateService creates a new service in a managed swarm cluster.
//
// encodedAuth, when non-empty, is a base64url-encoded registry auth config;
// it is stored in the container spec's pull options and also used to resolve
// the image tag to a digest.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	// Resolve network names/IDs in the spec to concrete network IDs.
	err := c.populateNetworkID(ctx, state.controlClient, &s)
	if err != nil {
		return nil, err
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return nil, apierrors.NewBadRequestError(err)
	}
	ctnr := serviceSpec.Task.GetContainer()
	if ctnr == nil {
		return nil, errors.New("service does not use container tasks")
	}
	if encodedAuth != "" {
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	}
	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		// A malformed auth config is only logged; digest pinning below
		// degrades to using the empty auth config.
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}
	resp := &apitypes.ServiceCreateResponse{}
	// pin image by digest (opt out with DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1)
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
		if err != nil {
			// Pinning failure is non-fatal: the service is still created
			// with the unpinned reference and a warning is returned.
			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
		} else if ctnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
			ctnr.Image = digestImage
		} else {
			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
		}
	}
	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return nil, err
	}
	resp.ID = r.Service.ID
	return resp, nil
}
  814. // GetService returns a service based on an ID or name.
  815. func (c *Cluster) GetService(input string) (types.Service, error) {
  816. c.mu.RLock()
  817. defer c.mu.RUnlock()
  818. state := c.currentNodeState()
  819. if !state.IsActiveManager() {
  820. return types.Service{}, c.errNoManager(state)
  821. }
  822. ctx, cancel := c.getRequestContext()
  823. defer cancel()
  824. service, err := getService(ctx, state.controlClient, input)
  825. if err != nil {
  826. return types.Service{}, err
  827. }
  828. return convert.ServiceFromGRPC(*service), nil
  829. }
// UpdateService updates existing service to match new properties.
//
// When encodedAuth is empty, the pull options (and auth used for digest
// pinning) are carried over from either the current or the previous service
// spec, selected by registryAuthFrom.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	// Resolve network names/IDs in the spec to concrete network IDs.
	err := c.populateNetworkID(ctx, state.controlClient, &spec)
	if err != nil {
		return nil, err
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return nil, apierrors.NewBadRequestError(err)
	}
	// Fetch the existing service; its ID and stored auth may be needed below.
	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
	if err != nil {
		return nil, err
	}
	newCtnr := serviceSpec.Task.GetContainer()
	if newCtnr == nil {
		return nil, errors.New("service does not use container tasks")
	}
	if encodedAuth != "" {
		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			if currentService.PreviousSpec == nil {
				return nil, errors.New("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return nil, errors.New("unsupported registryAuthFrom value")
		}
		if ctnr == nil {
			return nil, errors.New("service does not use container tasks")
		}
		newCtnr.PullOptions = ctnr.PullOptions
		// update encodedAuth so it can be used to pin image by digest
		if ctnr.PullOptions != nil {
			encodedAuth = ctnr.PullOptions.RegistryAuth
		}
	}
	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		// A malformed auth config is only logged; digest pinning below
		// degrades to using the empty auth config.
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}
	resp := &apitypes.ServiceUpdateResponse{}
	// pin image by digest (opt out with DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1)
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
		if err != nil {
			// Pinning failure is non-fatal: the update proceeds with the
			// unpinned reference and a warning is returned.
			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
		} else if newCtnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
			newCtnr.Image = digestImage
		} else {
			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
		}
	}
	_, err = state.controlClient.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return resp, err
}
  915. // RemoveService removes a service from a managed swarm cluster.
  916. func (c *Cluster) RemoveService(input string) error {
  917. c.mu.RLock()
  918. defer c.mu.RUnlock()
  919. state := c.currentNodeState()
  920. if !state.IsActiveManager() {
  921. return c.errNoManager(state)
  922. }
  923. ctx, cancel := c.getRequestContext()
  924. defer cancel()
  925. service, err := getService(ctx, state.controlClient, input)
  926. if err != nil {
  927. return err
  928. }
  929. _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
  930. return err
  931. }
// ServiceLogs collects service logs and writes them back to `config.OutStream`
//
// The read lock is held manually (not deferred) because it must be released
// before entering the long-running streaming loop; every early-return error
// path before that point unlocks explicitly. `started` is closed once the
// subscription is established and output is about to flow.
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
	c.mu.RLock()
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		c.mu.RUnlock()
		return c.errNoManager(state)
	}
	service, err := getService(ctx, state.controlClient, input)
	if err != nil {
		c.mu.RUnlock()
		return err
	}
	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: &swarmapi.LogSelector{
			ServiceIDs: []string{service.ID},
		},
		Options: &swarmapi.LogSubscriptionOptions{
			Follow: config.Follow,
		},
	})
	if err != nil {
		c.mu.RUnlock()
		return err
	}
	wf := ioutils.NewWriteFlusher(config.OutStream)
	defer wf.Close()
	close(started)
	wf.Flush()
	// Multiplex stdout/stderr onto the single output stream.
	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
	// Release the lock before starting the stream.
	c.mu.RUnlock()
	for {
		// Check the context before doing anything.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		subscribeMsg, err := stream.Recv()
		// EOF means the subscription ended normally.
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		for _, msg := range subscribeMsg.Messages {
			data := []byte{}
			if config.Timestamps {
				ts, err := gogotypes.TimestampFromProto(msg.Timestamp)
				if err != nil {
					return err
				}
				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
			}
			// Prefix each line with node/service/task context labels.
			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
				contextPrefix, msg.Context.NodeID,
				contextPrefix, msg.Context.ServiceID,
				contextPrefix, msg.Context.TaskID,
			))...)
			data = append(data, msg.Data...)
			switch msg.Stream {
			case swarmapi.LogStreamStdout:
				outStream.Write(data)
			case swarmapi.LogStreamStderr:
				errStream.Write(data)
			}
		}
	}
}
  1003. // GetNodes returns a list of all nodes known to a cluster.
  1004. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  1005. c.mu.RLock()
  1006. defer c.mu.RUnlock()
  1007. state := c.currentNodeState()
  1008. if !state.IsActiveManager() {
  1009. return nil, c.errNoManager(state)
  1010. }
  1011. filters, err := newListNodesFilters(options.Filters)
  1012. if err != nil {
  1013. return nil, err
  1014. }
  1015. ctx, cancel := c.getRequestContext()
  1016. defer cancel()
  1017. r, err := state.controlClient.ListNodes(
  1018. ctx,
  1019. &swarmapi.ListNodesRequest{Filters: filters})
  1020. if err != nil {
  1021. return nil, err
  1022. }
  1023. nodes := []types.Node{}
  1024. for _, node := range r.Nodes {
  1025. nodes = append(nodes, convert.NodeFromGRPC(*node))
  1026. }
  1027. return nodes, nil
  1028. }
  1029. // GetNode returns a node based on an ID.
  1030. func (c *Cluster) GetNode(input string) (types.Node, error) {
  1031. c.mu.RLock()
  1032. defer c.mu.RUnlock()
  1033. state := c.currentNodeState()
  1034. if !state.IsActiveManager() {
  1035. return types.Node{}, c.errNoManager(state)
  1036. }
  1037. ctx, cancel := c.getRequestContext()
  1038. defer cancel()
  1039. node, err := getNode(ctx, state.controlClient, input)
  1040. if err != nil {
  1041. return types.Node{}, err
  1042. }
  1043. return convert.NodeFromGRPC(*node), nil
  1044. }
  1045. // UpdateNode updates existing nodes properties.
  1046. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
  1047. c.mu.RLock()
  1048. defer c.mu.RUnlock()
  1049. state := c.currentNodeState()
  1050. if !state.IsActiveManager() {
  1051. return c.errNoManager(state)
  1052. }
  1053. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  1054. if err != nil {
  1055. return apierrors.NewBadRequestError(err)
  1056. }
  1057. ctx, cancel := c.getRequestContext()
  1058. defer cancel()
  1059. currentNode, err := getNode(ctx, state.controlClient, input)
  1060. if err != nil {
  1061. return err
  1062. }
  1063. _, err = state.controlClient.UpdateNode(
  1064. ctx,
  1065. &swarmapi.UpdateNodeRequest{
  1066. NodeID: currentNode.ID,
  1067. Spec: &nodeSpec,
  1068. NodeVersion: &swarmapi.Version{
  1069. Index: version,
  1070. },
  1071. },
  1072. )
  1073. return err
  1074. }
  1075. // RemoveNode removes a node from a cluster
  1076. func (c *Cluster) RemoveNode(input string, force bool) error {
  1077. c.mu.RLock()
  1078. defer c.mu.RUnlock()
  1079. state := c.currentNodeState()
  1080. if !state.IsActiveManager() {
  1081. return c.errNoManager(state)
  1082. }
  1083. ctx, cancel := c.getRequestContext()
  1084. defer cancel()
  1085. node, err := getNode(ctx, state.controlClient, input)
  1086. if err != nil {
  1087. return err
  1088. }
  1089. _, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force})
  1090. return err
  1091. }
// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}
	// byName rewrites "service" and "node" filter values from names/prefixes
	// to resolved IDs.
	// NOTE(review): it calls c.GetService/c.GetNode, which take c.mu.RLock
	// again while this goroutine already holds it; recursive RLock can
	// deadlock if a writer queues in between — confirm this is acceptable.
	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}
	filters, err := newListTasksFilters(options.Filters, byName)
	if err != nil {
		return nil, err
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()
	r, err := state.controlClient.ListTasks(
		ctx,
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	tasks := []types.Task{}
	for _, task := range r.Tasks {
		// Only container tasks are exposed through this API.
		if task.Spec.GetContainer() != nil {
			tasks = append(tasks, convert.TaskFromGRPC(*task))
		}
	}
	return tasks, nil
}
  1145. // GetTask returns a task by an ID.
  1146. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1147. c.mu.RLock()
  1148. defer c.mu.RUnlock()
  1149. state := c.currentNodeState()
  1150. if !state.IsActiveManager() {
  1151. return types.Task{}, c.errNoManager(state)
  1152. }
  1153. ctx, cancel := c.getRequestContext()
  1154. defer cancel()
  1155. task, err := getTask(ctx, state.controlClient, input)
  1156. if err != nil {
  1157. return types.Task{}, err
  1158. }
  1159. return convert.TaskFromGRPC(*task), nil
  1160. }
  1161. // GetNetwork returns a cluster network by an ID.
  1162. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1163. c.mu.RLock()
  1164. defer c.mu.RUnlock()
  1165. state := c.currentNodeState()
  1166. if !state.IsActiveManager() {
  1167. return apitypes.NetworkResource{}, c.errNoManager(state)
  1168. }
  1169. ctx, cancel := c.getRequestContext()
  1170. defer cancel()
  1171. network, err := getNetwork(ctx, state.controlClient, input)
  1172. if err != nil {
  1173. return apitypes.NetworkResource{}, err
  1174. }
  1175. return convert.BasicNetworkFromGRPC(*network), nil
  1176. }
  1177. func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]apitypes.NetworkResource, error) {
  1178. c.mu.RLock()
  1179. defer c.mu.RUnlock()
  1180. state := c.currentNodeState()
  1181. if !state.IsActiveManager() {
  1182. return nil, c.errNoManager(state)
  1183. }
  1184. ctx, cancel := c.getRequestContext()
  1185. defer cancel()
  1186. r, err := state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
  1187. if err != nil {
  1188. return nil, err
  1189. }
  1190. var networks []apitypes.NetworkResource
  1191. for _, network := range r.Networks {
  1192. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1193. }
  1194. return networks, nil
  1195. }
// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	// nil filters means no filtering: list every swarm-managed network.
	return c.getNetworks(nil)
}
// GetNetworksByName returns cluster managed networks by name.
// It is ok to have multiple networks here. #18864
func (c *Cluster) GetNetworksByName(name string) ([]apitypes.NetworkResource, error) {
	// Note that swarmapi.GetNetworkRequest.Name is not functional.
	// So we cannot just use that with c.GetNetwork.
	// Instead, filter the full listing by name on the server side.
	return c.getNetworks(&swarmapi.ListNetworksRequest_Filters{
		Names: []string{name},
	})
}
  1209. func attacherKey(target, containerID string) string {
  1210. return containerID + ":" + target
  1211. }
  1212. // UpdateAttachment signals the attachment config to the attachment
  1213. // waiter who is trying to start or attach the container to the
  1214. // network.
  1215. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1216. c.mu.RLock()
  1217. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1218. c.mu.RUnlock()
  1219. if !ok || attacher == nil {
  1220. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1221. }
  1222. attacher.attachWaitCh <- config
  1223. close(attacher.attachWaitCh)
  1224. return nil
  1225. }
// WaitForDetachment waits for the container to stop or detach from
// the network.
//
// The attacher is looked up first by network name, then by network ID. If an
// attachment is still in flight, this waits for it to complete (and, for the
// matching task, for DetachNetwork to signal) before asking the agent to
// detach the task.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.mu.RLock()
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	state := c.currentNodeState()
	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
		c.mu.RUnlock()
		return errors.New("invalid cluster node while waiting for detachment")
	}
	c.mu.RUnlock()
	agent := state.swarmNode.Agent()
	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may be in progress still so wait for
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}
		if attacher.taskID == taskID {
			// Wait for DetachNetwork to close detachWaitCh for this task.
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}
// AttachNetwork generates an attachment request towards the manager.
//
// The mutex is taken and released around each phase rather than held
// throughout, because the attach request and the wait for the config are
// long-running operations that must not block other cluster calls.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.mu.Lock()
	state := c.currentNodeState()
	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
		c.mu.Unlock()
		return nil, errors.New("invalid cluster node while attaching to network")
	}
	// Reuse an existing attachment for the same (network, container) pair.
	if attacher, ok := c.attachers[aKey]; ok {
		c.mu.Unlock()
		return attacher.config, nil
	}
	agent := state.swarmNode.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	// Register the attacher before issuing the request so UpdateAttachment
	// can find it when the config arrives.
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.mu.Unlock()
	ctx, cancel := c.getRequestContext()
	defer cancel()
	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		// Roll back the registration on failure.
		c.mu.Lock()
		delete(c.attachers, aKey)
		c.mu.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}
	c.mu.Lock()
	c.attachers[aKey].taskID = taskID
	// Signal WaitForDetachment that the attachment phase is done.
	close(attachCompleteCh)
	c.mu.Unlock()
	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
	var config *network.NetworkingConfig
	// Block until UpdateAttachment delivers the config or the request
	// context expires.
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}
	c.mu.Lock()
	c.attachers[aKey].config = config
	c.mu.Unlock()
	return config, nil
}
  1309. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1310. // that a request to detach can be generated towards the manager.
  1311. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1312. aKey := attacherKey(target, containerID)
  1313. c.mu.Lock()
  1314. attacher, ok := c.attachers[aKey]
  1315. delete(c.attachers, aKey)
  1316. c.mu.Unlock()
  1317. if !ok {
  1318. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1319. }
  1320. close(attacher.detachWaitCh)
  1321. return nil
  1322. }
  1323. // CreateNetwork creates a new cluster managed network.
  1324. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1325. c.mu.RLock()
  1326. defer c.mu.RUnlock()
  1327. state := c.currentNodeState()
  1328. if !state.IsActiveManager() {
  1329. return "", c.errNoManager(state)
  1330. }
  1331. if runconfig.IsPreDefinedNetwork(s.Name) {
  1332. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1333. return "", apierrors.NewRequestForbiddenError(err)
  1334. }
  1335. ctx, cancel := c.getRequestContext()
  1336. defer cancel()
  1337. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1338. r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1339. if err != nil {
  1340. return "", err
  1341. }
  1342. return r.Network.ID, nil
  1343. }
  1344. // RemoveNetwork removes a cluster network.
  1345. func (c *Cluster) RemoveNetwork(input string) error {
  1346. c.mu.RLock()
  1347. defer c.mu.RUnlock()
  1348. state := c.currentNodeState()
  1349. if !state.IsActiveManager() {
  1350. return c.errNoManager(state)
  1351. }
  1352. ctx, cancel := c.getRequestContext()
  1353. defer cancel()
  1354. network, err := getNetwork(ctx, state.controlClient, input)
  1355. if err != nil {
  1356. return err
  1357. }
  1358. _, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID})
  1359. return err
  1360. }
// populateNetworkID resolves each network Target (name or ID) in the service
// spec to the network's ID, mutating the spec's network slice in place.
func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate
	// but fallback to service spec for backward compatibility
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}
	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			// If the target is a known local (non-dynamic) network, return
			// a clearer forbidden error instead of the raw lookup failure.
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("The network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver.", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		networks[i].Target = apiNetwork.ID
	}
	return nil
}
// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	node := c.nr
	if node == nil {
		// No node runner active; nothing to stop.
		c.mu.Unlock()
		return
	}
	defer c.mu.Unlock()
	state := c.currentNodeState()
	if state.IsActiveManager() {
		// Log a warning when stopping this manager would break raft quorum.
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	if err := node.Stop(); err != nil {
		// Dump goroutine stacks to help diagnose a stuck shutdown.
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}
	c.nr = nil
}
  1408. func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
  1409. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1410. defer cancel()
  1411. nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1412. if err != nil {
  1413. return false, 0, 0, err
  1414. }
  1415. for _, n := range nodes.Nodes {
  1416. if n.ManagerStatus != nil {
  1417. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1418. reachable++
  1419. if n.ID == currentNodeID {
  1420. current = true
  1421. }
  1422. }
  1423. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1424. unreachable++
  1425. }
  1426. }
  1427. }
  1428. return
  1429. }
  1430. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1431. var err error
  1432. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1433. if err != nil {
  1434. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1435. }
  1436. if req.Spec.Annotations.Name == "" {
  1437. req.Spec.Annotations.Name = "default"
  1438. } else if req.Spec.Annotations.Name != "default" {
  1439. return errors.New(`swarm spec must be named "default"`)
  1440. }
  1441. return nil
  1442. }
  1443. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1444. var err error
  1445. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1446. if err != nil {
  1447. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1448. }
  1449. if len(req.RemoteAddrs) == 0 {
  1450. return errors.New("at least 1 RemoteAddr is required to join")
  1451. }
  1452. for i := range req.RemoteAddrs {
  1453. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1454. if err != nil {
  1455. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1456. }
  1457. }
  1458. return nil
  1459. }
// validateAddr rejects an empty address and otherwise normalizes addr via
// ParseTCPAddr, stripping the "tcp://" scheme prefix from the result.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// NOTE(review): the parse error is swallowed and the input is
		// returned unchanged — presumably to let non-host:port values
		// (e.g. an interface name) pass through. Confirm this is intended
		// rather than `return addr, err`.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
  1470. func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
  1471. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1472. for conn := range node.ListenControlSocket(ctx) {
  1473. if ctx.Err() != nil {
  1474. return ctx.Err()
  1475. }
  1476. if conn != nil {
  1477. client := swarmapi.NewControlClient(conn)
  1478. var cluster *swarmapi.Cluster
  1479. for i := 0; ; i++ {
  1480. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1481. if err != nil {
  1482. return fmt.Errorf("error on listing clusters: %v", err)
  1483. }
  1484. if len(lcr.Clusters) == 0 {
  1485. if i < 10 {
  1486. time.Sleep(200 * time.Millisecond)
  1487. continue
  1488. }
  1489. return errors.New("empty list of clusters was returned")
  1490. }
  1491. cluster = lcr.Clusters[0]
  1492. break
  1493. }
  1494. // In init, we take the initial default values from swarmkit, and merge
  1495. // any non nil or 0 value from spec to GRPC spec. This will leave the
  1496. // default value alone.
  1497. // Note that this is different from Update(), as in Update() we expect
  1498. // user to specify the complete spec of the cluster (as they already know
  1499. // the existing one and knows which field to update)
  1500. clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
  1501. if err != nil {
  1502. return fmt.Errorf("error updating cluster settings: %v", err)
  1503. }
  1504. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1505. ClusterID: cluster.ID,
  1506. ClusterVersion: &cluster.Meta.Version,
  1507. Spec: &clusterSpec,
  1508. })
  1509. if err != nil {
  1510. return fmt.Errorf("error updating cluster settings: %v", err)
  1511. }
  1512. return nil
  1513. }
  1514. }
  1515. return ctx.Err()
  1516. }
  1517. func detectLockedError(err error) error {
  1518. if err == swarmnode.ErrInvalidUnlockKey {
  1519. return errors.WithStack(errSwarmLocked)
  1520. }
  1521. return err
  1522. }