node.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package controlapi
  2. import (
  3. "context"
  4. "crypto/x509"
  5. "encoding/pem"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/manager/state/raft/membership"
  8. "github.com/docker/swarmkit/manager/state/store"
  9. gogotypes "github.com/gogo/protobuf/types"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/status"
  12. )
  13. func validateNodeSpec(spec *api.NodeSpec) error {
  14. if spec == nil {
  15. return status.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  16. }
  17. return nil
  18. }
  19. // GetNode returns a Node given a NodeID.
  20. // - Returns `InvalidArgument` if NodeID is not provided.
  21. // - Returns `NotFound` if the Node is not found.
  22. func (s *Server) GetNode(ctx context.Context, request *api.GetNodeRequest) (*api.GetNodeResponse, error) {
  23. if request.NodeID == "" {
  24. return nil, status.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  25. }
  26. var node *api.Node
  27. s.store.View(func(tx store.ReadTx) {
  28. node = store.GetNode(tx, request.NodeID)
  29. })
  30. if node == nil {
  31. return nil, status.Errorf(codes.NotFound, "node %s not found", request.NodeID)
  32. }
  33. if s.raft != nil {
  34. memberlist := s.raft.GetMemberlist()
  35. for _, member := range memberlist {
  36. if member.NodeID == node.ID {
  37. node.ManagerStatus = &api.ManagerStatus{
  38. RaftID: member.RaftID,
  39. Addr: member.Addr,
  40. Leader: member.Status.Leader,
  41. Reachability: member.Status.Reachability,
  42. }
  43. break
  44. }
  45. }
  46. }
  47. return &api.GetNodeResponse{
  48. Node: node,
  49. }, nil
  50. }
  51. func filterNodes(candidates []*api.Node, filters ...func(*api.Node) bool) []*api.Node {
  52. result := []*api.Node{}
  53. for _, c := range candidates {
  54. match := true
  55. for _, f := range filters {
  56. if !f(c) {
  57. match = false
  58. break
  59. }
  60. }
  61. if match {
  62. result = append(result, c)
  63. }
  64. }
  65. return result
  66. }
  67. // ListNodes returns a list of all nodes.
  68. func (s *Server) ListNodes(ctx context.Context, request *api.ListNodesRequest) (*api.ListNodesResponse, error) {
  69. var (
  70. nodes []*api.Node
  71. err error
  72. )
  73. s.store.View(func(tx store.ReadTx) {
  74. switch {
  75. case request.Filters != nil && len(request.Filters.Names) > 0:
  76. nodes, err = store.FindNodes(tx, buildFilters(store.ByName, request.Filters.Names))
  77. case request.Filters != nil && len(request.Filters.NamePrefixes) > 0:
  78. nodes, err = store.FindNodes(tx, buildFilters(store.ByNamePrefix, request.Filters.NamePrefixes))
  79. case request.Filters != nil && len(request.Filters.IDPrefixes) > 0:
  80. nodes, err = store.FindNodes(tx, buildFilters(store.ByIDPrefix, request.Filters.IDPrefixes))
  81. case request.Filters != nil && len(request.Filters.Roles) > 0:
  82. filters := make([]store.By, 0, len(request.Filters.Roles))
  83. for _, v := range request.Filters.Roles {
  84. filters = append(filters, store.ByRole(v))
  85. }
  86. nodes, err = store.FindNodes(tx, store.Or(filters...))
  87. case request.Filters != nil && len(request.Filters.Memberships) > 0:
  88. filters := make([]store.By, 0, len(request.Filters.Memberships))
  89. for _, v := range request.Filters.Memberships {
  90. filters = append(filters, store.ByMembership(v))
  91. }
  92. nodes, err = store.FindNodes(tx, store.Or(filters...))
  93. default:
  94. nodes, err = store.FindNodes(tx, store.All)
  95. }
  96. })
  97. if err != nil {
  98. return nil, err
  99. }
  100. if request.Filters != nil {
  101. nodes = filterNodes(nodes,
  102. func(e *api.Node) bool {
  103. if len(request.Filters.Names) == 0 {
  104. return true
  105. }
  106. if e.Description == nil {
  107. return false
  108. }
  109. return filterContains(e.Description.Hostname, request.Filters.Names)
  110. },
  111. func(e *api.Node) bool {
  112. if len(request.Filters.NamePrefixes) == 0 {
  113. return true
  114. }
  115. if e.Description == nil {
  116. return false
  117. }
  118. return filterContainsPrefix(e.Description.Hostname, request.Filters.NamePrefixes)
  119. },
  120. func(e *api.Node) bool {
  121. return filterContainsPrefix(e.ID, request.Filters.IDPrefixes)
  122. },
  123. func(e *api.Node) bool {
  124. if len(request.Filters.Labels) == 0 {
  125. return true
  126. }
  127. if e.Description == nil {
  128. return false
  129. }
  130. return filterMatchLabels(e.Description.Engine.Labels, request.Filters.Labels)
  131. },
  132. func(e *api.Node) bool {
  133. if len(request.Filters.NodeLabels) == 0 {
  134. return true
  135. }
  136. return filterMatchLabels(e.Spec.Annotations.Labels, request.Filters.NodeLabels)
  137. },
  138. func(e *api.Node) bool {
  139. if len(request.Filters.Roles) == 0 {
  140. return true
  141. }
  142. for _, c := range request.Filters.Roles {
  143. if c == e.Role {
  144. return true
  145. }
  146. }
  147. return false
  148. },
  149. func(e *api.Node) bool {
  150. if len(request.Filters.Memberships) == 0 {
  151. return true
  152. }
  153. for _, c := range request.Filters.Memberships {
  154. if c == e.Spec.Membership {
  155. return true
  156. }
  157. }
  158. return false
  159. },
  160. )
  161. }
  162. // Add in manager information on nodes that are managers
  163. if s.raft != nil {
  164. memberlist := s.raft.GetMemberlist()
  165. for _, node := range nodes {
  166. for _, member := range memberlist {
  167. if member.NodeID == node.ID {
  168. node.ManagerStatus = &api.ManagerStatus{
  169. RaftID: member.RaftID,
  170. Addr: member.Addr,
  171. Leader: member.Status.Leader,
  172. Reachability: member.Status.Reachability,
  173. }
  174. break
  175. }
  176. }
  177. }
  178. }
  179. return &api.ListNodesResponse{
  180. Nodes: nodes,
  181. }, nil
  182. }
  183. // UpdateNode updates a Node referenced by NodeID with the given NodeSpec.
  184. // - Returns `NotFound` if the Node is not found.
  185. // - Returns `InvalidArgument` if the NodeSpec is malformed.
  186. // - Returns an error if the update fails.
  187. func (s *Server) UpdateNode(ctx context.Context, request *api.UpdateNodeRequest) (*api.UpdateNodeResponse, error) {
  188. if request.NodeID == "" || request.NodeVersion == nil {
  189. return nil, status.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  190. }
  191. if err := validateNodeSpec(request.Spec); err != nil {
  192. return nil, err
  193. }
  194. var (
  195. node *api.Node
  196. member *membership.Member
  197. )
  198. err := s.store.Update(func(tx store.Tx) error {
  199. node = store.GetNode(tx, request.NodeID)
  200. if node == nil {
  201. return status.Errorf(codes.NotFound, "node %s not found", request.NodeID)
  202. }
  203. // Demotion sanity checks.
  204. if node.Spec.DesiredRole == api.NodeRoleManager && request.Spec.DesiredRole == api.NodeRoleWorker {
  205. // Check for manager entries in Store.
  206. managers, err := store.FindNodes(tx, store.ByRole(api.NodeRoleManager))
  207. if err != nil {
  208. return status.Errorf(codes.Internal, "internal store error: %v", err)
  209. }
  210. if len(managers) == 1 && managers[0].ID == node.ID {
  211. return status.Errorf(codes.FailedPrecondition, "attempting to demote the last manager of the swarm")
  212. }
  213. // Check for node in memberlist
  214. if member = s.raft.GetMemberByNodeID(request.NodeID); member == nil {
  215. return status.Errorf(codes.NotFound, "can't find manager in raft memberlist")
  216. }
  217. // Quorum safeguard
  218. if !s.raft.CanRemoveMember(member.RaftID) {
  219. return status.Errorf(codes.FailedPrecondition, "can't remove member from the raft: this would result in a loss of quorum")
  220. }
  221. }
  222. node.Meta.Version = *request.NodeVersion
  223. node.Spec = *request.Spec.Copy()
  224. return store.UpdateNode(tx, node)
  225. })
  226. if err != nil {
  227. return nil, err
  228. }
  229. return &api.UpdateNodeResponse{
  230. Node: node,
  231. }, nil
  232. }
  233. func orphanNodeTasks(tx store.Tx, nodeID string) error {
  234. // when a node is deleted, all of its tasks are irrecoverably removed.
  235. // additionally, the Dispatcher can no longer be relied on to update the
  236. // task status. Therefore, when the node is removed, we must additionally
  237. // move all of its assigned tasks to the Orphaned state, so that their
  238. // resources can be cleaned up.
  239. tasks, err := store.FindTasks(tx, store.ByNodeID(nodeID))
  240. if err != nil {
  241. return err
  242. }
  243. for _, task := range tasks {
  244. // this operation must occur within the same transaction boundary. If
  245. // we cannot accomplish this task orphaning in the same transaction, we
  246. // could crash or die between transactions and not get a chance to do
  247. // this. however, in cases were there is an exceptionally large number
  248. // of tasks for a node, this may cause the transaction to exceed the
  249. // max message size.
  250. //
  251. // therefore, we restrict updating to only tasks in a non-terminal
  252. // state. Tasks in a terminal state do not need to be updated.
  253. if task.Status.State < api.TaskStateCompleted {
  254. task.Status = api.TaskStatus{
  255. Timestamp: gogotypes.TimestampNow(),
  256. State: api.TaskStateOrphaned,
  257. Message: "Task belonged to a node that has been deleted",
  258. }
  259. store.UpdateTask(tx, task)
  260. }
  261. }
  262. return nil
  263. }
  264. // RemoveNode removes a Node referenced by NodeID with the given NodeSpec.
  265. // - Returns NotFound if the Node is not found.
  266. // - Returns FailedPrecondition if the Node has manager role (and is part of the memberlist) or is not shut down.
  267. // - Returns InvalidArgument if NodeID or NodeVersion is not valid.
  268. // - Returns an error if the delete fails.
  269. func (s *Server) RemoveNode(ctx context.Context, request *api.RemoveNodeRequest) (*api.RemoveNodeResponse, error) {
  270. if request.NodeID == "" {
  271. return nil, status.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  272. }
  273. err := s.store.Update(func(tx store.Tx) error {
  274. node := store.GetNode(tx, request.NodeID)
  275. if node == nil {
  276. return status.Errorf(codes.NotFound, "node %s not found", request.NodeID)
  277. }
  278. if node.Spec.DesiredRole == api.NodeRoleManager {
  279. if s.raft == nil {
  280. return status.Errorf(codes.FailedPrecondition, "node %s is a manager but cannot access node information from the raft memberlist", request.NodeID)
  281. }
  282. if member := s.raft.GetMemberByNodeID(request.NodeID); member != nil {
  283. return status.Errorf(codes.FailedPrecondition, "node %s is a cluster manager and is a member of the raft cluster. It must be demoted to worker before removal", request.NodeID)
  284. }
  285. }
  286. if !request.Force && node.Status.State == api.NodeStatus_READY {
  287. return status.Errorf(codes.FailedPrecondition, "node %s is not down and can't be removed", request.NodeID)
  288. }
  289. // lookup the cluster
  290. clusters, err := store.FindClusters(tx, store.ByName(store.DefaultClusterName))
  291. if err != nil {
  292. return err
  293. }
  294. if len(clusters) != 1 {
  295. return status.Errorf(codes.Internal, "could not fetch cluster object")
  296. }
  297. cluster := clusters[0]
  298. blacklistedCert := &api.BlacklistedCertificate{}
  299. // Set an expiry time for this RemovedNode if a certificate
  300. // exists and can be parsed.
  301. if len(node.Certificate.Certificate) != 0 {
  302. certBlock, _ := pem.Decode(node.Certificate.Certificate)
  303. if certBlock != nil {
  304. X509Cert, err := x509.ParseCertificate(certBlock.Bytes)
  305. if err == nil && !X509Cert.NotAfter.IsZero() {
  306. expiry, err := gogotypes.TimestampProto(X509Cert.NotAfter)
  307. if err == nil {
  308. blacklistedCert.Expiry = expiry
  309. }
  310. }
  311. }
  312. }
  313. if cluster.BlacklistedCertificates == nil {
  314. cluster.BlacklistedCertificates = make(map[string]*api.BlacklistedCertificate)
  315. }
  316. cluster.BlacklistedCertificates[node.ID] = blacklistedCert
  317. expireBlacklistedCerts(cluster)
  318. if err := store.UpdateCluster(tx, cluster); err != nil {
  319. return err
  320. }
  321. if err := orphanNodeTasks(tx, request.NodeID); err != nil {
  322. return err
  323. }
  324. return store.DeleteNode(tx, request.NodeID)
  325. })
  326. if err != nil {
  327. return nil, err
  328. }
  329. return &api.RemoveNodeResponse{}, nil
  330. }