package node

import (
    "bytes"
    "crypto/tls"
    "encoding/json"
    "io/ioutil"
    "net"
    "os"
    "path/filepath"
    "reflect"
    "sort"
    "strings"
    "sync"
    "time"

    "github.com/boltdb/bolt"
    "github.com/docker/docker/pkg/plugingetter"
    metrics "github.com/docker/go-metrics"
    "github.com/docker/swarmkit/agent"
    "github.com/docker/swarmkit/agent/exec"
    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/ca"
    "github.com/docker/swarmkit/connectionbroker"
    "github.com/docker/swarmkit/ioutils"
    "github.com/docker/swarmkit/log"
    "github.com/docker/swarmkit/manager"
    "github.com/docker/swarmkit/manager/encryption"
    "github.com/docker/swarmkit/remotes"
    "github.com/docker/swarmkit/xnet"
    grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
)

const (
    stateFilename     = "state.json"
    roleChangeTimeout = 16 * time.Second
)

var (
    nodeInfo    metrics.LabeledGauge
    nodeManager metrics.Gauge

    errNodeStarted    = errors.New("node: already started")
    errNodeNotStarted = errors.New("node: not started")
    certDirectory     = "certificates"

    // ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate.
    ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")
)

func init() {
    ns := metrics.NewNamespace("swarm", "node", nil)
    nodeInfo = ns.NewLabeledGauge("info", "Information related to the swarm", "",
        "swarm_id",
        "node_id",
    )
    nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
    metrics.Register(ns)
}

// Config provides values for a Node.
type Config struct {
    // Hostname is the name of the host for the agent instance.
    Hostname string

    // JoinAddr specifies a node that should be used for the initial connection
    // to other managers in the cluster. This should be only one address, and it
    // is optional; the actual remotes come from the stored state.
    JoinAddr string

    // StateDir specifies the directory the node uses to keep the state of the
    // remote managers and certificates.
    StateDir string

    // JoinToken is the token to be used on the first certificate request.
    JoinToken string

    // ExternalCAs is a list of CAs to which a manager node
    // will make certificate signing requests for node certificates.
    ExternalCAs []*api.ExternalCA

    // ForceNewCluster creates a new cluster from the current raft state.
    ForceNewCluster bool

    // ListenControlAPI specifies the address the control API should listen on.
    ListenControlAPI string

    // ListenRemoteAPI specifies the address for the remote API that agents
    // and raft members connect to.
    ListenRemoteAPI string

    // AdvertiseRemoteAPI specifies the address that should be advertised
    // for connections to the remote API (including the raft service).
    AdvertiseRemoteAPI string

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // ElectionTick defines the number of ticks without a leader needed
    // to trigger a new election.
    ElectionTick uint32

    // HeartbeatTick defines the number of ticks between each heartbeat
    // sent to other members for health-check purposes.
    HeartbeatTick uint32

    // AutoLockManagers determines whether or not an unlock key will be generated
    // when bootstrapping a new cluster for the first time.
    AutoLockManagers bool

    // UnlockKey is the key to unlock a node - used for decryption at rest. This
    // only applies to nodes that have already joined a cluster.
    UnlockKey []byte

    // Availability allows a user to control the current scheduling status of a node.
    Availability api.NodeSpec_Availability

    // PluginGetter provides access to docker's plugin inventory.
    PluginGetter plugingetter.PluginGetter

    // FIPS is a boolean stating whether the node is FIPS-enabled.
    FIPS bool
}

// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
    sync.RWMutex
    config           *Config
    remotes          *persistentRemotes
    connBroker       *connectionbroker.Broker
    role             string
    roleCond         *sync.Cond
    conn             *grpc.ClientConn
    connCond         *sync.Cond
    nodeID           string
    started          chan struct{}
    startOnce        sync.Once
    stopped          chan struct{}
    stopOnce         sync.Once
    ready            chan struct{} // closed when agent has completed registration and manager (if enabled) is ready to receive control requests
    closed           chan struct{}
    err              error
    agent            *agent.Agent
    manager          *manager.Manager
    notifyNodeChange chan *agent.NodeChanges // used by the agent to relay node updates from the dispatcher Session stream to (*Node).run
    unlockKey        []byte
}

type lastSeenRole struct {
    role api.NodeRole
}

// observe notes the latest value of this node's role, and returns true if it
// is the first seen value, or is different from the most recently seen value.
func (l *lastSeenRole) observe(newRole api.NodeRole) bool {
    changed := l.role != newRole
    l.role = newRole
    return changed
}
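
// For example, starting from a known worker role, observe is edge triggered
// (a minimal sketch with hypothetical values):
//
//    last := lastSeenRole{role: api.NodeRoleWorker}
//    _ = last.observe(api.NodeRoleWorker)  // false: role unchanged
//    _ = last.observe(api.NodeRoleManager) // true: role changed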

// RemoteAPIAddr returns the address on which the remote manager API listens.
// Returns an error if the node is not a manager.
func (n *Node) RemoteAPIAddr() (string, error) {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return "", errors.New("manager is not running")
    }
    addr := n.manager.Addr()
    if addr == "" {
        return "", errors.New("manager addr is not set")
    }
    return addr, nil
}

// New returns a new Node instance.
func New(c *Config) (*Node, error) {
    if err := os.MkdirAll(c.StateDir, 0700); err != nil {
        return nil, err
    }
    stateFile := filepath.Join(c.StateDir, stateFilename)
    dt, err := ioutil.ReadFile(stateFile)
    var p []api.Peer
    if err != nil && !os.IsNotExist(err) {
        return nil, err
    }
    if err == nil {
        if err := json.Unmarshal(dt, &p); err != nil {
            return nil, err
        }
    }
    n := &Node{
        remotes:          newPersistentRemotes(stateFile, p...),
        role:             ca.WorkerRole,
        config:           c,
        started:          make(chan struct{}),
        stopped:          make(chan struct{}),
        closed:           make(chan struct{}),
        ready:            make(chan struct{}),
        notifyNodeChange: make(chan *agent.NodeChanges, 1),
        unlockKey:        c.UnlockKey,
    }
    if n.config.JoinAddr != "" || n.config.ForceNewCluster {
        n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
        if n.config.JoinAddr != "" {
            n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
        }
    }
    n.connBroker = connectionbroker.New(n.remotes)
    n.roleCond = sync.NewCond(n.RLocker())
    n.connCond = sync.NewCond(n.RLocker())
    return n, nil
}
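
// A minimal lifecycle sketch (hypothetical caller and Config values; error
// handling elided). Start runs the node in the background, Ready is closed
// once the agent (and manager, if applicable) is up, and Stop shuts it down:
//
//    n, _ := New(&Config{
//        StateDir: "/var/lib/example/swarm", // assumption: any writable directory
//        Executor: myExecutor,               // assumption: an exec.Executor implementation
//    })
//    _ = n.Start(ctx)
//    <-n.Ready()
//    defer n.Stop(ctx)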

// BindRemote starts a listener that exposes the remote API.
func (n *Node) BindRemote(ctx context.Context, listenAddr string, advertiseAddr string) error {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return errors.New("manager is not running")
    }
    return n.manager.BindRemote(ctx, manager.RemoteAddrs{
        ListenAddr:    listenAddr,
        AdvertiseAddr: advertiseAddr,
    })
}

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
    err := errNodeStarted
    n.startOnce.Do(func() {
        close(n.started)
        go n.run(ctx)
        err = nil // clear error above, only once.
    })
    return err
}

func (n *Node) currentRole() api.NodeRole {
    n.Lock()
    currentRole := api.NodeRoleWorker
    if n.role == ca.ManagerRole {
        currentRole = api.NodeRoleManager
    }
    n.Unlock()
    return currentRole
}

func (n *Node) run(ctx context.Context) (err error) {
    defer func() {
        n.err = err
        // close the n.closed channel to indicate that the Node has completely
        // terminated
        close(n.closed)
    }()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    ctx = log.WithModule(ctx, "node")

    // set up a goroutine to monitor the stop channel, and cancel the run
    // context when the node is stopped
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
        case <-n.stopped:
            cancel()
        }
    }(ctx)

    // First things first: get the SecurityConfig for this node. This includes
    // the certificate information and the root CA. It also returns a cancel
    // function. This is needed because the SecurityConfig is a live object,
    // and provides a watch queue so that callers can observe changes to the
    // security config. This watch queue has to be closed, which is done by the
    // secConfigCancel function.
    //
    // It's also noteworthy that loading the security config with the node's
    // loadSecurityConfig method has the side effect of setting the node's ID
    // and role fields, meaning it isn't until after that point that the node
    // knows its ID.
    paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
    securityConfig, secConfigCancel, err := n.loadSecurityConfig(ctx, paths)
    if err != nil {
        return err
    }
    defer secConfigCancel()

    // Now that we have the security config, we can get a TLSRenewer, which is
    // a live component handling certificate rotation.
    renewer := ca.NewTLSRenewer(securityConfig, n.connBroker, paths.RootCA)

    // Now that we have the security goop all loaded, we know the Node's ID and
    // can add that to our logging context.
    ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))

    // Next, set up the task database. The task database is used by the agent
    // to keep a persistent local record of its tasks. Since every manager also
    // has an agent, every node needs a task database, so we do this regardless
    // of role.
    taskDBPath := filepath.Join(n.config.StateDir, "worker", "tasks.db")

    // os.MkdirAll will create the necessary directory path for the task
    // database if it doesn't already exist, and if it does already exist, no
    // error will be returned, so we use this regardless of whether this node
    // is new or not.
    if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
        return err
    }

    db, err := bolt.Open(taskDBPath, 0666, nil)
    if err != nil {
        return err
    }
    defer db.Close()

    // agentDone is a channel that represents the agent having exited. We start
    // the agent in a goroutine a few blocks down, and before that goroutine
    // exits, it closes this channel to signal to the goroutine just below to
    // terminate.
    agentDone := make(chan struct{})

    // This goroutine is the node changes loop. The n.notifyNodeChange
    // channel is passed to the agent. When a new node object gets sent down
    // to the agent, it gets passed back up to this node object, so that we can
    // check if a role update or a root certificate rotation is required. This
    // handles root rotation, but the renewer handles regular certificate
    // rotation.
    go func() {
        // lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole,
        // used to make role changes "edge triggered" and avoid renewal loops.
        lastNodeDesiredRole := lastSeenRole{role: n.currentRole()}
        for {
            select {
            case <-agentDone:
                return
            case nodeChanges := <-n.notifyNodeChange:
                if nodeChanges.Node != nil {
                    // This is a bit complex to be backward compatible with older CAs that
                    // don't support the Node.Role field. They only use what's presently
                    // called DesiredRole.
                    // 1) If DesiredRole changes, kick off a certificate renewal. The renewal
                    //    is delayed slightly to give Role time to change as well if this is
                    //    a newer CA. If the certificate we get back doesn't have the expected
                    //    role, we continue renewing with exponential backoff.
                    // 2) If the server is sending us IssuanceStateRotate, renew the cert as
                    //    requested by the CA.
                    desiredRoleChanged := lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole)
                    if desiredRoleChanged {
                        switch nodeChanges.Node.Spec.DesiredRole {
                        case api.NodeRoleManager:
                            renewer.SetExpectedRole(ca.ManagerRole)
                        case api.NodeRoleWorker:
                            renewer.SetExpectedRole(ca.WorkerRole)
                        }
                    }
                    if desiredRoleChanged || nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate {
                        renewer.Renew()
                    }
                }
                if nodeChanges.RootCert != nil {
                    if bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) {
                        continue
                    }
                    newRootCA, err := ca.NewRootCA(nodeChanges.RootCert, nil, nil, ca.DefaultNodeCertExpiration, nil)
                    if err != nil {
                        log.G(ctx).WithError(err).Error("invalid new root certificate from the dispatcher")
                        continue
                    }
                    if err := securityConfig.UpdateRootCA(&newRootCA); err != nil {
                        log.G(ctx).WithError(err).Error("could not use new root CA from dispatcher")
                        continue
                    }
                    if err := ca.SaveRootCA(newRootCA, paths.RootCA); err != nil {
                        log.G(ctx).WithError(err).Error("could not save new root certificate from the dispatcher")
                        continue
                    }
                }
            }
        }
    }()

    // Now we're going to launch the main component goroutines: the Agent, the
    // Manager (maybe), and the certificate updates loop. We shouldn't exit
    // the node object until all 3 of these components have terminated, so we
    // create a waitgroup to block termination of the node until then.
    var wg sync.WaitGroup
    wg.Add(3)

    // These two blocks update some of the metrics settings.
    nodeInfo.WithValues(
        securityConfig.ClientTLSCreds.Organization(),
        securityConfig.ClientTLSCreds.NodeID(),
    ).Set(1)
    if n.currentRole() == api.NodeRoleManager {
        nodeManager.Set(1)
    } else {
        nodeManager.Set(0)
    }

    // We created the renewer way up when we were creating the SecurityConfig
    // at the beginning of run, but now we're ready to start receiving
    // CertificateUpdates, and launch a goroutine to handle them. updates is a
    // channel containing the results of certificate renewals, which we
    // iterate over.
    updates := renewer.Start(ctx)
    go func() {
        for certUpdate := range updates {
            if certUpdate.Err != nil {
                logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
                continue
            }
            // Set the new role, and notify our waiting role changing logic
            // that the role has changed.
            n.Lock()
            n.role = certUpdate.Role
            n.roleCond.Broadcast()
            n.Unlock()

            // Export the new role for metrics
            if n.currentRole() == api.NodeRoleManager {
                nodeManager.Set(1)
            } else {
                nodeManager.Set(0)
            }
        }
        wg.Done()
    }()

    // and, finally, start the two main components: the manager and the agent
    role := n.role

    // Channels to signal when these respective components are up and ready to
    // go.
    managerReady := make(chan struct{})
    agentReady := make(chan struct{})

    // these variables are defined in this scope so that they're closed over
    // by the respective goroutines below.
    var managerErr error
    var agentErr error
    go func() {
        // superviseManager is a routine that watches our manager role
        managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, renewer) // store err and loop
        wg.Done()
        cancel()
    }()
    go func() {
        agentErr = n.runAgent(ctx, db, securityConfig, agentReady)
        wg.Done()
        cancel()
        close(agentDone)
    }()

    // This goroutine is what signals that the node has fully started, by
    // closing the n.ready channel. First, it waits for the agent to start.
    // Then, if this node is a manager, it will wait on either the manager
    // starting, or the node role changing. This ensures that if the node is
    // demoted before the manager starts, it doesn't get stuck.
    go func() {
        <-agentReady
        if role == ca.ManagerRole {
            workerRole := make(chan struct{})
            waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
            go func() {
                if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
                    close(workerRole)
                }
            }()
            select {
            case <-managerReady:
            case <-workerRole:
            }
            waitRoleCancel()
        }
        close(n.ready)
    }()

    // And, finally, we park and wait for the node to close up. If we get any
    // error other than context canceled, we return it.
    wg.Wait()
    if managerErr != nil && errors.Cause(managerErr) != context.Canceled {
        return managerErr
    }
    if agentErr != nil && errors.Cause(agentErr) != context.Canceled {
        return agentErr
    }

    // NOTE(dperny): we return err here, but the last time I can see err being
    // set is when we open the boltdb way up in this method, so I don't know
    // what returning err is supposed to do.
    return err
}

// Stop stops node execution.
func (n *Node) Stop(ctx context.Context) error {
    select {
    case <-n.started:
    default:
        return errNodeNotStarted
    }
    // ask agent to clean up assignments
    n.Lock()
    if n.agent != nil {
        if err := n.agent.Leave(ctx); err != nil {
            log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
        }
    }
    n.Unlock()

    n.stopOnce.Do(func() {
        close(n.stopped)
    })

    select {
    case <-n.closed:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Err returns the error that caused the node to shut down, or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
    select {
    case <-n.closed:
        return n.err
    case <-ctx.Done():
        return ctx.Err()
    }
}

// runAgent starts the node's agent. When the agent has started, the provided
// ready channel is closed. When the agent exits, this will return the error
// that caused it.
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, securityConfig *ca.SecurityConfig, ready chan<- struct{}) error {
    // First, get a channel for knowing when a remote peer has been selected.
    // The value received from remotesCh is ignored; we just need to know
    // when the peer is selected.
    remotesCh := n.remotes.WaitSelect(ctx)
    // then, we set up a new context to pass specifically to
    // ListenControlSocket, and start that method to wait on a connection on
    // the cluster control API.
    waitCtx, waitCancel := context.WithCancel(ctx)
    controlCh := n.ListenControlSocket(waitCtx)

    // The goal here is to wait until we have either a remote peer selected or
    // a connection to the control socket. These are both ways to connect the
    // agent to a manager, and we need to wait until one or the other is
    // available to start the agent.
waitPeer:
    for {
        select {
        case <-ctx.Done():
            break waitPeer
        case <-remotesCh:
            break waitPeer
        case conn := <-controlCh:
            // conn will probably be nil the first time we receive on this
            // channel, but only a non-nil conn represents an actual
            // connection.
            if conn != nil {
                break waitPeer
            }
        }
    }

    // We can stop listening for new control socket connections once we're
    // ready.
    waitCancel()

    // NOTE(dperny): not sure why we need to recheck the context here. I guess
    // it avoids a race if the context was canceled at the same time that a
    // connection or peer was available. I think it's just an optimization.
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    // Now we can go ahead and configure, create, and start the agent.
    secChangesCh, secChangesCancel := securityConfig.Watch()
    defer secChangesCancel()

    rootCA := securityConfig.RootCA()
    issuer := securityConfig.IssuerInfo()

    agentConfig := &agent.Config{
        Hostname:         n.config.Hostname,
        ConnBroker:       n.connBroker,
        Executor:         n.config.Executor,
        DB:               db,
        NotifyNodeChange: n.notifyNodeChange,
        NotifyTLSChange:  secChangesCh,
        Credentials:      securityConfig.ClientTLSCreds,
        NodeTLSInfo: &api.NodeTLSInfo{
            TrustRoot:           rootCA.Certs,
            CertIssuerPublicKey: issuer.PublicKey,
            CertIssuerSubject:   issuer.Subject,
        },
        FIPS: n.config.FIPS,
    }
    // if a join address has been specified, then if the agent fails to connect
    // due to a TLS error, fail fast - don't keep re-trying to join
    if n.config.JoinAddr != "" {
        agentConfig.SessionTracker = &firstSessionErrorTracker{}
    }

    a, err := agent.New(agentConfig)
    if err != nil {
        return err
    }
    if err := a.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = a
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    // when the agent indicates that it is ready, we close the ready channel.
    go func() {
        <-a.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?
    return a.Err(context.Background())
}

// Ready returns a channel that is closed after the node's initialization has
// completed for the first time.
func (n *Node) Ready() <-chan struct{} {
    return n.ready
}

func (n *Node) setControlSocket(conn *grpc.ClientConn) {
    n.Lock()
    if n.conn != nil {
        n.conn.Close()
    }
    n.conn = conn
    n.connBroker.SetLocalConn(conn)
    n.connCond.Broadcast()
    n.Unlock()
}

// ListenControlSocket listens for changes to the connection used for managing
// the cluster control API.
func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
    c := make(chan *grpc.ClientConn, 1)
    n.RLock()
    conn := n.conn
    c <- conn
    done := make(chan struct{})
    go func() {
        select {
        case <-ctx.Done():
            n.connCond.Broadcast()
        case <-done:
        }
    }()
    go func() {
        defer close(c)
        defer close(done)
        defer n.RUnlock()
        for {
            select {
            case <-ctx.Done():
                return
            default:
            }
            if conn == n.conn {
                n.connCond.Wait()
                continue
            }
            conn = n.conn
            select {
            case c <- conn:
            case <-ctx.Done():
                return
            }
        }
    }()
    return c
}
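
// A minimal consumption sketch (hypothetical caller), mirroring how runAgent
// waits for a usable local connection: the first value received may be nil,
// so only a non-nil conn represents a live connection, and the channel is
// closed when ctx is canceled.
//
//    for conn := range n.ListenControlSocket(ctx) {
//        if conn != nil {
//            // the local control API is reachable over conn
//        }
//    }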

// NodeID returns the current node's ID. May be empty if not set.
func (n *Node) NodeID() string {
    n.RLock()
    defer n.RUnlock()
    return n.nodeID
}

// Manager returns the manager instance started by the node. May be nil.
func (n *Node) Manager() *manager.Manager {
    n.RLock()
    defer n.RUnlock()
    return n.manager
}

// Agent returns the agent instance started by the node. May be nil.
func (n *Node) Agent() *agent.Agent {
    n.RLock()
    defer n.RUnlock()
    return n.agent
}

// IsStateDirty returns true if any objects have been added to raft which make
// the state "dirty". Currently, the existence of any object other than the
// default cluster or the local node implies a dirty state.
func (n *Node) IsStateDirty() (bool, error) {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return false, errors.New("node is not a manager")
    }
    return n.manager.IsStateDirty()
}

// Remotes returns a list of peers known to the node.
func (n *Node) Remotes() []api.Peer {
    weights := n.remotes.Weights()
    remotes := make([]api.Peer, 0, len(weights))
    for p := range weights {
        remotes = append(remotes, p)
    }
    return remotes
}

func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, func() error, error) {
    var (
        securityConfig *ca.SecurityConfig
        cancel         func() error
    )

    krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
    if err := krw.Migrate(); err != nil {
        return nil, nil, err
    }

    // Check if we already have a valid certificate on disk.
    rootCA, err := ca.GetLocalRootCA(paths.RootCA)
    if err != nil && err != ca.ErrNoLocalRootCA {
        return nil, nil, err
    }
    if err == nil {
        // if forcing a new cluster, we allow the certificates to be expired - a new set will be generated
        securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
        if err != nil {
            _, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
            if isInvalidKEK {
                return nil, nil, ErrInvalidUnlockKey
            } else if !os.IsNotExist(err) {
                return nil, nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
            }
        }
    }

    if securityConfig == nil {
        if n.config.JoinAddr == "" {
            // if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
            n.unlockKey = nil
            if n.config.AutoLockManagers {
                n.unlockKey = encryption.GenerateSecretKey()
            }
            krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
            rootCA, err = ca.CreateRootCA(ca.DefaultRootCN)
            if err != nil {
                return nil, nil, err
            }
            if err := ca.SaveRootCA(rootCA, paths.RootCA); err != nil {
                return nil, nil, err
            }
            log.G(ctx).Debug("generated CA key and certificate")
        } else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
            rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
            if err != nil {
                return nil, nil, err
            }
            log.G(ctx).Debug("downloaded CA certificate")
        }

        // Obtain new certs and set up TLS certificate renewal for this node:
        // - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
        //   until a valid certificate has been issued.
        // - We wait for CreateSecurityConfig to finish since we need a certificate to operate.

        // Attempt to load certificate from disk
        securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
        if err == nil {
            log.G(ctx).WithFields(logrus.Fields{
                "node.id": securityConfig.ClientTLSCreds.NodeID(),
            }).Debugf("loaded TLS certificate")
        } else {
            if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
                return nil, nil, ErrInvalidUnlockKey
            }
            log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())

            securityConfig, cancel, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
                Token:        n.config.JoinToken,
                Availability: n.config.Availability,
                ConnBroker:   n.connBroker,
            })
            if err != nil {
                return nil, nil, err
            }
        }
    }

    n.Lock()
    n.role = securityConfig.ClientTLSCreds.Role()
    n.nodeID = securityConfig.ClientTLSCreds.NodeID()
    n.roleCond.Broadcast()
    n.Unlock()

    return securityConfig, cancel, nil
}

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{
        grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
        grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
    }
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return xnet.DialTimeoutLocal(addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    client := api.NewHealthClient(conn)
    for {
        resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
        if err != nil {
            return err
        }
        if resp.Status == api.HealthCheckResponse_SERVING {
            break
        }
        time.Sleep(500 * time.Millisecond)
    }
    n.setControlSocket(conn)
    if ready != nil {
        close(ready)
    }
    return nil
}

// waitRole takes a context and a role. It then blocks until the context is
// canceled or the node's role updates to the provided role. It returns nil
// when the node has acquired the provided role, or ctx.Err() if the context
// is canceled.
func (n *Node) waitRole(ctx context.Context, role string) error {
    n.roleCond.L.Lock()
    if role == n.role {
        n.roleCond.L.Unlock()
        return nil
    }
    finishCh := make(chan struct{})
    defer close(finishCh)
    go func() {
        select {
        case <-finishCh:
        case <-ctx.Done():
            // call broadcast to shut down this function
            n.roleCond.Broadcast()
        }
    }()
    defer n.roleCond.L.Unlock()
    for role != n.role {
        n.roleCond.Wait()
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }
    return nil
}
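
// waitRole is typically paired with a cancellable context to build an
// edge-triggered demotion signal, as run and superviseManager do. A minimal
// sketch of that pattern (hypothetical caller):
//
//    workerRole := make(chan struct{})
//    waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
//    go func() {
//        if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
//            close(workerRole)
//        }
//    }()
//    // ... select on workerRole alongside other signals, then waitRoleCancel()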

// runManager runs the manager on this node. It returns a boolean indicating
// whether the stoppage was due to a role change, and an error indicating why
// the manager stopped.
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, workerRole <-chan struct{}) (bool, error) {
    // First, set up this manager's advertise and listen addresses, if
    // provided. They might not be provided if this node is joining the cluster
    // instead of creating a new one.
    var remoteAPI *manager.RemoteAddrs
    if n.config.ListenRemoteAPI != "" {
        remoteAPI = &manager.RemoteAddrs{
            ListenAddr:    n.config.ListenRemoteAPI,
            AdvertiseAddr: n.config.AdvertiseRemoteAPI,
        }
    }
    joinAddr := n.config.JoinAddr
    if joinAddr == "" {
        remoteAddr, err := n.remotes.Select(n.NodeID())
        if err == nil {
            joinAddr = remoteAddr.Addr
        }
    }
    m, err := manager.New(&manager.Config{
        ForceNewCluster:  n.config.ForceNewCluster,
        RemoteAPI:        remoteAPI,
        ControlAPI:       n.config.ListenControlAPI,
        SecurityConfig:   securityConfig,
        ExternalCAs:      n.config.ExternalCAs,
        JoinRaft:         joinAddr,
        ForceJoin:        n.config.JoinAddr != "",
        StateDir:         n.config.StateDir,
        HeartbeatTick:    n.config.HeartbeatTick,
        ElectionTick:     n.config.ElectionTick,
        AutoLockManagers: n.config.AutoLockManagers,
        UnlockKey:        n.unlockKey,
        Availability:     n.config.Availability,
        PluginGetter:     n.config.PluginGetter,
        RootCAPaths:      rootPaths,
    })
    if err != nil {
        return false, err
    }

    // The done channel is used to signal that the manager has exited.
    done := make(chan struct{})
    // runErr is an error value set by the goroutine that runs the manager.
    var runErr error

    // The context used to start this might have a logger associated with it
    // that we'd like to reuse, but we don't want to use that context, so we
    // pass to the goroutine only the logger, and create a new context with
    // that logger.
    go func(logger *logrus.Entry) {
        if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
            runErr = err
        }
        close(done)
    }(log.G(ctx))

    // clearData is set in the select below, and is used to signal why the
    // manager is stopping, and indicate whether or not to delete raft data and
    // keys when stopping the manager.
    var clearData bool
    defer func() {
        n.Lock()
        n.manager = nil
        n.Unlock()
        m.Stop(ctx, clearData)
        <-done
        n.setControlSocket(nil)
    }()

    n.Lock()
    n.manager = m
    n.Unlock()

    connCtx, connCancel := context.WithCancel(ctx)
    defer connCancel()

    // launch a goroutine that will manage our local connection to the manager
    // from the agent. Remember the managerReady channel created way back in
    // run? This is actually where we close it. Not when the manager starts,
    // but when a connection to the control socket has been established.
    go n.initManagerConnection(connCtx, ready)

    // wait for manager stop or for role change.
    // The manager can be stopped in one of 4 ways:
    // 1. The manager may have errored out and returned an error, closing the
    //    done channel in the process.
    // 2. The node may have been demoted to a worker. In this case, we're gonna
    //    have to stop the manager ourselves, setting clearData to true so the
    //    local raft data, certs, keys, etc, are nuked.
    // 3. The manager may have been booted from raft. This could happen if it's
    //    removed from the raft quorum but the role update hasn't registered
    //    yet. The fact that there is more than 1 code path to cause the
    //    manager to exit is a possible source of bugs.
    // 4. The context may have been canceled from above, in which case we
    //    should stop the manager ourselves, but indicate that this is NOT a
    //    demotion.
    select {
    case <-done:
        return false, runErr
    case <-workerRole:
        log.G(ctx).Info("role changed to worker, stopping manager")
        clearData = true
    case <-m.RemovedFromRaft():
        log.G(ctx).Info("manager removed from raft cluster, stopping manager")
        clearData = true
    case <-ctx.Done():
        return false, ctx.Err()
    }
    return clearData, nil
}

// superviseManager controls whether or not we are running a manager on this
// node.
func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, renewer *ca.TLSRenewer) error {
    // superviseManager is a loop, because we can come in and out of being a
    // manager, and need to appropriately handle that without disrupting the
    // node functionality.
    for {
        // if we're not a manager, we're just gonna park here and wait until we
        // are. For normal agent nodes, we'll stay here forever, as intended.
        if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
            return err
        }

        // Once we know we are a manager, we get ourselves ready for when we
        // lose that role. We create a channel to signal that we've become a
        // worker, and close it when n.waitRole completes.
        workerRole := make(chan struct{})
        waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
        go func() {
            if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
                close(workerRole)
            }
        }()

        // the ready channel passed to superviseManager is in turn passed down
        // to the runManager function. It's used to signal to the caller that
        // the manager has started.
        wasRemoved, err := n.runManager(ctx, securityConfig, rootPaths, ready, workerRole)
        if err != nil {
            waitRoleCancel()
            return errors.Wrap(err, "manager stopped")
        }

        // If the manager stopped running and our role is still
        // "manager", it's possible that the manager was demoted and
        // the agent hasn't realized this yet. We should wait for the
        // role to change instead of restarting the manager immediately.
        err = func() error {
            timer := time.NewTimer(roleChangeTimeout)
            defer timer.Stop()
            defer waitRoleCancel()

            select {
            case <-timer.C:
            case <-workerRole:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }

            if !wasRemoved {
                log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
                return nil
            }

            // We need to be extra careful about restarting the
            // manager. It may cause the node to wrongly join under
            // a new Raft ID. Since we didn't see a role change
            // yet, force a certificate renewal. If the certificate
            // comes back with a worker role, we know we shouldn't
            // restart the manager. However, if we don't see
            // workerRole get closed, it means we didn't switch to
            // a worker certificate, either because we couldn't
            // contact a working CA, or because we've been
            // re-promoted. In this case, we must assume we were
            // re-promoted, and restart the manager.
            log.G(ctx).Warn("failed to get worker role after manager stop, forcing certificate renewal")
            timer.Reset(roleChangeTimeout)
            renewer.Renew()

            // Now that the renewal request has been sent to the
            // renewal goroutine, wait for a change in role.
            select {
            case <-timer.C:
                log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
            case <-workerRole:
            case <-ctx.Done():
                return ctx.Err()
            }
            return nil
        }()
        if err != nil {
            return err
        }

        // set ready to nil after the first time we've gone through this, as we
        // don't need to signal after the first time that the manager is ready.
        ready = nil
    }
}

// DowngradeKey reverts the node key to the older format so that the node can
// run on older versions of swarmkit.
func (n *Node) DowngradeKey() error {
    paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
    krw := ca.NewKeyReadWriter(paths.Node, n.config.UnlockKey, nil)
    return krw.DowngradeKey()
}

type persistentRemotes struct {
    sync.RWMutex
    c *sync.Cond
    remotes.Remotes
    storePath      string
    lastSavedState []api.Peer
}

func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
    pr := &persistentRemotes{
        storePath: f,
        Remotes:   remotes.NewRemotes(peers...),
    }
    pr.c = sync.NewCond(pr.RLocker())
    return pr
}
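
// A minimal sketch (hypothetical path and peer): observing a peer updates the
// in-memory weights and persists the peer list to the state file, mirroring
// what New does with Config.JoinAddr.
//
//    pr := newPersistentRemotes("/var/lib/example/state.json")
//    pr.Observe(api.Peer{Addr: "10.0.0.1:2377"}, remotes.DefaultObservationWeight)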

func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
    s.Lock()
    defer s.Unlock()
    s.Remotes.Observe(peer, weight)
    s.c.Broadcast()
    if err := s.save(); err != nil {
        logrus.Errorf("error writing cluster state file: %v", err)
    }
}

func (s *persistentRemotes) Remove(peers ...api.Peer) {
    s.Lock()
    defer s.Unlock()
    s.Remotes.Remove(peers...)
    if err := s.save(); err != nil {
        logrus.Errorf("error writing cluster state file: %v", err)
    }
}

func (s *persistentRemotes) save() error {
    weights := s.Weights()
    remotes := make([]api.Peer, 0, len(weights))
    for r := range weights {
        remotes = append(remotes, r)
    }
    sort.Sort(sortablePeers(remotes))
    if reflect.DeepEqual(remotes, s.lastSavedState) {
        return nil
    }
    dt, err := json.Marshal(remotes)
    if err != nil {
        return err
    }
    s.lastSavedState = remotes
    return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
}

// WaitSelect waits until at least one remote becomes available and then selects one.
func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
    c := make(chan api.Peer, 1)
    s.RLock()
    done := make(chan struct{})
    go func() {
        select {
        case <-ctx.Done():
            s.c.Broadcast()
        case <-done:
        }
    }()
    go func() {
        defer s.RUnlock()
        defer close(c)
        defer close(done)
        for {
            if ctx.Err() != nil {
                return
            }
            p, err := s.Select()
            if err == nil {
                c <- p
                return
            }
            s.c.Wait()
        }
    }()
    return c
}
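
// A minimal consumption sketch (hypothetical caller): the returned channel
// yields at most one peer and is closed on context cancellation, so a
// receive-with-ok distinguishes the two outcomes.
//
//    if peer, ok := <-pr.WaitSelect(ctx); ok {
//        // a manager is reachable at peer.Addr
//    }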

// sortablePeers is a sort wrapper for []api.Peer.
type sortablePeers []api.Peer

func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }

func (sp sortablePeers) Len() int { return len(sp) }

func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }

// firstSessionErrorTracker is a utility that helps determine whether the agent should exit after
// a TLS failure on establishing the first session. This should only happen if a join address
// is specified. If establishing the first session succeeds, but later on some session fails
// because of a TLS error, we don't want to exit the agent because a previously successful
// session indicates that the TLS error may be a transient issue.
type firstSessionErrorTracker struct {
    mu               sync.Mutex
    pastFirstSession bool
    err              error
}

func (fs *firstSessionErrorTracker) SessionEstablished() {
    fs.mu.Lock()
    fs.pastFirstSession = true
    fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionError(err error) {
    fs.mu.Lock()
    fs.err = err
    fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionClosed() error {
    fs.mu.Lock()
    defer fs.mu.Unlock()
    // unfortunately, grpc connection errors are of type grpc.rpcError, which is
    // not exported, so we can't get at the underlying error type
    if !fs.pastFirstSession && grpc.Code(fs.err) == codes.Internal &&
        strings.HasPrefix(grpc.ErrorDesc(fs.err), "connection error") && strings.Contains(grpc.ErrorDesc(fs.err), "transport: x509:") {
        return fs.err
    }
    return nil
}