c24c37bd8a
When live-restoring a container the volume driver needs be notified that
there is an active mount for the volume.
Before this change the count is zero until the container stops and the
uint64 overflows pretty much making it so the volume can never be
removed until another daemon restart.
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit 647c2a6cdd
)
Signed-off-by: Bjorn Neergaard <bjorn.neergaard@docker.com>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
890 lines
24 KiB
Go
890 lines
24 KiB
Go
package service // import "github.com/docker/docker/volume/service"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/docker/docker/errdefs"
|
|
"github.com/docker/docker/volume"
|
|
"github.com/docker/docker/volume/drivers"
|
|
volumemounts "github.com/docker/docker/volume/mounts"
|
|
"github.com/docker/docker/volume/service/opts"
|
|
"github.com/moby/locker"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
const (
|
|
volumeDataDir = "volumes"
|
|
)
|
|
|
|
var _ volume.LiveRestorer = (*volumeWrapper)(nil)
|
|
|
|
type volumeWrapper struct {
|
|
volume.Volume
|
|
labels map[string]string
|
|
scope string
|
|
options map[string]string
|
|
}
|
|
|
|
func (v volumeWrapper) Options() map[string]string {
|
|
if v.options == nil {
|
|
return nil
|
|
}
|
|
options := make(map[string]string, len(v.options))
|
|
for key, value := range v.options {
|
|
options[key] = value
|
|
}
|
|
return options
|
|
}
|
|
|
|
func (v volumeWrapper) Labels() map[string]string {
|
|
if v.labels == nil {
|
|
return nil
|
|
}
|
|
|
|
labels := make(map[string]string, len(v.labels))
|
|
for key, value := range v.labels {
|
|
labels[key] = value
|
|
}
|
|
return labels
|
|
}
|
|
|
|
func (v volumeWrapper) Scope() string {
|
|
return v.scope
|
|
}
|
|
|
|
func (v volumeWrapper) CachedPath() string {
|
|
if vv, ok := v.Volume.(interface {
|
|
CachedPath() string
|
|
}); ok {
|
|
return vv.CachedPath()
|
|
}
|
|
return v.Volume.Path()
|
|
}
|
|
|
|
func (v volumeWrapper) LiveRestoreVolume(ctx context.Context, ref string) error {
|
|
if vv, ok := v.Volume.(volume.LiveRestorer); ok {
|
|
return vv.LiveRestoreVolume(ctx, ref)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StoreOpt sets options for a VolumeStore
|
|
type StoreOpt func(store *VolumeStore) error
|
|
|
|
// NewStore creates a new volume store at the given path
|
|
func NewStore(rootPath string, drivers *drivers.Store, opts ...StoreOpt) (*VolumeStore, error) {
|
|
vs := &VolumeStore{
|
|
locks: &locker.Locker{},
|
|
names: make(map[string]volume.Volume),
|
|
refs: make(map[string]map[string]struct{}),
|
|
labels: make(map[string]map[string]string),
|
|
options: make(map[string]map[string]string),
|
|
drivers: drivers,
|
|
}
|
|
|
|
for _, o := range opts {
|
|
if err := o(vs); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if rootPath != "" {
|
|
// initialize metadata store
|
|
volPath := filepath.Join(rootPath, volumeDataDir)
|
|
if err := os.MkdirAll(volPath, 0750); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var err error
|
|
dbPath := filepath.Join(volPath, "metadata.db")
|
|
vs.db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "error while opening volume store metadata database (%s)", dbPath)
|
|
}
|
|
|
|
// initialize volumes bucket
|
|
if err := vs.db.Update(func(tx *bolt.Tx) error {
|
|
if _, err := tx.CreateBucketIfNotExists(volumeBucketName); err != nil {
|
|
return errors.Wrap(err, "error while setting up volume store metadata database")
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
vs.restore()
|
|
|
|
return vs, nil
|
|
}
|
|
|
|
// WithEventLogger configures the VolumeStore with the given VolumeEventLogger
|
|
func WithEventLogger(logger VolumeEventLogger) StoreOpt {
|
|
return func(store *VolumeStore) error {
|
|
store.eventLogger = logger
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) {
|
|
s.globalLock.RLock()
|
|
v, exists := s.names[name]
|
|
s.globalLock.RUnlock()
|
|
return v, exists
|
|
}
|
|
|
|
func (s *VolumeStore) setNamed(v volume.Volume, ref string) {
|
|
name := v.Name()
|
|
|
|
s.globalLock.Lock()
|
|
s.names[name] = v
|
|
if len(ref) > 0 {
|
|
if s.refs[name] == nil {
|
|
s.refs[name] = make(map[string]struct{})
|
|
}
|
|
s.refs[name][ref] = struct{}{}
|
|
}
|
|
s.globalLock.Unlock()
|
|
}
|
|
|
|
// hasRef returns true if the given name has at least one ref.
|
|
// Callers of this function are expected to hold the name lock.
|
|
func (s *VolumeStore) hasRef(name string) bool {
|
|
s.globalLock.RLock()
|
|
l := len(s.refs[name])
|
|
s.globalLock.RUnlock()
|
|
return l > 0
|
|
}
|
|
|
|
// getRefs gets the list of refs for a given name
|
|
// Callers of this function are expected to hold the name lock.
|
|
func (s *VolumeStore) getRefs(name string) []string {
|
|
s.globalLock.RLock()
|
|
defer s.globalLock.RUnlock()
|
|
|
|
refs := make([]string, 0, len(s.refs[name]))
|
|
for r := range s.refs[name] {
|
|
refs = append(refs, r)
|
|
}
|
|
|
|
return refs
|
|
}
|
|
|
|
// purge allows the cleanup of internal data on docker in case
|
|
// the internal data is out of sync with volumes driver plugins.
|
|
func (s *VolumeStore) purge(ctx context.Context, name string) error {
|
|
s.globalLock.Lock()
|
|
defer s.globalLock.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
v, exists := s.names[name]
|
|
if exists {
|
|
driverName := v.DriverName()
|
|
if _, err := s.drivers.ReleaseDriver(driverName); err != nil {
|
|
logrus.WithError(err).WithField("driver", driverName).Error("Error releasing reference to volume driver")
|
|
}
|
|
}
|
|
if err := s.removeMeta(name); err != nil {
|
|
logrus.Errorf("Error removing volume metadata for volume %q: %v", name, err)
|
|
}
|
|
delete(s.names, name)
|
|
delete(s.refs, name)
|
|
delete(s.labels, name)
|
|
delete(s.options, name)
|
|
return nil
|
|
}
|
|
|
|
// VolumeStore is responsible for storing and reference counting volumes.
|
|
type VolumeStore struct {
|
|
// locks ensures that only one action is being performed on a particular volume at a time without locking the entire store
|
|
// since actions on volumes can be quite slow, this ensures the store is free to handle requests for other volumes.
|
|
locks *locker.Locker
|
|
drivers *drivers.Store
|
|
// globalLock is used to protect access to mutable structures used by the store object
|
|
globalLock sync.RWMutex
|
|
// names stores the volume name -> volume relationship.
|
|
// This is used for making lookups faster so we don't have to probe all drivers
|
|
names map[string]volume.Volume
|
|
// refs stores the volume name and the list of things referencing it
|
|
refs map[string]map[string]struct{}
|
|
// labels stores volume labels for each volume
|
|
labels map[string]map[string]string
|
|
// options stores volume options for each volume
|
|
options map[string]map[string]string
|
|
|
|
db *bolt.DB
|
|
eventLogger VolumeEventLogger
|
|
}
|
|
|
|
func filterByDriver(names []string) filterFunc {
|
|
return func(v volume.Volume) bool {
|
|
for _, name := range names {
|
|
if name == v.DriverName() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (s *VolumeStore) byReferenced(referenced bool) filterFunc {
|
|
return func(v volume.Volume) bool {
|
|
return s.hasRef(v.Name()) == referenced
|
|
}
|
|
}
|
|
|
|
func (s *VolumeStore) filter(ctx context.Context, vols *[]volume.Volume, by By) (warnings []string, err error) {
|
|
// note that this specifically does not support the `FromList` By type.
|
|
switch f := by.(type) {
|
|
case nil:
|
|
if *vols == nil {
|
|
var ls []volume.Volume
|
|
ls, warnings, err = s.list(ctx)
|
|
if err != nil {
|
|
return warnings, err
|
|
}
|
|
*vols = ls
|
|
}
|
|
case byDriver:
|
|
if *vols != nil {
|
|
filter(vols, filterByDriver([]string(f)))
|
|
return nil, nil
|
|
}
|
|
var ls []volume.Volume
|
|
ls, warnings, err = s.list(ctx, []string(f)...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
*vols = ls
|
|
case ByReferenced:
|
|
// TODO(@cpuguy83): It would be nice to optimize this by looking at the list
|
|
// of referenced volumes, however the locking strategy makes this difficult
|
|
// without either providing inconsistent data or deadlocks.
|
|
if *vols == nil {
|
|
var ls []volume.Volume
|
|
ls, warnings, err = s.list(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
*vols = ls
|
|
}
|
|
filter(vols, s.byReferenced(bool(f)))
|
|
case andCombinator:
|
|
for _, by := range f {
|
|
w, err := s.filter(ctx, vols, by)
|
|
if err != nil {
|
|
return warnings, err
|
|
}
|
|
warnings = append(warnings, w...)
|
|
}
|
|
case orCombinator:
|
|
for _, by := range f {
|
|
switch by.(type) {
|
|
case byDriver:
|
|
var ls []volume.Volume
|
|
w, err := s.filter(ctx, &ls, by)
|
|
if err != nil {
|
|
return warnings, err
|
|
}
|
|
warnings = append(warnings, w...)
|
|
default:
|
|
ls, w, err := s.list(ctx)
|
|
if err != nil {
|
|
return warnings, err
|
|
}
|
|
warnings = append(warnings, w...)
|
|
w, err = s.filter(ctx, &ls, by)
|
|
if err != nil {
|
|
return warnings, err
|
|
}
|
|
warnings = append(warnings, w...)
|
|
*vols = append(*vols, ls...)
|
|
}
|
|
}
|
|
unique(vols)
|
|
case CustomFilter:
|
|
if *vols == nil {
|
|
var ls []volume.Volume
|
|
ls, warnings, err = s.list(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
*vols = ls
|
|
}
|
|
filter(vols, filterFunc(f))
|
|
default:
|
|
return nil, errdefs.InvalidParameter(errors.Errorf("unsupported filter: %T", f))
|
|
}
|
|
return warnings, nil
|
|
}
|
|
|
|
func unique(ls *[]volume.Volume) {
|
|
names := make(map[string]bool, len(*ls))
|
|
filter(ls, func(v volume.Volume) bool {
|
|
if names[v.Name()] {
|
|
return false
|
|
}
|
|
names[v.Name()] = true
|
|
return true
|
|
})
|
|
}
|
|
|
|
// Find lists volumes filtered by the past in filter.
|
|
// If a driver returns a volume that has name which conflicts with another volume from a different driver,
|
|
// the first volume is chosen and the conflicting volume is dropped.
|
|
func (s *VolumeStore) Find(ctx context.Context, by By) (vols []volume.Volume, warnings []string, err error) {
|
|
logrus.WithField("ByType", fmt.Sprintf("%T", by)).WithField("ByValue", fmt.Sprintf("%+v", by)).Debug("VolumeStore.Find")
|
|
switch f := by.(type) {
|
|
case nil, orCombinator, andCombinator, byDriver, ByReferenced, CustomFilter:
|
|
warnings, err = s.filter(ctx, &vols, by)
|
|
case fromList:
|
|
warnings, err = s.filter(ctx, f.ls, f.by)
|
|
default:
|
|
// Really shouldn't be possible, but makes sure that any new By's are added to this check.
|
|
err = errdefs.InvalidParameter(errors.Errorf("unsupported filter type: %T", f))
|
|
}
|
|
if err != nil {
|
|
return nil, nil, &OpErr{Err: err, Op: "list"}
|
|
}
|
|
|
|
var out []volume.Volume
|
|
|
|
for _, v := range vols {
|
|
name := normalizeVolumeName(v.Name())
|
|
|
|
s.locks.Lock(name)
|
|
storedV, exists := s.getNamed(name)
|
|
// Note: it's not safe to populate the cache here because the volume may have been
|
|
// deleted before we acquire a lock on its name
|
|
if exists && storedV.DriverName() != v.DriverName() {
|
|
logrus.Warnf("Volume name %s already exists for driver %s, not including volume returned by %s", v.Name(), storedV.DriverName(), v.DriverName())
|
|
s.locks.Unlock(v.Name())
|
|
continue
|
|
}
|
|
|
|
out = append(out, v)
|
|
s.locks.Unlock(v.Name())
|
|
}
|
|
return out, warnings, nil
|
|
}
|
|
|
|
type filterFunc func(volume.Volume) bool
|
|
|
|
func filter(vols *[]volume.Volume, fn filterFunc) {
|
|
var evict []int
|
|
for i, v := range *vols {
|
|
if !fn(v) {
|
|
evict = append(evict, i)
|
|
}
|
|
}
|
|
|
|
for n, i := range evict {
|
|
copy((*vols)[i-n:], (*vols)[i-n+1:])
|
|
(*vols)[len(*vols)-1] = nil
|
|
*vols = (*vols)[:len(*vols)-1]
|
|
}
|
|
}
|
|
|
|
// list goes through each volume driver and asks for its list of volumes.
|
|
// TODO(@cpuguy83): plumb context through
|
|
func (s *VolumeStore) list(ctx context.Context, driverNames ...string) ([]volume.Volume, []string, error) {
|
|
var (
|
|
ls = []volume.Volume{} // do not return a nil value as this affects filtering
|
|
warnings []string
|
|
)
|
|
|
|
var dls []volume.Driver
|
|
|
|
all, err := s.drivers.GetAllDrivers()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if len(driverNames) == 0 {
|
|
dls = all
|
|
} else {
|
|
idx := make(map[string]bool, len(driverNames))
|
|
for _, name := range driverNames {
|
|
idx[name] = true
|
|
}
|
|
for _, d := range all {
|
|
if idx[d.Name()] {
|
|
dls = append(dls, d)
|
|
}
|
|
}
|
|
}
|
|
|
|
type vols struct {
|
|
vols []volume.Volume
|
|
err error
|
|
driverName string
|
|
}
|
|
chVols := make(chan vols, len(dls))
|
|
|
|
for _, vd := range dls {
|
|
go func(d volume.Driver) {
|
|
vs, err := d.List()
|
|
if err != nil {
|
|
chVols <- vols{driverName: d.Name(), err: &OpErr{Err: err, Name: d.Name(), Op: "list"}}
|
|
return
|
|
}
|
|
for i, v := range vs {
|
|
s.globalLock.RLock()
|
|
vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]}
|
|
s.globalLock.RUnlock()
|
|
}
|
|
|
|
chVols <- vols{vols: vs}
|
|
}(vd)
|
|
}
|
|
|
|
badDrivers := make(map[string]struct{})
|
|
for i := 0; i < len(dls); i++ {
|
|
vs := <-chVols
|
|
|
|
if vs.err != nil {
|
|
warnings = append(warnings, vs.err.Error())
|
|
badDrivers[vs.driverName] = struct{}{}
|
|
}
|
|
ls = append(ls, vs.vols...)
|
|
}
|
|
|
|
if len(badDrivers) > 0 {
|
|
s.globalLock.RLock()
|
|
for _, v := range s.names {
|
|
if _, exists := badDrivers[v.DriverName()]; exists {
|
|
ls = append(ls, v)
|
|
}
|
|
}
|
|
s.globalLock.RUnlock()
|
|
}
|
|
return ls, warnings, nil
|
|
}
|
|
|
|
// Create creates a volume with the given name and driver
|
|
// If the volume needs to be created with a reference to prevent race conditions
|
|
// with volume cleanup, make sure to use the `CreateWithReference` option.
|
|
func (s *VolumeStore) Create(ctx context.Context, name, driverName string, createOpts ...opts.CreateOption) (volume.Volume, error) {
|
|
var cfg opts.CreateConfig
|
|
for _, o := range createOpts {
|
|
o(&cfg)
|
|
}
|
|
|
|
name = normalizeVolumeName(name)
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
v, created, err := s.create(ctx, name, driverName, cfg.Options, cfg.Labels)
|
|
if err != nil {
|
|
if _, ok := err.(*OpErr); ok {
|
|
return nil, err
|
|
}
|
|
return nil, &OpErr{Err: err, Name: name, Op: "create"}
|
|
}
|
|
|
|
if created && s.eventLogger != nil {
|
|
s.eventLogger.LogVolumeEvent(v.Name(), "create", map[string]string{"driver": v.DriverName()})
|
|
}
|
|
s.setNamed(v, cfg.Reference)
|
|
return v, nil
|
|
}
|
|
|
|
// checkConflict checks the local cache for name collisions with the passed in name,
|
|
// for existing volumes with the same name but in a different driver.
|
|
// This is used by `Create` as a best effort to prevent name collisions for volumes.
|
|
// If a matching volume is found that is not a conflict that is returned so the caller
|
|
// does not need to perform an additional lookup.
|
|
// When no matching volume is found, both returns will be nil
|
|
//
|
|
// Note: This does not probe all the drivers for name collisions because v1 plugins
|
|
// are very slow, particularly if the plugin is down, and cause other issues,
|
|
// particularly around locking the store.
|
|
// TODO(cpuguy83): With v2 plugins this shouldn't be a problem. Could also potentially
|
|
// use a connect timeout for this kind of check to ensure we aren't blocking for a
|
|
// long time.
|
|
func (s *VolumeStore) checkConflict(ctx context.Context, name, driverName string) (volume.Volume, error) {
|
|
// check the local cache
|
|
v, _ := s.getNamed(name)
|
|
if v == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
vDriverName := v.DriverName()
|
|
var conflict bool
|
|
if driverName != "" {
|
|
// Retrieve canonical driver name to avoid inconsistencies (for example
|
|
// "plugin" vs. "plugin:latest")
|
|
vd, err := s.drivers.GetDriver(driverName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if vDriverName != vd.Name() {
|
|
conflict = true
|
|
}
|
|
}
|
|
|
|
// let's check if the found volume ref
|
|
// is stale by checking with the driver if it still exists
|
|
exists, err := volumeExists(ctx, s.drivers, v)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(errNameConflict, "found reference to volume '%s' in driver '%s', but got an error while checking the driver: %v", name, vDriverName, err)
|
|
}
|
|
|
|
if exists {
|
|
if conflict {
|
|
return nil, errors.Wrapf(errNameConflict, "driver '%s' already has volume '%s'", vDriverName, name)
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
if s.hasRef(v.Name()) {
|
|
// Containers are referencing this volume but it doesn't seem to exist anywhere.
|
|
// Return a conflict error here, the user can fix this with `docker volume rm -f`
|
|
return nil, errors.Wrapf(errNameConflict, "found references to volume '%s' in driver '%s' but the volume was not found in the driver -- you may need to remove containers referencing this volume or force remove the volume to re-create it", name, vDriverName)
|
|
}
|
|
|
|
// doesn't exist, so purge it from the cache
|
|
s.purge(ctx, name)
|
|
return nil, nil
|
|
}
|
|
|
|
// volumeExists returns if the volume is still present in the driver.
|
|
// An error is returned if there was an issue communicating with the driver.
|
|
func volumeExists(ctx context.Context, store *drivers.Store, v volume.Volume) (bool, error) {
|
|
exists, err := lookupVolume(ctx, store, v.DriverName(), v.Name())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return exists != nil, nil
|
|
}
|
|
|
|
// create asks the given driver to create a volume with the name/opts.
|
|
// If a volume with the name is already known, it will ask the stored driver for the volume.
|
|
// If the passed in driver name does not match the driver name which is stored
|
|
// for the given volume name, an error is returned after checking if the reference is stale.
|
|
// If the reference is stale, it will be purged and this create can continue.
|
|
// It is expected that callers of this function hold any necessary locks.
|
|
func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts, labels map[string]string) (volume.Volume, bool, error) {
|
|
// Validate the name in a platform-specific manner
|
|
|
|
// volume name validation is specific to the host os and not on container image
|
|
parser := volumemounts.NewParser()
|
|
err := parser.ValidateVolumeName(name)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
v, err := s.checkConflict(ctx, name, driverName)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
if v != nil {
|
|
// there is an existing volume, if we already have this stored locally, return it.
|
|
// TODO: there could be some inconsistent details such as labels here
|
|
if vv, _ := s.getNamed(v.Name()); vv != nil {
|
|
return vv, false, nil
|
|
}
|
|
}
|
|
|
|
// Since there isn't a specified driver name, let's see if any of the existing drivers have this volume name
|
|
if driverName == "" {
|
|
v, _ = s.getVolume(ctx, name, "")
|
|
if v != nil {
|
|
return v, false, nil
|
|
}
|
|
}
|
|
|
|
if driverName == "" {
|
|
driverName = volume.DefaultDriverName
|
|
}
|
|
vd, err := s.drivers.CreateDriver(driverName)
|
|
if err != nil {
|
|
return nil, false, &OpErr{Op: "create", Name: name, Err: err}
|
|
}
|
|
|
|
logrus.Debugf("Registering new volume reference: driver %q, name %q", vd.Name(), name)
|
|
if v, _ = vd.Get(name); v == nil {
|
|
v, err = vd.Create(name, opts)
|
|
if err != nil {
|
|
if _, err := s.drivers.ReleaseDriver(driverName); err != nil {
|
|
logrus.WithError(err).WithField("driver", driverName).Error("Error releasing reference to volume driver")
|
|
}
|
|
return nil, false, err
|
|
}
|
|
}
|
|
|
|
s.globalLock.Lock()
|
|
s.labels[name] = labels
|
|
s.options[name] = opts
|
|
s.refs[name] = make(map[string]struct{})
|
|
s.globalLock.Unlock()
|
|
|
|
metadata := volumeMetadata{
|
|
Name: name,
|
|
Driver: vd.Name(),
|
|
Labels: labels,
|
|
Options: opts,
|
|
}
|
|
|
|
if err := s.setMeta(name, metadata); err != nil {
|
|
return nil, true, err
|
|
}
|
|
return volumeWrapper{v, labels, vd.Scope(), opts}, true, nil
|
|
}
|
|
|
|
// Get looks if a volume with the given name exists and returns it if so
|
|
func (s *VolumeStore) Get(ctx context.Context, name string, getOptions ...opts.GetOption) (volume.Volume, error) {
|
|
var cfg opts.GetConfig
|
|
for _, o := range getOptions {
|
|
o(&cfg)
|
|
}
|
|
name = normalizeVolumeName(name)
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
v, err := s.getVolume(ctx, name, cfg.Driver)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "get"}
|
|
}
|
|
if cfg.Driver != "" && v.DriverName() != cfg.Driver {
|
|
return nil, &OpErr{Name: name, Op: "get", Err: errdefs.Conflict(errors.New("found volume driver does not match passed in driver"))}
|
|
}
|
|
s.setNamed(v, cfg.Reference)
|
|
return v, nil
|
|
}
|
|
|
|
// getVolume requests the volume, if the driver info is stored it just accesses that driver,
|
|
// if the driver is unknown it probes all drivers until it finds the first volume with that name.
|
|
// it is expected that callers of this function hold any necessary locks
|
|
func (s *VolumeStore) getVolume(ctx context.Context, name, driverName string) (volume.Volume, error) {
|
|
var meta volumeMetadata
|
|
meta, err := s.getMeta(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if driverName != "" {
|
|
if meta.Driver == "" {
|
|
meta.Driver = driverName
|
|
}
|
|
if driverName != meta.Driver {
|
|
return nil, errdefs.Conflict(errors.New("provided volume driver does not match stored driver"))
|
|
}
|
|
}
|
|
|
|
if driverName == "" {
|
|
driverName = meta.Driver
|
|
}
|
|
if driverName == "" {
|
|
s.globalLock.RLock()
|
|
select {
|
|
case <-ctx.Done():
|
|
s.globalLock.RUnlock()
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
v, exists := s.names[name]
|
|
s.globalLock.RUnlock()
|
|
if exists {
|
|
meta.Driver = v.DriverName()
|
|
if err := s.setMeta(name, meta); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
if meta.Driver != "" {
|
|
vol, err := lookupVolume(ctx, s.drivers, meta.Driver, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if vol == nil {
|
|
s.purge(ctx, name)
|
|
return nil, errNoSuchVolume
|
|
}
|
|
|
|
var scope string
|
|
vd, err := s.drivers.GetDriver(meta.Driver)
|
|
if err == nil {
|
|
scope = vd.Scope()
|
|
}
|
|
return volumeWrapper{vol, meta.Labels, scope, meta.Options}, nil
|
|
}
|
|
|
|
logrus.Debugf("Probing all drivers for volume with name: %s", name)
|
|
drivers, err := s.drivers.GetAllDrivers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, d := range drivers {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
v, err := d.Get(name)
|
|
if err != nil || v == nil {
|
|
continue
|
|
}
|
|
meta.Driver = v.DriverName()
|
|
if err := s.setMeta(name, meta); err != nil {
|
|
return nil, err
|
|
}
|
|
return volumeWrapper{v, meta.Labels, d.Scope(), meta.Options}, nil
|
|
}
|
|
return nil, errNoSuchVolume
|
|
}
|
|
|
|
// lookupVolume gets the specified volume from the specified driver.
|
|
// This will only return errors related to communications with the driver.
|
|
// If the driver returns an error that is not communication related, the error
|
|
// is logged but not returned.
|
|
// If the volume is not found it will return `nil, nil`
|
|
// TODO(@cpuguy83): plumb through the context to lower level components
|
|
func lookupVolume(ctx context.Context, store *drivers.Store, driverName, volumeName string) (volume.Volume, error) {
|
|
if driverName == "" {
|
|
driverName = volume.DefaultDriverName
|
|
}
|
|
vd, err := store.GetDriver(driverName)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", volumeName, driverName)
|
|
}
|
|
v, err := vd.Get(volumeName)
|
|
if err != nil {
|
|
var nErr net.Error
|
|
if errors.As(err, &nErr) {
|
|
if v != nil {
|
|
volumeName = v.Name()
|
|
driverName = v.DriverName()
|
|
}
|
|
return nil, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", volumeName, driverName)
|
|
}
|
|
|
|
// At this point, the error could be anything from the driver, such as "no such volume"
|
|
// Let's not check an error here, and instead check if the driver returned a volume
|
|
logrus.WithError(err).WithField("driver", driverName).WithField("volume", volumeName).Debug("Error while looking up volume")
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
// Remove removes the requested volume. A volume is not removed if it has any refs
|
|
func (s *VolumeStore) Remove(ctx context.Context, v volume.Volume, rmOpts ...opts.RemoveOption) error {
|
|
var cfg opts.RemoveConfig
|
|
for _, o := range rmOpts {
|
|
o(&cfg)
|
|
}
|
|
|
|
name := v.Name()
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if s.hasRef(name) {
|
|
return &OpErr{Err: errVolumeInUse, Name: name, Op: "remove", Refs: s.getRefs(name)}
|
|
}
|
|
|
|
v, err := s.getVolume(ctx, name, v.DriverName())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
vd, err := s.drivers.GetDriver(v.DriverName())
|
|
if err != nil {
|
|
return &OpErr{Err: err, Name: v.DriverName(), Op: "remove"}
|
|
}
|
|
|
|
logrus.Debugf("Removing volume reference: driver %s, name %s", v.DriverName(), name)
|
|
vol := unwrapVolume(v)
|
|
|
|
err = vd.Remove(vol)
|
|
if err != nil {
|
|
err = &OpErr{Err: err, Name: name, Op: "remove"}
|
|
}
|
|
|
|
if err == nil || cfg.PurgeOnError {
|
|
if e := s.purge(ctx, name); e != nil && err == nil {
|
|
err = e
|
|
}
|
|
}
|
|
if err == nil && s.eventLogger != nil {
|
|
s.eventLogger.LogVolumeEvent(v.Name(), "destroy", map[string]string{"driver": v.DriverName()})
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Release releases the specified reference to the volume
|
|
func (s *VolumeStore) Release(ctx context.Context, name string, ref string) error {
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
s.globalLock.Lock()
|
|
defer s.globalLock.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if s.refs[name] != nil {
|
|
delete(s.refs[name], ref)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CountReferences gives a count of all references for a given volume.
|
|
func (s *VolumeStore) CountReferences(v volume.Volume) int {
|
|
name := normalizeVolumeName(v.Name())
|
|
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
s.globalLock.Lock()
|
|
defer s.globalLock.Unlock()
|
|
|
|
return len(s.refs[name])
|
|
}
|
|
|
|
func unwrapVolume(v volume.Volume) volume.Volume {
|
|
if vol, ok := v.(volumeWrapper); ok {
|
|
return vol.Volume
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
// Shutdown releases all resources used by the volume store
|
|
// It does not make any changes to volumes, drivers, etc.
|
|
func (s *VolumeStore) Shutdown() error {
|
|
return s.db.Close()
|
|
}
|