diff --git a/vendor.conf b/vendor.conf
index aa8b2f74e5..c302b9398d 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -105,7 +105,7 @@ github.com/docker/containerd 9048e5e50717ea4497b757314bad98ea3763c145
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
 
 # cluster
-github.com/docker/swarmkit d5232280c510d70755ab11305d46a5704735371a
+github.com/docker/swarmkit b19d028de0a6e9ca281afeb76cea2544b9edd839
 github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
 github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e
diff --git a/vendor/github.com/docker/swarmkit/ca/reconciler.go b/vendor/github.com/docker/swarmkit/ca/reconciler.go
new file mode 100644
index 0000000000..6e326f3f5b
--- /dev/null
+++ b/vendor/github.com/docker/swarmkit/ca/reconciler.go
@@ -0,0 +1,259 @@
+package ca
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
+	"github.com/cloudflare/cfssl/helpers"
+	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/api/equality"
+	"github.com/docker/swarmkit/log"
+	"github.com/docker/swarmkit/manager/state/store"
+	"github.com/pkg/errors"
+)
+
+// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update
+const IssuanceStateRotateMaxBatchSize = 30
+
+func hasIssuer(n *api.Node, info *IssuerInfo) bool {
+	if n.Description == nil || n.Description.TLSInfo == nil {
+		return false
+	}
+	return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey)
+}
+
+var errRootRotationChanged = errors.New("target root rotation has changed")
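Reviewer note: `hasIssuer` above is a pure byte comparison against the issuer subject and public key that the node last reported in its `Description.TLSInfo`. A minimal sketch of that behavior (the field values are made up, and the `api.NodeDescription`/`api.NodeTLSInfo` shapes are assumed from the vendored swarmkit API):

```go
// Hypothetical illustration, not part of the patch: a node with a nil
// Description or TLSInfo never matches, otherwise only the raw bytes count.
node := &api.Node{
	Description: &api.NodeDescription{
		TLSInfo: &api.NodeTLSInfo{
			CertIssuerSubject:   []byte("raw-subject"),
			CertIssuerPublicKey: []byte("raw-pubkey"),
		},
	},
}
matches := hasIssuer(node, &IssuerInfo{
	Subject:   []byte("raw-subject"),
	PublicKey: []byte("raw-pubkey"),
}) // true; false if either field differs or the description is missing
```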
+
+// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
+// or the root CA is updated.  This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
+// cluster's RootCA has changed and when a node is added, updated, or removed.
+type rootRotationReconciler struct {
+	mu                  sync.Mutex
+	clusterID           string
+	batchUpdateInterval time.Duration
+	ctx                 context.Context
+	store               *store.MemoryStore
+
+	currentRootCA    *api.RootCA
+	currentIssuer    IssuerInfo
+	unconvergedNodes map[string]*api.Node
+
+	wg     sync.WaitGroup
+	cancel func()
+}
+
+// IssuerFromAPIRootCA returns the desired issuer given an API root CA object
+func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
+	wantedIssuer := rootCA.CACert
+	if rootCA.RootRotation != nil {
+		wantedIssuer = rootCA.RootRotation.CACert
+	}
+	issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
+	}
+	if len(issuerCerts) == 0 {
+		return nil, errors.New("invalid certificate in cluster root CA object")
+	}
+	return &IssuerInfo{
+		Subject:   issuerCerts[0].RawSubject,
+		PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo,
+	}, nil
+}
+
+// assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to
+// a store update event
+func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
+	issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
+	if err != nil {
+		log.G(r.ctx).WithError(err).Error("unable to process the current root CA")
+		return
+	}
+
+	var (
+		shouldStartNewLoop, waitForPrevLoop bool
+		loopCtx                             context.Context
+	)
+	r.mu.Lock()
+	defer func() {
+		r.mu.Unlock()
+		if shouldStartNewLoop {
+			if waitForPrevLoop {
+				r.wg.Wait()
+			}
+			r.wg.Add(1)
+			go r.runReconcilerLoop(loopCtx, newRootCA)
+		}
+	}()
+
+	// check if the issuer has changed, first
+	if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
+		r.currentRootCA = newRootCA
+		return
+	}
+	// If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
+	if newRootCA.RootRotation != nil {
+		var nodes []*api.Node
+		r.store.View(func(tx store.ReadTx) {
+			nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
+		})
+		if err != nil {
+			log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
+			return
+		}
+
+		// from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
+		// so we can start making changes to r's fields
+		r.unconvergedNodes = make(map[string]*api.Node)
+		for _, n := range nodes {
+			if !hasIssuer(n, issuerInfo) {
+				r.unconvergedNodes[n.ID] = n
+			}
+		}
+		shouldStartNewLoop = true
+		if r.cancel != nil { // there's already a loop going, so cancel it
+			r.cancel()
+			waitForPrevLoop = true
+		}
+		loopCtx, r.cancel = context.WithCancel(r.ctx)
+	} else {
+		r.unconvergedNodes = nil
+	}
+	r.currentRootCA = newRootCA
+	r.currentIssuer = *issuerInfo
+}
+
+// assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to
+// a store update event
+func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	// if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
+	if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
+		return
+	}
+	if hasIssuer(node, &r.currentIssuer) {
+		delete(r.unconvergedNodes, node.ID)
+	} else {
+		r.unconvergedNodes[node.ID] = node
+	}
+}
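The locking choreography in `UpdateRootCA` is worth calling out: the new loop is started from a `defer` after the mutex is released, and the previous loop is only awaited once its context has been cancelled. A reduced sketch of the same pattern (the names here are ours, not swarmkit's):

```go
package example

import (
	"context"
	"sync"
)

// worker mirrors the relevant fields of rootRotationReconciler.
type worker struct {
	mu     sync.Mutex
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

// restart swaps in a new background loop, cancelling and waiting out the old
// one first. The wait happens after mu is released, so an old loop that grabs
// mu on its way out cannot deadlock against us.
func (w *worker) restart(parent context.Context, run func(context.Context)) {
	w.mu.Lock()
	prev := w.cancel
	var loopCtx context.Context
	loopCtx, w.cancel = context.WithCancel(parent)
	w.mu.Unlock()

	if prev != nil {
		prev()      // tell the old loop to stop
		w.wg.Wait() // and wait until it has fully exited
	}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		run(loopCtx)
	}()
}
```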
+
+// assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to
+// a store update event
+func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
+	r.mu.Lock()
+	delete(r.unconvergedNodes, node.ID)
+	r.mu.Unlock()
+}
+
+func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
+	defer r.wg.Done()
+	for {
+		r.mu.Lock()
+		if len(r.unconvergedNodes) == 0 {
+			r.mu.Unlock()
+
+			err := r.store.Update(func(tx store.Tx) error {
+				return r.finishRootRotation(tx, loopRootCA)
+			})
+			if err == nil {
+				log.G(r.ctx).Info("completed root rotation")
+				return
+			}
+			log.G(r.ctx).WithError(err).Error("could not complete root rotation")
+			if err == errRootRotationChanged {
+				// if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
+				return
+			}
+		} else {
+			var toUpdate []*api.Node
+			for _, n := range r.unconvergedNodes {
+				iState := n.Certificate.Status.State
+				if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
+					n = n.Copy()
+					n.Certificate.Status.State = api.IssuanceStateRotate
+					toUpdate = append(toUpdate, n)
+					if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
+						break
+					}
+				}
+			}
+			r.mu.Unlock()
+
+			if err := r.batchUpdateNodes(toUpdate); err != nil {
+				log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
+			}
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(r.batchUpdateInterval):
+		}
+	}
+}
+
+// This function assumes that the expected root CA has root rotation.  This is intended to be used by
+// `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks
+// that it has a root rotation before calling this function.
+func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
+	cluster := store.GetCluster(tx, r.clusterID)
+	if cluster == nil {
+		return fmt.Errorf("unable to get cluster %s", r.clusterID)
+	}
+
+	// If the RootCA object has changed (because another root rotation was started or because some other node
+	// had finished the root rotation), we cannot finish the root rotation that we were working on.
+	if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
+		return errRootRotationChanged
+	}
+
+	var signerCert []byte
+	if len(cluster.RootCA.RootRotation.CAKey) > 0 {
+		signerCert = cluster.RootCA.RootRotation.CACert
+	}
+	// we don't actually have to parse out the default node expiration from the cluster - we are just using
+	// the ca.RootCA object to generate new tokens and the digest
+	updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
+		DefaultNodeCertExpiration, nil)
+	if err != nil {
+		return errors.Wrap(err, "invalid cluster root rotation object")
+	}
+	cluster.RootCA = api.RootCA{
+		CACert:     cluster.RootCA.RootRotation.CACert,
+		CAKey:      cluster.RootCA.RootRotation.CAKey,
+		CACertHash: updatedRootCA.Digest.String(),
+		JoinTokens: api.JoinTokens{
+			Worker:  GenerateJoinToken(&updatedRootCA),
+			Manager: GenerateJoinToken(&updatedRootCA),
+		},
+		LastForcedRotation: cluster.RootCA.LastForcedRotation,
+	}
+	return store.UpdateCluster(tx, cluster)
+}
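`runReconcilerLoop` follows a common cancellable-retry shape: do one pass of work, then wait for either cancellation or the batch interval. Stripped of the store logic, the skeleton looks roughly like this (a hypothetical helper, not code from the patch):

```go
package example

import (
	"context"
	"time"
)

// retryLoop runs work once per interval until it reports done or ctx is
// cancelled — the same select-on-ctx.Done/time.After shape as runReconcilerLoop.
func retryLoop(ctx context.Context, interval time.Duration, work func() (done bool)) {
	for {
		if work() {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
		}
	}
}
```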
+
+func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
+	if len(toUpdate) == 0 {
+		return nil
+	}
+	_, err := r.store.Batch(func(batch *store.Batch) error {
+		// Directly update the nodes rather than get + update, and ignore version errors.  Since
+		// `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
+		// close to the latest versions of all the nodes.  If not, the node will be updated later and the
+		// next batch of updates should catch it.
+		for _, n := range toUpdate {
+			if err := batch.Update(func(tx store.Tx) error {
+				return store.UpdateNode(tx, n)
+			}); err != nil && err != store.ErrSequenceConflict {
+				log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID)
+			}
+		}
+		return nil
+	})
+	return err
+}
diff --git a/vendor/github.com/docker/swarmkit/ca/server.go b/vendor/github.com/docker/swarmkit/ca/server.go
index f3fa9cb6c8..982b0e2e83 100644
--- a/vendor/github.com/docker/swarmkit/ca/server.go
+++ b/vendor/github.com/docker/swarmkit/ca/server.go
@@ -22,6 +22,7 @@ import (
 
 const (
 	defaultReconciliationRetryInterval = 10 * time.Second
+	defaultRootReconciliationInterval  = 3 * time.Second
 )
 
 // APISecurityConfigUpdater knows how to update a SecurityConfig from an api.Cluster object
@@ -63,6 +64,10 @@ type Server struct {
 
 	// before we update the security config with the new root CA, we need to be able to save the root certs
 	rootPaths CertPaths
+
+	// lets us monitor and finish root rotations
+	rootReconciler                  *rootRotationReconciler
+	rootReconciliationRetryInterval time.Duration
 }
 
 // DefaultCAConfig returns the default CA Config, with a default expiration.
@@ -75,12 +80,13 @@
 // NewServer creates a CA API server.
 func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig, rootCAPaths CertPaths) *Server {
 	return &Server{
-		store:                       store,
-		securityConfig:              securityConfig,
-		pending:                     make(map[string]*api.Node),
-		started:                     make(chan struct{}),
-		reconciliationRetryInterval: defaultReconciliationRetryInterval,
-		rootPaths:                   rootCAPaths,
+		store:                           store,
+		securityConfig:                  securityConfig,
+		pending:                         make(map[string]*api.Node),
+		started:                         make(chan struct{}),
+		reconciliationRetryInterval:     defaultReconciliationRetryInterval,
+		rootReconciliationRetryInterval: defaultRootReconciliationInterval,
+		rootPaths:                       rootCAPaths,
 	}
 }
 
@@ -90,6 +96,12 @@ func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time
 	s.reconciliationRetryInterval = reconciliationRetryInterval
 }
 
+// SetRootReconciliationInterval changes the time interval between root rotation
+// reconciliation attempts.  This function must be called before Run.
+func (s *Server) SetRootReconciliationInterval(interval time.Duration) {
+	s.rootReconciliationRetryInterval = interval
+}
+
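Per its doc comment, the new setter must be called before `Run` (the reconciler captures the interval when it is constructed). A hypothetical caller, most likely a test, might look like this (the store, security config, and paths are placeholders from elsewhere):

```go
// Hypothetical test-style usage sketch; memStore, secConfig, and paths are assumed.
caServer := ca.NewServer(memStore, secConfig, paths)
caServer.SetRootReconciliationInterval(50 * time.Millisecond) // speed up reconciliation for the test
go caServer.Run(ctx)
defer caServer.Stop()
```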
 // GetUnlockKey is responsible for returning the current unlock key used for encrypting TLS private keys and
 // other at rest data.  Access to this RPC call should only be allowed via mutual TLS from managers.
 func (s *Server) GetUnlockKey(ctx context.Context, request *api.GetUnlockKeyRequest) (*api.GetUnlockKeyResponse, error) {
@@ -395,14 +407,28 @@ func (s *Server) Run(ctx context.Context) error {
 		return errors.New("CA signer is already running")
 	}
 	s.wg.Add(1)
+	s.ctx, s.cancel = context.WithCancel(log.WithModule(ctx, "ca"))
+	ctx = s.ctx
+	// we need to set it on the server, because `Server.UpdateRootCA` can be called from outside the Run function
+	s.rootReconciler = &rootRotationReconciler{
+		ctx:                 log.WithField(ctx, "method", "(*Server).rootRotationReconciler"),
+		clusterID:           s.securityConfig.ClientTLSCreds.Organization(),
+		store:               s.store,
+		batchUpdateInterval: s.rootReconciliationRetryInterval,
+	}
+	rootReconciler := s.rootReconciler
 	s.mu.Unlock()
-	defer s.wg.Done()
-	ctx = log.WithModule(ctx, "ca")
+	defer func() {
+		s.mu.Lock()
+		s.rootReconciler = nil
+		s.mu.Unlock()
+	}()
 
 	// Retrieve the channels to keep track of changes in the cluster
 	// Retrieve all the currently registered nodes
 	var nodes []*api.Node
+
 	updates, cancel, err := store.ViewAndWatch(
 		s.store,
 		func(readTx store.ReadTx) error {
@@ -419,13 +445,12 @@
 		},
 		api.EventCreateNode{},
 		api.EventUpdateNode{},
+		api.EventDeleteNode{},
 	)
 
 	// Do this after updateCluster has been called, so isRunning never
 	// returns true without joinTokens being set correctly.
 	s.mu.Lock()
-	s.ctx, s.cancel = context.WithCancel(ctx)
-	ctx = s.ctx
 	close(s.started)
 	s.mu.Unlock()
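For reviewers unfamiliar with `store.ViewAndWatch`: it runs the callback against a read transaction and returns a channel of subsequent events, so the initial snapshot and the event stream stay consistent. The new `api.EventDeleteNode{}` specifier is what lets the reconciler prune deleted nodes. Schematically (error handling trimmed; the `store.All` selector is an assumption on our part):

```go
// Snapshot-then-watch sketch: the read callback and the subscription are paired.
updates, cancel, err := store.ViewAndWatch(s.store,
	func(readTx store.ReadTx) error {
		var err error
		nodes, err = store.FindNodes(readTx, store.All)
		return err
	},
	api.EventCreateNode{},
	api.EventUpdateNode{},
	api.EventDeleteNode{}, // newly watched, so DeleteNode can prune unconvergedNodes
)
if err != nil {
	return err
}
defer cancel()
```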
@@ -464,13 +489,18 @@
 			switch v := event.(type) {
 			case api.EventCreateNode:
 				s.evaluateAndSignNodeCert(ctx, v.Node)
+				rootReconciler.UpdateNode(v.Node)
 			case api.EventUpdateNode:
 				// If this certificate is already at a final state
 				// no need to evaluate and sign it.
 				if !isFinalState(v.Node.Certificate.Status) {
 					s.evaluateAndSignNodeCert(ctx, v.Node)
 				}
+				rootReconciler.UpdateNode(v.Node)
+			case api.EventDeleteNode:
+				rootReconciler.DeleteNode(v.Node)
 			}
+
 		case <-ticker.C:
 			for _, node := range s.pending {
 				if err := s.evaluateAndSignNodeCert(ctx, node); err != nil {
@@ -541,12 +571,16 @@ func (s *Server) isRunning() bool {
 func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error {
 	s.mu.Lock()
 	s.joinTokens = cluster.RootCA.JoinTokens.Copy()
+	reconciler := s.rootReconciler
 	s.mu.Unlock()
+	rCA := cluster.RootCA.Copy()
+	if reconciler != nil {
+		reconciler.UpdateRootCA(rCA)
+	}
 
 	s.secConfigMu.Lock()
 	defer s.secConfigMu.Unlock()
-	rCA := cluster.RootCA
-	rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, &cluster.RootCA)
+	rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, rCA)
 	externalCAChanged := !equality.ExternalCAsEqualStable(s.lastSeenExternalCAs, cluster.Spec.CAConfig.ExternalCAs)
 	logger := log.G(ctx).WithFields(logrus.Fields{
 		"cluster.id": cluster.ID,
@@ -581,7 +615,6 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error
 		if signingKey == nil {
 			signingCert = nil
 		}
-
 		updatedRootCA, err := NewRootCA(rCA.CACert, signingCert, signingKey, expiry, intermediates)
 		if err != nil {
 			return errors.Wrap(err, "invalid Root CA object in cluster")
@@ -604,7 +637,7 @@
 		}
 		// only update the server cache if we've successfully updated the root CA
 		logger.Debug("Root CA updated successfully")
-		s.lastSeenClusterRootCA = cluster.RootCA.Copy()
+		s.lastSeenClusterRootCA = rCA
 	}
 
 	// we want to update if the external CA changed, or if the root CA changed because the root CA could affect what
diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/ca_rotation.go b/vendor/github.com/docker/swarmkit/manager/controlapi/ca_rotation.go
index acc0da5182..41e9a4d971 100644
--- a/vendor/github.com/docker/swarmkit/manager/controlapi/ca_rotation.go
+++ b/vendor/github.com/docker/swarmkit/manager/controlapi/ca_rotation.go
@@ -148,14 +148,16 @@ func validateHasAtLeastOneExternalCA(ctx context.Context, externalCAs map[string
 
 // validates that the list of external CAs have valid certs associated with them, and produce a mapping of subject/pubkey:external
 // for later validation of required external CAs
-func getNormalizedExtCAs(caConfig *api.CAConfig) (map[string][]*api.ExternalCA, error) {
+func getNormalizedExtCAs(caConfig *api.CAConfig, normalizedCurrentRootCACert []byte) (map[string][]*api.ExternalCA, error) {
 	extCAs := make(map[string][]*api.ExternalCA)
 
 	for _, extCA := range caConfig.ExternalCAs {
-		if len(extCA.CACert) == 0 {
-			return nil, grpc.Errorf(codes.InvalidArgument, "must specify CA certificate for each external CA")
+		associatedCert := normalizedCurrentRootCACert
+		// if no associated cert is provided, assume it's the current root cert
+		if len(extCA.CACert) > 0 {
+			associatedCert = ca.NormalizePEMs(extCA.CACert)
 		}
-		certKey := string(ca.NormalizePEMs(extCA.CACert))
+		certKey := string(associatedCert)
 		extCAs[certKey] = append(extCAs[certKey], extCA)
 	}
 
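The behavioral change in this hunk: an `ExternalCA` entry without a `CACert` used to be rejected with `InvalidArgument`; now it is assumed to certify the current root and is grouped under the current root's normalized certificate. Roughly (the URLs and cert variables below are made up):

```go
// Before this patch the first entry returned an error; now both are accepted
// and keyed by the normalized cert they are associated with.
cfg := &api.CAConfig{
	ExternalCAs: []*api.ExternalCA{
		{URL: "https://ca.example.com/sign"},                          // no CACert: keyed under the current root
		{URL: "https://other.example.com/sign", CACert: rotationCert}, // keyed under NormalizePEMs(rotationCert)
	},
}
extCAs, err := getNormalizedExtCAs(cfg, currentRootNormalized)
```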
@@ -191,12 +193,12 @@ func validateCAConfig(ctx context.Context, securityConfig *ca.SecurityConfig, cl
 		return nil, grpc.Errorf(codes.InvalidArgument, "if a signing CA key is provided, the signing CA cert must also be provided")
 	}
 
-	extCAs, err := getNormalizedExtCAs(newConfig) // validate that the list of external CAs is not malformed
+	normalizedRootCA := ca.NormalizePEMs(cluster.RootCA.CACert)
+	extCAs, err := getNormalizedExtCAs(newConfig, normalizedRootCA) // validate that the list of external CAs is not malformed
 	if err != nil {
 		return nil, err
 	}
 
-	normalizedRootCA := ca.NormalizePEMs(cluster.RootCA.CACert)
 	var oldCertExtCAs []*api.ExternalCA
 	if !hasSigningKey(&cluster.RootCA) {
 		oldCertExtCAs, err = validateHasAtLeastOneExternalCA(ctx, extCAs, securityConfig, normalizedRootCA, "current")
diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go
index 7b436b16f2..026be7df25 100644
--- a/vendor/github.com/docker/swarmkit/manager/manager.go
+++ b/vendor/github.com/docker/swarmkit/manager/manager.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/cloudflare/cfssl/helpers"
 	"github.com/docker/docker/pkg/plugingetter"
 	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
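Background for the hunk that follows: legacy PEM encryption (RFC 1423) carries no integrity check, so `x509.DecryptPEMBlock` can return garbage bytes with a nil error when given the wrong passphrase. Parsing the decrypted bytes is the only reliable verification, which is what `helpers.ParsePrivateKeyPEMWithPassword` provides. A sketch of the failure mode (assuming an EC key, which is what swarmkit uses):

```go
// With a wrong passphrase, DecryptPEMBlock may still return a nil error —
// the output is just garbage DER. Only a parse attempt catches it.
der, err := x509.DecryptPEMBlock(keyBlock, []byte("wrong-passphrase"))
if err == nil {
	if _, err := x509.ParseECPrivateKey(der); err != nil {
		// the passphrase was wrong after all: bytes decrypt but do not parse
	}
}
```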
@@ -776,22 +777,29 @@ func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
 	}
 
 	if x509.IsEncryptedPEMBlock(keyBlock) {
-		// This key is already encrypted, let's try to decrypt with the current main passphrase
-		_, err = x509.DecryptPEMBlock(keyBlock, []byte(passphrase))
+		// PEM encryption does not have a digest, so sometimes decryption doesn't
+		// error even with the wrong passphrase.  So actually try to parse it into a valid key.
+		_, err := helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrase))
 		if err == nil {
-			// The main key is the correct KEK, nothing to do here
+			// This key is already correctly encrypted with the correct KEK, nothing to do here
 			return nil
 		}
+
 		// This key is already encrypted, but failed with current main passphrase.
-		// Let's try to decrypt with the previous passphrase
-		unencryptedKey, err := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
+		// Let's try to decrypt with the previous passphrase, and parse into a valid key, for the
+		// same reason as above.
+		_, err = helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrasePrev))
 		if err != nil {
 			// We were not able to decrypt either with the main or backup passphrase, error
 			return err
 		}
+		// ok the above passphrase is correct, so decrypt the PEM block so we can re-encrypt -
+		// since the key was successfully decrypted above, there will be no error doing PEM
+		// decryption
+		unencryptedDER, _ := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
 		unencryptedKeyBlock := &pem.Block{
 			Type:  keyBlock.Type,
-			Bytes: unencryptedKey,
+			Bytes: unencryptedDER,
 		}
 
 		// we were able to decrypt the key with the previous passphrase - if the current passphrase is empty,
diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
index 58a3a365de..17b6c04318 100644
--- a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
+++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
@@ -297,8 +297,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 	updates := make(map[*api.Service][]orchestrator.Slot)
 
 	_, err := g.store.Batch(func(batch *store.Batch) error {
-		var updateTasks []orchestrator.Slot
 		for _, serviceID := range serviceIDs {
+			var updateTasks []orchestrator.Slot
+
 			if _, exists := nodeTasks[serviceID]; !exists {
 				continue
 			}
@@ -352,7 +353,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 	for service, updateTasks := range updates {
 		g.updater.Update(ctx, g.cluster, service, updateTasks)
 	}
-
 }
 
 // updateNode updates g.nodes based on the current node value
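The global.go change fixes a classic loop-scoping bug: `updateTasks` was declared once outside the per-service loop, so each service's update list accumulated the slots of every service processed before it. The same bug in miniature (hypothetical names, strings standing in for slots):

```go
package example

func perService(serviceIDs []string) map[string][]string {
	updates := make(map[string][]string)
	var updateTasks []string // BUG: shared across iterations; the fix moves this inside the loop
	for _, id := range serviceIDs {
		updateTasks = append(updateTasks, "task-for-"+id)
		updates[id] = updateTasks // later services carry earlier services' tasks
	}
	return updates
}
```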