cluster.go

package cluster

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm-related.
// Contains methods for connecting to and controlling the cluster. Always
// exists, even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. It is present if and
// only if swarm mode is enabled. Implements a backoff restart loop in case of
// errors.
//
// NodeState - Information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// queried, you should take the read lock on `cluster.mu`. If you are writing
// an API responder that returns synchronously, hold `cluster.mu.RLock()` for
// the duration of the whole handler function. That ensures that the node will
// not be shut down until the handler has finished.
//
// NodeRunner has its own internal locks that should not be used outside of
// the struct. Instead, just call the `nodeRunner.State()` method to get the
// current state of the cluster (you still need `cluster.mu.RLock()` to access
// the `cluster.nr` reference itself). Most of the changes in NodeRunner
// happen because of an external event (network problem, unexpected swarmkit
// error) and Docker shouldn't take any locks that delay these changes from
// happening.
//
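// As a minimal illustrative sketch of the locking rule above (SomeHandler is
// a hypothetical name, not a method defined in this file), a synchronous API
// responder follows this shape:
//
//	func (c *Cluster) SomeHandler() error {
//		c.mu.RLock()
//		defer c.mu.RUnlock() // held for the whole handler
//		state := c.currentNodeState()
//		if !state.IsActiveManager() {
//			return c.errNoManager(state)
//		}
//		// ... use state.controlClient; the node cannot shut down meanwhile ...
//		return nil
//	}
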
import (
	"crypto/x509"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	distreference "github.com/docker/distribution/reference"
	apierrors "github.com/docker/docker/api/errors"
	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/signal"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/docker/docker/reference"
	"github.com/docker/docker/runconfig"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// errNoSwarm is returned on leaving a cluster that was never initialized
var errNoSwarm = errors.New("This node is not part of a swarm")

// errSwarmExists is returned on an initialize or join request for a cluster that has already been activated
var errSwarmExists = errors.New("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// errSwarmJoinTimeoutReached is returned when a cluster join could not complete before the timeout was reached.
var errSwarmJoinTimeoutReached = errors.New("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// errSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var errSwarmLocked = errors.New("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// errSwarmCertificatesExpired is returned if docker was not started for the whole validity period and the certificates had no chance to renew automatically.
var errSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	V4Subnets() []net.IPNet
	V6Subnets() []net.IPNet
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// RuntimeRoot is the path to store runtime state, such as the swarm
	// control socket.
	RuntimeRoot string
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan struct{} // todo: make this array and goroutine safe
	attachers    map[string]*attacher
}

// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by the swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

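// The attacher channels implement a small handshake between AttachNetwork,
// UpdateAttachment, WaitForDetachment and DetachNetwork below. As an
// illustrative summary (a restatement of those methods, not additional
// behavior):
//
//	attachWaitCh     carries the manager's NetworkingConfig, sent by UpdateAttachment
//	attachCompleteCh is closed by AttachNetwork once the taskID has been recorded
//	detachWaitCh     is closed by DetachNetwork to release WaitForDetachment
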
// New creates a new Cluster instance using provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return nil, err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			if errors.Cause(err) == errSwarmLocked {
				return c, nil
			}
			if err, ok := errors.Cause(c.nr.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
				return c, nil
			}
			return nil, errors.Wrap(err, "swarm component could not be started")
		}
	}
	return c, nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.SetClusterProvider(c)

	return nr, nil
}

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		if req.ForceNewCluster {
			if err := c.nr.Stop(); err != nil {
				c.mu.Unlock()
				return "", err
			}
		} else {
			c.mu.Unlock()
			return "", errSwarmExists
		}
	}
	c.mu.Unlock()

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", apierrors.NewBadRequestError(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	if !req.ForceNewCluster {
		clearPersistentState(c.root)
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
		availability:    req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

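// An illustrative caller-side sketch of Init (the addresses are assumptions,
// not values from this file; the port matches the defaultAddr constant above):
//
//	nodeID, err := c.Init(types.InitRequest{
//		ListenAddr:    "0.0.0.0:2377",
//		AdvertiseAddr: "192.168.1.10:2377",
//	})
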
// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errSwarmExists
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return apierrors.NewBadRequestError(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	clearPersistentState(c.root)

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
		}
		return err
	}
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return "", c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	client := swarmapi.NewCAClient(state.grpcConn)

	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
	if err != nil {
		return "", err
	}

	if len(r.UnlockKey) == 0 {
		// no key
		return "", nil
	}

	return encryption.HumanReadableKey(r.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// If the manager is inactive for any reason other than the swarm
		// being locked, return that error.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// If the manager is active, the swarm cannot be locked.
		c.mu.RUnlock()
		return errors.New("swarm is not locked")
	}

	// Execution only reaches this point when the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return err
	}

	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return errors.New("swarm could not be unlocked: invalid key provided")
		}
		return fmt.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errNoSwarm
	}

	state := c.currentNodeState()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		c.mu.Unlock()
		return errors.New("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message.")
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						c.mu.Unlock()
						return errors.New(msg)
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		c.mu.Unlock()
		return errors.New(msg)
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		c.mu.Unlock()
		return err
	}
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}

func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return types.Swarm{}, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}

	return convert.SwarmFromGRPC(*swarm), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return err
	}

	// In update, the client should provide the complete spec of the swarm,
	// including Name and Labels. If a field is specified as 0 or nil, then
	// the default value will be used by swarmkit.
	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
	if err != nil {
		return apierrors.NewBadRequestError(err)
	}

	_, err = state.controlClient.UpdateCluster(
		ctx,
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &clusterSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
			Rotation: swarmapi.KeyRotation{
				WorkerJoinToken:  flags.RotateWorkerToken,
				ManagerJoinToken: flags.RotateManagerToken,
				ManagerUnlockKey: flags.RotateManagerUnlockKey,
			},
		},
	)
	return err
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return ""
	}
	nodeID := state.swarmNode.NodeID()
	for _, r := range state.swarmNode.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.Inspect()
		if err != nil {
			info.Error = err.Error()
		}

		// Strip JoinTokens
		info.Cluster = swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Cause(st.err) == errSwarmLocked {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
	}
	if st.swarmNode.Manager() != nil {
		return errors.New("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
	}
	return errors.New("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
}

// GetServices returns all services of a managed swarm cluster.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	filters, err := newListServicesFilters(options.Filters)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := state.controlClient.ListServices(
		ctx,
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	services := []types.Service{}
	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}

	return services, nil
}

// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234...
// Due to the difference between the docker/docker/reference and the
// docker/distribution/reference packages, we're parsing the image twice.
// As the two packages converge, this function should be simplified.
// TODO(nishanttotla): After the packages converge, the function must
// convert distreference.Named -> distreference.Canonical, and the logic simplified.
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
	if _, err := digest.Parse(image); err == nil {
		return "", errors.New("image reference is an image ID")
	}
	ref, err := distreference.ParseNamed(image)
	if err != nil {
		return "", err
	}
	// only query registry if not a canonical reference (i.e. with digest)
	if _, ok := ref.(distreference.Canonical); !ok {
		// create a docker/docker/reference Named object because GetRepository needs it
		dockerRef, err := reference.ParseNamed(image)
		if err != nil {
			return "", err
		}
		dockerRef = reference.WithDefaultTag(dockerRef)
		namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
		if !ok {
			return "", errors.New("unable to cast image to NamedTagged reference object")
		}

		repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
		if err != nil {
			return "", err
		}
		dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
		if err != nil {
			return "", err
		}
		namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
		if err != nil {
			return "", err
		}
		return namedDigestedRef.String(), nil
	}
	// reference already contains a digest, so just return it
	return ref.String(), nil
}

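// As an illustrative example of the pinning above (the digest value is
// invented), a tagged reference roughly resolves as:
//
//	"library/ubuntu:16.04" -> "library/ubuntu:16.04@sha256:45bc58500fa3..."
//
// while a reference that already carries a digest is returned unchanged.
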
// CreateService creates a new service in a managed swarm cluster.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, state.controlClient, &s)
	if err != nil {
		return nil, err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return nil, apierrors.NewBadRequestError(err)
	}

	ctnr := serviceSpec.Task.GetContainer()
	if ctnr == nil {
		return nil, errors.New("service does not use container tasks")
	}

	if encodedAuth != "" {
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	resp := &apitypes.ServiceCreateResponse{}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
		} else if ctnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
			ctnr.Image = digestImage
		} else {
			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
		}
	}

	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return nil, err
	}

	resp.ID = r.Service.ID
	return resp, nil
}

// GetService returns a service based on an ID or name.
func (c *Cluster) GetService(input string) (types.Service, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return types.Service{}, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, state.controlClient, input)
	if err != nil {
		return types.Service{}, err
	}
	return convert.ServiceFromGRPC(*service), nil
}

// UpdateService updates an existing service to match new properties.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, state.controlClient, &spec)
	if err != nil {
		return nil, err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return nil, apierrors.NewBadRequestError(err)
	}

	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
	if err != nil {
		return nil, err
	}

	newCtnr := serviceSpec.Task.GetContainer()
	if newCtnr == nil {
		return nil, errors.New("service does not use container tasks")
	}

	if encodedAuth != "" {
		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// If encodedAuth isn't being updated, we shouldn't lose it;
		// continue to use the one that is already present.
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			if currentService.PreviousSpec == nil {
				return nil, errors.New("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return nil, errors.New("unsupported registryAuthFrom value")
		}
		if ctnr == nil {
			return nil, errors.New("service does not use container tasks")
		}
		newCtnr.PullOptions = ctnr.PullOptions

		// update encodedAuth so it can be used to pin image by digest
		if ctnr.PullOptions != nil {
			encodedAuth = ctnr.PullOptions.RegistryAuth
		}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	resp := &apitypes.ServiceUpdateResponse{}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
		} else if newCtnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
			newCtnr.Image = digestImage
		} else {
			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
		}
	}

	_, err = state.controlClient.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)

	return resp, err
}

// RemoveService removes a service from a managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, state.controlClient, input)
	if err != nil {
		return err
	}

	_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
	return err
}

// ServiceLogs collects service logs and writes them back to `config.OutStream`
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
	c.mu.RLock()
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		c.mu.RUnlock()
		return c.errNoManager(state)
	}

	service, err := getService(ctx, state.controlClient, input)
	if err != nil {
		c.mu.RUnlock()
		return err
	}

	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: &swarmapi.LogSelector{
			ServiceIDs: []string{service.ID},
		},
		Options: &swarmapi.LogSubscriptionOptions{
			Follow: config.Follow,
		},
	})
	if err != nil {
		c.mu.RUnlock()
		return err
	}

	wf := ioutils.NewWriteFlusher(config.OutStream)
	defer wf.Close()
	close(started)
	wf.Flush()

	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)

	// Release the lock before starting the stream.
	c.mu.RUnlock()
	for {
		// Check the context before doing anything.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		subscribeMsg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		for _, msg := range subscribeMsg.Messages {
			data := []byte{}

			if config.Timestamps {
				ts, err := ptypes.Timestamp(msg.Timestamp)
				if err != nil {
					return err
				}
				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
			}

			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
				contextPrefix, msg.Context.NodeID,
				contextPrefix, msg.Context.ServiceID,
				contextPrefix, msg.Context.TaskID,
			))...)

			data = append(data, msg.Data...)

			switch msg.Stream {
			case swarmapi.LogStreamStdout:
				outStream.Write(data)
			case swarmapi.LogStreamStderr:
				errStream.Write(data)
			}
		}
	}
}

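// Given the formatting above, an emitted log line looks roughly like this
// (the IDs and message are invented), optionally prefixed with a timestamp
// in logger.TimeFormat when config.Timestamps is set:
//
//	com.docker.swarm.node.id=abc,com.docker.swarm.service.id=def,com.docker.swarm.task.id=ghi hello
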
// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	filters, err := newListNodesFilters(options.Filters)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := state.controlClient.ListNodes(
		ctx,
		&swarmapi.ListNodesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	nodes := []types.Node{}

	for _, node := range r.Nodes {
		nodes = append(nodes, convert.NodeFromGRPC(*node))
	}
	return nodes, nil
}

// GetNode returns a node based on an ID.
func (c *Cluster) GetNode(input string) (types.Node, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return types.Node{}, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, state.controlClient, input)
	if err != nil {
		return types.Node{}, err
	}
	return convert.NodeFromGRPC(*node), nil
}

// UpdateNode updates an existing node's properties.
func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	nodeSpec, err := convert.NodeSpecToGRPC(spec)
	if err != nil {
		return apierrors.NewBadRequestError(err)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	currentNode, err := getNode(ctx, state.controlClient, input)
	if err != nil {
		return err
	}

	_, err = state.controlClient.UpdateNode(
		ctx,
		&swarmapi.UpdateNodeRequest{
			NodeID: currentNode.ID,
			Spec:   &nodeSpec,
			NodeVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveNode removes a node from a cluster
func (c *Cluster) RemoveNode(input string, force bool) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, state.controlClient, input)
	if err != nil {
		return err
	}

	_, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force})
	return err
}

// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}

	filters, err := newListTasksFilters(options.Filters, byName)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := state.controlClient.ListTasks(
		ctx,
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	tasks := []types.Task{}

	for _, task := range r.Tasks {
		if task.Spec.GetContainer() != nil {
			tasks = append(tasks, convert.TaskFromGRPC(*task))
		}
	}
	return tasks, nil
}

// GetTask returns a task by an ID.
func (c *Cluster) GetTask(input string) (types.Task, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return types.Task{}, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	task, err := getTask(ctx, state.controlClient, input)
	if err != nil {
		return types.Task{}, err
	}
	return convert.TaskFromGRPC(*task), nil
}

// GetNetwork returns a cluster network by an ID.
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return apitypes.NetworkResource{}, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, state.controlClient, input)
	if err != nil {
		return apitypes.NetworkResource{}, err
	}
	return convert.BasicNetworkFromGRPC(*network), nil
}

func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]apitypes.NetworkResource, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	var networks []apitypes.NetworkResource

	for _, network := range r.Networks {
		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
	}

	return networks, nil
}

// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	return c.getNetworks(nil)
}

// GetNetworksByName returns cluster managed networks by name.
// It is ok to have multiple networks here. #18864
func (c *Cluster) GetNetworksByName(name string) ([]apitypes.NetworkResource, error) {
	// Note that swarmapi.GetNetworkRequest.Name is not functional.
	// So we cannot just use that with c.GetNetwork.
	return c.getNetworks(&swarmapi.ListNetworksRequest_Filters{
		Names: []string{name},
	})
}

func attacherKey(target, containerID string) string {
	return containerID + ":" + target
}

// UpdateAttachment signals the attachment config to the attachment
// waiter who is trying to start or attach the container to the
// network.
func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
	c.mu.RLock()
	attacher, ok := c.attachers[attacherKey(target, containerID)]
	c.mu.RUnlock()
	if !ok || attacher == nil {
		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
	}

	attacher.attachWaitCh <- config
	close(attacher.attachWaitCh)
	return nil
}

// WaitForDetachment waits for the container to stop or detach from
// the network.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.mu.RLock()
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	state := c.currentNodeState()
	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
		c.mu.RUnlock()
		return errors.New("invalid cluster node while waiting for detachment")
	}
	c.mu.RUnlock()

	agent := state.swarmNode.Agent()
	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may still be in progress, so wait for the
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}

		if attacher.taskID == taskID {
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}

// AttachNetwork generates an attachment request towards the manager.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.mu.Lock()

	state := c.currentNodeState()
	if state.swarmNode == nil || state.swarmNode.Agent() == nil {
		c.mu.Unlock()
		return nil, errors.New("invalid cluster node while attaching to network")
	}
	if attacher, ok := c.attachers[aKey]; ok {
		c.mu.Unlock()
		return attacher.config, nil
	}

	agent := state.swarmNode.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.mu.Unlock()

	ctx, cancel := c.getRequestContext()
	defer cancel()

	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		c.mu.Lock()
		delete(c.attachers, aKey)
		c.mu.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}

	c.mu.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.mu.Unlock()

	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)

	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}

	c.mu.Lock()
	c.attachers[aKey].config = config
	c.mu.Unlock()
	return config, nil
}

// DetachNetwork unblocks the waiters waiting on WaitForDetachment so
// that a request to detach can be generated towards the manager.
func (c *Cluster) DetachNetwork(target string, containerID string) error {
	aKey := attacherKey(target, containerID)

	c.mu.Lock()
	attacher, ok := c.attachers[aKey]
	delete(c.attachers, aKey)
	c.mu.Unlock()

	if !ok {
		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
	}

	close(attacher.detachWaitCh)
	return nil
}

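// An illustrative caller-side sketch of the attach lifecycle ("mynet" and
// the error handling are assumptions, not code from this file; in the daemon
// the container start path drives this):
//
//	config, err := c.AttachNetwork("mynet", containerID, nil)
//	if err != nil {
//		return err
//	}
//	// ... start the container using config ...
//	defer c.DetachNetwork("mynet", containerID)
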

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return "", c.errNoManager(state)
	}

	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", apierrors.NewRequestForbiddenError(err)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}

	return r.Network.ID, nil
}
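
// A minimal sketch of a create call, assuming the usual Docker API request
// shape (NetworkCreateRequest embedding NetworkCreate); "my-overlay" is a
// placeholder:
//
//	id, err := c.CreateNetwork(apitypes.NetworkCreateRequest{
//		Name:          "my-overlay",
//		NetworkCreate: apitypes.NetworkCreate{Driver: "overlay"},
//	})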

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, state.controlClient, input)
	if err != nil {
		return err
	}

	_, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID})
	return err
}

// populateNetworkID resolves the network names (or partial IDs) referenced
// by a service spec into fully qualified network IDs, rejecting
// local-scoped networks that cannot back a swarm service.
func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate,
	// but fall back to the service spec for backward compatibility.
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}

	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("the network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		networks[i].Target = apiNetwork.ID
	}
	return nil
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	defer c.mu.Unlock()

	state := c.currentNodeState()
	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}

	c.nr = nil
}

// managerStats counts the reachable and unreachable managers in the cluster
// and reports whether the node identified by currentNodeID is itself a
// reachable manager.
func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}

	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}
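
// For context on the quorum warning in Cleanup: raft requires a strict
// majority of managers, i.e. floor(n/2)+1 of n. With reachable+unreachable
// managers before leaving, removing this node loses quorum once the
// remaining reachable managers (reachable-1) drop below that majority;
// removingManagerCausesLossOfQuorum (defined elsewhere in this package)
// encodes that check.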

// validateAndSanitizeInitRequest normalizes the listen address and enforces
// the mandatory "default" swarm spec name.
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

// validateAndSanitizeJoinRequest normalizes the listen address and every
// remote address, requiring at least one remote to join.
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

// validateAddr normalizes addr to host:port form with the "tcp://" scheme
// stripped; input that cannot be parsed is returned unchanged.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// Not a parseable TCP address: pass the input through
		// unchanged and let later consumers surface any error.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
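
// For example (assuming defaultAddr carries the default swarm port, 2377),
// validateAddr("192.168.1.1") would return "192.168.1.1:2377", while an
// unparseable input comes back unchanged with a nil error.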

// initClusterSpec waits for the control socket of a freshly initialized
// node, polls until the cluster object appears, and merges the
// user-provided spec into the swarmkit defaults via UpdateCluster.
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and
			// merge any non-nil, non-zero value from spec into the GRPC
			// spec. This leaves the defaults alone.
			// Note that this is different from Update(), where we expect the
			// user to specify the complete spec of the cluster (as they
			// already know the existing one and which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}
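
// Note the retry budget above: up to 10 polls 200ms apart (roughly two
// seconds of sleeping) for the cluster object to appear, all bounded by the
// 5-second context on the control socket.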

// detectLockedError maps swarmkit's ErrInvalidUnlockKey to errSwarmLocked,
// preserving the stack; any other error is returned as-is.
func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}