Extract volume interaction to a volumes service

This cleans up some of the package API's used for interacting with
volumes, and simplifies management.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2018-03-22 17:11:03 -04:00
parent 9c2c887b12
commit e4b6adc88e
50 changed files with 1533 additions and 639 deletions

View file

@ -3,6 +3,7 @@ package volume // import "github.com/docker/docker/api/server/router/volume"
import (
"context"
"github.com/docker/docker/volume/service/opts"
// TODO return types need to be refactored into pkg
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
@ -11,9 +12,9 @@ import (
// Backend is the methods that need to be implemented to provide
// volume specific functionality
type Backend interface {
Volumes(filter string) ([]*types.Volume, []string, error)
VolumeInspect(name string) (*types.Volume, error)
VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error)
VolumeRm(name string, force bool) error
VolumesPrune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error)
List(ctx context.Context, filter filters.Args) ([]*types.Volume, []string, error)
Get(ctx context.Context, name string, opts ...opts.GetOption) (*types.Volume, error)
Create(ctx context.Context, name, driverName string, opts ...opts.CreateOption) (*types.Volume, error)
Remove(ctx context.Context, name string, opts ...opts.RemoveOption) error
Prune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error)
}

View file

@ -3,7 +3,6 @@ package volume // import "github.com/docker/docker/api/server/router/volume"
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
@ -11,6 +10,8 @@ import (
"github.com/docker/docker/api/types/filters"
volumetypes "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors"
)
func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -18,7 +19,11 @@ func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter
return err
}
volumes, warnings, err := v.backend.Volumes(r.Form.Get("filters"))
filters, err := filters.FromJSON(r.Form.Get("filters"))
if err != nil {
return errdefs.InvalidParameter(errors.Wrap(err, "error reading volume filters"))
}
volumes, warnings, err := v.backend.List(ctx, filters)
if err != nil {
return err
}
@ -30,7 +35,7 @@ func (v *volumeRouter) getVolumeByName(ctx context.Context, w http.ResponseWrite
return err
}
volume, err := v.backend.VolumeInspect(vars["name"])
volume, err := v.backend.Get(ctx, vars["name"], opts.WithGetResolveStatus)
if err != nil {
return err
}
@ -54,7 +59,7 @@ func (v *volumeRouter) postVolumesCreate(ctx context.Context, w http.ResponseWri
return err
}
volume, err := v.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels)
volume, err := v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels))
if err != nil {
return err
}
@ -66,7 +71,7 @@ func (v *volumeRouter) deleteVolumes(ctx context.Context, w http.ResponseWriter,
return err
}
force := httputils.BoolValue(r, "force")
if err := v.backend.VolumeRm(vars["name"], force); err != nil {
if err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force)); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
@ -83,7 +88,7 @@ func (v *volumeRouter) postVolumesPrune(ctx context.Context, w http.ResponseWrit
return err
}
pruneReport, err := v.backend.VolumesPrune(ctx, pruneFilters)
pruneReport, err := v.backend.Prune(ctx, pruneFilters)
if err != nil {
return err
}

View file

@ -453,7 +453,7 @@ func initRouter(opts routerOptions) {
container.NewRouter(opts.daemon, decoder),
image.NewRouter(opts.daemon.ImageService()),
systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildCache),
volume.NewRouter(opts.daemon),
volume.NewRouter(opts.daemon.VolumesService()),
build.NewRouter(opts.buildBackend, opts.daemon),
sessionrouter.NewRouter(opts.sessionManager),
swarmrouter.NewRouter(opts.cluster),
@ -595,6 +595,7 @@ func createAndStartCluster(cli *DaemonCli, d *daemon.Daemon) (*cluster.Cluster,
Root: cli.Config.Root,
Name: name,
Backend: d,
VolumeBackend: d.VolumesService(),
ImageBackend: d.ImageService(),
PluginBackend: d.PluginManager(),
NetworkSubnetsProvider: d,

View file

@ -127,7 +127,7 @@ func (container *Container) CopyImagePathContent(v volume.Volume, destination st
return err
}
if _, err = ioutil.ReadDir(rootfs); err != nil {
if _, err := os.Stat(rootfs); err != nil {
if os.IsNotExist(err) {
return nil
}

View file

@ -85,6 +85,7 @@ type Config struct {
Backend executorpkg.Backend
ImageBackend executorpkg.ImageBackend
PluginBackend plugin.Backend
VolumeBackend executorpkg.VolumeBackend
NetworkSubnetsProvider NetworkSubnetsProvider
// DefaultAdvertiseAddr is the default host/IP or network interface to use

View file

@ -18,6 +18,7 @@ import (
clustertypes "github.com/docker/docker/daemon/cluster/provider"
networkSettings "github.com/docker/docker/daemon/network"
"github.com/docker/docker/plugin"
volumeopts "github.com/docker/docker/volume/service/opts"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/cluster"
networktypes "github.com/docker/libnetwork/types"
@ -47,7 +48,6 @@ type Backend interface {
SetContainerSecretReferences(name string, refs []*swarmtypes.SecretReference) error
SetContainerConfigReferences(name string, refs []*swarmtypes.ConfigReference) error
SystemInfo() (*types.Info, error)
VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error)
Containers(config *types.ContainerListOptions) ([]*types.Container, error)
SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
DaemonJoinsCluster(provider cluster.Provider)
@ -62,6 +62,11 @@ type Backend interface {
GetAttachmentStore() *networkSettings.AttachmentStore
}
// VolumeBackend is used by an executor to perform volume operations
type VolumeBackend interface {
Create(ctx context.Context, name, driverName string, opts ...volumeopts.CreateOption) (*types.Volume, error)
}
// ImageBackend is used by an executor to perform image operations
type ImageBackend interface {
PullImage(ctx context.Context, image, tag, platform string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error

View file

@ -22,6 +22,7 @@ import (
"github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/cluster/convert"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
volumeopts "github.com/docker/docker/volume/service/opts"
"github.com/docker/libnetwork"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
@ -36,23 +37,25 @@ import (
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
backend executorpkg.Backend
imageBackend executorpkg.ImageBackend
container *containerConfig
dependencies exec.DependencyGetter
backend executorpkg.Backend
imageBackend executorpkg.ImageBackend
volumeBackend executorpkg.VolumeBackend
container *containerConfig
dependencies exec.DependencyGetter
}
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
ctnr, err := newContainerConfig(task, node)
if err != nil {
return nil, err
}
return &containerAdapter{
container: ctnr,
backend: b,
imageBackend: i,
dependencies: dependencies,
container: ctnr,
backend: b,
imageBackend: i,
volumeBackend: v,
dependencies: dependencies,
}, nil
}
@ -388,7 +391,10 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
req := c.container.volumeCreateRequest(&mount)
// Check if this volume exists on the engine
if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
if _, err := c.volumeBackend.Create(ctx, req.Name, req.Driver,
volumeopts.WithCreateOptions(req.DriverOpts),
volumeopts.WithCreateLabels(req.Labels),
); err != nil {
// TODO(amitshukla): Today, volume create through the engine api does not return an error
// when the named volume with the same parameters already exists.
// It returns an error if the driver name is different - that is a valid error

View file

@ -21,8 +21,8 @@ type networkAttacherController struct {
closed chan struct{}
}
func newNetworkAttacherController(b executorpkg.Backend, i executorpkg.ImageBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*networkAttacherController, error) {
adapter, err := newContainerAdapter(b, i, task, node, dependencies)
func newNetworkAttacherController(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*networkAttacherController, error) {
adapter, err := newContainerAdapter(b, i, v, task, node, dependencies)
if err != nil {
return nil, err
}

View file

@ -40,8 +40,8 @@ type controller struct {
var _ exec.Controller = &controller{}
// NewController returns a docker exec runner for the provided task.
func newController(b executorpkg.Backend, i executorpkg.ImageBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*controller, error) {
adapter, err := newContainerAdapter(b, i, task, node, dependencies)
func newController(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*controller, error) {
adapter, err := newContainerAdapter(b, i, v, task, node, dependencies)
if err != nil {
return nil, err
}

View file

@ -28,17 +28,19 @@ type executor struct {
backend executorpkg.Backend
imageBackend executorpkg.ImageBackend
pluginBackend plugin.Backend
volumeBackend executorpkg.VolumeBackend
dependencies exec.DependencyManager
mutex sync.Mutex // This mutex protects the following node field
node *api.NodeDescription
}
// NewExecutor returns an executor from the docker client.
func NewExecutor(b executorpkg.Backend, p plugin.Backend, i executorpkg.ImageBackend) exec.Executor {
func NewExecutor(b executorpkg.Backend, p plugin.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend) exec.Executor {
return &executor{
backend: b,
pluginBackend: p,
imageBackend: i,
volumeBackend: v,
dependencies: agent.NewDependencyManager(),
}
}
@ -211,7 +213,7 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
e.mutex.Unlock()
if t.Spec.GetAttachment() != nil {
return newNetworkAttacherController(e.backend, e.imageBackend, t, nodeDescription, dependencyGetter)
return newNetworkAttacherController(e.backend, e.imageBackend, e.volumeBackend, t, nodeDescription, dependencyGetter)
}
var ctlr exec.Controller
@ -240,7 +242,7 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
return ctlr, fmt.Errorf("unsupported runtime type: %q", runtimeKind)
}
case *api.TaskSpec_Container:
c, err := newController(e.backend, e.imageBackend, t, nodeDescription, dependencyGetter)
c, err := newController(e.backend, e.imageBackend, e.volumeBackend, t, nodeDescription, dependencyGetter)
if err != nil {
return ctlr, err
}

View file

@ -52,7 +52,7 @@ func TestHealthStates(t *testing.T) {
EventsService: e,
}
controller, err := newController(daemon, nil, task, nil, nil)
controller, err := newController(daemon, nil, nil, task, nil, nil)
if err != nil {
t.Fatalf("create controller fail %v", err)
}

View file

@ -12,7 +12,7 @@ import (
)
func newTestControllerWithMount(m api.Mount) (*controller, error) {
return newController(&daemon.Daemon{}, nil, &api.Task{
return newController(&daemon.Daemon{}, nil, nil, &api.Task{
ID: stringid.GenerateRandomID(),
ServiceID: stringid.GenerateRandomID(),
Spec: api.TaskSpec{

View file

@ -123,7 +123,9 @@ func (n *nodeRunner) start(conf nodeStartConfig) error {
Executor: container.NewExecutor(
n.cluster.config.Backend,
n.cluster.config.PluginBackend,
n.cluster.config.ImageBackend),
n.cluster.config.ImageBackend,
n.cluster.config.VolumeBackend,
),
HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
// Recommended value in etcd/raft is 10 x (HeartbeatTick).
// Lower values were seen to have caused instability because of

View file

@ -7,8 +7,6 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
networktypes "github.com/docker/docker/api/types/network"
@ -16,10 +14,10 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/runconfig"
"github.com/opencontainers/selinux/go-selinux/label"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -255,24 +253,6 @@ func (daemon *Daemon) generateSecurityOpt(hostConfig *containertypes.HostConfig)
return nil, nil
}
// VolumeCreate creates a volume with the specified name, driver, and opts
// This is called directly from the Engine API
func (daemon *Daemon) VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error) {
if name == "" {
name = stringid.GenerateNonCryptoID()
}
v, err := daemon.volumes.Create(name, driverName, opts, labels)
if err != nil {
return nil, err
}
daemon.LogVolumeEvent(v.Name(), "create", map[string]string{"driver": v.DriverName()})
apiV := volumeToAPIType(v)
apiV.Mountpoint = v.Path()
return apiV, nil
}
func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error {
if img != nil && img.Config != nil {
if err := merge(config, img.Config); err != nil {

View file

@ -3,6 +3,7 @@
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"fmt"
"os"
"path/filepath"
@ -11,6 +12,7 @@ import (
mounttypes "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/container"
"github.com/docker/docker/pkg/stringid"
volumeopts "github.com/docker/docker/volume/service/opts"
"github.com/opencontainers/selinux/go-selinux/label"
"github.com/sirupsen/logrus"
)
@ -46,16 +48,16 @@ func (daemon *Daemon) createContainerOSSpecificSettings(container *container.Con
return fmt.Errorf("cannot mount volume over existing file, file exists %s", path)
}
v, err := daemon.volumes.CreateWithRef(name, hostConfig.VolumeDriver, container.ID, nil, nil)
v, err := daemon.volumes.Create(context.TODO(), name, hostConfig.VolumeDriver, volumeopts.WithCreateReference(container.ID))
if err != nil {
return err
}
if err := label.Relabel(v.Path(), container.MountLabel, true); err != nil {
if err := label.Relabel(v.Mountpoint, container.MountLabel, true); err != nil {
return err
}
container.AddMountPointWithVolume(destination, v, true)
container.AddMountPointWithVolume(destination, &volumeWrapper{v: v, s: daemon.volumes}, true)
}
return daemon.populateVolumes(container)
}

View file

@ -1,6 +1,7 @@
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"fmt"
"runtime"
@ -8,6 +9,7 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/pkg/stringid"
volumemounts "github.com/docker/docker/volume/mounts"
volumeopts "github.com/docker/docker/volume/service/opts"
)
// createContainerOSSpecificSettings performs host-OS specific container create functionality
@ -49,7 +51,7 @@ func (daemon *Daemon) createContainerOSSpecificSettings(container *container.Con
// Create the volume in the volume driver. If it doesn't exist,
// a new one will be created.
v, err := daemon.volumes.CreateWithRef(mp.Name, volumeDriver, container.ID, nil, nil)
v, err := daemon.volumes.Create(context.TODO(), mp.Name, volumeDriver, volumeopts.WithCreateReference(container.ID))
if err != nil {
return err
}
@ -85,7 +87,7 @@ func (daemon *Daemon) createContainerOSSpecificSettings(container *container.Con
// }
// Add it to container.MountPoints
container.AddMountPointWithVolume(mp.Destination, v, mp.RW)
container.AddMountPointWithVolume(mp.Destination, &volumeWrapper{v: v, s: daemon.volumes}, mp.RW)
}
return nil
}

View file

@ -52,9 +52,7 @@ import (
refstore "github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/local"
"github.com/docker/docker/volume/store"
volumesservice "github.com/docker/docker/volume/service"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/cluster"
nwconfig "github.com/docker/libnetwork/config"
@ -83,7 +81,7 @@ type Daemon struct {
RegistryService registry.Service
EventsService *events.Events
netController libnetwork.NetworkController
volumes *store.VolumeStore
volumes *volumesservice.VolumesService
discoveryWatcher discovery.Reloader
root string
seccompEnabled bool
@ -784,8 +782,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
return nil, err
}
// Configure the volumes driver
volStore, err := d.configureVolumes(rootIDs)
d.volumes, err = volumesservice.NewVolumeService(config.Root, d.PluginStore, rootIDs, d)
if err != nil {
return nil, err
}
@ -855,7 +852,6 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
d.statsCollector = d.newStatsCollector(1 * time.Second)
d.EventsService = events.New()
d.volumes = volStore
d.root = config.Root
d.idMappings = idMappings
d.seccompEnabled = sysInfo.Seccomp
@ -1144,18 +1140,6 @@ func setDefaultMtu(conf *config.Config) {
conf.Mtu = config.DefaultNetworkMtu
}
func (daemon *Daemon) configureVolumes(rootIDs idtools.IDPair) (*store.VolumeStore, error) {
volumeDriver, err := local.New(daemon.configStore.Root, rootIDs)
if err != nil {
return nil, err
}
drivers := volumedrivers.NewStore(daemon.PluginStore)
if !drivers.Register(volumeDriver, volumeDriver.Name()) {
return nil, errors.New("local volume driver could not be registered")
}
return store.New(daemon.configStore.Root, drivers)
}
// IsShuttingDown tells whether the daemon is shutting down or not
func (daemon *Daemon) IsShuttingDown() bool {
return daemon.shutdown

View file

@ -13,9 +13,7 @@ import (
_ "github.com/docker/docker/pkg/discovery/memory"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/truncindex"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/local"
"github.com/docker/docker/volume/store"
volumesservice "github.com/docker/docker/volume/service"
"github.com/docker/go-connections/nat"
"github.com/docker/libnetwork"
"github.com/gotestyourself/gotestyourself/assert"
@ -120,18 +118,10 @@ func initDaemonWithVolumeStore(tmp string) (*Daemon, error) {
repository: tmp,
root: tmp,
}
drivers := volumedrivers.NewStore(nil)
daemon.volumes, err = store.New(tmp, drivers)
daemon.volumes, err = volumesservice.NewVolumeService(tmp, nil, idtools.IDPair{UID: 0, GID: 0}, daemon)
if err != nil {
return nil, err
}
volumesDriver, err := local.New(tmp, idtools.IDPair{UID: 0, GID: 0})
if err != nil {
return nil, err
}
drivers.Register(volumesDriver, volumesDriver.Name())
return daemon, nil
}

View file

@ -11,8 +11,6 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/volume"
volumestore "github.com/docker/docker/volume/store"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -152,35 +150,3 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
daemon.LogContainerEvent(container, "destroy")
return nil
}
// VolumeRm removes the volume with the given name.
// If the volume is referenced by a container it is not removed
// This is called directly from the Engine API
func (daemon *Daemon) VolumeRm(name string, force bool) error {
v, err := daemon.volumes.Get(name)
if err != nil {
if force && volumestore.IsNotExist(err) {
return nil
}
return err
}
err = daemon.volumeRm(v)
if err != nil && volumestore.IsInUse(err) {
return errdefs.Conflict(err)
}
if err == nil || force {
daemon.volumes.Purge(name)
return nil
}
return err
}
func (daemon *Daemon) volumeRm(v volume.Volume) error {
if err := daemon.volumes.Remove(v); err != nil {
return errors.Wrap(err, "unable to remove volume")
}
daemon.LogVolumeEvent(v.Name(), "destroy", map[string]string{"driver": v.DriverName()})
return nil
}

View file

@ -7,9 +7,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/directory"
"github.com/docker/docker/volume"
"github.com/sirupsen/logrus"
)
// SystemDiskUsage returns information about the daemon data disk usage
@ -34,39 +31,11 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context) (*types.DiskUsage, er
return nil, fmt.Errorf("failed to retrieve image list: %v", err)
}
volumes, err := daemon.volumes.FilterByDriver(volume.DefaultDriverName)
localVolumes, err := daemon.volumes.LocalVolumesSize(ctx)
if err != nil {
return nil, err
}
var allVolumes []*types.Volume
for _, v := range volumes {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if d, ok := v.(volume.DetailedVolume); ok {
if len(d.Options()) > 0 {
// skip local volumes with mount options since these could have external
// mounted filesystems that will be slow to enumerate.
continue
}
}
name := v.Name()
refs := daemon.volumes.Refs(v)
tv := volumeToAPIType(v)
sz, err := directory.Size(ctx, v.Path())
if err != nil {
logrus.Warnf("failed to determine size of volume %v", name)
sz = -1
}
tv.UsageData = &types.VolumeUsageData{Size: sz, RefCount: int64(len(refs))}
allVolumes = append(allVolumes, tv)
}
allLayersSize, err := daemon.imageService.LayerDiskUsage(ctx)
if err != nil {
return nil, err
@ -75,7 +44,7 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context) (*types.DiskUsage, er
return &types.DiskUsage{
LayersSize: allLayersSize,
Containers: allContainers,
Volumes: allVolumes,
Volumes: localVolumes,
Images: allImages,
}, nil
}

View file

@ -18,10 +18,6 @@ func containerNotFound(id string) error {
return objNotFoundError{"container", id}
}
func volumeNotFound(id string) error {
return objNotFoundError{"volume", id}
}
type objNotFoundError struct {
object string
id string

View file

@ -14,6 +14,7 @@ import (
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -26,7 +27,7 @@ var imagesAcceptedFilters = map[string]bool{
// errPruneRunning is returned when a prune request is received while
// one is in progress
var errPruneRunning = fmt.Errorf("a prune operation is already running")
var errPruneRunning = errdefs.Conflict(errors.New("a prune operation is already running"))
// ImagesPrune removes unused images
func (i *ImageService) ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error) {

View file

@ -13,7 +13,6 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/errdefs"
volumestore "github.com/docker/docker/volume/store"
"github.com/docker/go-connections/nat"
)
@ -236,22 +235,6 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
}, nil
}
// VolumeInspect looks up a volume by name. An error is returned if
// the volume cannot be found.
func (daemon *Daemon) VolumeInspect(name string) (*types.Volume, error) {
v, err := daemon.volumes.Get(name)
if err != nil {
if volumestore.IsNotExist(err) {
return nil, volumeNotFound(name)
}
return nil, errdefs.System(err)
}
apiV := volumeToAPIType(v)
apiV.Mountpoint = v.Path()
apiV.Status = v.Status()
return apiV, nil
}
func (daemon *Daemon) getBackwardsCompatibleNetworkSettings(settings *network.Settings) *v1p20.NetworkSettings {
result := &v1p20.NetworkSettings{
NetworkSettingsBase: types.NetworkSettingsBase{

View file

@ -12,19 +12,11 @@ import (
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/volume"
"github.com/docker/go-connections/nat"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var acceptedVolumeFilterTags = map[string]bool{
"dangling": true,
"name": true,
"driver": true,
"label": true,
}
var acceptedPsFilterTags = map[string]bool{
"ancestor": true,
"before": true,
@ -605,87 +597,6 @@ func (daemon *Daemon) refreshImage(s *container.Snapshot, ctx *listContext) (*ty
return &c, nil
}
// Volumes lists known volumes, using the filter to restrict the range
// of volumes returned.
func (daemon *Daemon) Volumes(filter string) ([]*types.Volume, []string, error) {
var (
volumesOut []*types.Volume
)
volFilters, err := filters.FromJSON(filter)
if err != nil {
return nil, nil, err
}
if err := volFilters.Validate(acceptedVolumeFilterTags); err != nil {
return nil, nil, err
}
volumes, warnings, err := daemon.volumes.List()
if err != nil {
return nil, nil, err
}
filterVolumes, err := daemon.filterVolumes(volumes, volFilters)
if err != nil {
return nil, nil, err
}
for _, v := range filterVolumes {
apiV := volumeToAPIType(v)
if vv, ok := v.(interface {
CachedPath() string
}); ok {
apiV.Mountpoint = vv.CachedPath()
} else {
apiV.Mountpoint = v.Path()
}
volumesOut = append(volumesOut, apiV)
}
return volumesOut, warnings, nil
}
// filterVolumes filters volume list according to user specified filter
// and returns user chosen volumes
func (daemon *Daemon) filterVolumes(vols []volume.Volume, filter filters.Args) ([]volume.Volume, error) {
// if filter is empty, return original volume list
if filter.Len() == 0 {
return vols, nil
}
var retVols []volume.Volume
for _, vol := range vols {
if filter.Contains("name") {
if !filter.Match("name", vol.Name()) {
continue
}
}
if filter.Contains("driver") {
if !filter.ExactMatch("driver", vol.DriverName()) {
continue
}
}
if filter.Contains("label") {
v, ok := vol.(volume.DetailedVolume)
if !ok {
continue
}
if !filter.MatchKVList("label", v.Labels()) {
continue
}
}
retVols = append(retVols, vol)
}
danglingOnly := false
if filter.Contains("dangling") {
if filter.ExactMatch("dangling", "true") || filter.ExactMatch("dangling", "1") {
danglingOnly = true
} else if !filter.ExactMatch("dangling", "false") && !filter.ExactMatch("dangling", "0") {
return nil, invalidFilter{"dangling", filter.Get("dangling")}
}
retVols = daemon.volumes.FilterByUsed(retVols, !danglingOnly)
}
return retVols, nil
}
func populateImageFilterByParents(ancestorMap map[image.ID]bool, imageID image.ID, getChildren func(image.ID) []image.ID) {
if !ancestorMap[imageID] {
for _, id := range getChildren(imageID) {

View file

@ -1,12 +1,13 @@
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"fmt"
"strings"
mounttypes "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/container"
volumestore "github.com/docker/docker/volume/store"
volumesservice "github.com/docker/docker/volume/service"
)
func (daemon *Daemon) prepareMountPoints(container *container.Container) error {
@ -20,11 +21,12 @@ func (daemon *Daemon) prepareMountPoints(container *container.Container) error {
func (daemon *Daemon) removeMountPoints(container *container.Container, rm bool) error {
var rmErrors []string
ctx := context.TODO()
for _, m := range container.MountPoints {
if m.Type != mounttypes.TypeVolume || m.Volume == nil {
continue
}
daemon.volumes.Dereference(m.Volume, container.ID)
daemon.volumes.Release(ctx, m.Volume.Name(), container.ID)
if !rm {
continue
}
@ -35,13 +37,13 @@ func (daemon *Daemon) removeMountPoints(container *container.Container, rm bool)
continue
}
err := daemon.volumes.Remove(m.Volume)
err := daemon.volumes.Remove(ctx, m.Volume.Name())
// Ignore volume in use errors because having this
// volume being referenced by other container is
// not an error, but an implementation detail.
// This prevents docker from logging "ERROR: Volume in use"
// where there is another container using the volume.
if err != nil && !volumestore.IsInUse(err) {
if err != nil && !volumesservice.IsInUse(err) {
rmErrors = append(rmErrors, err.Error())
}
}

View file

@ -10,27 +10,23 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
timetypes "github.com/docker/docker/api/types/time"
"github.com/docker/docker/pkg/directory"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/volume"
"github.com/docker/libnetwork"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
// errPruneRunning is returned when a prune request is received while
// one is in progress
errPruneRunning = fmt.Errorf("a prune operation is already running")
errPruneRunning = errdefs.Conflict(errors.New("a prune operation is already running"))
containersAcceptedFilters = map[string]bool{
"label": true,
"label!": true,
"until": true,
}
volumesAcceptedFilters = map[string]bool{
"label": true,
"label!": true,
}
networksAcceptedFilters = map[string]bool{
"label": true,
@ -92,67 +88,6 @@ func (daemon *Daemon) ContainersPrune(ctx context.Context, pruneFilters filters.
return rep, nil
}
// VolumesPrune removes unused local volumes
func (daemon *Daemon) VolumesPrune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error) {
if !atomic.CompareAndSwapInt32(&daemon.pruneRunning, 0, 1) {
return nil, errPruneRunning
}
defer atomic.StoreInt32(&daemon.pruneRunning, 0)
// make sure that only accepted filters have been received
err := pruneFilters.Validate(volumesAcceptedFilters)
if err != nil {
return nil, err
}
rep := &types.VolumesPruneReport{}
volumes, err := daemon.volumes.FilterByDriver(volume.DefaultDriverName)
if err != nil {
return nil, err
}
for _, v := range volumes {
select {
case <-ctx.Done():
logrus.Debugf("VolumesPrune operation cancelled: %#v", *rep)
err := ctx.Err()
if err == context.Canceled {
return rep, nil
}
return rep, err
default:
}
name := v.Name()
refs := daemon.volumes.Refs(v)
if len(refs) == 0 {
detailedVolume, ok := v.(volume.DetailedVolume)
if ok {
if !matchLabels(pruneFilters, detailedVolume.Labels()) {
continue
}
}
vSize, err := directory.Size(ctx, v.Path())
if err != nil {
logrus.Warnf("could not determine size of volume %s: %v", name, err)
}
err = daemon.volumeRm(v)
if err != nil {
logrus.Warnf("could not remove volume %s: %v", name, err)
continue
}
rep.SpaceReclaimed += uint64(vSize)
rep.VolumesDeleted = append(rep.VolumesDeleted, name)
}
}
return rep, nil
}
// localNetworksPrune removes unused local networks
func (daemon *Daemon) localNetworksPrune(ctx context.Context, pruneFilters filters.Args) *types.NetworksPruneReport {
rep := &types.NetworksPruneReport{}

View file

@ -1,6 +1,7 @@
package daemon // import "github.com/docker/docker/daemon"
import (
"context"
"os"
"path/filepath"
"reflect"
@ -15,6 +16,8 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/docker/volume"
volumemounts "github.com/docker/docker/volume/mounts"
"github.com/docker/docker/volume/service"
volumeopts "github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -27,23 +30,6 @@ var (
type mounts []container.Mount
// volumeToAPIType converts a volume.Volume to the type used by the Engine API
func volumeToAPIType(v volume.Volume) *types.Volume {
createdAt, _ := v.CreatedAt()
tv := &types.Volume{
Name: v.Name(),
Driver: v.DriverName(),
CreatedAt: createdAt.Format(time.RFC3339),
}
if v, ok := v.(volume.DetailedVolume); ok {
tv.Labels = v.Labels()
tv.Options = v.Options()
tv.Scope = v.Scope()
}
return tv
}
// Len returns the number of mounts. Used in sorting.
func (m mounts) Len() int {
return len(m)
@ -78,6 +64,7 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
mountPoints := map[string]*volumemounts.MountPoint{}
parser := volumemounts.NewParser(container.OS)
ctx := context.TODO()
defer func() {
// clean up the container mountpoints once return with error
if retErr != nil {
@ -85,7 +72,7 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
if m.Volume == nil {
continue
}
daemon.volumes.Dereference(m.Volume, container.ID)
daemon.volumes.Release(ctx, m.Volume.Name(), container.ID)
}
}
}()
@ -94,7 +81,7 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
if v, ok := mountPoints[destination]; ok {
logrus.Debugf("Duplicate mount point '%s'", destination)
if v.Volume != nil {
daemon.volumes.Dereference(v.Volume, container.ID)
daemon.volumes.Release(ctx, v.Volume.Name(), container.ID)
}
}
}
@ -130,11 +117,11 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
}
if len(cp.Source) == 0 {
v, err := daemon.volumes.GetWithRef(cp.Name, cp.Driver, container.ID)
v, err := daemon.volumes.Get(ctx, cp.Name, volumeopts.WithGetDriver(cp.Driver), volumeopts.WithGetReference(container.ID))
if err != nil {
return err
}
cp.Volume = v
cp.Volume = &volumeWrapper{v: v, s: daemon.volumes}
}
dereferenceIfExists(cp.Destination)
mountPoints[cp.Destination] = cp
@ -163,14 +150,14 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
if bind.Type == mounttypes.TypeVolume {
// create the volume
v, err := daemon.volumes.CreateWithRef(bind.Name, bind.Driver, container.ID, nil, nil)
v, err := daemon.volumes.Create(ctx, bind.Name, bind.Driver, volumeopts.WithCreateReference(container.ID))
if err != nil {
return err
}
bind.Volume = v
bind.Source = v.Path()
bind.Volume = &volumeWrapper{v: v, s: daemon.volumes}
bind.Source = v.Mountpoint
// bind.Name is an already existing volume, we need to use that here
bind.Driver = v.DriverName()
bind.Driver = v.Driver
if bind.Driver == volume.DefaultDriverName {
setBindModeIfNull(bind)
}
@ -199,30 +186,30 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
}
if mp.Type == mounttypes.TypeVolume {
var v volume.Volume
var v *types.Volume
if cfg.VolumeOptions != nil {
var driverOpts map[string]string
if cfg.VolumeOptions.DriverConfig != nil {
driverOpts = cfg.VolumeOptions.DriverConfig.Options
}
v, err = daemon.volumes.CreateWithRef(mp.Name, mp.Driver, container.ID, driverOpts, cfg.VolumeOptions.Labels)
v, err = daemon.volumes.Create(ctx,
mp.Name,
mp.Driver,
volumeopts.WithCreateReference(container.ID),
volumeopts.WithCreateOptions(driverOpts),
volumeopts.WithCreateLabels(cfg.VolumeOptions.Labels),
)
} else {
v, err = daemon.volumes.CreateWithRef(mp.Name, mp.Driver, container.ID, nil, nil)
v, err = daemon.volumes.Create(ctx, mp.Name, mp.Driver, volumeopts.WithCreateReference(container.ID))
}
if err != nil {
return err
}
mp.Volume = v
mp.Name = v.Name()
mp.Driver = v.DriverName()
mp.Volume = &volumeWrapper{v: v, s: daemon.volumes}
mp.Name = v.Name
mp.Driver = v.Driver
// only use the cached path here since getting the path is not necessary right now and calling `Path()` may be slow
if cv, ok := v.(interface {
CachedPath() string
}); ok {
mp.Source = cv.CachedPath()
}
if mp.Driver == volume.DefaultDriverName {
setBindModeIfNull(mp)
}
@ -239,7 +226,7 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
for _, m := range mountPoints {
if parser.IsBackwardCompatible(m) {
if mp, exists := container.MountPoints[m.Destination]; exists && mp.Volume != nil {
daemon.volumes.Dereference(mp.Volume, container.ID)
daemon.volumes.Release(ctx, mp.Volume.Name(), container.ID)
}
}
}
@ -254,11 +241,11 @@ func (daemon *Daemon) registerMountPoints(container *container.Container, hostCo
// This happens after a daemon restart.
func (daemon *Daemon) lazyInitializeVolume(containerID string, m *volumemounts.MountPoint) error {
if len(m.Driver) > 0 && m.Volume == nil {
v, err := daemon.volumes.GetWithRef(m.Name, m.Driver, containerID)
v, err := daemon.volumes.Get(context.TODO(), m.Name, volumeopts.WithGetDriver(m.Driver), volumeopts.WithGetReference(containerID))
if err != nil {
return err
}
m.Volume = v
m.Volume = &volumeWrapper{v: v, s: daemon.volumes}
}
return nil
}
@ -385,3 +372,46 @@ func (daemon *Daemon) backportMountSpec(container *container.Container) {
cm.Spec.ReadOnly = !cm.RW
}
}
// VolumesService is used to perform volume operations
func (daemon *Daemon) VolumesService() *service.VolumesService {
return daemon.volumes
}
type volumeMounter interface {
Mount(ctx context.Context, v *types.Volume, ref string) (string, error)
Unmount(ctx context.Context, v *types.Volume, ref string) error
}
type volumeWrapper struct {
v *types.Volume
s volumeMounter
}
func (v *volumeWrapper) Name() string {
return v.v.Name
}
func (v *volumeWrapper) DriverName() string {
return v.v.Driver
}
func (v *volumeWrapper) Path() string {
return v.v.Mountpoint
}
func (v *volumeWrapper) Mount(ref string) (string, error) {
return v.s.Mount(context.TODO(), v.v, ref)
}
func (v *volumeWrapper) Unmount(ref string) error {
return v.s.Unmount(context.TODO(), v.v, ref)
}
func (v *volumeWrapper) CreatedAt() (time.Time, error) {
return time.Time{}, errors.New("not implemented")
}
func (v *volumeWrapper) Status() map[string]interface{} {
return v.v.Status
}

View file

@ -185,11 +185,12 @@ func (s *DockerSuite) TestVolumeEvents(c *check.C) {
c.Assert(len(events), checker.GreaterThan, 4)
volumeEvents := eventActionsByIDAndType(c, events, "test-event-volume-local", "volume")
c.Assert(volumeEvents, checker.HasLen, 4)
c.Assert(volumeEvents, checker.HasLen, 5)
c.Assert(volumeEvents[0], checker.Equals, "create")
c.Assert(volumeEvents[1], checker.Equals, "mount")
c.Assert(volumeEvents[2], checker.Equals, "unmount")
c.Assert(volumeEvents[3], checker.Equals, "destroy")
c.Assert(volumeEvents[1], checker.Equals, "create")
c.Assert(volumeEvents[2], checker.Equals, "mount")
c.Assert(volumeEvents[3], checker.Equals, "unmount")
c.Assert(volumeEvents[4], checker.Equals, "destroy")
}
func (s *DockerSuite) TestNetworkEvents(c *check.C) {

View file

@ -517,22 +517,20 @@ func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverGetEmptyResponse(c *
}
// Ensure only cached paths are used in volume list to prevent N+1 calls to `VolumeDriver.Path`
//
// TODO(@cpuguy83): This test is testing internal implementation. In all the cases here, there may not even be a path
// available because the volume is not even mounted. Consider removing this test.
func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverPathCalls(c *check.C) {
s.d.Start(c)
c.Assert(s.ec.paths, checker.Equals, 0)
out, err := s.d.Cmd("volume", "create", "test", "--driver=test-external-volume-driver")
c.Assert(err, checker.IsNil, check.Commentf(out))
c.Assert(s.ec.paths, checker.Equals, 1)
c.Assert(s.ec.paths, checker.Equals, 0)
out, err = s.d.Cmd("volume", "ls")
c.Assert(err, checker.IsNil, check.Commentf(out))
c.Assert(s.ec.paths, checker.Equals, 1)
out, err = s.d.Cmd("volume", "inspect", "--format='{{.Mountpoint}}'", "test")
c.Assert(err, checker.IsNil, check.Commentf(out))
c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
c.Assert(s.ec.paths, checker.Equals, 1)
c.Assert(s.ec.paths, checker.Equals, 0)
}
func (s *DockerExternalVolumeSuite) TestExternalVolumeDriverMountID(c *check.C) {

View file

@ -1,4 +1,4 @@
// +build linux freebsd
// +build linux freebsd darwin
package directory // import "github.com/docker/docker/pkg/directory"

View file

@ -94,7 +94,7 @@ func (a *volumeDriverAdapter) getCapabilities() volume.Capability {
if err != nil {
// `GetCapabilities` is a not a required endpoint.
// On error assume it's a local-only driver
logrus.Warnf("Volume driver %s returned an error while trying to query its capabilities, using default capabilities: %v", a.name, err)
logrus.WithError(err).WithField("driver", a.name).Debug("Volume driver returned an error while trying to query its capabilities, using default capabilities")
return volume.Capability{Scope: volume.LocalScope}
}
@ -105,7 +105,7 @@ func (a *volumeDriverAdapter) getCapabilities() volume.Capability {
cap.Scope = strings.ToLower(cap.Scope)
if cap.Scope != volume.LocalScope && cap.Scope != volume.GlobalScope {
logrus.Warnf("Volume driver %q returned an invalid scope: %q", a.Name(), cap.Scope)
logrus.WithField("driver", a.Name()).WithField("scope", a.Scope).Warn("Volume driver returned an invalid scope")
cap.Scope = volume.LocalScope
}

View file

@ -167,10 +167,10 @@ func (s *Store) ReleaseDriver(name string) (volume.Driver, error) {
func (s *Store) GetDriverList() []string {
var driverList []string
s.mu.Lock()
defer s.mu.Unlock()
for driverName := range s.extensions {
driverList = append(driverList, driverName)
}
s.mu.Unlock()
sort.Strings(driverList)
return driverList
}

89
volume/service/by.go Normal file
View file

@ -0,0 +1,89 @@
package service // import "github.com/docker/docker/volume/service"
import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/volume"
)
// By is an interface which is used to implement filtering on volumes.
type By interface {
isBy()
}
// ByDriver is `By` that filters based on the driver names that are passed in
func ByDriver(drivers ...string) By {
return byDriver(drivers)
}
type byDriver []string
func (byDriver) isBy() {}
// ByReferenced is a `By` that filters based on if the volume has references
type ByReferenced bool
func (ByReferenced) isBy() {}
// And creates a `By` combining all the passed in bys using AND logic.
func And(bys ...By) By {
and := make(andCombinator, 0, len(bys))
for _, by := range bys {
and = append(and, by)
}
return and
}
type andCombinator []By
func (andCombinator) isBy() {}
// Or creates a `By` combining all the passed in bys using OR logic.
func Or(bys ...By) By {
or := make(orCombinator, 0, len(bys))
for _, by := range bys {
or = append(or, by)
}
return or
}
type orCombinator []By
func (orCombinator) isBy() {}
// CustomFilter is a `By` that is used by callers to provide custom filtering
// logic.
type CustomFilter filterFunc
func (CustomFilter) isBy() {}
// FromList returns a By which sets the initial list of volumes to use
func FromList(ls *[]volume.Volume, by By) By {
return &fromList{by: by, ls: ls}
}
type fromList struct {
by By
ls *[]volume.Volume
}
func (fromList) isBy() {}
func byLabelFilter(filter filters.Args) By {
return CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
if !ok {
return false
}
labels := dv.Labels()
if !filter.MatchKVList("label", labels) {
return false
}
if filter.Contains("label!") {
if filter.MatchKVList("label!", labels) {
return false
}
}
return true
})
}

132
volume/service/convert.go Normal file
View file

@ -0,0 +1,132 @@
package service
import (
"context"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/directory"
"github.com/docker/docker/volume"
"github.com/sirupsen/logrus"
)
// convertOpts are used to pass options to `volumeToAPI`
type convertOpt interface {
isConvertOpt()
}
type useCachedPath bool
func (useCachedPath) isConvertOpt() {}
type calcSize bool
func (calcSize) isConvertOpt() {}
type pathCacher interface {
CachedPath() string
}
func (s *VolumesService) volumesToAPI(ctx context.Context, volumes []volume.Volume, opts ...convertOpt) []*types.Volume {
var (
out = make([]*types.Volume, 0, len(volumes))
getSize bool
cachedPath bool
)
for _, o := range opts {
switch t := o.(type) {
case calcSize:
getSize = bool(t)
case useCachedPath:
cachedPath = bool(t)
}
}
for _, v := range volumes {
select {
case <-ctx.Done():
return nil
default:
}
apiV := volumeToAPIType(v)
if cachedPath {
if vv, ok := v.(pathCacher); ok {
apiV.Mountpoint = vv.CachedPath()
}
} else {
apiV.Mountpoint = v.Path()
}
if getSize {
p := v.Path()
if apiV.Mountpoint == "" {
apiV.Mountpoint = p
}
sz, err := directory.Size(ctx, p)
if err != nil {
logrus.WithError(err).WithField("volume", v.Name()).Warnf("Failed to determine size of volume")
sz = -1
}
apiV.UsageData = &types.VolumeUsageData{Size: sz, RefCount: int64(s.vs.CountReferences(v))}
}
out = append(out, &apiV)
}
return out
}
func volumeToAPIType(v volume.Volume) types.Volume {
createdAt, _ := v.CreatedAt()
tv := types.Volume{
Name: v.Name(),
Driver: v.DriverName(),
CreatedAt: createdAt.Format(time.RFC3339),
}
if v, ok := v.(volume.DetailedVolume); ok {
tv.Labels = v.Labels()
tv.Options = v.Options()
tv.Scope = v.Scope()
}
if cp, ok := v.(pathCacher); ok {
tv.Mountpoint = cp.CachedPath()
}
return tv
}
func filtersToBy(filter filters.Args, acceptedFilters map[string]bool) (By, error) {
if err := filter.Validate(acceptedFilters); err != nil {
return nil, err
}
var bys []By
if drivers := filter.Get("driver"); len(drivers) > 0 {
bys = append(bys, ByDriver(drivers...))
}
if filter.Contains("name") {
bys = append(bys, CustomFilter(func(v volume.Volume) bool {
return filter.Match("name", v.Name())
}))
}
bys = append(bys, byLabelFilter(filter))
if filter.Contains("dangling") {
var dangling bool
if filter.ExactMatch("dangling", "true") || filter.ExactMatch("dangling", "1") {
dangling = true
} else if !filter.ExactMatch("dangling", "false") && !filter.ExactMatch("dangling", "0") {
return nil, invalidFilter{"dangling", filter.Get("dangling")}
}
bys = append(bys, ByReferenced(!dangling))
}
var by By
switch len(bys) {
case 0:
case 1:
by = bys[0]
default:
by = And(bys...)
}
return by, nil
}

View file

@ -1,4 +1,4 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import (
"encoding/json"

View file

@ -1,4 +1,4 @@
package store
package service // import "github.com/docker/docker/volume/service"
import (
"io/ioutil"

View file

@ -0,0 +1,21 @@
// +build linux windows
package service // import "github.com/docker/docker/volume/service"
import (
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/volume"
"github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/local"
"github.com/pkg/errors"
)
func setupDefaultDriver(store *drivers.Store, root string, rootIDs idtools.IDPair) error {
d, err := local.New(root, rootIDs)
if err != nil {
return errors.Wrap(err, "error setting up default driver")
}
if !store.Register(d, volume.DefaultDriverName) {
return errors.New("local volume driver could not be registered")
}
return nil
}

View file

@ -0,0 +1,10 @@
// +build !linux,!windows
package service // import "github.com/docker/docker/volume/service"
import (
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/volume/drivers"
)
func setupDefaultDriver(_ *drivers.Store, _ string, _ idtools.IDPair) error { return nil }

View file

@ -1,6 +1,7 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import (
"fmt"
"strings"
)
@ -93,3 +94,18 @@ func isErr(err error, expected error) bool {
}
return err == expected
}
type invalidFilter struct {
filter string
value interface{}
}
func (e invalidFilter) Error() string {
msg := "Invalid filter '" + e.filter
if e.value != nil {
msg += fmt.Sprintf("=%s", e.value)
}
return msg + "'"
}
func (e invalidFilter) InvalidParameter() {}

View file

@ -0,0 +1,89 @@
package opts
// CreateOption is used to pass options in when creating a volume
type CreateOption func(*CreateConfig)
// CreateConfig is the set of config options that can be set when creating
// a volume
type CreateConfig struct {
Options map[string]string
Labels map[string]string
Reference string
}
// WithCreateLabels creates a CreateOption which sets the labels to the
// passed in value
func WithCreateLabels(labels map[string]string) CreateOption {
return func(cfg *CreateConfig) {
cfg.Labels = labels
}
}
// WithCreateOptions creates a CreateOption which sets the options passed
// to the volume driver when creating a volume to the options passed in.
func WithCreateOptions(opts map[string]string) CreateOption {
return func(cfg *CreateConfig) {
cfg.Options = opts
}
}
// WithCreateReference creats a CreateOption which sets a reference to use
// when creating a volume. This ensures that the volume is created with a reference
// already attached to it to prevent race conditions with Create and volume cleanup.
func WithCreateReference(ref string) CreateOption {
return func(cfg *CreateConfig) {
cfg.Reference = ref
}
}
// GetConfig is used with `GetOption` to set options for the volumes service's
// `Get` implementation.
type GetConfig struct {
Driver string
Reference string
ResolveStatus bool
}
// GetOption is passed to the service `Get` add extra details on the get request
type GetOption func(*GetConfig)
// WithGetDriver provides the driver to get the volume from
// If no driver is provided to `Get`, first the available metadata is checked
// to see which driver it belongs to, if that is not available all drivers are
// probed to find the volume.
func WithGetDriver(name string) GetOption {
return func(o *GetConfig) {
o.Driver = name
}
}
// WithGetReference indicates to `Get` to increment the reference count for the
// retreived volume with the provided reference ID.
func WithGetReference(ref string) GetOption {
return func(o *GetConfig) {
o.Reference = ref
}
}
// WithGetResolveStatus indicates to `Get` to also fetch the volume status.
// This can cause significant overhead in the volume lookup.
func WithGetResolveStatus(cfg *GetConfig) {
cfg.ResolveStatus = true
}
// RemoveConfig is used by `RemoveOption` to store config options for remove
type RemoveConfig struct {
PurgeOnError bool
}
// RemoveOption is used to pass options to the volumes service `Remove` implementation
type RemoveOption func(*RemoveConfig)
// WithPurgeOnError is an option passed to `Remove` which will purge all cached
// data about a volume even if there was an error while attempting to remove the
// volume.
func WithPurgeOnError(b bool) RemoveOption {
return func(o *RemoveConfig) {
o.PurgeOnError = b
}
}

View file

@ -1,6 +1,7 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"sync"
"github.com/boltdb/bolt"
@ -20,6 +21,7 @@ func (s *VolumeStore) restore() {
ls = listMeta(tx)
return nil
})
ctx := context.Background()
chRemove := make(chan *volumeMetadata, len(ls))
var wg sync.WaitGroup
@ -32,7 +34,7 @@ func (s *VolumeStore) restore() {
var v volume.Volume
var err error
if meta.Driver != "" {
v, err = lookupVolume(s.drivers, meta.Driver, meta.Name)
v, err = lookupVolume(ctx, s.drivers, meta.Driver, meta.Name)
if err != nil && err != errNoSuchVolume {
logrus.WithError(err).WithField("driver", meta.Driver).WithField("volume", meta.Name).Warn("Error restoring volume")
return
@ -43,7 +45,7 @@ func (s *VolumeStore) restore() {
return
}
} else {
v, err = s.getVolume(meta.Name)
v, err = s.getVolume(ctx, meta.Name, meta.Driver)
if err != nil {
if err == errNoSuchVolume {
chRemove <- &meta
@ -65,6 +67,7 @@ func (s *VolumeStore) restore() {
s.options[v.Name()] = meta.Options
s.labels[v.Name()] = meta.Labels
s.names[v.Name()] = v
s.refs[v.Name()] = make(map[string]struct{})
s.globalLock.Unlock()
}(meta)
}

View file

@ -1,12 +1,14 @@
package store
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"io/ioutil"
"os"
"testing"
"github.com/docker/docker/volume"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/service/opts"
volumetestutils "github.com/docker/docker/volume/testutils"
"github.com/gotestyourself/gotestyourself/assert"
)
@ -22,24 +24,25 @@ func TestRestore(t *testing.T) {
driverName := "test-restore"
drivers.Register(volumetestutils.NewFakeDriver(driverName), driverName)
s, err := New(dir, drivers)
s, err := NewStore(dir, drivers)
assert.NilError(t, err)
defer s.Shutdown()
_, err = s.Create("test1", driverName, nil, nil)
ctx := context.Background()
_, err = s.Create(ctx, "test1", driverName)
assert.NilError(t, err)
testLabels := map[string]string{"a": "1"}
testOpts := map[string]string{"foo": "bar"}
_, err = s.Create("test2", driverName, testOpts, testLabels)
_, err = s.Create(ctx, "test2", driverName, opts.WithCreateOptions(testOpts), opts.WithCreateLabels(testLabels))
assert.NilError(t, err)
s.Shutdown()
s, err = New(dir, drivers)
s, err = NewStore(dir, drivers)
assert.NilError(t, err)
v, err := s.Get("test1")
v, err := s.Get(ctx, "test1")
assert.NilError(t, err)
dv := v.(volume.DetailedVolume)
@ -47,7 +50,7 @@ func TestRestore(t *testing.T) {
assert.DeepEqual(t, nilMap, dv.Options())
assert.DeepEqual(t, nilMap, dv.Labels())
v, err = s.Get("test2")
v, err = s.Get(ctx, "test2")
assert.NilError(t, err)
dv = v.(volume.DetailedVolume)
assert.DeepEqual(t, testOpts, dv.Options())

243
volume/service/service.go Normal file
View file

@ -0,0 +1,243 @@
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"sync/atomic"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/directory"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/volume"
"github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type ds interface {
GetDriverList() []string
}
type volumeEventLogger interface {
LogVolumeEvent(volumeID, action string, attributes map[string]string)
}
// VolumesService manages access to volumes
type VolumesService struct {
vs *VolumeStore
ds ds
pruneRunning int32
eventLogger volumeEventLogger
}
// NewVolumeService creates a new volume service
func NewVolumeService(root string, pg plugingetter.PluginGetter, rootIDs idtools.IDPair, logger volumeEventLogger) (*VolumesService, error) {
ds := drivers.NewStore(pg)
if err := setupDefaultDriver(ds, root, rootIDs); err != nil {
return nil, err
}
vs, err := NewStore(root, ds)
if err != nil {
return nil, err
}
return &VolumesService{vs: vs, ds: ds, eventLogger: logger}, nil
}
// GetDriverList gets the list of registered volume drivers
func (s *VolumesService) GetDriverList() []string {
return s.ds.GetDriverList()
}
// Create creates a volume
func (s *VolumesService) Create(ctx context.Context, name, driverName string, opts ...opts.CreateOption) (*types.Volume, error) {
if name == "" {
name = stringid.GenerateNonCryptoID()
}
v, err := s.vs.Create(ctx, name, driverName, opts...)
if err != nil {
return nil, err
}
s.eventLogger.LogVolumeEvent(v.Name(), "create", map[string]string{"driver": v.DriverName()})
apiV := volumeToAPIType(v)
return &apiV, nil
}
// Get gets a volume
func (s *VolumesService) Get(ctx context.Context, name string, getOpts ...opts.GetOption) (*types.Volume, error) {
v, err := s.vs.Get(ctx, name, getOpts...)
if err != nil {
return nil, err
}
vol := volumeToAPIType(v)
var cfg opts.GetConfig
for _, o := range getOpts {
o(&cfg)
}
if cfg.ResolveStatus {
vol.Status = v.Status()
}
return &vol, nil
}
// Mount mounts the volume
func (s *VolumesService) Mount(ctx context.Context, vol *types.Volume, ref string) (string, error) {
v, err := s.vs.Get(ctx, vol.Name, opts.WithGetDriver(vol.Driver))
if err != nil {
if IsNotExist(err) {
err = errdefs.NotFound(err)
}
return "", err
}
return v.Mount(ref)
}
// Unmount unmounts the volume.
// Note that depending on the implementation, the volume may still be mounted due to other resources using it.
func (s *VolumesService) Unmount(ctx context.Context, vol *types.Volume, ref string) error {
v, err := s.vs.Get(ctx, vol.Name, opts.WithGetDriver(vol.Driver))
if err != nil {
if IsNotExist(err) {
err = errdefs.NotFound(err)
}
return err
}
return v.Unmount(ref)
}
// Release releases a volume reference
func (s *VolumesService) Release(ctx context.Context, name string, ref string) error {
return s.vs.Release(ctx, name, ref)
}
// Remove removes a volume
func (s *VolumesService) Remove(ctx context.Context, name string, rmOpts ...opts.RemoveOption) error {
var cfg opts.RemoveConfig
for _, o := range rmOpts {
o(&cfg)
}
v, err := s.vs.Get(ctx, name)
if err != nil {
if IsNotExist(err) && cfg.PurgeOnError {
return nil
}
return err
}
err = s.vs.Remove(ctx, v, rmOpts...)
if IsNotExist(err) {
err = nil
} else if IsInUse(err) {
err = errdefs.Conflict(err)
} else if IsNotExist(err) && cfg.PurgeOnError {
err = nil
}
if err == nil {
s.eventLogger.LogVolumeEvent(v.Name(), "destroy", map[string]string{"driver": v.DriverName()})
}
return err
}
var acceptedPruneFilters = map[string]bool{
"label": true,
"label!": true,
}
var acceptedListFilters = map[string]bool{
"dangling": true,
"name": true,
"driver": true,
"label": true,
}
// LocalVolumesSize gets all local volumes and fetches their size on disk
// Note that this intentionally skips volumes which have mount options. Typically
// volumes with mount options are not really local even if they are using the
// local driver.
func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*types.Volume, error) {
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
}
// Prune removes (local) volumes which match the past in filter arguments.
// Note that this intentionally skips volumes with mount options as there would
// be no space reclaimed in this case.
func (s *VolumesService) Prune(ctx context.Context, filter filters.Args) (*types.VolumesPruneReport, error) {
if !atomic.CompareAndSwapInt32(&s.pruneRunning, 0, 1) {
return nil, errdefs.Conflict(errors.New("a prune operation is already running"))
}
defer atomic.StoreInt32(&s.pruneRunning, 0)
by, err := filtersToBy(filter, acceptedPruneFilters)
if err != nil {
return nil, err
}
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), ByReferenced(false), by, CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
rep := &types.VolumesPruneReport{VolumesDeleted: make([]string, 0, len(ls))}
for _, v := range ls {
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
err = nil
}
return rep, err
default:
}
vSize, err := directory.Size(ctx, v.Path())
if err != nil {
logrus.WithField("volume", v.Name()).WithError(err).Warn("could not determine size of volume")
}
if err := s.vs.Remove(ctx, v); err != nil {
logrus.WithError(err).WithField("volume", v.Name()).Warnf("Could not determine size of volume")
continue
}
rep.SpaceReclaimed += uint64(vSize)
rep.VolumesDeleted = append(rep.VolumesDeleted, v.Name())
}
return rep, nil
}
// List gets the list of volumes which match the past in filters
// If filters is nil or empty all volumes are returned.
func (s *VolumesService) List(ctx context.Context, filter filters.Args) (volumesOut []*types.Volume, warnings []string, err error) {
by, err := filtersToBy(filter, acceptedListFilters)
if err != nil {
return nil, nil, err
}
volumes, warnings, err := s.vs.Find(ctx, by)
if err != nil {
return nil, nil, err
}
return s.volumesToAPI(ctx, volumes, useCachedPath(true)), warnings, nil
}
// Shutdown shuts down the image service and dependencies
func (s *VolumesService) Shutdown() error {
return s.vs.Shutdown()
}

View file

@ -0,0 +1,67 @@
package service
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/volume"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/local"
"github.com/docker/docker/volume/service/opts"
"github.com/docker/docker/volume/testutils"
"github.com/gotestyourself/gotestyourself/assert"
is "github.com/gotestyourself/gotestyourself/assert/cmp"
)
func TestLocalVolumeSize(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
dir, err := ioutil.TempDir("", t.Name())
assert.Assert(t, err)
defer os.RemoveAll(dir)
l, err := local.New(dir, idtools.IDPair{UID: os.Getuid(), GID: os.Getegid()})
assert.Assert(t, err)
assert.Assert(t, ds.Register(l, volume.DefaultDriverName))
assert.Assert(t, ds.Register(testutils.NewFakeDriver("fake"), "fake"))
service, cleanup := newTestService(t, ds)
defer cleanup()
ctx := context.Background()
v1, err := service.Create(ctx, "test1", volume.DefaultDriverName, opts.WithCreateReference("foo"))
assert.Assert(t, err)
v2, err := service.Create(ctx, "test2", volume.DefaultDriverName)
assert.Assert(t, err)
_, err = service.Create(ctx, "test3", "fake")
assert.Assert(t, err)
data := make([]byte, 1024)
err = ioutil.WriteFile(filepath.Join(v1.Mountpoint, "data"), data, 0644)
assert.Assert(t, err)
err = ioutil.WriteFile(filepath.Join(v2.Mountpoint, "data"), data[:1], 0644)
assert.Assert(t, err)
ls, err := service.LocalVolumesSize(ctx)
assert.Assert(t, err)
assert.Assert(t, is.Len(ls, 2))
for _, v := range ls {
switch v.Name {
case "test1":
assert.Assert(t, is.Equal(v.UsageData.Size, int64(len(data))))
assert.Assert(t, is.Equal(v.UsageData.RefCount, int64(1)))
case "test2":
assert.Assert(t, is.Equal(v.UsageData.Size, int64(len(data[:1]))))
assert.Assert(t, is.Equal(v.UsageData.RefCount, int64(0)))
default:
t.Fatalf("got unexpected volume: %+v", v)
}
}
assert.Assert(t, is.Equal(ls[1].UsageData.Size, int64(len(data[:1]))))
}

View file

@ -0,0 +1,253 @@
package service
import (
"context"
"io/ioutil"
"os"
"testing"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/volume"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/service/opts"
"github.com/docker/docker/volume/testutils"
"github.com/gotestyourself/gotestyourself/assert"
is "github.com/gotestyourself/gotestyourself/assert/cmp"
)
func TestServiceCreate(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d1"), "d1"))
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d2"), "d2"))
ctx := context.Background()
service, cleanup := newTestService(t, ds)
defer cleanup()
_, err := service.Create(ctx, "v1", "notexist")
assert.Assert(t, errdefs.IsNotFound(err), err)
v, err := service.Create(ctx, "v1", "d1")
assert.Assert(t, err)
vCopy, err := service.Create(ctx, "v1", "d1")
assert.Assert(t, err)
assert.Assert(t, is.DeepEqual(v, vCopy))
_, err = service.Create(ctx, "v1", "d2")
assert.Check(t, IsNameConflict(err), err)
assert.Check(t, errdefs.IsConflict(err), err)
assert.Assert(t, service.Remove(ctx, "v1"))
_, err = service.Create(ctx, "v1", "d2")
assert.Assert(t, err)
_, err = service.Create(ctx, "v1", "d2")
assert.Assert(t, err)
}
func TestServiceList(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d1"), "d1"))
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d2"), "d2"))
service, cleanup := newTestService(t, ds)
defer cleanup()
ctx := context.Background()
_, err := service.Create(ctx, "v1", "d1")
assert.Assert(t, err)
_, err = service.Create(ctx, "v2", "d1")
assert.Assert(t, err)
_, err = service.Create(ctx, "v3", "d2")
assert.Assert(t, err)
ls, _, err := service.List(ctx, filters.NewArgs(filters.Arg("driver", "d1")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 2))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("driver", "d2")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 1))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("driver", "notexist")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 0))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "true")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 3))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "false")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 0))
_, err = service.Get(ctx, "v1", opts.WithGetReference("foo"))
assert.Assert(t, err)
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "true")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 2))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "false")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 1))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "false"), filters.Arg("driver", "d2")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 0))
ls, _, err = service.List(ctx, filters.NewArgs(filters.Arg("dangling", "true"), filters.Arg("driver", "d2")))
assert.Assert(t, err)
assert.Check(t, is.Len(ls, 1))
}
func TestServiceRemove(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d1"), "d1"))
service, cleanup := newTestService(t, ds)
defer cleanup()
ctx := context.Background()
_, err := service.Create(ctx, "test", "d1")
assert.Assert(t, err)
assert.Assert(t, service.Remove(ctx, "test"))
assert.Assert(t, service.Remove(ctx, "test", opts.WithPurgeOnError(true)))
}
func TestServiceGet(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d1"), "d1"))
service, cleanup := newTestService(t, ds)
defer cleanup()
ctx := context.Background()
v, err := service.Get(ctx, "notexist")
assert.Assert(t, IsNotExist(err))
assert.Check(t, v == nil)
created, err := service.Create(ctx, "test", "d1")
assert.Assert(t, err)
assert.Assert(t, created != nil)
v, err = service.Get(ctx, "test")
assert.Assert(t, err)
assert.Assert(t, is.DeepEqual(created, v))
v, err = service.Get(ctx, "test", opts.WithGetResolveStatus)
assert.Assert(t, err)
assert.Assert(t, is.Len(v.Status, 1), v.Status)
v, err = service.Get(ctx, "test", opts.WithGetDriver("notarealdriver"))
assert.Assert(t, errdefs.IsConflict(err), err)
v, err = service.Get(ctx, "test", opts.WithGetDriver("d1"))
assert.Assert(t, err == nil)
assert.Assert(t, is.DeepEqual(created, v))
assert.Assert(t, ds.Register(testutils.NewFakeDriver("d2"), "d2"))
v, err = service.Get(ctx, "test", opts.WithGetDriver("d2"))
assert.Assert(t, errdefs.IsConflict(err), err)
}
func TestServicePrune(t *testing.T) {
t.Parallel()
ds := volumedrivers.NewStore(nil)
assert.Assert(t, ds.Register(testutils.NewFakeDriver(volume.DefaultDriverName), volume.DefaultDriverName))
assert.Assert(t, ds.Register(testutils.NewFakeDriver("other"), "other"))
service, cleanup := newTestService(t, ds)
defer cleanup()
ctx := context.Background()
_, err := service.Create(ctx, "test", volume.DefaultDriverName)
assert.Assert(t, err)
_, err = service.Create(ctx, "test2", "other")
assert.Assert(t, err)
pr, err := service.Prune(ctx, filters.NewArgs(filters.Arg("label", "banana")))
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 0))
pr, err = service.Prune(ctx, filters.NewArgs())
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 1))
assert.Assert(t, is.Equal(pr.VolumesDeleted[0], "test"))
_, err = service.Get(ctx, "test")
assert.Assert(t, IsNotExist(err), err)
v, err := service.Get(ctx, "test2")
assert.Assert(t, err)
assert.Assert(t, is.Equal(v.Driver, "other"))
_, err = service.Create(ctx, "test", volume.DefaultDriverName)
assert.Assert(t, err)
pr, err = service.Prune(ctx, filters.NewArgs(filters.Arg("label!", "banana")))
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 1))
assert.Assert(t, is.Equal(pr.VolumesDeleted[0], "test"))
v, err = service.Get(ctx, "test2")
assert.Assert(t, err)
assert.Assert(t, is.Equal(v.Driver, "other"))
_, err = service.Create(ctx, "test", volume.DefaultDriverName, opts.WithCreateLabels(map[string]string{"banana": ""}))
assert.Assert(t, err)
pr, err = service.Prune(ctx, filters.NewArgs(filters.Arg("label!", "banana")))
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 0))
_, err = service.Create(ctx, "test3", volume.DefaultDriverName, opts.WithCreateLabels(map[string]string{"banana": "split"}))
assert.Assert(t, err)
pr, err = service.Prune(ctx, filters.NewArgs(filters.Arg("label!", "banana=split")))
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 1))
assert.Assert(t, is.Equal(pr.VolumesDeleted[0], "test"))
pr, err = service.Prune(ctx, filters.NewArgs(filters.Arg("label", "banana=split")))
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 1))
assert.Assert(t, is.Equal(pr.VolumesDeleted[0], "test3"))
v, err = service.Create(ctx, "test", volume.DefaultDriverName, opts.WithCreateReference(t.Name()))
assert.Assert(t, err)
pr, err = service.Prune(ctx, filters.NewArgs())
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 0))
assert.Assert(t, service.Release(ctx, v.Name, t.Name()))
pr, err = service.Prune(ctx, filters.NewArgs())
assert.Assert(t, err)
assert.Assert(t, is.Len(pr.VolumesDeleted, 1))
assert.Assert(t, is.Equal(pr.VolumesDeleted[0], "test"))
}
func newTestService(t *testing.T, ds *volumedrivers.Store) (*VolumesService, func()) {
t.Helper()
dir, err := ioutil.TempDir("", t.Name())
assert.Assert(t, err)
store, err := NewStore(dir, ds)
assert.Assert(t, err)
s := &VolumesService{vs: store, eventLogger: dummyEventLogger{}}
return s, func() {
assert.Check(t, s.Shutdown())
assert.Check(t, os.RemoveAll(dir))
}
}
type dummyEventLogger struct{}
func (dummyEventLogger) LogVolumeEvent(_, _ string, _ map[string]string) {}

View file

@ -1,6 +1,8 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
@ -11,10 +13,12 @@ import (
"github.com/pkg/errors"
"github.com/boltdb/bolt"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/locker"
"github.com/docker/docker/volume"
"github.com/docker/docker/volume/drivers"
volumemounts "github.com/docker/docker/volume/mounts"
"github.com/docker/docker/volume/service/opts"
"github.com/sirupsen/logrus"
)
@ -65,9 +69,8 @@ func (v volumeWrapper) CachedPath() string {
return v.Volume.Path()
}
// New initializes a VolumeStore to keep
// reference counting of volumes in the system.
func New(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
// NewStore creates a new volume store at the given path
func NewStore(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
vs := &VolumeStore{
locks: &locker.Locker{},
names: make(map[string]volume.Volume),
@ -84,10 +87,8 @@ func New(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
return nil, err
}
dbPath := filepath.Join(volPath, "metadata.db")
var err error
vs.db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
vs.db, err = bolt.Open(filepath.Join(volPath, "metadata.db"), 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return nil, errors.Wrap(err, "error while opening volume store metadata database")
}
@ -152,10 +153,18 @@ func (s *VolumeStore) getRefs(name string) []string {
return refs
}
// Purge allows the cleanup of internal data on docker in case
// purge allows the cleanup of internal data on docker in case
// the internal data is out of sync with volumes driver plugins.
func (s *VolumeStore) Purge(name string) {
func (s *VolumeStore) purge(ctx context.Context, name string) error {
s.globalLock.Lock()
defer s.globalLock.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
v, exists := s.names[name]
if exists {
driverName := v.DriverName()
@ -170,7 +179,7 @@ func (s *VolumeStore) Purge(name string) {
delete(s.refs, name)
delete(s.labels, name)
delete(s.options, name)
s.globalLock.Unlock()
return nil
}
// VolumeStore is a struct that stores the list of volumes available and keeps track of their usage counts
@ -193,14 +202,137 @@ type VolumeStore struct {
db *bolt.DB
}
// List proxies to all registered volume drivers to get the full list of volumes
func filterByDriver(names []string) filterFunc {
return func(v volume.Volume) bool {
for _, name := range names {
if name == v.DriverName() {
return true
}
}
return false
}
}
func (s *VolumeStore) byReferenced(referenced bool) filterFunc {
return func(v volume.Volume) bool {
return s.hasRef(v.Name()) == referenced
}
}
func (s *VolumeStore) filter(ctx context.Context, vols *[]volume.Volume, by By) (warnings []string, err error) {
// note that this specifically does not support the `FromList` By type.
switch f := by.(type) {
case nil:
if *vols == nil {
var ls []volume.Volume
ls, warnings, err = s.list(ctx)
if err != nil {
return warnings, err
}
*vols = ls
}
case byDriver:
if *vols != nil {
filter(vols, filterByDriver([]string(f)))
return nil, nil
}
var ls []volume.Volume
ls, warnings, err = s.list(ctx, []string(f)...)
if err != nil {
return nil, err
}
*vols = ls
case ByReferenced:
// TODO(@cpuguy83): It would be nice to optimize this by looking at the list
// of referenced volumes, however the locking strategy makes this difficult
// without either providing inconsistent data or deadlocks.
if *vols == nil {
var ls []volume.Volume
ls, warnings, err = s.list(ctx)
if err != nil {
return nil, err
}
*vols = ls
}
filter(vols, s.byReferenced(bool(f)))
case andCombinator:
for _, by := range f {
w, err := s.filter(ctx, vols, by)
if err != nil {
return warnings, err
}
warnings = append(warnings, w...)
}
case orCombinator:
for _, by := range f {
switch by.(type) {
case byDriver:
var ls []volume.Volume
w, err := s.filter(ctx, &ls, by)
if err != nil {
return warnings, err
}
warnings = append(warnings, w...)
default:
ls, w, err := s.list(ctx)
if err != nil {
return warnings, err
}
warnings = append(warnings, w...)
w, err = s.filter(ctx, &ls, by)
if err != nil {
return warnings, err
}
warnings = append(warnings, w...)
*vols = append(*vols, ls...)
}
}
unique(vols)
case CustomFilter:
if *vols == nil {
var ls []volume.Volume
ls, warnings, err = s.list(ctx)
if err != nil {
return nil, err
}
*vols = ls
}
filter(vols, filterFunc(f))
default:
return nil, errdefs.InvalidParameter(errors.Errorf("unsupported filter: %T", f))
}
return warnings, nil
}
func unique(ls *[]volume.Volume) {
names := make(map[string]bool, len(*ls))
filter(ls, func(v volume.Volume) bool {
if names[v.Name()] {
return false
}
names[v.Name()] = true
return true
})
}
// Find lists volumes filtered by the past in filter.
// If a driver returns a volume that has name which conflicts with another volume from a different driver,
// the first volume is chosen and the conflicting volume is dropped.
func (s *VolumeStore) List() ([]volume.Volume, []string, error) {
vols, warnings, err := s.list()
func (s *VolumeStore) Find(ctx context.Context, by By) (vols []volume.Volume, warnings []string, err error) {
logrus.WithField("ByType", fmt.Sprintf("%T", by)).WithField("ByValue", fmt.Sprintf("%+v", by)).Debug("VolumeStore.Find")
switch f := by.(type) {
case nil, orCombinator, andCombinator, byDriver, ByReferenced, CustomFilter:
warnings, err = s.filter(ctx, &vols, by)
case fromList:
warnings, err = s.filter(ctx, f.ls, f.by)
default:
// Really shouldn't be possible, but makes sure that any new By's are added to this check.
err = errdefs.InvalidParameter(errors.Errorf("unsupported filter type: %T", f))
}
if err != nil {
return nil, nil, &OpErr{Err: err, Op: "list"}
}
var out []volume.Volume
for _, v := range vols {
@ -222,26 +354,59 @@ func (s *VolumeStore) List() ([]volume.Volume, []string, error) {
return out, warnings, nil
}
type filterFunc func(volume.Volume) bool
func filter(vols *[]volume.Volume, fn filterFunc) {
var evict []int
for i, v := range *vols {
if !fn(v) {
evict = append(evict, i)
}
}
for n, i := range evict {
copy((*vols)[i-n:], (*vols)[i-n+1:])
(*vols)[len(*vols)-1] = nil
*vols = (*vols)[:len(*vols)-1]
}
}
// list goes through each volume driver and asks for its list of volumes.
func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
// TODO(@cpuguy83): plumb context through
func (s *VolumeStore) list(ctx context.Context, driverNames ...string) ([]volume.Volume, []string, error) {
var (
ls []volume.Volume
ls = []volume.Volume{} // do not return a nil value as this affects filtering
warnings []string
)
drivers, err := s.drivers.GetAllDrivers()
var dls []volume.Driver
all, err := s.drivers.GetAllDrivers()
if err != nil {
return nil, nil, err
}
if len(driverNames) == 0 {
dls = all
} else {
idx := make(map[string]bool, len(driverNames))
for _, name := range driverNames {
idx[name] = true
}
for _, d := range all {
if idx[d.Name()] {
dls = append(dls, d)
}
}
}
type vols struct {
vols []volume.Volume
err error
driverName string
}
chVols := make(chan vols, len(drivers))
chVols := make(chan vols, len(dls))
for _, vd := range drivers {
for _, vd := range dls {
go func(d volume.Driver) {
vs, err := d.List()
if err != nil {
@ -259,13 +424,12 @@ func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
}
badDrivers := make(map[string]struct{})
for i := 0; i < len(drivers); i++ {
for i := 0; i < len(dls); i++ {
vs := <-chVols
if vs.err != nil {
warnings = append(warnings, vs.err.Error())
badDrivers[vs.driverName] = struct{}{}
logrus.Warn(vs.err)
}
ls = append(ls, vs.vols...)
}
@ -282,14 +446,26 @@ func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
return ls, warnings, nil
}
// CreateWithRef creates a volume with the given name and driver and stores the ref
// This ensures there's no race between creating a volume and then storing a reference.
func (s *VolumeStore) CreateWithRef(name, driverName, ref string, opts, labels map[string]string) (volume.Volume, error) {
// Create creates a volume with the given name and driver
// If the volume needs to be created with a reference to prevent race conditions
// with volume cleanup, make sure to use the `CreateWithReference` option.
func (s *VolumeStore) Create(ctx context.Context, name, driverName string, createOpts ...opts.CreateOption) (volume.Volume, error) {
var cfg opts.CreateConfig
for _, o := range createOpts {
o(&cfg)
}
name = normalizeVolumeName(name)
s.locks.Lock(name)
defer s.locks.Unlock(name)
v, err := s.create(name, driverName, opts, labels)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
v, err := s.create(ctx, name, driverName, cfg.Options, cfg.Labels)
if err != nil {
if _, ok := err.(*OpErr); ok {
return nil, err
@ -297,16 +473,10 @@ func (s *VolumeStore) CreateWithRef(name, driverName, ref string, opts, labels m
return nil, &OpErr{Err: err, Name: name, Op: "create"}
}
s.setNamed(v, ref)
s.setNamed(v, cfg.Reference)
return v, nil
}
// Create creates a volume with the given name and driver.
// This is just like CreateWithRef() except we don't store a reference while holding the lock.
func (s *VolumeStore) Create(name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
return s.CreateWithRef(name, driverName, "", opts, labels)
}
// checkConflict checks the local cache for name collisions with the passed in name,
// for existing volumes with the same name but in a different driver.
// This is used by `Create` as a best effort to prevent name collisions for volumes.
@ -320,7 +490,7 @@ func (s *VolumeStore) Create(name, driverName string, opts, labels map[string]st
// TODO(cpuguy83): With v2 plugins this shouldn't be a problem. Could also potentially
// use a connect timeout for this kind of check to ensure we aren't blocking for a
// long time.
func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, error) {
func (s *VolumeStore) checkConflict(ctx context.Context, name, driverName string) (volume.Volume, error) {
// check the local cache
v, _ := s.getNamed(name)
if v == nil {
@ -344,7 +514,7 @@ func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, err
// let's check if the found volume ref
// is stale by checking with the driver if it still exists
exists, err := volumeExists(s.drivers, v)
exists, err := volumeExists(ctx, s.drivers, v)
if err != nil {
return nil, errors.Wrapf(errNameConflict, "found reference to volume '%s' in driver '%s', but got an error while checking the driver: %v", name, vDriverName, err)
}
@ -363,14 +533,14 @@ func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, err
}
// doesn't exist, so purge it from the cache
s.Purge(name)
s.purge(ctx, name)
return nil, nil
}
// volumeExists returns if the volume is still present in the driver.
// An error is returned if there was an issue communicating with the driver.
func volumeExists(store *drivers.Store, v volume.Volume) (bool, error) {
exists, err := lookupVolume(store, v.DriverName(), v.Name())
func volumeExists(ctx context.Context, store *drivers.Store, v volume.Volume) (bool, error) {
exists, err := lookupVolume(ctx, store, v.DriverName(), v.Name())
if err != nil {
return false, err
}
@ -383,7 +553,7 @@ func volumeExists(store *drivers.Store, v volume.Volume) (bool, error) {
// for the given volume name, an error is returned after checking if the reference is stale.
// If the reference is stale, it will be purged and this create can continue.
// It is expected that callers of this function hold any necessary locks.
func (s *VolumeStore) create(name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
// Validate the name in a platform-specific manner
// volume name validation is specific to the host os and not on container image
@ -394,7 +564,7 @@ func (s *VolumeStore) create(name, driverName string, opts, labels map[string]st
return nil, err
}
v, err := s.checkConflict(name, driverName)
v, err := s.checkConflict(ctx, name, driverName)
if err != nil {
return nil, err
}
@ -409,7 +579,7 @@ func (s *VolumeStore) create(name, driverName string, opts, labels map[string]st
// Since there isn't a specified driver name, let's see if any of the existing drivers have this volume name
if driverName == "" {
v, _ = s.getVolume(name)
v, _ = s.getVolume(ctx, name, "")
if v != nil {
return v, nil
}
@ -453,61 +623,57 @@ func (s *VolumeStore) create(name, driverName string, opts, labels map[string]st
return volumeWrapper{v, labels, vd.Scope(), opts}, nil
}
// GetWithRef gets a volume with the given name from the passed in driver and stores the ref
// This is just like Get(), but we store the reference while holding the lock.
// This makes sure there are no races between checking for the existence of a volume and adding a reference for it
func (s *VolumeStore) GetWithRef(name, driverName, ref string) (volume.Volume, error) {
name = normalizeVolumeName(name)
s.locks.Lock(name)
defer s.locks.Unlock(name)
if driverName == "" {
driverName = volume.DefaultDriverName
}
vd, err := s.drivers.GetDriver(driverName)
if err != nil {
return nil, &OpErr{Err: err, Name: name, Op: "get"}
}
v, err := vd.Get(name)
if err != nil {
return nil, &OpErr{Err: err, Name: name, Op: "get"}
}
s.setNamed(v, ref)
s.globalLock.RLock()
defer s.globalLock.RUnlock()
return volumeWrapper{v, s.labels[name], vd.Scope(), s.options[name]}, nil
}
// Get looks if a volume with the given name exists and returns it if so
func (s *VolumeStore) Get(name string) (volume.Volume, error) {
func (s *VolumeStore) Get(ctx context.Context, name string, getOptions ...opts.GetOption) (volume.Volume, error) {
var cfg opts.GetConfig
for _, o := range getOptions {
o(&cfg)
}
name = normalizeVolumeName(name)
s.locks.Lock(name)
defer s.locks.Unlock(name)
v, err := s.getVolume(name)
v, err := s.getVolume(ctx, name, cfg.Driver)
if err != nil {
return nil, &OpErr{Err: err, Name: name, Op: "get"}
}
s.setNamed(v, "")
if cfg.Driver != "" && v.DriverName() != cfg.Driver {
return nil, &OpErr{Name: name, Op: "get", Err: errdefs.Conflict(errors.New("found volume driver does not match passed in driver"))}
}
s.setNamed(v, cfg.Reference)
return v, nil
}
// getVolume requests the volume, if the driver info is stored it just accesses that driver,
// if the driver is unknown it probes all drivers until it finds the first volume with that name.
// it is expected that callers of this function hold any necessary locks
func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
func (s *VolumeStore) getVolume(ctx context.Context, name, driverName string) (volume.Volume, error) {
var meta volumeMetadata
meta, err := s.getMeta(name)
if err != nil {
return nil, err
}
driverName := meta.Driver
if driverName != "" {
if meta.Driver == "" {
meta.Driver = driverName
}
if driverName != meta.Driver {
return nil, errdefs.Conflict(errors.New("provided volume driver does not match stored driver"))
}
}
if driverName == "" {
driverName = meta.Driver
}
if driverName == "" {
s.globalLock.RLock()
select {
case <-ctx.Done():
s.globalLock.RUnlock()
return nil, ctx.Err()
default:
}
v, exists := s.names[name]
s.globalLock.RUnlock()
if exists {
@ -519,12 +685,12 @@ func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
}
if meta.Driver != "" {
vol, err := lookupVolume(s.drivers, meta.Driver, name)
vol, err := lookupVolume(ctx, s.drivers, meta.Driver, name)
if err != nil {
return nil, err
}
if vol == nil {
s.Purge(name)
s.purge(ctx, name)
return nil, errNoSuchVolume
}
@ -543,6 +709,11 @@ func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
}
for _, d := range drivers {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
v, err := d.Get(name)
if err != nil || v == nil {
continue
@ -561,7 +732,8 @@ func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
// If the driver returns an error that is not communication related the
// error is logged but not returned.
// If the volume is not found it will return `nil, nil``
func lookupVolume(store *drivers.Store, driverName, volumeName string) (volume.Volume, error) {
// TODO(@cpuguy83): plumb through the context to lower level components
func lookupVolume(ctx context.Context, store *drivers.Store, driverName, volumeName string) (volume.Volume, error) {
if driverName == "" {
driverName = volume.DefaultDriverName
}
@ -582,19 +754,35 @@ func lookupVolume(store *drivers.Store, driverName, volumeName string) (volume.V
// At this point, the error could be anything from the driver, such as "no such volume"
// Let's not check an error here, and instead check if the driver returned a volume
logrus.WithError(err).WithField("driver", driverName).WithField("volume", volumeName).Warnf("Error while looking up volume")
logrus.WithError(err).WithField("driver", driverName).WithField("volume", volumeName).Debug("Error while looking up volume")
}
return v, nil
}
// Remove removes the requested volume. A volume is not removed if it has any refs
func (s *VolumeStore) Remove(v volume.Volume) error {
name := normalizeVolumeName(v.Name())
func (s *VolumeStore) Remove(ctx context.Context, v volume.Volume, rmOpts ...opts.RemoveOption) error {
var cfg opts.RemoveConfig
for _, o := range rmOpts {
o(&cfg)
}
name := v.Name()
s.locks.Lock(name)
defer s.locks.Unlock(name)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if s.hasRef(name) {
return &OpErr{Err: errVolumeInUse, Name: v.Name(), Op: "remove", Refs: s.getRefs(name)}
return &OpErr{Err: errVolumeInUse, Name: name, Op: "remove", Refs: s.getRefs(name)}
}
v, err := s.getVolume(ctx, name, v.DriverName())
if err != nil {
return err
}
vd, err := s.drivers.GetDriver(v.DriverName())
@ -604,85 +792,55 @@ func (s *VolumeStore) Remove(v volume.Volume) error {
logrus.Debugf("Removing volume reference: driver %s, name %s", v.DriverName(), name)
vol := unwrapVolume(v)
if err := vd.Remove(vol); err != nil {
return &OpErr{Err: err, Name: name, Op: "remove"}
err = vd.Remove(vol)
if err != nil {
err = &OpErr{Err: err, Name: name, Op: "remove"}
}
s.Purge(name)
return nil
if err == nil || cfg.PurgeOnError {
if e := s.purge(ctx, name); e != nil && err == nil {
err = e
}
}
return err
}
// Dereference removes the specified reference to the volume
func (s *VolumeStore) Dereference(v volume.Volume, ref string) {
name := v.Name()
// Release releases the specified reference to the volume
func (s *VolumeStore) Release(ctx context.Context, name string, ref string) error {
s.locks.Lock(name)
defer s.locks.Unlock(name)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
s.globalLock.Lock()
defer s.globalLock.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if s.refs[name] != nil {
delete(s.refs[name], ref)
}
return nil
}
// Refs gets the current list of refs for the given volume
func (s *VolumeStore) Refs(v volume.Volume) []string {
name := v.Name()
// CountReferences gives a count of all references for a given volume.
func (s *VolumeStore) CountReferences(v volume.Volume) int {
name := normalizeVolumeName(v.Name())
s.locks.Lock(name)
defer s.locks.Unlock(name)
s.globalLock.Lock()
defer s.globalLock.Unlock()
return s.getRefs(name)
}
// FilterByDriver returns the available volumes filtered by driver name
func (s *VolumeStore) FilterByDriver(name string) ([]volume.Volume, error) {
vd, err := s.drivers.GetDriver(name)
if err != nil {
return nil, &OpErr{Err: err, Name: name, Op: "list"}
}
ls, err := vd.List()
if err != nil {
return nil, &OpErr{Err: err, Name: name, Op: "list"}
}
for i, v := range ls {
options := map[string]string{}
s.globalLock.RLock()
for key, value := range s.options[v.Name()] {
options[key] = value
}
ls[i] = volumeWrapper{v, s.labels[v.Name()], vd.Scope(), options}
s.globalLock.RUnlock()
}
return ls, nil
}
// FilterByUsed returns the available volumes filtered by if they are in use or not.
// `used=true` returns only volumes that are being used, while `used=false` returns
// only volumes that are not being used.
func (s *VolumeStore) FilterByUsed(vols []volume.Volume, used bool) []volume.Volume {
return s.filter(vols, func(v volume.Volume) bool {
s.locks.Lock(v.Name())
hasRef := s.hasRef(v.Name())
s.locks.Unlock(v.Name())
return used == hasRef
})
}
// filterFunc defines a function to allow filter volumes in the store
type filterFunc func(vol volume.Volume) bool
// filter returns the available volumes filtered by a filterFunc function
func (s *VolumeStore) filter(vols []volume.Volume, f filterFunc) []volume.Volume {
var ls []volume.Volume
for _, v := range vols {
if f(v) {
ls = append(ls, v)
}
}
return ls
return len(s.refs[name])
}
func unwrapVolume(v volume.Volume) volume.Volume {
@ -698,10 +856,3 @@ func unwrapVolume(v volume.Volume) volume.Volume {
func (s *VolumeStore) Shutdown() error {
return s.db.Close()
}
// GetDriverList gets the list of volume drivers from the configured volume driver
// store.
// TODO(@cpuguy83): This should be factored out into a separate service.
func (s *VolumeStore) GetDriverList() []string {
return s.drivers.GetDriverList()
}

View file

@ -1,6 +1,7 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"errors"
"fmt"
"io/ioutil"
@ -11,6 +12,7 @@ import (
"github.com/docker/docker/volume"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/service/opts"
volumetestutils "github.com/docker/docker/volume/testutils"
"github.com/google/go-cmp/cmp"
"github.com/gotestyourself/gotestyourself/assert"
@ -24,22 +26,23 @@ func TestCreate(t *testing.T) {
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
v, err := s.Create("fake1", "fake", nil, nil)
ctx := context.Background()
v, err := s.Create(ctx, "fake1", "fake")
if err != nil {
t.Fatal(err)
}
if v.Name() != "fake1" {
t.Fatalf("Expected fake1 volume, got %v", v)
}
if l, _, _ := s.List(); len(l) != 1 {
if l, _, _ := s.Find(ctx, nil); len(l) != 1 {
t.Fatalf("Expected 1 volume in the store, got %v: %v", len(l), l)
}
if _, err := s.Create("none", "none", nil, nil); err == nil {
if _, err := s.Create(ctx, "none", "none"); err == nil {
t.Fatalf("Expected unknown driver error, got nil")
}
_, err = s.Create("fakeerror", "fake", map[string]string{"error": "create error"}, nil)
_, err = s.Create(ctx, "fakeerror", "fake", opts.WithCreateOptions(map[string]string{"error": "create error"}))
expected := &OpErr{Op: "create", Name: "fakeerror", Err: errors.New("create error")}
if err != nil && err.Error() != expected.Error() {
t.Fatalf("Expected create fakeError: create error, got %v", err)
@ -55,25 +58,28 @@ func TestRemove(t *testing.T) {
s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
s.drivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
ctx := context.Background()
// doing string compare here since this error comes directly from the driver
expected := "no such volume"
if err := s.Remove(volumetestutils.NoopVolume{}); err == nil || !strings.Contains(err.Error(), expected) {
var v volume.Volume = volumetestutils.NoopVolume{}
if err := s.Remove(ctx, v); err == nil || !strings.Contains(err.Error(), expected) {
t.Fatalf("Expected error %q, got %v", expected, err)
}
v, err := s.CreateWithRef("fake1", "fake", "fake", nil, nil)
v, err := s.Create(ctx, "fake1", "fake", opts.WithCreateReference("fake"))
if err != nil {
t.Fatal(err)
}
if err := s.Remove(v); !IsInUse(err) {
if err := s.Remove(ctx, v); !IsInUse(err) {
t.Fatalf("Expected ErrVolumeInUse error, got %v", err)
}
s.Dereference(v, "fake")
if err := s.Remove(v); err != nil {
s.Release(ctx, v.Name(), "fake")
if err := s.Remove(ctx, v); err != nil {
t.Fatal(err)
}
if l, _, _ := s.List(); len(l) != 0 {
if l, _, _ := s.Find(ctx, nil); len(l) != 0 {
t.Fatalf("Expected 0 volumes in the store, got %v, %v", len(l), l)
}
}
@ -89,17 +95,18 @@ func TestList(t *testing.T) {
drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
drivers.Register(volumetestutils.NewFakeDriver("fake2"), "fake2")
s, err := New(dir, drivers)
s, err := NewStore(dir, drivers)
assert.NilError(t, err)
if _, err := s.Create("test", "fake", nil, nil); err != nil {
ctx := context.Background()
if _, err := s.Create(ctx, "test", "fake"); err != nil {
t.Fatal(err)
}
if _, err := s.Create("test2", "fake2", nil, nil); err != nil {
if _, err := s.Create(ctx, "test2", "fake2"); err != nil {
t.Fatal(err)
}
ls, _, err := s.List()
ls, _, err := s.Find(ctx, nil)
if err != nil {
t.Fatal(err)
}
@ -111,11 +118,11 @@ func TestList(t *testing.T) {
}
// and again with a new store
s, err = New(dir, drivers)
s, err = NewStore(dir, drivers)
if err != nil {
t.Fatal(err)
}
ls, _, err = s.List()
ls, _, err = s.Find(ctx, nil)
if err != nil {
t.Fatal(err)
}
@ -124,34 +131,38 @@ func TestList(t *testing.T) {
}
}
func TestFilterByDriver(t *testing.T) {
func TestFindByDriver(t *testing.T) {
t.Parallel()
s, cleanup := setupTest(t)
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
s.drivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
assert.Assert(t, s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake"))
assert.Assert(t, s.drivers.Register(volumetestutils.NewFakeDriver("noop"), "noop"))
if _, err := s.Create("fake1", "fake", nil, nil); err != nil {
t.Fatal(err)
}
if _, err := s.Create("fake2", "fake", nil, nil); err != nil {
t.Fatal(err)
}
if _, err := s.Create("fake3", "noop", nil, nil); err != nil {
t.Fatal(err)
}
ctx := context.Background()
_, err := s.Create(ctx, "fake1", "fake")
assert.NilError(t, err)
if l, _ := s.FilterByDriver("fake"); len(l) != 2 {
t.Fatalf("Expected 2 volumes, got %v, %v", len(l), l)
}
_, err = s.Create(ctx, "fake2", "fake")
assert.NilError(t, err)
if l, _ := s.FilterByDriver("noop"); len(l) != 1 {
t.Fatalf("Expected 1 volume, got %v, %v", len(l), l)
}
_, err = s.Create(ctx, "fake3", "noop")
assert.NilError(t, err)
l, _, err := s.Find(ctx, ByDriver("fake"))
assert.NilError(t, err)
assert.Equal(t, len(l), 2)
l, _, err = s.Find(ctx, ByDriver("noop"))
assert.NilError(t, err)
assert.Equal(t, len(l), 1)
l, _, err = s.Find(ctx, ByDriver("nosuchdriver"))
assert.NilError(t, err)
assert.Equal(t, len(l), 0)
}
func TestFilterByUsed(t *testing.T) {
func TestFindByReferenced(t *testing.T) {
t.Parallel()
s, cleanup := setupTest(t)
defer cleanup()
@ -159,33 +170,23 @@ func TestFilterByUsed(t *testing.T) {
s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
s.drivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
if _, err := s.CreateWithRef("fake1", "fake", "volReference", nil, nil); err != nil {
ctx := context.Background()
if _, err := s.Create(ctx, "fake1", "fake", opts.WithCreateReference("volReference")); err != nil {
t.Fatal(err)
}
if _, err := s.Create("fake2", "fake", nil, nil); err != nil {
if _, err := s.Create(ctx, "fake2", "fake"); err != nil {
t.Fatal(err)
}
vols, _, err := s.List()
if err != nil {
t.Fatal(err)
}
dangling, _, err := s.Find(ctx, ByReferenced(false))
assert.Assert(t, err)
assert.Assert(t, len(dangling) == 1)
assert.Check(t, dangling[0].Name() == "fake2")
dangling := s.FilterByUsed(vols, false)
if len(dangling) != 1 {
t.Fatalf("expected 1 dangling volume, got %v", len(dangling))
}
if dangling[0].Name() != "fake2" {
t.Fatalf("expected dangling volume fake2, got %s", dangling[0].Name())
}
used := s.FilterByUsed(vols, true)
if len(used) != 1 {
t.Fatalf("expected 1 used volume, got %v", len(used))
}
if used[0].Name() != "fake1" {
t.Fatalf("expected used volume fake1, got %s", used[0].Name())
}
used, _, err := s.Find(ctx, ByReferenced(true))
assert.Assert(t, err)
assert.Assert(t, len(used) == 1)
assert.Check(t, used[0].Name() == "fake1")
}
func TestDerefMultipleOfSameRef(t *testing.T) {
@ -194,17 +195,18 @@ func TestDerefMultipleOfSameRef(t *testing.T) {
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
v, err := s.CreateWithRef("fake1", "fake", "volReference", nil, nil)
ctx := context.Background()
v, err := s.Create(ctx, "fake1", "fake", opts.WithCreateReference("volReference"))
if err != nil {
t.Fatal(err)
}
if _, err := s.GetWithRef("fake1", "fake", "volReference"); err != nil {
if _, err := s.Get(ctx, "fake1", opts.WithGetDriver("fake"), opts.WithGetReference("volReference")); err != nil {
t.Fatal(err)
}
s.Dereference(v, "volReference")
if err := s.Remove(v); err != nil {
s.Release(ctx, v.Name(), "volReference")
if err := s.Remove(ctx, v); err != nil {
t.Fatal(err)
}
}
@ -222,7 +224,8 @@ func TestCreateKeepOptsLabelsWhenExistsRemotely(t *testing.T) {
t.Fatal(err)
}
v, err := s.Create("foo", "fake", nil, map[string]string{"hello": "world"})
ctx := context.Background()
v, err := s.Create(ctx, "foo", "fake", opts.WithCreateLabels(map[string]string{"hello": "world"}))
if err != nil {
t.Fatal(err)
}
@ -265,14 +268,15 @@ func TestDefererencePluginOnCreateError(t *testing.T) {
pg := volumetestutils.NewFakePluginGetter(p)
s.drivers = volumedrivers.NewStore(pg)
ctx := context.Background()
// create a good volume so we have a plugin reference
_, err = s.Create("fake1", d.Name(), nil, nil)
_, err = s.Create(ctx, "fake1", d.Name())
if err != nil {
t.Fatal(err)
}
// Now create another one expecting an error
_, err = s.Create("fake2", d.Name(), map[string]string{"error": "some error"}, nil)
_, err = s.Create(ctx, "fake2", d.Name(), opts.WithCreateOptions(map[string]string{"error": "some error"}))
if err == nil || !strings.Contains(err.Error(), "some error") {
t.Fatalf("expected an error on create: %v", err)
}
@ -291,15 +295,16 @@ func TestRefDerefRemove(t *testing.T) {
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver(driverName), driverName)
v, err := s.CreateWithRef("test", driverName, "test-ref", nil, nil)
ctx := context.Background()
v, err := s.Create(ctx, "test", driverName, opts.WithCreateReference("test-ref"))
assert.NilError(t, err)
err = s.Remove(v)
err = s.Remove(ctx, v)
assert.Assert(t, is.ErrorContains(err, ""))
assert.Equal(t, errVolumeInUse, err.(*OpErr).Err)
s.Dereference(v, "test-ref")
err = s.Remove(v)
s.Release(ctx, v.Name(), "test-ref")
err = s.Remove(ctx, v)
assert.NilError(t, err)
}
@ -311,25 +316,26 @@ func TestGet(t *testing.T) {
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver(driverName), driverName)
_, err := s.Get("not-exist")
ctx := context.Background()
_, err := s.Get(ctx, "not-exist")
assert.Assert(t, is.ErrorContains(err, ""))
assert.Equal(t, errNoSuchVolume, err.(*OpErr).Err)
v1, err := s.Create("test", driverName, nil, map[string]string{"a": "1"})
v1, err := s.Create(ctx, "test", driverName, opts.WithCreateLabels(map[string]string{"a": "1"}))
assert.NilError(t, err)
v2, err := s.Get("test")
v2, err := s.Get(ctx, "test")
assert.NilError(t, err)
assert.DeepEqual(t, v1, v2, cmpVolume)
dv := v2.(volume.DetailedVolume)
assert.Equal(t, "1", dv.Labels()["a"])
err = s.Remove(v1)
err = s.Remove(ctx, v1)
assert.NilError(t, err)
}
func TestGetWithRef(t *testing.T) {
func TestGetWithReference(t *testing.T) {
t.Parallel()
driverName := "test-get-with-ref"
@ -337,22 +343,23 @@ func TestGetWithRef(t *testing.T) {
defer cleanup()
s.drivers.Register(volumetestutils.NewFakeDriver(driverName), driverName)
_, err := s.GetWithRef("not-exist", driverName, "test-ref")
ctx := context.Background()
_, err := s.Get(ctx, "not-exist", opts.WithGetDriver(driverName), opts.WithGetReference("test-ref"))
assert.Assert(t, is.ErrorContains(err, ""))
v1, err := s.Create("test", driverName, nil, map[string]string{"a": "1"})
v1, err := s.Create(ctx, "test", driverName, opts.WithCreateLabels(map[string]string{"a": "1"}))
assert.NilError(t, err)
v2, err := s.GetWithRef("test", driverName, "test-ref")
v2, err := s.Get(ctx, "test", opts.WithGetDriver(driverName), opts.WithGetReference("test-ref"))
assert.NilError(t, err)
assert.DeepEqual(t, v1, v2, cmpVolume)
err = s.Remove(v2)
err = s.Remove(ctx, v2)
assert.Assert(t, is.ErrorContains(err, ""))
assert.Equal(t, errVolumeInUse, err.(*OpErr).Err)
s.Dereference(v2, "test-ref")
err = s.Remove(v2)
s.Release(ctx, v2.Name(), "test-ref")
err = s.Remove(ctx, v2)
assert.NilError(t, err)
}
@ -366,14 +373,49 @@ func setupTest(t *testing.T) (*VolumeStore, func()) {
assert.NilError(t, err)
cleanup := func() {
t.Helper()
err := os.RemoveAll(dir)
assert.Check(t, err)
}
s, err := New(dir, volumedrivers.NewStore(nil))
s, err := NewStore(dir, volumedrivers.NewStore(nil))
assert.Check(t, err)
return s, func() {
s.Shutdown()
cleanup()
}
}
func TestFilterFunc(t *testing.T) {
testDriver := volumetestutils.NewFakeDriver("test")
testVolume, err := testDriver.Create("test", nil)
assert.NilError(t, err)
testVolume2, err := testDriver.Create("test2", nil)
assert.NilError(t, err)
testVolume3, err := testDriver.Create("test3", nil)
assert.NilError(t, err)
for _, test := range []struct {
vols []volume.Volume
fn filterFunc
desc string
expect []volume.Volume
}{
{desc: "test nil list", vols: nil, expect: nil, fn: func(volume.Volume) bool { return true }},
{desc: "test empty list", vols: []volume.Volume{}, expect: []volume.Volume{}, fn: func(volume.Volume) bool { return true }},
{desc: "test filter non-empty to empty", vols: []volume.Volume{testVolume}, expect: []volume.Volume{}, fn: func(volume.Volume) bool { return false }},
{desc: "test nothing to fitler non-empty list", vols: []volume.Volume{testVolume}, expect: []volume.Volume{testVolume}, fn: func(volume.Volume) bool { return true }},
{desc: "test filter some", vols: []volume.Volume{testVolume, testVolume2}, expect: []volume.Volume{testVolume}, fn: func(v volume.Volume) bool { return v.Name() == testVolume.Name() }},
{desc: "test filter middle", vols: []volume.Volume{testVolume, testVolume2, testVolume3}, expect: []volume.Volume{testVolume, testVolume3}, fn: func(v volume.Volume) bool { return v.Name() != testVolume2.Name() }},
{desc: "test filter middle and last", vols: []volume.Volume{testVolume, testVolume2, testVolume3}, expect: []volume.Volume{testVolume}, fn: func(v volume.Volume) bool { return v.Name() != testVolume2.Name() && v.Name() != testVolume3.Name() }},
{desc: "test filter first and last", vols: []volume.Volume{testVolume, testVolume2, testVolume3}, expect: []volume.Volume{testVolume2}, fn: func(v volume.Volume) bool { return v.Name() != testVolume.Name() && v.Name() != testVolume3.Name() }},
} {
t.Run(test.desc, func(t *testing.T) {
test := test
t.Parallel()
filter(&test.vols, test.fn)
assert.DeepEqual(t, test.vols, test.expect, cmpVolume)
})
}
}

View file

@ -1,6 +1,6 @@
// +build linux freebsd
// +build linux freebsd darwin
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
// normalizeVolumeName is a platform specific function to normalize the name
// of a volume. This is a no-op on Unix-like platforms

View file

@ -1,4 +1,4 @@
package store // import "github.com/docker/docker/volume/store"
package service // import "github.com/docker/docker/volume/service"
import "strings"

View file

@ -64,7 +64,9 @@ func (FakeVolume) Mount(_ string) (string, error) { return "fake", nil }
func (FakeVolume) Unmount(_ string) error { return nil }
// Status provides low-level details about the volume
func (FakeVolume) Status() map[string]interface{} { return nil }
func (FakeVolume) Status() map[string]interface{} {
return map[string]interface{}{"datakey": "datavalue"}
}
// CreatedAt provides the time the volume (directory) was created at
func (FakeVolume) CreatedAt() (time.Time, error) { return time.Now(), nil }