Browse Source

fix 8926: rmi dangling is unsafe when pulling

Signed-off-by: Ma Shimiao <mashimiao.fnst@cn.fujitsu.com>
Signed-off-by: Tibor Vass <tibor@docker.com>
Ma Shimiao 10 years ago
parent
commit
1b67c38f6f
8 changed files with 120 additions and 9 deletions
  1. 3 0
      builder/evaluator.go
  2. 4 0
      builder/internals.go
  3. 6 1
      builder/job.go
  4. 3 0
      daemon/image_delete.go
  5. 65 3
      graph/graph.go
  6. 14 0
      graph/pull_v1.go
  7. 17 5
      graph/pull_v2.go
  8. 8 0
      registry/session.go

+ 3 - 0
builder/evaluator.go

@@ -131,6 +131,9 @@ type Builder struct {
 	memorySwap   int64
 	memorySwap   int64
 
 
 	cancelled <-chan struct{} // When closed, job was cancelled.
 	cancelled <-chan struct{} // When closed, job was cancelled.
+
+	activeImages []string
+	id           string // Used to hold reference images
 }
 }
 
 
 // Run the builder with the context. This is the lynchpin of this package. This
 // Run the builder with the context. This is the lynchpin of this package. This

+ 4 - 0
builder/internals.go

@@ -128,6 +128,8 @@ func (b *Builder) commit(id string, autoCmd *runconfig.Command, comment string)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+	b.Daemon.Graph().Retain(b.id, image.ID)
+	b.activeImages = append(b.activeImages, image.ID)
 	b.image = image.ID
 	b.image = image.ID
 	return nil
 	return nil
 }
 }
@@ -592,6 +594,8 @@ func (b *Builder) probeCache() (bool, error) {
 	fmt.Fprintf(b.OutStream, " ---> Using cache\n")
 	fmt.Fprintf(b.OutStream, " ---> Using cache\n")
 	logrus.Debugf("[BUILDER] Use cached version")
 	logrus.Debugf("[BUILDER] Use cached version")
 	b.image = cache.ID
 	b.image = cache.ID
+	b.Daemon.Graph().Retain(b.id, cache.ID)
+	b.activeImages = append(b.activeImages, cache.ID)
 	return true, nil
 	return true, nil
 }
 }
 
 

+ 6 - 1
builder/job.go

@@ -20,6 +20,7 @@ import (
 	"github.com/docker/docker/pkg/parsers"
 	"github.com/docker/docker/pkg/parsers"
 	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/streamformatter"
 	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/urlutil"
 	"github.com/docker/docker/pkg/urlutil"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/runconfig"
@@ -200,13 +201,17 @@ func Build(d *daemon.Daemon, buildConfig *Config) error {
 		memory:          buildConfig.Memory,
 		memory:          buildConfig.Memory,
 		memorySwap:      buildConfig.MemorySwap,
 		memorySwap:      buildConfig.MemorySwap,
 		cancelled:       buildConfig.WaitCancelled(),
 		cancelled:       buildConfig.WaitCancelled(),
+		id:              stringid.GenerateRandomID(),
 	}
 	}
 
 
+	defer func() {
+		builder.Daemon.Graph().Release(builder.id, builder.activeImages...)
+	}()
+
 	id, err := builder.Run(context)
 	id, err := builder.Run(context)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-
 	if repoName != "" {
 	if repoName != "" {
 		return d.Repositories().Tag(repoName, tag, id, true)
 		return d.Repositories().Tag(repoName, tag, id, true)
 	}
 	}

+ 3 - 0
daemon/image_delete.go

@@ -137,6 +137,9 @@ func (daemon *Daemon) imgDeleteHelper(name string, list *[]types.ImageDelete, fi
 }
 }
 
 
 func (daemon *Daemon) canDeleteImage(imgID string, force bool) error {
 func (daemon *Daemon) canDeleteImage(imgID string, force bool) error {
+	if daemon.Graph().IsHeld(imgID) {
+		return fmt.Errorf("Conflict, cannot delete because %s is held by an ongoing pull or build", stringid.TruncateID(imgID))
+	}
 	for _, container := range daemon.List() {
 	for _, container := range daemon.List() {
 		if container.ImageID == "" {
 		if container.ImageID == "" {
 			// This technically should never happen, but if the container
 			// This technically should never happen, but if the container

+ 65 - 3
graph/graph.go

@@ -13,6 +13,7 @@ import (
 	"runtime"
 	"runtime"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
@@ -29,12 +30,56 @@ import (
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/runconfig"
 )
 )
 
 
+// retainedLayers protects the layers of images that are being pulled or
+// built from being deleted, e.g. by rmi when filtered by dangling=true.
+// The key of layerHolders is the ID of a retained layer.
+// The value of layerHolders is the set of IDs of the sessions (ongoing
+// pulls or builds) that currently hold that layer.
+type retainedLayers struct {
+	layerHolders map[string]map[string]struct{} // map[layerID]map[sessionID]
+	sync.Mutex
+}
+
+func (r *retainedLayers) Add(sessionID string, layerIDs []string) {
+	r.Lock()
+	defer r.Unlock()
+	for _, layerID := range layerIDs {
+		if r.layerHolders[layerID] == nil {
+			r.layerHolders[layerID] = map[string]struct{}{}
+		}
+		r.layerHolders[layerID][sessionID] = struct{}{}
+	}
+}
+
+func (r *retainedLayers) Delete(sessionID string, layerIDs []string) {
+	r.Lock()
+	defer r.Unlock()
+	for _, layerID := range layerIDs {
+		holders, ok := r.layerHolders[layerID]
+		if !ok {
+			continue
+		}
+		delete(holders, sessionID)
+		if len(holders) == 0 {
+			delete(r.layerHolders, layerID) // Delete any empty reference set.
+		}
+	}
+}
+
+func (r *retainedLayers) Exists(layerID string) bool {
+	r.Lock()
+	_, exists := r.layerHolders[layerID]
+	r.Unlock()
+	return exists
+}
+
 // A Graph is a store for versioned filesystem images and the relationship between them.
 // A Graph is a store for versioned filesystem images and the relationship between them.
 type Graph struct {
 type Graph struct {
 	root       string
 	root       string
 	idIndex    *truncindex.TruncIndex
 	idIndex    *truncindex.TruncIndex
 	driver     graphdriver.Driver
 	driver     graphdriver.Driver
 	imageMutex imageMutex // protect images in driver.
 	imageMutex imageMutex // protect images in driver.
+	retained   *retainedLayers
 }
 }
 
 
 type Image struct {
 type Image struct {
@@ -73,9 +118,10 @@ func NewGraph(root string, driver graphdriver.Driver) (*Graph, error) {
 	}
 	}
 
 
 	graph := &Graph{
 	graph := &Graph{
-		root:    abspath,
-		idIndex: truncindex.NewTruncIndex([]string{}),
-		driver:  driver,
+		root:     abspath,
+		idIndex:  truncindex.NewTruncIndex([]string{}),
+		driver:   driver,
+		retained: &retainedLayers{layerHolders: make(map[string]map[string]struct{})},
 	}
 	}
 	if err := graph.restore(); err != nil {
 	if err := graph.restore(); err != nil {
 		return nil, err
 		return nil, err
@@ -83,6 +129,11 @@ func NewGraph(root string, driver graphdriver.Driver) (*Graph, error) {
 	return graph, nil
 	return graph, nil
 }
 }
 
 
+// IsHeld returns whether the given layerID is being used by an ongoing pull or build.
+func (graph *Graph) IsHeld(layerID string) bool {
+	return graph.retained.Exists(layerID)
+}
+
 func (graph *Graph) restore() error {
 func (graph *Graph) restore() error {
 	dir, err := ioutil.ReadDir(graph.root)
 	dir, err := ioutil.ReadDir(graph.root)
 	if err != nil {
 	if err != nil {
@@ -367,6 +418,17 @@ func (graph *Graph) ByParent() map[string][]*Image {
 	return byParent
 	return byParent
 }
 }
 
 
+// Retain marks the given layers as held by the pull or build session sessionID.
+// While a layer is held, it cannot be deleted by rmi, e.g. with the dangling filter.
+func (graph *Graph) Retain(sessionID string, layerIDs ...string) {
+	graph.retained.Add(sessionID, layerIDs)
+}
+
+// Release removes sessionID as a holder of each of the provided layers.
+func (graph *Graph) Release(sessionID string, layerIDs ...string) {
+	graph.retained.Delete(sessionID, layerIDs)
+}
+
 // Heads returns all heads in the graph, keyed by id.
 // Heads returns all heads in the graph, keyed by id.
 // A head is an image which is not the parent of another image in the graph.
 // A head is an image which is not the parent of another image in the graph.
 func (graph *Graph) Heads() map[string]*Image {
 func (graph *Graph) Heads() map[string]*Image {

+ 14 - 0
graph/pull_v1.go

@@ -117,6 +117,11 @@ func (p *v1Puller) pullRepository(askedTag string) error {
 	errors := make(chan error)
 	errors := make(chan error)
 
 
 	layersDownloaded := false
 	layersDownloaded := false
+	imgIDs := []string{}
+	sessionID := p.session.ID()
+	defer func() {
+		p.graph.Release(sessionID, imgIDs...)
+	}()
 	for _, image := range repoData.ImgList {
 	for _, image := range repoData.ImgList {
 		downloadImage := func(img *registry.ImgData) {
 		downloadImage := func(img *registry.ImgData) {
 			if askedTag != "" && img.Tag != askedTag {
 			if askedTag != "" && img.Tag != askedTag {
@@ -144,6 +149,10 @@ func (p *v1Puller) pullRepository(askedTag string) error {
 			}
 			}
 			defer p.poolRemove("pull", "img:"+img.ID)
 			defer p.poolRemove("pull", "img:"+img.ID)
 
 
+			// we need to retain it until tagging
+			p.graph.Retain(sessionID, img.ID)
+			imgIDs = append(imgIDs, img.ID)
+
 			out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil))
 			out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil))
 			success := false
 			success := false
 			var lastErr, err error
 			var lastErr, err error
@@ -226,6 +235,11 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro
 	// FIXME: Try to stream the images?
 	// FIXME: Try to stream the images?
 	// FIXME: Launch the getRemoteImage() in goroutines
 	// FIXME: Launch the getRemoteImage() in goroutines
 
 
+	sessionID := p.session.ID()
+	// As imgID has been retained in pullRepository, no need to retain again
+	p.graph.Retain(sessionID, history[1:]...)
+	defer p.graph.Release(sessionID, history[1:]...)
+
 	layersDownloaded := false
 	layersDownloaded := false
 	for i := len(history) - 1; i >= 0; i-- {
 	for i := len(history) - 1; i >= 0; i-- {
 		id := history[i]
 		id := history[i]

+ 17 - 5
graph/pull_v2.go

@@ -21,11 +21,12 @@ import (
 
 
 type v2Puller struct {
 type v2Puller struct {
 	*TagStore
 	*TagStore
-	endpoint registry.APIEndpoint
-	config   *ImagePullConfig
-	sf       *streamformatter.StreamFormatter
-	repoInfo *registry.RepositoryInfo
-	repo     distribution.Repository
+	endpoint  registry.APIEndpoint
+	config    *ImagePullConfig
+	sf        *streamformatter.StreamFormatter
+	repoInfo  *registry.RepositoryInfo
+	repo      distribution.Repository
+	sessionID string
 }
 }
 
 
 func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
 func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
@@ -36,6 +37,8 @@ func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
 		return true, err
 		return true, err
 	}
 	}
 
 
+	p.sessionID = stringid.GenerateRandomID()
+
 	if err := p.pullV2Repository(tag); err != nil {
 	if err := p.pullV2Repository(tag); err != nil {
 		if registry.ContinueOnError(err) {
 		if registry.ContinueOnError(err) {
 			logrus.Debugf("Error trying v2 registry: %v", err)
 			logrus.Debugf("Error trying v2 registry: %v", err)
@@ -198,6 +201,12 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
 	out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
 	out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
 
 
 	downloads := make([]downloadInfo, len(manifest.FSLayers))
 	downloads := make([]downloadInfo, len(manifest.FSLayers))
+
+	layerIDs := []string{}
+	defer func() {
+		p.graph.Release(p.sessionID, layerIDs...)
+	}()
+
 	for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
 	for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
 		img, err := NewImgJSON([]byte(manifest.History[i].V1Compatibility))
 		img, err := NewImgJSON([]byte(manifest.History[i].V1Compatibility))
 		if err != nil {
 		if err != nil {
@@ -207,6 +216,9 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
 		downloads[i].img = img
 		downloads[i].img = img
 		downloads[i].digest = manifest.FSLayers[i].BlobSum
 		downloads[i].digest = manifest.FSLayers[i].BlobSum
 
 
+		p.graph.Retain(p.sessionID, img.ID)
+		layerIDs = append(layerIDs, img.ID)
+
 		// Check if exists
 		// Check if exists
 		if p.graph.Exists(img.ID) {
 		if p.graph.Exists(img.ID) {
 			logrus.Debugf("Image already exists: %s", img.ID)
 			logrus.Debugf("Image already exists: %s", img.ID)

+ 8 - 0
registry/session.go

@@ -23,6 +23,7 @@ import (
 	"github.com/docker/docker/cliconfig"
 	"github.com/docker/docker/cliconfig"
 	"github.com/docker/docker/pkg/httputils"
 	"github.com/docker/docker/pkg/httputils"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/tarsum"
 	"github.com/docker/docker/pkg/tarsum"
 )
 )
 
 
@@ -35,6 +36,7 @@ type Session struct {
 	client        *http.Client
 	client        *http.Client
 	// TODO(tiborvass): remove authConfig
 	// TODO(tiborvass): remove authConfig
 	authConfig *cliconfig.AuthConfig
 	authConfig *cliconfig.AuthConfig
+	id         string
 }
 }
 
 
 type authTransport struct {
 type authTransport struct {
@@ -158,6 +160,7 @@ func NewSession(client *http.Client, authConfig *cliconfig.AuthConfig, endpoint
 		authConfig:    authConfig,
 		authConfig:    authConfig,
 		client:        client,
 		client:        client,
 		indexEndpoint: endpoint,
 		indexEndpoint: endpoint,
+		id:            stringid.GenerateRandomID(),
 	}
 	}
 
 
 	var alwaysSetBasicAuth bool
 	var alwaysSetBasicAuth bool
@@ -188,6 +191,11 @@ func NewSession(client *http.Client, authConfig *cliconfig.AuthConfig, endpoint
 	return r, nil
 	return r, nil
 }
 }
 
 
+// ID returns this registry session's ID.
+func (r *Session) ID() string {
+	return r.id
+}
+
 // Retrieve the history of a given image from the Registry.
 // Retrieve the history of a given image from the Registry.
 // Return a list of the parent's json (requested image included)
 // Return a list of the parent's json (requested image included)
 func (r *Session) GetRemoteHistory(imgID, registry string) ([]string, error) {
 func (r *Session) GetRemoteHistory(imgID, registry string) ([]string, error) {