Add Swarm cluster volume supports

Adds code to support Cluster Volumes in Swarm using CSI drivers.

Signed-off-by: Drew Erny <derny@mirantis.com>
This commit is contained in:
Drew Erny 2021-05-14 11:38:50 -06:00 committed by Sebastiaan van Stijn
parent 3fb5928233
commit 240a9fcb83
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
33 changed files with 2796 additions and 17 deletions

View file

@ -19,3 +19,16 @@ type Backend interface {
Remove(ctx context.Context, name string, opts ...opts.RemoveOption) error Remove(ctx context.Context, name string, opts ...opts.RemoveOption) error
Prune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error) Prune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error)
} }
// ClusterBackend is the backend used for Swarm Cluster Volumes. Regular
// volumes go through the volume service, but to avoid across-dependency
// between the cluster package and the volume package, we simply provide two
// backends here.
type ClusterBackend interface {
GetVolume(nameOrID string) (volume.Volume, error)
GetVolumes(options volume.ListOptions) ([]*volume.Volume, error)
CreateVolume(volume volume.CreateOptions) (*volume.Volume, error)
RemoveVolume(nameOrID string, force bool) error
UpdateVolume(nameOrID string, version uint64, volume volume.UpdateOptions) error
IsManager() bool
}

View file

@ -5,13 +5,15 @@ import "github.com/docker/docker/api/server/router"
// volumeRouter is a router to talk with the volumes controller // volumeRouter is a router to talk with the volumes controller
type volumeRouter struct { type volumeRouter struct {
backend Backend backend Backend
cluster ClusterBackend
routes []router.Route routes []router.Route
} }
// NewRouter initializes a new volume router // NewRouter initializes a new volume router
func NewRouter(b Backend) router.Router { func NewRouter(b Backend, cb ClusterBackend) router.Router {
r := &volumeRouter{ r := &volumeRouter{
backend: b, backend: b,
cluster: cb,
} }
r.initRoutes() r.initRoutes()
return r return r
@ -30,6 +32,8 @@ func (r *volumeRouter) initRoutes() {
// POST // POST
router.NewPostRoute("/volumes/create", r.postVolumesCreate), router.NewPostRoute("/volumes/create", r.postVolumesCreate),
router.NewPostRoute("/volumes/prune", r.postVolumesPrune), router.NewPostRoute("/volumes/prune", r.postVolumesPrune),
// PUT
router.NewPutRoute("/volumes/{name:.*}", r.putVolumesUpdate),
// DELETE // DELETE
router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes), router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes),
} }

View file

@ -2,13 +2,24 @@ package volume // import "github.com/docker/docker/api/server/router/volume"
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"strconv"
"github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/server/httputils"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/api/types/volume" "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/volume/service/opts" "github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
// clusterVolumesVersion defines the API version that swarm cluster volume
// functionality was introduced. avoids the use of magic numbers.
clusterVolumesVersion = "1.42"
) )
func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -24,6 +35,21 @@ func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter
if err != nil { if err != nil {
return err return err
} }
version := httputils.VersionFromContext(ctx)
if versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
clusterVolumes, swarmErr := v.cluster.GetVolumes(volume.ListOptions{Filters: filters})
if swarmErr != nil {
// if there is a swarm error, we may not want to error out right
// away. the local list probably worked. instead, let's do what we
// do if there's a bad driver while trying to list: add the error
// to the warnings. don't do this if swarm is not initialized.
warnings = append(warnings, swarmErr.Error())
}
// add the cluster volumes to the return
volumes = append(volumes, clusterVolumes...)
}
return httputils.WriteJSON(w, http.StatusOK, &volume.ListResponse{Volumes: volumes, Warnings: warnings}) return httputils.WriteJSON(w, http.StatusOK, &volume.ListResponse{Volumes: volumes, Warnings: warnings})
} }
@ -31,11 +57,33 @@ func (v *volumeRouter) getVolumeByName(ctx context.Context, w http.ResponseWrite
if err := httputils.ParseForm(r); err != nil { if err := httputils.ParseForm(r); err != nil {
return err return err
} }
version := httputils.VersionFromContext(ctx)
// re: volume name duplication
//
// we prefer to get volumes locally before attempting to get them from the
// cluster. Local volumes can only be looked up by name, but cluster
// volumes can also be looked up by ID.
vol, err := v.backend.Get(ctx, vars["name"], opts.WithGetResolveStatus) vol, err := v.backend.Get(ctx, vars["name"], opts.WithGetResolveStatus)
if err != nil {
// if the volume is not found in the regular volume backend, and the client
// is using an API version greater than 1.42 (when cluster volumes were
// introduced), then check if Swarm has the volume.
if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
swarmVol, err := v.cluster.GetVolume(vars["name"])
// if swarm returns an error and that error indicates that swarm is not
// initialized, return original NotFound error. Otherwise, we'd return
// a weird swarm unavailable error on non-swarm engines.
if err != nil {
return err
}
vol = &swarmVol
} else if err != nil {
// otherwise, if this isn't NotFound, or this isn't a high enough version,
// just return the error by itself.
return err return err
} }
return httputils.WriteJSON(w, http.StatusOK, vol) return httputils.WriteJSON(w, http.StatusOK, vol)
} }
@ -49,21 +97,82 @@ func (v *volumeRouter) postVolumesCreate(ctx context.Context, w http.ResponseWri
return err return err
} }
vol, err := v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels)) var (
vol *volume.Volume
err error
version = httputils.VersionFromContext(ctx)
)
// if the ClusterVolumeSpec is filled in, then this is a cluster volume
// and is created through the swarm cluster volume backend.
//
// re: volume name duplication
//
// As it happens, there is no good way to prevent duplication of a volume
// name between local and cluster volumes. This is because Swarm volumes
// can be created from any manager node, bypassing most of the protections
// we could put into the engine side.
//
// Instead, we will allow creating a volume with a duplicate name, which
// should not break anything.
if req.ClusterVolumeSpec != nil && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) {
logrus.Debug("using cluster volume")
vol, err = v.cluster.CreateVolume(req)
} else {
logrus.Debug("using regular volume")
vol, err = v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels))
}
if err != nil { if err != nil {
return err return err
} }
return httputils.WriteJSON(w, http.StatusCreated, vol) return httputils.WriteJSON(w, http.StatusCreated, vol)
} }
func (v *volumeRouter) putVolumesUpdate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if !v.cluster.IsManager() {
return errdefs.Unavailable(errors.New("volume update only valid for cluster volumes, but swarm is unavailable"))
}
if err := httputils.ParseForm(r); err != nil {
return err
}
rawVersion := r.URL.Query().Get("version")
version, err := strconv.ParseUint(rawVersion, 10, 64)
if err != nil {
err = fmt.Errorf("invalid swarm object version '%s': %v", rawVersion, err)
return errdefs.InvalidParameter(err)
}
var req volume.UpdateOptions
if err := httputils.ReadJSON(r, &req); err != nil {
return err
}
return v.cluster.UpdateVolume(vars["name"], version, req)
}
func (v *volumeRouter) deleteVolumes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { func (v *volumeRouter) deleteVolumes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil { if err := httputils.ParseForm(r); err != nil {
return err return err
} }
force := httputils.BoolValue(r, "force") force := httputils.BoolValue(r, "force")
if err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force)); err != nil {
return err version := httputils.VersionFromContext(ctx)
err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force))
if err != nil {
if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() {
err := v.cluster.RemoveVolume(vars["name"], force)
if err != nil {
return err
}
} else {
return err
}
} }
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
return nil return nil
} }

View file

@ -0,0 +1,752 @@
package volume
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http/httptest"
"testing"
"gotest.tools/v3/assert"
"github.com/docker/docker/api/server/httputils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/volume/service/opts"
)
func callGetVolume(v *volumeRouter, name string) (*httptest.ResponseRecorder, error) {
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
vars := map[string]string{"name": name}
req := httptest.NewRequest("GET", fmt.Sprintf("/volumes/%s", name), nil)
resp := httptest.NewRecorder()
err := v.getVolumeByName(ctx, resp, req, vars)
return resp, err
}
func callListVolumes(v *volumeRouter) (*httptest.ResponseRecorder, error) {
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
vars := map[string]string{}
req := httptest.NewRequest("GET", "/volumes", nil)
resp := httptest.NewRecorder()
err := v.getVolumesList(ctx, resp, req, vars)
return resp, err
}
func TestGetVolumeByNameNotFoundNoSwarm(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{},
cluster: &fakeClusterBackend{},
}
_, err := callGetVolume(v, "notReal")
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err))
}
func TestGetVolumeByNameNotFoundNotManager(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{},
cluster: &fakeClusterBackend{swarm: true},
}
_, err := callGetVolume(v, "notReal")
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err))
}
func TestGetVolumeByNameNotFound(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{},
cluster: &fakeClusterBackend{swarm: true, manager: true},
}
_, err := callGetVolume(v, "notReal")
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err))
}
func TestGetVolumeByNameFoundRegular(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"volume1": &volume.Volume{
Name: "volume1",
},
},
},
cluster: &fakeClusterBackend{swarm: true, manager: true},
}
_, err := callGetVolume(v, "volume1")
assert.NilError(t, err)
}
func TestGetVolumeByNameFoundSwarm(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{},
cluster: &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{
"volume1": &volume.Volume{
Name: "volume1",
},
},
},
}
_, err := callGetVolume(v, "volume1")
assert.NilError(t, err)
}
func TestListVolumes(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"v1": &volume.Volume{Name: "v1"},
"v2": &volume.Volume{Name: "v2"},
},
},
cluster: &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{
"v3": &volume.Volume{Name: "v3"},
"v4": &volume.Volume{Name: "v4"},
},
},
}
resp, err := callListVolumes(v)
assert.NilError(t, err)
d := json.NewDecoder(resp.Result().Body)
respVols := volume.ListResponse{}
assert.NilError(t, d.Decode(&respVols))
assert.Assert(t, respVols.Volumes != nil)
assert.Equal(t, len(respVols.Volumes), 4, "volumes %v", respVols.Volumes)
}
func TestListVolumesNoSwarm(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"v1": &volume.Volume{Name: "v1"},
"v2": &volume.Volume{Name: "v2"},
},
},
cluster: &fakeClusterBackend{},
}
_, err := callListVolumes(v)
assert.NilError(t, err)
}
func TestListVolumesNoManager(t *testing.T) {
v := &volumeRouter{
backend: &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"v1": &volume.Volume{Name: "v1"},
"v2": &volume.Volume{Name: "v2"},
},
},
cluster: &fakeClusterBackend{swarm: true},
}
resp, err := callListVolumes(v)
assert.NilError(t, err)
d := json.NewDecoder(resp.Result().Body)
respVols := volume.ListResponse{}
assert.NilError(t, d.Decode(&respVols))
assert.Equal(t, len(respVols.Volumes), 2)
assert.Equal(t, len(respVols.Warnings), 0)
}
func TestCreateRegularVolume(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeCreate := volume.CreateOptions{
Name: "vol1",
Driver: "foodriver",
}
buf := bytes.Buffer{}
e := json.NewEncoder(&buf)
e.Encode(volumeCreate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/create", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.postVolumesCreate(ctx, resp, req, nil)
assert.NilError(t, err)
respVolume := volume.Volume{}
assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume))
assert.Equal(t, respVolume.Name, "vol1")
assert.Equal(t, respVolume.Driver, "foodriver")
assert.Equal(t, 1, len(b.volumes))
assert.Equal(t, 0, len(c.volumes))
}
func TestCreateSwarmVolumeNoSwarm(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeCreate := volume.CreateOptions{
ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
Name: "volCluster",
Driver: "someCSI",
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeCreate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/create", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.postVolumesCreate(ctx, resp, req, nil)
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsUnavailable(err))
}
func TestCreateSwarmVolumeNotManager(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{swarm: true}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeCreate := volume.CreateOptions{
ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
Name: "volCluster",
Driver: "someCSI",
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeCreate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/create", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.postVolumesCreate(ctx, resp, req, nil)
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsUnavailable(err))
}
func TestCreateVolumeCluster(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeCreate := volume.CreateOptions{
ClusterVolumeSpec: &volume.ClusterVolumeSpec{},
Name: "volCluster",
Driver: "someCSI",
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeCreate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/create", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.postVolumesCreate(ctx, resp, req, nil)
assert.NilError(t, err)
respVolume := volume.Volume{}
assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume))
assert.Equal(t, respVolume.Name, "volCluster")
assert.Equal(t, respVolume.Driver, "someCSI")
assert.Equal(t, 0, len(b.volumes))
assert.Equal(t, 1, len(c.volumes))
}
func TestUpdateVolume(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{
"vol1": &volume.Volume{
Name: "vo1",
ClusterVolume: &volume.ClusterVolume{
ID: "vol1",
},
},
},
}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeUpdate := volume.UpdateOptions{
Spec: &volume.ClusterVolumeSpec{},
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeUpdate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
assert.NilError(t, err)
assert.Equal(t, c.volumes["vol1"].ClusterVolume.Meta.Version.Index, uint64(1))
}
func TestUpdateVolumeNoSwarm(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeUpdate := volume.UpdateOptions{
Spec: &volume.ClusterVolumeSpec{},
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeUpdate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsUnavailable(err))
}
func TestUpdateVolumeNotFound(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{},
}
v := &volumeRouter{
backend: b,
cluster: c,
}
volumeUpdate := volume.UpdateOptions{
Spec: &volume.ClusterVolumeSpec{},
}
buf := bytes.Buffer{}
json.NewEncoder(&buf).Encode(volumeUpdate)
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf)
req.Header.Add("Content-Type", "application/json")
resp := httptest.NewRecorder()
err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err))
}
func TestVolumeRemove(t *testing.T) {
b := &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"vol1": &volume.Volume{
Name: "vol1",
},
},
}
c := &fakeClusterBackend{swarm: true, manager: true}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.NilError(t, err)
assert.Equal(t, len(b.volumes), 0)
}
func TestVolumeRemoveSwarm(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{
"vol1": &volume.Volume{
Name: "vol1",
ClusterVolume: &volume.ClusterVolume{},
},
},
}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.NilError(t, err)
assert.Equal(t, len(c.volumes), 0)
}
func TestVolumeRemoveNotFoundNoSwarm(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err), err.Error())
}
func TestVolumeRemoveNotFoundNoManager(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{swarm: true}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsNotFound(err))
}
func TestVolumeRemoveFoundNoSwarm(t *testing.T) {
b := &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"vol1": &volume.Volume{
Name: "vol1",
},
},
}
c := &fakeClusterBackend{}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.NilError(t, err)
assert.Equal(t, len(b.volumes), 0)
}
func TestVolumeRemoveNoSwarmInUse(t *testing.T) {
b := &fakeVolumeBackend{
volumes: map[string]*volume.Volume{
"inuse": &volume.Volume{
Name: "inuse",
},
},
}
c := &fakeClusterBackend{}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/inuse", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "inuse"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsConflict(err))
}
func TestVolumeRemoveSwarmForce(t *testing.T) {
b := &fakeVolumeBackend{}
c := &fakeClusterBackend{
swarm: true,
manager: true,
volumes: map[string]*volume.Volume{
"vol1": &volume.Volume{
Name: "vol1",
ClusterVolume: &volume.ClusterVolume{},
Options: map[string]string{"mustforce": "yes"},
},
},
}
v := &volumeRouter{
backend: b,
cluster: c,
}
ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req := httptest.NewRequest("DELETE", "/volumes/vol1", nil)
resp := httptest.NewRecorder()
err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.Assert(t, err != nil)
assert.Assert(t, errdefs.IsConflict(err))
ctx = context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion)
req = httptest.NewRequest("DELETE", "/volumes/vol1?force=1", nil)
resp = httptest.NewRecorder()
err = v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"})
assert.NilError(t, err)
assert.Equal(t, len(b.volumes), 0)
}
type fakeVolumeBackend struct {
volumes map[string]*volume.Volume
}
func (b *fakeVolumeBackend) List(_ context.Context, _ filters.Args) ([]*volume.Volume, []string, error) {
volumes := []*volume.Volume{}
for _, v := range b.volumes {
volumes = append(volumes, v)
}
return volumes, nil, nil
}
func (b *fakeVolumeBackend) Get(_ context.Context, name string, _ ...opts.GetOption) (*volume.Volume, error) {
if v, ok := b.volumes[name]; ok {
return v, nil
}
return nil, errdefs.NotFound(fmt.Errorf("volume %s not found", name))
}
func (b *fakeVolumeBackend) Create(_ context.Context, name, driverName string, _ ...opts.CreateOption) (*volume.Volume, error) {
if _, ok := b.volumes[name]; ok {
// TODO(dperny): return appropriate error type
return nil, fmt.Errorf("already exists")
}
v := &volume.Volume{
Name: name,
Driver: driverName,
}
if b.volumes == nil {
b.volumes = map[string]*volume.Volume{
name: v,
}
} else {
b.volumes[name] = v
}
return v, nil
}
func (b *fakeVolumeBackend) Remove(_ context.Context, name string, _ ...opts.RemoveOption) error {
if v, ok := b.volumes[name]; !ok {
return errdefs.NotFound(fmt.Errorf("volume %s not found", name))
} else if v.Name == "inuse" {
return errdefs.Conflict(fmt.Errorf("volume in use"))
}
delete(b.volumes, name)
return nil
}
func (b *fakeVolumeBackend) Prune(_ context.Context, _ filters.Args) (*types.VolumesPruneReport, error) {
return nil, nil
}
type fakeClusterBackend struct {
swarm bool
manager bool
idCount int
volumes map[string]*volume.Volume
}
func (c *fakeClusterBackend) checkSwarm() error {
if !c.swarm {
return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again"))
} else if !c.manager {
return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager"))
}
return nil
}
func (c *fakeClusterBackend) IsManager() bool {
return c.swarm && c.manager
}
func (c *fakeClusterBackend) GetVolume(nameOrID string) (volume.Volume, error) {
if err := c.checkSwarm(); err != nil {
return volume.Volume{}, err
}
if v, ok := c.volumes[nameOrID]; ok {
return *v, nil
}
return volume.Volume{}, errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID))
}
func (c *fakeClusterBackend) GetVolumes(options volume.ListOptions) ([]*volume.Volume, error) {
if err := c.checkSwarm(); err != nil {
return nil, err
}
volumes := []*volume.Volume{}
for _, v := range c.volumes {
volumes = append(volumes, v)
}
return volumes, nil
}
func (c *fakeClusterBackend) CreateVolume(volumeCreate volume.CreateOptions) (*volume.Volume, error) {
if err := c.checkSwarm(); err != nil {
return nil, err
}
if _, ok := c.volumes[volumeCreate.Name]; ok {
// TODO(dperny): return appropriate already exists error
return nil, fmt.Errorf("already exists")
}
v := &volume.Volume{
Name: volumeCreate.Name,
Driver: volumeCreate.Driver,
Labels: volumeCreate.Labels,
Options: volumeCreate.DriverOpts,
Scope: "global",
}
v.ClusterVolume = &volume.ClusterVolume{
ID: fmt.Sprintf("cluster_%d", c.idCount),
Spec: *volumeCreate.ClusterVolumeSpec,
}
c.idCount = c.idCount + 1
if c.volumes == nil {
c.volumes = map[string]*volume.Volume{
v.Name: v,
}
} else {
c.volumes[v.Name] = v
}
return v, nil
}
func (c *fakeClusterBackend) RemoveVolume(nameOrID string, force bool) error {
if err := c.checkSwarm(); err != nil {
return err
}
v, ok := c.volumes[nameOrID]
if !ok {
return errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID))
}
if _, mustforce := v.Options["mustforce"]; mustforce && !force {
return errdefs.Conflict(fmt.Errorf("volume %s must be force removed", nameOrID))
}
delete(c.volumes, nameOrID)
return nil
}
func (c *fakeClusterBackend) UpdateVolume(nameOrID string, version uint64, _ volume.UpdateOptions) error {
if err := c.checkSwarm(); err != nil {
return err
}
if v, ok := c.volumes[nameOrID]; ok {
if v.ClusterVolume.Meta.Version.Index != version {
return fmt.Errorf("wrong version")
}
v.ClusterVolume.Meta.Version.Index = v.ClusterVolume.Meta.Version.Index + 1
// for testing, we don't actually need to change anything about the
// volume object. let's just increment the version so we can see the
// call happened.
} else {
return errdefs.NotFound(fmt.Errorf("volume %q not found", nameOrID))
}
return nil
}

View file

@ -1996,6 +1996,8 @@ definitions:
x-nullable: false x-nullable: false
enum: ["local", "global"] enum: ["local", "global"]
example: "local" example: "local"
ClusterVolume:
$ref: "#/definitions/ClusterVolume"
Options: Options:
type: "object" type: "object"
description: | description: |
@ -2069,6 +2071,8 @@ definitions:
example: example:
com.example.some-label: "some-value" com.example.some-label: "some-value"
com.example.some-other-label: "some-other-value" com.example.some-other-label: "some-other-value"
ClusterVolumeSpec:
$ref: "#/definitions/ClusterVolumeSpec"
VolumeListResponse: VolumeListResponse:
type: "object" type: "object"
@ -5740,6 +5744,242 @@ definitions:
items: items:
$ref: "#/definitions/OCIPlatform" $ref: "#/definitions/OCIPlatform"
ClusterVolume:
type: "object"
description: |
Options and information specific to, and only present on, Swarm CSI
cluster volumes.
properties:
ID:
type: "string"
description: |
The Swarm ID of this volume. Because cluster volumes are Swarm
objects, they have an ID, unlike non-cluster volumes. This ID can
be used to refer to the Volume instead of the name.
Version:
$ref: "#/definitions/ObjectVersion"
CreatedAt:
type: "string"
format: "dateTime"
UpdatedAt:
type: "string"
format: "dateTime"
Spec:
$ref: "#/definitions/ClusterVolumeSpec"
Info:
type: "object"
description: |
Information about the global status of the volume.
properties:
CapacityBytes:
type: "integer"
format: "int64"
description: |
The capacity of the volume in bytes. A value of 0 indicates that
the capacity is unknown.
VolumeContext:
type: "object"
description: |
A map of strings to strings returned from the storage plugin when
the volume is created.
additionalProperties:
type: "string"
VolumeID:
type: "string"
description: |
The ID of the volume as returned by the CSI storage plugin. This
is distinct from the volume's ID as provided by Docker. This ID
is never used by the user when communicating with Docker to refer
to this volume. If the ID is blank, then the Volume has not been
successfully created in the plugin yet.
AccessibleTopology:
type: "array"
description: |
The topology this volume is actually accessible from.
items:
$ref: "#/definitions/Topology"
PublishStatus:
type: "array"
description: |
The status of the volume as it pertains to its publishing and use on
specific nodes
items:
type: "object"
properties:
NodeID:
type: "string"
description: |
The ID of the Swarm node the volume is published on.
State:
type: "string"
description: |
The published state of the volume.
* `pending-publish` The volume should be published to this node, but the call to the controller plugin to do so has not yet been successfully completed.
* `published` The volume is published successfully to the node.
* `pending-node-unpublish` The volume should be unpublished from the node, and the manager is awaiting confirmation from the worker that it has done so.
* `pending-controller-unpublish` The volume is successfully unpublished from the node, but has not yet been successfully unpublished on the controller.
enum:
- "pending-publish"
- "published"
- "pending-node-unpublish"
- "pending-controller-unpublish"
PublishContext:
type: "object"
description: |
A map of strings to strings returned by the CSI controller
plugin when a volume is published.
additionalProperties:
type: "string"
ClusterVolumeSpec:
type: "object"
description: |
Cluster-specific options used to create the volume.
properties:
Group:
type: "string"
description: |
Group defines the volume group of this volume. Volumes belonging to
the same group can be referred to by group name when creating
Services. Referring to a volume by group instructs Swarm to treat
volumes in that group interchangeably for the purpose of scheduling.
Volumes with an empty string for a group technically all belong to
the same, emptystring group.
AccessMode:
type: "object"
description: |
Defines how the volume is used by tasks.
properties:
Scope:
type: "string"
description: |
The set of nodes this volume can be used on at one time.
- `single` The volume may only be scheduled to one node at a time.
- `multi` the volume may be scheduled to any supported number of nodes at a time.
default: "single"
enum: ["single", "multi"]
x-nullable: false
Sharing:
type: "string"
description: |
The number and way that different tasks can use this volume
at one time.
- `none` The volume may only be used by one task at a time.
- `readonly` The volume may be used by any number of tasks, but they all must mount the volume as readonly
- `onewriter` The volume may be used by any number of tasks, but only one may mount it as read/write.
- `all` The volume may have any number of readers and writers.
default: "none"
enum: ["none", "readonly", "onewriter", "all"]
x-nullable: false
MountVolume:
type: "object"
description: |
Options for using this volume as a Mount-type volume.
Either MountVolume or BlockVolume, but not both, must be
present.
properties:
FsType:
type: "string"
description: |
Specifies the filesystem type for the mount volume.
Optional.
MountFlags:
type: "array"
description: |
Flags to pass when mounting the volume. Optional.
items:
type: "string"
BlockVolume:
type: "object"
description: |
Options for using this volume as a Block-type volume.
Intentionally empty.
Secrets:
type: "array"
description: |
Swarm Secrets that are passed to the CSI storage plugin when
operating on this volume.
items:
type: "object"
description: |
One cluster volume secret entry. Defines a key-value pair that
is passed to the plugin.
properties:
Key:
type: "string"
description: |
Key is the name of the key of the key-value pair passed to
the plugin.
Secret:
type: "string"
description: |
Secret is the swarm Secret object from which to read data.
This can be a Secret name or ID. The Secret data is
retrieved by swarm and used as the value of the key-value
pair passed to the plugin.
AccessibilityRequirements:
type: "object"
description: |
Requirements for the accessible topology of the volume. These
fields are optional. For an in-depth description of what these
fields mean, see the CSI specification.
properties:
Requisite:
type: "array"
description: |
A list of required topologies, at least one of which the
volume must be accessible from.
items:
$ref: "#/definitions/Topology"
Preferred:
type: "array"
description: |
A list of topologies that the volume should attempt to be
provisioned in.
items:
$ref: "#/definitions/Topology"
CapacityRange:
type: "object"
description: |
The desired capacity that the volume should be created with. If
empty, the plugin will decide the capacity.
properties:
RequiredBytes:
type: "integer"
format: "int64"
description: |
The volume must be at least this big. The value of 0
indicates an unspecified minimum
LimitBytes:
type: "integer"
format: "int64"
description: |
The volume must not be bigger than this. The value of 0
indicates an unspecified maximum.
Availability:
type: "string"
description: |
The availability of the volume for use in tasks.
- `active` The volume is fully available for scheduling on the cluster
- `pause` No new workloads should use the volume, but existing workloads are not stopped.
- `drain` All workloads using this volume should be stopped and rescheduled, and no new ones should be started.
default: "active"
x-nullable: false
enum:
- "active"
- "pause"
- "drain"
Topology:
description: |
A map of topological domains to topological segments. For in depth
details, see documentation for the Topology object in the CSI
specification.
type: "object"
additionalProperties:
type: "string"
paths: paths:
/containers/json: /containers/json:
get: get:
@ -9247,6 +9487,64 @@ paths:
type: "string" type: "string"
tags: ["Volume"] tags: ["Volume"]
put:
summary: |
"Update a volume. Valid only for Swarm cluster volumes"
operationId: "VolumeUpdate"
consumes: ["application/json"]
produces: ["application/json"]
responses:
200:
description: "no error"
400:
description: "bad parameter"
schema:
$ref: "#/definitions/ErrorResponse"
404:
description: "no such volume"
schema:
$ref: "#/definitions/ErrorResponse"
500:
description: "server error"
schema:
$ref: "#/definitions/ErrorResponse"
503:
description: "node is not part of a swarm"
schema:
$ref: "#/definitions/ErrorResponse"
parameters:
- name: "name"
in: "path"
description: "The name or ID of the volume"
type: "string"
required: true
- name: "body"
in: "body"
schema:
# though the schema for is an object that contains only a
# ClusterVolumeSpec, wrapping the ClusterVolumeSpec in this object
# means that if, later on, we support things like changing the
# labels, we can do so without duplicating that information to the
# ClusterVolumeSpec.
type: "object"
description: "Volume configuration"
properties:
Spec:
$ref: "#/definitions/ClusterVolumeSpec"
description: |
The spec of the volume to update. Currently, only Availability may
change. All other fields must remain unchanged.
- name: "version"
in: "query"
description: |
The version number of the volume being updated. This is required to
avoid conflicting writes. Found in the volume's `ClusterVolume`
field.
type: "integer"
format: "int64"
required: true
tags: ["Volume"]
delete: delete:
summary: "Remove a volume" summary: "Remove a volume"
description: "Instruct the driver to remove the volume." description: "Instruct the driver to remove the volume."
@ -9278,6 +9576,7 @@ paths:
type: "boolean" type: "boolean"
default: false default: false
tags: ["Volume"] tags: ["Volume"]
/volumes/prune: /volumes/prune:
post: post:
summary: "Delete unused volumes" summary: "Delete unused volumes"

View file

@ -17,6 +17,8 @@ const (
TypeTmpfs Type = "tmpfs" TypeTmpfs Type = "tmpfs"
// TypeNamedPipe is the type for mounting Windows named pipes // TypeNamedPipe is the type for mounting Windows named pipes
TypeNamedPipe Type = "npipe" TypeNamedPipe Type = "npipe"
// TypeCluster is the type for Swarm Cluster Volumes.
TypeCluster = "csi"
) )
// Mount represents a mount (volume). // Mount represents a mount (volume).
@ -30,9 +32,10 @@ type Mount struct {
ReadOnly bool `json:",omitempty"` ReadOnly bool `json:",omitempty"`
Consistency Consistency `json:",omitempty"` Consistency Consistency `json:",omitempty"`
BindOptions *BindOptions `json:",omitempty"` BindOptions *BindOptions `json:",omitempty"`
VolumeOptions *VolumeOptions `json:",omitempty"` VolumeOptions *VolumeOptions `json:",omitempty"`
TmpfsOptions *TmpfsOptions `json:",omitempty"` TmpfsOptions *TmpfsOptions `json:",omitempty"`
ClusterOptions *ClusterOptions `json:",omitempty"`
} }
// Propagation represents the propagation of a mount. // Propagation represents the propagation of a mount.
@ -129,3 +132,8 @@ type TmpfsOptions struct {
// Some of these may be straightforward to add, but others, such as // Some of these may be straightforward to add, but others, such as
// uid/gid have implications in a clustered system. // uid/gid have implications in a clustered system.
} }
// ClusterOptions specifies options for a Cluster volume.
type ClusterOptions struct {
// intentionally empty
}

View file

@ -53,6 +53,7 @@ type NodeDescription struct {
Resources Resources `json:",omitempty"` Resources Resources `json:",omitempty"`
Engine EngineDescription `json:",omitempty"` Engine EngineDescription `json:",omitempty"`
TLSInfo TLSInfo `json:",omitempty"` TLSInfo TLSInfo `json:",omitempty"`
CSIInfo []NodeCSIInfo `json:",omitempty"`
} }
// Platform represents the platform (Arch/OS). // Platform represents the platform (Arch/OS).
@ -68,6 +69,21 @@ type EngineDescription struct {
Plugins []PluginDescription `json:",omitempty"` Plugins []PluginDescription `json:",omitempty"`
} }
// NodeCSIInfo represents information about a CSI plugin available on the node
type NodeCSIInfo struct {
// PluginName is the name of the CSI plugin.
PluginName string `json:",omitempty"`
// NodeID is the ID of the node as reported by the CSI plugin. This is
// different from the swarm node ID.
NodeID string `json:",omitempty"`
// MaxVolumesPerNode is the maximum number of volumes that may be published
// to this node
MaxVolumesPerNode int64 `json:",omitempty"`
// AccessibleTopology indicates the location of this node in the CSI
// plugin's topology
AccessibleTopology *Topology `json:",omitempty"`
}
// PluginDescription represents the description of an engine plugin. // PluginDescription represents the description of an engine plugin.
type PluginDescription struct { type PluginDescription struct {
Type string `json:",omitempty"` Type string `json:",omitempty"`
@ -113,3 +129,11 @@ const (
// NodeStateDisconnected DISCONNECTED // NodeStateDisconnected DISCONNECTED
NodeStateDisconnected NodeState = "disconnected" NodeStateDisconnected NodeState = "disconnected"
) )
// Topology defines the CSI topology of this node. This type is a duplicate of
// github.com/docker/docker/api/types.Topology. Because the type definition
// is so simple and to avoid complicated structure or circular imports, we just
// duplicate it here. See that type for full documentation
type Topology struct {
Segments map[string]string `json:",omitempty"`
}

View file

@ -62,6 +62,11 @@ type Task struct {
// used to determine which Tasks belong to which run of the job. This field // used to determine which Tasks belong to which run of the job. This field
// is absent if the Service mode is Replicated or Global. // is absent if the Service mode is Replicated or Global.
JobIteration *Version `json:",omitempty"` JobIteration *Version `json:",omitempty"`
// Volumes is the list of VolumeAttachments for this task. It specifies
// which particular volumes are to be used by this particular task, and
// fulfilling what mounts in the spec.
Volumes []VolumeAttachment
} }
// TaskSpec represents the spec of a task. // TaskSpec represents the spec of a task.
@ -204,3 +209,17 @@ type ContainerStatus struct {
type PortStatus struct { type PortStatus struct {
Ports []PortConfig `json:",omitempty"` Ports []PortConfig `json:",omitempty"`
} }
// VolumeAttachment contains the associating a Volume to a Task.
type VolumeAttachment struct {
// ID is the Swarmkit ID of the Volume. This is not the CSI VolumeId.
ID string `json:",omitempty"`
// Source, together with Target, indicates the Mount, as specified in the
// ContainerSpec, that this volume fulfills.
Source string `json:",omitempty"`
// Target, together with Source, indicates the Mount, as specified
// in the ContainerSpec, that this volume fulfills.
Target string `json:",omitempty"`
}

View file

@ -0,0 +1,420 @@
package volume
import (
"github.com/docker/docker/api/types/swarm"
)
// ClusterVolume contains options and information specific to, and only present
// on, Swarm CSI cluster volumes.
type ClusterVolume struct {
// ID is the Swarm ID of the volume. Because cluster volumes are Swarm
// objects, they have an ID, unlike non-cluster volumes, which only have a
// Name. This ID can be used to refer to the cluster volume.
ID string
// Meta is the swarm metadata about this volume.
swarm.Meta
// Spec is the cluster-specific options from which this volume is derived.
Spec ClusterVolumeSpec
// PublishStatus contains the status of the volume as it pertains to its
// publishing on Nodes.
PublishStatus []*PublishStatus `json:",omitempty"`
// Info is information about the global status of the volume.
Info *Info `json:",omitempty"`
}
// ClusterVolumeSpec contains the spec used to create this volume.
type ClusterVolumeSpec struct {
// Group defines the volume group of this volume. Volumes belonging to the
// same group can be referred to by group name when creating Services.
// Referring to a volume by group instructs swarm to treat volumes in that
// group interchangeably for the purpose of scheduling. Volumes with an
// empty string for a group technically all belong to the same, emptystring
// group.
Group string `json:",omitempty"`
// AccessMode defines how the volume is used by tasks.
AccessMode *AccessMode `json:",omitempty"`
// AccessibilityRequirements specifies where in the cluster a volume must
// be accessible from.
//
// This field must be empty if the plugin does not support
// VOLUME_ACCESSIBILITY_CONSTRAINTS capabilities. If it is present but the
// plugin does not support it, volume will not be created.
//
// If AccessibilityRequirements is empty, but the plugin does support
// VOLUME_ACCESSIBILITY_CONSTRAINTS, then Swarmkit will assume the entire
// cluster is a valid target for the volume.
AccessibilityRequirements *TopologyRequirement `json:",omitempty"`
// CapacityRange defines the desired capacity that the volume should be
// created with. If nil, the plugin will decide the capacity.
CapacityRange *CapacityRange `json:",omitempty"`
// Secrets defines Swarm Secrets that are passed to the CSI storage plugin
// when operating on this volume.
Secrets []Secret `json:",omitempty"`
// Availability is the Volume's desired availability. Analogous to Node
// Availability, this allows the user to take volumes offline in order to
// update or delete them.
Availability Availability `json:",omitempty"`
}
// Availability specifies the availability of the volume.
type Availability string
const (
// AvailabilityActive indicates that the volume is active and fully
// schedulable on the cluster.
AvailabilityActive Availability = "active"
// AvailabilityPause indicates that no new workloads should use the
// volume, but existing workloads can continue to use it.
AvailabilityPause Availability = "pause"
// AvailabilityDrain indicates that all workloads using this volume
// should be rescheduled, and the volume unpublished from all nodes.
AvailabilityDrain Availability = "drain"
)
// AccessMode defines the access mode of a volume.
type AccessMode struct {
// Scope defines the set of nodes this volume can be used on at one time.
Scope Scope `json:",omitempty"`
// Sharing defines the number and way that different tasks can use this
// volume at one time.
Sharing SharingMode `json:",omitempty"`
// MountVolume defines options for using this volume as a Mount-type
// volume.
//
// Either BlockVolume or MountVolume, but not both, must be present.
MountVolume *TypeMount `json:",omitempty"`
// BlockVolume defines options for using this volume as a Block-type
// volume.
//
// Either BlockVolume or MountVolume, but not both, must be present.
BlockVolume *TypeBlock `json:",omitempty"`
}
// Scope defines the Scope of a CSI Volume. This is how many nodes a
// Volume can be accessed simultaneously on.
type Scope string
const (
// ScopeSingleNode indicates the volume can be used on one node at a
// time.
ScopeSingleNode Scope = "single"
// ScopeMultiNode indicates the volume can be used on many nodes at
// the same time.
ScopeMultiNode Scope = "multi"
)
// SharingMode defines the Sharing of a CSI Volume. This is how Tasks using a
// Volume at the same time can use it.
type SharingMode string
const (
// SharingNone indicates that only one Task may use the Volume at a
// time.
SharingNone SharingMode = "none"
// SharingReadOnly indicates that the Volume may be shared by any
// number of Tasks, but they must be read-only.
SharingReadOnly SharingMode = "readonly"
// SharingOneWriter indicates that the Volume may be shared by any
// number of Tasks, but all after the first must be read-only.
SharingOneWriter SharingMode = "onewriter"
// SharingAll means that the Volume may be shared by any number of
// Tasks, as readers or writers.
SharingAll SharingMode = "all"
)
// TypeBlock defines options for using a volume as a block-type volume.
//
// Intentionally empty.
type TypeBlock struct{}
// TypeMount contains options for using a volume as a Mount-type
// volume.
type TypeMount struct {
// FsType specifies the filesystem type for the mount volume. Optional.
FsType string `json:",omitempty"`
// MountFlags defines flags to pass when mounting the volume. Optional.
MountFlags []string `json:",omitempty"`
}
// TopologyRequirement expresses the user's requirements for a volume's
// accessible topology.
type TopologyRequirement struct {
// Requisite specifies a list of Topologies, at least one of which the
// volume must be accessible from.
//
// Taken verbatim from the CSI Spec:
//
// Specifies the list of topologies the provisioned volume MUST be
// accessible from.
// This field is OPTIONAL. If TopologyRequirement is specified either
// requisite or preferred or both MUST be specified.
//
// If requisite is specified, the provisioned volume MUST be
// accessible from at least one of the requisite topologies.
//
// Given
// x = number of topologies provisioned volume is accessible from
// n = number of requisite topologies
// The CO MUST ensure n >= 1. The SP MUST ensure x >= 1
// If x==n, then the SP MUST make the provisioned volume available to
// all topologies from the list of requisite topologies. If it is
// unable to do so, the SP MUST fail the CreateVolume call.
// For example, if a volume should be accessible from a single zone,
// and requisite =
// {"region": "R1", "zone": "Z2"}
// then the provisioned volume MUST be accessible from the "region"
// "R1" and the "zone" "Z2".
// Similarly, if a volume should be accessible from two zones, and
// requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"}
// then the provisioned volume MUST be accessible from the "region"
// "R1" and both "zone" "Z2" and "zone" "Z3".
//
// If x<n, then the SP SHALL choose x unique topologies from the list
// of requisite topologies. If it is unable to do so, the SP MUST fail
// the CreateVolume call.
// For example, if a volume should be accessible from a single zone,
// and requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"}
// then the SP may choose to make the provisioned volume available in
// either the "zone" "Z2" or the "zone" "Z3" in the "region" "R1".
// Similarly, if a volume should be accessible from two zones, and
// requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"},
// {"region": "R1", "zone": "Z4"}
// then the provisioned volume MUST be accessible from any combination
// of two unique topologies: e.g. "R1/Z2" and "R1/Z3", or "R1/Z2" and
// "R1/Z4", or "R1/Z3" and "R1/Z4".
//
// If x>n, then the SP MUST make the provisioned volume available from
// all topologies from the list of requisite topologies and MAY choose
// the remaining x-n unique topologies from the list of all possible
// topologies. If it is unable to do so, the SP MUST fail the
// CreateVolume call.
// For example, if a volume should be accessible from two zones, and
// requisite =
// {"region": "R1", "zone": "Z2"}
// then the provisioned volume MUST be accessible from the "region"
// "R1" and the "zone" "Z2" and the SP may select the second zone
// independently, e.g. "R1/Z4".
Requisite []Topology `json:",omitempty"`
// Preferred is a list of Topologies that the volume should attempt to be
// provisioned in.
//
// Taken from the CSI spec:
//
// Specifies the list of topologies the CO would prefer the volume to
// be provisioned in.
//
// This field is OPTIONAL. If TopologyRequirement is specified either
// requisite or preferred or both MUST be specified.
//
// An SP MUST attempt to make the provisioned volume available using
// the preferred topologies in order from first to last.
//
// If requisite is specified, all topologies in preferred list MUST
// also be present in the list of requisite topologies.
//
// If the SP is unable to to make the provisioned volume available
// from any of the preferred topologies, the SP MAY choose a topology
// from the list of requisite topologies.
// If the list of requisite topologies is not specified, then the SP
// MAY choose from the list of all possible topologies.
// If the list of requisite topologies is specified and the SP is
// unable to to make the provisioned volume available from any of the
// requisite topologies it MUST fail the CreateVolume call.
//
// Example 1:
// Given a volume should be accessible from a single zone, and
// requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"}
// preferred =
// {"region": "R1", "zone": "Z3"}
// then the the SP SHOULD first attempt to make the provisioned volume
// available from "zone" "Z3" in the "region" "R1" and fall back to
// "zone" "Z2" in the "region" "R1" if that is not possible.
//
// Example 2:
// Given a volume should be accessible from a single zone, and
// requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"},
// {"region": "R1", "zone": "Z4"},
// {"region": "R1", "zone": "Z5"}
// preferred =
// {"region": "R1", "zone": "Z4"},
// {"region": "R1", "zone": "Z2"}
// then the the SP SHOULD first attempt to make the provisioned volume
// accessible from "zone" "Z4" in the "region" "R1" and fall back to
// "zone" "Z2" in the "region" "R1" if that is not possible. If that
// is not possible, the SP may choose between either the "zone"
// "Z3" or "Z5" in the "region" "R1".
//
// Example 3:
// Given a volume should be accessible from TWO zones (because an
// opaque parameter in CreateVolumeRequest, for example, specifies
// the volume is accessible from two zones, aka synchronously
// replicated), and
// requisite =
// {"region": "R1", "zone": "Z2"},
// {"region": "R1", "zone": "Z3"},
// {"region": "R1", "zone": "Z4"},
// {"region": "R1", "zone": "Z5"}
// preferred =
// {"region": "R1", "zone": "Z5"},
// {"region": "R1", "zone": "Z3"}
// then the the SP SHOULD first attempt to make the provisioned volume
// accessible from the combination of the two "zones" "Z5" and "Z3" in
// the "region" "R1". If that's not possible, it should fall back to
// a combination of "Z5" and other possibilities from the list of
// requisite. If that's not possible, it should fall back to a
// combination of "Z3" and other possibilities from the list of
// requisite. If that's not possible, it should fall back to a
// combination of other possibilities from the list of requisite.
Preferred []Topology `json:",omitempty"`
}
// Topology is a map of topological domains to topological segments.
//
// This description is taken verbatim from the CSI Spec:
//
// A topological domain is a sub-division of a cluster, like "region",
// "zone", "rack", etc.
// A topological segment is a specific instance of a topological domain,
// like "zone3", "rack3", etc.
// For example {"com.company/zone": "Z1", "com.company/rack": "R3"}
// Valid keys have two segments: an OPTIONAL prefix and name, separated
// by a slash (/), for example: "com.company.example/zone".
// The key name segment is REQUIRED. The prefix is OPTIONAL.
// The key name MUST be 63 characters or less, begin and end with an
// alphanumeric character ([a-z0-9A-Z]), and contain only dashes (-),
// underscores (_), dots (.), or alphanumerics in between, for example
// "zone".
// The key prefix MUST be 63 characters or less, begin and end with a
// lower-case alphanumeric character ([a-z0-9]), contain only
// dashes (-), dots (.), or lower-case alphanumerics in between, and
// follow domain name notation format
// (https://tools.ietf.org/html/rfc1035#section-2.3.1).
// The key prefix SHOULD include the plugin's host company name and/or
// the plugin name, to minimize the possibility of collisions with keys
// from other plugins.
// If a key prefix is specified, it MUST be identical across all
// topology keys returned by the SP (across all RPCs).
// Keys MUST be case-insensitive. Meaning the keys "Zone" and "zone"
// MUST not both exist.
// Each value (topological segment) MUST contain 1 or more strings.
// Each string MUST be 63 characters or less and begin and end with an
// alphanumeric character with '-', '_', '.', or alphanumerics in
// between.
type Topology struct {
Segments map[string]string `json:",omitempty"`
}
// CapacityRange describes the minimum and maximum capacity a volume should be
// created with
type CapacityRange struct {
// RequiredBytes specifies that a volume must be at least this big. The
// value of 0 indicates an unspecified minimum.
RequiredBytes int64
// LimitBytes specifies that a volume must not be bigger than this. The
// value of 0 indicates an unspecified maximum
LimitBytes int64
}
// Secret represents a Swarm Secret value that must be passed to the CSI
// storage plugin when operating on this Volume. It represents one key-value
// pair of possibly many.
type Secret struct {
// Key is the name of the key of the key-value pair passed to the plugin.
Key string
// Secret is the swarm Secret object from which to read data. This can be a
// Secret name or ID. The Secret data is retrieved by Swarm and used as the
// value of the key-value pair passed to the plugin.
Secret string
}
// PublishState represents the state of a Volume as it pertains to its
// use on a particular Node.
type PublishState string
const (
// StatePending indicates that the volume should be published on
// this node, but the call to ControllerPublishVolume has not been
// successfully completed yet and the result recorded by swarmkit.
StatePending PublishState = "pending-publish"
// StatePublished means the volume is published successfully to the node.
StatePublished PublishState = "published"
// StatePendingNodeUnpublish indicates that the Volume should be
// unpublished on the Node, and we're waiting for confirmation that it has
// done so. After the Node has confirmed that the Volume has been
// unpublished, the state will move to StatePendingUnpublish.
StatePendingNodeUnpublish PublishState = "pending-node-unpublish"
// StatePendingUnpublish means the volume is still published to the node
// by the controller, awaiting the operation to unpublish it.
StatePendingUnpublish PublishState = "pending-controller-unpublish"
)
// PublishStatus represents the status of the volume as published to an
// individual node
type PublishStatus struct {
// NodeID is the ID of the swarm node this Volume is published to.
NodeID string `json:",omitempty"`
// State is the publish state of the volume.
State PublishState `json:",omitempty"`
// PublishContext is the PublishContext returned by the CSI plugin when
// a volume is published.
PublishContext map[string]string `json:",omitempty"`
}
// Info contains information about the Volume as a whole as provided by
// the CSI storage plugin.
type Info struct {
// CapacityBytes is the capacity of the volume in bytes. A value of 0
// indicates that the capacity is unknown.
CapacityBytes int64 `json:",omitempty"`
// VolumeContext is the context originating from the CSI storage plugin
// when the Volume is created.
VolumeContext map[string]string `json:",omitempty"`
// VolumeID is the ID of the Volume as seen by the CSI storage plugin. This
// is distinct from the Volume's Swarm ID, which is the ID used by all of
// the Docker Engine to refer to the Volume. If this field is blank, then
// the Volume has not been successfully created yet.
VolumeID string `json:",omitempty"`
// AccessibleTopolgoy is the topology this volume is actually accessible
// from.
AccessibleTopology []Topology `json:",omitempty"`
}

View file

@ -9,6 +9,9 @@ package volume
// swagger:model CreateOptions // swagger:model CreateOptions
type CreateOptions struct { type CreateOptions struct {
// cluster volume spec
ClusterVolumeSpec *ClusterVolumeSpec `json:"ClusterVolumeSpec,omitempty"`
// Name of the volume driver to use. // Name of the volume driver to use.
Driver string `json:"Driver,omitempty"` Driver string `json:"Driver,omitempty"`

View file

@ -0,0 +1,8 @@
package volume // import "github.com/docker/docker/api/types/volume"
import "github.com/docker/docker/api/types/filters"
// ListOptions holds parameters to list volumes.
type ListOptions struct {
Filters filters.Args
}

View file

@ -7,6 +7,9 @@ package volume
// swagger:model Volume // swagger:model Volume
type Volume struct { type Volume struct {
// cluster volume
ClusterVolume *ClusterVolume `json:"ClusterVolume,omitempty"`
// Date/Time the volume was created. // Date/Time the volume was created.
CreatedAt string `json:"CreatedAt,omitempty"` CreatedAt string `json:"CreatedAt,omitempty"`

View file

@ -0,0 +1,7 @@
package volume // import "github.com/docker/docker/api/types/volume"
// UpdateOptions is configuration to update a Volume with.
type UpdateOptions struct {
// Spec is the ClusterVolumeSpec to update the volume to.
Spec *ClusterVolumeSpec `json:"Spec,omitempty"`
}

View file

@ -179,6 +179,7 @@ type VolumeAPIClient interface {
VolumeList(ctx context.Context, filter filters.Args) (volume.ListResponse, error) VolumeList(ctx context.Context, filter filters.Args) (volume.ListResponse, error)
VolumeRemove(ctx context.Context, volumeID string, force bool) error VolumeRemove(ctx context.Context, volumeID string, force bool) error
VolumesPrune(ctx context.Context, pruneFilter filters.Args) (types.VolumesPruneReport, error) VolumesPrune(ctx context.Context, pruneFilter filters.Args) (types.VolumesPruneReport, error)
VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error
} }
// SecretAPIClient defines API client methods for secrets // SecretAPIClient defines API client methods for secrets

View file

@ -49,6 +49,14 @@ func (cli *Client) postRaw(ctx context.Context, path string, query url.Values, b
return cli.sendRequest(ctx, http.MethodPost, path, query, body, headers) return cli.sendRequest(ctx, http.MethodPost, path, query, body, headers)
} }
func (cli *Client) put(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) {
body, headers, err := encodeBody(obj, headers)
if err != nil {
return serverResponse{}, err
}
return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers)
}
// putRaw sends an http request to the docker API using the method PUT. // putRaw sends an http request to the docker API using the method PUT.
func (cli *Client) putRaw(ctx context.Context, path string, query url.Values, body io.Reader, headers map[string][]string) (serverResponse, error) { func (cli *Client) putRaw(ctx context.Context, path string, query url.Values, body io.Reader, headers map[string][]string) (serverResponse, error) {
return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers) return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers)

25
client/volume_update.go Normal file
View file

@ -0,0 +1,25 @@
package client // import "github.com/docker/docker/client"
import (
"context"
"net/url"
"strconv"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/volume"
)
// VolumeUpdate updates a volume. This only works for Cluster Volumes, and
// only some fields can be updated.
func (cli *Client) VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error {
if err := cli.NewVersionError("1.42", "volume update"); err != nil {
return err
}
query := url.Values{}
query.Set("version", strconv.FormatUint(version.Index, 10))
resp, err := cli.put(ctx, "/volumes/"+volumeID, query, options, nil)
ensureReaderClosed(resp)
return err
}

View file

@ -0,0 +1,55 @@
package client // import "github.com/docker/docker/client"
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"
"github.com/docker/docker/api/types/swarm"
volumetypes "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/errdefs"
)
func TestVolumeUpdateError(t *testing.T) {
client := &Client{
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
}
err := client.VolumeUpdate(context.Background(), "", swarm.Version{}, volumetypes.UpdateOptions{})
if !errdefs.IsSystem(err) {
t.Fatalf("expected a Server Error, got %[1]T: %[1]v", err)
}
}
func TestVolumeUpdate(t *testing.T) {
expectedURL := "/volumes/test1"
expectedVersion := "version=10"
client := &Client{
client: newMockClient(func(req *http.Request) (*http.Response, error) {
if !strings.HasPrefix(req.URL.Path, expectedURL) {
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
}
if req.Method != http.MethodPut {
return nil, fmt.Errorf("expected PUT method, got %s", req.Method)
}
if !strings.Contains(req.URL.RawQuery, expectedVersion) {
return nil, fmt.Errorf("expected query to contain '%s', got '%s'", expectedVersion, req.URL.RawQuery)
}
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("body"))),
}, nil
}),
}
err := client.VolumeUpdate(context.Background(), "test1", swarm.Version{Index: uint64(10)}, volumetypes.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
}

View file

@ -531,7 +531,7 @@ func initRouter(opts routerOptions) {
container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified), container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified),
image.NewRouter(opts.daemon.ImageService()), image.NewRouter(opts.daemon.ImageService()),
systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features), systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features),
volume.NewRouter(opts.daemon.VolumesService()), volume.NewRouter(opts.daemon.VolumesService(), opts.cluster),
build.NewRouter(opts.buildBackend, opts.daemon, opts.features), build.NewRouter(opts.buildBackend, opts.daemon, opts.features),
sessionrouter.NewRouter(opts.sessionManager), sessionrouter.NewRouter(opts.sessionManager),
swarmrouter.NewRouter(opts.cluster), swarmrouter.NewRouter(opts.cluster),

View file

@ -56,6 +56,18 @@ func NodeFromGRPC(n swarmapi.Node) types.Node {
node.Description.TLSInfo.CertIssuerPublicKey = n.Description.TLSInfo.CertIssuerPublicKey node.Description.TLSInfo.CertIssuerPublicKey = n.Description.TLSInfo.CertIssuerPublicKey
node.Description.TLSInfo.CertIssuerSubject = n.Description.TLSInfo.CertIssuerSubject node.Description.TLSInfo.CertIssuerSubject = n.Description.TLSInfo.CertIssuerSubject
} }
for _, csi := range n.Description.CSIInfo {
if csi != nil {
node.Description.CSIInfo = append(
node.Description.CSIInfo,
types.NodeCSIInfo{
PluginName: csi.PluginName,
NodeID: csi.NodeID,
MaxVolumesPerNode: csi.MaxVolumesPerNode,
},
)
}
}
} }
// Manager // Manager

View file

@ -57,6 +57,17 @@ func TaskFromGRPC(t swarmapi.Task) (types.Task, error) {
} }
} }
// appending to a nil slice is valid. if there are no items in t.Volumes,
// then the task.Volumes will remain nil; otherwise, it will contain
// converted entries.
for _, v := range t.Volumes {
task.Volumes = append(task.Volumes, types.VolumeAttachment{
ID: v.ID,
Source: v.Source,
Target: v.Target,
})
}
if t.Status.PortStatus == nil { if t.Status.PortStatus == nil {
return task, nil return task, nil
} }

View file

@ -0,0 +1,311 @@
package convert // import "github.com/docker/docker/daemon/cluster/convert"
import (
volumetypes "github.com/docker/docker/api/types/volume"
gogotypes "github.com/gogo/protobuf/types"
swarmapi "github.com/moby/swarmkit/v2/api"
)
// VolumeFromGRPC converts a swarmkit api Volume object to a docker api Volume
// object
func VolumeFromGRPC(v *swarmapi.Volume) volumetypes.Volume {
clusterVolumeSpec := volumetypes.ClusterVolumeSpec{
Group: v.Spec.Group,
AccessMode: accessModeFromGRPC(v.Spec.AccessMode),
AccessibilityRequirements: topologyRequirementFromGRPC(v.Spec.AccessibilityRequirements),
CapacityRange: capacityRangeFromGRPC(v.Spec.CapacityRange),
Secrets: volumeSecretsFromGRPC(v.Spec.Secrets),
Availability: volumeAvailabilityFromGRPC(v.Spec.Availability),
}
clusterVolume := &volumetypes.ClusterVolume{
ID: v.ID,
Spec: clusterVolumeSpec,
PublishStatus: volumePublishStatusFromGRPC(v.PublishStatus),
Info: volumeInfoFromGRPC(v.VolumeInfo),
}
clusterVolume.Version.Index = v.Meta.Version.Index
clusterVolume.CreatedAt, _ = gogotypes.TimestampFromProto(v.Meta.CreatedAt)
clusterVolume.UpdatedAt, _ = gogotypes.TimestampFromProto(v.Meta.UpdatedAt)
return volumetypes.Volume{
ClusterVolume: clusterVolume,
CreatedAt: clusterVolume.CreatedAt.String(),
Driver: v.Spec.Driver.Name,
Labels: v.Spec.Annotations.Labels,
Name: v.Spec.Annotations.Name,
Options: v.Spec.Driver.Options,
Scope: "global",
}
}
func volumeSpecToGRPC(spec volumetypes.ClusterVolumeSpec) *swarmapi.VolumeSpec {
swarmSpec := &swarmapi.VolumeSpec{
Group: spec.Group,
}
if spec.AccessMode != nil {
swarmSpec.AccessMode = &swarmapi.VolumeAccessMode{}
switch spec.AccessMode.Scope {
case volumetypes.ScopeSingleNode:
swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeSingleNode
case volumetypes.ScopeMultiNode:
swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeMultiNode
}
switch spec.AccessMode.Sharing {
case volumetypes.SharingNone:
swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingNone
case volumetypes.SharingReadOnly:
swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingReadOnly
case volumetypes.SharingOneWriter:
swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingOneWriter
case volumetypes.SharingAll:
swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingAll
}
if spec.AccessMode.BlockVolume != nil {
swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Block{
Block: &swarmapi.VolumeAccessMode_BlockVolume{},
}
}
if spec.AccessMode.MountVolume != nil {
swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Mount{
Mount: &swarmapi.VolumeAccessMode_MountVolume{
FsType: spec.AccessMode.MountVolume.FsType,
MountFlags: spec.AccessMode.MountVolume.MountFlags,
},
}
}
}
for _, secret := range spec.Secrets {
swarmSpec.Secrets = append(swarmSpec.Secrets, &swarmapi.VolumeSecret{
Key: secret.Key,
Secret: secret.Secret,
})
}
if spec.AccessibilityRequirements != nil {
swarmSpec.AccessibilityRequirements = &swarmapi.TopologyRequirement{}
for _, top := range spec.AccessibilityRequirements.Requisite {
swarmSpec.AccessibilityRequirements.Requisite = append(
swarmSpec.AccessibilityRequirements.Requisite,
&swarmapi.Topology{
Segments: top.Segments,
},
)
}
for _, top := range spec.AccessibilityRequirements.Preferred {
swarmSpec.AccessibilityRequirements.Preferred = append(
swarmSpec.AccessibilityRequirements.Preferred,
&swarmapi.Topology{
Segments: top.Segments,
},
)
}
}
if spec.CapacityRange != nil {
swarmSpec.CapacityRange = &swarmapi.CapacityRange{
RequiredBytes: spec.CapacityRange.RequiredBytes,
LimitBytes: spec.CapacityRange.LimitBytes,
}
}
// availability is not a pointer, it is a value. if the user does not
// specify an availability, it will be inferred as the 0-value, which is
// "active".
switch spec.Availability {
case volumetypes.AvailabilityActive:
swarmSpec.Availability = swarmapi.VolumeAvailabilityActive
case volumetypes.AvailabilityPause:
swarmSpec.Availability = swarmapi.VolumeAvailabilityPause
case volumetypes.AvailabilityDrain:
swarmSpec.Availability = swarmapi.VolumeAvailabilityDrain
}
return swarmSpec
}
// VolumeCreateToGRPC takes a VolumeCreateBody and outputs the matching
// swarmapi VolumeSpec.
func VolumeCreateToGRPC(volume *volumetypes.CreateOptions) *swarmapi.VolumeSpec {
var swarmSpec *swarmapi.VolumeSpec
if volume != nil && volume.ClusterVolumeSpec != nil {
swarmSpec = volumeSpecToGRPC(*volume.ClusterVolumeSpec)
} else {
swarmSpec = &swarmapi.VolumeSpec{}
}
swarmSpec.Annotations = swarmapi.Annotations{
Name: volume.Name,
Labels: volume.Labels,
}
swarmSpec.Driver = &swarmapi.Driver{
Name: volume.Driver,
Options: volume.DriverOpts,
}
return swarmSpec
}
func volumeInfoFromGRPC(info *swarmapi.VolumeInfo) *volumetypes.Info {
if info == nil {
return nil
}
var accessibleTopology []volumetypes.Topology
if info.AccessibleTopology != nil {
accessibleTopology = make([]volumetypes.Topology, len(info.AccessibleTopology))
for i, top := range info.AccessibleTopology {
accessibleTopology[i] = topologyFromGRPC(top)
}
}
return &volumetypes.Info{
CapacityBytes: info.CapacityBytes,
VolumeContext: info.VolumeContext,
VolumeID: info.VolumeID,
AccessibleTopology: accessibleTopology,
}
}
func volumePublishStatusFromGRPC(publishStatus []*swarmapi.VolumePublishStatus) []*volumetypes.PublishStatus {
if publishStatus == nil {
return nil
}
vps := make([]*volumetypes.PublishStatus, len(publishStatus))
for i, status := range publishStatus {
var state volumetypes.PublishState
switch status.State {
case swarmapi.VolumePublishStatus_PENDING_PUBLISH:
state = volumetypes.StatePending
case swarmapi.VolumePublishStatus_PUBLISHED:
state = volumetypes.StatePublished
case swarmapi.VolumePublishStatus_PENDING_NODE_UNPUBLISH:
state = volumetypes.StatePendingNodeUnpublish
case swarmapi.VolumePublishStatus_PENDING_UNPUBLISH:
state = volumetypes.StatePendingUnpublish
}
vps[i] = &volumetypes.PublishStatus{
NodeID: status.NodeID,
State: state,
PublishContext: status.PublishContext,
}
}
return vps
}
func accessModeFromGRPC(accessMode *swarmapi.VolumeAccessMode) *volumetypes.AccessMode {
if accessMode == nil {
return nil
}
convertedAccessMode := &volumetypes.AccessMode{}
switch accessMode.Scope {
case swarmapi.VolumeScopeSingleNode:
convertedAccessMode.Scope = volumetypes.ScopeSingleNode
case swarmapi.VolumeScopeMultiNode:
convertedAccessMode.Scope = volumetypes.ScopeMultiNode
}
switch accessMode.Sharing {
case swarmapi.VolumeSharingNone:
convertedAccessMode.Sharing = volumetypes.SharingNone
case swarmapi.VolumeSharingReadOnly:
convertedAccessMode.Sharing = volumetypes.SharingReadOnly
case swarmapi.VolumeSharingOneWriter:
convertedAccessMode.Sharing = volumetypes.SharingOneWriter
case swarmapi.VolumeSharingAll:
convertedAccessMode.Sharing = volumetypes.SharingAll
}
if block := accessMode.GetBlock(); block != nil {
convertedAccessMode.BlockVolume = &volumetypes.TypeBlock{}
}
if mount := accessMode.GetMount(); mount != nil {
convertedAccessMode.MountVolume = &volumetypes.TypeMount{
FsType: mount.FsType,
MountFlags: mount.MountFlags,
}
}
return convertedAccessMode
}
func volumeSecretsFromGRPC(secrets []*swarmapi.VolumeSecret) []volumetypes.Secret {
if secrets == nil {
return nil
}
convertedSecrets := make([]volumetypes.Secret, len(secrets))
for i, secret := range secrets {
convertedSecrets[i] = volumetypes.Secret{
Key: secret.Key,
Secret: secret.Secret,
}
}
return convertedSecrets
}
func topologyRequirementFromGRPC(top *swarmapi.TopologyRequirement) *volumetypes.TopologyRequirement {
if top == nil {
return nil
}
convertedTop := &volumetypes.TopologyRequirement{}
if top.Requisite != nil {
convertedTop.Requisite = make([]volumetypes.Topology, len(top.Requisite))
for i, req := range top.Requisite {
convertedTop.Requisite[i] = topologyFromGRPC(req)
}
}
if top.Preferred != nil {
convertedTop.Preferred = make([]volumetypes.Topology, len(top.Preferred))
for i, pref := range top.Preferred {
convertedTop.Preferred[i] = topologyFromGRPC(pref)
}
}
return convertedTop
}
func topologyFromGRPC(top *swarmapi.Topology) volumetypes.Topology {
if top == nil {
return volumetypes.Topology{}
}
return volumetypes.Topology{
Segments: top.Segments,
}
}
func capacityRangeFromGRPC(capacity *swarmapi.CapacityRange) *volumetypes.CapacityRange {
if capacity == nil {
return nil
}
return &volumetypes.CapacityRange{
RequiredBytes: capacity.RequiredBytes,
LimitBytes: capacity.LimitBytes,
}
}
func volumeAvailabilityFromGRPC(availability swarmapi.VolumeSpec_VolumeAvailability) volumetypes.Availability {
switch availability {
case swarmapi.VolumeAvailabilityActive:
return volumetypes.AvailabilityActive
case swarmapi.VolumeAvailabilityPause:
return volumetypes.AvailabilityPause
}
return volumetypes.AvailabilityDrain
}

View file

@ -0,0 +1,210 @@
package convert
import (
"testing"
volumetypes "github.com/docker/docker/api/types/volume"
swarmapi "github.com/moby/swarmkit/v2/api"
"gotest.tools/v3/assert"
)
func TestTopologyFromGRPC(t *testing.T) {
nilTopology := topologyFromGRPC(nil)
assert.DeepEqual(t, nilTopology, volumetypes.Topology{})
swarmTop := &swarmapi.Topology{
Segments: map[string]string{"foo": "bar"},
}
top := topologyFromGRPC(swarmTop)
assert.DeepEqual(t, top.Segments, swarmTop.Segments)
}
func TestCapacityRangeFromGRPC(t *testing.T) {
nilCapacity := capacityRangeFromGRPC(nil)
assert.Assert(t, nilCapacity == nil)
swarmZeroCapacity := &swarmapi.CapacityRange{}
zeroCapacity := capacityRangeFromGRPC(swarmZeroCapacity)
assert.Assert(t, zeroCapacity != nil)
assert.Equal(t, zeroCapacity.RequiredBytes, int64(0))
assert.Equal(t, zeroCapacity.LimitBytes, int64(0))
swarmNonZeroCapacity := &swarmapi.CapacityRange{
RequiredBytes: 1024,
LimitBytes: 2048,
}
nonZeroCapacity := capacityRangeFromGRPC(swarmNonZeroCapacity)
assert.Assert(t, nonZeroCapacity != nil)
assert.Equal(t, nonZeroCapacity.RequiredBytes, int64(1024))
assert.Equal(t, nonZeroCapacity.LimitBytes, int64(2048))
}
func TestVolumeAvailabilityFromGRPC(t *testing.T) {
for _, tc := range []struct {
name string
in swarmapi.VolumeSpec_VolumeAvailability
expected volumetypes.Availability
}{
{
name: "Active",
in: swarmapi.VolumeAvailabilityActive,
expected: volumetypes.AvailabilityActive,
}, {
name: "Pause",
in: swarmapi.VolumeAvailabilityPause,
expected: volumetypes.AvailabilityPause,
}, {
name: "Drain",
in: swarmapi.VolumeAvailabilityDrain,
expected: volumetypes.AvailabilityDrain,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
actual := volumeAvailabilityFromGRPC(tc.in)
assert.Equal(t, actual, tc.expected)
})
}
}
// TestAccessModeFromGRPC tests that the AccessMode type is correctly converted
func TestAccessModeFromGRPC(t *testing.T) {
for _, tc := range []struct {
name string
in *swarmapi.VolumeAccessMode
expected *volumetypes.AccessMode
}{
{
name: "MountVolume",
in: &swarmapi.VolumeAccessMode{
Scope: swarmapi.VolumeScopeSingleNode,
Sharing: swarmapi.VolumeSharingNone,
AccessType: &swarmapi.VolumeAccessMode_Mount{
Mount: &swarmapi.VolumeAccessMode_MountVolume{
FsType: "foo",
// TODO(dperny): maybe don't convert this?
MountFlags: []string{"one", "two"},
},
},
},
expected: &volumetypes.AccessMode{
Scope: volumetypes.ScopeSingleNode,
Sharing: volumetypes.SharingNone,
MountVolume: &volumetypes.TypeMount{
FsType: "foo",
MountFlags: []string{"one", "two"},
},
},
}, {
name: "BlockVolume",
in: &swarmapi.VolumeAccessMode{
Scope: swarmapi.VolumeScopeSingleNode,
Sharing: swarmapi.VolumeSharingNone,
AccessType: &swarmapi.VolumeAccessMode_Block{
Block: &swarmapi.VolumeAccessMode_BlockVolume{},
},
},
expected: &volumetypes.AccessMode{
Scope: volumetypes.ScopeSingleNode,
Sharing: volumetypes.SharingNone,
BlockVolume: &volumetypes.TypeBlock{},
},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
out := accessModeFromGRPC(tc.in)
assert.DeepEqual(t, tc.expected, out)
})
}
}
// TestVolumeCreateToGRPC tests that a docker-typed VolumeCreateBody is
// correctly converted to a swarm-typed VolumeSpec.
func TestVolumeCreateToGRPC(t *testing.T) {
volume := &volumetypes.CreateOptions{
Driver: "plug1",
DriverOpts: map[string]string{"options": "yeah"},
Labels: map[string]string{"labeled": "yeah"},
Name: "volume1",
}
spec := &volumetypes.ClusterVolumeSpec{
Group: "gronp",
AccessMode: &volumetypes.AccessMode{
Scope: volumetypes.ScopeMultiNode,
Sharing: volumetypes.SharingAll,
MountVolume: &volumetypes.TypeMount{
FsType: "foo",
MountFlags: []string{"one", "two"},
},
},
Secrets: []volumetypes.Secret{
{Key: "key1", Secret: "secret1"},
{Key: "key2", Secret: "secret2"},
},
AccessibilityRequirements: &volumetypes.TopologyRequirement{
Requisite: []volumetypes.Topology{
{Segments: map[string]string{"top1": "yup"}},
{Segments: map[string]string{"top2": "def"}},
{Segments: map[string]string{"top3": "nah"}},
},
Preferred: []volumetypes.Topology{},
},
CapacityRange: &volumetypes.CapacityRange{
RequiredBytes: 1,
LimitBytes: 0,
},
}
volume.ClusterVolumeSpec = spec
swarmSpec := VolumeCreateToGRPC(volume)
assert.Assert(t, swarmSpec != nil)
expectedSwarmSpec := &swarmapi.VolumeSpec{
Annotations: swarmapi.Annotations{
Name: "volume1",
Labels: map[string]string{
"labeled": "yeah",
},
},
Group: "gronp",
Driver: &swarmapi.Driver{
Name: "plug1",
Options: map[string]string{
"options": "yeah",
},
},
AccessMode: &swarmapi.VolumeAccessMode{
Scope: swarmapi.VolumeScopeMultiNode,
Sharing: swarmapi.VolumeSharingAll,
AccessType: &swarmapi.VolumeAccessMode_Mount{
Mount: &swarmapi.VolumeAccessMode_MountVolume{
FsType: "foo",
MountFlags: []string{"one", "two"},
},
},
},
Secrets: []*swarmapi.VolumeSecret{
{Key: "key1", Secret: "secret1"},
{Key: "key2", Secret: "secret2"},
},
AccessibilityRequirements: &swarmapi.TopologyRequirement{
Requisite: []*swarmapi.Topology{
{Segments: map[string]string{"top1": "yup"}},
{Segments: map[string]string{"top2": "def"}},
{Segments: map[string]string{"top3": "nah"}},
},
Preferred: nil,
},
CapacityRange: &swarmapi.CapacityRange{
RequiredBytes: 1,
LimitBytes: 0,
},
}
assert.DeepEqual(t, swarmSpec, expectedSwarmSpec)
}

View file

@ -288,7 +288,7 @@ func (c *containerAdapter) create(ctx context.Context) error {
if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{ if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
Name: c.container.name(), Name: c.container.name(),
Config: c.container.config(), Config: c.container.config(),
HostConfig: c.container.hostConfig(), HostConfig: c.container.hostConfig(c.dependencies.Volumes()),
// Use the first network in container create // Use the first network in container create
NetworkingConfig: c.container.createNetworkingConfig(c.backend), NetworkingConfig: c.container.createNetworkingConfig(c.backend),
}); err != nil { }); err != nil {
@ -461,6 +461,30 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
return nil return nil
} }
// waitClusterVolumes blocks until the VolumeGetter returns a path for each
// cluster volume in use by this task
func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error {
for _, attached := range c.container.task.Volumes {
// for every attachment, try until we succeed or until the context
// is canceled.
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// continue through the code.
}
path, err := c.dependencies.Volumes().Get(attached.ID)
if err == nil && path != "" {
// break out of the inner-most loop
break
}
}
}
log.G(ctx).Debug("volumes ready")
return nil
}
func (c *containerAdapter) activateServiceBinding() error { func (c *containerAdapter) activateServiceBinding() error {
return c.backend.ActivateContainerServiceBinding(c.container.name()) return c.backend.ActivateContainerServiceBinding(c.container.name())
} }

View file

@ -254,14 +254,44 @@ func (c *containerConfig) labels() map[string]string {
return labels return labels
} }
func (c *containerConfig) mounts() []enginemount.Mount { func (c *containerConfig) mounts(deps exec.VolumeGetter) []enginemount.Mount {
var r []enginemount.Mount var r []enginemount.Mount
for _, mount := range c.spec().Mounts { for _, mount := range c.spec().Mounts {
r = append(r, convertMount(mount)) if mount.Type == api.MountTypeCSI {
r = append(r, c.convertCSIMount(mount, deps))
} else {
r = append(r, convertMount(mount))
}
} }
return r return r
} }
// convertCSIMount matches the CSI mount with the path of the CSI volume.
//
// technically quadratic with respect to the number of CSI mounts, but that
// number shouldn't ever be large enough for quadratic to matter.
//
// TODO(dperny): figure out a scheme for errors? or maybe add code to
// checkMounts?
func (c *containerConfig) convertCSIMount(m api.Mount, deps exec.VolumeGetter) enginemount.Mount {
var mount enginemount.Mount
// these are actually bind mounts
mount.Type = enginemount.TypeBind
for _, attach := range c.task.Volumes {
if attach.Source == m.Source && attach.Target == m.Target {
// we should not get an error here, because we should have checked
// already that the volume is ready
path, _ := deps.Get(attach.ID)
mount.Source = path
mount.Target = m.Target
}
}
return mount
}
func convertMount(m api.Mount) enginemount.Mount { func convertMount(m api.Mount) enginemount.Mount {
mount := enginemount.Mount{ mount := enginemount.Mount{
Source: m.Source, Source: m.Source,
@ -278,6 +308,8 @@ func convertMount(m api.Mount) enginemount.Mount {
mount.Type = enginemount.TypeTmpfs mount.Type = enginemount.TypeTmpfs
case api.MountTypeNamedPipe: case api.MountTypeNamedPipe:
mount.Type = enginemount.TypeNamedPipe mount.Type = enginemount.TypeNamedPipe
case api.MountTypeCSI:
mount.Type = enginemount.TypeCluster
} }
if m.BindOptions != nil { if m.BindOptions != nil {
@ -350,12 +382,12 @@ func (c *containerConfig) healthcheck() *enginecontainer.HealthConfig {
} }
} }
func (c *containerConfig) hostConfig() *enginecontainer.HostConfig { func (c *containerConfig) hostConfig(deps exec.VolumeGetter) *enginecontainer.HostConfig {
hc := &enginecontainer.HostConfig{ hc := &enginecontainer.HostConfig{
Resources: c.resources(), Resources: c.resources(),
GroupAdd: c.spec().Groups, GroupAdd: c.spec().Groups,
PortBindings: c.portBindings(), PortBindings: c.portBindings(),
Mounts: c.mounts(), Mounts: c.mounts(deps),
ReadonlyRootfs: c.spec().ReadOnly, ReadonlyRootfs: c.spec().ReadOnly,
Isolation: c.isolation(), Isolation: c.isolation(),
Init: c.init(), Init: c.init(),

View file

@ -31,7 +31,10 @@ func TestIsolationConversion(t *testing.T) {
}, },
} }
config := containerConfig{task: &task} config := containerConfig{task: &task}
assert.Equal(t, c.to, config.hostConfig().Isolation) // NOTE(dperny): you shouldn't ever pass nil outside of testing,
// because if there are CSI volumes, the code will panic. However,
// in testing. this is acceptable.
assert.Equal(t, c.to, config.hostConfig(nil).Isolation)
}) })
} }
} }
@ -129,7 +132,10 @@ func TestCredentialSpecConversion(t *testing.T) {
}, },
} }
config := containerConfig{task: &task} config := containerConfig{task: &task}
assert.DeepEqual(t, c.to, config.hostConfig().SecurityOpt) // NOTE(dperny): you shouldn't ever pass nil outside of testing,
// because if there are CSI volumes, the code will panic. However,
// in testing. this is acceptable.
assert.DeepEqual(t, c.to, config.hostConfig(nil).SecurityOpt)
}) })
} }
} }

View file

@ -121,6 +121,15 @@ func (r *controller) Prepare(ctx context.Context) error {
return err return err
} }
// could take a while for the cluster volumes to become available. set for
// 5 minutes, I guess?
// TODO(dperny): do this more intelligently. return a better error.
waitClusterVolumesCtx, wcvcancel := context.WithTimeout(ctx, 5*time.Minute)
defer wcvcancel()
if err := r.adapter.waitClusterVolumes(waitClusterVolumesCtx); err != nil {
return err
}
// Make sure all the networks that the task needs are created. // Make sure all the networks that the task needs are created.
if err := r.adapter.createNetworks(ctx); err != nil { if err := r.adapter.createNetworks(ctx); err != nil {
return err return err

View file

@ -122,6 +122,9 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
} }
} }
// TODO(dperny): don't ignore the error here
csiInfo, _ := e.Volumes().Plugins().NodeInfo(ctx)
description := &api.NodeDescription{ description := &api.NodeDescription{
Hostname: info.Name, Hostname: info.Name,
Platform: &api.Platform{ Platform: &api.Platform{
@ -138,6 +141,7 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
MemoryBytes: info.MemTotal, MemoryBytes: info.MemTotal,
Generic: convert.GenericResourcesToGRPC(info.GenericResources), Generic: convert.GenericResourcesToGRPC(info.GenericResources),
}, },
CSIInfo: csiInfo,
} }
// Save the node information in the executor field // Save the node information in the executor field
@ -356,6 +360,10 @@ func (e *executor) Configs() exec.ConfigsManager {
return e.dependencies.Configs() return e.dependencies.Configs()
} }
func (e *executor) Volumes() exec.VolumesManager {
return e.dependencies.Volumes()
}
type sortedPlugins []api.PluginDescription type sortedPlugins []api.PluginDescription
func (sp sortedPlugins) Len() int { return len(sp) } func (sp sortedPlugins) Len() int { return len(sp) }

View file

@ -37,6 +37,8 @@ func validateMounts(mounts []api.Mount) error {
if mount.Source == "" { if mount.Source == "" {
return errors.New("invalid npipe source, source must not be empty") return errors.New("invalid npipe source, source must not be empty")
} }
case api.MountTypeCSI:
// nothing to do here.
default: default:
return fmt.Errorf("invalid mount type: %s", mount.Type) return fmt.Errorf("invalid mount type: %s", mount.Type)
} }

View file

@ -244,3 +244,39 @@ func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*s
return rl.Networks[0], nil return rl.Networks[0], nil
} }
func getVolume(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Volume, error) {
// GetVolume to match via full ID
if v, err := c.GetVolume(ctx, &swarmapi.GetVolumeRequest{VolumeID: input}); err == nil {
return v.Volume, nil
}
// If any error (including NotFound), list volumes to match via ID prefix
// and full name
resp, err := c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{
Filters: &swarmapi.ListVolumesRequest_Filters{
Names: []string{input},
},
})
if err != nil || len(resp.Volumes) == 0 {
resp, err = c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{
Filters: &swarmapi.ListVolumesRequest_Filters{
IDPrefixes: []string{input},
},
})
}
if err != nil {
return nil, err
}
if len(resp.Volumes) == 0 {
return nil, errdefs.NotFound(fmt.Errorf("volume %s not found", input))
}
if l := len(resp.Volumes); l > 1 {
return nil, errdefs.InvalidParameter(fmt.Errorf("volume %s is ambiguous (%d matches found)", input, l))
}
return resp.Volumes[0], nil
}

140
daemon/cluster/volumes.go Normal file
View file

@ -0,0 +1,140 @@
package cluster // import "github.com/docker/docker/daemon/cluster"
import (
"context"
"fmt"
volumetypes "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/daemon/cluster/convert"
"github.com/docker/docker/errdefs"
swarmapi "github.com/moby/swarmkit/v2/api"
"google.golang.org/grpc"
)
// GetVolume returns a volume from the swarm cluster.
func (c *Cluster) GetVolume(nameOrID string) (volumetypes.Volume, error) {
var volume *swarmapi.Volume
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
v, err := getVolume(ctx, state.controlClient, nameOrID)
if err != nil {
return err
}
volume = v
return nil
}); err != nil {
return volumetypes.Volume{}, err
}
return convert.VolumeFromGRPC(volume), nil
}
// GetVolumes returns all of the volumes matching the given options from a swarm cluster.
func (c *Cluster) GetVolumes(options volumetypes.ListOptions) ([]*volumetypes.Volume, error) {
var volumes []*volumetypes.Volume
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
r, err := state.controlClient.ListVolumes(
ctx, &swarmapi.ListVolumesRequest{},
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
)
if err != nil {
return err
}
volumes = make([]*volumetypes.Volume, 0, len(r.Volumes))
for _, volume := range r.Volumes {
v := convert.VolumeFromGRPC(volume)
volumes = append(volumes, &v)
}
return nil
}); err != nil {
return nil, err
}
return volumes, nil
}
// CreateVolume creates a new cluster volume in the swarm cluster.
//
// Returns the volume ID if creation is successful, or an error if not.
func (c *Cluster) CreateVolume(v volumetypes.CreateOptions) (*volumetypes.Volume, error) {
var resp *swarmapi.CreateVolumeResponse
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
volumeSpec := convert.VolumeCreateToGRPC(&v)
r, err := state.controlClient.CreateVolume(
ctx, &swarmapi.CreateVolumeRequest{Spec: volumeSpec},
)
if err != nil {
return err
}
resp = r
return nil
}); err != nil {
return nil, err
}
createdVol, err := c.GetVolume(resp.Volume.ID)
if err != nil {
// If there's a failure of some sort in this operation the user would
// get a very unhelpful "not found" error on a create, which is not
// very helpful at all. Instead, before returning the error, add some
// context, and change this to a system-type error, because it's
// nothing the user did wrong.
return nil, errdefs.System(fmt.Errorf("unable to retrieve created volume: %w", err))
}
return &createdVol, nil
}
// RemoveVolume removes a volume from the swarm cluster.
func (c *Cluster) RemoveVolume(nameOrID string, force bool) error {
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
volume, err := getVolume(ctx, state.controlClient, nameOrID)
if err != nil {
return err
}
req := &swarmapi.RemoveVolumeRequest{
VolumeID: volume.ID,
Force: force,
}
_, err = state.controlClient.RemoveVolume(ctx, req)
return err
})
}
// UpdateVolume updates a volume in the swarm cluster.
func (c *Cluster) UpdateVolume(nameOrID string, version uint64, volume volumetypes.UpdateOptions) error {
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
v, err := getVolume(ctx, state.controlClient, nameOrID)
if err != nil {
return err
}
// For now, the only thing we can update is availability. Instead of
// converting the whole spec, just pluck out the availability if it has
// been set.
if volume.Spec != nil {
switch volume.Spec.Availability {
case volumetypes.AvailabilityActive:
v.Spec.Availability = swarmapi.VolumeAvailabilityActive
case volumetypes.AvailabilityPause:
v.Spec.Availability = swarmapi.VolumeAvailabilityPause
case volumetypes.AvailabilityDrain:
v.Spec.Availability = swarmapi.VolumeAvailabilityDrain
}
// if default empty value, change nothing.
}
_, err = state.controlClient.UpdateVolume(
ctx, &swarmapi.UpdateVolumeRequest{
VolumeID: nameOrID,
VolumeVersion: &swarmapi.Version{
Index: version,
},
Spec: &v.Spec,
},
)
return err
})
}

View file

@ -77,6 +77,16 @@ keywords: "API, Docker, rcli, REST, documentation"
`GET /services/{id}/logs` and `GET /tasks/{id}/logs` now set Content-Type header `GET /services/{id}/logs` and `GET /tasks/{id}/logs` now set Content-Type header
to `application/vnd.docker.multiplexed-stream` when a multiplexed stdout/stderr to `application/vnd.docker.multiplexed-stream` when a multiplexed stdout/stderr
stream is sent to client, `application/vnd.docker.raw-stream` otherwise. stream is sent to client, `application/vnd.docker.raw-stream` otherwise.
* `POST /volumes/create` now accepts a new `ClusterVolumeSpec` to create a cluster
volume (CNI). This option can only be used if the daemon is a Swarm manager.
The Volume response on creation now also can contain a `ClusterVolume` field
with information about the created volume.
* Volume information returned by `GET /volumes/{name}`, `GET /volumes` and
`GET /system/df` can now contain a `ClusterVolume` if the volume is a cluster
volume (requires the daemon to be a Swarm manager).
* The `Volume` type, as returned by `Added new `ClusterVolume` fields
* Added a new `PUT /volumes{name}` endpoint to update cluster volumes (CNI).
Cluster volumes are only supported if the daemon is a Swarm manager.
## v1.41 API changes ## v1.41 API changes

210
docs/cluster_volumes.md Normal file
View file

@ -0,0 +1,210 @@
Cluster Volumes
===============
Docker Cluster Volumes is a new feature which allows using CSI plugins to
create cluster-aware volumes
## Installing a CSI plugin
CSI, the Container Storage Interface, defines an API for storage providers to
write storage plugins which are cross-compatible between various container
orchestrators. However, most CSI plugins are shipped with configuration
specific to Kubernetes. Docker CSI Plugins use the same binaries as those for
Kubernetes, but in a different environment and sometimes with different
configuration.
If a plugin is already adapted for and available for Docker, it can be
installed through the `docker plugin install` command. Though such plugins may
require configuration specific to the user's environment, they will ultimately
be detected by and work automatically with Docker once enabled.
Currently, there is no way to automatically deploy a Docker Plugin across all
nodes in a cluster. Therefore, users must ensure the Docker Plugin is installed
on all nodes in the cluster on which it is desired.
Docker Swarm worker nodes report their active plugins to the Docker Swarm
managers, and so it is not necessary to install a plugin on every worker node
if this is not desired. However, the plugin must be installed on every manager
node, or a leadership change could result in Docker Swarm no longer having the
ability to call the plugin.
### Creating a Docker CSI Plugin
Before following this section, readers should ensure they are acquainted with
the
[Docker Engine managed plugin system](https://docs.docker.com/engine/extend/).
Docker CSI plugins use this system to run.
Docker CSI plugins are identified with a special interface type. There are two
related interfaces that CSI plugins can expose. In the `config.json`, this
should be set as such.
```json
"interface": {
"types": ["docker.csicontroller/1.0","docker.csinode/1.0"]
}
```
Additionally, the CSI specification states that CSI plugins should have
`CAP_SYS_ADMIN` privileges, so this should be set in the `config.json` as
well:
```json
"linux" : {
"capabilities": ["CAP_SYS_ADMIN"]
}
```
Other configuration is largely specific to the CSI plugin.
#### Split-Component Plugins
For split-component plugins, users can specify either the
`docker.csicontroller/1.0` or `docker.csinode/1.0` plugin interfaces. Manager
nodes should run plugin instances with the `docker.csicontroller/1.0`
interface, and worker nodes the `docker.csinode/1.0` interface.
Docker does support running two plugins with the same name, nor does it support
specifying different drivers for the node and controller plugins. This means in
a fully split plugin, Swarm will be unable to schedule volumes to manager
nodes.
If it is desired to run a split-component plugin such that the Volumes managed
by that plugin are accessible to Tasks on the manager node, the user will need
to build the plugin such that some proxy or multiplexer provides the illusion
of combined components to the manager through one socket, and ensure the plugin
reports both interface types.
## Using Cluster Volumes
### Create a Cluster Volume
Creating a Cluster Volume is done with the same `docker volume` commands as any
other Volume. To create a Cluster Volume, one needs to do both of things:
* Specify a CSI-capable driver with the `--driver` or `-d` option.
* Use any one of the cluster-specific `docker volume create` flags.
For example, to create a Cluster Volume called `my-volume` with the
`democratic-csi` Volume Driver, one might use this command:
```bash
docker volume create \
--driver democratic-csi \
--type mount \
--sharing all \
--scope multi \
--limit-bytes 10G \
--required-bytes 1G \
my-volume
```
### List Cluster Volumes
Cluster Volumes will be listed along with other volumes when doing
`docker volume ls`. However, if users want to see only Cluster Volumes, and
with cluster-specific information, the flag `--cluster` can be specified:
```
$ docker volume ls --cluster
VOLUME NAME GROUP DRIVER AVAILABILITY STATUS
volume1 group1 driver1 active pending creation
volume2 group1 driver1 pause created
volume3 group2 driver2 active in use (1 node)
volume4 group2 driver2 active in use (2 nodes)
```
### Deploying a Service
Cluster Volumes are only compatible with Docker Services, not plain Docker
Containers.
In Docker Services, a Cluster Volume is used the same way any other volume
would be used. The `type` should be set to `csi`. For example, to create a
Service that uses `my-volume` created above, one would execute a command like:
```bash
docker service create \
--name my-service \
--mount type=csi,src=my-volume,dst=/srv/www \
nginx:alpine
```
When scheduling Services which use Cluster Volumes, Docker Swarm uses the
volume's information and state to make decisions about Task placement.
For example, the Service will be constrained to run only on nodes on which the
volume is available. If the volume is configured with `scope=single`, meaning
it can only be used on one node in the cluster at a time, then all Tasks for
that Service will be scheduled to that same node. If that node changes for some
reason, like a node failure, then the Tasks will be rescheduled to the new
node automatically, without user input.
If the Cluster Volume is accessible only on some set of nodes at the same time,
and not the whole cluster, then Docker Swarm will only schedule the Service to
those nodes as reported by the plugin.
### Using Volume Groups
It is frequently desirable that a Service use any available volume out of an
interchangeable set. To accomplish this in the most simple and straightforward
manner possible, Cluster Volumes use the concept of a volume "Group".
The Volume Group is a field, somewhat like a special label, which is used to
instruct Swarm that a given volume is interchangeable with every other volume
of the same Group. When creating a Cluster Volume, the Group can be specified
by using the `--group` flag.
To use a Cluster Volume by Group instead of by Name, the mount `src` option is
prefixed with `group:`, followed by the group name. For example:
```
--mount type=csi,src=group:my-group,dst=/srv/www
```
This instructs Docker Swarm that any Volume with the Group `my-group` can be
used to satisfy the mounts.
Volumes in a Group do not need to be identical, but they must be
interchangeable. These caveats should be kept in mind when using Groups:
* No Service ever gets the monopoly on a Cluster Volume. If several Services
use the same Group, then the Cluster Volumes in that Group can be used with
any of those Services at any time. Just because a particular Volume was used
by a particular Service at one point does not mean it won't be used by a
different Service later.
* Volumes in a group can have different configurations, but all of those
configurations must be compatible with the Service. For example, if some of
the Volumes in a group have `sharing=readonly`, then the Service must be
capable of using the volume in read-only mode.
* Volumes in a Group are created statically ahead of time, not dynamically
as-needed. This means that the user must ensure a sufficient number of
Volumes belong to the desired Group to support the needs of the Service.
### Taking Cluster Volumes Offline
For various reasons, users may wish to take a particular Cluster Volume
offline, such that is not actively used by Services. To facilitate this,
Cluster Volumes have an `availability` option similar to Docker Swarm nodes.
Cluster Volume availability can be one of three states:
* `active` - Default. Volume can be used as normal.
* `pause` - The volume will not be used for new Services, but existing Tasks
using the volume will not be stopped.
* `drain` - The volume will not be used for new Services, and any running Tasks
using the volume will be stopped and rescheduled.
A Volume can only be removed from the cluster entirely if its availability is
set to `drain`, and it has been fully unpublished from all nodes.
#### Force-Removing Volumes
There are cases where a Volume can get caught in a state where Swarm cannot
verify their removal. In these cases,
## Unsupported Features
The CSI Spec allows for a large number of features which Cluster Volumes in
this initial implementation do not support. Most notably, Cluster Volumes do
not support snapshots, cloning, or volume expansion.

View file

@ -211,7 +211,7 @@ func (pm *Manager) reload() error { // todo: restore
// We should only enable rootfs propagation for certain plugin types that need it. // We should only enable rootfs propagation for certain plugin types that need it.
for _, typ := range p.PluginObj.Config.Interface.Types { for _, typ := range p.PluginObj.Config.Interface.Types {
if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") { if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver" || typ.Capability == "csinode" || typ.Capability == "csicontroller") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
if p.PluginObj.Config.PropagatedMount != "" { if p.PluginObj.Config.PropagatedMount != "" {
propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount") propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")