|
@@ -0,0 +1,259 @@
|
|
|
+package ca
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "reflect"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/cloudflare/cfssl/helpers"
|
|
|
+ "github.com/docker/swarmkit/api"
|
|
|
+ "github.com/docker/swarmkit/api/equality"
|
|
|
+ "github.com/docker/swarmkit/log"
|
|
|
+ "github.com/docker/swarmkit/manager/state/store"
|
|
|
+ "github.com/pkg/errors"
|
|
|
+)
|
|
|
+
|
|
|
+// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update
|
|
|
+const IssuanceStateRotateMaxBatchSize = 30
|
|
|
+
|
|
|
+func hasIssuer(n *api.Node, info *IssuerInfo) bool {
|
|
|
+ if n.Description == nil || n.Description.TLSInfo == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey)
|
|
|
+}
|
|
|
+
|
|
|
+var errRootRotationChanged = errors.New("target root rotation has changed")
|
|
|
+
|
|
|
+// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
|
|
|
+// or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
|
|
|
+// cluster's RootCA has changed and when a node is added, updated, or removed.
|
|
|
+type rootRotationReconciler struct {
|
|
|
+ mu sync.Mutex
|
|
|
+ clusterID string
|
|
|
+ batchUpdateInterval time.Duration
|
|
|
+ ctx context.Context
|
|
|
+ store *store.MemoryStore
|
|
|
+
|
|
|
+ currentRootCA *api.RootCA
|
|
|
+ currentIssuer IssuerInfo
|
|
|
+ unconvergedNodes map[string]*api.Node
|
|
|
+
|
|
|
+ wg sync.WaitGroup
|
|
|
+ cancel func()
|
|
|
+}
|
|
|
+
|
|
|
+// IssuerFromAPIRootCA returns the desired issuer given an API root CA object
|
|
|
+func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
|
|
|
+ wantedIssuer := rootCA.CACert
|
|
|
+ if rootCA.RootRotation != nil {
|
|
|
+ wantedIssuer = rootCA.RootRotation.CACert
|
|
|
+ }
|
|
|
+ issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
|
|
|
+ }
|
|
|
+ if len(issuerCerts) == 0 {
|
|
|
+ return nil, errors.New("invalid certificate in cluster root CA object")
|
|
|
+ }
|
|
|
+ return &IssuerInfo{
|
|
|
+ Subject: issuerCerts[0].RawSubject,
|
|
|
+ PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo,
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+// assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to
|
|
|
+// a store update event
|
|
|
+func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
|
|
|
+ issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
|
|
|
+ if err != nil {
|
|
|
+ log.G(r.ctx).WithError(err).Error("unable to update process the current root CA")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ shouldStartNewLoop, waitForPrevLoop bool
|
|
|
+ loopCtx context.Context
|
|
|
+ )
|
|
|
+ r.mu.Lock()
|
|
|
+ defer func() {
|
|
|
+ r.mu.Unlock()
|
|
|
+ if shouldStartNewLoop {
|
|
|
+ if waitForPrevLoop {
|
|
|
+ r.wg.Wait()
|
|
|
+ }
|
|
|
+ r.wg.Add(1)
|
|
|
+ go r.runReconcilerLoop(loopCtx, newRootCA)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // check if the issuer has changed, first
|
|
|
+ if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
|
|
|
+ r.currentRootCA = newRootCA
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
|
|
|
+ if newRootCA.RootRotation != nil {
|
|
|
+ var nodes []*api.Node
|
|
|
+ r.store.View(func(tx store.ReadTx) {
|
|
|
+ nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
|
|
|
+ // so we can start making changes to r's fields
|
|
|
+ r.unconvergedNodes = make(map[string]*api.Node)
|
|
|
+ for _, n := range nodes {
|
|
|
+ if !hasIssuer(n, issuerInfo) {
|
|
|
+ r.unconvergedNodes[n.ID] = n
|
|
|
+ }
|
|
|
+ }
|
|
|
+ shouldStartNewLoop = true
|
|
|
+ if r.cancel != nil { // there's already a loop going, so cancel it
|
|
|
+ r.cancel()
|
|
|
+ waitForPrevLoop = true
|
|
|
+ }
|
|
|
+ loopCtx, r.cancel = context.WithCancel(r.ctx)
|
|
|
+ } else {
|
|
|
+ r.unconvergedNodes = nil
|
|
|
+ }
|
|
|
+ r.currentRootCA = newRootCA
|
|
|
+ r.currentIssuer = *issuerInfo
|
|
|
+}
|
|
|
+
|
|
|
+// assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to
|
|
|
+// a store update event
|
|
|
+func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
|
|
|
+ r.mu.Lock()
|
|
|
+ defer r.mu.Unlock()
|
|
|
+ // if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
|
|
|
+ if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if hasIssuer(node, &r.currentIssuer) {
|
|
|
+ delete(r.unconvergedNodes, node.ID)
|
|
|
+ } else {
|
|
|
+ r.unconvergedNodes[node.ID] = node
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to
|
|
|
+// a store update event
|
|
|
+func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
|
|
|
+ r.mu.Lock()
|
|
|
+ delete(r.unconvergedNodes, node.ID)
|
|
|
+ r.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
|
|
|
+ defer r.wg.Done()
|
|
|
+ for {
|
|
|
+ r.mu.Lock()
|
|
|
+ if len(r.unconvergedNodes) == 0 {
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ err := r.store.Update(func(tx store.Tx) error {
|
|
|
+ return r.finishRootRotation(tx, loopRootCA)
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ log.G(r.ctx).Info("completed root rotation")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ log.G(r.ctx).WithError(err).Error("could not complete root rotation")
|
|
|
+ if err == errRootRotationChanged {
|
|
|
+ // if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ var toUpdate []*api.Node
|
|
|
+ for _, n := range r.unconvergedNodes {
|
|
|
+ iState := n.Certificate.Status.State
|
|
|
+ if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
|
|
|
+ n = n.Copy()
|
|
|
+ n.Certificate.Status.State = api.IssuanceStateRotate
|
|
|
+ toUpdate = append(toUpdate, n)
|
|
|
+ if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ if err := r.batchUpdateNodes(toUpdate); err != nil {
|
|
|
+ log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return
|
|
|
+ case <-time.After(r.batchUpdateInterval):
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// This function assumes that the expected root CA has root rotation. This is intended to be used by
|
|
|
+// `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks
|
|
|
+// that it has a root rotation before calling this function.
|
|
|
+func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
|
|
|
+ cluster := store.GetCluster(tx, r.clusterID)
|
|
|
+ if cluster == nil {
|
|
|
+ return fmt.Errorf("unable to get cluster %s", r.clusterID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // If the RootCA object has changed (because another root rotation was started or because some other node
|
|
|
+ // had finished the root rotation), we cannot finish the root rotation that we were working on.
|
|
|
+ if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
|
|
|
+ return errRootRotationChanged
|
|
|
+ }
|
|
|
+
|
|
|
+ var signerCert []byte
|
|
|
+ if len(cluster.RootCA.RootRotation.CAKey) > 0 {
|
|
|
+ signerCert = cluster.RootCA.RootRotation.CACert
|
|
|
+ }
|
|
|
+ // we don't actually have to parse out the default node expiration from the cluster - we are just using
|
|
|
+ // the ca.RootCA object to generate new tokens and the digest
|
|
|
+ updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
|
|
|
+ DefaultNodeCertExpiration, nil)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "invalid cluster root rotation object")
|
|
|
+ }
|
|
|
+ cluster.RootCA = api.RootCA{
|
|
|
+ CACert: cluster.RootCA.RootRotation.CACert,
|
|
|
+ CAKey: cluster.RootCA.RootRotation.CAKey,
|
|
|
+ CACertHash: updatedRootCA.Digest.String(),
|
|
|
+ JoinTokens: api.JoinTokens{
|
|
|
+ Worker: GenerateJoinToken(&updatedRootCA),
|
|
|
+ Manager: GenerateJoinToken(&updatedRootCA),
|
|
|
+ },
|
|
|
+ LastForcedRotation: cluster.RootCA.LastForcedRotation,
|
|
|
+ }
|
|
|
+ return store.UpdateCluster(tx, cluster)
|
|
|
+}
|
|
|
+
|
|
|
+func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
|
|
|
+ if len(toUpdate) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ _, err := r.store.Batch(func(batch *store.Batch) error {
|
|
|
+ // Directly update the nodes rather than get + update, and ignore version errors. Since
|
|
|
+ // `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
|
|
|
+ // close to the latest versions of all the nodes. If not, the node will updated later and the
|
|
|
+ // next batch of updates should catch it.
|
|
|
+ for _, n := range toUpdate {
|
|
|
+ if err := batch.Update(func(tx store.Tx) error {
|
|
|
+ return store.UpdateNode(tx, n)
|
|
|
+ }); err != nil && err != store.ErrSequenceConflict {
|
|
|
+ log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ return err
|
|
|
+}
|