Forráskód Böngészése

Merge pull request #20831 from aaronlehmann/concurrent-upload

Fix concurrent uploads that share layers
Vincent Demeester 9 éve
szülő
commit
621a148da3

+ 22 - 20
distribution/push_v2.go

@@ -42,7 +42,7 @@ type v2Pusher struct {
 	config            *ImagePushConfig
 	repo              distribution.Repository
 
-	// pushState is state built by the Download functions.
+	// pushState is state built by the Upload functions.
 	pushState pushState
 }
 
@@ -224,6 +224,7 @@ type v2PushDescriptor struct {
 	repoInfo          reference.Named
 	repo              distribution.Repository
 	pushState         *pushState
+	remoteDescriptor  distribution.Descriptor
 }
 
 func (pd *v2PushDescriptor) Key() string {
@@ -238,16 +239,16 @@ func (pd *v2PushDescriptor) DiffID() layer.DiffID {
 	return pd.layer.DiffID()
 }
 
-func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
+func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
 	diffID := pd.DiffID()
 
 	pd.pushState.Lock()
-	if _, ok := pd.pushState.remoteLayers[diffID]; ok {
+	if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
 		// it is already known that the push is not needed and
 		// therefore doing a stat is unnecessary
 		pd.pushState.Unlock()
 		progress.Update(progressOutput, pd.ID(), "Layer already exists")
-		return nil
+		return descriptor, nil
 	}
 	pd.pushState.Unlock()
 
@@ -257,14 +258,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 		descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
 		if err != nil {
 			progress.Update(progressOutput, pd.ID(), "Image push failed")
-			return retryOnError(err)
+			return distribution.Descriptor{}, retryOnError(err)
 		}
 		if exists {
 			progress.Update(progressOutput, pd.ID(), "Layer already exists")
 			pd.pushState.Lock()
 			pd.pushState.remoteLayers[diffID] = descriptor
 			pd.pushState.Unlock()
-			return nil
+			return descriptor, nil
 		}
 	}
 
@@ -328,9 +329,9 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 
 			// Cache mapping from this layer's DiffID to the blobsum
 			if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
-				return xfer.DoNotRetry{Err: err}
+				return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
 			}
-			return nil
+			return err.Descriptor, nil
 		case nil:
 			// blob upload session created successfully, so begin the upload
 			mountAttemptsRemaining = 0
@@ -345,14 +346,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	if layerUpload == nil {
 		layerUpload, err = bs.Create(ctx)
 		if err != nil {
-			return retryOnError(err)
+			return distribution.Descriptor{}, retryOnError(err)
 		}
 	}
 	defer layerUpload.Close()
 
 	arch, err := pd.layer.TarStream()
 	if err != nil {
-		return xfer.DoNotRetry{Err: err}
+		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
 	}
 
 	// don't care if this fails; best effort
@@ -371,12 +372,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	nn, err := layerUpload.ReadFrom(tee)
 	compressedReader.Close()
 	if err != nil {
-		return retryOnError(err)
+		return distribution.Descriptor{}, retryOnError(err)
 	}
 
 	pushDigest := digester.Digest()
 	if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
-		return retryOnError(err)
+		return distribution.Descriptor{}, retryOnError(err)
 	}
 
 	logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
@@ -384,7 +385,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 
 	// Cache mapping from this layer's DiffID to the blobsum
 	if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
-		return xfer.DoNotRetry{Err: err}
+		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
 	}
 
 	pd.pushState.Lock()
@@ -393,23 +394,24 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	// speaks the v2 protocol.
 	pd.pushState.confirmedV2 = true
 
-	pd.pushState.remoteLayers[diffID] = distribution.Descriptor{
+	descriptor := distribution.Descriptor{
 		Digest:    pushDigest,
 		MediaType: schema2.MediaTypeLayer,
 		Size:      nn,
 	}
+	pd.pushState.remoteLayers[diffID] = descriptor
 
 	pd.pushState.Unlock()
 
-	return nil
+	return descriptor, nil
+}
+
+func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
+	pd.remoteDescriptor = descriptor
 }
 
 func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
-	// Not necessary to lock pushStatus because this is always
-	// called after all the mutation in pushStatus.
-	// By the time this function is called, every layer will have
-	// an entry in remoteLayers.
-	return pd.pushState.remoteLayers[pd.DiffID()]
+	return pd.remoteDescriptor
 }
 
 // layerAlreadyExists checks if the registry already know about any of the

+ 16 - 7
distribution/xfer/upload.go

@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution"
 	"github.com/docker/docker/layer"
 	"github.com/docker/docker/pkg/progress"
 	"golang.org/x/net/context"
@@ -28,8 +29,8 @@ func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
 type uploadTransfer struct {
 	Transfer
 
-	diffID layer.DiffID
-	err    error
+	remoteDescriptor distribution.Descriptor
+	err              error
 }
 
 // An UploadDescriptor references a layer that may need to be uploaded.
@@ -41,7 +42,12 @@ type UploadDescriptor interface {
 	// DiffID should return the DiffID for this layer.
 	DiffID() layer.DiffID
 	// Upload is called to perform the Upload.
-	Upload(ctx context.Context, progressOutput progress.Output) error
+	Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
+	// SetRemoteDescriptor provides the distribution.Descriptor that was
+	// returned by Upload. This descriptor is not to be confused with
+	// the UploadDescriptor interface, which is used for internally
+	// identifying layers that are being uploaded.
+	SetRemoteDescriptor(descriptor distribution.Descriptor)
 }
 
 // Upload is a blocking function which ensures the listed layers are present on
@@ -50,7 +56,7 @@ type UploadDescriptor interface {
 func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
 	var (
 		uploads          []*uploadTransfer
-		dedupDescriptors = make(map[string]struct{})
+		dedupDescriptors = make(map[string]*uploadTransfer)
 	)
 
 	for _, descriptor := range layers {
@@ -60,12 +66,12 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 		if _, present := dedupDescriptors[key]; present {
 			continue
 		}
-		dedupDescriptors[key] = struct{}{}
 
 		xferFunc := lum.makeUploadFunc(descriptor)
 		upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
 		defer upload.Release(watcher)
 		uploads = append(uploads, upload.(*uploadTransfer))
+		dedupDescriptors[key] = upload.(*uploadTransfer)
 	}
 
 	for _, upload := range uploads {
@@ -78,6 +84,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 			}
 		}
 	}
+	for _, l := range layers {
+		l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
+	}
 
 	return nil
 }
@@ -86,7 +95,6 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
 		u := &uploadTransfer{
 			Transfer: NewTransfer(),
-			diffID:   descriptor.DiffID(),
 		}
 
 		go func() {
@@ -105,8 +113,9 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 
 			retries := 0
 			for {
-				err := descriptor.Upload(u.Transfer.Context(), progressOutput)
+				remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
 				if err == nil {
+					u.remoteDescriptor = remoteDescriptor
 					break
 				}
 

+ 10 - 5
distribution/xfer/upload_test.go

@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/distribution"
 	"github.com/docker/distribution/digest"
 	"github.com/docker/docker/layer"
 	"github.com/docker/docker/pkg/progress"
@@ -35,13 +36,17 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID {
 	return u.diffID
 }
 
+// SetRemoteDescriptor is not used in the mock.
+func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
+}
+
 // Upload is called to perform the upload.
-func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
+func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
 	if u.currentUploads != nil {
 		defer atomic.AddInt32(u.currentUploads, -1)
 
 		if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
-			return errors.New("concurrency limit exceeded")
+			return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
 		}
 	}
 
@@ -49,7 +54,7 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
 	for i := int64(0); i <= 10; i++ {
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
+			return distribution.Descriptor{}, ctx.Err()
 		case <-time.After(10 * time.Millisecond):
 			progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
 		}
@@ -57,10 +62,10 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
 
 	if u.simulateRetries != 0 {
 		u.simulateRetries--
-		return errors.New("simulating retry")
+		return distribution.Descriptor{}, errors.New("simulating retry")
 	}
 
-	return nil
+	return distribution.Descriptor{}, nil
 }
 
 func uploadDescriptors(currentUploads *int32) []UploadDescriptor {

+ 55 - 0
integration-cli/docker_cli_push_test.go

@@ -148,6 +148,61 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) {
 	testPushEmptyLayer(c)
 }
 
+// testConcurrentPush pushes multiple tags to the same repo
+// concurrently.
+func testConcurrentPush(c *check.C) {
+	repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
+
+	repos := []string{}
+	for _, tag := range []string{"push1", "push2", "push3"} {
+		repo := fmt.Sprintf("%v:%v", repoName, tag)
+		_, err := buildImage(repo, fmt.Sprintf(`
+	FROM busybox
+	ENTRYPOINT ["/bin/echo"]
+	ENV FOO foo
+	ENV BAR bar
+	CMD echo %s
+`, repo), true)
+		c.Assert(err, checker.IsNil)
+		repos = append(repos, repo)
+	}
+
+	// Push tags, in parallel
+	results := make(chan error)
+
+	for _, repo := range repos {
+		go func(repo string) {
+			_, _, err := runCommandWithOutput(exec.Command(dockerBinary, "push", repo))
+			results <- err
+		}(repo)
+	}
+
+	for range repos {
+		err := <-results
+		c.Assert(err, checker.IsNil, check.Commentf("concurrent push failed with error: %v", err))
+	}
+
+	// Clear local images store.
+	args := append([]string{"rmi"}, repos...)
+	dockerCmd(c, args...)
+
+	// Re-pull and run individual tags, to make sure pushes succeeded
+	for _, repo := range repos {
+		dockerCmd(c, "pull", repo)
+		dockerCmd(c, "inspect", repo)
+		out, _ := dockerCmd(c, "run", "--rm", repo)
+		c.Assert(strings.TrimSpace(out), checker.Equals, "/bin/sh -c echo "+repo)
+	}
+}
+
+func (s *DockerRegistrySuite) TestConcurrentPush(c *check.C) {
+	testConcurrentPush(c)
+}
+
+func (s *DockerSchema1RegistrySuite) TestConcurrentPush(c *check.C) {
+	testConcurrentPush(c)
+}
+
 func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) {
 	sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
 	// tag the image to upload it to the private registry