volume.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package controlapi
  2. import (
  3. "context"
  4. "strings"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/identity"
  7. "github.com/docker/swarmkit/manager/state/store"
  8. "google.golang.org/grpc/codes"
  9. "google.golang.org/grpc/status"
  10. )
  11. func (s *Server) CreateVolume(ctx context.Context, request *api.CreateVolumeRequest) (*api.CreateVolumeResponse, error) {
  12. if request.Spec == nil {
  13. return nil, status.Errorf(codes.InvalidArgument, "spec must not be nil")
  14. }
  15. // validate the volume spec
  16. if request.Spec.Driver == nil {
  17. return nil, status.Errorf(codes.InvalidArgument, "driver must be specified")
  18. }
  19. if request.Spec.Annotations.Name == "" {
  20. return nil, status.Errorf(codes.InvalidArgument, "meta: name must be provided")
  21. }
  22. if request.Spec.AccessMode == nil {
  23. return nil, status.Errorf(codes.InvalidArgument, "AccessMode must not be nil")
  24. }
  25. if request.Spec.AccessMode.GetAccessType() == nil {
  26. return nil, status.Errorf(codes.InvalidArgument, "Volume AccessMode must specify either Mount or Block access type")
  27. }
  28. volume := &api.Volume{
  29. ID: identity.NewID(),
  30. Spec: *request.Spec,
  31. }
  32. err := s.store.Update(func(tx store.Tx) error {
  33. // check all secrets, so that we can return an error indicating ALL
  34. // missing secrets, instead of just the first one.
  35. var missingSecrets []string
  36. for _, secret := range volume.Spec.Secrets {
  37. s := store.GetSecret(tx, secret.Secret)
  38. if s == nil {
  39. missingSecrets = append(missingSecrets, secret.Secret)
  40. }
  41. }
  42. if len(missingSecrets) > 0 {
  43. secretStr := "secrets"
  44. if len(missingSecrets) == 1 {
  45. secretStr = "secret"
  46. }
  47. return status.Errorf(codes.InvalidArgument, "%s not found: %v", secretStr, strings.Join(missingSecrets, ", "))
  48. }
  49. return store.CreateVolume(tx, volume)
  50. })
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &api.CreateVolumeResponse{
  55. Volume: volume,
  56. }, nil
  57. }
  58. func (s *Server) UpdateVolume(ctx context.Context, request *api.UpdateVolumeRequest) (*api.UpdateVolumeResponse, error) {
  59. if request.VolumeID == "" {
  60. return nil, status.Errorf(codes.InvalidArgument, "VolumeID must not be empty")
  61. }
  62. if request.Spec == nil {
  63. return nil, status.Errorf(codes.InvalidArgument, "Spec must not be empty")
  64. }
  65. if request.VolumeVersion == nil {
  66. return nil, status.Errorf(codes.InvalidArgument, "VolumeVersion must not be empty")
  67. }
  68. var volume *api.Volume
  69. if err := s.store.Update(func(tx store.Tx) error {
  70. volume = store.GetVolume(tx, request.VolumeID)
  71. if volume == nil {
  72. return status.Errorf(codes.NotFound, "volume %v not found", request.VolumeID)
  73. }
  74. // compare specs, to see if any invalid fields have changed
  75. if request.Spec.Annotations.Name != volume.Spec.Annotations.Name {
  76. return status.Errorf(codes.InvalidArgument, "Name cannot be updated")
  77. }
  78. if request.Spec.Group != volume.Spec.Group {
  79. return status.Errorf(codes.InvalidArgument, "Group cannot be updated")
  80. }
  81. if request.Spec.AccessibilityRequirements != volume.Spec.AccessibilityRequirements {
  82. return status.Errorf(codes.InvalidArgument, "AccessibilityRequirements cannot be updated")
  83. }
  84. if request.Spec.Driver == nil || request.Spec.Driver.Name != volume.Spec.Driver.Name {
  85. return status.Errorf(codes.InvalidArgument, "Driver cannot be updated")
  86. }
  87. if request.Spec.AccessMode.Scope != volume.Spec.AccessMode.Scope || request.Spec.AccessMode.Sharing != volume.Spec.AccessMode.Sharing {
  88. return status.Errorf(codes.InvalidArgument, "AccessMode cannot be updated")
  89. }
  90. volume.Spec = *request.Spec
  91. volume.Meta.Version = *request.VolumeVersion
  92. if err := store.UpdateVolume(tx, volume); err != nil {
  93. return err
  94. }
  95. // read the volume back out, so it has the correct meta version
  96. // TODO(dperny): this behavior, while likely more correct, may not be
  97. // consistent with the rest of swarmkit...
  98. volume = store.GetVolume(tx, request.VolumeID)
  99. return nil
  100. }); err != nil {
  101. return nil, err
  102. }
  103. return &api.UpdateVolumeResponse{
  104. Volume: volume,
  105. }, nil
  106. }
  107. func (s *Server) ListVolumes(ctx context.Context, request *api.ListVolumesRequest) (*api.ListVolumesResponse, error) {
  108. var (
  109. volumes []*api.Volume
  110. err error
  111. )
  112. // so the way we do this is with two filtering passes. first, we do a store
  113. // request, filtering on one of the parameters. then, from the result of
  114. // the store request, we filter on the remaining filters. This is necessary
  115. // because the store filters do not expose an AND function.
  116. s.store.View(func(tx store.ReadTx) {
  117. var by store.By = store.All
  118. switch {
  119. case request.Filters == nil:
  120. // short circuit to avoid nil pointer deref
  121. case len(request.Filters.Names) > 0:
  122. by = buildFilters(store.ByName, request.Filters.Names)
  123. case len(request.Filters.IDPrefixes) > 0:
  124. by = buildFilters(store.ByIDPrefix, request.Filters.IDPrefixes)
  125. case len(request.Filters.Groups) > 0:
  126. by = buildFilters(store.ByVolumeGroup, request.Filters.Groups)
  127. case len(request.Filters.Drivers) > 0:
  128. by = buildFilters(store.ByDriver, request.Filters.Drivers)
  129. case len(request.Filters.NamePrefixes) > 0:
  130. by = buildFilters(store.ByNamePrefix, request.Filters.NamePrefixes)
  131. }
  132. volumes, err = store.FindVolumes(tx, by)
  133. })
  134. if err != nil {
  135. return nil, err
  136. }
  137. if request.Filters == nil {
  138. return &api.ListVolumesResponse{Volumes: volumes}, nil
  139. }
  140. volumes = filterVolumes(volumes,
  141. // Names
  142. func(v *api.Volume) bool {
  143. return filterContains(v.Spec.Annotations.Name, request.Filters.Names)
  144. },
  145. // NamePrefixes
  146. func(v *api.Volume) bool {
  147. return filterContainsPrefix(v.Spec.Annotations.Name, request.Filters.NamePrefixes)
  148. },
  149. // IDPrefixes
  150. func(v *api.Volume) bool {
  151. return filterContainsPrefix(v.ID, request.Filters.IDPrefixes)
  152. },
  153. // Labels
  154. func(v *api.Volume) bool {
  155. return filterMatchLabels(v.Spec.Annotations.Labels, request.Filters.Labels)
  156. },
  157. // Groups
  158. func(v *api.Volume) bool {
  159. return filterContains(v.Spec.Group, request.Filters.Groups)
  160. },
  161. // Drivers
  162. func(v *api.Volume) bool {
  163. return v.Spec.Driver != nil && filterContains(v.Spec.Driver.Name, request.Filters.Drivers)
  164. },
  165. )
  166. return &api.ListVolumesResponse{
  167. Volumes: volumes,
  168. }, nil
  169. }
  170. func filterVolumes(candidates []*api.Volume, filters ...func(*api.Volume) bool) []*api.Volume {
  171. result := []*api.Volume{}
  172. for _, c := range candidates {
  173. match := true
  174. for _, f := range filters {
  175. if !f(c) {
  176. match = false
  177. break
  178. }
  179. }
  180. if match {
  181. result = append(result, c)
  182. }
  183. }
  184. return result
  185. }
  186. func (s *Server) GetVolume(ctx context.Context, request *api.GetVolumeRequest) (*api.GetVolumeResponse, error) {
  187. var volume *api.Volume
  188. s.store.View(func(tx store.ReadTx) {
  189. volume = store.GetVolume(tx, request.VolumeID)
  190. })
  191. if volume == nil {
  192. return nil, status.Errorf(codes.NotFound, "volume %v not found", request.VolumeID)
  193. }
  194. return &api.GetVolumeResponse{
  195. Volume: volume,
  196. }, nil
  197. }
  198. // RemoveVolume marks a Volume for removal. For a Volume to be removed, it must
  199. // have Availability set to Drain. RemoveVolume does not immediately delete the
  200. // volume, because some clean-up must occur before it can be removed. However,
  201. // calling RemoveVolume is an irrevocable action, and once it occurs, the
  202. // Volume can no longer be used in any way.
  203. func (s *Server) RemoveVolume(ctx context.Context, request *api.RemoveVolumeRequest) (*api.RemoveVolumeResponse, error) {
  204. err := s.store.Update(func(tx store.Tx) error {
  205. volume := store.GetVolume(tx, request.VolumeID)
  206. if volume == nil {
  207. return status.Errorf(codes.NotFound, "volume %s not found", request.VolumeID)
  208. }
  209. // If this is a force delete, we force the delete. No survivors. This
  210. // is a last resort to resolve otherwise intractable problems with
  211. // volumes. Using this has the potential to break other things in the
  212. // cluster, because testing every case where we force-remove a volume
  213. // is difficult at best.
  214. if request.Force {
  215. return store.DeleteVolume(tx, request.VolumeID)
  216. }
  217. if len(volume.PublishStatus) != 0 {
  218. return status.Error(codes.FailedPrecondition, "volume is still in use")
  219. }
  220. volume.PendingDelete = true
  221. return store.UpdateVolume(tx, volume)
  222. })
  223. if err != nil {
  224. return nil, err
  225. }
  226. return &api.RemoveVolumeResponse{}, nil
  227. }