Prechádzať zdrojové kódy

Merge pull request #3073 from alexlarsson/refcount-driver-mounts

Refcount driver mounts
Guillaume J. Charmes 11 rokov pred
rodič
commit
f61a91f50a

+ 2 - 2
buildfile.go

@@ -488,7 +488,7 @@ func (b *buildFile) CmdAdd(args string) error {
 	}
 	b.tmpContainers[container.ID] = struct{}{}
 
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return err
 	}
 	defer container.Unmount()
@@ -610,7 +610,7 @@ func (b *buildFile) commit(id string, autoCmd []string, comment string) error {
 		b.tmpContainers[container.ID] = struct{}{}
 		fmt.Fprintf(b.outStream, " ---> Running in %s\n", utils.TruncateID(container.ID))
 		id = container.ID
-		if err := container.EnsureMounted(); err != nil {
+		if err := container.Mount(); err != nil {
 			return err
 		}
 		defer container.Unmount()

+ 22 - 14
container.go

@@ -201,9 +201,10 @@ func (settings *NetworkSettings) PortMappingAPI() []APIPort {
 
 // Inject the io.Reader at the given path. Note: do not close the reader
 func (container *Container) Inject(file io.Reader, pth string) error {
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return fmt.Errorf("inject: error mounting container %s: %s", container.ID, err)
 	}
+	defer container.Unmount()
 
 	// Return error if path exists
 	destPath := path.Join(container.RootfsPath(), pth)
@@ -505,7 +506,7 @@ func (container *Container) Start() (err error) {
 			container.cleanup()
 		}
 	}()
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return err
 	}
 	if container.runtime.networkManager.disabled {
@@ -1287,21 +1288,27 @@ func (container *Container) Resize(h, w int) error {
 }
 
 func (container *Container) ExportRw() (archive.Archive, error) {
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return nil, err
 	}
 	if container.runtime == nil {
 		return nil, fmt.Errorf("Can't load storage driver for unregistered container %s", container.ID)
 	}
+	defer container.Unmount()
 
 	return container.runtime.Diff(container)
 }
 
 func (container *Container) Export() (archive.Archive, error) {
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return nil, err
 	}
-	return archive.Tar(container.RootfsPath(), archive.Uncompressed)
+
+	archive, err := archive.Tar(container.RootfsPath(), archive.Uncompressed)
+	if err != nil {
+		return nil, err
+	}
+	return EofReader(archive, func() { container.Unmount() }), nil
 }
 
 func (container *Container) WaitTimeout(timeout time.Duration) error {
@@ -1319,12 +1326,6 @@ func (container *Container) WaitTimeout(timeout time.Duration) error {
 	}
 }
 
-func (container *Container) EnsureMounted() error {
-	// FIXME: EnsureMounted is deprecated because drivers are now responsible
-	// for re-entrant mounting in their Get() method.
-	return container.Mount()
-}
-
 func (container *Container) Mount() error {
 	return container.runtime.Mount(container)
 }
@@ -1422,10 +1423,11 @@ func (container *Container) GetSize() (int64, int64) {
 		driver             = container.runtime.driver
 	)
 
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		utils.Errorf("Warning: failed to compute size of container rootfs %s: %s", container.ID, err)
 		return sizeRw, sizeRootfs
 	}
+	defer container.Unmount()
 
 	if differ, ok := container.runtime.driver.(graphdriver.Differ); ok {
 		sizeRw, err = differ.DiffSize(container.ID)
@@ -1453,13 +1455,14 @@ func (container *Container) GetSize() (int64, int64) {
 }
 
 func (container *Container) Copy(resource string) (archive.Archive, error) {
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return nil, err
 	}
 	var filter []string
 	basePath := path.Join(container.RootfsPath(), resource)
 	stat, err := os.Stat(basePath)
 	if err != nil {
+		container.Unmount()
 		return nil, err
 	}
 	if !stat.IsDir() {
@@ -1470,10 +1473,15 @@ func (container *Container) Copy(resource string) (archive.Archive, error) {
 		filter = []string{path.Base(basePath)}
 		basePath = path.Dir(basePath)
 	}
-	return archive.TarFilter(basePath, &archive.TarOptions{
+
+	archive, err := archive.TarFilter(basePath, &archive.TarOptions{
 		Compression: archive.Uncompressed,
 		Includes:    filter,
 	})
+	if err != nil {
+		return nil, err
+	}
+	return EofReader(archive, func() { container.Unmount() }), nil
 }
 
 // Returns true if the container exposes a certain port

+ 2 - 0
graph.go

@@ -97,6 +97,7 @@ func (graph *Graph) Get(name string) (*Image, error) {
 		if err != nil {
 			return nil, fmt.Errorf("Driver %s failed to get image rootfs %s: %s", graph.driver, img.ID, err)
 		}
+		defer graph.driver.Put(img.ID)
 
 		var size int64
 		if img.Parent == "" {
@@ -193,6 +194,7 @@ func (graph *Graph) Register(jsonData []byte, layerData archive.Archive, img *Im
 	if err != nil {
 		return fmt.Errorf("Driver %s failed to get image rootfs %s: %s", graph.driver, img.ID, err)
 	}
+	defer graph.driver.Put(img.ID)
 	img.graph = graph
 	if err := StoreImage(img, jsonData, layerData, tmp, rootfs); err != nil {
 		return err

+ 50 - 5
graphdriver/aufs/aufs.go

@@ -31,6 +31,7 @@ import (
 	"os/exec"
 	"path"
 	"strings"
+	"sync"
 )
 
 func init() {
@@ -38,7 +39,9 @@ func init() {
 }
 
 type Driver struct {
-	root string
+	root       string
+	sync.Mutex // Protects concurrent modification to active
+	active     map[string]int
 }
 
 // New returns a new AUFS driver.
@@ -54,12 +57,17 @@ func Init(root string) (graphdriver.Driver, error) {
 		"layers",
 	}
 
+	a := &Driver{
+		root:   root,
+		active: make(map[string]int),
+	}
+
 	// Create the root aufs driver dir and return
 	// if it already exists
 	// If not populate the dir structure
 	if err := os.MkdirAll(root, 0755); err != nil {
 		if os.IsExist(err) {
-			return &Driver{root}, nil
+			return a, nil
 		}
 		return nil, err
 	}
@@ -69,7 +77,7 @@ func Init(root string) (graphdriver.Driver, error) {
 			return nil, err
 		}
 	}
-	return &Driver{root}, nil
+	return a, nil
 }
 
 // Return a nil error if the kernel supports aufs
@@ -167,6 +175,14 @@ func (a *Driver) createDirsFor(id string) error {
 
 // Unmount and remove the dir information
 func (a *Driver) Remove(id string) error {
+	// Protect the a.active from concurrent access
+	a.Lock()
+	defer a.Unlock()
+
+	if a.active[id] != 0 {
+		utils.Errorf("Warning: removing active id %s\n", id)
+	}
+
 	// Make sure the dir is umounted first
 	if err := a.unmount(id); err != nil {
 		return err
@@ -210,18 +226,47 @@ func (a *Driver) Get(id string) (string, error) {
 		ids = []string{}
 	}
 
+	// Protect the a.active from concurrent access
+	a.Lock()
+	defer a.Unlock()
+
+	count := a.active[id]
+
 	// If a dir does not have a parent ( no layers )do not try to mount
 	// just return the diff path to the data
 	out := path.Join(a.rootPath(), "diff", id)
 	if len(ids) > 0 {
 		out = path.Join(a.rootPath(), "mnt", id)
-		if err := a.mount(id); err != nil {
-			return "", err
+
+		if count == 0 {
+			if err := a.mount(id); err != nil {
+				return "", err
+			}
 		}
 	}
+
+	a.active[id] = count + 1
+
 	return out, nil
 }
 
+func (a *Driver) Put(id string) {
+	// Protect the a.active from concurrent access
+	a.Lock()
+	defer a.Unlock()
+
+	if count := a.active[id]; count > 1 {
+		a.active[id] = count - 1
+	} else {
+		ids, _ := getParentIds(a.rootPath(), id)
+		// We only mounted if there are any parents
+		if ids != nil && len(ids) > 0 {
+			a.unmount(id)
+		}
+		delete(a.active, id)
+	}
+}
+
 // Returns an archive of the contents for the id
 func (a *Driver) Diff(id string) (archive.Archive, error) {
 	return archive.TarFilter(path.Join(a.rootPath(), "diff", id), &archive.TarOptions{

+ 41 - 3
graphdriver/devmapper/driver.go

@@ -5,8 +5,10 @@ package devmapper
 import (
 	"fmt"
 	"github.com/dotcloud/docker/graphdriver"
+	"github.com/dotcloud/docker/utils"
 	"io/ioutil"
 	"path"
+	"sync"
 )
 
 func init() {
@@ -20,7 +22,9 @@ func init() {
 
 type Driver struct {
 	*DeviceSet
-	home string
+	home       string
+	sync.Mutex // Protects concurrent modification to active
+	active     map[string]int
 }
 
 var Init = func(home string) (graphdriver.Driver, error) {
@@ -31,6 +35,7 @@ var Init = func(home string) (graphdriver.Driver, error) {
 	d := &Driver{
 		DeviceSet: deviceSet,
 		home:      home,
+		active:    make(map[string]int),
 	}
 	return d, nil
 }
@@ -82,6 +87,14 @@ func (d *Driver) Create(id, parent string) error {
 }
 
 func (d *Driver) Remove(id string) error {
+	// Protect the d.active from concurrent access
+	d.Lock()
+	defer d.Unlock()
+
+	if d.active[id] != 0 {
+		utils.Errorf("Warning: removing active id %s\n", id)
+	}
+
 	mp := path.Join(d.home, "mnt", id)
 	if err := d.unmount(id, mp); err != nil {
 		return err
@@ -90,13 +103,38 @@ func (d *Driver) Remove(id string) error {
 }
 
 func (d *Driver) Get(id string) (string, error) {
+	// Protect the d.active from concurrent access
+	d.Lock()
+	defer d.Unlock()
+
+	count := d.active[id]
+
 	mp := path.Join(d.home, "mnt", id)
-	if err := d.mount(id, mp); err != nil {
-		return "", err
+	if count == 0 {
+		if err := d.mount(id, mp); err != nil {
+			return "", err
+		}
 	}
+
+	d.active[id] = count + 1
+
 	return path.Join(mp, "rootfs"), nil
 }
 
+func (d *Driver) Put(id string) {
+	// Protect the d.active from concurrent access
+	d.Lock()
+	defer d.Unlock()
+
+	if count := d.active[id]; count > 1 {
+		d.active[id] = count - 1
+	} else {
+		mp := path.Join(d.home, "mnt", id)
+		d.unmount(id, mp)
+		delete(d.active, id)
+	}
+}
+
 func (d *Driver) mount(id, mountPoint string) error {
 	// Create the target directories if they don't exist
 	if err := osMkdirAll(mountPoint, 0755); err != nil && !osIsExist(err) {

+ 1 - 0
graphdriver/driver.go

@@ -17,6 +17,7 @@ type Driver interface {
 	Remove(id string) error
 
 	Get(id string) (dir string, err error)
+	Put(id string)
 	Exists(id string) bool
 
 	Status() [][2]string

+ 5 - 0
graphdriver/vfs/driver.go

@@ -84,6 +84,11 @@ func (d *Driver) Get(id string) (string, error) {
 	return dir, nil
 }
 
+func (d *Driver) Put(id string) {
+	// The vfs driver has no runtime resources (e.g. mounts)
+	// to clean up, so we don't need anything here
+}
+
 func (d *Driver) Exists(id string) bool {
 	_, err := os.Stat(d.dir(id))
 	return err == nil

+ 26 - 9
image.go

@@ -104,6 +104,7 @@ func StoreImage(img *Image, jsonData []byte, layerData archive.Archive, root, la
 				if err != nil {
 					return err
 				}
+				defer driver.Put(img.Parent)
 				changes, err := archive.ChangesDirs(layer, parent)
 				if err != nil {
 					return err
@@ -147,7 +148,7 @@ func jsonPath(root string) string {
 }
 
 // TarLayer returns a tar archive of the image's filesystem layer.
-func (img *Image) TarLayer() (archive.Archive, error) {
+func (img *Image) TarLayer() (arch archive.Archive, err error) {
 	if img.graph == nil {
 		return nil, fmt.Errorf("Can't load storage driver for unregistered image %s", img.ID)
 	}
@@ -160,19 +161,35 @@ func (img *Image) TarLayer() (archive.Archive, error) {
 	if err != nil {
 		return nil, err
 	}
-	if img.Parent == "" {
-		return archive.Tar(imgFs, archive.Uncompressed)
-	} else {
-		parentFs, err := driver.Get(img.Parent)
-		if err != nil {
-			return nil, err
+
+	defer func() {
+		if err == nil {
+			driver.Put(img.ID)
 		}
-		changes, err := archive.ChangesDirs(imgFs, parentFs)
+	}()
+
+	if img.Parent == "" {
+		archive, err := archive.Tar(imgFs, archive.Uncompressed)
 		if err != nil {
 			return nil, err
 		}
-		return archive.ExportChanges(imgFs, changes)
+		return EofReader(archive, func() { driver.Put(img.ID) }), nil
+	}
+
+	parentFs, err := driver.Get(img.Parent)
+	if err != nil {
+		return nil, err
+	}
+	defer driver.Put(img.Parent)
+	changes, err := archive.ChangesDirs(imgFs, parentFs)
+	if err != nil {
+		return nil, err
+	}
+	archive, err := archive.ExportChanges(imgFs, changes)
+	if err != nil {
+		return nil, err
 	}
+	return EofReader(archive, func() { driver.Put(img.ID) }), nil
 }
 
 func ValidateID(id string) error {

+ 2 - 1
integration/utils_test.go

@@ -71,9 +71,10 @@ func containerRun(eng *engine.Engine, id string, t utils.Fataler) {
 
 func containerFileExists(eng *engine.Engine, id, dir string, t utils.Fataler) bool {
 	c := getContainer(eng, id, t)
-	if err := c.EnsureMounted(); err != nil {
+	if err := c.Mount(); err != nil {
 		t.Fatal(err)
 	}
+	defer c.Unmount()
 	if _, err := os.Stat(path.Join(c.RootfsPath(), dir)); err != nil {
 		if os.IsNotExist(err) {
 			return false

+ 13 - 4
runtime.go

@@ -134,6 +134,7 @@ func (runtime *Runtime) Register(container *Container) error {
 	if err != nil {
 		return fmt.Errorf("Error getting container filesystem %s from driver %s: %s", container.ID, runtime.driver, err)
 	}
+	defer runtime.driver.Put(container.ID)
 	container.rootfs = rootfs
 
 	container.runtime = runtime
@@ -466,6 +467,8 @@ func (runtime *Runtime) Create(config *Config, name string) (*Container, []strin
 	if err != nil {
 		return nil, nil, err
 	}
+	defer runtime.driver.Put(initID)
+
 	if err := setupInitLayer(initPath); err != nil {
 		return nil, nil, err
 	}
@@ -523,9 +526,10 @@ func (runtime *Runtime) Create(config *Config, name string) (*Container, []strin
 func (runtime *Runtime) Commit(container *Container, repository, tag, comment, author string, config *Config) (*Image, error) {
 	// FIXME: freeze the container before copying it to avoid data corruption?
 	// FIXME: this shouldn't be in commands.
-	if err := container.EnsureMounted(); err != nil {
+	if err := container.Mount(); err != nil {
 		return nil, err
 	}
+	defer container.Unmount()
 
 	rwTar, err := container.ExportRw()
 	if err != nil {
@@ -769,8 +773,7 @@ func (runtime *Runtime) Mount(container *Container) error {
 }
 
 func (runtime *Runtime) Unmount(container *Container) error {
-	// FIXME: Unmount is deprecated because drivers are responsible for mounting
-	// and unmounting when necessary. Use driver.Remove() instead.
+	runtime.driver.Put(container.ID)
 	return nil
 }
 
@@ -782,10 +785,12 @@ func (runtime *Runtime) Changes(container *Container) ([]archive.Change, error)
 	if err != nil {
 		return nil, fmt.Errorf("Error getting container rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err)
 	}
+	defer runtime.driver.Put(container.ID)
 	initDir, err := runtime.driver.Get(container.ID + "-init")
 	if err != nil {
 		return nil, fmt.Errorf("Error getting container init rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err)
 	}
+	defer runtime.driver.Put(container.ID + "-init")
 	return archive.ChangesDirs(cDir, initDir)
 }
 
@@ -804,7 +809,11 @@ func (runtime *Runtime) Diff(container *Container) (archive.Archive, error) {
 		return nil, fmt.Errorf("Error getting container rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err)
 	}
 
-	return archive.ExportChanges(cDir, changes)
+	archive, err := archive.ExportChanges(cDir, changes)
+	if err != nil {
+		return nil, err
+	}
+	return EofReader(archive, func() { runtime.driver.Put(container.ID) }), nil
 }
 
 func (runtime *Runtime) Run(c *Container, startCallback execdriver.StartCallback) (int, error) {

+ 27 - 0
utils.go

@@ -5,8 +5,10 @@ import (
 	"github.com/dotcloud/docker/archive"
 	"github.com/dotcloud/docker/pkg/namesgenerator"
 	"github.com/dotcloud/docker/utils"
+	"io"
 	"strconv"
 	"strings"
+	"sync/atomic"
 )
 
 type Change struct {
@@ -339,3 +341,28 @@ func (c *checker) Exists(name string) bool {
 func generateRandomName(runtime *Runtime) (string, error) {
 	return namesgenerator.GenerateRandomName(&checker{runtime})
 }
+
+// Read an io.Reader and call a function when it returns EOF
+func EofReader(r io.Reader, callback func()) *eofReader {
+	return &eofReader{
+		Reader:   r,
+		callback: callback,
+	}
+}
+
+type eofReader struct {
+	io.Reader
+	gotEOF   int32
+	callback func()
+}
+
+func (r *eofReader) Read(p []byte) (n int, err error) {
+	n, err = r.Reader.Read(p)
+	if err == io.EOF {
+		// Use atomics to make the gotEOF check threadsafe
+		if atomic.CompareAndSwapInt32(&r.gotEOF, 0, 1) {
+			r.callback()
+		}
+	}
+	return
+}