Merge pull request #41982 from dperny/feature-volumes

Add Swarm Cluster Volume support
Sebastiaan van Stijn committed 3 years ago · commit d35731fa15

+ 13 - 0
api/server/router/volume/backend.go

@@ -19,3 +19,16 @@ type Backend interface {
 	Remove(ctx context.Context, name string, opts ...opts.RemoveOption) error
 	Prune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error)
 }
+
+// ClusterBackend is the backend used for Swarm Cluster Volumes. Regular
+// volumes go through the volume service, but to avoid a cross-dependency
+// between the cluster package and the volume package, we provide two
+// separate backends here.
+type ClusterBackend interface {
+	GetVolume(nameOrID string) (volume.Volume, error)
+	GetVolumes(options volume.ListOptions) ([]*volume.Volume, error)
+	CreateVolume(volume volume.CreateOptions) (*volume.Volume, error)
+	RemoveVolume(nameOrID string, force bool) error
+	UpdateVolume(nameOrID string, version uint64, volume volume.UpdateOptions) error
+	IsManager() bool
+}

+ 5 - 1
api/server/router/volume/volume.go

@@ -5,13 +5,15 @@ import "github.com/docker/docker/api/server/router"
 // volumeRouter is a router to talk with the volumes controller
 type volumeRouter struct {
 	backend Backend
+	cluster ClusterBackend
 	routes  []router.Route
 }
 
 // NewRouter initializes a new volume router
-func NewRouter(b Backend) router.Router {
+func NewRouter(b Backend, cb ClusterBackend) router.Router {
 	r := &volumeRouter{
 		backend: b,
+		cluster: cb,
 	}
 	r.initRoutes()
 	return r
@@ -30,6 +32,8 @@ func (r *volumeRouter) initRoutes() {
 		// POST
 		router.NewPostRoute("/volumes/create", r.postVolumesCreate),
 		router.NewPostRoute("/volumes/prune", r.postVolumesPrune),
+		// PUT
+		router.NewPutRoute("/volumes/{name:.*}", r.putVolumesUpdate),
 		// DELETE
 		router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes),
 	}

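For orientation, here is a minimal wiring sketch (not part of this diff) showing how both backends end up on the router; the two parameters are hypothetical values satisfying the Backend and ClusterBackend interfaces above:

package main

import (
	volumerouter "github.com/docker/docker/api/server/router/volume"
)

// buildVolumeRouter assumes the caller already has implementations of both
// interfaces (the daemon's volume service and the swarm cluster component)
// and hands them to NewRouter, which decides per request which one to use.
func buildVolumeRouter(b volumerouter.Backend, cb volumerouter.ClusterBackend) {
	r := volumerouter.NewRouter(b, cb)
	for _, route := range r.Routes() {
		_ = route // the GET/POST/PUT/DELETE /volumes routes registered in initRoutes
	}
}

func main() {}
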
+ 113 - 4
api/server/router/volume/volume_routes.go

@@ -2,13 +2,24 @@ package volume // import "github.com/docker/docker/api/server/router/volume"
 
 import (
 	"context"
+	"fmt"
 	"net/http"
+	"strconv"
 
 	"github.com/docker/docker/api/server/httputils"
 	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/api/types/versions"
 	"github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/volume/service/opts"
 	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// clusterVolumesVersion defines the API version in which Swarm cluster
+	// volume functionality was introduced. Using a constant avoids magic numbers.
+	clusterVolumesVersion = "1.42"
 )
 
 func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@@ -24,6 +35,21 @@ func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter
 	if err != nil {
 		return err
 	}
+
+	version := httputils.VersionFromContext(ctx)
+	if versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
+		clusterVolumes, swarmErr := v.cluster.GetVolumes(volume.ListOptions{Filters: filters})
+		if swarmErr != nil {
+			// if there is a swarm error, we may not want to error out right
+			// away. the local list probably worked. instead, let's do what we
+			// do if there's a bad driver while trying to list: add the error
+			// to the warnings. don't do this if swarm is not initialized.
+			warnings = append(warnings, swarmErr.Error())
+		}
+		// add the cluster volumes to the return
+		volumes = append(volumes, clusterVolumes...)
+	}
+
 	return httputils.WriteJSON(w, http.StatusOK, &volume.ListResponse{Volumes: volumes, Warnings: warnings})
 }
 
@@ -31,11 +57,33 @@ func (v *volumeRouter) getVolumeByName(ctx context.Context, w http.ResponseWrite
 	if err := httputils.ParseForm(r); err != nil {
 		return err
 	}
+	version := httputils.VersionFromContext(ctx)
 
+	// re: volume name duplication
+	//
+	// we prefer to get volumes locally before attempting to get them from the
+	// cluster. Local volumes can only be looked up by name, but cluster
+	// volumes can also be looked up by ID.
 	vol, err := v.backend.Get(ctx, vars["name"], opts.WithGetResolveStatus)
-	if err != nil {
+
+	// if the volume is not found in the regular volume backend, and the client
+	// is using an API version greater than or equal to 1.42 (when cluster
+	// volumes were introduced), then check if Swarm has the volume.
+	if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
+		swarmVol, err := v.cluster.GetVolume(vars["name"])
+		// if swarm returns an error and that error indicates that swarm is not
+		// initialized, return original NotFound error. Otherwise, we'd return
+		// a weird swarm unavailable error on non-swarm engines.
+		if err != nil {
+			return err
+		}
+		vol = &swarmVol
+	} else if err != nil {
+		// otherwise, if this isn't NotFound, or this isn't a high enough version,
+		// just return the error by itself.
 		return err
 	}
+
 	return httputils.WriteJSON(w, http.StatusOK, vol)
 }
 
@@ -49,21 +97,82 @@ func (v *volumeRouter) postVolumesCreate(ctx context.Context, w http.ResponseWri
 		return err
 	}
 
-	vol, err := v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels))
+	var (
+		vol     *volume.Volume
+		err     error
+		version = httputils.VersionFromContext(ctx)
+	)
+
+	// if the ClusterVolumeSpec is filled in, then this is a cluster volume
+	// and is created through the swarm cluster volume backend.
+	//
+	// re: volume name duplication
+	//
+	// As it happens, there is no good way to prevent duplication of a volume
+	// name between local and cluster volumes. This is because Swarm volumes
+	// can be created from any manager node, bypassing most of the protections
+	// we could put into the engine side.
+	//
+	// Instead, we will allow creating a volume with a duplicate name, which
+	// should not break anything.
+	if req.ClusterVolumeSpec != nil && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) {
+		logrus.Debug("using cluster volume")
+		vol, err = v.cluster.CreateVolume(req)
+	} else {
+		logrus.Debug("using regular volume")
+		vol, err = v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels))
+	}
+
 	if err != nil {
 		return err
 	}
 	return httputils.WriteJSON(w, http.StatusCreated, vol)
 }
 
+func (v *volumeRouter) putVolumesUpdate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	if !v.cluster.IsManager() {
+		return errdefs.Unavailable(errors.New("volume update only valid for cluster volumes, but swarm is unavailable"))
+	}
+
+	if err := httputils.ParseForm(r); err != nil {
+		return err
+	}
+
+	rawVersion := r.URL.Query().Get("version")
+	version, err := strconv.ParseUint(rawVersion, 10, 64)
+	if err != nil {
+		err = fmt.Errorf("invalid swarm object version '%s': %v", rawVersion, err)
+		return errdefs.InvalidParameter(err)
+	}
+
+	var req volume.UpdateOptions
+	if err := httputils.ReadJSON(r, &req); err != nil {
+		return err
+	}
+
+	return v.cluster.UpdateVolume(vars["name"], version, req)
+}
+
 func (v *volumeRouter) deleteVolumes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if err := httputils.ParseForm(r); err != nil {
 		return err
 	}
 	force := httputils.BoolValue(r, "force")
-	if err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force)); err != nil {
-		return err
+
+	version := httputils.VersionFromContext(ctx)
+
+	err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force))
+	if err != nil {
+		if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
+			err := v.cluster.RemoveVolume(vars["name"], force)
+			if err != nil {
+				return err
+			}
+		} else {
+			return err
+		}
 	}
+
 	w.WriteHeader(http.StatusNoContent)
 	return nil
 }

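As a usage sketch for the new update route (not part of the change itself), the snippet below sends PUT /volumes/{name} over the local daemon socket; the socket path, API version prefix, volume name, and version value are assumptions to adjust for a real environment:

package main

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"net/http"
)

func main() {
	// Dial the daemon over the local unix socket; the host in the URL below
	// is a placeholder because this DialContext ignores it.
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", "/var/run/docker.sock")
			},
		},
	}

	// Per the handler above, only cluster volumes can be updated, and the
	// body carries a volume.UpdateOptions with the new spec.
	body := bytes.NewBufferString(`{"Spec": {"Availability": "drain"}}`)

	// "version" must match the volume's current ClusterVolume.Version.Index
	// to avoid conflicting writes.
	req, err := http.NewRequest(http.MethodPut, "http://docker/v1.42/volumes/myvolume?version=0", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
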
+ 752 - 0
api/server/router/volume/volume_routes_test.go

@@ -0,0 +1,752 @@
+package volume
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http/httptest"
+	"testing"
+
+	"gotest.tools/v3/assert"
+
+	"github.com/docker/docker/api/server/httputils"
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/volume/service/opts"
+)
+
+func callGetVolume(v *volumeRouter, name string) (*httptest.ResponseRecorder, error) {
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	vars := map[string]string{"name": name}
+	req := httptest.NewRequest("GET", fmt.Sprintf("/volumes/%s", name), nil)
+	resp := httptest.NewRecorder()
+
+	err := v.getVolumeByName(ctx, resp, req, vars)
+	return resp, err
+}
+
+func callListVolumes(v *volumeRouter) (*httptest.ResponseRecorder, error) {
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	vars := map[string]string{}
+	req := httptest.NewRequest("GET", "/volumes", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.getVolumesList(ctx, resp, req, vars)
+	return resp, err
+}
+
+func TestGetVolumeByNameNotFoundNoSwarm(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{},
+		cluster: &fakeClusterBackend{},
+	}
+
+	_, err := callGetVolume(v, "notReal")
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err))
+}
+
+func TestGetVolumeByNameNotFoundNotManager(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{},
+		cluster: &fakeClusterBackend{swarm: true},
+	}
+
+	_, err := callGetVolume(v, "notReal")
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err))
+}
+
+func TestGetVolumeByNameNotFound(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{},
+		cluster: &fakeClusterBackend{swarm: true, manager: true},
+	}
+
+	_, err := callGetVolume(v, "notReal")
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err))
+}
+
+func TestGetVolumeByNameFoundRegular(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{
+			volumes: map[string]*volume.Volume{
+				"volume1": &volume.Volume{
+					Name: "volume1",
+				},
+			},
+		},
+		cluster: &fakeClusterBackend{swarm: true, manager: true},
+	}
+
+	_, err := callGetVolume(v, "volume1")
+	assert.NilError(t, err)
+}
+
+func TestGetVolumeByNameFoundSwarm(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{},
+		cluster: &fakeClusterBackend{
+			swarm:   true,
+			manager: true,
+			volumes: map[string]*volume.Volume{
+				"volume1": &volume.Volume{
+					Name: "volume1",
+				},
+			},
+		},
+	}
+
+	_, err := callGetVolume(v, "volume1")
+	assert.NilError(t, err)
+}
+
+func TestListVolumes(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{
+			volumes: map[string]*volume.Volume{
+				"v1": &volume.Volume{Name: "v1"},
+				"v2": &volume.Volume{Name: "v2"},
+			},
+		},
+		cluster: &fakeClusterBackend{
+			swarm:   true,
+			manager: true,
+			volumes: map[string]*volume.Volume{
+				"v3": &volume.Volume{Name: "v3"},
+				"v4": &volume.Volume{Name: "v4"},
+			},
+		},
+	}
+
+	resp, err := callListVolumes(v)
+	assert.NilError(t, err)
+	d := json.NewDecoder(resp.Result().Body)
+	respVols := volume.ListResponse{}
+	assert.NilError(t, d.Decode(&respVols))
+
+	assert.Assert(t, respVols.Volumes != nil)
+	assert.Equal(t, len(respVols.Volumes), 4, "volumes %v", respVols.Volumes)
+}
+
+func TestListVolumesNoSwarm(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{
+			volumes: map[string]*volume.Volume{
+				"v1": &volume.Volume{Name: "v1"},
+				"v2": &volume.Volume{Name: "v2"},
+			},
+		},
+		cluster: &fakeClusterBackend{},
+	}
+
+	_, err := callListVolumes(v)
+	assert.NilError(t, err)
+}
+
+func TestListVolumesNoManager(t *testing.T) {
+	v := &volumeRouter{
+		backend: &fakeVolumeBackend{
+			volumes: map[string]*volume.Volume{
+				"v1": &volume.Volume{Name: "v1"},
+				"v2": &volume.Volume{Name: "v2"},
+			},
+		},
+		cluster: &fakeClusterBackend{swarm: true},
+	}
+
+	resp, err := callListVolumes(v)
+	assert.NilError(t, err)
+
+	d := json.NewDecoder(resp.Result().Body)
+	respVols := volume.ListResponse{}
+	assert.NilError(t, d.Decode(&respVols))
+
+	assert.Equal(t, len(respVols.Volumes), 2)
+	assert.Equal(t, len(respVols.Warnings), 0)
+}
+
+func TestCreateRegularVolume(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+	}
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeCreate := volume.CreateOptions{
+		Name:   "vol1",
+		Driver: "foodriver",
+	}
+
+	buf := bytes.Buffer{}
+	e := json.NewEncoder(&buf)
+	e.Encode(volumeCreate)
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("POST", "/volumes/create", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+	err := v.postVolumesCreate(ctx, resp, req, nil)
+
+	assert.NilError(t, err)
+
+	respVolume := volume.Volume{}
+
+	assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume))
+
+	assert.Equal(t, respVolume.Name, "vol1")
+	assert.Equal(t, respVolume.Driver, "foodriver")
+
+	assert.Equal(t, 1, len(b.volumes))
+	assert.Equal(t, 0, len(c.volumes))
+}
+
+func TestCreateSwarmVolumeNoSwarm(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeCreate := volume.CreateOptions{
+		ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
+		Name:              "volCluster",
+		Driver:            "someCSI",
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeCreate)
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("POST", "/volumes/create", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+	err := v.postVolumesCreate(ctx, resp, req, nil)
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsUnavailable(err))
+}
+
+func TestCreateSwarmVolumeNotManager(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{swarm: true}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeCreate := volume.CreateOptions{
+		ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
+		Name:              "volCluster",
+		Driver:            "someCSI",
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeCreate)
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("POST", "/volumes/create", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+	err := v.postVolumesCreate(ctx, resp, req, nil)
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsUnavailable(err))
+}
+
+func TestCreateVolumeCluster(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+	}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeCreate := volume.CreateOptions{
+		ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
+		Name:              "volCluster",
+		Driver:            "someCSI",
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeCreate)
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("POST", "/volumes/create", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+	err := v.postVolumesCreate(ctx, resp, req, nil)
+
+	assert.NilError(t, err)
+
+	respVolume := volume.Volume{}
+
+	assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume))
+
+	assert.Equal(t, respVolume.Name, "volCluster")
+	assert.Equal(t, respVolume.Driver, "someCSI")
+
+	assert.Equal(t, 0, len(b.volumes))
+	assert.Equal(t, 1, len(c.volumes))
+}
+
+func TestUpdateVolume(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+		volumes: map[string]*volume.Volume{
+			"vol1": &volume.Volume{
+				Name: "vol1",
+				ClusterVolume: &volume.ClusterVolume{
+					ID: "vol1",
+				},
+			},
+		},
+	}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeUpdate := volume.UpdateOptions{
+		Spec: &volume.ClusterVolumeSpec{},
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeUpdate)
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("PUT", "/volumes/vol1?version=0", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+
+	err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.NilError(t, err)
+
+	assert.Equal(t, c.volumes["vol1"].ClusterVolume.Meta.Version.Index, uint64(1))
+}
+
+func TestUpdateVolumeNoSwarm(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeUpdate := volume.UpdateOptions{
+		Spec: &volume.ClusterVolumeSpec{},
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeUpdate)
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("PUT", "/volumes/vol1?version=0", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+
+	err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsUnavailable(err))
+}
+
+func TestUpdateVolumeNotFound(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+		volumes: map[string]*volume.Volume{},
+	}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	volumeUpdate := volume.UpdateOptions{
+		Spec: &volume.ClusterVolumeSpec{},
+	}
+
+	buf := bytes.Buffer{}
+	json.NewEncoder(&buf).Encode(volumeUpdate)
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("PUT", "/volumes/vol1?version=0", &buf)
+	req.Header.Add("Content-Type", "application/json")
+
+	resp := httptest.NewRecorder()
+
+	err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err))
+}
+
+func TestVolumeRemove(t *testing.T) {
+	b := &fakeVolumeBackend{
+		volumes: map[string]*volume.Volume{
+			"vol1": &volume.Volume{
+				Name: "vol1",
+			},
+		},
+	}
+	c := &fakeClusterBackend{swarm: true, manager: true}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.NilError(t, err)
+	assert.Equal(t, len(b.volumes), 0)
+}
+
+func TestVolumeRemoveSwarm(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+		volumes: map[string]*volume.Volume{
+			"vol1": &volume.Volume{
+				Name:          "vol1",
+				ClusterVolume: &volume.ClusterVolume{},
+			},
+		},
+	}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.NilError(t, err)
+	assert.Equal(t, len(c.volumes), 0)
+}
+
+func TestVolumeRemoveNotFoundNoSwarm(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{}
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err), err.Error())
+}
+
+func TestVolumeRemoveNotFoundNoManager(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{swarm: true}
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsNotFound(err))
+}
+
+func TestVolumeRemoveFoundNoSwarm(t *testing.T) {
+	b := &fakeVolumeBackend{
+		volumes: map[string]*volume.Volume{
+			"vol1": &volume.Volume{
+				Name: "vol1",
+			},
+		},
+	}
+	c := &fakeClusterBackend{}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+	assert.NilError(t, err)
+	assert.Equal(t, len(b.volumes), 0)
+}
+
+func TestVolumeRemoveNoSwarmInUse(t *testing.T) {
+	b := &fakeVolumeBackend{
+		volumes: map[string]*volume.Volume{
+			"inuse": &volume.Volume{
+				Name: "inuse",
+			},
+		},
+	}
+	c := &fakeClusterBackend{}
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/inuse", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "inuse"})
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsConflict(err))
+}
+
+func TestVolumeRemoveSwarmForce(t *testing.T) {
+	b := &fakeVolumeBackend{}
+	c := &fakeClusterBackend{
+		swarm:   true,
+		manager: true,
+		volumes: map[string]*volume.Volume{
+			"vol1": &volume.Volume{
+				Name:          "vol1",
+				ClusterVolume: &volume.ClusterVolume{},
+				Options:       map[string]string{"mustforce": "yes"},
+			},
+		},
+	}
+
+	v := &volumeRouter{
+		backend: b,
+		cluster: c,
+	}
+
+	ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
+	resp := httptest.NewRecorder()
+
+	err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+
+	assert.Assert(t, err != nil)
+	assert.Assert(t, errdefs.IsConflict(err))
+
+	ctx = context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
+	req = httptest.NewRequest("DELETE", "/volumes/vol1?force=1", nil)
+	resp = httptest.NewRecorder()
+
+	err = v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
+
+	assert.NilError(t, err)
+	assert.Equal(t, len(b.volumes), 0)
+}
+
+type fakeVolumeBackend struct {
+	volumes map[string]*volume.Volume
+}
+
+func (b *fakeVolumeBackend) List(_ context.Context, _ filters.Args) ([]*volume.Volume, []string, error) {
+	volumes := []*volume.Volume{}
+	for _, v := range b.volumes {
+		volumes = append(volumes, v)
+	}
+	return volumes, nil, nil
+}
+
+func (b *fakeVolumeBackend) Get(_ context.Context, name string, _ ...opts.GetOption) (*volume.Volume, error) {
+	if v, ok := b.volumes[name]; ok {
+		return v, nil
+	}
+	return nil, errdefs.NotFound(fmt.Errorf("volume %s not found", name))
+}
+
+func (b *fakeVolumeBackend) Create(_ context.Context, name, driverName string, _ ...opts.CreateOption) (*volume.Volume, error) {
+	if _, ok := b.volumes[name]; ok {
+		// TODO(dperny): return appropriate error type
+		return nil, fmt.Errorf("already exists")
+	}
+
+	v := &volume.Volume{
+		Name:   name,
+		Driver: driverName,
+	}
+	if b.volumes == nil {
+		b.volumes = map[string]*volume.Volume{
+			name: v,
+		}
+	} else {
+		b.volumes[name] = v
+	}
+
+	return v, nil
+}
+
+func (b *fakeVolumeBackend) Remove(_ context.Context, name string, _ ...opts.RemoveOption) error {
+	if v, ok := b.volumes[name]; !ok {
+		return errdefs.NotFound(fmt.Errorf("volume %s not found", name))
+	} else if v.Name == "inuse" {
+		return errdefs.Conflict(fmt.Errorf("volume in use"))
+	}
+
+	delete(b.volumes, name)
+
+	return nil
+}
+
+func (b *fakeVolumeBackend) Prune(_ context.Context, _ filters.Args) (*types.VolumesPruneReport, error) {
+	return nil, nil
+}
+
+type fakeClusterBackend struct {
+	swarm   bool
+	manager bool
+	idCount int
+	volumes map[string]*volume.Volume
+}
+
+func (c *fakeClusterBackend) checkSwarm() error {
+	if !c.swarm {
+		return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again"))
+	} else if !c.manager {
+		return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager"))
+	}
+
+	return nil
+}
+
+func (c *fakeClusterBackend) IsManager() bool {
+	return c.swarm && c.manager
+}
+
+func (c *fakeClusterBackend) GetVolume(nameOrID string) (volume.Volume, error) {
+	if err := c.checkSwarm(); err != nil {
+		return volume.Volume{}, err
+	}
+
+	if v, ok := c.volumes[nameOrID]; ok {
+		return *v, nil
+	}
+	return volume.Volume{}, errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID))
+}
+
+func (c *fakeClusterBackend) GetVolumes(options volume.ListOptions) ([]*volume.Volume, error) {
+	if err := c.checkSwarm(); err != nil {
+		return nil, err
+	}
+
+	volumes := []*volume.Volume{}
+
+	for _, v := range c.volumes {
+		volumes = append(volumes, v)
+	}
+	return volumes, nil
+}
+
+func (c *fakeClusterBackend) CreateVolume(volumeCreate volume.CreateOptions) (*volume.Volume, error) {
+	if err := c.checkSwarm(); err != nil {
+		return nil, err
+	}
+
+	if _, ok := c.volumes[volumeCreate.Name]; ok {
+		// TODO(dperny): return appropriate already exists error
+		return nil, fmt.Errorf("already exists")
+	}
+
+	v := &volume.Volume{
+		Name:    volumeCreate.Name,
+		Driver:  volumeCreate.Driver,
+		Labels:  volumeCreate.Labels,
+		Options: volumeCreate.DriverOpts,
+		Scope:   "global",
+	}
+
+	v.ClusterVolume = &volume.ClusterVolume{
+		ID:   fmt.Sprintf("cluster_%d", c.idCount),
+		Spec: *volumeCreate.ClusterVolumeSpec,
+	}
+
+	c.idCount = c.idCount + 1
+	if c.volumes == nil {
+		c.volumes = map[string]*volume.Volume{
+			v.Name: v,
+		}
+	} else {
+		c.volumes[v.Name] = v
+	}
+
+	return v, nil
+}
+
+func (c *fakeClusterBackend) RemoveVolume(nameOrID string, force bool) error {
+	if err := c.checkSwarm(); err != nil {
+		return err
+	}
+
+	v, ok := c.volumes[nameOrID]
+	if !ok {
+		return errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID))
+	}
+
+	if _, mustforce := v.Options["mustforce"]; mustforce && !force {
+		return errdefs.Conflict(fmt.Errorf("volume %s must be force removed", nameOrID))
+	}
+
+	delete(c.volumes, nameOrID)
+
+	return nil
+}
+
+func (c *fakeClusterBackend) UpdateVolume(nameOrID string, version uint64, _ volume.UpdateOptions) error {
+	if err := c.checkSwarm(); err != nil {
+		return err
+	}
+
+	if v, ok := c.volumes[nameOrID]; ok {
+		if v.ClusterVolume.Meta.Version.Index != version {
+			return fmt.Errorf("wrong version")
+		}
+		v.ClusterVolume.Meta.Version.Index = v.ClusterVolume.Meta.Version.Index + 1
+		// for testing, we don't actually need to change anything about the
+		// volume object. let's just increment the version so we can see the
+		// call happened.
+	} else {
+		return errdefs.NotFound(fmt.Errorf("volume %q not found", nameOrID))
+	}
+
+	return nil
+}

+ 299 - 0
api/swagger.yaml

@@ -1996,6 +1996,8 @@ definitions:
         x-nullable: false
         enum: ["local", "global"]
         example: "local"
+      ClusterVolume:
+        $ref: "#/definitions/ClusterVolume"
       Options:
         type: "object"
         description: |
@@ -2069,6 +2071,8 @@ definitions:
         example:
           com.example.some-label: "some-value"
           com.example.some-other-label: "some-other-value"
+      ClusterVolumeSpec:
+        $ref: "#/definitions/ClusterVolumeSpec"
 
   VolumeListResponse:
     type: "object"
@@ -5740,6 +5744,242 @@ definitions:
         items:
           $ref: "#/definitions/OCIPlatform"
 
+  ClusterVolume:
+    type: "object"
+    description: |
+      Options and information specific to, and only present on, Swarm CSI
+      cluster volumes.
+    properties:
+      ID:
+        type: "string"
+        description: |
+          The Swarm ID of this volume. Because cluster volumes are Swarm
+          objects, they have an ID, unlike non-cluster volumes. This ID can
+          be used to refer to the Volume instead of the name.
+      Version:
+        $ref: "#/definitions/ObjectVersion"
+      CreatedAt:
+        type: "string"
+        format: "dateTime"
+      UpdatedAt:
+        type: "string"
+        format: "dateTime"
+      Spec:
+        $ref: "#/definitions/ClusterVolumeSpec"
+      Info:
+        type: "object"
+        description: |
+          Information about the global status of the volume.
+        properties:
+          CapacityBytes:
+            type: "integer"
+            format: "int64"
+            description: |
+              The capacity of the volume in bytes. A value of 0 indicates that
+              the capacity is unknown.
+          VolumeContext:
+            type: "object"
+            description: |
+              A map of strings to strings returned from the storage plugin when
+              the volume is created.
+            additionalProperties:
+              type: "string"
+          VolumeID:
+            type: "string"
+            description: |
+              The ID of the volume as returned by the CSI storage plugin. This
+              is distinct from the volume's ID as provided by Docker. This ID
+              is never used by the user when communicating with Docker to refer
+              to this volume. If the ID is blank, then the Volume has not been
+              successfully created in the plugin yet.
+          AccessibleTopology:
+            type: "array"
+            description: |
+              The topology this volume is actually accessible from.
+            items:
+              $ref: "#/definitions/Topology"
+      PublishStatus:
+        type: "array"
+        description: |
+          The status of the volume as it pertains to its publishing and use on
+          specific nodes
+        items:
+          type: "object"
+          properties:
+            NodeID:
+              type: "string"
+              description: |
+                The ID of the Swarm node the volume is published on.
+            State:
+              type: "string"
+              description: |
+                The published state of the volume.
+                * `pending-publish` The volume should be published to this node, but the call to the controller plugin to do so has not yet been successfully completed.
+                * `published` The volume is published successfully to the node.
+                * `pending-node-unpublish` The volume should be unpublished from the node, and the manager is awaiting confirmation from the worker that it has done so.
+                * `pending-controller-unpublish` The volume is successfully unpublished from the node, but has not yet been successfully unpublished on the controller.
+              enum:
+                - "pending-publish"
+                - "published"
+                - "pending-node-unpublish"
+                - "pending-controller-unpublish"
+            PublishContext:
+              type: "object"
+              description: |
+                A map of strings to strings returned by the CSI controller
+                plugin when a volume is published.
+              additionalProperties:
+                type: "string"
+
+  ClusterVolumeSpec:
+    type: "object"
+    description: |
+      Cluster-specific options used to create the volume.
+    properties:
+      Group:
+        type: "string"
+        description: |
+          Group defines the volume group of this volume. Volumes belonging to
+          the same group can be referred to by group name when creating
+          Services.  Referring to a volume by group instructs Swarm to treat
+          volumes in that group interchangeably for the purpose of scheduling.
+          Volumes with an empty string for a group technically all belong to
+          the same, empty-string group.
+      AccessMode:
+        type: "object"
+        description: |
+          Defines how the volume is used by tasks.
+        properties:
+          Scope:
+            type: "string"
+            description: |
+              The set of nodes this volume can be used on at one time.
+              - `single` The volume may only be scheduled to one node at a time.
+              - `multi` The volume may be scheduled to any supported number of nodes at a time.
+            default: "single"
+            enum: ["single", "multi"]
+            x-nullable: false
+          Sharing:
+            type: "string"
+            description: |
+              The number and way that different tasks can use this volume
+              at one time.
+              - `none` The volume may only be used by one task at a time.
+              - `readonly` The volume may be used by any number of tasks, but they all must mount the volume as readonly
+              - `onewriter` The volume may be used by any number of tasks, but only one may mount it as read/write.
+              - `all` The volume may have any number of readers and writers.
+            default: "none"
+            enum: ["none", "readonly", "onewriter", "all"]
+            x-nullable: false
+          MountVolume:
+            type: "object"
+            description: |
+              Options for using this volume as a Mount-type volume.
+
+              Either MountVolume or BlockVolume, but not both, must be
+              present.
+            properties:
+              FsType:
+                type: "string"
+                description: |
+                  Specifies the filesystem type for the mount volume.
+                  Optional.
+              MountFlags:
+                type: "array"
+                description: |
+                  Flags to pass when mounting the volume. Optional.
+                items:
+                  type: "string"
+          BlockVolume:
+            type: "object"
+            description: |
+              Options for using this volume as a Block-type volume.
+              Intentionally empty.
+      Secrets:
+        type: "array"
+        description: |
+          Swarm Secrets that are passed to the CSI storage plugin when
+          operating on this volume.
+        items:
+          type: "object"
+          description: |
+            One cluster volume secret entry. Defines a key-value pair that
+            is passed to the plugin.
+          properties:
+            Key:
+              type: "string"
+              description: |
+                Key is the name of the key of the key-value pair passed to
+                the plugin.
+            Secret:
+              type: "string"
+              description: |
+                Secret is the swarm Secret object from which to read data.
+                This can be a Secret name or ID. The Secret data is
+                retrieved by swarm and used as the value of the key-value
+                pair passed to the plugin.
+      AccessibilityRequirements:
+        type: "object"
+        description: |
+          Requirements for the accessible topology of the volume. These
+          fields are optional. For an in-depth description of what these
+          fields mean, see the CSI specification.
+        properties:
+          Requisite:
+            type: "array"
+            description: |
+              A list of required topologies, at least one of which the
+              volume must be accessible from.
+            items:
+              $ref: "#/definitions/Topology"
+          Preferred:
+            type: "array"
+            description: |
+              A list of topologies that the volume should attempt to be
+              provisioned in.
+            items:
+              $ref: "#/definitions/Topology"
+      CapacityRange:
+        type: "object"
+        description: |
+          The desired capacity that the volume should be created with. If
+          empty, the plugin will decide the capacity.
+        properties:
+          RequiredBytes:
+            type: "integer"
+            format: "int64"
+            description: |
+              The volume must be at least this big. The value of 0
+              indicates an unspecified minimum.
+          LimitBytes:
+            type: "integer"
+            format: "int64"
+            description: |
+              The volume must not be bigger than this. The value of 0
+              indicates an unspecified maximum.
+      Availability:
+        type: "string"
+        description: |
+          The availability of the volume for use in tasks.
+          - `active` The volume is fully available for scheduling on the cluster.
+          - `pause` No new workloads should use the volume, but existing workloads are not stopped.
+          - `drain` All workloads using this volume should be stopped and rescheduled, and no new ones should be started.
+        default: "active"
+        x-nullable: false
+        enum:
+          - "active"
+          - "pause"
+          - "drain"
+
+  Topology:
+    description: |
+      A map of topological domains to topological segments. For in depth
+      details, see documentation for the Topology object in the CSI
+      specification.
+    type: "object"
+    additionalProperties:
+      type: "string"
+
 paths:
   /containers/json:
     get:
@@ -9247,6 +9487,64 @@ paths:
           type: "string"
       tags: ["Volume"]
 
+    put:
+      summary: "Update a volume. Valid only for Swarm cluster volumes"
+      operationId: "VolumeUpdate"
+      consumes: ["application/json"]
+      produces: ["application/json"]
+      responses:
+        200:
+          description: "no error"
+        400:
+          description: "bad parameter"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        404:
+          description: "no such volume"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        500:
+          description: "server error"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        503:
+          description: "node is not part of a swarm"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+      parameters:
+        - name: "name"
+          in: "path"
+          description: "The name or ID of the volume"
+          type: "string"
+          required: true
+        - name: "body"
+          in: "body"
+          schema:
+            # though the schema for this endpoint is an object that contains only a
+            # ClusterVolumeSpec, wrapping the ClusterVolumeSpec in this object
+            # means that if, later on, we support things like changing the
+            # labels, we can do so without duplicating that information to the
+            # ClusterVolumeSpec.
+            type: "object"
+            description: "Volume configuration"
+            properties:
+              Spec:
+                $ref: "#/definitions/ClusterVolumeSpec"
+          description: |
+            The spec of the volume to update. Currently, only Availability may
+            change. All other fields must remain unchanged.
+        - name: "version"
+          in: "query"
+          description: |
+            The version number of the volume being updated. This is required to
+            avoid conflicting writes. Found in the volume's `ClusterVolume`
+            field.
+          type: "integer"
+          format: "int64"
+          required: true
+      tags: ["Volume"]
+
     delete:
       summary: "Remove a volume"
       description: "Instruct the driver to remove the volume."
@@ -9278,6 +9576,7 @@ paths:
           type: "boolean"
           default: false
       tags: ["Volume"]
+
   /volumes/prune:
     post:
       summary: "Delete unused volumes"

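To make the ClusterVolumeSpec schema above concrete, here is a short sketch (not part of the diff) that builds a create request body with the Go types added in this PR; the driver name, group, and capacity are placeholders:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/docker/docker/api/types/volume"
)

func main() {
	// A cluster volume is requested by filling in ClusterVolumeSpec on the
	// normal create options; the router then dispatches it to the swarm
	// cluster backend instead of the local volume service.
	create := volume.CreateOptions{
		Name:   "my-csi-volume",
		Driver: "some-csi-plugin",
		ClusterVolumeSpec: &volume.ClusterVolumeSpec{
			Group: "db-data",
			AccessMode: &volume.AccessMode{
				Scope:       volume.ScopeSingleNode,
				Sharing:     volume.SharingNone,
				MountVolume: &volume.TypeMount{FsType: "ext4"},
			},
			CapacityRange: &volume.CapacityRange{
				RequiredBytes: 10 * 1024 * 1024 * 1024, // at least 10 GiB
			},
			Availability: volume.AvailabilityActive,
		},
	}

	payload, err := json.MarshalIndent(create, "", "  ")
	if err != nil {
		panic(err)
	}
	// This is the JSON body for POST /volumes/create on API >= 1.42.
	fmt.Println(string(payload))
}
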
+ 11 - 3
api/types/mount/mount.go

@@ -17,6 +17,8 @@ const (
 	TypeTmpfs Type = "tmpfs"
 	// TypeNamedPipe is the type for mounting Windows named pipes
 	TypeNamedPipe Type = "npipe"
+	// TypeCluster is the type for Swarm Cluster Volumes.
+	TypeCluster Type = "csi"
 )
 
 // Mount represents a mount (volume).
@@ -30,9 +32,10 @@ type Mount struct {
 	ReadOnly    bool        `json:",omitempty"`
 	Consistency Consistency `json:",omitempty"`
 
-	BindOptions   *BindOptions   `json:",omitempty"`
-	VolumeOptions *VolumeOptions `json:",omitempty"`
-	TmpfsOptions  *TmpfsOptions  `json:",omitempty"`
+	BindOptions    *BindOptions    `json:",omitempty"`
+	VolumeOptions  *VolumeOptions  `json:",omitempty"`
+	TmpfsOptions   *TmpfsOptions   `json:",omitempty"`
+	ClusterOptions *ClusterOptions `json:",omitempty"`
 }
 
 // Propagation represents the propagation of a mount.
@@ -129,3 +132,8 @@ type TmpfsOptions struct {
 	// Some of these may be straightforward to add, but others, such as
 	// uid/gid have implications in a clustered system.
 }
+
+// ClusterOptions specifies options for a Cluster volume.
+type ClusterOptions struct {
+	// intentionally empty
+}

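For illustration (not from the diff), a cluster-volume mount as it could be expressed with the updated mount types, e.g. inside a service's container spec; the source and target values are placeholders:

package main

import (
	"fmt"

	"github.com/docker/docker/api/types/mount"
)

func main() {
	// TypeCluster marks the mount as a Swarm cluster (CSI) volume; the
	// ClusterOptions struct is intentionally empty for now.
	m := mount.Mount{
		Type:           mount.TypeCluster,
		Source:         "my-csi-volume",
		Target:         "/srv/data",
		ClusterOptions: &mount.ClusterOptions{},
	}
	fmt.Printf("%+v\n", m)
}
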
+ 24 - 0
api/types/swarm/node.go

@@ -53,6 +53,7 @@ type NodeDescription struct {
 	Resources Resources         `json:",omitempty"`
 	Engine    EngineDescription `json:",omitempty"`
 	TLSInfo   TLSInfo           `json:",omitempty"`
+	CSIInfo   []NodeCSIInfo     `json:",omitempty"`
 }
 
 // Platform represents the platform (Arch/OS).
@@ -68,6 +69,21 @@ type EngineDescription struct {
 	Plugins       []PluginDescription `json:",omitempty"`
 }
 
+// NodeCSIInfo represents information about a CSI plugin available on the node
+type NodeCSIInfo struct {
+	// PluginName is the name of the CSI plugin.
+	PluginName string `json:",omitempty"`
+	// NodeID is the ID of the node as reported by the CSI plugin. This is
+	// different from the swarm node ID.
+	NodeID string `json:",omitempty"`
+	// MaxVolumesPerNode is the maximum number of volumes that may be published
+	// to this node
+	MaxVolumesPerNode int64 `json:",omitempty"`
+	// AccessibleTopology indicates the location of this node in the CSI
+	// plugin's topology
+	AccessibleTopology *Topology `json:",omitempty"`
+}
+
 // PluginDescription represents the description of an engine plugin.
 type PluginDescription struct {
 	Type string `json:",omitempty"`
@@ -113,3 +129,11 @@ const (
 	// NodeStateDisconnected DISCONNECTED
 	NodeStateDisconnected NodeState = "disconnected"
 )
+
+// Topology defines the CSI topology of this node. This type is a duplicate of
+// github.com/docker/docker/api/types/volume.Topology. Because the type definition
+// is so simple and to avoid complicated structure or circular imports, we just
+// duplicate it here. See that type for full documentation
+type Topology struct {
+	Segments map[string]string `json:",omitempty"`
+}

+ 19 - 0
api/types/swarm/task.go

@@ -62,6 +62,11 @@ type Task struct {
 	// used to determine which Tasks belong to which run of the job. This field
 	// is absent if the Service mode is Replicated or Global.
 	JobIteration *Version `json:",omitempty"`
+
+	// Volumes is the list of VolumeAttachments for this task. It specifies
+	// which particular volumes are to be used by this particular task, and
+	// which mounts in the spec they fulfill.
+	Volumes []VolumeAttachment
 }
 
 // TaskSpec represents the spec of a task.
@@ -204,3 +209,17 @@ type ContainerStatus struct {
 type PortStatus struct {
 	Ports []PortConfig `json:",omitempty"`
 }
+
+// VolumeAttachment contains the details associating a Volume with a Task.
+type VolumeAttachment struct {
+	// ID is the Swarmkit ID of the Volume. This is not the CSI VolumeId.
+	ID string `json:",omitempty"`
+
+	// Source, together with Target, indicates the Mount, as specified in the
+	// ContainerSpec, that this volume fulfills.
+	Source string `json:",omitempty"`
+
+	// Target, together with Source, indicates the Mount, as specified
+	// in the ContainerSpec, that this volume fulfills.
+	Target string `json:",omitempty"`
+}

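A small sketch (again, not part of the diff) of reading the new Task.Volumes field after a task has been retrieved; the task value here is constructed by hand purely for illustration:

package main

import (
	"fmt"

	"github.com/docker/docker/api/types/swarm"
)

// printAttachments lists which cluster volumes were assigned to the task and
// which mounts in the spec they fulfill.
func printAttachments(task swarm.Task) {
	for _, va := range task.Volumes {
		fmt.Printf("volume %s fulfills mount %s -> %s\n", va.ID, va.Source, va.Target)
	}
}

func main() {
	printAttachments(swarm.Task{
		Volumes: []swarm.VolumeAttachment{
			{ID: "abc123", Source: "my-csi-volume", Target: "/srv/data"},
		},
	})
}
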
+ 420 - 0
api/types/volume/cluster_volume.go

@@ -0,0 +1,420 @@
+package volume
+
+import (
+	"github.com/docker/docker/api/types/swarm"
+)
+
+// ClusterVolume contains options and information specific to, and only present
+// on, Swarm CSI cluster volumes.
+type ClusterVolume struct {
+	// ID is the Swarm ID of the volume. Because cluster volumes are Swarm
+	// objects, they have an ID, unlike non-cluster volumes, which only have a
+	// Name. This ID can be used to refer to the cluster volume.
+	ID string
+
+	// Meta is the swarm metadata about this volume.
+	swarm.Meta
+
+	// Spec is the cluster-specific options from which this volume is derived.
+	Spec ClusterVolumeSpec
+
+	// PublishStatus contains the status of the volume as it pertains to its
+	// publishing on Nodes.
+	PublishStatus []*PublishStatus `json:",omitempty"`
+
+	// Info is information about the global status of the volume.
+	Info *Info `json:",omitempty"`
+}
+
+// ClusterVolumeSpec contains the spec used to create this volume.
+type ClusterVolumeSpec struct {
+	// Group defines the volume group of this volume. Volumes belonging to the
+	// same group can be referred to by group name when creating Services.
+	// Referring to a volume by group instructs swarm to treat volumes in that
+	// group interchangeably for the purpose of scheduling. Volumes with an
+	// empty string for a group technically all belong to the same, emptystring
+	// empty string for a group technically all belong to the same,
+	// empty-string group.
+
+	// AccessMode defines how the volume is used by tasks.
+	AccessMode *AccessMode `json:",omitempty"`
+
+	// AccessibilityRequirements specifies where in the cluster a volume must
+	// be accessible from.
+	//
+	// This field must be empty if the plugin does not support
+	// VOLUME_ACCESSIBILITY_CONSTRAINTS capabilities. If it is present but the
+	// plugin does not support it, the volume will not be created.
+	//
+	// If AccessibilityRequirements is empty, but the plugin does support
+	// VOLUME_ACCESSIBILITY_CONSTRAINTS, then Swarmkit will assume the entire
+	// cluster is a valid target for the volume.
+	AccessibilityRequirements *TopologyRequirement `json:",omitempty"`
+
+	// CapacityRange defines the desired capacity that the volume should be
+	// created with. If nil, the plugin will decide the capacity.
+	CapacityRange *CapacityRange `json:",omitempty"`
+
+	// Secrets defines Swarm Secrets that are passed to the CSI storage plugin
+	// when operating on this volume.
+	Secrets []Secret `json:",omitempty"`
+
+	// Availability is the Volume's desired availability. Analogous to Node
+	// Availability, this allows the user to take volumes offline in order to
+	// update or delete them.
+	Availability Availability `json:",omitempty"`
+}
+
+// Availability specifies the availability of the volume.
+type Availability string
+
+const (
+	// AvailabilityActive indicates that the volume is active and fully
+	// schedulable on the cluster.
+	AvailabilityActive Availability = "active"
+
+	// AvailabilityPause indicates that no new workloads should use the
+	// volume, but existing workloads can continue to use it.
+	AvailabilityPause Availability = "pause"
+
+	// AvailabilityDrain indicates that all workloads using this volume
+	// should be rescheduled, and the volume unpublished from all nodes.
+	AvailabilityDrain Availability = "drain"
+)
+
+// AccessMode defines the access mode of a volume.
+type AccessMode struct {
+	// Scope defines the set of nodes this volume can be used on at one time.
+	Scope Scope `json:",omitempty"`
+
+	// Sharing defines the number and way that different tasks can use this
+	// volume at one time.
+	Sharing SharingMode `json:",omitempty"`
+
+	// MountVolume defines options for using this volume as a Mount-type
+	// volume.
+	//
+	// Either BlockVolume or MountVolume, but not both, must be present.
+	MountVolume *TypeMount `json:",omitempty"`
+
+	// BlockVolume defines options for using this volume as a Block-type
+	// volume.
+	//
+	// Either BlockVolume or MountVolume, but not both, must be present.
+	BlockVolume *TypeBlock `json:",omitempty"`
+}
+
+// Scope defines the Scope of a CSI Volume. This is how many nodes a
+// Volume can be accessed simultaneously on.
+type Scope string
+
+const (
+	// ScopeSingleNode indicates the volume can be used on one node at a
+	// time.
+	ScopeSingleNode Scope = "single"
+
+	// ScopeMultiNode indicates the volume can be used on many nodes at
+	// the same time.
+	ScopeMultiNode Scope = "multi"
+)
+
+// SharingMode defines the Sharing of a CSI Volume. This is how Tasks that use
+// the Volume at the same time may access it.
+type SharingMode string
+
+const (
+	// SharingNone indicates that only one Task may use the Volume at a
+	// time.
+	SharingNone SharingMode = "none"
+
+	// SharingReadOnly indicates that the Volume may be shared by any
+	// number of Tasks, but they must be read-only.
+	SharingReadOnly SharingMode = "readonly"
+
+	// SharingOneWriter indicates that the Volume may be shared by any
+	// number of Tasks, but all after the first must be read-only.
+	SharingOneWriter SharingMode = "onewriter"
+
+	// SharingAll means that the Volume may be shared by any number of
+	// Tasks, as readers or writers.
+	SharingAll SharingMode = "all"
+)
+
+// TypeBlock defines options for using a volume as a block-type volume.
+//
+// Intentionally empty.
+type TypeBlock struct{}
+
+// TypeMount contains options for using a volume as a Mount-type
+// volume.
+type TypeMount struct {
+	// FsType specifies the filesystem type for the mount volume. Optional.
+	FsType string `json:",omitempty"`
+
+	// MountFlags defines flags to pass when mounting the volume. Optional.
+	MountFlags []string `json:",omitempty"`
+}
+
+// TopologyRequirement expresses the user's requirements for a volume's
+// accessible topology.
+type TopologyRequirement struct {
+	// Requisite specifies a list of Topologies, at least one of which the
+	// volume must be accessible from.
+	//
+	// Taken verbatim from the CSI Spec:
+	//
+	// Specifies the list of topologies the provisioned volume MUST be
+	// accessible from.
+	// This field is OPTIONAL. If TopologyRequirement is specified either
+	// requisite or preferred or both MUST be specified.
+	//
+	// If requisite is specified, the provisioned volume MUST be
+	// accessible from at least one of the requisite topologies.
+	//
+	// Given
+	//   x = number of topologies provisioned volume is accessible from
+	//   n = number of requisite topologies
+	// The CO MUST ensure n >= 1. The SP MUST ensure x >= 1
+	// If x==n, then the SP MUST make the provisioned volume available to
+	// all topologies from the list of requisite topologies. If it is
+	// unable to do so, the SP MUST fail the CreateVolume call.
+	// For example, if a volume should be accessible from a single zone,
+	// and requisite =
+	//   {"region": "R1", "zone": "Z2"}
+	// then the provisioned volume MUST be accessible from the "region"
+	// "R1" and the "zone" "Z2".
+	// Similarly, if a volume should be accessible from two zones, and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"}
+	// then the provisioned volume MUST be accessible from the "region"
+	// "R1" and both "zone" "Z2" and "zone" "Z3".
+	//
+	// If x<n, then the SP SHALL choose x unique topologies from the list
+	// of requisite topologies. If it is unable to do so, the SP MUST fail
+	// the CreateVolume call.
+	// For example, if a volume should be accessible from a single zone,
+	// and requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"}
+	// then the SP may choose to make the provisioned volume available in
+	// either the "zone" "Z2" or the "zone" "Z3" in the "region" "R1".
+	// Similarly, if a volume should be accessible from two zones, and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"},
+	//   {"region": "R1", "zone": "Z4"}
+	// then the provisioned volume MUST be accessible from any combination
+	// of two unique topologies: e.g. "R1/Z2" and "R1/Z3", or "R1/Z2" and
+	//  "R1/Z4", or "R1/Z3" and "R1/Z4".
+	//
+	// If x>n, then the SP MUST make the provisioned volume available from
+	// all topologies from the list of requisite topologies and MAY choose
+	// the remaining x-n unique topologies from the list of all possible
+	// topologies. If it is unable to do so, the SP MUST fail the
+	// CreateVolume call.
+	// For example, if a volume should be accessible from two zones, and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"}
+	// then the provisioned volume MUST be accessible from the "region"
+	// "R1" and the "zone" "Z2" and the SP may select the second zone
+	// independently, e.g. "R1/Z4".
+	Requisite []Topology `json:",omitempty"`
+
+	// Preferred is a list of Topologies that the volume should attempt to be
+	// provisioned in.
+	//
+	// Taken from the CSI spec:
+	//
+	// Specifies the list of topologies the CO would prefer the volume to
+	// be provisioned in.
+	//
+	// This field is OPTIONAL. If TopologyRequirement is specified either
+	// requisite or preferred or both MUST be specified.
+	//
+	// An SP MUST attempt to make the provisioned volume available using
+	// the preferred topologies in order from first to last.
+	//
+	// If requisite is specified, all topologies in preferred list MUST
+	// also be present in the list of requisite topologies.
+	//
+	// If the SP is unable to to make the provisioned volume available
+	// from any of the preferred topologies, the SP MAY choose a topology
+	// from the list of requisite topologies.
+	// If the list of requisite topologies is not specified, then the SP
+	// MAY choose from the list of all possible topologies.
+	// If the list of requisite topologies is specified and the SP is
+	// unable to to make the provisioned volume available from any of the
+	// requisite topologies it MUST fail the CreateVolume call.
+	//
+	// Example 1:
+	// Given a volume should be accessible from a single zone, and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"}
+	// preferred =
+	//   {"region": "R1", "zone": "Z3"}
+	// then the the SP SHOULD first attempt to make the provisioned volume
+	// available from "zone" "Z3" in the "region" "R1" and fall back to
+	// "zone" "Z2" in the "region" "R1" if that is not possible.
+	//
+	// Example 2:
+	// Given a volume should be accessible from a single zone, and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"},
+	//   {"region": "R1", "zone": "Z4"},
+	//   {"region": "R1", "zone": "Z5"}
+	// preferred =
+	//   {"region": "R1", "zone": "Z4"},
+	//   {"region": "R1", "zone": "Z2"}
+	// then the the SP SHOULD first attempt to make the provisioned volume
+	// accessible from "zone" "Z4" in the "region" "R1" and fall back to
+	// "zone" "Z2" in the "region" "R1" if that is not possible. If that
+	// is not possible, the SP may choose between either the "zone"
+	// "Z3" or "Z5" in the "region" "R1".
+	//
+	// Example 3:
+	// Given a volume should be accessible from TWO zones (because an
+	// opaque parameter in CreateVolumeRequest, for example, specifies
+	// the volume is accessible from two zones, aka synchronously
+	// replicated), and
+	// requisite =
+	//   {"region": "R1", "zone": "Z2"},
+	//   {"region": "R1", "zone": "Z3"},
+	//   {"region": "R1", "zone": "Z4"},
+	//   {"region": "R1", "zone": "Z5"}
+	// preferred =
+	//   {"region": "R1", "zone": "Z5"},
+	//   {"region": "R1", "zone": "Z3"}
+	// then the SP SHOULD first attempt to make the provisioned volume
+	// accessible from the combination of the two "zones" "Z5" and "Z3" in
+	// the "region" "R1". If that's not possible, it should fall back to
+	// a combination of "Z5" and other possibilities from the list of
+	// requisite. If that's not possible, it should fall back to a
+	// combination of "Z3" and other possibilities from the list of
+	// requisite. If that's not possible, it should fall back to a
+	// combination of other possibilities from the list of requisite.
+	Preferred []Topology `json:",omitempty"`
+}
+
+// Topology is a map of topological domains to topological segments.
+//
+// This description is taken verbatim from the CSI Spec:
+//
+// A topological domain is a sub-division of a cluster, like "region",
+// "zone", "rack", etc.
+// A topological segment is a specific instance of a topological domain,
+// like "zone3", "rack3", etc.
+// For example {"com.company/zone": "Z1", "com.company/rack": "R3"}
+// Valid keys have two segments: an OPTIONAL prefix and name, separated
+// by a slash (/), for example: "com.company.example/zone".
+// The key name segment is REQUIRED. The prefix is OPTIONAL.
+// The key name MUST be 63 characters or less, begin and end with an
+// alphanumeric character ([a-z0-9A-Z]), and contain only dashes (-),
+// underscores (_), dots (.), or alphanumerics in between, for example
+// "zone".
+// The key prefix MUST be 63 characters or less, begin and end with a
+// lower-case alphanumeric character ([a-z0-9]), contain only
+// dashes (-), dots (.), or lower-case alphanumerics in between, and
+// follow domain name notation format
+// (https://tools.ietf.org/html/rfc1035#section-2.3.1).
+// The key prefix SHOULD include the plugin's host company name and/or
+// the plugin name, to minimize the possibility of collisions with keys
+// from other plugins.
+// If a key prefix is specified, it MUST be identical across all
+// topology keys returned by the SP (across all RPCs).
+// Keys MUST be case-insensitive. Meaning the keys "Zone" and "zone"
+// MUST not both exist.
+// Each value (topological segment) MUST contain 1 or more strings.
+// Each string MUST be 63 characters or less and begin and end with an
+// alphanumeric character with '-', '_', '.', or alphanumerics in
+// between.
+type Topology struct {
+	Segments map[string]string `json:",omitempty"`
+}
+
+// CapacityRange describes the minimum and maximum capacity a volume should be
+// created with
+type CapacityRange struct {
+	// RequiredBytes specifies that a volume must be at least this big. The
+	// value of 0 indicates an unspecified minimum.
+	RequiredBytes int64
+
+	// LimitBytes specifies that a volume must not be bigger than this. The
+	// value of 0 indicates an unspecified maximum.
+	LimitBytes int64
+}
+
+// Secret represents a Swarm Secret value that must be passed to the CSI
+// storage plugin when operating on this Volume. It represents one key-value
+// pair of possibly many.
+type Secret struct {
+	// Key is the name of the key of the key-value pair passed to the plugin.
+	Key string
+
+	// Secret is the swarm Secret object from which to read data. This can be a
+	// Secret name or ID. The Secret data is retrieved by Swarm and used as the
+	// value of the key-value pair passed to the plugin.
+	Secret string
+}
+
+// PublishState represents the state of a Volume as it pertains to its
+// use on a particular Node.
+type PublishState string
+
+const (
+	// StatePending indicates that the volume should be published on
+	// this node, but the call to ControllerPublishVolume has not been
+	// successfully completed yet and the result recorded by swarmkit.
+	StatePending PublishState = "pending-publish"
+
+	// StatePublished means the volume is published successfully to the node.
+	StatePublished PublishState = "published"
+
+	// StatePendingNodeUnpublish indicates that the Volume should be
+	// unpublished on the Node, and we're waiting for confirmation that it has
+	// done so.  After the Node has confirmed that the Volume has been
+	// unpublished, the state will move to StatePendingUnpublish.
+	StatePendingNodeUnpublish PublishState = "pending-node-unpublish"
+
+	// StatePendingUnpublish means the volume is still published to the node
+	// by the controller, awaiting the operation to unpublish it.
+	StatePendingUnpublish PublishState = "pending-controller-unpublish"
+)
+
+// PublishStatus represents the status of the volume as published to an
+// individual node
+type PublishStatus struct {
+	// NodeID is the ID of the swarm node this Volume is published to.
+	NodeID string `json:",omitempty"`
+
+	// State is the publish state of the volume.
+	State PublishState `json:",omitempty"`
+
+	// PublishContext is the PublishContext returned by the CSI plugin when
+	// a volume is published.
+	PublishContext map[string]string `json:",omitempty"`
+}
+
+// Info contains information about the Volume as a whole as provided by
+// the CSI storage plugin.
+type Info struct {
+	// CapacityBytes is the capacity of the volume in bytes. A value of 0
+	// indicates that the capacity is unknown.
+	CapacityBytes int64 `json:",omitempty"`
+
+	// VolumeContext is the context originating from the CSI storage plugin
+	// when the Volume is created.
+	VolumeContext map[string]string `json:",omitempty"`
+
+	// VolumeID is the ID of the Volume as seen by the CSI storage plugin. This
+	// is distinct from the Volume's Swarm ID, which is the ID the rest of the
+	// Docker Engine uses to refer to the Volume. If this field is blank, then
+	// the Volume has not been successfully created yet.
+	VolumeID string `json:",omitempty"`
+
+	// AccessibleTopology is the topology this volume is actually accessible
+	// from.
+	AccessibleTopology []Topology `json:",omitempty"`
+}
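
The topology and capacity types above compose into the `ClusterVolumeSpec` used by the create path. The following is a minimal sketch, not part of the change, that mirrors "Example 1" from the topology comments: the volume must be provisioned in `R1/Z2` or `R1/Z3`, with `R1/Z3` preferred, and must be between 1 GiB and 10 GiB. The group name and secret reference are hypothetical.

```go
package main

import (
	"fmt"

	"github.com/docker/docker/api/types/volume"
)

// buildSpec assembles a ClusterVolumeSpec mirroring "Example 1" from the
// topology comments: requisite zones R1/Z2 and R1/Z3, with R1/Z3 preferred,
// and a capacity between 1 GiB and 10 GiB.
func buildSpec() volume.ClusterVolumeSpec {
	return volume.ClusterVolumeSpec{
		// Group and the secret reference are hypothetical values used only
		// for illustration.
		Group: "example-group",
		AccessibilityRequirements: &volume.TopologyRequirement{
			Requisite: []volume.Topology{
				{Segments: map[string]string{"region": "R1", "zone": "Z2"}},
				{Segments: map[string]string{"region": "R1", "zone": "Z3"}},
			},
			Preferred: []volume.Topology{
				{Segments: map[string]string{"region": "R1", "zone": "Z3"}},
			},
		},
		CapacityRange: &volume.CapacityRange{
			RequiredBytes: 1 << 30,  // at least 1 GiB
			LimitBytes:    10 << 30, // at most 10 GiB
		},
		Secrets: []volume.Secret{
			{Key: "credentials", Secret: "my-swarm-secret"},
		},
	}
}

func main() {
	fmt.Printf("%+v\n", buildSpec())
}
```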

+ 3 - 0
api/types/volume/create_options.go

@@ -9,6 +9,9 @@ package volume
 // swagger:model CreateOptions
 type CreateOptions struct {
 
+	// cluster volume spec
+	ClusterVolumeSpec *ClusterVolumeSpec `json:"ClusterVolumeSpec,omitempty"`
+
 	// Name of the volume driver to use.
 	Driver string `json:"Driver,omitempty"`
 

+ 8 - 0
api/types/volume/options.go

@@ -0,0 +1,8 @@
+package volume // import "github.com/docker/docker/api/types/volume"
+
+import "github.com/docker/docker/api/types/filters"
+
+// ListOptions holds parameters to list volumes.
+type ListOptions struct {
+	Filters filters.Args
+}

+ 3 - 0
api/types/volume/volume.go

@@ -7,6 +7,9 @@ package volume
 // swagger:model Volume
 type Volume struct {
 
+	// cluster volume
+	ClusterVolume *ClusterVolume `json:"ClusterVolume,omitempty"`
+
 	// Date/Time the volume was created.
 	CreatedAt string `json:"CreatedAt,omitempty"`
 

+ 7 - 0
api/types/volume/volume_update.go

@@ -0,0 +1,7 @@
+package volume // import "github.com/docker/docker/api/types/volume"
+
+// UpdateOptions is configuration to update a Volume with.
+type UpdateOptions struct {
+	// Spec is the ClusterVolumeSpec to update the volume to.
+	Spec *ClusterVolumeSpec `json:"Spec,omitempty"`
+}
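
For reference, the body of the new `PUT /volumes/{name}` request is just this struct serialized as JSON, while the swarm object version being updated travels in the `version` query parameter (see the router and client changes elsewhere in this diff). A small sketch that prints the shape of such a body; the exact field names depend on the JSON tags declared on `ClusterVolumeSpec`:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/docker/docker/api/types/volume"
)

func main() {
	// The update endpoint carries only the spec in the body; here the spec
	// changes nothing but the availability.
	body, err := json.MarshalIndent(volume.UpdateOptions{
		Spec: &volume.ClusterVolumeSpec{
			Availability: volume.AvailabilityDrain,
		},
	}, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```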

+ 1 - 0
client/interface.go

@@ -179,6 +179,7 @@ type VolumeAPIClient interface {
 	VolumeList(ctx context.Context, filter filters.Args) (volume.ListResponse, error)
 	VolumeRemove(ctx context.Context, volumeID string, force bool) error
 	VolumesPrune(ctx context.Context, pruneFilter filters.Args) (types.VolumesPruneReport, error)
+	VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error
 }
 
 // SecretAPIClient defines API client methods for secrets

+ 8 - 0
client/request.go

@@ -49,6 +49,14 @@ func (cli *Client) postRaw(ctx context.Context, path string, query url.Values, b
 	return cli.sendRequest(ctx, http.MethodPost, path, query, body, headers)
 }
 
+func (cli *Client) put(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) {
+	body, headers, err := encodeBody(obj, headers)
+	if err != nil {
+		return serverResponse{}, err
+	}
+	return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers)
+}
+
 // putRaw sends an http request to the docker API using the method PUT.
 func (cli *Client) putRaw(ctx context.Context, path string, query url.Values, body io.Reader, headers map[string][]string) (serverResponse, error) {
 	return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers)

+ 25 - 0
client/volume_update.go

@@ -0,0 +1,25 @@
+package client // import "github.com/docker/docker/client"
+
+import (
+	"context"
+	"net/url"
+	"strconv"
+
+	"github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/api/types/volume"
+)
+
+// VolumeUpdate updates a volume. This only works for Cluster Volumes, and
+// only some fields can be updated.
+func (cli *Client) VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error {
+	if err := cli.NewVersionError("1.42", "volume update"); err != nil {
+		return err
+	}
+
+	query := url.Values{}
+	query.Set("version", strconv.FormatUint(version.Index, 10))
+
+	resp, err := cli.put(ctx, "/volumes/"+volumeID, query, options, nil)
+	ensureReaderClosed(resp)
+	return err
+}
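
A rough usage sketch of the new method, assuming a cluster volume named `my-volume` exists and that `VolumeInspect` returns the `ClusterVolume` field added in this change. It drains the volume by re-submitting its current spec with only the availability changed, since the daemon currently honors only that field (see `daemon/cluster/volumes.go` below):

```go
package main

import (
	"context"
	"log"

	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Read the current state first: the update is rejected if the supplied
	// swarm object version is stale.
	vol, err := cli.VolumeInspect(ctx, "my-volume")
	if err != nil {
		log.Fatal(err)
	}
	if vol.ClusterVolume == nil {
		log.Fatal("not a cluster volume")
	}

	// Reuse the existing spec and flip only the availability field.
	spec := vol.ClusterVolume.Spec
	spec.Availability = volume.AvailabilityDrain

	err = cli.VolumeUpdate(ctx, vol.Name,
		swarm.Version{Index: vol.ClusterVolume.Version.Index},
		volume.UpdateOptions{Spec: &spec})
	if err != nil {
		log.Fatal(err)
	}
}
```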

+ 55 - 0
client/volume_update_test.go

@@ -0,0 +1,55 @@
+package client // import "github.com/docker/docker/client"
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"testing"
+
+	"github.com/docker/docker/api/types/swarm"
+	volumetypes "github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/errdefs"
+)
+
+func TestVolumeUpdateError(t *testing.T) {
+	client := &Client{
+		client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
+	}
+
+	err := client.VolumeUpdate(context.Background(), "", swarm.Version{}, volumetypes.UpdateOptions{})
+
+	if !errdefs.IsSystem(err) {
+		t.Fatalf("expected a Server Error, got %[1]T: %[1]v", err)
+	}
+}
+
+func TestVolumeUpdate(t *testing.T) {
+	expectedURL := "/volumes/test1"
+	expectedVersion := "version=10"
+
+	client := &Client{
+		client: newMockClient(func(req *http.Request) (*http.Response, error) {
+			if !strings.HasPrefix(req.URL.Path, expectedURL) {
+				return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
+			}
+			if req.Method != http.MethodPut {
+				return nil, fmt.Errorf("expected PUT method, got %s", req.Method)
+			}
+			if !strings.Contains(req.URL.RawQuery, expectedVersion) {
+				return nil, fmt.Errorf("expected query to contain '%s', got '%s'", expectedVersion, req.URL.RawQuery)
+			}
+			return &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(bytes.NewReader([]byte("body"))),
+			}, nil
+		}),
+	}
+
+	err := client.VolumeUpdate(context.Background(), "test1", swarm.Version{Index: uint64(10)}, volumetypes.UpdateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+}

+ 1 - 1
cmd/dockerd/daemon.go

@@ -531,7 +531,7 @@ func initRouter(opts routerOptions) {
 		container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified),
 		image.NewRouter(opts.daemon.ImageService()),
 		systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features),
-		volume.NewRouter(opts.daemon.VolumesService()),
+		volume.NewRouter(opts.daemon.VolumesService(), opts.cluster),
 		build.NewRouter(opts.buildBackend, opts.daemon, opts.features),
 		sessionrouter.NewRouter(opts.sessionManager),
 		swarmrouter.NewRouter(opts.cluster),

+ 12 - 0
daemon/cluster/convert/node.go

@@ -56,6 +56,18 @@ func NodeFromGRPC(n swarmapi.Node) types.Node {
 			node.Description.TLSInfo.CertIssuerPublicKey = n.Description.TLSInfo.CertIssuerPublicKey
 			node.Description.TLSInfo.CertIssuerSubject = n.Description.TLSInfo.CertIssuerSubject
 		}
+		for _, csi := range n.Description.CSIInfo {
+			if csi != nil {
+				node.Description.CSIInfo = append(
+					node.Description.CSIInfo,
+					types.NodeCSIInfo{
+						PluginName:        csi.PluginName,
+						NodeID:            csi.NodeID,
+						MaxVolumesPerNode: csi.MaxVolumesPerNode,
+					},
+				)
+			}
+		}
 	}
 
 	// Manager

+ 11 - 0
daemon/cluster/convert/task.go

@@ -57,6 +57,17 @@ func TaskFromGRPC(t swarmapi.Task) (types.Task, error) {
 		}
 	}
 
+	// appending to a nil slice is valid. if there are no items in t.Volumes,
+	// then the task.Volumes will remain nil; otherwise, it will contain
+	// converted entries.
+	for _, v := range t.Volumes {
+		task.Volumes = append(task.Volumes, types.VolumeAttachment{
+			ID:     v.ID,
+			Source: v.Source,
+			Target: v.Target,
+		})
+	}
+
 	if t.Status.PortStatus == nil {
 		return task, nil
 	}

+ 311 - 0
daemon/cluster/convert/volume.go

@@ -0,0 +1,311 @@
+package convert // import "github.com/docker/docker/daemon/cluster/convert"
+
+import (
+	volumetypes "github.com/docker/docker/api/types/volume"
+	gogotypes "github.com/gogo/protobuf/types"
+	swarmapi "github.com/moby/swarmkit/v2/api"
+)
+
+// VolumeFromGRPC converts a swarmkit api Volume object to a docker api Volume
+// object
+func VolumeFromGRPC(v *swarmapi.Volume) volumetypes.Volume {
+	clusterVolumeSpec := volumetypes.ClusterVolumeSpec{
+		Group:                     v.Spec.Group,
+		AccessMode:                accessModeFromGRPC(v.Spec.AccessMode),
+		AccessibilityRequirements: topologyRequirementFromGRPC(v.Spec.AccessibilityRequirements),
+		CapacityRange:             capacityRangeFromGRPC(v.Spec.CapacityRange),
+		Secrets:                   volumeSecretsFromGRPC(v.Spec.Secrets),
+		Availability:              volumeAvailabilityFromGRPC(v.Spec.Availability),
+	}
+
+	clusterVolume := &volumetypes.ClusterVolume{
+		ID:            v.ID,
+		Spec:          clusterVolumeSpec,
+		PublishStatus: volumePublishStatusFromGRPC(v.PublishStatus),
+		Info:          volumeInfoFromGRPC(v.VolumeInfo),
+	}
+
+	clusterVolume.Version.Index = v.Meta.Version.Index
+	clusterVolume.CreatedAt, _ = gogotypes.TimestampFromProto(v.Meta.CreatedAt)
+	clusterVolume.UpdatedAt, _ = gogotypes.TimestampFromProto(v.Meta.UpdatedAt)
+
+	return volumetypes.Volume{
+		ClusterVolume: clusterVolume,
+		CreatedAt:     clusterVolume.CreatedAt.String(),
+		Driver:        v.Spec.Driver.Name,
+		Labels:        v.Spec.Annotations.Labels,
+		Name:          v.Spec.Annotations.Name,
+		Options:       v.Spec.Driver.Options,
+		Scope:         "global",
+	}
+}
+
+func volumeSpecToGRPC(spec volumetypes.ClusterVolumeSpec) *swarmapi.VolumeSpec {
+	swarmSpec := &swarmapi.VolumeSpec{
+		Group: spec.Group,
+	}
+
+	if spec.AccessMode != nil {
+		swarmSpec.AccessMode = &swarmapi.VolumeAccessMode{}
+
+		switch spec.AccessMode.Scope {
+		case volumetypes.ScopeSingleNode:
+			swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeSingleNode
+		case volumetypes.ScopeMultiNode:
+			swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeMultiNode
+		}
+
+		switch spec.AccessMode.Sharing {
+		case volumetypes.SharingNone:
+			swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingNone
+		case volumetypes.SharingReadOnly:
+			swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingReadOnly
+		case volumetypes.SharingOneWriter:
+			swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingOneWriter
+		case volumetypes.SharingAll:
+			swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingAll
+		}
+
+		if spec.AccessMode.BlockVolume != nil {
+			swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Block{
+				Block: &swarmapi.VolumeAccessMode_BlockVolume{},
+			}
+		}
+		if spec.AccessMode.MountVolume != nil {
+			swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Mount{
+				Mount: &swarmapi.VolumeAccessMode_MountVolume{
+					FsType:     spec.AccessMode.MountVolume.FsType,
+					MountFlags: spec.AccessMode.MountVolume.MountFlags,
+				},
+			}
+		}
+	}
+
+	for _, secret := range spec.Secrets {
+		swarmSpec.Secrets = append(swarmSpec.Secrets, &swarmapi.VolumeSecret{
+			Key:    secret.Key,
+			Secret: secret.Secret,
+		})
+	}
+
+	if spec.AccessibilityRequirements != nil {
+		swarmSpec.AccessibilityRequirements = &swarmapi.TopologyRequirement{}
+
+		for _, top := range spec.AccessibilityRequirements.Requisite {
+			swarmSpec.AccessibilityRequirements.Requisite = append(
+				swarmSpec.AccessibilityRequirements.Requisite,
+				&swarmapi.Topology{
+					Segments: top.Segments,
+				},
+			)
+		}
+
+		for _, top := range spec.AccessibilityRequirements.Preferred {
+			swarmSpec.AccessibilityRequirements.Preferred = append(
+				swarmSpec.AccessibilityRequirements.Preferred,
+				&swarmapi.Topology{
+					Segments: top.Segments,
+				},
+			)
+		}
+	}
+
+	if spec.CapacityRange != nil {
+		swarmSpec.CapacityRange = &swarmapi.CapacityRange{
+			RequiredBytes: spec.CapacityRange.RequiredBytes,
+			LimitBytes:    spec.CapacityRange.LimitBytes,
+		}
+	}
+
+	// availability is not a pointer, it is a value. if the user does not
+	// specify an availability, it will be inferred as the 0-value, which is
+	// "active".
+	switch spec.Availability {
+	case volumetypes.AvailabilityActive:
+		swarmSpec.Availability = swarmapi.VolumeAvailabilityActive
+	case volumetypes.AvailabilityPause:
+		swarmSpec.Availability = swarmapi.VolumeAvailabilityPause
+	case volumetypes.AvailabilityDrain:
+		swarmSpec.Availability = swarmapi.VolumeAvailabilityDrain
+	}
+
+	return swarmSpec
+}
+
+// VolumeCreateToGRPC takes a VolumeCreateBody and outputs the matching
+// swarmapi VolumeSpec.
+func VolumeCreateToGRPC(volume *volumetypes.CreateOptions) *swarmapi.VolumeSpec {
+	var swarmSpec *swarmapi.VolumeSpec
+	if volume != nil && volume.ClusterVolumeSpec != nil {
+		swarmSpec = volumeSpecToGRPC(*volume.ClusterVolumeSpec)
+	} else {
+		swarmSpec = &swarmapi.VolumeSpec{}
+	}
+
+	swarmSpec.Annotations = swarmapi.Annotations{
+		Name:   volume.Name,
+		Labels: volume.Labels,
+	}
+
+	swarmSpec.Driver = &swarmapi.Driver{
+		Name:    volume.Driver,
+		Options: volume.DriverOpts,
+	}
+
+	return swarmSpec
+}
+
+func volumeInfoFromGRPC(info *swarmapi.VolumeInfo) *volumetypes.Info {
+	if info == nil {
+		return nil
+	}
+
+	var accessibleTopology []volumetypes.Topology
+	if info.AccessibleTopology != nil {
+		accessibleTopology = make([]volumetypes.Topology, len(info.AccessibleTopology))
+		for i, top := range info.AccessibleTopology {
+			accessibleTopology[i] = topologyFromGRPC(top)
+		}
+	}
+
+	return &volumetypes.Info{
+		CapacityBytes:      info.CapacityBytes,
+		VolumeContext:      info.VolumeContext,
+		VolumeID:           info.VolumeID,
+		AccessibleTopology: accessibleTopology,
+	}
+}
+
+func volumePublishStatusFromGRPC(publishStatus []*swarmapi.VolumePublishStatus) []*volumetypes.PublishStatus {
+	if publishStatus == nil {
+		return nil
+	}
+
+	vps := make([]*volumetypes.PublishStatus, len(publishStatus))
+	for i, status := range publishStatus {
+		var state volumetypes.PublishState
+		switch status.State {
+		case swarmapi.VolumePublishStatus_PENDING_PUBLISH:
+			state = volumetypes.StatePending
+		case swarmapi.VolumePublishStatus_PUBLISHED:
+			state = volumetypes.StatePublished
+		case swarmapi.VolumePublishStatus_PENDING_NODE_UNPUBLISH:
+			state = volumetypes.StatePendingNodeUnpublish
+		case swarmapi.VolumePublishStatus_PENDING_UNPUBLISH:
+			state = volumetypes.StatePendingUnpublish
+		}
+
+		vps[i] = &volumetypes.PublishStatus{
+			NodeID:         status.NodeID,
+			State:          state,
+			PublishContext: status.PublishContext,
+		}
+	}
+
+	return vps
+}
+
+func accessModeFromGRPC(accessMode *swarmapi.VolumeAccessMode) *volumetypes.AccessMode {
+	if accessMode == nil {
+		return nil
+	}
+
+	convertedAccessMode := &volumetypes.AccessMode{}
+
+	switch accessMode.Scope {
+	case swarmapi.VolumeScopeSingleNode:
+		convertedAccessMode.Scope = volumetypes.ScopeSingleNode
+	case swarmapi.VolumeScopeMultiNode:
+		convertedAccessMode.Scope = volumetypes.ScopeMultiNode
+	}
+
+	switch accessMode.Sharing {
+	case swarmapi.VolumeSharingNone:
+		convertedAccessMode.Sharing = volumetypes.SharingNone
+	case swarmapi.VolumeSharingReadOnly:
+		convertedAccessMode.Sharing = volumetypes.SharingReadOnly
+	case swarmapi.VolumeSharingOneWriter:
+		convertedAccessMode.Sharing = volumetypes.SharingOneWriter
+	case swarmapi.VolumeSharingAll:
+		convertedAccessMode.Sharing = volumetypes.SharingAll
+	}
+
+	if block := accessMode.GetBlock(); block != nil {
+		convertedAccessMode.BlockVolume = &volumetypes.TypeBlock{}
+	}
+	if mount := accessMode.GetMount(); mount != nil {
+		convertedAccessMode.MountVolume = &volumetypes.TypeMount{
+			FsType:     mount.FsType,
+			MountFlags: mount.MountFlags,
+		}
+	}
+
+	return convertedAccessMode
+}
+
+func volumeSecretsFromGRPC(secrets []*swarmapi.VolumeSecret) []volumetypes.Secret {
+	if secrets == nil {
+		return nil
+	}
+	convertedSecrets := make([]volumetypes.Secret, len(secrets))
+	for i, secret := range secrets {
+		convertedSecrets[i] = volumetypes.Secret{
+			Key:    secret.Key,
+			Secret: secret.Secret,
+		}
+	}
+	return convertedSecrets
+}
+
+func topologyRequirementFromGRPC(top *swarmapi.TopologyRequirement) *volumetypes.TopologyRequirement {
+	if top == nil {
+		return nil
+	}
+
+	convertedTop := &volumetypes.TopologyRequirement{}
+	if top.Requisite != nil {
+		convertedTop.Requisite = make([]volumetypes.Topology, len(top.Requisite))
+		for i, req := range top.Requisite {
+			convertedTop.Requisite[i] = topologyFromGRPC(req)
+		}
+	}
+
+	if top.Preferred != nil {
+		convertedTop.Preferred = make([]volumetypes.Topology, len(top.Preferred))
+		for i, pref := range top.Preferred {
+			convertedTop.Preferred[i] = topologyFromGRPC(pref)
+		}
+	}
+
+	return convertedTop
+}
+
+func topologyFromGRPC(top *swarmapi.Topology) volumetypes.Topology {
+	if top == nil {
+		return volumetypes.Topology{}
+	}
+	return volumetypes.Topology{
+		Segments: top.Segments,
+	}
+}
+
+func capacityRangeFromGRPC(capacity *swarmapi.CapacityRange) *volumetypes.CapacityRange {
+	if capacity == nil {
+		return nil
+	}
+
+	return &volumetypes.CapacityRange{
+		RequiredBytes: capacity.RequiredBytes,
+		LimitBytes:    capacity.LimitBytes,
+	}
+}
+
+func volumeAvailabilityFromGRPC(availability swarmapi.VolumeSpec_VolumeAvailability) volumetypes.Availability {
+	switch availability {
+	case swarmapi.VolumeAvailabilityActive:
+		return volumetypes.AvailabilityActive
+	case swarmapi.VolumeAvailabilityPause:
+		return volumetypes.AvailabilityPause
+	}
+	return volumetypes.AvailabilityDrain
+}

+ 210 - 0
daemon/cluster/convert/volume_test.go

@@ -0,0 +1,210 @@
+package convert
+
+import (
+	"testing"
+
+	volumetypes "github.com/docker/docker/api/types/volume"
+	swarmapi "github.com/moby/swarmkit/v2/api"
+
+	"gotest.tools/v3/assert"
+)
+
+func TestTopologyFromGRPC(t *testing.T) {
+	nilTopology := topologyFromGRPC(nil)
+	assert.DeepEqual(t, nilTopology, volumetypes.Topology{})
+
+	swarmTop := &swarmapi.Topology{
+		Segments: map[string]string{"foo": "bar"},
+	}
+
+	top := topologyFromGRPC(swarmTop)
+	assert.DeepEqual(t, top.Segments, swarmTop.Segments)
+}
+
+func TestCapacityRangeFromGRPC(t *testing.T) {
+	nilCapacity := capacityRangeFromGRPC(nil)
+	assert.Assert(t, nilCapacity == nil)
+
+	swarmZeroCapacity := &swarmapi.CapacityRange{}
+	zeroCapacity := capacityRangeFromGRPC(swarmZeroCapacity)
+	assert.Assert(t, zeroCapacity != nil)
+	assert.Equal(t, zeroCapacity.RequiredBytes, int64(0))
+	assert.Equal(t, zeroCapacity.LimitBytes, int64(0))
+
+	swarmNonZeroCapacity := &swarmapi.CapacityRange{
+		RequiredBytes: 1024,
+		LimitBytes:    2048,
+	}
+	nonZeroCapacity := capacityRangeFromGRPC(swarmNonZeroCapacity)
+	assert.Assert(t, nonZeroCapacity != nil)
+	assert.Equal(t, nonZeroCapacity.RequiredBytes, int64(1024))
+	assert.Equal(t, nonZeroCapacity.LimitBytes, int64(2048))
+}
+
+func TestVolumeAvailabilityFromGRPC(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		in       swarmapi.VolumeSpec_VolumeAvailability
+		expected volumetypes.Availability
+	}{
+		{
+			name:     "Active",
+			in:       swarmapi.VolumeAvailabilityActive,
+			expected: volumetypes.AvailabilityActive,
+		}, {
+			name:     "Pause",
+			in:       swarmapi.VolumeAvailabilityPause,
+			expected: volumetypes.AvailabilityPause,
+		}, {
+			name:     "Drain",
+			in:       swarmapi.VolumeAvailabilityDrain,
+			expected: volumetypes.AvailabilityDrain,
+		},
+	} {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			actual := volumeAvailabilityFromGRPC(tc.in)
+			assert.Equal(t, actual, tc.expected)
+		})
+	}
+}
+
+// TestAccessModeFromGRPC tests that the AccessMode type is correctly converted
+func TestAccessModeFromGRPC(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		in       *swarmapi.VolumeAccessMode
+		expected *volumetypes.AccessMode
+	}{
+		{
+			name: "MountVolume",
+			in: &swarmapi.VolumeAccessMode{
+				Scope:   swarmapi.VolumeScopeSingleNode,
+				Sharing: swarmapi.VolumeSharingNone,
+				AccessType: &swarmapi.VolumeAccessMode_Mount{
+					Mount: &swarmapi.VolumeAccessMode_MountVolume{
+						FsType: "foo",
+						// TODO(dperny): maybe don't convert this?
+						MountFlags: []string{"one", "two"},
+					},
+				},
+			},
+			expected: &volumetypes.AccessMode{
+				Scope:   volumetypes.ScopeSingleNode,
+				Sharing: volumetypes.SharingNone,
+				MountVolume: &volumetypes.TypeMount{
+					FsType:     "foo",
+					MountFlags: []string{"one", "two"},
+				},
+			},
+		}, {
+			name: "BlockVolume",
+			in: &swarmapi.VolumeAccessMode{
+				Scope:   swarmapi.VolumeScopeSingleNode,
+				Sharing: swarmapi.VolumeSharingNone,
+				AccessType: &swarmapi.VolumeAccessMode_Block{
+					Block: &swarmapi.VolumeAccessMode_BlockVolume{},
+				},
+			},
+			expected: &volumetypes.AccessMode{
+				Scope:       volumetypes.ScopeSingleNode,
+				Sharing:     volumetypes.SharingNone,
+				BlockVolume: &volumetypes.TypeBlock{},
+			},
+		},
+	} {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			out := accessModeFromGRPC(tc.in)
+			assert.DeepEqual(t, tc.expected, out)
+		})
+	}
+}
+
+// TestVolumeCreateToGRPC tests that a docker-typed VolumeCreateBody is
+// correctly converted to a swarm-typed VolumeSpec.
+func TestVolumeCreateToGRPC(t *testing.T) {
+	volume := &volumetypes.CreateOptions{
+		Driver:     "plug1",
+		DriverOpts: map[string]string{"options": "yeah"},
+		Labels:     map[string]string{"labeled": "yeah"},
+		Name:       "volume1",
+	}
+
+	spec := &volumetypes.ClusterVolumeSpec{
+		Group: "gronp",
+		AccessMode: &volumetypes.AccessMode{
+			Scope:   volumetypes.ScopeMultiNode,
+			Sharing: volumetypes.SharingAll,
+			MountVolume: &volumetypes.TypeMount{
+				FsType:     "foo",
+				MountFlags: []string{"one", "two"},
+			},
+		},
+		Secrets: []volumetypes.Secret{
+			{Key: "key1", Secret: "secret1"},
+			{Key: "key2", Secret: "secret2"},
+		},
+		AccessibilityRequirements: &volumetypes.TopologyRequirement{
+			Requisite: []volumetypes.Topology{
+				{Segments: map[string]string{"top1": "yup"}},
+				{Segments: map[string]string{"top2": "def"}},
+				{Segments: map[string]string{"top3": "nah"}},
+			},
+			Preferred: []volumetypes.Topology{},
+		},
+		CapacityRange: &volumetypes.CapacityRange{
+			RequiredBytes: 1,
+			LimitBytes:    0,
+		},
+	}
+
+	volume.ClusterVolumeSpec = spec
+
+	swarmSpec := VolumeCreateToGRPC(volume)
+
+	assert.Assert(t, swarmSpec != nil)
+	expectedSwarmSpec := &swarmapi.VolumeSpec{
+		Annotations: swarmapi.Annotations{
+			Name: "volume1",
+			Labels: map[string]string{
+				"labeled": "yeah",
+			},
+		},
+		Group: "gronp",
+		Driver: &swarmapi.Driver{
+			Name: "plug1",
+			Options: map[string]string{
+				"options": "yeah",
+			},
+		},
+		AccessMode: &swarmapi.VolumeAccessMode{
+			Scope:   swarmapi.VolumeScopeMultiNode,
+			Sharing: swarmapi.VolumeSharingAll,
+			AccessType: &swarmapi.VolumeAccessMode_Mount{
+				Mount: &swarmapi.VolumeAccessMode_MountVolume{
+					FsType:     "foo",
+					MountFlags: []string{"one", "two"},
+				},
+			},
+		},
+		Secrets: []*swarmapi.VolumeSecret{
+			{Key: "key1", Secret: "secret1"},
+			{Key: "key2", Secret: "secret2"},
+		},
+		AccessibilityRequirements: &swarmapi.TopologyRequirement{
+			Requisite: []*swarmapi.Topology{
+				{Segments: map[string]string{"top1": "yup"}},
+				{Segments: map[string]string{"top2": "def"}},
+				{Segments: map[string]string{"top3": "nah"}},
+			},
+			Preferred: nil,
+		},
+		CapacityRange: &swarmapi.CapacityRange{
+			RequiredBytes: 1,
+			LimitBytes:    0,
+		},
+	}
+
+	assert.DeepEqual(t, swarmSpec, expectedSwarmSpec)
+}

+ 25 - 1
daemon/cluster/executor/container/adapter.go

@@ -288,7 +288,7 @@ func (c *containerAdapter) create(ctx context.Context) error {
 	if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
 		Name:       c.container.name(),
 		Config:     c.container.config(),
-		HostConfig: c.container.hostConfig(),
+		HostConfig: c.container.hostConfig(c.dependencies.Volumes()),
 		// Use the first network in container create
 		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
 	}); err != nil {
@@ -461,6 +461,30 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
 	return nil
 }
 
+// waitClusterVolumes blocks until the VolumeGetter returns a path for each
+// cluster volume in use by this task
+func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error {
+	for _, attached := range c.container.task.Volumes {
+		// for every attachment, try until we succeed or until the context
+		// is canceled.
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+				// continue through the code.
+			}
+			path, err := c.dependencies.Volumes().Get(attached.ID)
+			if err == nil && path != "" {
+				// break out of the inner-most loop
+				break
+			}
+		}
+	}
+	log.G(ctx).Debug("volumes ready")
+	return nil
+}
+
 func (c *containerAdapter) activateServiceBinding() error {
 	return c.backend.ActivateContainerServiceBinding(c.container.name())
 }

+ 36 - 4
daemon/cluster/executor/container/container.go

@@ -254,14 +254,44 @@ func (c *containerConfig) labels() map[string]string {
 	return labels
 }
 
-func (c *containerConfig) mounts() []enginemount.Mount {
+func (c *containerConfig) mounts(deps exec.VolumeGetter) []enginemount.Mount {
 	var r []enginemount.Mount
 	for _, mount := range c.spec().Mounts {
-		r = append(r, convertMount(mount))
+		if mount.Type == api.MountTypeCSI {
+			r = append(r, c.convertCSIMount(mount, deps))
+		} else {
+			r = append(r, convertMount(mount))
+		}
 	}
 	return r
 }
 
+// convertCSIMount matches the CSI mount with the path of the CSI volume.
+//
+// technically quadratic with respect to the number of CSI mounts, but that
+// number shouldn't ever be large enough for quadratic to matter.
+//
+// TODO(dperny): figure out a scheme for errors? or maybe add code to
+// checkMounts?
+func (c *containerConfig) convertCSIMount(m api.Mount, deps exec.VolumeGetter) enginemount.Mount {
+	var mount enginemount.Mount
+
+	// these are actually bind mounts
+	mount.Type = enginemount.TypeBind
+
+	for _, attach := range c.task.Volumes {
+		if attach.Source == m.Source && attach.Target == m.Target {
+			// we should not get an error here, because we should have checked
+			// already that the volume is ready
+			path, _ := deps.Get(attach.ID)
+			mount.Source = path
+			mount.Target = m.Target
+		}
+	}
+
+	return mount
+}
+
 func convertMount(m api.Mount) enginemount.Mount {
 	mount := enginemount.Mount{
 		Source:   m.Source,
@@ -278,6 +308,8 @@ func convertMount(m api.Mount) enginemount.Mount {
 		mount.Type = enginemount.TypeTmpfs
 	case api.MountTypeNamedPipe:
 		mount.Type = enginemount.TypeNamedPipe
+	case api.MountTypeCSI:
+		mount.Type = enginemount.TypeCluster
 	}
 
 	if m.BindOptions != nil {
@@ -350,12 +382,12 @@ func (c *containerConfig) healthcheck() *enginecontainer.HealthConfig {
 	}
 }
 
-func (c *containerConfig) hostConfig() *enginecontainer.HostConfig {
+func (c *containerConfig) hostConfig(deps exec.VolumeGetter) *enginecontainer.HostConfig {
 	hc := &enginecontainer.HostConfig{
 		Resources:      c.resources(),
 		GroupAdd:       c.spec().Groups,
 		PortBindings:   c.portBindings(),
-		Mounts:         c.mounts(),
+		Mounts:         c.mounts(deps),
 		ReadonlyRootfs: c.spec().ReadOnly,
 		Isolation:      c.isolation(),
 		Init:           c.init(),

+ 8 - 2
daemon/cluster/executor/container/container_test.go

@@ -31,7 +31,10 @@ func TestIsolationConversion(t *testing.T) {
 				},
 			}
 			config := containerConfig{task: &task}
-			assert.Equal(t, c.to, config.hostConfig().Isolation)
+			// NOTE(dperny): you shouldn't ever pass nil outside of testing,
+			// because if there are CSI volumes, the code will panic. However,
+			// in testing, this is acceptable.
+			assert.Equal(t, c.to, config.hostConfig(nil).Isolation)
 		})
 	}
 }
@@ -129,7 +132,10 @@ func TestCredentialSpecConversion(t *testing.T) {
 				},
 			}
 			config := containerConfig{task: &task}
-			assert.DeepEqual(t, c.to, config.hostConfig().SecurityOpt)
+			// NOTE(dperny): you shouldn't ever pass nil outside of testing,
+			// because if there are CSI volumes, the code will panic. However,
+			// in testing, this is acceptable.
+			assert.DeepEqual(t, c.to, config.hostConfig(nil).SecurityOpt)
 		})
 	}
 }

+ 9 - 0
daemon/cluster/executor/container/controller.go

@@ -121,6 +121,15 @@ func (r *controller) Prepare(ctx context.Context) error {
 		return err
 	}
 
+	// could take a while for the cluster volumes to become available. set for
+	// 5 minutes, I guess?
+	// TODO(dperny): do this more intelligently. return a better error.
+	waitClusterVolumesCtx, wcvcancel := context.WithTimeout(ctx, 5*time.Minute)
+	defer wcvcancel()
+	if err := r.adapter.waitClusterVolumes(waitClusterVolumesCtx); err != nil {
+		return err
+	}
+
 	// Make sure all the networks that the task needs are created.
 	if err := r.adapter.createNetworks(ctx); err != nil {
 		return err

+ 8 - 0
daemon/cluster/executor/container/executor.go

@@ -122,6 +122,9 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
 		}
 	}
 
+	// TODO(dperny): don't ignore the error here
+	csiInfo, _ := e.Volumes().Plugins().NodeInfo(ctx)
+
 	description := &api.NodeDescription{
 		Hostname: info.Name,
 		Platform: &api.Platform{
@@ -138,6 +141,7 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
 			MemoryBytes: info.MemTotal,
 			Generic:     convert.GenericResourcesToGRPC(info.GenericResources),
 		},
+		CSIInfo: csiInfo,
 	}
 
 	// Save the node information in the executor field
@@ -356,6 +360,10 @@ func (e *executor) Configs() exec.ConfigsManager {
 	return e.dependencies.Configs()
 }
 
+func (e *executor) Volumes() exec.VolumesManager {
+	return e.dependencies.Volumes()
+}
+
 type sortedPlugins []api.PluginDescription
 
 func (sp sortedPlugins) Len() int { return len(sp) }

+ 2 - 0
daemon/cluster/executor/container/validate.go

@@ -37,6 +37,8 @@ func validateMounts(mounts []api.Mount) error {
 			if mount.Source == "" {
 				return errors.New("invalid npipe source, source must not be empty")
 			}
+		case api.MountTypeCSI:
+			// nothing to do here.
 		default:
 			return fmt.Errorf("invalid mount type: %s", mount.Type)
 		}

+ 36 - 0
daemon/cluster/helpers.go

@@ -244,3 +244,39 @@ func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*s
 
 	return rl.Networks[0], nil
 }
+
+func getVolume(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Volume, error) {
+	// GetVolume to match via full ID
+	if v, err := c.GetVolume(ctx, &swarmapi.GetVolumeRequest{VolumeID: input}); err == nil {
+		return v.Volume, nil
+	}
+
+	// If any error (including NotFound), list volumes to match via ID prefix
+	// and full name
+	resp, err := c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{
+		Filters: &swarmapi.ListVolumesRequest_Filters{
+			Names: []string{input},
+		},
+	})
+
+	if err != nil || len(resp.Volumes) == 0 {
+		resp, err = c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{
+			Filters: &swarmapi.ListVolumesRequest_Filters{
+				IDPrefixes: []string{input},
+			},
+		})
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resp.Volumes) == 0 {
+		return nil, errdefs.NotFound(fmt.Errorf("volume %s not found", input))
+	}
+
+	if l := len(resp.Volumes); l > 1 {
+		return nil, errdefs.InvalidParameter(fmt.Errorf("volume %s is ambiguous (%d matches found)", input, l))
+	}
+
+	return resp.Volumes[0], nil
+}

+ 140 - 0
daemon/cluster/volumes.go

@@ -0,0 +1,140 @@
+package cluster // import "github.com/docker/docker/daemon/cluster"
+
+import (
+	"context"
+	"fmt"
+
+	volumetypes "github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/daemon/cluster/convert"
+	"github.com/docker/docker/errdefs"
+	swarmapi "github.com/moby/swarmkit/v2/api"
+	"google.golang.org/grpc"
+)
+
+// GetVolume returns a volume from the swarm cluster.
+func (c *Cluster) GetVolume(nameOrID string) (volumetypes.Volume, error) {
+	var volume *swarmapi.Volume
+
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
+		v, err := getVolume(ctx, state.controlClient, nameOrID)
+		if err != nil {
+			return err
+		}
+		volume = v
+		return nil
+	}); err != nil {
+		return volumetypes.Volume{}, err
+	}
+	return convert.VolumeFromGRPC(volume), nil
+}
+
+// GetVolumes returns all of the volumes matching the given options from a swarm cluster.
+func (c *Cluster) GetVolumes(options volumetypes.ListOptions) ([]*volumetypes.Volume, error) {
+	var volumes []*volumetypes.Volume
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
+		r, err := state.controlClient.ListVolumes(
+			ctx, &swarmapi.ListVolumesRequest{},
+			grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
+		)
+		if err != nil {
+			return err
+		}
+
+		volumes = make([]*volumetypes.Volume, 0, len(r.Volumes))
+		for _, volume := range r.Volumes {
+			v := convert.VolumeFromGRPC(volume)
+			volumes = append(volumes, &v)
+		}
+
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	return volumes, nil
+}
+
+// CreateVolume creates a new cluster volume in the swarm cluster.
+//
+// Returns the volume ID if creation is successful, or an error if not.
+func (c *Cluster) CreateVolume(v volumetypes.CreateOptions) (*volumetypes.Volume, error) {
+	var resp *swarmapi.CreateVolumeResponse
+	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
+		volumeSpec := convert.VolumeCreateToGRPC(&v)
+
+		r, err := state.controlClient.CreateVolume(
+			ctx, &swarmapi.CreateVolumeRequest{Spec: volumeSpec},
+		)
+		if err != nil {
+			return err
+		}
+		resp = r
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+	createdVol, err := c.GetVolume(resp.Volume.ID)
+	if err != nil {
+		// If there's a failure of some sort in this operation, the user would
+		// get a confusing "not found" error on a create. Instead, before
+		// returning the error, add some context and change this to a
+		// system-type error, because it's nothing the user did wrong.
+		return nil, errdefs.System(fmt.Errorf("unable to retrieve created volume: %w", err))
+	}
+	return &createdVol, nil
+}
+
+// RemoveVolume removes a volume from the swarm cluster.
+func (c *Cluster) RemoveVolume(nameOrID string, force bool) error {
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
+		volume, err := getVolume(ctx, state.controlClient, nameOrID)
+		if err != nil {
+			return err
+		}
+
+		req := &swarmapi.RemoveVolumeRequest{
+			VolumeID: volume.ID,
+			Force:    force,
+		}
+		_, err = state.controlClient.RemoveVolume(ctx, req)
+		return err
+	})
+}
+
+// UpdateVolume updates a volume in the swarm cluster.
+func (c *Cluster) UpdateVolume(nameOrID string, version uint64, volume volumetypes.UpdateOptions) error {
+	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
+		v, err := getVolume(ctx, state.controlClient, nameOrID)
+		if err != nil {
+			return err
+		}
+
+		// For now, the only thing we can update is availability. Instead of
+		// converting the whole spec, just pluck out the availability if it has
+		// been set.
+
+		if volume.Spec != nil {
+			switch volume.Spec.Availability {
+			case volumetypes.AvailabilityActive:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityActive
+			case volumetypes.AvailabilityPause:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityPause
+			case volumetypes.AvailabilityDrain:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityDrain
+			}
+			// if default empty value, change nothing.
+		}
+
+		_, err = state.controlClient.UpdateVolume(
+			ctx, &swarmapi.UpdateVolumeRequest{
+				VolumeID: nameOrID,
+				VolumeVersion: &swarmapi.Version{
+					Index: version,
+				},
+				Spec: &v.Spec,
+			},
+		)
+		return err
+	})
+}

+ 10 - 0
docs/api/version-history.md

@@ -77,6 +77,16 @@ keywords: "API, Docker, rcli, REST, documentation"
   `GET /services/{id}/logs` and `GET /tasks/{id}/logs` now set Content-Type header
   to `application/vnd.docker.multiplexed-stream` when a multiplexed stdout/stderr 
   stream is sent to client, `application/vnd.docker.raw-stream` otherwise.
+* `POST /volumes/create` now accepts a new `ClusterVolumeSpec` to create a cluster
+  volume (CSI). This option can only be used if the daemon is a Swarm manager.
+  The Volume response on creation can now also contain a `ClusterVolume` field
+  with information about the created volume.
+* Volume information returned by `GET /volumes/{name}`, `GET /volumes` and
+  `GET /system/df` can now contain a `ClusterVolume` field if the volume is a
+  cluster volume (requires the daemon to be a Swarm manager).
+* Added a new `PUT /volumes/{name}` endpoint to update cluster volumes (CSI).
+  Cluster volumes are only supported if the daemon is a Swarm manager.
 
 ## v1.41 API changes
 

+ 210 - 0
docs/cluster_volumes.md

@@ -0,0 +1,210 @@
+Cluster Volumes
+===============
+
+Docker Cluster Volumes is a new feature which allows using CSI plugins to
+create cluster-aware volumes.
+
+## Installing a CSI plugin
+
+CSI, the Container Storage Interface, defines an API for storage providers to
+write storage plugins which are cross-compatible between various container
+orchestrators. However, most CSI plugins are shipped with configuration
+specific to Kubernetes. Docker CSI Plugins use the same binaries as those for
+Kubernetes, but in a different environment and sometimes with different
+configuration.
+
+If a plugin is already adapted for and available for Docker, it can be
+installed through the `docker plugin install` command. Though such plugins may
+require configuration specific to the user's environment, they will ultimately
+be detected by and work automatically with Docker once enabled.
+
+Currently, there is no way to automatically deploy a Docker Plugin across all
+nodes in a cluster. Therefore, users must ensure the Docker Plugin is installed
+on every node of the cluster where it is needed.
+
+Docker Swarm worker nodes report their active plugins to the Docker Swarm
+managers, and so it is not necessary to install a plugin on every worker node
+if this is not desired. However, the plugin must be installed on every manager
+node, or a leadership change could result in Docker Swarm no longer having the
+ability to call the plugin.
+
+### Creating a Docker CSI Plugin
+
+Before following this section, readers should ensure they are acquainted with
+the 
+[Docker Engine managed plugin system](https://docs.docker.com/engine/extend/).
+Docker CSI plugins use this system to run.
+
+Docker CSI plugins are identified with a special interface type. There are two
+related interfaces that CSI plugins can expose. In the `config.json`, this
+should be set as follows:
+
+```json
+  "interface": {
+    "types": ["docker.csicontroller/1.0","docker.csinode/1.0"]
+  }
+```
+
+Additionally, the CSI specification states that CSI plugins should have
+`CAP_SYS_ADMIN` privileges, so this should be set in the `config.json` as
+well:
+
+```json
+  "linux" : {
+    "capabilities": ["CAP_SYS_ADMIN"]
+  }
+```
+
+Other configuration is largely specific to the CSI plugin.
+
+#### Split-Component Plugins
+
+For split-component plugins, users can specify either the
+`docker.csicontroller/1.0` or `docker.csinode/1.0` plugin interfaces. Manager
+nodes should run plugin instances with the `docker.csicontroller/1.0`
+interface, and worker nodes the `docker.csinode/1.0` interface.
+
+Docker does not support running two plugins with the same name, nor does it
+support specifying different drivers for the node and controller plugins. This
+means that, in a fully split plugin, Swarm will be unable to schedule volumes
+to manager nodes.
+
+If it is desired to run a split-component plugin such that the Volumes managed
+by that plugin are accessible to Tasks on the manager node, the user will need
+to build the plugin such that some proxy or multiplexer provides the illusion
+of combined components to the manager through one socket, and ensure the plugin
+reports both interface types.
+
+## Using Cluster Volumes
+
+### Create a Cluster Volume
+
+Creating a Cluster Volume is done with the same `docker volume` commands as any
+other Volume. To create a Cluster Volume, one needs to do both of the following:
+
+* Specify a CSI-capable driver with the `--driver` or `-d` option.
+* Use any one of the cluster-specific `docker volume create` flags.
+
+For example, to create a Cluster Volume called `my-volume` with the
+`democratic-csi` Volume Driver, one might use this command:
+
+```bash
+docker volume create \
+  --driver democratic-csi \
+  --type mount \
+  --sharing all \
+  --scope multi \
+  --limit-bytes 10G \
+  --required-bytes 1G \
+  my-volume
+```
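+
+For reference, the same volume can also be created programmatically with the
+Go client added in this change. This is a rough sketch: the mapping from the
+CLI flags above to spec fields is approximate, and the sizes are written out
+in bytes (GiB).
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+
+	"github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/client"
+)
+
+func main() {
+	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	vol, err := cli.VolumeCreate(context.Background(), volume.CreateOptions{
+		Name:   "my-volume",
+		Driver: "democratic-csi",
+		ClusterVolumeSpec: &volume.ClusterVolumeSpec{
+			AccessMode: &volume.AccessMode{
+				Scope:       volume.ScopeMultiNode, // --scope multi
+				Sharing:     volume.SharingAll,     // --sharing all
+				MountVolume: &volume.TypeMount{},   // --type mount
+			},
+			CapacityRange: &volume.CapacityRange{
+				RequiredBytes: 1 << 30,  // --required-bytes 1G
+				LimitBytes:    10 << 30, // --limit-bytes 10G
+			},
+		},
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	log.Println("created cluster volume:", vol.Name)
+}
+```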
+
+### List Cluster Volumes
+
+Cluster Volumes will be listed along with other volumes when doing
+`docker volume ls`. However, if users want to see only Cluster Volumes, and
+with cluster-specific information, the flag `--cluster` can be specified:
+
+```
+$ docker volume ls --cluster
+VOLUME NAME   GROUP     DRIVER    AVAILABILITY   STATUS
+volume1       group1    driver1   active         pending creation
+volume2       group1    driver1   pause          created
+volume3       group2    driver2   active         in use (1 node)
+volume4       group2    driver2   active         in use (2 nodes)
+```
+
+### Deploying a Service
+
+Cluster Volumes are only compatible with Docker Services, not plain Docker
+Containers.
+
+In Docker Services, a Cluster Volume is used the same way any other volume
+would be used. The `type` should be set to `csi`. For example, to create a
+Service that uses `my-volume` created above, one would execute a command like:
+
+```bash
+docker service create \
+  --name my-service \
+  --mount type=csi,src=my-volume,dst=/srv/www \
+  nginx:alpine
+```
+
+When scheduling Services which use Cluster Volumes, Docker Swarm uses the
+volume's information and state to make decisions about Task placement.
+
+For example, the Service will be constrained to run only on nodes on which the
+volume is available. If the volume is configured with `scope=single`, meaning
+it can only be used on one node in the cluster at a time, then all Tasks for
+that Service will be scheduled to that same node. If that node changes for some
+reason, like a node failure, then the Tasks will be rescheduled to the new
+node automatically, without user input.
+
+If the Cluster Volume is accessible only on some set of nodes at the same time,
+and not the whole cluster, then Docker Swarm will only schedule the Service to
+those nodes as reported by the plugin.
+
+### Using Volume Groups
+
+It is frequently desirable that a Service use any available volume out of an
+interchangeable set. To accomplish this in the simplest and most straightforward
+manner possible, Cluster Volumes use the concept of a volume "Group".
+
+The Volume Group is a field, somewhat like a special label, which is used to
+instruct Swarm that a given volume is interchangeable with every other volume
+of the same Group. When creating a Cluster Volume, the Group can be specified
+by using the `--group` flag.
+
+To use a Cluster Volume by Group instead of by Name, the mount `src` option is
+prefixed with `group:`, followed by the group name. For example:
+
+```
+--mount type=csi,src=group:my-group,dst=/srv/www
+```
+
+This instructs Docker Swarm that any Volume with the Group `my-group` can be
+used to satisfy the mounts.
+
+Volumes in a Group do not need to be identical, but they must be
+interchangeable. These caveats should be kept in mind when using Groups:
+
+* No Service ever gets the monopoly on a Cluster Volume. If several Services
+  use the same Group, then the Cluster Volumes in that Group can be used with
+  any of those Services at any time. Just because a particular Volume was used
+  by a particular Service at one point does not mean it won't be used by a
+  different Service later.
+* Volumes in a group can have different configurations, but all of those
+  configurations must be compatible with the Service. For example, if some of
+  the Volumes in a group have `sharing=readonly`, then the Service must be
+  capable of using the volume in read-only mode.
+* Volumes in a Group are created statically ahead of time, not dynamically
+  as-needed. This means that the user must ensure a sufficient number of
+  Volumes belong to the desired Group to support the needs of the Service.
+
+### Taking Cluster Volumes Offline
+
+For various reasons, users may wish to take a particular Cluster Volume
+offline, such that it is not actively used by Services. To facilitate this,
+Cluster Volumes have an `availability` option similar to Docker Swarm nodes.
+
+Cluster Volume availability can be one of three states:
+
+* `active` - Default. Volume can be used as normal.
+* `pause` - The volume will not be used for new Services, but existing Tasks
+  using the volume will not be stopped.
+* `drain` - The volume will not be used for new Services, and any running Tasks
+  using the volume will be stopped and rescheduled.
+
+A Volume can only be removed from the cluster entirely if its availability is
+set to `drain`, and it has been fully unpublished from all nodes.
+
+#### Force-Removing Volumes
+
+There are cases where a Volume can get caught in a state where Swarm cannot
+verify its removal. In these cases, the Volume can be removed forcefully by
+passing the `--force` or `-f` flag to `docker volume rm`.
+
+## Unsupported Features
+
+The CSI Spec allows for a large number of features which Cluster Volumes in
+this initial implementation do not support. Most notably, Cluster Volumes do
+not support snapshots, cloning, or volume expansion.

+ 1 - 1
plugin/manager.go

@@ -211,7 +211,7 @@ func (pm *Manager) reload() error { // todo: restore
 
 			// We should only enable rootfs propagation for certain plugin types that need it.
 			for _, typ := range p.PluginObj.Config.Interface.Types {
-				if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
+				if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver" || typ.Capability == "csinode" || typ.Capability == "csicontroller") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
 					if p.PluginObj.Config.PropagatedMount != "" {
 						propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")