浏览代码

Change poolAdd to return a boolean instead of an error

Previously, its other return value was used even when it returned an
error. This is awkward and goes against the convention. It also could
have resulted in a nil pointer dereference when an error was returned
because of an unknown pool type. This changes the unknown pool type
error to a panic (since the pool types are hardcoded at call sites and
must always be "push" or "pull"), and returns a "found" boolean instead
of an error.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann 10 年之前
父节点
当前提交
80513d85cf
共有 7 个文件被更改,包括 34 次插入36 次删除
  1. 2 2
      graph/load.go
  2. 8 14
      graph/pools_test.go
  3. 6 6
      graph/pull_v1.go
  4. 4 4
      graph/pull_v2.go
  5. 2 3
      graph/push_v1.go
  6. 2 2
      graph/push_v2.go
  7. 10 5
      graph/tags.go

+ 2 - 2
graph/load.go

@@ -106,8 +106,8 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error {
 		}
 		}
 
 
 		// ensure no two downloads of the same layer happen at the same time
 		// ensure no two downloads of the same layer happen at the same time
-		if ps, err := s.poolAdd("pull", "layer:"+img.ID); err != nil {
-			logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err)
+		if ps, found := s.poolAdd("pull", "layer:"+img.ID); found {
+			logrus.Debugf("Image (id: %s) load is already running, waiting", img.ID)
 			ps.Wait(nil, nil)
 			ps.Wait(nil, nil)
 			return nil
 			return nil
 		}
 		}

+ 8 - 14
graph/pools_test.go

@@ -17,20 +17,17 @@ func TestPools(t *testing.T) {
 		pushingPool: make(map[string]*progressreader.ProgressStatus),
 		pushingPool: make(map[string]*progressreader.ProgressStatus),
 	}
 	}
 
 
-	if _, err := s.poolAdd("pull", "test1"); err != nil {
-		t.Fatal(err)
-	}
-	if _, err := s.poolAdd("pull", "test2"); err != nil {
-		t.Fatal(err)
+	if _, found := s.poolAdd("pull", "test1"); found {
+		t.Fatal("Expected pull test1 not to be in progress")
 	}
 	}
-	if _, err := s.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
-		t.Fatalf("Expected `pull test1 is already in progress`")
+	if _, found := s.poolAdd("pull", "test2"); found {
+		t.Fatal("Expected pull test2 not to be in progress")
 	}
 	}
-	if _, err := s.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
-		t.Fatalf("Expected `pull test1 is already in progress`")
+	if _, found := s.poolAdd("push", "test1"); !found {
+		t.Fatalf("Expected pull test1 to be in progress`")
 	}
 	}
-	if _, err := s.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
-		t.Fatalf("Expected `Unknown pool type`")
+	if _, found := s.poolAdd("pull", "test1"); !found {
+		t.Fatalf("Expected pull test1 to be in progress`")
 	}
 	}
 	if err := s.poolRemove("pull", "test2"); err != nil {
 	if err := s.poolRemove("pull", "test2"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -44,7 +41,4 @@ func TestPools(t *testing.T) {
 	if err := s.poolRemove("push", "test1"); err != nil {
 	if err := s.poolRemove("push", "test1"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	if err := s.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
-		t.Fatalf("Expected `Unknown pool type`")
-	}
 }
 }

+ 6 - 6
graph/pull_v1.go

@@ -138,8 +138,8 @@ func (p *v1Puller) pullRepository(askedTag string) error {
 			}
 			}
 
 
 			// ensure no two downloads of the same image happen at the same time
 			// ensure no two downloads of the same image happen at the same time
-			ps, err := p.poolAdd("pull", "img:"+img.ID)
-			if err != nil {
+			ps, found := p.poolAdd("pull", "img:"+img.ID)
+			if found {
 				msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)
 				msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)
 				ps.Wait(out, msg)
 				ps.Wait(out, msg)
 				out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
 				out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
@@ -155,7 +155,7 @@ func (p *v1Puller) pullRepository(askedTag string) error {
 
 
 			ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil))
 			ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil))
 			success := false
 			success := false
-			var lastErr error
+			var lastErr, err error
 			var isDownloaded bool
 			var isDownloaded bool
 			for _, ep := range p.repoInfo.Index.Mirrors {
 			for _, ep := range p.repoInfo.Index.Mirrors {
 				ep += "v1/"
 				ep += "v1/"
@@ -244,9 +244,9 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri
 		id := history[i]
 		id := history[i]
 
 
 		// ensure no two downloads of the same layer happen at the same time
 		// ensure no two downloads of the same layer happen at the same time
-		ps, err := p.poolAdd("pull", "layer:"+id)
-		if err != nil {
-			logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
+		ps, found := p.poolAdd("pull", "layer:"+id)
+		if found {
+			logrus.Debugf("Image (id: %s) pull is already running, skipping", id)
 			msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil)
 			msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil)
 			ps.Wait(out, msg)
 			ps.Wait(out, msg)
 		} else {
 		} else {

+ 4 - 4
graph/pull_v2.go

@@ -73,8 +73,8 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) {
 
 
 	}
 	}
 
 
-	ps, err := p.poolAdd("pull", taggedName)
-	if err != nil {
+	ps, found := p.poolAdd("pull", taggedName)
+	if found {
 		// Another pull of the same repository is already taking place; just wait for it to finish
 		// Another pull of the same repository is already taking place; just wait for it to finish
 		msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
 		msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
 		ps.Wait(p.config.OutStream, msg)
 		ps.Wait(p.config.OutStream, msg)
@@ -119,8 +119,8 @@ func (p *v2Puller) download(di *downloadInfo) {
 
 
 	out := di.out
 	out := di.out
 
 
-	ps, err := p.poolAdd("pull", "img:"+di.img.ID)
-	if err != nil {
+	ps, found := p.poolAdd("pull", "img:"+di.img.ID)
+	if found {
 		msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)
 		msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)
 		ps.Wait(out, msg)
 		ps.Wait(out, msg)
 		out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
 		out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))

+ 2 - 3
graph/push_v1.go

@@ -214,7 +214,6 @@ func (p *v1Pusher) pushImageToEndpoint(endpoint string, imageIDs []string, tags
 
 
 // pushRepository pushes layers that do not already exist on the registry.
 // pushRepository pushes layers that do not already exist on the registry.
 func (p *v1Pusher) pushRepository(tag string) error {
 func (p *v1Pusher) pushRepository(tag string) error {
-
 	logrus.Debugf("Local repo: %s", p.localRepo)
 	logrus.Debugf("Local repo: %s", p.localRepo)
 	p.out = ioutils.NewWriteFlusher(p.config.OutStream)
 	p.out = ioutils.NewWriteFlusher(p.config.OutStream)
 	imgList, tags, err := p.getImageList(tag)
 	imgList, tags, err := p.getImageList(tag)
@@ -229,8 +228,8 @@ func (p *v1Pusher) pushRepository(tag string) error {
 		logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
 		logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
 	}
 	}
 
 
-	if _, err := p.poolAdd("push", p.repoInfo.LocalName); err != nil {
-		return err
+	if _, found := p.poolAdd("push", p.repoInfo.LocalName); found {
+		return fmt.Errorf("push or pull %s is already in progress", p.repoInfo.LocalName)
 	}
 	}
 	defer p.poolRemove("push", p.repoInfo.LocalName)
 	defer p.poolRemove("push", p.repoInfo.LocalName)
 
 

+ 2 - 2
graph/push_v2.go

@@ -57,8 +57,8 @@ func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
 
 
 func (p *v2Pusher) pushV2Repository(tag string) error {
 func (p *v2Pusher) pushV2Repository(tag string) error {
 	localName := p.repoInfo.LocalName
 	localName := p.repoInfo.LocalName
-	if _, err := p.poolAdd("push", localName); err != nil {
-		return err
+	if _, found := p.poolAdd("push", localName); found {
+		return fmt.Errorf("push or pull %s is already in progress", localName)
 	}
 	}
 	defer p.poolRemove("push", localName)
 	defer p.poolRemove("push", localName)
 
 

+ 10 - 5
graph/tags.go

@@ -428,27 +428,32 @@ func validateDigest(dgst string) error {
 	return nil
 	return nil
 }
 }
 
 
-func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, error) {
+// poolAdd checks if a push or pull is already running, and returns (ps, true)
+// if a running operation is found. Otherwise, it creates a new one and returns
+// (ps, false).
+func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, bool) {
 	store.Lock()
 	store.Lock()
 	defer store.Unlock()
 	defer store.Unlock()
 
 
 	if p, exists := store.pullingPool[key]; exists {
 	if p, exists := store.pullingPool[key]; exists {
-		return p, fmt.Errorf("pull %s is already in progress", key)
+		return p, true
 	}
 	}
 	if p, exists := store.pushingPool[key]; exists {
 	if p, exists := store.pushingPool[key]; exists {
-		return p, fmt.Errorf("push %s is already in progress", key)
+		return p, true
 	}
 	}
 
 
 	ps := progressreader.NewProgressStatus()
 	ps := progressreader.NewProgressStatus()
+
 	switch kind {
 	switch kind {
 	case "pull":
 	case "pull":
 		store.pullingPool[key] = ps
 		store.pullingPool[key] = ps
 	case "push":
 	case "push":
 		store.pushingPool[key] = ps
 		store.pushingPool[key] = ps
 	default:
 	default:
-		return nil, fmt.Errorf("Unknown pool type")
+		panic("Unknown pool type")
 	}
 	}
-	return ps, nil
+
+	return ps, false
 }
 }
 
 
 func (store *TagStore) poolRemove(kind, key string) error {
 func (store *TagStore) poolRemove(kind, key string) error {