cluster.go 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923
  1. package cluster
  2. import (
  3. "crypto/x509"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/Sirupsen/logrus"
  17. "github.com/docker/distribution/digest"
  18. distreference "github.com/docker/distribution/reference"
  19. apierrors "github.com/docker/docker/api/errors"
  20. apitypes "github.com/docker/docker/api/types"
  21. "github.com/docker/docker/api/types/backend"
  22. "github.com/docker/docker/api/types/filters"
  23. "github.com/docker/docker/api/types/network"
  24. types "github.com/docker/docker/api/types/swarm"
  25. "github.com/docker/docker/daemon/cluster/convert"
  26. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  27. "github.com/docker/docker/daemon/cluster/executor/container"
  28. "github.com/docker/docker/daemon/logger"
  29. "github.com/docker/docker/opts"
  30. "github.com/docker/docker/pkg/ioutils"
  31. "github.com/docker/docker/pkg/signal"
  32. "github.com/docker/docker/pkg/stdcopy"
  33. "github.com/docker/docker/reference"
  34. "github.com/docker/docker/runconfig"
  35. swarmapi "github.com/docker/swarmkit/api"
  36. "github.com/docker/swarmkit/manager/encryption"
  37. swarmnode "github.com/docker/swarmkit/node"
  38. "github.com/docker/swarmkit/protobuf/ptypes"
  39. "github.com/pkg/errors"
  40. "golang.org/x/net/context"
  41. "google.golang.org/grpc"
  42. )
  43. const swarmDirName = "swarm"
  44. const controlSocket = "control.sock"
  45. const swarmConnectTimeout = 20 * time.Second
  46. const swarmRequestTimeout = 20 * time.Second
  47. const stateFile = "docker-state.json"
  48. const defaultAddr = "0.0.0.0:2377"
  49. const (
  50. initialReconnectDelay = 100 * time.Millisecond
  51. maxReconnectDelay = 30 * time.Second
  52. contextPrefix = "com.docker.swarm"
  53. )
  54. // ErrNoSwarm is returned on leaving a cluster that was never initialized
  55. var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")
  56. // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
  57. var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")
  58. // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
  59. var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")
  60. // ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
  61. var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")
  62. // ErrSwarmCertificatesExpired is returned if docker was not started for the whole validity period and they had no chance to renew automatically.
  63. var ErrSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns a list of subnets (presumably the IPv4 ones, going by
	// the name; confirm against the implementation).
	V4Subnets() []net.IPNet
	// V6Subnets returns a list of subnets (presumably the IPv6 ones, going by
	// the name; confirm against the implementation).
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	// Root is the parent directory; swarm state is kept in its "swarm"
	// subdirectory, which New creates if needed.
	Root string
	// Name is passed to the swarm node as its hostname.
	Name string
	// Backend is the daemon backend handed to the container executor and
	// notified about the active cluster provider.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider gives access to the subnets of Docker-managed
	// networks.
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// RuntimeRoot is the path to store runtime state, such as the swarm
	// control socket. New falls back to the swarm state directory when it is
	// empty.
	RuntimeRoot string
}
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
//
// The embedded RWMutex is the cluster lock taken by the methods of this type.
// The embedded *node is the currently running swarm node; it is nil while no
// node is running.
type Cluster struct {
	sync.RWMutex
	*node
	root            string        // swarm state directory (<Config.Root>/swarm)
	runtimeRoot     string        // directory holding the control socket on non-Windows systems
	config          Config        // configuration this Cluster was created with
	configEvent     chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string        // after resolution, not persisted
	stop            bool          // set by stopNode; makes reconnectOnFailure give up
	err             error         // error of the most recent node exit or failed restart
	cancelDelay     func()        // cancels the pending reconnect delay, if any
	attachers       map[string]*attacher
	locked          bool             // set when a node exits with ErrSwarmLocked
	lastNodeConfig  *nodeStartConfig // config of the node that exited locked; reused by UnlockSwarm
}
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
//
// NOTE(review): the code using these fields is outside this part of the file;
// the channel names suggest they signal attach/detach progress, confirm there.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}
// node wraps a running swarmkit node together with the bookkeeping the
// Cluster keeps for it.
type node struct {
	*swarmnode.Node
	done           chan struct{}          // closed by startNewNode's watcher once the swarm node has exited
	ready          bool                   // set once the swarm node reported ready
	conn           *grpc.ClientConn       // latest connection received from the control socket listener
	client         swarmapi.ControlClient // control client built on conn; nil while conn is nil
	logs           swarmapi.LogsClient    // logs client built on conn; nil while conn is nil
	reconnectDelay time.Duration          // current restart backoff, doubled by reconnectOnFailure
	config         nodeStartConfig        // configuration the node was started with
}
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string

	// joinAddr is handed to the swarm node as the address to join.
	joinAddr string
	// forceNewCluster is handed to the swarm node as its ForceNewCluster flag.
	forceNewCluster bool
	// joinToken is handed to the swarm node as its join token.
	joinToken string
	// lockKey is handed to the swarm node as its unlock key.
	lockKey []byte
	// autolock is handed to the swarm node as its AutoLockManagers flag.
	autolock bool
}
  140. // New creates a new Cluster instance using provided config.
  141. func New(config Config) (*Cluster, error) {
  142. root := filepath.Join(config.Root, swarmDirName)
  143. if err := os.MkdirAll(root, 0700); err != nil {
  144. return nil, err
  145. }
  146. if config.RuntimeRoot == "" {
  147. config.RuntimeRoot = root
  148. }
  149. if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
  150. return nil, err
  151. }
  152. c := &Cluster{
  153. root: root,
  154. config: config,
  155. configEvent: make(chan struct{}, 10),
  156. runtimeRoot: config.RuntimeRoot,
  157. attachers: make(map[string]*attacher),
  158. }
  159. nodeConfig, err := c.loadState()
  160. if err != nil {
  161. if os.IsNotExist(err) {
  162. return c, nil
  163. }
  164. return nil, err
  165. }
  166. n, err := c.startNewNode(*nodeConfig)
  167. if err != nil {
  168. return nil, err
  169. }
  170. select {
  171. case <-time.After(swarmConnectTimeout):
  172. logrus.Error("swarm component could not be started before timeout was reached")
  173. case <-n.Ready():
  174. case <-n.done:
  175. if errors.Cause(c.err) == ErrSwarmLocked {
  176. return c, nil
  177. }
  178. if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
  179. c.err = ErrSwarmCertificatesExpired
  180. return c, nil
  181. }
  182. return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
  183. }
  184. go c.reconnectOnFailure(n)
  185. return c, nil
  186. }
  187. func (c *Cluster) loadState() (*nodeStartConfig, error) {
  188. dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
  189. if err != nil {
  190. return nil, err
  191. }
  192. // missing certificate means no actual state to restore from
  193. if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
  194. if os.IsNotExist(err) {
  195. c.clearState()
  196. }
  197. return nil, err
  198. }
  199. var st nodeStartConfig
  200. if err := json.Unmarshal(dt, &st); err != nil {
  201. return nil, err
  202. }
  203. return &st, nil
  204. }
  205. func (c *Cluster) saveState(config nodeStartConfig) error {
  206. dt, err := json.Marshal(config)
  207. if err != nil {
  208. return err
  209. }
  210. return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
  211. }
  212. func (c *Cluster) reconnectOnFailure(n *node) {
  213. for {
  214. <-n.done
  215. c.Lock()
  216. if c.stop || c.node != nil {
  217. c.Unlock()
  218. return
  219. }
  220. n.reconnectDelay *= 2
  221. if n.reconnectDelay > maxReconnectDelay {
  222. n.reconnectDelay = maxReconnectDelay
  223. }
  224. logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
  225. delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
  226. c.cancelDelay = cancel
  227. c.Unlock()
  228. <-delayCtx.Done()
  229. if delayCtx.Err() != context.DeadlineExceeded {
  230. return
  231. }
  232. c.Lock()
  233. if c.node != nil {
  234. c.Unlock()
  235. return
  236. }
  237. var err error
  238. config := n.config
  239. config.RemoteAddr = c.getRemoteAddress()
  240. config.joinAddr = config.RemoteAddr
  241. n, err = c.startNewNode(config)
  242. if err != nil {
  243. c.err = err
  244. close(n.done)
  245. }
  246. c.Unlock()
  247. }
  248. }
  249. func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
  250. if err := c.config.Backend.IsSwarmCompatible(); err != nil {
  251. return nil, err
  252. }
  253. actualLocalAddr := conf.LocalAddr
  254. if actualLocalAddr == "" {
  255. // If localAddr was not specified, resolve it automatically
  256. // based on the route to joinAddr. localAddr can only be left
  257. // empty on "join".
  258. listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
  259. if err != nil {
  260. return nil, fmt.Errorf("could not parse listen address: %v", err)
  261. }
  262. listenAddrIP := net.ParseIP(listenHost)
  263. if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
  264. actualLocalAddr = listenHost
  265. } else {
  266. if conf.RemoteAddr == "" {
  267. // Should never happen except using swarms created by
  268. // old versions that didn't save remoteAddr.
  269. conf.RemoteAddr = "8.8.8.8:53"
  270. }
  271. conn, err := net.Dial("udp", conf.RemoteAddr)
  272. if err != nil {
  273. return nil, fmt.Errorf("could not find local IP address: %v", err)
  274. }
  275. localHostPort := conn.LocalAddr().String()
  276. actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
  277. conn.Close()
  278. }
  279. }
  280. var control string
  281. if runtime.GOOS == "windows" {
  282. control = `\\.\pipe\` + controlSocket
  283. } else {
  284. control = filepath.Join(c.runtimeRoot, controlSocket)
  285. }
  286. c.node = nil
  287. c.cancelDelay = nil
  288. c.stop = false
  289. n, err := swarmnode.New(&swarmnode.Config{
  290. Hostname: c.config.Name,
  291. ForceNewCluster: conf.forceNewCluster,
  292. ListenControlAPI: control,
  293. ListenRemoteAPI: conf.ListenAddr,
  294. AdvertiseRemoteAPI: conf.AdvertiseAddr,
  295. JoinAddr: conf.joinAddr,
  296. StateDir: c.root,
  297. JoinToken: conf.joinToken,
  298. Executor: container.NewExecutor(c.config.Backend),
  299. HeartbeatTick: 1,
  300. ElectionTick: 3,
  301. UnlockKey: conf.lockKey,
  302. AutoLockManagers: conf.autolock,
  303. })
  304. if err != nil {
  305. return nil, err
  306. }
  307. ctx := context.Background()
  308. if err := n.Start(ctx); err != nil {
  309. return nil, err
  310. }
  311. node := &node{
  312. Node: n,
  313. done: make(chan struct{}),
  314. reconnectDelay: initialReconnectDelay,
  315. config: conf,
  316. }
  317. c.node = node
  318. c.actualLocalAddr = actualLocalAddr // not saved
  319. c.saveState(conf)
  320. c.config.Backend.SetClusterProvider(c)
  321. go func() {
  322. err := detectLockedError(n.Err(ctx))
  323. if err != nil {
  324. logrus.Errorf("cluster exited with error: %v", err)
  325. }
  326. c.Lock()
  327. c.node = nil
  328. c.err = err
  329. if errors.Cause(err) == ErrSwarmLocked {
  330. c.locked = true
  331. confClone := conf
  332. c.lastNodeConfig = &confClone
  333. }
  334. c.Unlock()
  335. close(node.done)
  336. }()
  337. go func() {
  338. select {
  339. case <-n.Ready():
  340. c.Lock()
  341. node.ready = true
  342. c.err = nil
  343. c.Unlock()
  344. case <-ctx.Done():
  345. }
  346. c.configEvent <- struct{}{}
  347. }()
  348. go func() {
  349. for conn := range n.ListenControlSocket(ctx) {
  350. c.Lock()
  351. if node.conn != conn {
  352. if conn == nil {
  353. node.client = nil
  354. node.logs = nil
  355. } else {
  356. node.client = swarmapi.NewControlClient(conn)
  357. node.logs = swarmapi.NewLogsClient(conn)
  358. }
  359. }
  360. node.conn = conn
  361. c.Unlock()
  362. c.configEvent <- struct{}{}
  363. }
  364. }()
  365. return node, nil
  366. }
  367. // Init initializes new cluster from user provided request.
  368. func (c *Cluster) Init(req types.InitRequest) (string, error) {
  369. c.Lock()
  370. if c.swarmExists() {
  371. if !req.ForceNewCluster {
  372. c.Unlock()
  373. return "", ErrSwarmExists
  374. }
  375. if err := c.stopNode(); err != nil {
  376. c.Unlock()
  377. return "", err
  378. }
  379. }
  380. if err := validateAndSanitizeInitRequest(&req); err != nil {
  381. c.Unlock()
  382. return "", err
  383. }
  384. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  385. if err != nil {
  386. c.Unlock()
  387. return "", err
  388. }
  389. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  390. if err != nil {
  391. c.Unlock()
  392. return "", err
  393. }
  394. localAddr := listenHost
  395. // If the local address is undetermined, the advertise address
  396. // will be used as local address, if it belongs to this system.
  397. // If the advertise address is not local, then we try to find
  398. // a system address to use as local address. If this fails,
  399. // we give up and ask user to pass the listen address.
  400. if net.ParseIP(localAddr).IsUnspecified() {
  401. advertiseIP := net.ParseIP(advertiseHost)
  402. found := false
  403. for _, systemIP := range listSystemIPs() {
  404. if systemIP.Equal(advertiseIP) {
  405. localAddr = advertiseIP.String()
  406. found = true
  407. break
  408. }
  409. }
  410. if !found {
  411. ip, err := c.resolveSystemAddr()
  412. if err != nil {
  413. c.Unlock()
  414. logrus.Warnf("Could not find a local address: %v", err)
  415. return "", errMustSpecifyListenAddr
  416. }
  417. localAddr = ip.String()
  418. }
  419. }
  420. // todo: check current state existing
  421. n, err := c.startNewNode(nodeStartConfig{
  422. forceNewCluster: req.ForceNewCluster,
  423. autolock: req.AutoLockManagers,
  424. LocalAddr: localAddr,
  425. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  426. AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
  427. })
  428. if err != nil {
  429. c.Unlock()
  430. return "", err
  431. }
  432. c.Unlock()
  433. select {
  434. case <-n.Ready():
  435. if err := initClusterSpec(n, req.Spec); err != nil {
  436. return "", err
  437. }
  438. go c.reconnectOnFailure(n)
  439. return n.NodeID(), nil
  440. case <-n.done:
  441. c.RLock()
  442. defer c.RUnlock()
  443. if !req.ForceNewCluster { // if failure on first attempt don't keep state
  444. if err := c.clearState(); err != nil {
  445. return "", err
  446. }
  447. }
  448. return "", c.err
  449. }
  450. }
  451. // Join makes current Cluster part of an existing swarm cluster.
  452. func (c *Cluster) Join(req types.JoinRequest) error {
  453. c.Lock()
  454. if c.swarmExists() {
  455. c.Unlock()
  456. return ErrSwarmExists
  457. }
  458. if err := validateAndSanitizeJoinRequest(&req); err != nil {
  459. c.Unlock()
  460. return err
  461. }
  462. listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
  463. if err != nil {
  464. c.Unlock()
  465. return err
  466. }
  467. var advertiseAddr string
  468. if req.AdvertiseAddr != "" {
  469. advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
  470. // For joining, we don't need to provide an advertise address,
  471. // since the remote side can detect it.
  472. if err == nil {
  473. advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
  474. }
  475. }
  476. // todo: check current state existing
  477. n, err := c.startNewNode(nodeStartConfig{
  478. RemoteAddr: req.RemoteAddrs[0],
  479. ListenAddr: net.JoinHostPort(listenHost, listenPort),
  480. AdvertiseAddr: advertiseAddr,
  481. joinAddr: req.RemoteAddrs[0],
  482. joinToken: req.JoinToken,
  483. })
  484. if err != nil {
  485. c.Unlock()
  486. return err
  487. }
  488. c.Unlock()
  489. select {
  490. case <-time.After(swarmConnectTimeout):
  491. // attempt to connect will continue in background, but reconnect only if it didn't fail
  492. go func() {
  493. select {
  494. case <-n.Ready():
  495. c.reconnectOnFailure(n)
  496. case <-n.done:
  497. logrus.Errorf("failed to join the cluster: %+v", c.err)
  498. }
  499. }()
  500. return ErrSwarmJoinTimeoutReached
  501. case <-n.Ready():
  502. go c.reconnectOnFailure(n)
  503. return nil
  504. case <-n.done:
  505. c.RLock()
  506. defer c.RUnlock()
  507. return c.err
  508. }
  509. }
  510. // GetUnlockKey returns the unlock key for the swarm.
  511. func (c *Cluster) GetUnlockKey() (string, error) {
  512. c.RLock()
  513. defer c.RUnlock()
  514. if !c.isActiveManager() {
  515. return "", c.errNoManager()
  516. }
  517. ctx, cancel := c.getRequestContext()
  518. defer cancel()
  519. client := swarmapi.NewCAClient(c.conn)
  520. r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
  521. if err != nil {
  522. return "", err
  523. }
  524. if len(r.UnlockKey) == 0 {
  525. // no key
  526. return "", nil
  527. }
  528. return encryption.HumanReadableKey(r.UnlockKey), nil
  529. }
  530. // UnlockSwarm provides a key to decrypt data that is encrypted at rest.
  531. func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
  532. c.RLock()
  533. if !c.isActiveManager() {
  534. if err := c.errNoManager(); err != ErrSwarmLocked {
  535. c.RUnlock()
  536. return err
  537. }
  538. }
  539. if c.node != nil || c.locked != true {
  540. c.RUnlock()
  541. return errors.New("swarm is not locked")
  542. }
  543. c.RUnlock()
  544. key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
  545. if err != nil {
  546. return err
  547. }
  548. c.Lock()
  549. config := *c.lastNodeConfig
  550. config.lockKey = key
  551. n, err := c.startNewNode(config)
  552. if err != nil {
  553. c.Unlock()
  554. return err
  555. }
  556. c.Unlock()
  557. select {
  558. case <-n.Ready():
  559. case <-n.done:
  560. if errors.Cause(c.err) == ErrSwarmLocked {
  561. return errors.New("swarm could not be unlocked: invalid key provided")
  562. }
  563. return fmt.Errorf("swarm component could not be started: %v", c.err)
  564. }
  565. go c.reconnectOnFailure(n)
  566. return nil
  567. }
  568. // stopNode is a helper that stops the active c.node and waits until it has
  569. // shut down. Call while keeping the cluster lock.
  570. func (c *Cluster) stopNode() error {
  571. if c.node == nil {
  572. return nil
  573. }
  574. c.stop = true
  575. if c.cancelDelay != nil {
  576. c.cancelDelay()
  577. c.cancelDelay = nil
  578. }
  579. node := c.node
  580. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
  581. defer cancel()
  582. // TODO: can't hold lock on stop because it calls back to network
  583. c.Unlock()
  584. defer c.Lock()
  585. if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
  586. return err
  587. }
  588. <-node.done
  589. return nil
  590. }
  591. func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
  592. return reachable-2 <= unreachable
  593. }
  594. func isLastManager(reachable, unreachable int) bool {
  595. return reachable == 1 && unreachable == 0
  596. }
  597. // Leave shuts down Cluster and removes current state.
  598. func (c *Cluster) Leave(force bool) error {
  599. c.Lock()
  600. node := c.node
  601. if node == nil {
  602. if c.locked {
  603. c.locked = false
  604. c.lastNodeConfig = nil
  605. c.Unlock()
  606. } else if c.err == ErrSwarmCertificatesExpired {
  607. c.err = nil
  608. c.Unlock()
  609. } else {
  610. c.Unlock()
  611. return ErrNoSwarm
  612. }
  613. } else {
  614. if node.Manager() != nil && !force {
  615. msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
  616. if c.isActiveManager() {
  617. active, reachable, unreachable, err := c.managerStats()
  618. if err == nil {
  619. if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  620. if isLastManager(reachable, unreachable) {
  621. msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
  622. c.Unlock()
  623. return fmt.Errorf(msg)
  624. }
  625. msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
  626. }
  627. }
  628. } else {
  629. msg += "Doing so may lose the consensus of your cluster. "
  630. }
  631. msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
  632. c.Unlock()
  633. return fmt.Errorf(msg)
  634. }
  635. if err := c.stopNode(); err != nil {
  636. logrus.Errorf("failed to shut down cluster node: %v", err)
  637. signal.DumpStacks("")
  638. c.Unlock()
  639. return err
  640. }
  641. c.Unlock()
  642. if nodeID := node.NodeID(); nodeID != "" {
  643. nodeContainers, err := c.listContainerForNode(nodeID)
  644. if err != nil {
  645. return err
  646. }
  647. for _, id := range nodeContainers {
  648. if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
  649. logrus.Errorf("error removing %v: %v", id, err)
  650. }
  651. }
  652. }
  653. }
  654. c.configEvent <- struct{}{}
  655. // todo: cleanup optional?
  656. if err := c.clearState(); err != nil {
  657. return err
  658. }
  659. return nil
  660. }
  661. func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
  662. var ids []string
  663. filters := filters.NewArgs()
  664. filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
  665. containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
  666. Filters: filters,
  667. })
  668. if err != nil {
  669. return []string{}, err
  670. }
  671. for _, c := range containers {
  672. ids = append(ids, c.ID)
  673. }
  674. return ids, nil
  675. }
  676. func (c *Cluster) clearState() error {
  677. // todo: backup this data instead of removing?
  678. if err := os.RemoveAll(c.root); err != nil {
  679. return err
  680. }
  681. if err := os.MkdirAll(c.root, 0700); err != nil {
  682. return err
  683. }
  684. c.config.Backend.SetClusterProvider(nil)
  685. return nil
  686. }
  687. func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
  688. return context.WithTimeout(context.Background(), swarmRequestTimeout)
  689. }
  690. // Inspect retrieves the configuration properties of a managed swarm cluster.
  691. func (c *Cluster) Inspect() (types.Swarm, error) {
  692. c.RLock()
  693. defer c.RUnlock()
  694. if !c.isActiveManager() {
  695. return types.Swarm{}, c.errNoManager()
  696. }
  697. ctx, cancel := c.getRequestContext()
  698. defer cancel()
  699. swarm, err := getSwarm(ctx, c.client)
  700. if err != nil {
  701. return types.Swarm{}, err
  702. }
  703. return convert.SwarmFromGRPC(*swarm), nil
  704. }
  705. // Update updates configuration of a managed swarm cluster.
  706. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
  707. c.RLock()
  708. defer c.RUnlock()
  709. if !c.isActiveManager() {
  710. return c.errNoManager()
  711. }
  712. ctx, cancel := c.getRequestContext()
  713. defer cancel()
  714. swarm, err := getSwarm(ctx, c.client)
  715. if err != nil {
  716. return err
  717. }
  718. // In update, client should provide the complete spec of the swarm, including
  719. // Name and Labels. If a field is specified with 0 or nil, then the default value
  720. // will be used to swarmkit.
  721. clusterSpec, err := convert.SwarmSpecToGRPC(spec)
  722. if err != nil {
  723. return err
  724. }
  725. _, err = c.client.UpdateCluster(
  726. ctx,
  727. &swarmapi.UpdateClusterRequest{
  728. ClusterID: swarm.ID,
  729. Spec: &clusterSpec,
  730. ClusterVersion: &swarmapi.Version{
  731. Index: version,
  732. },
  733. Rotation: swarmapi.KeyRotation{
  734. WorkerJoinToken: flags.RotateWorkerToken,
  735. ManagerJoinToken: flags.RotateManagerToken,
  736. ManagerUnlockKey: flags.RotateManagerUnlockKey,
  737. },
  738. },
  739. )
  740. return err
  741. }
  742. // IsManager returns true if Cluster is participating as a manager.
  743. func (c *Cluster) IsManager() bool {
  744. c.RLock()
  745. defer c.RUnlock()
  746. return c.isActiveManager()
  747. }
  748. // IsAgent returns true if Cluster is participating as a worker/agent.
  749. func (c *Cluster) IsAgent() bool {
  750. c.RLock()
  751. defer c.RUnlock()
  752. return c.node != nil && c.ready
  753. }
  754. // GetLocalAddress returns the local address.
  755. func (c *Cluster) GetLocalAddress() string {
  756. c.RLock()
  757. defer c.RUnlock()
  758. return c.actualLocalAddr
  759. }
  760. // GetListenAddress returns the listen address.
  761. func (c *Cluster) GetListenAddress() string {
  762. c.RLock()
  763. defer c.RUnlock()
  764. if c.node != nil {
  765. return c.node.config.ListenAddr
  766. }
  767. return ""
  768. }
  769. // GetAdvertiseAddress returns the remotely reachable address of this node.
  770. func (c *Cluster) GetAdvertiseAddress() string {
  771. c.RLock()
  772. defer c.RUnlock()
  773. if c.node != nil && c.node.config.AdvertiseAddr != "" {
  774. advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
  775. return advertiseHost
  776. }
  777. return c.actualLocalAddr
  778. }
  779. // GetRemoteAddress returns a known advertise address of a remote manager if
  780. // available.
  781. // todo: change to array/connect with info
  782. func (c *Cluster) GetRemoteAddress() string {
  783. c.RLock()
  784. defer c.RUnlock()
  785. return c.getRemoteAddress()
  786. }
  787. func (c *Cluster) getRemoteAddress() string {
  788. if c.node == nil {
  789. return ""
  790. }
  791. nodeID := c.node.NodeID()
  792. for _, r := range c.node.Remotes() {
  793. if r.NodeID != nodeID {
  794. return r.Addr
  795. }
  796. }
  797. return ""
  798. }
  799. // ListenClusterEvents returns a channel that receives messages on cluster
  800. // participation changes.
  801. // todo: make cancelable and accessible to multiple callers
  802. func (c *Cluster) ListenClusterEvents() <-chan struct{} {
  803. return c.configEvent
  804. }
  805. // Info returns information about the current cluster state.
  806. func (c *Cluster) Info() types.Info {
  807. info := types.Info{
  808. NodeAddr: c.GetAdvertiseAddress(),
  809. }
  810. c.RLock()
  811. defer c.RUnlock()
  812. if c.node == nil {
  813. info.LocalNodeState = types.LocalNodeStateInactive
  814. if c.cancelDelay != nil {
  815. info.LocalNodeState = types.LocalNodeStateError
  816. }
  817. if c.locked {
  818. info.LocalNodeState = types.LocalNodeStateLocked
  819. } else if c.err == ErrSwarmCertificatesExpired {
  820. info.LocalNodeState = types.LocalNodeStateError
  821. }
  822. } else {
  823. info.LocalNodeState = types.LocalNodeStatePending
  824. if c.ready == true {
  825. info.LocalNodeState = types.LocalNodeStateActive
  826. } else if c.locked {
  827. info.LocalNodeState = types.LocalNodeStateLocked
  828. }
  829. }
  830. if c.err != nil {
  831. info.Error = c.err.Error()
  832. }
  833. ctx, cancel := c.getRequestContext()
  834. defer cancel()
  835. if c.isActiveManager() {
  836. info.ControlAvailable = true
  837. swarm, err := c.Inspect()
  838. if err != nil {
  839. info.Error = err.Error()
  840. }
  841. // Strip JoinTokens
  842. info.Cluster = swarm.ClusterInfo
  843. if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
  844. info.Nodes = len(r.Nodes)
  845. for _, n := range r.Nodes {
  846. if n.ManagerStatus != nil {
  847. info.Managers = info.Managers + 1
  848. }
  849. }
  850. }
  851. }
  852. if c.node != nil {
  853. for _, r := range c.node.Remotes() {
  854. info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
  855. }
  856. info.NodeID = c.node.NodeID()
  857. }
  858. return info
  859. }
  860. // isActiveManager should not be called without a read lock
  861. func (c *Cluster) isActiveManager() bool {
  862. return c.node != nil && c.conn != nil
  863. }
  864. // swarmExists should not be called without a read lock
  865. func (c *Cluster) swarmExists() bool {
  866. return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
  867. }
  868. // errNoManager returns error describing why manager commands can't be used.
  869. // Call with read lock.
  870. func (c *Cluster) errNoManager() error {
  871. if c.node == nil {
  872. if c.locked {
  873. return ErrSwarmLocked
  874. }
  875. if c.err == ErrSwarmCertificatesExpired {
  876. return ErrSwarmCertificatesExpired
  877. }
  878. return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
  879. }
  880. if c.node.Manager() != nil {
  881. return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
  882. }
  883. return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
  884. }
  885. // GetServices returns all services of a managed swarm cluster.
  886. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  887. c.RLock()
  888. defer c.RUnlock()
  889. if !c.isActiveManager() {
  890. return nil, c.errNoManager()
  891. }
  892. filters, err := newListServicesFilters(options.Filters)
  893. if err != nil {
  894. return nil, err
  895. }
  896. ctx, cancel := c.getRequestContext()
  897. defer cancel()
  898. r, err := c.client.ListServices(
  899. ctx,
  900. &swarmapi.ListServicesRequest{Filters: filters})
  901. if err != nil {
  902. return nil, err
  903. }
  904. services := []types.Service{}
  905. for _, service := range r.Services {
  906. services = append(services, convert.ServiceFromGRPC(*service))
  907. }
  908. return services, nil
  909. }
  910. // imageWithDigestString takes an image such as name or name:tag
  911. // and returns the image pinned to a digest, such as name@sha256:34234...
  912. // Due to the difference between the docker/docker/reference, and the
  913. // docker/distribution/reference packages, we're parsing the image twice.
  914. // As the two packages converge, this function should be simplified.
  915. // TODO(nishanttotla): After the packages converge, the function must
  916. // convert distreference.Named -> distreference.Canonical, and the logic simplified.
  917. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  918. if _, err := digest.ParseDigest(image); err == nil {
  919. return "", errors.New("image reference is an image ID")
  920. }
  921. ref, err := distreference.ParseNamed(image)
  922. if err != nil {
  923. return "", err
  924. }
  925. // only query registry if not a canonical reference (i.e. with digest)
  926. if _, ok := ref.(distreference.Canonical); !ok {
  927. // create a docker/docker/reference Named object because GetRepository needs it
  928. dockerRef, err := reference.ParseNamed(image)
  929. if err != nil {
  930. return "", err
  931. }
  932. dockerRef = reference.WithDefaultTag(dockerRef)
  933. namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
  934. if !ok {
  935. return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
  936. }
  937. repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
  938. if err != nil {
  939. return "", err
  940. }
  941. dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
  942. if err != nil {
  943. return "", err
  944. }
  945. namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
  946. if err != nil {
  947. return "", err
  948. }
  949. return namedDigestedRef.String(), nil
  950. }
  951. // reference already contains a digest, so just return it
  952. return ref.String(), nil
  953. }
  954. // CreateService creates a new service in a managed swarm cluster.
  955. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
  956. c.RLock()
  957. defer c.RUnlock()
  958. if !c.isActiveManager() {
  959. return nil, c.errNoManager()
  960. }
  961. ctx, cancel := c.getRequestContext()
  962. defer cancel()
  963. err := c.populateNetworkID(ctx, c.client, &s)
  964. if err != nil {
  965. return nil, err
  966. }
  967. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  968. if err != nil {
  969. return nil, err
  970. }
  971. ctnr := serviceSpec.Task.GetContainer()
  972. if ctnr == nil {
  973. return nil, fmt.Errorf("service does not use container tasks")
  974. }
  975. if encodedAuth != "" {
  976. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  977. }
  978. // retrieve auth config from encoded auth
  979. authConfig := &apitypes.AuthConfig{}
  980. if encodedAuth != "" {
  981. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  982. logrus.Warnf("invalid authconfig: %v", err)
  983. }
  984. }
  985. resp := &apitypes.ServiceCreateResponse{}
  986. // pin image by digest
  987. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  988. digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
  989. if err != nil {
  990. logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
  991. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
  992. } else {
  993. logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
  994. ctnr.Image = digestImage
  995. }
  996. }
  997. r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  998. if err != nil {
  999. return nil, err
  1000. }
  1001. resp.ID = r.Service.ID
  1002. return resp, nil
  1003. }
  1004. // GetService returns a service based on an ID or name.
  1005. func (c *Cluster) GetService(input string) (types.Service, error) {
  1006. c.RLock()
  1007. defer c.RUnlock()
  1008. if !c.isActiveManager() {
  1009. return types.Service{}, c.errNoManager()
  1010. }
  1011. ctx, cancel := c.getRequestContext()
  1012. defer cancel()
  1013. service, err := getService(ctx, c.client, input)
  1014. if err != nil {
  1015. return types.Service{}, err
  1016. }
  1017. return convert.ServiceFromGRPC(*service), nil
  1018. }
  1019. // UpdateService updates existing service to match new properties.
  1020. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
  1021. c.RLock()
  1022. defer c.RUnlock()
  1023. if !c.isActiveManager() {
  1024. return nil, c.errNoManager()
  1025. }
  1026. ctx, cancel := c.getRequestContext()
  1027. defer cancel()
  1028. err := c.populateNetworkID(ctx, c.client, &spec)
  1029. if err != nil {
  1030. return nil, err
  1031. }
  1032. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. currentService, err := getService(ctx, c.client, serviceIDOrName)
  1037. if err != nil {
  1038. return nil, err
  1039. }
  1040. newCtnr := serviceSpec.Task.GetContainer()
  1041. if newCtnr == nil {
  1042. return nil, fmt.Errorf("service does not use container tasks")
  1043. }
  1044. if encodedAuth != "" {
  1045. newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  1046. } else {
  1047. // this is needed because if the encodedAuth isn't being updated then we
  1048. // shouldn't lose it, and continue to use the one that was already present
  1049. var ctnr *swarmapi.ContainerSpec
  1050. switch registryAuthFrom {
  1051. case apitypes.RegistryAuthFromSpec, "":
  1052. ctnr = currentService.Spec.Task.GetContainer()
  1053. case apitypes.RegistryAuthFromPreviousSpec:
  1054. if currentService.PreviousSpec == nil {
  1055. return nil, fmt.Errorf("service does not have a previous spec")
  1056. }
  1057. ctnr = currentService.PreviousSpec.Task.GetContainer()
  1058. default:
  1059. return nil, fmt.Errorf("unsupported registryAuthFromValue")
  1060. }
  1061. if ctnr == nil {
  1062. return nil, fmt.Errorf("service does not use container tasks")
  1063. }
  1064. newCtnr.PullOptions = ctnr.PullOptions
  1065. // update encodedAuth so it can be used to pin image by digest
  1066. if ctnr.PullOptions != nil {
  1067. encodedAuth = ctnr.PullOptions.RegistryAuth
  1068. }
  1069. }
  1070. // retrieve auth config from encoded auth
  1071. authConfig := &apitypes.AuthConfig{}
  1072. if encodedAuth != "" {
  1073. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  1074. logrus.Warnf("invalid authconfig: %v", err)
  1075. }
  1076. }
  1077. resp := &apitypes.ServiceUpdateResponse{}
  1078. // pin image by digest
  1079. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  1080. digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
  1081. if err != nil {
  1082. logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
  1083. resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
  1084. } else if newCtnr.Image != digestImage {
  1085. logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
  1086. newCtnr.Image = digestImage
  1087. }
  1088. }
  1089. _, err = c.client.UpdateService(
  1090. ctx,
  1091. &swarmapi.UpdateServiceRequest{
  1092. ServiceID: currentService.ID,
  1093. Spec: &serviceSpec,
  1094. ServiceVersion: &swarmapi.Version{
  1095. Index: version,
  1096. },
  1097. },
  1098. )
  1099. return resp, err
  1100. }
  1101. // RemoveService removes a service from a managed swarm cluster.
  1102. func (c *Cluster) RemoveService(input string) error {
  1103. c.RLock()
  1104. defer c.RUnlock()
  1105. if !c.isActiveManager() {
  1106. return c.errNoManager()
  1107. }
  1108. ctx, cancel := c.getRequestContext()
  1109. defer cancel()
  1110. service, err := getService(ctx, c.client, input)
  1111. if err != nil {
  1112. return err
  1113. }
  1114. if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
  1115. return err
  1116. }
  1117. return nil
  1118. }
  1119. // ServiceLogs collects service logs and writes them back to `config.OutStream`
  1120. func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
  1121. c.RLock()
  1122. if !c.isActiveManager() {
  1123. c.RUnlock()
  1124. return c.errNoManager()
  1125. }
  1126. service, err := getService(ctx, c.client, input)
  1127. if err != nil {
  1128. c.RUnlock()
  1129. return err
  1130. }
  1131. stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
  1132. Selector: &swarmapi.LogSelector{
  1133. ServiceIDs: []string{service.ID},
  1134. },
  1135. Options: &swarmapi.LogSubscriptionOptions{
  1136. Follow: config.Follow,
  1137. },
  1138. })
  1139. if err != nil {
  1140. c.RUnlock()
  1141. return err
  1142. }
  1143. wf := ioutils.NewWriteFlusher(config.OutStream)
  1144. defer wf.Close()
  1145. close(started)
  1146. wf.Flush()
  1147. outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
  1148. errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
  1149. // Release the lock before starting the stream.
  1150. c.RUnlock()
  1151. for {
  1152. // Check the context before doing anything.
  1153. select {
  1154. case <-ctx.Done():
  1155. return ctx.Err()
  1156. default:
  1157. }
  1158. subscribeMsg, err := stream.Recv()
  1159. if err == io.EOF {
  1160. return nil
  1161. }
  1162. if err != nil {
  1163. return err
  1164. }
  1165. for _, msg := range subscribeMsg.Messages {
  1166. data := []byte{}
  1167. if config.Timestamps {
  1168. ts, err := ptypes.Timestamp(msg.Timestamp)
  1169. if err != nil {
  1170. return err
  1171. }
  1172. data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
  1173. }
  1174. data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
  1175. contextPrefix, msg.Context.NodeID,
  1176. contextPrefix, msg.Context.ServiceID,
  1177. contextPrefix, msg.Context.TaskID,
  1178. ))...)
  1179. data = append(data, msg.Data...)
  1180. switch msg.Stream {
  1181. case swarmapi.LogStreamStdout:
  1182. outStream.Write(data)
  1183. case swarmapi.LogStreamStderr:
  1184. errStream.Write(data)
  1185. }
  1186. }
  1187. }
  1188. }
  1189. // GetNodes returns a list of all nodes known to a cluster.
  1190. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
  1191. c.RLock()
  1192. defer c.RUnlock()
  1193. if !c.isActiveManager() {
  1194. return nil, c.errNoManager()
  1195. }
  1196. filters, err := newListNodesFilters(options.Filters)
  1197. if err != nil {
  1198. return nil, err
  1199. }
  1200. ctx, cancel := c.getRequestContext()
  1201. defer cancel()
  1202. r, err := c.client.ListNodes(
  1203. ctx,
  1204. &swarmapi.ListNodesRequest{Filters: filters})
  1205. if err != nil {
  1206. return nil, err
  1207. }
  1208. nodes := []types.Node{}
  1209. for _, node := range r.Nodes {
  1210. nodes = append(nodes, convert.NodeFromGRPC(*node))
  1211. }
  1212. return nodes, nil
  1213. }
  1214. // GetNode returns a node based on an ID or name.
  1215. func (c *Cluster) GetNode(input string) (types.Node, error) {
  1216. c.RLock()
  1217. defer c.RUnlock()
  1218. if !c.isActiveManager() {
  1219. return types.Node{}, c.errNoManager()
  1220. }
  1221. ctx, cancel := c.getRequestContext()
  1222. defer cancel()
  1223. node, err := getNode(ctx, c.client, input)
  1224. if err != nil {
  1225. return types.Node{}, err
  1226. }
  1227. return convert.NodeFromGRPC(*node), nil
  1228. }
  1229. // UpdateNode updates existing nodes properties.
  1230. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
  1231. c.RLock()
  1232. defer c.RUnlock()
  1233. if !c.isActiveManager() {
  1234. return c.errNoManager()
  1235. }
  1236. nodeSpec, err := convert.NodeSpecToGRPC(spec)
  1237. if err != nil {
  1238. return err
  1239. }
  1240. ctx, cancel := c.getRequestContext()
  1241. defer cancel()
  1242. currentNode, err := getNode(ctx, c.client, input)
  1243. if err != nil {
  1244. return err
  1245. }
  1246. _, err = c.client.UpdateNode(
  1247. ctx,
  1248. &swarmapi.UpdateNodeRequest{
  1249. NodeID: currentNode.ID,
  1250. Spec: &nodeSpec,
  1251. NodeVersion: &swarmapi.Version{
  1252. Index: version,
  1253. },
  1254. },
  1255. )
  1256. return err
  1257. }
  1258. // RemoveNode removes a node from a cluster
  1259. func (c *Cluster) RemoveNode(input string, force bool) error {
  1260. c.RLock()
  1261. defer c.RUnlock()
  1262. if !c.isActiveManager() {
  1263. return c.errNoManager()
  1264. }
  1265. ctx, cancel := c.getRequestContext()
  1266. defer cancel()
  1267. node, err := getNode(ctx, c.client, input)
  1268. if err != nil {
  1269. return err
  1270. }
  1271. if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
  1272. return err
  1273. }
  1274. return nil
  1275. }
  1276. // GetTasks returns a list of tasks matching the filter options.
  1277. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
  1278. c.RLock()
  1279. defer c.RUnlock()
  1280. if !c.isActiveManager() {
  1281. return nil, c.errNoManager()
  1282. }
  1283. byName := func(filter filters.Args) error {
  1284. if filter.Include("service") {
  1285. serviceFilters := filter.Get("service")
  1286. for _, serviceFilter := range serviceFilters {
  1287. service, err := c.GetService(serviceFilter)
  1288. if err != nil {
  1289. return err
  1290. }
  1291. filter.Del("service", serviceFilter)
  1292. filter.Add("service", service.ID)
  1293. }
  1294. }
  1295. if filter.Include("node") {
  1296. nodeFilters := filter.Get("node")
  1297. for _, nodeFilter := range nodeFilters {
  1298. node, err := c.GetNode(nodeFilter)
  1299. if err != nil {
  1300. return err
  1301. }
  1302. filter.Del("node", nodeFilter)
  1303. filter.Add("node", node.ID)
  1304. }
  1305. }
  1306. return nil
  1307. }
  1308. filters, err := newListTasksFilters(options.Filters, byName)
  1309. if err != nil {
  1310. return nil, err
  1311. }
  1312. ctx, cancel := c.getRequestContext()
  1313. defer cancel()
  1314. r, err := c.client.ListTasks(
  1315. ctx,
  1316. &swarmapi.ListTasksRequest{Filters: filters})
  1317. if err != nil {
  1318. return nil, err
  1319. }
  1320. tasks := []types.Task{}
  1321. for _, task := range r.Tasks {
  1322. if task.Spec.GetContainer() != nil {
  1323. tasks = append(tasks, convert.TaskFromGRPC(*task))
  1324. }
  1325. }
  1326. return tasks, nil
  1327. }
  1328. // GetTask returns a task by an ID.
  1329. func (c *Cluster) GetTask(input string) (types.Task, error) {
  1330. c.RLock()
  1331. defer c.RUnlock()
  1332. if !c.isActiveManager() {
  1333. return types.Task{}, c.errNoManager()
  1334. }
  1335. ctx, cancel := c.getRequestContext()
  1336. defer cancel()
  1337. task, err := getTask(ctx, c.client, input)
  1338. if err != nil {
  1339. return types.Task{}, err
  1340. }
  1341. return convert.TaskFromGRPC(*task), nil
  1342. }
  1343. // GetNetwork returns a cluster network by an ID.
  1344. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
  1345. c.RLock()
  1346. defer c.RUnlock()
  1347. if !c.isActiveManager() {
  1348. return apitypes.NetworkResource{}, c.errNoManager()
  1349. }
  1350. ctx, cancel := c.getRequestContext()
  1351. defer cancel()
  1352. network, err := getNetwork(ctx, c.client, input)
  1353. if err != nil {
  1354. return apitypes.NetworkResource{}, err
  1355. }
  1356. return convert.BasicNetworkFromGRPC(*network), nil
  1357. }
  1358. // GetNetworks returns all current cluster managed networks.
  1359. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
  1360. c.RLock()
  1361. defer c.RUnlock()
  1362. if !c.isActiveManager() {
  1363. return nil, c.errNoManager()
  1364. }
  1365. ctx, cancel := c.getRequestContext()
  1366. defer cancel()
  1367. r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
  1368. if err != nil {
  1369. return nil, err
  1370. }
  1371. var networks []apitypes.NetworkResource
  1372. for _, network := range r.Networks {
  1373. networks = append(networks, convert.BasicNetworkFromGRPC(*network))
  1374. }
  1375. return networks, nil
  1376. }
  1377. func attacherKey(target, containerID string) string {
  1378. return containerID + ":" + target
  1379. }
  1380. // UpdateAttachment signals the attachment config to the attachment
  1381. // waiter who is trying to start or attach the container to the
  1382. // network.
  1383. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
  1384. c.RLock()
  1385. attacher, ok := c.attachers[attacherKey(target, containerID)]
  1386. c.RUnlock()
  1387. if !ok || attacher == nil {
  1388. return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
  1389. }
  1390. attacher.attachWaitCh <- config
  1391. close(attacher.attachWaitCh)
  1392. return nil
  1393. }
  1394. // WaitForDetachment waits for the container to stop or detach from
  1395. // the network.
  1396. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
  1397. c.RLock()
  1398. attacher, ok := c.attachers[attacherKey(networkName, containerID)]
  1399. if !ok {
  1400. attacher, ok = c.attachers[attacherKey(networkID, containerID)]
  1401. }
  1402. if c.node == nil || c.node.Agent() == nil {
  1403. c.RUnlock()
  1404. return fmt.Errorf("invalid cluster node while waiting for detachment")
  1405. }
  1406. agent := c.node.Agent()
  1407. c.RUnlock()
  1408. if ok && attacher != nil &&
  1409. attacher.detachWaitCh != nil &&
  1410. attacher.attachCompleteCh != nil {
  1411. // Attachment may be in progress still so wait for
  1412. // attachment to complete.
  1413. select {
  1414. case <-attacher.attachCompleteCh:
  1415. case <-ctx.Done():
  1416. return ctx.Err()
  1417. }
  1418. if attacher.taskID == taskID {
  1419. select {
  1420. case <-attacher.detachWaitCh:
  1421. case <-ctx.Done():
  1422. return ctx.Err()
  1423. }
  1424. }
  1425. }
  1426. return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
  1427. }
  1428. // AttachNetwork generates an attachment request towards the manager.
  1429. func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
  1430. aKey := attacherKey(target, containerID)
  1431. c.Lock()
  1432. if c.node == nil || c.node.Agent() == nil {
  1433. c.Unlock()
  1434. return nil, fmt.Errorf("invalid cluster node while attaching to network")
  1435. }
  1436. if attacher, ok := c.attachers[aKey]; ok {
  1437. c.Unlock()
  1438. return attacher.config, nil
  1439. }
  1440. agent := c.node.Agent()
  1441. attachWaitCh := make(chan *network.NetworkingConfig)
  1442. detachWaitCh := make(chan struct{})
  1443. attachCompleteCh := make(chan struct{})
  1444. c.attachers[aKey] = &attacher{
  1445. attachWaitCh: attachWaitCh,
  1446. attachCompleteCh: attachCompleteCh,
  1447. detachWaitCh: detachWaitCh,
  1448. }
  1449. c.Unlock()
  1450. ctx, cancel := c.getRequestContext()
  1451. defer cancel()
  1452. taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
  1453. if err != nil {
  1454. c.Lock()
  1455. delete(c.attachers, aKey)
  1456. c.Unlock()
  1457. return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
  1458. }
  1459. c.Lock()
  1460. c.attachers[aKey].taskID = taskID
  1461. close(attachCompleteCh)
  1462. c.Unlock()
  1463. logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)
  1464. var config *network.NetworkingConfig
  1465. select {
  1466. case config = <-attachWaitCh:
  1467. case <-ctx.Done():
  1468. return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
  1469. }
  1470. c.Lock()
  1471. c.attachers[aKey].config = config
  1472. c.Unlock()
  1473. return config, nil
  1474. }
  1475. // DetachNetwork unblocks the waiters waiting on WaitForDetachment so
  1476. // that a request to detach can be generated towards the manager.
  1477. func (c *Cluster) DetachNetwork(target string, containerID string) error {
  1478. aKey := attacherKey(target, containerID)
  1479. c.Lock()
  1480. attacher, ok := c.attachers[aKey]
  1481. delete(c.attachers, aKey)
  1482. c.Unlock()
  1483. if !ok {
  1484. return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
  1485. }
  1486. close(attacher.detachWaitCh)
  1487. return nil
  1488. }
  1489. // CreateNetwork creates a new cluster managed network.
  1490. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
  1491. c.RLock()
  1492. defer c.RUnlock()
  1493. if !c.isActiveManager() {
  1494. return "", c.errNoManager()
  1495. }
  1496. if runconfig.IsPreDefinedNetwork(s.Name) {
  1497. err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
  1498. return "", apierrors.NewRequestForbiddenError(err)
  1499. }
  1500. ctx, cancel := c.getRequestContext()
  1501. defer cancel()
  1502. networkSpec := convert.BasicNetworkCreateToGRPC(s)
  1503. r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
  1504. if err != nil {
  1505. return "", err
  1506. }
  1507. return r.Network.ID, nil
  1508. }
  1509. // RemoveNetwork removes a cluster network.
  1510. func (c *Cluster) RemoveNetwork(input string) error {
  1511. c.RLock()
  1512. defer c.RUnlock()
  1513. if !c.isActiveManager() {
  1514. return c.errNoManager()
  1515. }
  1516. ctx, cancel := c.getRequestContext()
  1517. defer cancel()
  1518. network, err := getNetwork(ctx, c.client, input)
  1519. if err != nil {
  1520. return err
  1521. }
  1522. if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
  1523. return err
  1524. }
  1525. return nil
  1526. }
  1527. func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
  1528. // Always prefer NetworkAttachmentConfigs from TaskTemplate
  1529. // but fallback to service spec for backward compatibility
  1530. networks := s.TaskTemplate.Networks
  1531. if len(networks) == 0 {
  1532. networks = s.Networks
  1533. }
  1534. for i, n := range networks {
  1535. apiNetwork, err := getNetwork(ctx, client, n.Target)
  1536. if err != nil {
  1537. if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
  1538. err = fmt.Errorf("The network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver.", ln.Name())
  1539. return apierrors.NewRequestForbiddenError(err)
  1540. }
  1541. return err
  1542. }
  1543. networks[i].Target = apiNetwork.ID
  1544. }
  1545. return nil
  1546. }
  1547. // Cleanup stops active swarm node. This is run before daemon shutdown.
  1548. func (c *Cluster) Cleanup() {
  1549. c.Lock()
  1550. node := c.node
  1551. if node == nil {
  1552. c.Unlock()
  1553. return
  1554. }
  1555. defer c.Unlock()
  1556. if c.isActiveManager() {
  1557. active, reachable, unreachable, err := c.managerStats()
  1558. if err == nil {
  1559. singlenode := active && isLastManager(reachable, unreachable)
  1560. if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
  1561. logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
  1562. }
  1563. }
  1564. }
  1565. c.stopNode()
  1566. }
  1567. func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
  1568. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  1569. defer cancel()
  1570. nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
  1571. if err != nil {
  1572. return false, 0, 0, err
  1573. }
  1574. for _, n := range nodes.Nodes {
  1575. if n.ManagerStatus != nil {
  1576. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
  1577. reachable++
  1578. if n.ID == c.node.NodeID() {
  1579. current = true
  1580. }
  1581. }
  1582. if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
  1583. unreachable++
  1584. }
  1585. }
  1586. }
  1587. return
  1588. }
  1589. func validateAndSanitizeInitRequest(req *types.InitRequest) error {
  1590. var err error
  1591. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1592. if err != nil {
  1593. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1594. }
  1595. if req.Spec.Annotations.Name == "" {
  1596. req.Spec.Annotations.Name = "default"
  1597. } else if req.Spec.Annotations.Name != "default" {
  1598. return errors.New(`swarm spec must be named "default"`)
  1599. }
  1600. return nil
  1601. }
  1602. func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
  1603. var err error
  1604. req.ListenAddr, err = validateAddr(req.ListenAddr)
  1605. if err != nil {
  1606. return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
  1607. }
  1608. if len(req.RemoteAddrs) == 0 {
  1609. return fmt.Errorf("at least 1 RemoteAddr is required to join")
  1610. }
  1611. for i := range req.RemoteAddrs {
  1612. req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
  1613. if err != nil {
  1614. return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
  1615. }
  1616. }
  1617. return nil
  1618. }
  1619. func validateAddr(addr string) (string, error) {
  1620. if addr == "" {
  1621. return addr, fmt.Errorf("invalid empty address")
  1622. }
  1623. newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
  1624. if err != nil {
  1625. return addr, nil
  1626. }
  1627. return strings.TrimPrefix(newaddr, "tcp://"), nil
  1628. }
  1629. func initClusterSpec(node *node, spec types.Spec) error {
  1630. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
  1631. for conn := range node.ListenControlSocket(ctx) {
  1632. if ctx.Err() != nil {
  1633. return ctx.Err()
  1634. }
  1635. if conn != nil {
  1636. client := swarmapi.NewControlClient(conn)
  1637. var cluster *swarmapi.Cluster
  1638. for i := 0; ; i++ {
  1639. lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
  1640. if err != nil {
  1641. return fmt.Errorf("error on listing clusters: %v", err)
  1642. }
  1643. if len(lcr.Clusters) == 0 {
  1644. if i < 10 {
  1645. time.Sleep(200 * time.Millisecond)
  1646. continue
  1647. }
  1648. return fmt.Errorf("empty list of clusters was returned")
  1649. }
  1650. cluster = lcr.Clusters[0]
  1651. break
  1652. }
  1653. // In init, we take the initial default values from swarmkit, and merge
  1654. // any non nil or 0 value from spec to GRPC spec. This will leave the
  1655. // default value alone.
  1656. // Note that this is different from Update(), as in Update() we expect
  1657. // user to specify the complete spec of the cluster (as they already know
  1658. // the existing one and knows which field to update)
  1659. clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
  1660. if err != nil {
  1661. return fmt.Errorf("error updating cluster settings: %v", err)
  1662. }
  1663. _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
  1664. ClusterID: cluster.ID,
  1665. ClusterVersion: &cluster.Meta.Version,
  1666. Spec: &clusterSpec,
  1667. })
  1668. if err != nil {
  1669. return fmt.Errorf("error updating cluster settings: %v", err)
  1670. }
  1671. return nil
  1672. }
  1673. }
  1674. return ctx.Err()
  1675. }
  1676. func detectLockedError(err error) error {
  1677. if err == swarmnode.ErrInvalidUnlockKey {
  1678. return errors.WithStack(ErrSwarmLocked)
  1679. }
  1680. return err
  1681. }