server.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917
  1. package ca
  2. import (
  3. "bytes"
  4. "crypto/subtle"
  5. "crypto/x509"
  6. "sync"
  7. "time"
  8. "github.com/docker/swarmkit/api"
  9. "github.com/docker/swarmkit/api/equality"
  10. "github.com/docker/swarmkit/identity"
  11. "github.com/docker/swarmkit/log"
  12. "github.com/docker/swarmkit/manager/state/store"
  13. gogotypes "github.com/gogo/protobuf/types"
  14. "github.com/pkg/errors"
  15. "github.com/sirupsen/logrus"
  16. "golang.org/x/net/context"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. )
  // Default timings for the CA server's background loops. NewServer copies them
// into each Server; SetReconciliationRetryInterval and
// SetRootReconciliationInterval override them per server before Run.
const (
	// defaultReconciliationRetryInterval is the period of the ticker in Run
	// that retries signing certificates still marked as pending.
	defaultReconciliationRetryInterval = time.Second * 10

	// defaultRootReconciliationInterval is the batch update interval given to
	// the root rotation reconciler created by Run.
	defaultRootReconciliationInterval = time.Second * 3
)
  24. // Server is the CA and NodeCA API gRPC server.
  25. // TODO(aaronl): At some point we may want to have separate implementations of
  26. // CA, NodeCA, and other hypothetical future CA services. At the moment,
  27. // breaking it apart doesn't seem worth it.
  28. type Server struct {
  29. mu sync.Mutex
  30. wg sync.WaitGroup
  31. ctx context.Context
  32. cancel func()
  33. store *store.MemoryStore
  34. securityConfig *SecurityConfig
  35. clusterID string
  36. localRootCA *RootCA
  37. externalCA *ExternalCA
  38. externalCAPool *x509.CertPool
  39. joinTokens *api.JoinTokens
  40. reconciliationRetryInterval time.Duration
  41. // pending is a map of nodes with pending certificates issuance or
  42. // renewal. They are indexed by node ID.
  43. pending map[string]*api.Node
  44. // started is a channel which gets closed once the server is running
  45. // and able to service RPCs.
  46. started chan struct{}
  47. // these are cached values to ensure we only update the security config when
  48. // the cluster root CA and external CAs have changed - the cluster object
  49. // can change for other reasons, and it would not be necessary to update
  50. // the security config as a result
  51. lastSeenClusterRootCA *api.RootCA
  52. lastSeenExternalCAs []*api.ExternalCA
  53. // This mutex protects the components of the CA server used to issue new certificates
  54. // (and any attributes used to update those components): `lastSeenClusterRootCA` and
  55. // `lastSeenExternalCA`, which are used to update `externalCA` and the `rootCA` object
  56. // of the SecurityConfig
  57. signingMu sync.Mutex
  58. // lets us monitor and finish root rotations
  59. rootReconciler *rootRotationReconciler
  60. rootReconciliationRetryInterval time.Duration
  61. }
  62. // DefaultCAConfig returns the default CA Config, with a default expiration.
  63. func DefaultCAConfig() api.CAConfig {
  64. return api.CAConfig{
  65. NodeCertExpiry: gogotypes.DurationProto(DefaultNodeCertExpiration),
  66. }
  67. }
  68. // NewServer creates a CA API server.
  69. func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server {
  70. return &Server{
  71. store: store,
  72. securityConfig: securityConfig,
  73. localRootCA: securityConfig.RootCA(),
  74. externalCA: NewExternalCA(nil, nil),
  75. pending: make(map[string]*api.Node),
  76. started: make(chan struct{}),
  77. reconciliationRetryInterval: defaultReconciliationRetryInterval,
  78. rootReconciliationRetryInterval: defaultRootReconciliationInterval,
  79. clusterID: securityConfig.ClientTLSCreds.Organization(),
  80. }
  81. }
  82. // ExternalCA returns the current external CA - this is exposed to support unit testing only, and the external CA
  83. // should really be a private field
  84. func (s *Server) ExternalCA() *ExternalCA {
  85. s.signingMu.Lock()
  86. defer s.signingMu.Unlock()
  87. return s.externalCA
  88. }
  89. // RootCA returns the current local root CA - this is exposed to support unit testing only, and the root CA
  90. // should really be a private field
  91. func (s *Server) RootCA() *RootCA {
  92. s.signingMu.Lock()
  93. defer s.signingMu.Unlock()
  94. return s.localRootCA
  95. }
  96. // SetReconciliationRetryInterval changes the time interval between
  97. // reconciliation attempts. This function must be called before Run.
  98. func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time.Duration) {
  99. s.reconciliationRetryInterval = reconciliationRetryInterval
  100. }
  101. // SetRootReconciliationInterval changes the time interval between root rotation
  102. // reconciliation attempts. This function must be called before Run.
  103. func (s *Server) SetRootReconciliationInterval(interval time.Duration) {
  104. s.rootReconciliationRetryInterval = interval
  105. }
  106. // GetUnlockKey is responsible for returning the current unlock key used for encrypting TLS private keys and
  107. // other at rest data. Access to this RPC call should only be allowed via mutual TLS from managers.
  108. func (s *Server) GetUnlockKey(ctx context.Context, request *api.GetUnlockKeyRequest) (*api.GetUnlockKeyResponse, error) {
  109. // This directly queries the store, rather than storing the unlock key and version on
  110. // the `Server` object and updating it `updateCluster` is called, because we need this
  111. // API to return the latest version of the key. Otherwise, there might be a slight delay
  112. // between when the cluster gets updated, and when this function returns the latest key.
  113. // This delay is currently unacceptable because this RPC call is the only way, after a
  114. // cluster update, to get the actual value of the unlock key, and we don't want to return
  115. // a cached value.
  116. resp := api.GetUnlockKeyResponse{}
  117. s.store.View(func(tx store.ReadTx) {
  118. cluster := store.GetCluster(tx, s.clusterID)
  119. resp.Version = cluster.Meta.Version
  120. if cluster.Spec.EncryptionConfig.AutoLockManagers {
  121. for _, encryptionKey := range cluster.UnlockKeys {
  122. if encryptionKey.Subsystem == ManagerRole {
  123. resp.UnlockKey = encryptionKey.Key
  124. return
  125. }
  126. }
  127. }
  128. })
  129. return &resp, nil
  130. }
  131. // NodeCertificateStatus returns the current issuance status of an issuance request identified by the nodeID
  132. func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCertificateStatusRequest) (*api.NodeCertificateStatusResponse, error) {
  133. if request.NodeID == "" {
  134. return nil, status.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
  135. }
  136. serverCtx, err := s.isRunningLocked()
  137. if err != nil {
  138. return nil, err
  139. }
  140. var node *api.Node
  141. event := api.EventUpdateNode{
  142. Node: &api.Node{ID: request.NodeID},
  143. Checks: []api.NodeCheckFunc{api.NodeCheckID},
  144. }
  145. // Retrieve the current value of the certificate with this token, and create a watcher
  146. updates, cancel, err := store.ViewAndWatch(
  147. s.store,
  148. func(tx store.ReadTx) error {
  149. node = store.GetNode(tx, request.NodeID)
  150. return nil
  151. },
  152. event,
  153. )
  154. if err != nil {
  155. return nil, err
  156. }
  157. defer cancel()
  158. // This node ID doesn't exist
  159. if node == nil {
  160. return nil, status.Errorf(codes.NotFound, codes.NotFound.String())
  161. }
  162. log.G(ctx).WithFields(logrus.Fields{
  163. "node.id": node.ID,
  164. "status": node.Certificate.Status,
  165. "method": "NodeCertificateStatus",
  166. })
  167. // If this certificate has a final state, return it immediately (both pending and renew are transition states)
  168. if isFinalState(node.Certificate.Status) {
  169. return &api.NodeCertificateStatusResponse{
  170. Status: &node.Certificate.Status,
  171. Certificate: &node.Certificate,
  172. }, nil
  173. }
  174. log.G(ctx).WithFields(logrus.Fields{
  175. "node.id": node.ID,
  176. "status": node.Certificate.Status,
  177. "method": "NodeCertificateStatus",
  178. }).Debugf("started watching for certificate updates")
  179. // Certificate is Pending or in an Unknown state, let's wait for changes.
  180. for {
  181. select {
  182. case event := <-updates:
  183. switch v := event.(type) {
  184. case api.EventUpdateNode:
  185. // We got an update on the certificate record. If the status is a final state,
  186. // return the certificate.
  187. if isFinalState(v.Node.Certificate.Status) {
  188. cert := v.Node.Certificate.Copy()
  189. return &api.NodeCertificateStatusResponse{
  190. Status: &cert.Status,
  191. Certificate: cert,
  192. }, nil
  193. }
  194. }
  195. case <-ctx.Done():
  196. return nil, ctx.Err()
  197. case <-serverCtx.Done():
  198. return nil, s.ctx.Err()
  199. }
  200. }
  201. }
  202. // IssueNodeCertificate is responsible for gatekeeping both certificate requests from new nodes in the swarm,
  203. // and authorizing certificate renewals.
  204. // If a node presented a valid certificate, the corresponding certificate is set in a RENEW state.
  205. // If a node failed to present a valid certificate, we check for a valid join token and set the
  206. // role accordingly. A new random node ID is generated, and the corresponding node entry is created.
  207. // IssueNodeCertificate is the only place where new node entries to raft should be created.
  208. func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNodeCertificateRequest) (*api.IssueNodeCertificateResponse, error) {
  209. // First, let's see if the remote node is presenting a non-empty CSR
  210. if len(request.CSR) == 0 {
  211. return nil, status.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
  212. }
  213. if err := s.isReadyLocked(); err != nil {
  214. return nil, err
  215. }
  216. var (
  217. blacklistedCerts map[string]*api.BlacklistedCertificate
  218. clusters []*api.Cluster
  219. err error
  220. )
  221. s.store.View(func(readTx store.ReadTx) {
  222. clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
  223. })
  224. // Not having a cluster object yet means we can't check
  225. // the blacklist.
  226. if err == nil && len(clusters) == 1 {
  227. blacklistedCerts = clusters[0].BlacklistedCertificates
  228. }
  229. // Renewing the cert with a local (unix socket) is always valid.
  230. localNodeInfo := ctx.Value(LocalRequestKey)
  231. if localNodeInfo != nil {
  232. nodeInfo, ok := localNodeInfo.(RemoteNodeInfo)
  233. if ok && nodeInfo.NodeID != "" {
  234. return s.issueRenewCertificate(ctx, nodeInfo.NodeID, request.CSR)
  235. }
  236. }
  237. // If the remote node is a worker (either forwarded by a manager, or calling directly),
  238. // issue a renew worker certificate entry with the correct ID
  239. nodeID, err := AuthorizeForwardedRoleAndOrg(ctx, []string{WorkerRole}, []string{ManagerRole}, s.clusterID, blacklistedCerts)
  240. if err == nil {
  241. return s.issueRenewCertificate(ctx, nodeID, request.CSR)
  242. }
  243. // If the remote node is a manager (either forwarded by another manager, or calling directly),
  244. // issue a renew certificate entry with the correct ID
  245. nodeID, err = AuthorizeForwardedRoleAndOrg(ctx, []string{ManagerRole}, []string{ManagerRole}, s.clusterID, blacklistedCerts)
  246. if err == nil {
  247. return s.issueRenewCertificate(ctx, nodeID, request.CSR)
  248. }
  249. // The remote node didn't successfully present a valid MTLS certificate, let's issue a
  250. // certificate with a new random ID
  251. role := api.NodeRole(-1)
  252. s.mu.Lock()
  253. if subtle.ConstantTimeCompare([]byte(s.joinTokens.Manager), []byte(request.Token)) == 1 {
  254. role = api.NodeRoleManager
  255. } else if subtle.ConstantTimeCompare([]byte(s.joinTokens.Worker), []byte(request.Token)) == 1 {
  256. role = api.NodeRoleWorker
  257. }
  258. s.mu.Unlock()
  259. if role < 0 {
  260. return nil, status.Errorf(codes.InvalidArgument, "A valid join token is necessary to join this cluster")
  261. }
  262. // Max number of collisions of ID or CN to tolerate before giving up
  263. maxRetries := 3
  264. // Generate a random ID for this new node
  265. for i := 0; ; i++ {
  266. nodeID = identity.NewID()
  267. // Create a new node
  268. err := s.store.Update(func(tx store.Tx) error {
  269. node := &api.Node{
  270. Role: role,
  271. ID: nodeID,
  272. Certificate: api.Certificate{
  273. CSR: request.CSR,
  274. CN: nodeID,
  275. Role: role,
  276. Status: api.IssuanceStatus{
  277. State: api.IssuanceStatePending,
  278. },
  279. },
  280. Spec: api.NodeSpec{
  281. DesiredRole: role,
  282. Membership: api.NodeMembershipAccepted,
  283. Availability: request.Availability,
  284. },
  285. }
  286. return store.CreateNode(tx, node)
  287. })
  288. if err == nil {
  289. log.G(ctx).WithFields(logrus.Fields{
  290. "node.id": nodeID,
  291. "node.role": role,
  292. "method": "IssueNodeCertificate",
  293. }).Debugf("new certificate entry added")
  294. break
  295. }
  296. if err != store.ErrExist {
  297. return nil, err
  298. }
  299. if i == maxRetries {
  300. return nil, err
  301. }
  302. log.G(ctx).WithFields(logrus.Fields{
  303. "node.id": nodeID,
  304. "node.role": role,
  305. "method": "IssueNodeCertificate",
  306. }).Errorf("randomly generated node ID collided with an existing one - retrying")
  307. }
  308. return &api.IssueNodeCertificateResponse{
  309. NodeID: nodeID,
  310. NodeMembership: api.NodeMembershipAccepted,
  311. }, nil
  312. }
  313. // issueRenewCertificate receives a nodeID and a CSR and modifies the node's certificate entry with the new CSR
  314. // and changes the state to RENEW, so it can be picked up and signed by the signing reconciliation loop
  315. func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr []byte) (*api.IssueNodeCertificateResponse, error) {
  316. var (
  317. cert api.Certificate
  318. node *api.Node
  319. )
  320. err := s.store.Update(func(tx store.Tx) error {
  321. // Attempt to retrieve the node with nodeID
  322. node = store.GetNode(tx, nodeID)
  323. if node == nil {
  324. log.G(ctx).WithFields(logrus.Fields{
  325. "node.id": nodeID,
  326. "method": "issueRenewCertificate",
  327. }).Warnf("node does not exist")
  328. // If this node doesn't exist, we shouldn't be renewing a certificate for it
  329. return status.Errorf(codes.NotFound, "node %s not found when attempting to renew certificate", nodeID)
  330. }
  331. // Create a new Certificate entry for this node with the new CSR and a RENEW state
  332. cert = api.Certificate{
  333. CSR: csr,
  334. CN: node.ID,
  335. Role: node.Role,
  336. Status: api.IssuanceStatus{
  337. State: api.IssuanceStateRenew,
  338. },
  339. }
  340. node.Certificate = cert
  341. return store.UpdateNode(tx, node)
  342. })
  343. if err != nil {
  344. return nil, err
  345. }
  346. log.G(ctx).WithFields(logrus.Fields{
  347. "cert.cn": cert.CN,
  348. "cert.role": cert.Role,
  349. "method": "issueRenewCertificate",
  350. }).Debugf("node certificate updated")
  351. return &api.IssueNodeCertificateResponse{
  352. NodeID: nodeID,
  353. NodeMembership: node.Spec.Membership,
  354. }, nil
  355. }
  356. // GetRootCACertificate returns the certificate of the Root CA. It is used as a convenience for distributing
  357. // the root of trust for the swarm. Clients should be using the CA hash to verify if they weren't target to
  358. // a MiTM. If they fail to do so, node bootstrap works with TOFU semantics.
  359. func (s *Server) GetRootCACertificate(ctx context.Context, request *api.GetRootCACertificateRequest) (*api.GetRootCACertificateResponse, error) {
  360. log.G(ctx).WithFields(logrus.Fields{
  361. "method": "GetRootCACertificate",
  362. })
  363. s.signingMu.Lock()
  364. defer s.signingMu.Unlock()
  365. return &api.GetRootCACertificateResponse{
  366. Certificate: s.localRootCA.Certs,
  367. }, nil
  368. }
  369. // Run runs the CA signer main loop.
  370. // The CA signer can be stopped with cancelling ctx or calling Stop().
  371. func (s *Server) Run(ctx context.Context) error {
  372. s.mu.Lock()
  373. if s.isRunning() {
  374. s.mu.Unlock()
  375. return errors.New("CA signer is already running")
  376. }
  377. s.wg.Add(1)
  378. s.ctx, s.cancel = context.WithCancel(log.WithModule(ctx, "ca"))
  379. ctx = s.ctx
  380. s.mu.Unlock()
  381. defer s.wg.Done()
  382. defer func() {
  383. s.mu.Lock()
  384. s.mu.Unlock()
  385. }()
  386. // Retrieve the channels to keep track of changes in the cluster
  387. // Retrieve all the currently registered nodes
  388. var (
  389. nodes []*api.Node
  390. cluster *api.Cluster
  391. err error
  392. )
  393. updates, cancel, err := store.ViewAndWatch(
  394. s.store,
  395. func(readTx store.ReadTx) error {
  396. cluster = store.GetCluster(readTx, s.clusterID)
  397. if cluster == nil {
  398. return errors.New("could not find cluster object")
  399. }
  400. nodes, err = store.FindNodes(readTx, store.All)
  401. return err
  402. },
  403. api.EventCreateNode{},
  404. api.EventUpdateNode{},
  405. api.EventDeleteNode{},
  406. api.EventUpdateCluster{
  407. Cluster: &api.Cluster{ID: s.clusterID},
  408. Checks: []api.ClusterCheckFunc{api.ClusterCheckID},
  409. },
  410. )
  411. // call once to ensure that the join tokens and local/external CA signer are always set
  412. rootReconciler := &rootRotationReconciler{
  413. ctx: log.WithField(ctx, "method", "(*Server).rootRotationReconciler"),
  414. clusterID: s.clusterID,
  415. store: s.store,
  416. batchUpdateInterval: s.rootReconciliationRetryInterval,
  417. }
  418. s.UpdateRootCA(ctx, cluster, rootReconciler)
  419. // Do this after updateCluster has been called, so Ready() and isRunning never returns true without
  420. // the join tokens and external CA/security config's root CA being set correctly
  421. s.mu.Lock()
  422. close(s.started)
  423. s.mu.Unlock()
  424. if err != nil {
  425. log.G(ctx).WithFields(logrus.Fields{
  426. "method": "(*Server).Run",
  427. }).WithError(err).Errorf("snapshot store view failed")
  428. return err
  429. }
  430. defer cancel()
  431. // We might have missed some updates if there was a leader election,
  432. // so let's pick up the slack.
  433. if err := s.reconcileNodeCertificates(ctx, nodes); err != nil {
  434. // We don't return here because that means the Run loop would
  435. // never run. Log an error instead.
  436. log.G(ctx).WithFields(logrus.Fields{
  437. "method": "(*Server).Run",
  438. }).WithError(err).Errorf("error attempting to reconcile certificates")
  439. }
  440. ticker := time.NewTicker(s.reconciliationRetryInterval)
  441. defer ticker.Stop()
  442. externalTLSCredsChange, externalTLSWatchCancel := s.securityConfig.Watch()
  443. defer externalTLSWatchCancel()
  444. // Watch for new nodes being created, new nodes being updated, and changes
  445. // to the cluster
  446. for {
  447. select {
  448. case <-ctx.Done():
  449. return nil
  450. default:
  451. }
  452. select {
  453. case event := <-updates:
  454. switch v := event.(type) {
  455. case api.EventCreateNode:
  456. s.evaluateAndSignNodeCert(ctx, v.Node)
  457. rootReconciler.UpdateNode(v.Node)
  458. case api.EventUpdateNode:
  459. // If this certificate is already at a final state
  460. // no need to evaluate and sign it.
  461. if !isFinalState(v.Node.Certificate.Status) {
  462. s.evaluateAndSignNodeCert(ctx, v.Node)
  463. }
  464. rootReconciler.UpdateNode(v.Node)
  465. case api.EventDeleteNode:
  466. rootReconciler.DeleteNode(v.Node)
  467. case api.EventUpdateCluster:
  468. if v.Cluster.ID == s.clusterID {
  469. s.UpdateRootCA(ctx, v.Cluster, rootReconciler)
  470. }
  471. }
  472. case <-externalTLSCredsChange:
  473. // The TLS certificates can rotate independently of the root CA (and hence which roots the
  474. // external CA trusts) and external CA URLs. It's possible that the root CA update is received
  475. // before the external TLS cred change notification. During that period, it is possible that
  476. // the TLS creds will expire or otherwise fail to authorize against external CAs. However, in
  477. // that case signing will just fail with a recoverable connectivity error - the state of the
  478. // certificate issuance is left as pending, and on the next tick, the server will try to sign
  479. // all nodes with pending certs again (by which time the TLS cred change will have been
  480. // received).
  481. // Note that if the external CA changes, the new external CA *MUST* trust the current server's
  482. // certificate issuer, and this server's certificates should not be extremely close to expiry,
  483. // otherwise this server would not be able to get new TLS certificates and will no longer be
  484. // able to function.
  485. s.signingMu.Lock()
  486. s.externalCA.UpdateTLSConfig(NewExternalCATLSConfig(
  487. s.securityConfig.ClientTLSCreds.Config().Certificates, s.externalCAPool))
  488. s.signingMu.Unlock()
  489. case <-ticker.C:
  490. for _, node := range s.pending {
  491. if err := s.evaluateAndSignNodeCert(ctx, node); err != nil {
  492. // If this sign operation did not succeed, the rest are
  493. // unlikely to. Yield so that we don't hammer an external CA.
  494. // Since the map iteration order is randomized, there is no
  495. // risk of getting stuck on a problematic CSR.
  496. break
  497. }
  498. }
  499. case <-ctx.Done():
  500. return nil
  501. }
  502. }
  503. }
  504. // Stop stops the CA and closes all grpc streams.
  505. func (s *Server) Stop() error {
  506. s.mu.Lock()
  507. if !s.isRunning() {
  508. s.mu.Unlock()
  509. return errors.New("CA signer is already stopped")
  510. }
  511. s.cancel()
  512. s.started = make(chan struct{})
  513. s.joinTokens = nil
  514. s.mu.Unlock()
  515. // Wait for Run to complete
  516. s.wg.Wait()
  517. return nil
  518. }
  519. // Ready waits on the ready channel and returns when the server is ready to serve.
  520. func (s *Server) Ready() <-chan struct{} {
  521. s.mu.Lock()
  522. defer s.mu.Unlock()
  523. return s.started
  524. }
  525. func (s *Server) isRunningLocked() (context.Context, error) {
  526. s.mu.Lock()
  527. if !s.isRunning() {
  528. s.mu.Unlock()
  529. return nil, status.Errorf(codes.Aborted, "CA signer is stopped")
  530. }
  531. ctx := s.ctx
  532. s.mu.Unlock()
  533. return ctx, nil
  534. }
  535. func (s *Server) isReadyLocked() error {
  536. s.mu.Lock()
  537. defer s.mu.Unlock()
  538. if !s.isRunning() {
  539. return status.Errorf(codes.Aborted, "CA signer is stopped")
  540. }
  541. if s.joinTokens == nil {
  542. return status.Errorf(codes.Aborted, "CA signer is still starting")
  543. }
  544. return nil
  545. }
  546. func (s *Server) isRunning() bool {
  547. if s.ctx == nil {
  548. return false
  549. }
  550. select {
  551. case <-s.ctx.Done():
  552. return false
  553. default:
  554. }
  555. return true
  556. }
  557. // filterExternalCAURLS returns a list of external CA urls filtered by the desired cert.
  558. func filterExternalCAURLS(ctx context.Context, desiredCert, defaultCert []byte, apiExternalCAs []*api.ExternalCA) (urls []string) {
  559. desiredCert = NormalizePEMs(desiredCert)
  560. // TODO(aaronl): In the future, this will be abstracted with an ExternalCA interface that has different
  561. // implementations for different CA types. At the moment, only CFSSL is supported.
  562. for i, extCA := range apiExternalCAs {
  563. // We want to support old external CA specifications which did not have a CA cert. If there is no cert specified,
  564. // we assume it's the old cert
  565. certForExtCA := extCA.CACert
  566. if len(certForExtCA) == 0 {
  567. certForExtCA = defaultCert
  568. }
  569. certForExtCA = NormalizePEMs(certForExtCA)
  570. if extCA.Protocol != api.ExternalCA_CAProtocolCFSSL {
  571. log.G(ctx).Debugf("skipping external CA %d (url: %s) due to unknown protocol type", i, extCA.URL)
  572. continue
  573. }
  574. if !bytes.Equal(certForExtCA, desiredCert) {
  575. log.G(ctx).Debugf("skipping external CA %d (url: %s) because it has the wrong CA cert", i, extCA.URL)
  576. continue
  577. }
  578. urls = append(urls, extCA.URL)
  579. }
  580. return
  581. }
  582. // UpdateRootCA is called when there are cluster changes, and it ensures that the local RootCA is
  583. // always aware of changes in clusterExpiry and the Root CA key material - this can be called by
  584. // anything to update the root CA material
  585. func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster, reconciler *rootRotationReconciler) error {
  586. s.mu.Lock()
  587. s.joinTokens = cluster.RootCA.JoinTokens.Copy()
  588. s.mu.Unlock()
  589. rCA := cluster.RootCA.Copy()
  590. if reconciler != nil {
  591. reconciler.UpdateRootCA(rCA)
  592. }
  593. s.signingMu.Lock()
  594. defer s.signingMu.Unlock()
  595. firstSeenCluster := s.lastSeenClusterRootCA == nil && s.lastSeenExternalCAs == nil
  596. rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, rCA)
  597. externalCAChanged := !equality.ExternalCAsEqualStable(s.lastSeenExternalCAs, cluster.Spec.CAConfig.ExternalCAs)
  598. ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
  599. "cluster.id": cluster.ID,
  600. "method": "(*Server).UpdateRootCA",
  601. }))
  602. if rootCAChanged {
  603. setOrUpdate := "set"
  604. if !firstSeenCluster {
  605. log.G(ctx).Debug("Updating signing root CA and external CA due to change in cluster Root CA")
  606. setOrUpdate = "updated"
  607. }
  608. expiry := DefaultNodeCertExpiration
  609. if cluster.Spec.CAConfig.NodeCertExpiry != nil {
  610. // NodeCertExpiry exists, let's try to parse the duration out of it
  611. clusterExpiry, err := gogotypes.DurationFromProto(cluster.Spec.CAConfig.NodeCertExpiry)
  612. if err != nil {
  613. log.G(ctx).WithError(err).Warn("failed to parse certificate expiration, using default")
  614. } else {
  615. // We were able to successfully parse the expiration out of the cluster.
  616. expiry = clusterExpiry
  617. }
  618. } else {
  619. // NodeCertExpiry seems to be nil
  620. log.G(ctx).Warn("no certificate expiration specified, using default")
  621. }
  622. // Attempt to update our local RootCA with the new parameters
  623. updatedRootCA, err := RootCAFromAPI(ctx, rCA, expiry)
  624. if err != nil {
  625. return errors.Wrap(err, "invalid Root CA object in cluster")
  626. }
  627. s.localRootCA = &updatedRootCA
  628. s.externalCAPool = updatedRootCA.Pool
  629. externalCACert := rCA.CACert
  630. if rCA.RootRotation != nil {
  631. externalCACert = rCA.RootRotation.CACert
  632. // the external CA has to trust the new CA cert
  633. s.externalCAPool = x509.NewCertPool()
  634. s.externalCAPool.AppendCertsFromPEM(rCA.CACert)
  635. s.externalCAPool.AppendCertsFromPEM(rCA.RootRotation.CACert)
  636. }
  637. s.lastSeenExternalCAs = cluster.Spec.CAConfig.Copy().ExternalCAs
  638. urls := filterExternalCAURLS(ctx, externalCACert, rCA.CACert, s.lastSeenExternalCAs)
  639. // Replace the external CA with the relevant intermediates, URLS, and TLS config
  640. s.externalCA = NewExternalCA(updatedRootCA.Intermediates,
  641. NewExternalCATLSConfig(s.securityConfig.ClientTLSCreds.Config().Certificates, s.externalCAPool), urls...)
  642. // only update the server cache if we've successfully updated the root CA
  643. log.G(ctx).Debugf("Root CA %s successfully", setOrUpdate)
  644. s.lastSeenClusterRootCA = rCA
  645. } else if externalCAChanged {
  646. // we want to update only if the external CA URLS have changed, since if the root CA has changed we already
  647. // run similar logic
  648. if !firstSeenCluster {
  649. log.G(ctx).Debug("Updating security config external CA URLs due to change in cluster spec's list of external CAs")
  650. }
  651. wantedExternalCACert := rCA.CACert // we want to only add external CA URLs that use this cert
  652. if rCA.RootRotation != nil {
  653. // we're rotating to a new root, so we only want external CAs with the new root cert
  654. wantedExternalCACert = rCA.RootRotation.CACert
  655. }
  656. // Update our external CA with the list of External CA URLs from the new cluster state
  657. s.lastSeenExternalCAs = cluster.Spec.CAConfig.Copy().ExternalCAs
  658. urls := filterExternalCAURLS(ctx, wantedExternalCACert, rCA.CACert, s.lastSeenExternalCAs)
  659. s.externalCA.UpdateURLs(urls...)
  660. }
  661. return nil
  662. }
  663. // evaluateAndSignNodeCert implements the logic of which certificates to sign
  664. func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) error {
  665. // If the desired membership and actual state are in sync, there's
  666. // nothing to do.
  667. certState := node.Certificate.Status.State
  668. if node.Spec.Membership == api.NodeMembershipAccepted &&
  669. (certState == api.IssuanceStateIssued || certState == api.IssuanceStateRotate) {
  670. return nil
  671. }
  672. // If the certificate state is renew, then it is a server-sided accepted cert (cert renewals)
  673. if certState == api.IssuanceStateRenew {
  674. return s.signNodeCert(ctx, node)
  675. }
  676. // Sign this certificate if a user explicitly changed it to Accepted, and
  677. // the certificate is in pending state
  678. if node.Spec.Membership == api.NodeMembershipAccepted && certState == api.IssuanceStatePending {
  679. return s.signNodeCert(ctx, node)
  680. }
  681. return nil
  682. }
  683. // signNodeCert does the bulk of the work for signing a certificate
  684. func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error {
  685. s.signingMu.Lock()
  686. rootCA := s.localRootCA
  687. externalCA := s.externalCA
  688. s.signingMu.Unlock()
  689. node = node.Copy()
  690. nodeID := node.ID
  691. // Convert the role from proto format
  692. role, err := ParseRole(node.Certificate.Role)
  693. if err != nil {
  694. log.G(ctx).WithFields(logrus.Fields{
  695. "node.id": node.ID,
  696. "method": "(*Server).signNodeCert",
  697. }).WithError(err).Errorf("failed to parse role")
  698. return errors.New("failed to parse role")
  699. }
  700. s.pending[node.ID] = node
  701. // Attempt to sign the CSR
  702. var (
  703. rawCSR = node.Certificate.CSR
  704. cn = node.Certificate.CN
  705. ou = role
  706. org = s.clusterID
  707. )
  708. // Try using the external CA first.
  709. cert, err := externalCA.Sign(ctx, PrepareCSR(rawCSR, cn, ou, org))
  710. if err == ErrNoExternalCAURLs {
  711. // No external CA servers configured. Try using the local CA.
  712. cert, err = rootCA.ParseValidateAndSignCSR(rawCSR, cn, ou, org)
  713. }
  714. if err != nil {
  715. log.G(ctx).WithFields(logrus.Fields{
  716. "node.id": node.ID,
  717. "method": "(*Server).signNodeCert",
  718. }).WithError(err).Errorf("failed to sign CSR")
  719. // If the current state is already Failed, no need to change it
  720. if node.Certificate.Status.State == api.IssuanceStateFailed {
  721. delete(s.pending, node.ID)
  722. return errors.New("failed to sign CSR")
  723. }
  724. if _, ok := err.(recoverableErr); ok {
  725. // Return without changing the state of the certificate. We may
  726. // retry signing it in the future.
  727. return errors.New("failed to sign CSR")
  728. }
  729. // We failed to sign this CSR, change the state to FAILED
  730. err = s.store.Update(func(tx store.Tx) error {
  731. node := store.GetNode(tx, nodeID)
  732. if node == nil {
  733. return errors.Errorf("node %s not found", nodeID)
  734. }
  735. node.Certificate.Status = api.IssuanceStatus{
  736. State: api.IssuanceStateFailed,
  737. Err: err.Error(),
  738. }
  739. return store.UpdateNode(tx, node)
  740. })
  741. if err != nil {
  742. log.G(ctx).WithFields(logrus.Fields{
  743. "node.id": nodeID,
  744. "method": "(*Server).signNodeCert",
  745. }).WithError(err).Errorf("transaction failed when setting state to FAILED")
  746. }
  747. delete(s.pending, node.ID)
  748. return errors.New("failed to sign CSR")
  749. }
  750. // We were able to successfully sign the new CSR. Let's try to update the nodeStore
  751. for {
  752. err = s.store.Update(func(tx store.Tx) error {
  753. node.Certificate.Certificate = cert
  754. node.Certificate.Status = api.IssuanceStatus{
  755. State: api.IssuanceStateIssued,
  756. }
  757. err := store.UpdateNode(tx, node)
  758. if err != nil {
  759. node = store.GetNode(tx, nodeID)
  760. if node == nil {
  761. err = errors.Errorf("node %s does not exist", nodeID)
  762. }
  763. }
  764. return err
  765. })
  766. if err == nil {
  767. log.G(ctx).WithFields(logrus.Fields{
  768. "node.id": node.ID,
  769. "node.role": node.Certificate.Role,
  770. "method": "(*Server).signNodeCert",
  771. }).Debugf("certificate issued")
  772. delete(s.pending, node.ID)
  773. break
  774. }
  775. if err == store.ErrSequenceConflict {
  776. continue
  777. }
  778. log.G(ctx).WithFields(logrus.Fields{
  779. "node.id": nodeID,
  780. "method": "(*Server).signNodeCert",
  781. }).WithError(err).Errorf("transaction failed")
  782. return errors.New("transaction failed")
  783. }
  784. return nil
  785. }
  786. // reconcileNodeCertificates is a helper method that calls evaluateAndSignNodeCert on all the
  787. // nodes.
  788. func (s *Server) reconcileNodeCertificates(ctx context.Context, nodes []*api.Node) error {
  789. for _, node := range nodes {
  790. s.evaluateAndSignNodeCert(ctx, node)
  791. }
  792. return nil
  793. }
  794. // A successfully issued certificate and a failed certificate are our current final states
  795. func isFinalState(status api.IssuanceStatus) bool {
  796. if status.State == api.IssuanceStateIssued || status.State == api.IssuanceStateFailed ||
  797. status.State == api.IssuanceStateRotate {
  798. return true
  799. }
  800. return false
  801. }
  802. // RootCAFromAPI creates a RootCA object from an api.RootCA object
  803. func RootCAFromAPI(ctx context.Context, apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) {
  804. var intermediates []byte
  805. signingCert := apiRootCA.CACert
  806. signingKey := apiRootCA.CAKey
  807. if apiRootCA.RootRotation != nil {
  808. signingCert = apiRootCA.RootRotation.CrossSignedCACert
  809. signingKey = apiRootCA.RootRotation.CAKey
  810. intermediates = apiRootCA.RootRotation.CrossSignedCACert
  811. }
  812. if signingKey == nil {
  813. signingCert = nil
  814. }
  815. return NewRootCA(apiRootCA.CACert, signingCert, signingKey, expiry, intermediates)
  816. }