cluster.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package controlapi
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/ca"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/encryption"
  9. "github.com/docker/swarmkit/manager/state/store"
  10. gogotypes "github.com/gogo/protobuf/types"
  11. "golang.org/x/net/context"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/codes"
  14. )
  // expiredCertGrace is how long a blacklist entry is retained after the
  // expiration timestamp of the certificate it refers to (one week).
  const expiredCertGrace = 7 * 24 * time.Hour
  20. func validateClusterSpec(spec *api.ClusterSpec) error {
  21. if spec == nil {
  22. return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  23. }
  24. // Validate that expiry time being provided is valid, and over our minimum
  25. if spec.CAConfig.NodeCertExpiry != nil {
  26. expiry, err := gogotypes.DurationFromProto(spec.CAConfig.NodeCertExpiry)
  27. if err != nil {
  28. return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  29. }
  30. if expiry < ca.MinNodeCertExpiration {
  31. return grpc.Errorf(codes.InvalidArgument, "minimum certificate expiry time is: %s", ca.MinNodeCertExpiration)
  32. }
  33. }
  34. // Validate that AcceptancePolicies only include Secrets that are bcrypted
  35. // TODO(diogo): Add a global list of acceptance algorithms. We only support bcrypt for now.
  36. if len(spec.AcceptancePolicy.Policies) > 0 {
  37. for _, policy := range spec.AcceptancePolicy.Policies {
  38. if policy.Secret != nil && strings.ToLower(policy.Secret.Alg) != "bcrypt" {
  39. return grpc.Errorf(codes.InvalidArgument, "hashing algorithm is not supported: %s", policy.Secret.Alg)
  40. }
  41. }
  42. }
  43. // Validate that heartbeatPeriod time being provided is valid
  44. if spec.Dispatcher.HeartbeatPeriod != nil {
  45. heartbeatPeriod, err := gogotypes.DurationFromProto(spec.Dispatcher.HeartbeatPeriod)
  46. if err != nil {
  47. return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  48. }
  49. if heartbeatPeriod < 0 {
  50. return grpc.Errorf(codes.InvalidArgument, "heartbeat time period cannot be a negative duration")
  51. }
  52. }
  53. return nil
  54. }
  55. // GetCluster returns a Cluster given a ClusterID.
  56. // - Returns `InvalidArgument` if ClusterID is not provided.
  57. // - Returns `NotFound` if the Cluster is not found.
  58. func (s *Server) GetCluster(ctx context.Context, request *api.GetClusterRequest) (*api.GetClusterResponse, error) {
  59. if request.ClusterID == "" {
  60. return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  61. }
  62. var cluster *api.Cluster
  63. s.store.View(func(tx store.ReadTx) {
  64. cluster = store.GetCluster(tx, request.ClusterID)
  65. })
  66. if cluster == nil {
  67. return nil, grpc.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
  68. }
  69. redactedClusters := redactClusters([]*api.Cluster{cluster})
  70. // WARN: we should never return cluster here. We need to redact the private fields first.
  71. return &api.GetClusterResponse{
  72. Cluster: redactedClusters[0],
  73. }, nil
  74. }
  75. // UpdateCluster updates a Cluster referenced by ClusterID with the given ClusterSpec.
  76. // - Returns `NotFound` if the Cluster is not found.
  77. // - Returns `InvalidArgument` if the ClusterSpec is malformed.
  78. // - Returns `Unimplemented` if the ClusterSpec references unimplemented features.
  79. // - Returns an error if the update fails.
  80. func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRequest) (*api.UpdateClusterResponse, error) {
  81. if request.ClusterID == "" || request.ClusterVersion == nil {
  82. return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  83. }
  84. if err := validateClusterSpec(request.Spec); err != nil {
  85. return nil, err
  86. }
  87. var cluster *api.Cluster
  88. err := s.store.Update(func(tx store.Tx) error {
  89. cluster = store.GetCluster(tx, request.ClusterID)
  90. if cluster == nil {
  91. return grpc.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
  92. }
  93. // This ensures that we always have the latest security config, so our ca.SecurityConfig.RootCA and
  94. // ca.SecurityConfig.externalCA objects are up-to-date with the current api.Cluster.RootCA and
  95. // api.Cluster.Spec.ExternalCA objects, respectively. Note that if, during this update, the cluster gets
  96. // updated again with different CA info and the security config gets changed under us, that's still fine because
  97. // this cluster update would fail anyway due to its version being too low on write.
  98. if err := s.scu.UpdateRootCA(ctx, cluster); err != nil {
  99. log.G(ctx).WithField(
  100. "method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("could not update security config")
  101. return grpc.Errorf(codes.Internal, "could not update security config")
  102. }
  103. rootCA := s.securityConfig.RootCA()
  104. cluster.Meta.Version = *request.ClusterVersion
  105. cluster.Spec = *request.Spec.Copy()
  106. expireBlacklistedCerts(cluster)
  107. if request.Rotation.WorkerJoinToken {
  108. cluster.RootCA.JoinTokens.Worker = ca.GenerateJoinToken(rootCA)
  109. }
  110. if request.Rotation.ManagerJoinToken {
  111. cluster.RootCA.JoinTokens.Manager = ca.GenerateJoinToken(rootCA)
  112. }
  113. updatedRootCA, err := validateCAConfig(ctx, s.securityConfig, cluster)
  114. if err != nil {
  115. return err
  116. }
  117. cluster.RootCA = *updatedRootCA
  118. var unlockKeys []*api.EncryptionKey
  119. var managerKey *api.EncryptionKey
  120. for _, eKey := range cluster.UnlockKeys {
  121. if eKey.Subsystem == ca.ManagerRole {
  122. if !cluster.Spec.EncryptionConfig.AutoLockManagers {
  123. continue
  124. }
  125. managerKey = eKey
  126. }
  127. unlockKeys = append(unlockKeys, eKey)
  128. }
  129. switch {
  130. case !cluster.Spec.EncryptionConfig.AutoLockManagers:
  131. break
  132. case managerKey == nil:
  133. unlockKeys = append(unlockKeys, &api.EncryptionKey{
  134. Subsystem: ca.ManagerRole,
  135. Key: encryption.GenerateSecretKey(),
  136. })
  137. case request.Rotation.ManagerUnlockKey:
  138. managerKey.Key = encryption.GenerateSecretKey()
  139. }
  140. cluster.UnlockKeys = unlockKeys
  141. return store.UpdateCluster(tx, cluster)
  142. })
  143. if err != nil {
  144. return nil, err
  145. }
  146. redactedClusters := redactClusters([]*api.Cluster{cluster})
  147. // WARN: we should never return cluster here. We need to redact the private fields first.
  148. return &api.UpdateClusterResponse{
  149. Cluster: redactedClusters[0],
  150. }, nil
  151. }
  152. func filterClusters(candidates []*api.Cluster, filters ...func(*api.Cluster) bool) []*api.Cluster {
  153. result := []*api.Cluster{}
  154. for _, c := range candidates {
  155. match := true
  156. for _, f := range filters {
  157. if !f(c) {
  158. match = false
  159. break
  160. }
  161. }
  162. if match {
  163. result = append(result, c)
  164. }
  165. }
  166. return result
  167. }
  168. // ListClusters returns a list of all clusters.
  169. func (s *Server) ListClusters(ctx context.Context, request *api.ListClustersRequest) (*api.ListClustersResponse, error) {
  170. var (
  171. clusters []*api.Cluster
  172. err error
  173. )
  174. s.store.View(func(tx store.ReadTx) {
  175. switch {
  176. case request.Filters != nil && len(request.Filters.Names) > 0:
  177. clusters, err = store.FindClusters(tx, buildFilters(store.ByName, request.Filters.Names))
  178. case request.Filters != nil && len(request.Filters.NamePrefixes) > 0:
  179. clusters, err = store.FindClusters(tx, buildFilters(store.ByNamePrefix, request.Filters.NamePrefixes))
  180. case request.Filters != nil && len(request.Filters.IDPrefixes) > 0:
  181. clusters, err = store.FindClusters(tx, buildFilters(store.ByIDPrefix, request.Filters.IDPrefixes))
  182. default:
  183. clusters, err = store.FindClusters(tx, store.All)
  184. }
  185. })
  186. if err != nil {
  187. return nil, err
  188. }
  189. if request.Filters != nil {
  190. clusters = filterClusters(clusters,
  191. func(e *api.Cluster) bool {
  192. return filterContains(e.Spec.Annotations.Name, request.Filters.Names)
  193. },
  194. func(e *api.Cluster) bool {
  195. return filterContainsPrefix(e.Spec.Annotations.Name, request.Filters.NamePrefixes)
  196. },
  197. func(e *api.Cluster) bool {
  198. return filterContainsPrefix(e.ID, request.Filters.IDPrefixes)
  199. },
  200. func(e *api.Cluster) bool {
  201. return filterMatchLabels(e.Spec.Annotations.Labels, request.Filters.Labels)
  202. },
  203. )
  204. }
  205. // WARN: we should never return cluster here. We need to redact the private fields first.
  206. return &api.ListClustersResponse{
  207. Clusters: redactClusters(clusters),
  208. }, nil
  209. }
  210. // redactClusters is a method that enforces a whitelist of fields that are ok to be
  211. // returned in the Cluster object. It should filter out all sensitive information.
  212. func redactClusters(clusters []*api.Cluster) []*api.Cluster {
  213. var redactedClusters []*api.Cluster
  214. // Only add public fields to the new clusters
  215. for _, cluster := range clusters {
  216. // Copy all the mandatory fields
  217. // Do not copy secret keys
  218. redactedSpec := cluster.Spec.Copy()
  219. redactedSpec.CAConfig.SigningCAKey = nil
  220. redactedRootCA := cluster.RootCA.Copy()
  221. redactedRootCA.CAKey = nil
  222. if r := redactedRootCA.RootRotation; r != nil {
  223. r.CAKey = nil
  224. }
  225. newCluster := &api.Cluster{
  226. ID: cluster.ID,
  227. Meta: cluster.Meta,
  228. Spec: *redactedSpec,
  229. RootCA: *redactedRootCA,
  230. BlacklistedCertificates: cluster.BlacklistedCertificates,
  231. }
  232. redactedClusters = append(redactedClusters, newCluster)
  233. }
  234. return redactedClusters
  235. }
  236. func expireBlacklistedCerts(cluster *api.Cluster) {
  237. nowMinusGrace := time.Now().Add(-expiredCertGrace)
  238. for cn, blacklistedCert := range cluster.BlacklistedCertificates {
  239. if blacklistedCert.Expiry == nil {
  240. continue
  241. }
  242. expiry, err := gogotypes.TimestampFromProto(blacklistedCert.Expiry)
  243. if err == nil && nowMinusGrace.After(expiry) {
  244. delete(cluster.BlacklistedCertificates, cn)
  245. }
  246. }
  247. }