node.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057
  1. package node
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "io/ioutil"
  7. "net"
  8. "os"
  9. "path/filepath"
  10. "reflect"
  11. "sort"
  12. "sync"
  13. "time"
  14. "github.com/Sirupsen/logrus"
  15. "github.com/boltdb/bolt"
  16. "github.com/docker/docker/pkg/plugingetter"
  17. metrics "github.com/docker/go-metrics"
  18. "github.com/docker/swarmkit/agent"
  19. "github.com/docker/swarmkit/agent/exec"
  20. "github.com/docker/swarmkit/api"
  21. "github.com/docker/swarmkit/ca"
  22. "github.com/docker/swarmkit/connectionbroker"
  23. "github.com/docker/swarmkit/ioutils"
  24. "github.com/docker/swarmkit/log"
  25. "github.com/docker/swarmkit/manager"
  26. "github.com/docker/swarmkit/manager/encryption"
  27. "github.com/docker/swarmkit/remotes"
  28. "github.com/docker/swarmkit/watch"
  29. "github.com/docker/swarmkit/xnet"
  30. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  31. "github.com/pkg/errors"
  32. "golang.org/x/net/context"
  33. "google.golang.org/grpc"
  34. "google.golang.org/grpc/credentials"
  35. )
  // Names and durations used by the node runtime.
  const (
  	// stateFilename is the file, inside Config.StateDir, where the list of
  	// known remote managers is persisted.
  	stateFilename = "state.json"

  	// roleChangeTimeout bounds each wait performed by superviseManager for
  	// the node to become a worker after its manager has stopped.
  	roleChangeTimeout = 16 * time.Second
  )
  40. var (
  41. nodeInfo metrics.LabeledGauge
  42. nodeManager metrics.Gauge
  43. errNodeStarted = errors.New("node: already started")
  44. errNodeNotStarted = errors.New("node: not started")
  45. certDirectory = "certificates"
  46. // ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate
  47. ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")
  48. )
  49. func init() {
  50. ns := metrics.NewNamespace("swarm", "node", nil)
  51. nodeInfo = ns.NewLabeledGauge("info", "Information related to the swarm", "",
  52. "swarm_id",
  53. "node_id",
  54. )
  55. nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
  56. metrics.Register(ns)
  57. }
  58. // Config provides values for a Node.
  59. type Config struct {
  60. // Hostname is the name of host for agent instance.
  61. Hostname string
  62. // JoinAddr specifies node that should be used for the initial connection to
  63. // other manager in cluster. This should be only one address and optional,
  64. // the actual remotes come from the stored state.
  65. JoinAddr string
  66. // StateDir specifies the directory the node uses to keep the state of the
  67. // remote managers and certificates.
  68. StateDir string
  69. // JoinToken is the token to be used on the first certificate request.
  70. JoinToken string
  71. // ExternalCAs is a list of CAs to which a manager node
  72. // will make certificate signing requests for node certificates.
  73. ExternalCAs []*api.ExternalCA
  74. // ForceNewCluster creates a new cluster from current raft state.
  75. ForceNewCluster bool
  76. // ListenControlAPI specifies address the control API should listen on.
  77. ListenControlAPI string
  78. // ListenRemoteAPI specifies the address for the remote API that agents
  79. // and raft members connect to.
  80. ListenRemoteAPI string
  81. // AdvertiseRemoteAPI specifies the address that should be advertised
  82. // for connections to the remote API (including the raft service).
  83. AdvertiseRemoteAPI string
  84. // Executor specifies the executor to use for the agent.
  85. Executor exec.Executor
  86. // ElectionTick defines the amount of ticks needed without
  87. // leader to trigger a new election
  88. ElectionTick uint32
  89. // HeartbeatTick defines the amount of ticks between each
  90. // heartbeat sent to other members for health-check purposes
  91. HeartbeatTick uint32
  92. // AutoLockManagers determines whether or not an unlock key will be generated
  93. // when bootstrapping a new cluster for the first time
  94. AutoLockManagers bool
  95. // UnlockKey is the key to unlock a node - used for decrypting at rest. This
  96. // only applies to nodes that have already joined a cluster.
  97. UnlockKey []byte
  98. // Availability allows a user to control the current scheduling status of a node
  99. Availability api.NodeSpec_Availability
  100. // PluginGetter provides access to docker's plugin inventory.
  101. PluginGetter plugingetter.PluginGetter
  102. }
  103. // Node implements the primary node functionality for a member of a swarm
  104. // cluster. Node handles workloads and may also run as a manager.
  105. type Node struct {
  106. sync.RWMutex
  107. config *Config
  108. remotes *persistentRemotes
  109. connBroker *connectionbroker.Broker
  110. role string
  111. roleCond *sync.Cond
  112. conn *grpc.ClientConn
  113. connCond *sync.Cond
  114. nodeID string
  115. started chan struct{}
  116. startOnce sync.Once
  117. stopped chan struct{}
  118. stopOnce sync.Once
  119. ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
  120. closed chan struct{}
  121. err error
  122. agent *agent.Agent
  123. manager *manager.Manager
  124. notifyNodeChange chan *agent.NodeChanges // used by the agent to relay node updates from the dispatcher Session stream to (*Node).run
  125. unlockKey []byte
  126. }
  127. type lastSeenRole struct {
  128. role api.NodeRole
  129. }
  130. // observe notes the latest value of this node role, and returns true if it
  131. // is the first seen value, or is different from the most recently seen value.
  132. func (l *lastSeenRole) observe(newRole api.NodeRole) bool {
  133. changed := l.role != newRole
  134. l.role = newRole
  135. return changed
  136. }
  137. // RemoteAPIAddr returns address on which remote manager api listens.
  138. // Returns nil if node is not manager.
  139. func (n *Node) RemoteAPIAddr() (string, error) {
  140. n.RLock()
  141. defer n.RUnlock()
  142. if n.manager == nil {
  143. return "", errors.New("manager is not running")
  144. }
  145. addr := n.manager.Addr()
  146. if addr == "" {
  147. return "", errors.New("manager addr is not set")
  148. }
  149. return addr, nil
  150. }
  151. // New returns new Node instance.
  152. func New(c *Config) (*Node, error) {
  153. if err := os.MkdirAll(c.StateDir, 0700); err != nil {
  154. return nil, err
  155. }
  156. stateFile := filepath.Join(c.StateDir, stateFilename)
  157. dt, err := ioutil.ReadFile(stateFile)
  158. var p []api.Peer
  159. if err != nil && !os.IsNotExist(err) {
  160. return nil, err
  161. }
  162. if err == nil {
  163. if err := json.Unmarshal(dt, &p); err != nil {
  164. return nil, err
  165. }
  166. }
  167. n := &Node{
  168. remotes: newPersistentRemotes(stateFile, p...),
  169. role: ca.WorkerRole,
  170. config: c,
  171. started: make(chan struct{}),
  172. stopped: make(chan struct{}),
  173. closed: make(chan struct{}),
  174. ready: make(chan struct{}),
  175. notifyNodeChange: make(chan *agent.NodeChanges, 1),
  176. unlockKey: c.UnlockKey,
  177. }
  178. if n.config.JoinAddr != "" || n.config.ForceNewCluster {
  179. n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
  180. if n.config.JoinAddr != "" {
  181. n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
  182. }
  183. }
  184. n.connBroker = connectionbroker.New(n.remotes)
  185. n.roleCond = sync.NewCond(n.RLocker())
  186. n.connCond = sync.NewCond(n.RLocker())
  187. return n, nil
  188. }
  189. // BindRemote starts a listener that exposes the remote API.
  190. func (n *Node) BindRemote(ctx context.Context, listenAddr string, advertiseAddr string) error {
  191. n.RLock()
  192. defer n.RUnlock()
  193. if n.manager == nil {
  194. return errors.New("manager is not running")
  195. }
  196. return n.manager.BindRemote(ctx, manager.RemoteAddrs{
  197. ListenAddr: listenAddr,
  198. AdvertiseAddr: advertiseAddr,
  199. })
  200. }
  201. // Start starts a node instance.
  202. func (n *Node) Start(ctx context.Context) error {
  203. err := errNodeStarted
  204. n.startOnce.Do(func() {
  205. close(n.started)
  206. go n.run(ctx)
  207. err = nil // clear error above, only once.
  208. })
  209. return err
  210. }
  211. func (n *Node) currentRole() api.NodeRole {
  212. n.Lock()
  213. currentRole := api.NodeRoleWorker
  214. if n.role == ca.ManagerRole {
  215. currentRole = api.NodeRoleManager
  216. }
  217. n.Unlock()
  218. return currentRole
  219. }
  220. func (n *Node) run(ctx context.Context) (err error) {
  221. defer func() {
  222. n.err = err
  223. close(n.closed)
  224. }()
  225. ctx, cancel := context.WithCancel(ctx)
  226. defer cancel()
  227. ctx = log.WithModule(ctx, "node")
  228. go func(ctx context.Context) {
  229. select {
  230. case <-ctx.Done():
  231. case <-n.stopped:
  232. cancel()
  233. }
  234. }(ctx)
  235. paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
  236. securityConfig, err := n.loadSecurityConfig(ctx, paths)
  237. if err != nil {
  238. return err
  239. }
  240. renewer := ca.NewTLSRenewer(securityConfig, n.connBroker)
  241. ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))
  242. taskDBPath := filepath.Join(n.config.StateDir, "worker", "tasks.db")
  243. if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
  244. return err
  245. }
  246. db, err := bolt.Open(taskDBPath, 0666, nil)
  247. if err != nil {
  248. return err
  249. }
  250. defer db.Close()
  251. agentDone := make(chan struct{})
  252. go func() {
  253. // lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole,
  254. // used to make role changes "edge triggered" and avoid renewal loops.
  255. lastNodeDesiredRole := lastSeenRole{role: n.currentRole()}
  256. for {
  257. select {
  258. case <-agentDone:
  259. return
  260. case nodeChanges := <-n.notifyNodeChange:
  261. currentRole := n.currentRole()
  262. if nodeChanges.Node != nil {
  263. // This is a bit complex to be backward compatible with older CAs that
  264. // don't support the Node.Role field. They only use what's presently
  265. // called DesiredRole.
  266. // 1) If DesiredRole changes, kick off a certificate renewal. The renewal
  267. // is delayed slightly to give Role time to change as well if this is
  268. // a newer CA. If the certificate we get back doesn't have the expected
  269. // role, we continue renewing with exponential backoff.
  270. // 2) If the server is sending us IssuanceStateRotate, renew the cert as
  271. // requested by the CA.
  272. desiredRoleChanged := lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole)
  273. if desiredRoleChanged {
  274. switch nodeChanges.Node.Spec.DesiredRole {
  275. case api.NodeRoleManager:
  276. renewer.SetExpectedRole(ca.ManagerRole)
  277. case api.NodeRoleWorker:
  278. renewer.SetExpectedRole(ca.WorkerRole)
  279. }
  280. }
  281. if desiredRoleChanged || nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate {
  282. renewer.Renew()
  283. }
  284. }
  285. if nodeChanges.RootCert != nil {
  286. // We only want to update the root CA if this is a worker node. Manager nodes directly watch the raft
  287. // store and update the root CA, with the necessary signer, from the raft store (since the managers
  288. // need the CA key as well to potentially issue new TLS certificates).
  289. if currentRole == api.NodeRoleManager || bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) {
  290. continue
  291. }
  292. newRootCA, err := ca.NewRootCA(nodeChanges.RootCert, nil, nil, ca.DefaultNodeCertExpiration, nil)
  293. if err != nil {
  294. log.G(ctx).WithError(err).Error("invalid new root certificate from the dispatcher")
  295. continue
  296. }
  297. if err := ca.SaveRootCA(newRootCA, paths.RootCA); err != nil {
  298. log.G(ctx).WithError(err).Error("could not save new root certificate from the dispatcher")
  299. continue
  300. }
  301. if err := securityConfig.UpdateRootCA(&newRootCA, newRootCA.Pool); err != nil {
  302. log.G(ctx).WithError(err).Error("could not use new root CA from dispatcher")
  303. continue
  304. }
  305. }
  306. }
  307. }
  308. }()
  309. var wg sync.WaitGroup
  310. wg.Add(3)
  311. nodeInfo.WithValues(
  312. securityConfig.ClientTLSCreds.Organization(),
  313. securityConfig.ClientTLSCreds.NodeID(),
  314. ).Set(1)
  315. if n.currentRole() == api.NodeRoleManager {
  316. nodeManager.Set(1)
  317. } else {
  318. nodeManager.Set(0)
  319. }
  320. updates := renewer.Start(ctx)
  321. go func() {
  322. for certUpdate := range updates {
  323. if certUpdate.Err != nil {
  324. logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
  325. continue
  326. }
  327. n.Lock()
  328. n.role = certUpdate.Role
  329. n.roleCond.Broadcast()
  330. n.Unlock()
  331. // Export the new role.
  332. if n.currentRole() == api.NodeRoleManager {
  333. nodeManager.Set(1)
  334. } else {
  335. nodeManager.Set(0)
  336. }
  337. }
  338. wg.Done()
  339. }()
  340. role := n.role
  341. managerReady := make(chan struct{})
  342. agentReady := make(chan struct{})
  343. var managerErr error
  344. var agentErr error
  345. go func() {
  346. managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, renewer) // store err and loop
  347. wg.Done()
  348. cancel()
  349. }()
  350. go func() {
  351. agentErr = n.runAgent(ctx, db, securityConfig, agentReady)
  352. wg.Done()
  353. cancel()
  354. close(agentDone)
  355. }()
  356. go func() {
  357. <-agentReady
  358. if role == ca.ManagerRole {
  359. workerRole := make(chan struct{})
  360. waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
  361. go func() {
  362. if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
  363. close(workerRole)
  364. }
  365. }()
  366. select {
  367. case <-managerReady:
  368. case <-workerRole:
  369. }
  370. waitRoleCancel()
  371. }
  372. close(n.ready)
  373. }()
  374. wg.Wait()
  375. if managerErr != nil && managerErr != context.Canceled {
  376. return managerErr
  377. }
  378. if agentErr != nil && agentErr != context.Canceled {
  379. return agentErr
  380. }
  381. return err
  382. }
  383. // Stop stops node execution
  384. func (n *Node) Stop(ctx context.Context) error {
  385. select {
  386. case <-n.started:
  387. default:
  388. return errNodeNotStarted
  389. }
  390. // ask agent to clean up assignments
  391. n.Lock()
  392. if n.agent != nil {
  393. if err := n.agent.Leave(ctx); err != nil {
  394. log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
  395. }
  396. }
  397. n.Unlock()
  398. n.stopOnce.Do(func() {
  399. close(n.stopped)
  400. })
  401. select {
  402. case <-n.closed:
  403. return nil
  404. case <-ctx.Done():
  405. return ctx.Err()
  406. }
  407. }
  408. // Err returns the error that caused the node to shutdown or nil. Err blocks
  409. // until the node has fully shut down.
  410. func (n *Node) Err(ctx context.Context) error {
  411. select {
  412. case <-n.closed:
  413. return n.err
  414. case <-ctx.Done():
  415. return ctx.Err()
  416. }
  417. }
  418. func (n *Node) runAgent(ctx context.Context, db *bolt.DB, securityConfig *ca.SecurityConfig, ready chan<- struct{}) error {
  419. waitCtx, waitCancel := context.WithCancel(ctx)
  420. remotesCh := n.remotes.WaitSelect(ctx)
  421. controlCh := n.ListenControlSocket(waitCtx)
  422. waitPeer:
  423. for {
  424. select {
  425. case <-ctx.Done():
  426. break waitPeer
  427. case <-remotesCh:
  428. break waitPeer
  429. case conn := <-controlCh:
  430. if conn != nil {
  431. break waitPeer
  432. }
  433. }
  434. }
  435. waitCancel()
  436. select {
  437. case <-ctx.Done():
  438. return ctx.Err()
  439. default:
  440. }
  441. secChangeQueue := watch.NewQueue()
  442. defer secChangeQueue.Close()
  443. secChangesCh, secChangesCancel := secChangeQueue.Watch()
  444. defer secChangesCancel()
  445. securityConfig.SetWatch(secChangeQueue)
  446. rootCA := securityConfig.RootCA()
  447. issuer := securityConfig.IssuerInfo()
  448. a, err := agent.New(&agent.Config{
  449. Hostname: n.config.Hostname,
  450. ConnBroker: n.connBroker,
  451. Executor: n.config.Executor,
  452. DB: db,
  453. NotifyNodeChange: n.notifyNodeChange,
  454. NotifyTLSChange: secChangesCh,
  455. Credentials: securityConfig.ClientTLSCreds,
  456. NodeTLSInfo: &api.NodeTLSInfo{
  457. TrustRoot: rootCA.Certs,
  458. CertIssuerPublicKey: issuer.PublicKey,
  459. CertIssuerSubject: issuer.Subject,
  460. },
  461. })
  462. if err != nil {
  463. return err
  464. }
  465. if err := a.Start(ctx); err != nil {
  466. return err
  467. }
  468. n.Lock()
  469. n.agent = a
  470. n.Unlock()
  471. defer func() {
  472. n.Lock()
  473. n.agent = nil
  474. n.Unlock()
  475. }()
  476. go func() {
  477. <-a.Ready()
  478. close(ready)
  479. }()
  480. // todo: manually call stop on context cancellation?
  481. return a.Err(context.Background())
  482. }
  483. // Ready returns a channel that is closed after node's initialization has
  484. // completes for the first time.
  485. func (n *Node) Ready() <-chan struct{} {
  486. return n.ready
  487. }
  488. func (n *Node) setControlSocket(conn *grpc.ClientConn) {
  489. n.Lock()
  490. if n.conn != nil {
  491. n.conn.Close()
  492. }
  493. n.conn = conn
  494. n.connBroker.SetLocalConn(conn)
  495. n.connCond.Broadcast()
  496. n.Unlock()
  497. }
  498. // ListenControlSocket listens changes of a connection for managing the
  499. // cluster control api
  500. func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
  501. c := make(chan *grpc.ClientConn, 1)
  502. n.RLock()
  503. conn := n.conn
  504. c <- conn
  505. done := make(chan struct{})
  506. go func() {
  507. select {
  508. case <-ctx.Done():
  509. n.connCond.Broadcast()
  510. case <-done:
  511. }
  512. }()
  513. go func() {
  514. defer close(c)
  515. defer close(done)
  516. defer n.RUnlock()
  517. for {
  518. select {
  519. case <-ctx.Done():
  520. return
  521. default:
  522. }
  523. if conn == n.conn {
  524. n.connCond.Wait()
  525. continue
  526. }
  527. conn = n.conn
  528. select {
  529. case c <- conn:
  530. case <-ctx.Done():
  531. return
  532. }
  533. }
  534. }()
  535. return c
  536. }
  537. // NodeID returns current node's ID. May be empty if not set.
  538. func (n *Node) NodeID() string {
  539. n.RLock()
  540. defer n.RUnlock()
  541. return n.nodeID
  542. }
  543. // Manager returns manager instance started by node. May be nil.
  544. func (n *Node) Manager() *manager.Manager {
  545. n.RLock()
  546. defer n.RUnlock()
  547. return n.manager
  548. }
  549. // Agent returns agent instance started by node. May be nil.
  550. func (n *Node) Agent() *agent.Agent {
  551. n.RLock()
  552. defer n.RUnlock()
  553. return n.agent
  554. }
  555. // IsStateDirty returns true if any objects have been added to raft which make
  556. // the state "dirty". Currently, the existence of any object other than the
  557. // default cluster or the local node implies a dirty state.
  558. func (n *Node) IsStateDirty() (bool, error) {
  559. n.RLock()
  560. defer n.RUnlock()
  561. if n.manager == nil {
  562. return false, errors.New("node is not a manager")
  563. }
  564. return n.manager.IsStateDirty()
  565. }
  566. // Remotes returns a list of known peers known to node.
  567. func (n *Node) Remotes() []api.Peer {
  568. weights := n.remotes.Weights()
  569. remotes := make([]api.Peer, 0, len(weights))
  570. for p := range weights {
  571. remotes = append(remotes, p)
  572. }
  573. return remotes
  574. }
  // loadSecurityConfig returns the node's TLS security configuration. It tries,
  // in order:
  //  1. the certificate and key already on disk, if a local root CA exists
  //     (expired certificates are accepted when n.config.ForceNewCluster is set);
  //  2. otherwise, when n.config.JoinAddr is empty, bootstrapping a new cluster.
  //     n.unlockKey is reset (or regenerated with AutoLockManagers) and a new
  //     root CA is created and saved, even if a local root CA was found;
  //  3. otherwise, when no local root CA exists, downloading it through
  //     n.connBroker with n.config.JoinToken (an existing local root CA is reused).
  // In cases 2 and 3 it first retries loading the node certificate from disk and,
  // failing that, requests a new one with rootCA.CreateSecurityConfig.
  // A ca.ErrInvalidKEK while loading the certificate is reported as
  // ErrInvalidUnlockKey. On success the certificate's role and node ID are
  // recorded on n and roleCond is broadcast.
  575. func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, error) {
  576. var securityConfig *ca.SecurityConfig
  577. krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
  578. if err := krw.Migrate(); err != nil {
  579. return nil, err
  580. }
  581. // Check if we already have a valid certificates on disk.
  582. rootCA, err := ca.GetLocalRootCA(paths.RootCA)
  583. if err != nil && err != ca.ErrNoLocalRootCA {
  584. return nil, err
  585. }
  // A local root CA was found: try the node certificate on disk. A missing
  // file is not fatal; securityConfig just stays nil and is obtained below.
  586. if err == nil {
  587. // if forcing a new cluster, we allow the certificates to be expired - a new set will be generated
  588. securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
  589. if err != nil {
  590. _, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
  591. if isInvalidKEK {
  592. return nil, ErrInvalidUnlockKey
  593. } else if !os.IsNotExist(err) {
  594. return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
  595. }
  596. }
  597. }
  598. if securityConfig == nil {
  // When joining, err still equals ca.ErrNoLocalRootCA only if GetLocalRootCA
  // found nothing, so the root CA is downloaded. Otherwise err holds
  // LoadSecurityConfig's result and the local rootCA is reused as-is.
  599. if n.config.JoinAddr == "" {
  600. // if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
  601. n.unlockKey = nil
  602. if n.config.AutoLockManagers {
  603. n.unlockKey = encryption.GenerateSecretKey()
  604. }
  605. krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
  606. rootCA, err = ca.CreateRootCA(ca.DefaultRootCN)
  607. if err != nil {
  608. return nil, err
  609. }
  610. if err := ca.SaveRootCA(rootCA, paths.RootCA); err != nil {
  611. return nil, err
  612. }
  613. log.G(ctx).Debug("generated CA key and certificate")
  614. } else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
  615. rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
  616. if err != nil {
  617. return nil, err
  618. }
  619. log.G(ctx).Debug("downloaded CA certificate")
  620. }
  621. // Obtain new certs and setup TLS certificates renewal for this node:
  622. // - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
  623. // until a valid certificate has been issued.
  624. // - We wait for CreateSecurityConfig to finish since we need a certificate to operate.
  625. // Attempt to load certificate from disk
  626. securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
  627. if err == nil {
  628. log.G(ctx).WithFields(logrus.Fields{
  629. "node.id": securityConfig.ClientTLSCreds.NodeID(),
  630. }).Debugf("loaded TLS certificate")
  631. } else {
  632. if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
  633. return nil, ErrInvalidUnlockKey
  634. }
  635. log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
  636. securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
  637. Token: n.config.JoinToken,
  638. Availability: n.config.Availability,
  639. ConnBroker: n.connBroker,
  640. })
  641. if err != nil {
  642. return nil, err
  643. }
  644. }
  645. }
  // Publish the certificate's identity and wake goroutines waiting on roleCond
  // (waitRole).
  646. n.Lock()
  647. n.role = securityConfig.ClientTLSCreds.Role()
  648. n.nodeID = securityConfig.ClientTLSCreds.NodeID()
  649. n.roleCond.Broadcast()
  650. n.Unlock()
  651. return securityConfig, nil
  652. }
  653. func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
  654. opts := []grpc.DialOption{
  655. grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
  656. grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
  657. }
  658. insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
  659. opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
  660. addr := n.config.ListenControlAPI
  661. opts = append(opts, grpc.WithDialer(
  662. func(addr string, timeout time.Duration) (net.Conn, error) {
  663. return xnet.DialTimeoutLocal(addr, timeout)
  664. }))
  665. conn, err := grpc.Dial(addr, opts...)
  666. if err != nil {
  667. return err
  668. }
  669. client := api.NewHealthClient(conn)
  670. for {
  671. resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
  672. if err != nil {
  673. return err
  674. }
  675. if resp.Status == api.HealthCheckResponse_SERVING {
  676. break
  677. }
  678. time.Sleep(500 * time.Millisecond)
  679. }
  680. n.setControlSocket(conn)
  681. if ready != nil {
  682. close(ready)
  683. }
  684. return nil
  685. }
  686. func (n *Node) waitRole(ctx context.Context, role string) error {
  687. n.roleCond.L.Lock()
  688. if role == n.role {
  689. n.roleCond.L.Unlock()
  690. return nil
  691. }
  692. finishCh := make(chan struct{})
  693. defer close(finishCh)
  694. go func() {
  695. select {
  696. case <-finishCh:
  697. case <-ctx.Done():
  698. // call broadcast to shutdown this function
  699. n.roleCond.Broadcast()
  700. }
  701. }()
  702. defer n.roleCond.L.Unlock()
  703. for role != n.role {
  704. n.roleCond.Wait()
  705. select {
  706. case <-ctx.Done():
  707. return ctx.Err()
  708. default:
  709. }
  710. }
  711. return nil
  712. }
  713. func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, workerRole <-chan struct{}) (bool, error) {
  714. var remoteAPI *manager.RemoteAddrs
  715. if n.config.ListenRemoteAPI != "" {
  716. remoteAPI = &manager.RemoteAddrs{
  717. ListenAddr: n.config.ListenRemoteAPI,
  718. AdvertiseAddr: n.config.AdvertiseRemoteAPI,
  719. }
  720. }
  721. remoteAddr, _ := n.remotes.Select(n.NodeID())
  722. m, err := manager.New(&manager.Config{
  723. ForceNewCluster: n.config.ForceNewCluster,
  724. RemoteAPI: remoteAPI,
  725. ControlAPI: n.config.ListenControlAPI,
  726. SecurityConfig: securityConfig,
  727. ExternalCAs: n.config.ExternalCAs,
  728. JoinRaft: remoteAddr.Addr,
  729. StateDir: n.config.StateDir,
  730. HeartbeatTick: n.config.HeartbeatTick,
  731. ElectionTick: n.config.ElectionTick,
  732. AutoLockManagers: n.config.AutoLockManagers,
  733. UnlockKey: n.unlockKey,
  734. Availability: n.config.Availability,
  735. PluginGetter: n.config.PluginGetter,
  736. RootCAPaths: rootPaths,
  737. })
  738. if err != nil {
  739. return false, err
  740. }
  741. done := make(chan struct{})
  742. var runErr error
  743. go func(logger *logrus.Entry) {
  744. if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
  745. runErr = err
  746. }
  747. close(done)
  748. }(log.G(ctx))
  749. var clearData bool
  750. defer func() {
  751. n.Lock()
  752. n.manager = nil
  753. n.Unlock()
  754. m.Stop(ctx, clearData)
  755. <-done
  756. n.setControlSocket(nil)
  757. }()
  758. n.Lock()
  759. n.manager = m
  760. n.Unlock()
  761. connCtx, connCancel := context.WithCancel(ctx)
  762. defer connCancel()
  763. go n.initManagerConnection(connCtx, ready)
  764. // wait for manager stop or for role change
  765. select {
  766. case <-done:
  767. return false, runErr
  768. case <-workerRole:
  769. log.G(ctx).Info("role changed to worker, stopping manager")
  770. clearData = true
  771. case <-m.RemovedFromRaft():
  772. log.G(ctx).Info("manager removed from raft cluster, stopping manager")
  773. clearData = true
  774. case <-ctx.Done():
  775. return false, ctx.Err()
  776. }
  777. return clearData, nil
  778. }
  779. func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, renewer *ca.TLSRenewer) error {
  780. for {
  781. if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
  782. return err
  783. }
  784. workerRole := make(chan struct{})
  785. waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
  786. go func() {
  787. if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
  788. close(workerRole)
  789. }
  790. }()
  791. wasRemoved, err := n.runManager(ctx, securityConfig, rootPaths, ready, workerRole)
  792. if err != nil {
  793. waitRoleCancel()
  794. return errors.Wrap(err, "manager stopped")
  795. }
  796. // If the manager stopped running and our role is still
  797. // "manager", it's possible that the manager was demoted and
  798. // the agent hasn't realized this yet. We should wait for the
  799. // role to change instead of restarting the manager immediately.
  800. err = func() error {
  801. timer := time.NewTimer(roleChangeTimeout)
  802. defer timer.Stop()
  803. defer waitRoleCancel()
  804. select {
  805. case <-timer.C:
  806. case <-workerRole:
  807. return nil
  808. case <-ctx.Done():
  809. return ctx.Err()
  810. }
  811. if !wasRemoved {
  812. log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
  813. return nil
  814. }
  815. // We need to be extra careful about restarting the
  816. // manager. It may cause the node to wrongly join under
  817. // a new Raft ID. Since we didn't see a role change
  818. // yet, force a certificate renewal. If the certificate
  819. // comes back with a worker role, we know we shouldn't
  820. // restart the manager. However, if we don't see
  821. // workerRole get closed, it means we didn't switch to
  822. // a worker certificate, either because we couldn't
  823. // contact a working CA, or because we've been
  824. // re-promoted. In this case, we must assume we were
  825. // re-promoted, and restart the manager.
  826. log.G(ctx).Warn("failed to get worker role after manager stop, forcing certificate renewal")
  827. timer.Reset(roleChangeTimeout)
  828. renewer.Renew()
  829. // Now that the renewal request has been sent to the
  830. // renewal goroutine, wait for a change in role.
  831. select {
  832. case <-timer.C:
  833. log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
  834. case <-workerRole:
  835. case <-ctx.Done():
  836. return ctx.Err()
  837. }
  838. return nil
  839. }()
  840. if err != nil {
  841. return err
  842. }
  843. ready = nil
  844. }
  845. }
  846. type persistentRemotes struct {
  847. sync.RWMutex
  848. c *sync.Cond
  849. remotes.Remotes
  850. storePath string
  851. lastSavedState []api.Peer
  852. }
  853. func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
  854. pr := &persistentRemotes{
  855. storePath: f,
  856. Remotes: remotes.NewRemotes(peers...),
  857. }
  858. pr.c = sync.NewCond(pr.RLocker())
  859. return pr
  860. }
  861. func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
  862. s.Lock()
  863. defer s.Unlock()
  864. s.Remotes.Observe(peer, weight)
  865. s.c.Broadcast()
  866. if err := s.save(); err != nil {
  867. logrus.Errorf("error writing cluster state file: %v", err)
  868. return
  869. }
  870. return
  871. }
  872. func (s *persistentRemotes) Remove(peers ...api.Peer) {
  873. s.Lock()
  874. defer s.Unlock()
  875. s.Remotes.Remove(peers...)
  876. if err := s.save(); err != nil {
  877. logrus.Errorf("error writing cluster state file: %v", err)
  878. return
  879. }
  880. return
  881. }
  882. func (s *persistentRemotes) save() error {
  883. weights := s.Weights()
  884. remotes := make([]api.Peer, 0, len(weights))
  885. for r := range weights {
  886. remotes = append(remotes, r)
  887. }
  888. sort.Sort(sortablePeers(remotes))
  889. if reflect.DeepEqual(remotes, s.lastSavedState) {
  890. return nil
  891. }
  892. dt, err := json.Marshal(remotes)
  893. if err != nil {
  894. return err
  895. }
  896. s.lastSavedState = remotes
  897. return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
  898. }
  899. // WaitSelect waits until at least one remote becomes available and then selects one.
  900. func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
  901. c := make(chan api.Peer, 1)
  902. s.RLock()
  903. done := make(chan struct{})
  904. go func() {
  905. select {
  906. case <-ctx.Done():
  907. s.c.Broadcast()
  908. case <-done:
  909. }
  910. }()
  911. go func() {
  912. defer s.RUnlock()
  913. defer close(c)
  914. defer close(done)
  915. for {
  916. if ctx.Err() != nil {
  917. return
  918. }
  919. p, err := s.Select()
  920. if err == nil {
  921. c <- p
  922. return
  923. }
  924. s.c.Wait()
  925. }
  926. }()
  927. return c
  928. }
  929. // sortablePeers is a sort wrapper for []api.Peer
  930. type sortablePeers []api.Peer
  931. func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }
  932. func (sp sortablePeers) Len() int { return len(sp) }
  933. func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }