cluster.go
package cluster

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	apierrors "github.com/docker/docker/api/errors"
	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/cluster/executor/container"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/signal"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/docker/docker/reference"
	"github.com/docker/docker/runconfig"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	V4Subnets() []net.IPNet
	V6Subnets() []net.IPNet
}
// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	sync.RWMutex
	*node
	root            string
	runtimeRoot     string
	config          Config
	configEvent     chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string        // after resolution, not persisted
	stop            bool
	err             error
	cancelDelay     func()
	attachers       map[string]*attacher
	locked          bool
	lastNodeConfig  *nodeStartConfig
}

// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

type node struct {
	*swarmnode.Node
	done           chan struct{}
	ready          bool
	conn           *grpc.ClientConn
	client         swarmapi.ControlClient
	logs           swarmapi.LogsClient
	reconnectDelay time.Duration
	config         nodeStartConfig
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
}
// New creates a new Cluster instance using provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}

	nodeConfig, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}

	n, err := c.startNewNode(*nodeConfig)
	if err != nil {
		return nil, err
	}

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		if errors.Cause(c.err) == ErrSwarmLocked {
			return c, nil
		}
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}
func (c *Cluster) loadState() (*nodeStartConfig, error) {
	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
	if err != nil {
		return nil, err
	}
	// missing certificate means no actual state to restore from
	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
		if os.IsNotExist(err) {
			c.clearState()
		}
		return nil, err
	}
	var st nodeStartConfig
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	return &st, nil
}

func (c *Cluster) saveState(config nodeStartConfig) error {
	dt, err := json.Marshal(config)
	if err != nil {
		return err
	}
	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
}
func (c *Cluster) reconnectOnFailure(n *node) {
	for {
		<-n.done

		c.Lock()
		if c.stop || c.node != nil {
			c.Unlock()
			return
		}
		n.reconnectDelay *= 2
		if n.reconnectDelay > maxReconnectDelay {
			n.reconnectDelay = maxReconnectDelay
		}
		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
		c.cancelDelay = cancel
		c.Unlock()
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		c.Lock()
		if c.node != nil {
			c.Unlock()
			return
		}
		var err error
		config := n.config
		config.RemoteAddr = c.getRemoteAddress()
		config.joinAddr = config.RemoteAddr
		n, err = c.startNewNode(config)
		if err != nil {
			c.err = err
			close(n.done)
		}
		c.Unlock()
	}
}
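
// Note on the backoff above: reconnectDelay starts at initialReconnectDelay
// (100ms, set in startNewNode) and doubles before each restart attempt, so the
// waits run roughly 200ms, 400ms, 800ms, ... until clamped at
// maxReconnectDelay (30s), after which the node keeps retrying every 30s.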
func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(c.runtimeRoot, controlSocket)
	}

	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmnode.New(&swarmnode.Config{
		Hostname:           c.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           c.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(c.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
		AutoLockManagers:   conf.autolock,
	})
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
		config:         conf,
	}
	c.node = node
	c.actualLocalAddr = actualLocalAddr // not saved
	c.saveState(conf)
	c.config.Backend.SetClusterProvider(c)

	go func() {
		err := detectLockedError(n.Err(ctx))
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		if errors.Cause(err) == ErrSwarmLocked {
			c.locked = true
			confClone := conf
			c.lastNodeConfig = &confClone
		}
		c.Unlock()
		close(node.done)
	}()

	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()

	go func() {
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
					node.logs = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
					node.logs = swarmapi.NewLogsClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()

	return node, nil
}
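
// Illustrative sketch of the local-address discovery used above: dialing a UDP
// socket never sends a packet; it only asks the kernel which source interface
// would be used to reach RemoteAddr. Stripped of surrounding error handling:
//
//	conn, err := net.Dial("udp", "8.8.8.8:53") // any routable host:port works
//	if err != nil {
//		return "", err
//	}
//	defer conn.Close()
//	host, _, _ := net.SplitHostPort(conn.LocalAddr().String())
//	// host now holds the outbound interface IP, e.g. "192.168.1.20"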
// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil || c.locked {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", ErrSwarmExists
		}
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		c.Unlock()
		return "", err
	}

	localAddr := listenHost

	// If the advertise address is not one of the system's
	// addresses, we also require a listen address.
	listenAddrIP := net.ParseIP(listenHost)
	if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)
		if advertiseIP == nil {
			// not an IP
			c.Unlock()
			return "", errMustSpecifyListenAddr
		}

		systemIPs := listSystemIPs()

		found := false
		for _, systemIP := range systemIPs {
			if systemIP.Equal(advertiseIP) {
				found = true
				break
			}
		}
		if !found {
			c.Unlock()
			return "", errMustSpecifyListenAddr
		}
		localAddr = advertiseIP.String()
	}

	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
	})
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()

	select {
	case <-n.Ready():
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}
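
// Illustrative sketch (addresses are placeholders): a caller holding a
// configured *Cluster might bootstrap a single-node swarm roughly like this:
//
//	nodeID, err := c.Init(types.InitRequest{
//		ListenAddr:       "0.0.0.0:2377",
//		AdvertiseAddr:    "192.168.1.20:2377",
//		ForceNewCluster:  false,
//		AutoLockManagers: false,
//	})
//	if err != nil {
//		// e.g. ErrSwarmExists if the node is already part of a swarm
//	}
//	_ = nodeID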
// Join makes current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil || c.locked {
		c.Unlock()
		return ErrSwarmExists
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
	})
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		// attempt to connect will continue in background, but reconnect only if it didn't fail
		go func() {
			select {
			case <-n.Ready():
				c.reconnectOnFailure(n)
			case <-n.done:
				logrus.Errorf("failed to join the cluster: %+v", c.err)
			}
		}()
		return ErrSwarmJoinTimeoutReached
	case <-n.Ready():
		go c.reconnectOnFailure(n)
		return nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		return c.err
	}
}
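
// Illustrative sketch (manager address and token are placeholders): joining an
// existing swarm through this API might look like:
//
//	err := c.Join(types.JoinRequest{
//		ListenAddr:  "0.0.0.0:2377",
//		RemoteAddrs: []string{"192.168.1.10:2377"},
//		JoinToken:   "SWMTKN-1-<redacted>",
//	})
//	switch err {
//	case nil:
//		// joined before swarmConnectTimeout elapsed
//	case ErrSwarmJoinTimeoutReached:
//		// the join continues in the background; poll Info() for progress
//	default:
//		// validation error or join failure
//	}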
// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	client := swarmapi.NewCAClient(c.conn)

	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
	if err != nil {
		return "", err
	}

	if len(r.UnlockKey) == 0 {
		// no key
		return "", nil
	}

	return encryption.HumanReadableKey(r.UnlockKey), nil
}
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return err
	}

	c.Lock()
	if c.node != nil || !c.locked {
		c.Unlock()
		return errors.New("swarm is not locked")
	}

	config := *c.lastNodeConfig
	config.lockKey = key
	n, err := c.startNewNode(config)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	select {
	case <-n.Ready():
	case <-n.done:
		if errors.Cause(c.err) == ErrSwarmLocked {
			return errors.New("swarm could not be unlocked: invalid key provided")
		}
		return fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return nil
}
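
// Illustrative sketch of the lock/unlock round trip: GetUnlockKey is called on
// a healthy manager, and UnlockSwarm is called after a restart of an
// autolocked node once Info() reports LocalNodeStateLocked:
//
//	key, err := c.GetUnlockKey() // human-readable key string
//	if err != nil {
//		return err
//	}
//	// ... daemon restarts with autolock enabled ...
//	if err := c.UnlockSwarm(types.UnlockRequest{UnlockKey: key}); err != nil {
//		// a wrong key surfaces as "swarm could not be unlocked: invalid key provided"
//	}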
// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	<-node.done
	return nil
}

func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
	return reachable-2 <= unreachable
}

func isLastManager(reachable, unreachable int) bool {
	return reachable == 1 && unreachable == 0
}
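
// Worked examples for the quorum check above (removing this manager leaves
// reachable-1 healthy managers that still need a Raft majority):
//
//	reachable=3, unreachable=0: 3-2 <= 0 is false -> safe to leave
//	reachable=2, unreachable=0: 2-2 <= 0 is true  -> leaving loses quorum
//	reachable=3, unreachable=2: 3-2 <= 2 is true  -> leaving loses quorum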
// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.Lock()
	node := c.node
	if node == nil {
		if c.locked {
			c.locked = false
			c.lastNodeConfig = nil
			c.Unlock()
		} else {
			c.Unlock()
			return ErrNoSwarm
		}
	} else {
		if node.Manager() != nil && !force {
			msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
			if c.isActiveManager() {
				active, reachable, unreachable, err := c.managerStats()
				if err == nil {
					if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
						if isLastManager(reachable, unreachable) {
							msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
							c.Unlock()
							return fmt.Errorf(msg)
						}
						msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
					}
				}
			} else {
				msg += "Doing so may lose the consensus of your cluster. "
			}

			msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
			c.Unlock()
			return fmt.Errorf(msg)
		}
		if err := c.stopNode(); err != nil {
			logrus.Errorf("failed to shut down cluster node: %v", err)
			signal.DumpStacks("")
			c.Unlock()
			return err
		}
		c.Unlock()
		if nodeID := node.NodeID(); nodeID != "" {
			nodeContainers, err := c.listContainerForNode(nodeID)
			if err != nil {
				return err
			}
			for _, id := range nodeContainers {
				if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
					logrus.Errorf("error removing %v: %v", id, err)
				}
			}
		}
	}
	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := c.clearState(); err != nil {
		return err
	}
	return nil
}

func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}

func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}
// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Swarm{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return types.Swarm{}, err
	}

	return convert.SwarmFromGRPC(*swarm), nil
}
// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return err
	}

	// In update, the client should provide the complete spec of the swarm,
	// including Name and Labels. If a field is specified as 0 or nil, the
	// default value is passed on to swarmkit.
	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
	if err != nil {
		return err
	}

	_, err = c.client.UpdateCluster(
		ctx,
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &clusterSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
			Rotation: swarmapi.KeyRotation{
				WorkerJoinToken:  flags.RotateWorkerToken,
				ManagerJoinToken: flags.RotateManagerToken,
				ManagerUnlockKey: flags.RotateManagerUnlockKey,
			},
		},
	)
	return err
}
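
// Illustrative sketch of a read-modify-write of the cluster spec, including
// join-token rotation. It assumes the swarm API types of this vintage, where
// the version index is reachable via the embedded Meta on types.Swarm:
//
//	sw, err := c.Inspect()
//	if err != nil {
//		return err
//	}
//	err = c.Update(sw.Version.Index, sw.Spec, types.UpdateFlags{
//		RotateWorkerToken: true, // ask swarmkit to issue a fresh worker join token
//	})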
// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	return c.node != nil && c.ready
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.node != nil {
		return c.node.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.node != nil && c.node.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.actualLocalAddr
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	if c.node == nil {
		return ""
	}
	nodeID := c.node.NodeID()
	for _, r := range c.node.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}

	c.RLock()
	defer c.RUnlock()

	if c.node == nil {
		info.LocalNodeState = types.LocalNodeStateInactive
		if c.cancelDelay != nil {
			info.LocalNodeState = types.LocalNodeStateError
		}
		if c.locked {
			info.LocalNodeState = types.LocalNodeStateLocked
		}
	} else {
		info.LocalNodeState = types.LocalNodeStatePending
		if c.ready {
			info.LocalNodeState = types.LocalNodeStateActive
		} else if c.locked {
			info.LocalNodeState = types.LocalNodeStateLocked
		}
	}
	if c.err != nil {
		info.Error = c.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if c.isActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.Inspect()
		if err != nil {
			info.Error = err.Error()
		}

		// Strip JoinTokens
		info.Cluster = swarm.ClusterInfo

		if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if c.node != nil {
		for _, r := range c.node.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = c.node.NodeID()
	}

	return info
}

// isActiveManager should not be called without a read lock
func (c *Cluster) isActiveManager() bool {
	return c.node != nil && c.conn != nil
}

// errNoManager returns error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager() error {
	if c.node == nil {
		if c.locked {
			return ErrSwarmLocked
		}
		return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
	}
	if c.node.Manager() != nil {
		return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
	}
	return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
}
// GetServices returns all services of a managed swarm cluster.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	filters, err := newListServicesFilters(options.Filters)
	if err != nil {
		return nil, err
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListServices(
		ctx,
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	services := []types.Service{}

	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}

	return services, nil
}
// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234...
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
	ref, err := reference.ParseNamed(image)
	if err != nil {
		return "", err
	}
	// only query registry if not a canonical reference (i.e. with digest)
	if _, ok := ref.(reference.Canonical); !ok {
		ref = reference.WithDefaultTag(ref)
		namedTaggedRef, ok := ref.(reference.NamedTagged)
		if !ok {
			return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
		}

		repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
		if err != nil {
			return "", err
		}
		dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
		if err != nil {
			return "", err
		}

		// TODO(nishanttotla): Currently, the service would lose the tag while calling WithDigest
		// To prevent this, we create the image string manually, which is a bad idea in general
		// This will be fixed when https://github.com/docker/distribution/pull/2044 is vendored
		// namedDigestedRef, err := reference.WithDigest(ref, dscrptr.Digest)
		// if err != nil {
		// 	return "", err
		// }
		// return namedDigestedRef.String(), nil
		return image + "@" + dscrptr.Digest.String(), nil
	} else {
		// reference already contains a digest, so just return it
		return ref.String(), nil
	}
}
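
// Illustrative effect of the pinning above (digest shortened): a tag-based
// reference is rewritten into an immutable one, so every task of a service
// runs the exact image resolved at create/update time.
//
//	"nginx:latest"             -> "nginx:latest@sha256:0d17b5...c1"
//	"nginx@sha256:0d17b5...c1" -> returned as-is (already canonical)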
// CreateService creates a new service in a managed swarm cluster.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, c.client, &s)
	if err != nil {
		return "", err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return "", err
	}

	ctnr := serviceSpec.Task.GetContainer()
	if ctnr == nil {
		return "", fmt.Errorf("service does not use container tasks")
	}

	if encodedAuth != "" {
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
		} else {
			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
			ctnr.Image = digestImage
		}
	}

	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return "", err
	}

	return r.Service.ID, nil
}
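
// Illustrative sketch of the encodedAuth wire format consumed above: a
// URL-safe base64 encoding of a JSON-serialized apitypes.AuthConfig
// (credentials below are placeholders). A caller could build it like this:
//
//	buf, _ := json.Marshal(apitypes.AuthConfig{
//		Username:      "someuser",
//		Password:      "somepass",
//		ServerAddress: "registry.example.com",
//	})
//	encodedAuth := base64.URLEncoding.EncodeToString(buf)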
// GetService returns a service based on an ID or name.
func (c *Cluster) GetService(input string) (types.Service, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Service{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, c.client, input)
	if err != nil {
		return types.Service{}, err
	}
	return convert.ServiceFromGRPC(*service), nil
}

// UpdateService updates existing service to match new properties.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, c.client, &spec)
	if err != nil {
		return err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return err
	}

	currentService, err := getService(ctx, c.client, serviceIDOrName)
	if err != nil {
		return err
	}

	newCtnr := serviceSpec.Task.GetContainer()
	if newCtnr == nil {
		return fmt.Errorf("service does not use container tasks")
	}

	if encodedAuth != "" {
		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			if currentService.PreviousSpec == nil {
				return fmt.Errorf("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return fmt.Errorf("unsupported registryAuthFromValue")
		}
		if ctnr == nil {
			return fmt.Errorf("service does not use container tasks")
		}
		newCtnr.PullOptions = ctnr.PullOptions
		// update encodedAuth so it can be used to pin image by digest
		if ctnr.PullOptions != nil {
			encodedAuth = ctnr.PullOptions.RegistryAuth
		}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
		} else if newCtnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
			newCtnr.Image = digestImage
		}
	}

	_, err = c.client.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveService removes a service from a managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
		return err
	}
	return nil
}
// ServiceLogs collects service logs and writes them back to `config.OutStream`
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
	c.RLock()
	if !c.isActiveManager() {
		c.RUnlock()
		return c.errNoManager()
	}

	service, err := getService(ctx, c.client, input)
	if err != nil {
		c.RUnlock()
		return err
	}

	stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: &swarmapi.LogSelector{
			ServiceIDs: []string{service.ID},
		},
		Options: &swarmapi.LogSubscriptionOptions{
			Follow: true,
		},
	})
	if err != nil {
		c.RUnlock()
		return err
	}

	wf := ioutils.NewWriteFlusher(config.OutStream)
	defer wf.Close()
	close(started)
	wf.Flush()

	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)

	// Release the lock before starting the stream.
	c.RUnlock()
	for {
		// Check the context before doing anything.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		subscribeMsg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		for _, msg := range subscribeMsg.Messages {
			data := []byte{}

			if config.Timestamps {
				ts, err := ptypes.Timestamp(msg.Timestamp)
				if err != nil {
					return err
				}
				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
			}

			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
				contextPrefix, msg.Context.NodeID,
				contextPrefix, msg.Context.ServiceID,
				contextPrefix, msg.Context.TaskID,
			))...)

			data = append(data, msg.Data...)

			switch msg.Stream {
			case swarmapi.LogStreamStdout:
				outStream.Write(data)
			case swarmapi.LogStreamStderr:
				errStream.Write(data)
			}
		}
	}
}
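
// Illustrative output of the loop above: each log line is prefixed with swarm
// context labels built from contextPrefix (and an optional timestamp), so a
// single stdout record looks roughly like (IDs are placeholders):
//
//	com.docker.swarm.node.id=abcd,com.docker.swarm.service.id=efgh,com.docker.swarm.task.id=ijkl hello from the task
//
// The stdout/stderr split is preserved by multiplexing through stdcopy, the
// same framing used for container logs.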
// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	filters, err := newListNodesFilters(options.Filters)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListNodes(
		ctx,
		&swarmapi.ListNodesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	nodes := []types.Node{}

	for _, node := range r.Nodes {
		nodes = append(nodes, convert.NodeFromGRPC(*node))
	}
	return nodes, nil
}

// GetNode returns a node based on an ID or name.
func (c *Cluster) GetNode(input string) (types.Node, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Node{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return types.Node{}, err
	}
	return convert.NodeFromGRPC(*node), nil
}
// UpdateNode updates an existing node's properties.
func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	nodeSpec, err := convert.NodeSpecToGRPC(spec)
	if err != nil {
		return err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	_, err = c.client.UpdateNode(
		ctx,
		&swarmapi.UpdateNodeRequest{
			NodeID: nodeID,
			Spec:   &nodeSpec,
			NodeVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}
// RemoveNode removes a node from a cluster
func (c *Cluster) RemoveNode(input string, force bool) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
		return err
	}
	return nil
}

// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}

	filters, err := newListTasksFilters(options.Filters, byName)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListTasks(
		ctx,
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	tasks := []types.Task{}

	for _, task := range r.Tasks {
		if task.Spec.GetContainer() != nil {
			tasks = append(tasks, convert.TaskFromGRPC(*task))
		}
	}
	return tasks, nil
}

// GetTask returns a task by an ID.
func (c *Cluster) GetTask(input string) (types.Task, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Task{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	task, err := getTask(ctx, c.client, input)
	if err != nil {
		return types.Task{}, err
	}
	return convert.TaskFromGRPC(*task), nil
}

// GetNetwork returns a cluster network by an ID.
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return apitypes.NetworkResource{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, c.client, input)
	if err != nil {
		return apitypes.NetworkResource{}, err
	}
	return convert.BasicNetworkFromGRPC(*network), nil
}

// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
	if err != nil {
		return nil, err
	}

	var networks []apitypes.NetworkResource

	for _, network := range r.Networks {
		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
	}

	return networks, nil
}

func attacherKey(target, containerID string) string {
	return containerID + ":" + target
}

// UpdateAttachment signals the attachment config to the attachment
// waiter who is trying to start or attach the container to the
// network.
func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(target, containerID)]
	c.RUnlock()
	if !ok || attacher == nil {
		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
	}

	attacher.attachWaitCh <- config
	close(attacher.attachWaitCh)
	return nil
}

// WaitForDetachment waits for the container to stop or detach from
// the network.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	if c.node == nil || c.node.Agent() == nil {
		c.RUnlock()
		return fmt.Errorf("invalid cluster node while waiting for detachment")
	}

	agent := c.node.Agent()
	c.RUnlock()

	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may be in progress still so wait for
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}

		if attacher.taskID == taskID {
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}

// AttachNetwork generates an attachment request towards the manager.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.Lock()
	if c.node == nil || c.node.Agent() == nil {
		c.Unlock()
		return nil, fmt.Errorf("invalid cluster node while attaching to network")
	}
	if attacher, ok := c.attachers[aKey]; ok {
		c.Unlock()
		return attacher.config, nil
	}

	agent := c.node.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.Unlock()

	ctx, cancel := c.getRequestContext()
	defer cancel()

	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		c.Lock()
		delete(c.attachers, aKey)
		c.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}

	c.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.Unlock()

	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)

	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}

	c.Lock()
	c.attachers[aKey].config = config
	c.Unlock()
	return config, nil
}

// DetachNetwork unblocks the waiters waiting on WaitForDetachment so
// that a request to detach can be generated towards the manager.
func (c *Cluster) DetachNetwork(target string, containerID string) error {
	aKey := attacherKey(target, containerID)

	c.Lock()
	attacher, ok := c.attachers[aKey]
	delete(c.attachers, aKey)
	c.Unlock()

	if !ok {
		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
	}

	close(attacher.detachWaitCh)
	return nil
}

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", apierrors.NewRequestForbiddenError(err)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}

	return r.Network.ID, nil
}

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
		return err
	}
	return nil
}
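
// populateNetworkID resolves the network targets referenced by a service
// spec to network IDs in place.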
func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate,
	// but fall back to the service spec for backward compatibility.
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}

	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("network %s is not eligible for docker services", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		networks[i].Target = apiNetwork.ID
	}
	return nil
}
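
// getNetwork looks a network up by full ID first, then falls back to
// matching by name or ID prefix. It returns an error if no network matches
// or if the result is ambiguous.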
func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
	// GetNetwork to match via full ID.
	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
	if err != nil {
		// On any error (including NotFound), use ListNetworks to match via
		// full name and then via ID prefix.
		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Networks) == 0 {
			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
		}

		if err != nil {
			return nil, err
		}

		if len(rl.Networks) == 0 {
			return nil, fmt.Errorf("network %s not found", input)
		}

		if l := len(rl.Networks); l > 1 {
			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
		}

		return rl.Networks[0], nil
	}
	return rg.Network, nil
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	defer c.Unlock()

	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}
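
// managerStats counts the reachable and unreachable managers in the cluster
// and reports whether the current node is one of the reachable managers.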
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}
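
// validateAndSanitizeInitRequest validates the listen address of an init
// request and normalizes it in place.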
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	return nil
}
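
// validateAndSanitizeJoinRequest validates the listen address and remote
// addresses of a join request and normalizes them in place. At least one
// remote address is required.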
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return fmt.Errorf("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}
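
// validateAddr rejects empty addresses and strips the tcp:// prefix from
// addresses that parse as TCP addresses; addresses that do not parse are
// returned unchanged.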
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
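
// initClusterSpec waits for the local control socket to become available,
// then merges the user-provided spec into the cluster object created by
// swarmkit and updates the cluster with the result.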
func initClusterSpec(node *node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					// The cluster object may not have been created yet;
					// retry for up to ~2 seconds before giving up.
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and merge
			// any non-nil or non-zero values from spec into the GRPC spec. This
			// leaves the default values alone.
			// Note that this is different from Update(), where we expect the user
			// to specify the complete spec of the cluster (as they already know
			// the existing one and know which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}
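
// detectLockedError maps swarmkit's ErrInvalidUnlockKey to ErrSwarmLocked so
// that callers can tell that the swarm is locked.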
func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(ErrSwarmLocked)
	}
	return err
}