Forráskód Böngészése

Fix volume plugin refcounting on daemon restart

Ensures all known volumes (known b/c they are persisted to disk) have
their volume drivers refcounted properly.

In testing this, I found an issue with `--live-restore` (required since
currently the provided volume plugin doesn't keep state on restart)
where restored plugins did not have a plugin client loaded, causing a
panic when trying to use the plugin.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit 6ef1060cd0acb847e06db890abb335faa837a9e2)
Signed-off-by: Victor Vieux <vieux@docker.com>
Brian Goff 8 éve
szülő
commit
1a865dd303

+ 6 - 0
daemon/daemon.go

@@ -794,6 +794,12 @@ func (daemon *Daemon) Shutdown() error {
 		})
 	}
 
+	if daemon.volumes != nil {
+		if err := daemon.volumes.Shutdown(); err != nil {
+			logrus.Errorf("Error shutting down volume store: %v", err)
+		}
+	}
+
 	if daemon.layerStore != nil {
 		if err := daemon.layerStore.Cleanup(); err != nil {
 			logrus.Errorf("Error during layer Store.Cleanup(): %v", err)

+ 21 - 0
integration-cli/docker_cli_daemon_plugins_test.go

@@ -275,6 +275,27 @@ func (s *DockerDaemonSuite) TestGraphdriverPlugin(c *check.C) {
 	c.Assert(err, checker.IsNil, check.Commentf(out))
 }
 
+// TestPluginVolumeRemoveOnRestart checks that a plugin backing an existing
+// volume is still reported as "in use" after the daemon is restarted with
+// --live-restore: `plugin rm` must fail even though the plugin was disabled.
+func (s *DockerDaemonSuite) TestPluginVolumeRemoveOnRestart(c *check.C) {
+	testRequires(c, DaemonIsLinux, Network, IsAmd64)
+
+	s.d.Start("--live-restore=true")
+
+	out, err := s.d.Cmd("plugin", "install", "--grant-all-permissions", pName)
+	c.Assert(err, checker.IsNil, check.Commentf(out))
+	c.Assert(strings.TrimSpace(out), checker.Contains, pName)
+
+	// create a volume that is owned by the plugin's driver
+	out, err = s.d.Cmd("volume", "create", "--driver", pName, "test")
+	c.Assert(err, checker.IsNil, check.Commentf(out))
+
+	s.d.Restart("--live-restore=true")
+
+	// disabling is expected to succeed, removal is expected to be refused
+	out, err = s.d.Cmd("plugin", "disable", pName)
+	c.Assert(err, checker.IsNil, check.Commentf(out))
+	out, err = s.d.Cmd("plugin", "rm", pName)
+	c.Assert(err, checker.NotNil, check.Commentf(out))
+	c.Assert(out, checker.Contains, "in use")
+}
+
 func existsMountpointWithPrefix(mountpointPrefix string) (bool, error) {
 	mounts, err := mount.GetMounts()
 	if err != nil {

+ 1 - 1
integration-cli/docker_cli_daemon_test.go

@@ -1343,7 +1343,7 @@ func (s *DockerDaemonSuite) TestDaemonWithWrongkey(c *check.C) {
 	content, _ := ioutil.ReadFile(s.d.logFile.Name())
 
 	if !strings.Contains(string(content), "Public Key ID does not match") {
-		c.Fatal("Missing KeyID message from daemon logs")
+		c.Fatalf("Missing KeyID message from daemon logs: %s", string(content))
 	}
 }
 

+ 78 - 0
volume/store/db.go

@@ -0,0 +1,78 @@
+package store
+
+import (
+	"encoding/json"
+
+	"github.com/boltdb/bolt"
+	"github.com/pkg/errors"
+)
+
+var volumeBucketName = []byte("volumes")
+
+// dbEntry is a raw key/value pair read from the volumes bucket by listEntries.
+// restore treats Key as the volume name and Value as JSON-encoded volumeMetadata.
+type dbEntry struct {
+	Key   []byte
+	Value []byte
+}
+
+// volumeMetadata is the per-volume record that setMeta stores as JSON in the
+// volumes bucket, keyed by volume name.
+type volumeMetadata struct {
+	Name    string
+	// Driver is the name of the owning volume driver. It may be empty in a
+	// stored record; restore and getVolume then resolve it and write it back.
+	Driver  string
+	Labels  map[string]string
+	Options map[string]string
+}
+
+// setMeta stores meta for the named volume in the store's db, using a single
+// read-write transaction.
+func (s *VolumeStore) setMeta(name string, meta volumeMetadata) error {
+	write := func(tx *bolt.Tx) error {
+		return setMeta(tx, name, meta)
+	}
+	return s.db.Update(write)
+}
+
+// setMeta JSON-encodes meta and puts it into the volumes bucket under name,
+// using the caller's transaction.
+// A marshalling error is returned as-is; a Put error is wrapped.
+func setMeta(tx *bolt.Tx, name string, meta volumeMetadata) error {
+	encoded, err := json.Marshal(meta)
+	if err != nil {
+		return err
+	}
+	if err := tx.Bucket(volumeBucketName).Put([]byte(name), encoded); err != nil {
+		return errors.Wrap(err, "error setting volume metadata")
+	}
+	return nil
+}
+
+// getMeta reads the metadata stored for the named volume in a read-only
+// transaction. The zero volumeMetadata is returned when nothing is stored.
+func (s *VolumeStore) getMeta(name string) (volumeMetadata, error) {
+	var result volumeMetadata
+	read := func(tx *bolt.Tx) error {
+		return getMeta(tx, name, &result)
+	}
+	err := s.db.View(read)
+	return result, err
+}
+
+// getMeta decodes the record stored under name in the volumes bucket into meta.
+// A missing or empty record is not an error: meta is left untouched and nil
+// is returned.
+func getMeta(tx *bolt.Tx, name string, meta *volumeMetadata) error {
+	raw := tx.Bucket(volumeBucketName).Get([]byte(name))
+	if len(raw) == 0 {
+		return nil
+	}
+	// errors.Wrap returns nil when the decode succeeded.
+	return errors.Wrap(json.Unmarshal(raw, meta), "error unmarshaling volume metadata")
+}
+
+// removeMeta deletes the stored metadata for the named volume in a single
+// read-write transaction.
+func (s *VolumeStore) removeMeta(name string) error {
+	remove := func(tx *bolt.Tx) error {
+		return removeMeta(tx, name)
+	}
+	return s.db.Update(remove)
+}
+
+// removeMeta deletes the record stored under name from the volumes bucket,
+// using the caller's transaction. A Delete error is wrapped.
+func removeMeta(tx *bolt.Tx, name string) error {
+	if err := tx.Bucket(volumeBucketName).Delete([]byte(name)); err != nil {
+		return errors.Wrap(err, "error removing volume metadata")
+	}
+	return nil
+}
+
+// listEntries returns every key/value pair in the volumes bucket.
+// The byte slices handed to the ForEach callback are only valid for the life
+// of the transaction, so each key and value is copied; callers may keep using
+// the returned entries after tx has ended.
+func listEntries(tx *bolt.Tx) []*dbEntry {
+	var entries []*dbEntry
+	b := tx.Bucket(volumeBucketName)
+	// The callback never returns an error, so ForEach has nothing to report.
+	b.ForEach(func(k, v []byte) error {
+		entries = append(entries, &dbEntry{
+			Key:   append([]byte(nil), k...),
+			Value: append([]byte(nil), v...),
+		})
+		return nil
+	})
+	return entries
+}

+ 91 - 0
volume/store/restore.go

@@ -0,0 +1,91 @@
+package store
+
+import (
+	"encoding/json"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/boltdb/bolt"
+	"github.com/docker/docker/volume"
+	"github.com/docker/docker/volume/drivers"
+)
+
+// restore is called when a new volume store is created.
+// Its primary purpose is to ensure that all drivers' refcounts are set based
+// on known volumes after a restart.
+// This only attempts to track volumes that are actually stored in the on-disk db.
+// It does not probe the available drivers to find anything that may have been added
+// out of band.
+// Entries whose volume can no longer be found are removed from the db once
+// all lookups have finished. Failures are logged, never returned.
+func (s *VolumeStore) restore() {
+	var entries []*dbEntry
+	if err := s.db.View(func(tx *bolt.Tx) error {
+		entries = listEntries(tx)
+		return nil
+	}); err != nil {
+		// nothing could be read, so there is nothing to restore or clean up
+		logrus.WithError(err).Warn("Error reading volume db on restore")
+		return
+	}
+
+	// buffered for the worst case (every entry is stale) so that the
+	// goroutines below never block on send
+	chRemove := make(chan []byte, len(entries))
+	var wg sync.WaitGroup
+	for _, entry := range entries {
+		wg.Add(1)
+		// this is potentially a very slow operation, so do it in a goroutine
+		go func(entry *dbEntry) {
+			defer wg.Done()
+			var meta volumeMetadata
+			if len(entry.Value) != 0 {
+				if err := json.Unmarshal(entry.Value, &meta); err != nil {
+					logrus.Errorf("Error while reading volume metadata for volume %q: %v", string(entry.Key), err)
+					// don't return here, we can try with `getVolume` below
+				}
+			}
+
+			var v volume.Volume
+			var err error
+			if meta.Driver != "" {
+				v, err = lookupVolume(meta.Driver, string(entry.Key))
+				if err != nil && err != errNoSuchVolume {
+					logrus.WithError(err).WithField("driver", meta.Driver).WithField("volume", string(entry.Key)).Warn("Error restoring volume")
+					return
+				}
+				if v == nil {
+					// doesn't exist in the driver, remove it from the db
+					chRemove <- entry.Key
+					return
+				}
+			} else {
+				// no driver recorded for this entry, fall back to the generic lookup
+				v, err = s.getVolume(string(entry.Key))
+				if err != nil {
+					if err == errNoSuchVolume {
+						chRemove <- entry.Key
+					}
+					return
+				}
+
+				// record the driver so the next restore can look it up directly
+				meta.Driver = v.DriverName()
+				if err := s.setMeta(v.Name(), meta); err != nil {
+					logrus.WithError(err).WithField("driver", meta.Driver).WithField("volume", v.Name()).Warn("Error updating volume metadata on restore")
+				}
+			}
+
+			// increment driver refcount
+			volumedrivers.CreateDriver(meta.Driver)
+
+			// cache the volume
+			s.globalLock.Lock()
+			s.options[v.Name()] = meta.Options
+			s.labels[v.Name()] = meta.Labels
+			s.names[v.Name()] = v
+			s.globalLock.Unlock()
+		}(entry)
+	}
+
+	wg.Wait()
+	// all senders are done; closing lets the range below terminate
+	close(chRemove)
+	if err := s.db.Update(func(tx *bolt.Tx) error {
+		for k := range chRemove {
+			if err := removeMeta(tx, string(k)); err != nil {
+				logrus.Warnf("Error removing stale entry from volume db: %v", err)
+			}
+		}
+		return nil
+	}); err != nil {
+		logrus.WithError(err).Warn("Error removing stale entries from volume db on restore")
+	}
+}

+ 87 - 84
volume/store/store.go

@@ -1,8 +1,6 @@
 package store
 
 import (
-	"bytes"
-	"encoding/json"
 	"net"
 	"os"
 	"path/filepath"
@@ -19,16 +17,9 @@ import (
 )
 
 const (
-	volumeDataDir    = "volumes"
-	volumeBucketName = "volumes"
+	volumeDataDir = "volumes"
 )
 
-type volumeMetadata struct {
-	Name    string
-	Labels  map[string]string
-	Options map[string]string
-}
-
 type volumeWrapper struct {
 	volume.Volume
 	labels  map[string]string
@@ -89,16 +80,17 @@ func New(rootPath string) (*VolumeStore, error) {
 
 		// initialize volumes bucket
 		if err := vs.db.Update(func(tx *bolt.Tx) error {
-			if _, err := tx.CreateBucketIfNotExists([]byte(volumeBucketName)); err != nil {
+			if _, err := tx.CreateBucketIfNotExists(volumeBucketName); err != nil {
 				return errors.Wrap(err, "error while setting up volume store metadata database")
 			}
-
 			return nil
 		}); err != nil {
 			return nil, err
 		}
 	}
 
+	vs.restore()
+
 	return vs, nil
 }
 
@@ -131,6 +123,15 @@ func (s *VolumeStore) getRefs(name string) []string {
 // the internal data is out of sync with volumes driver plugins.
 func (s *VolumeStore) Purge(name string) {
 	s.globalLock.Lock()
+	v, exists := s.names[name]
+	if exists {
+		if _, err := volumedrivers.RemoveDriver(v.DriverName()); err != nil {
+			logrus.Error("Error dereferencing volume driver: %v", err)
+		}
+	}
+	if err := s.removeMeta(name); err != nil {
+		logrus.Errorf("Error removing volume metadata for volume %q: %v", name, err)
+	}
 	delete(s.names, name)
 	delete(s.refs, name)
 	delete(s.labels, name)
@@ -322,24 +323,11 @@ func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, err
 // volumeExists returns if the volume is still present in the driver.
 // An error is returned if there was an issue communicating with the driver.
 func volumeExists(v volume.Volume) (bool, error) {
-	vd, err := volumedrivers.GetDriver(v.DriverName())
+	exists, err := lookupVolume(v.DriverName(), v.Name())
 	if err != nil {
-		return false, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", v.Name(), v.DriverName())
-	}
-	exists, err := vd.Get(v.Name())
-	if err != nil {
-		err = errors.Cause(err)
-		if _, ok := err.(net.Error); ok {
-			return false, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", v.Name(), v.DriverName())
-		}
-
-		// At this point, the error could be anything from the driver, such as "no such volume"
-		// Let's not check an error here, and instead check if the driver returned a volume
-	}
-	if exists == nil {
-		return false, nil
+		return false, err
 	}
-	return true, nil
+	return exists != nil, nil
 }
 
 // create asks the given driver to create a volume with the name/opts.
@@ -395,27 +383,16 @@ func (s *VolumeStore) create(name, driverName string, opts, labels map[string]st
 	s.options[name] = opts
 	s.globalLock.Unlock()
 
-	if s.db != nil {
-		metadata := &volumeMetadata{
-			Name:    name,
-			Labels:  labels,
-			Options: opts,
-		}
-
-		volData, err := json.Marshal(metadata)
-		if err != nil {
-			return nil, err
-		}
-
-		if err := s.db.Update(func(tx *bolt.Tx) error {
-			b := tx.Bucket([]byte(volumeBucketName))
-			err := b.Put([]byte(name), volData)
-			return err
-		}); err != nil {
-			return nil, errors.Wrap(err, "error while persisting volume metadata")
-		}
+	metadata := volumeMetadata{
+		Name:    name,
+		Driver:  vd.Name(),
+		Labels:  labels,
+		Options: opts,
 	}
 
+	if err := s.setMeta(name, metadata); err != nil {
+		return nil, err
+	}
 	return volumeWrapper{v, labels, vd.Scope(), opts}, nil
 }
 
@@ -462,48 +439,41 @@ func (s *VolumeStore) Get(name string) (volume.Volume, error) {
 // if the driver is unknown it probes all drivers until it finds the first volume with that name.
 // it is expected that callers of this function hold any necessary locks
 func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
-	labels := map[string]string{}
-	options := map[string]string{}
-
-	if s.db != nil {
-		// get meta
-		if err := s.db.Update(func(tx *bolt.Tx) error {
-			b := tx.Bucket([]byte(volumeBucketName))
-			data := b.Get([]byte(name))
-
-			if string(data) == "" {
-				return nil
-			}
-
-			var meta volumeMetadata
-			buf := bytes.NewBuffer(data)
+	var meta volumeMetadata
+	meta, err := s.getMeta(name)
+	if err != nil {
+		return nil, err
+	}
 
-			if err := json.NewDecoder(buf).Decode(&meta); err != nil {
-				return err
+	driverName := meta.Driver
+	if driverName == "" {
+		s.globalLock.RLock()
+		v, exists := s.names[name]
+		s.globalLock.RUnlock()
+		if exists {
+			meta.Driver = v.DriverName()
+			if err := s.setMeta(name, meta); err != nil {
+				return nil, err
 			}
-			labels = meta.Labels
-			options = meta.Options
-
-			return nil
-		}); err != nil {
-			return nil, err
 		}
 	}
 
-	logrus.Debugf("Getting volume reference for name: %s", name)
-	s.globalLock.RLock()
-	v, exists := s.names[name]
-	s.globalLock.RUnlock()
-	if exists {
-		vd, err := volumedrivers.GetDriver(v.DriverName())
+	if meta.Driver != "" {
+		vol, err := lookupVolume(meta.Driver, name)
 		if err != nil {
 			return nil, err
 		}
-		vol, err := vd.Get(name)
-		if err != nil {
-			return nil, err
+		if vol == nil {
+			s.Purge(name)
+			return nil, errNoSuchVolume
 		}
-		return volumeWrapper{vol, labels, vd.Scope(), options}, nil
+
+		var scope string
+		vd, err := volumedrivers.GetDriver(meta.Driver)
+		if err == nil {
+			scope = vd.Scope()
+		}
+		return volumeWrapper{vol, meta.Labels, scope, meta.Options}, nil
 	}
 
 	logrus.Debugf("Probing all drivers for volume with name: %s", name)
@@ -514,15 +484,42 @@ func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
 
 	for _, d := range drivers {
 		v, err := d.Get(name)
-		if err != nil {
+		if err != nil || v == nil {
 			continue
 		}
-
-		return volumeWrapper{v, labels, d.Scope(), options}, nil
+		meta.Driver = v.DriverName()
+		if err := s.setMeta(name, meta); err != nil {
+			return nil, err
+		}
+		return volumeWrapper{v, meta.Labels, d.Scope(), meta.Options}, nil
 	}
 	return nil, errNoSuchVolume
 }
 
+// lookupVolume gets the specified volume from the specified driver.
+// An error is returned only when the driver itself cannot be obtained or when
+// the driver's Get call fails with a net.Error (a communication failure).
+// If the driver returns an error that is not communication related the
+//   error is logged but not returned.
+// If the volume is not found it will return `nil, nil`.
+func lookupVolume(driverName, volumeName string) (volume.Volume, error) {
+	vd, err := volumedrivers.GetDriver(driverName)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", volumeName, driverName)
+	}
+	v, err := vd.Get(volumeName)
+	if err != nil {
+		err = errors.Cause(err)
+		if _, ok := err.(net.Error); ok {
+			// Report the requested names: v may be nil when Get fails, so
+			// calling v.Name() / v.DriverName() here could panic.
+			return nil, errors.Wrapf(err, "error while checking if volume %q exists in driver %q", volumeName, driverName)
+		}
+
+		// At this point, the error could be anything from the driver, such as "no such volume"
+		// Let's not check an error here, and instead check if the driver returned a volume
+		logrus.WithError(err).WithField("driver", driverName).WithField("volume", volumeName).Warnf("Error while looking up volume")
+	}
+	return v, nil
+}
+
 // Remove removes the requested volume. A volume is not removed if it has any refs
 func (s *VolumeStore) Remove(v volume.Volume) error {
 	name := normaliseVolumeName(v.Name())
@@ -534,7 +531,7 @@ func (s *VolumeStore) Remove(v volume.Volume) error {
 		return &OpErr{Err: errVolumeInUse, Name: v.Name(), Op: "remove", Refs: refs}
 	}
 
-	vd, err := volumedrivers.RemoveDriver(v.DriverName())
+	vd, err := volumedrivers.GetDriver(v.DriverName())
 	if err != nil {
 		return &OpErr{Err: err, Name: vd.Name(), Op: "remove"}
 	}
@@ -635,3 +632,9 @@ func unwrapVolume(v volume.Volume) volume.Volume {
 
 	return v
 }
+
+// Shutdown releases all resources used by the volume store
+// It does not make any changes to volumes, drivers, etc.
+// It closes the store's metadata db and returns the error from that close,
+// if any.
+func (s *VolumeStore) Shutdown() error {
+	return s.db.Close()
+}

+ 40 - 7
volume/store/store_test.go

@@ -2,6 +2,8 @@ package store
 
 import (
 	"errors"
+	"io/ioutil"
+	"os"
 	"strings"
 	"testing"
 
@@ -16,7 +18,13 @@ func TestCreate(t *testing.T) {
 
 	volumedrivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
 	defer volumedrivers.Unregister("fake")
-	s, err := New("")
+	dir, err := ioutil.TempDir("", "test-create")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +55,12 @@ func TestRemove(t *testing.T) {
 	volumedrivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
 	defer volumedrivers.Unregister("fake")
 	defer volumedrivers.Unregister("noop")
-	s, err := New("")
+	dir, err := ioutil.TempDir("", "test-remove")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -80,8 +93,13 @@ func TestList(t *testing.T) {
 	volumedrivers.Register(volumetestutils.NewFakeDriver("fake2"), "fake2")
 	defer volumedrivers.Unregister("fake")
 	defer volumedrivers.Unregister("fake2")
+	dir, err := ioutil.TempDir("", "test-list")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
 
-	s, err := New("")
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -99,9 +117,12 @@ func TestList(t *testing.T) {
 	if len(ls) != 2 {
 		t.Fatalf("expected 2 volumes, got: %d", len(ls))
 	}
+	if err := s.Shutdown(); err != nil {
+		t.Fatal(err)
+	}
 
 	// and again with a new store
-	s, err = New("")
+	s, err = New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +140,11 @@ func TestFilterByDriver(t *testing.T) {
 	volumedrivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
 	defer volumedrivers.Unregister("fake")
 	defer volumedrivers.Unregister("noop")
-	s, err := New("")
+	dir, err := ioutil.TempDir("", "test-filter-driver")
+	if err != nil {
+		t.Fatal(err)
+	}
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -146,8 +171,12 @@ func TestFilterByDriver(t *testing.T) {
 func TestFilterByUsed(t *testing.T) {
 	volumedrivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
 	volumedrivers.Register(volumetestutils.NewFakeDriver("noop"), "noop")
+	dir, err := ioutil.TempDir("", "test-filter-used")
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	s, err := New("")
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -183,8 +212,12 @@ func TestFilterByUsed(t *testing.T) {
 
 func TestDerefMultipleOfSameRef(t *testing.T) {
 	volumedrivers.Register(volumetestutils.NewFakeDriver("fake"), "fake")
+	dir, err := ioutil.TempDir("", "test-same-deref")
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	s, err := New("")
+	s, err := New(dir)
 	if err != nil {
 		t.Fatal(err)
 	}