浏览代码

Merge pull request #57 from shykes/wait_on_pull_already

Wait on pull already in progress
Michael Crosby 11 年之前
父节点
当前提交
a9230af52e
共有 2 个文件被更改,包括 39 次插入47 次删除
  1. 27 24
      server.go
  2. 12 23
      server_unit_test.go

+ 27 - 24
server.go

@@ -732,9 +732,9 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
 		id := history[i]
 		id := history[i]
 
 
 		// ensure no two downloads of the same layer happen at the same time
 		// ensure no two downloads of the same layer happen at the same time
-		if err := srv.poolAdd("pull", "layer:"+id); err != nil {
+		if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
 			utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err)
 			utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err)
-			return nil
+			<-c
 		}
 		}
 		defer srv.poolRemove("pull", "layer:"+id)
 		defer srv.poolRemove("pull", "layer:"+id)
 
 
@@ -829,7 +829,7 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
 			}
 			}
 
 
 			// ensure no two downloads of the same image happen at the same time
 			// ensure no two downloads of the same image happen at the same time
-			if err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
+			if _, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
 				utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
 				utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
 				if parallel {
 				if parallel {
 					errors <- nil
 					errors <- nil
@@ -900,38 +900,41 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
 	return nil
 	return nil
 }
 }
 
 
-func (srv *Server) poolAdd(kind, key string) error {
+func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
 	srv.Lock()
 	srv.Lock()
 	defer srv.Unlock()
 	defer srv.Unlock()
 
 
-	if _, exists := srv.pullingPool[key]; exists {
-		return fmt.Errorf("pull %s is already in progress", key)
+	if c, exists := srv.pullingPool[key]; exists {
+		return c, fmt.Errorf("pull %s is already in progress", key)
 	}
 	}
-	if _, exists := srv.pushingPool[key]; exists {
-		return fmt.Errorf("push %s is already in progress", key)
+	if c, exists := srv.pushingPool[key]; exists {
+		return c, fmt.Errorf("push %s is already in progress", key)
 	}
 	}
 
 
+	c := make(chan struct{})
 	switch kind {
 	switch kind {
 	case "pull":
 	case "pull":
-		srv.pullingPool[key] = struct{}{}
-		break
+		srv.pullingPool[key] = c
 	case "push":
 	case "push":
-		srv.pushingPool[key] = struct{}{}
-		break
+		srv.pushingPool[key] = c
 	default:
 	default:
-		return fmt.Errorf("Unknown pool type")
+		return nil, fmt.Errorf("Unknown pool type")
 	}
 	}
-	return nil
+	return c, nil
 }
 }
 
 
 func (srv *Server) poolRemove(kind, key string) error {
 func (srv *Server) poolRemove(kind, key string) error {
 	switch kind {
 	switch kind {
 	case "pull":
 	case "pull":
-		delete(srv.pullingPool, key)
-		break
+		if c, exists := srv.pullingPool[key]; exists {
+			close(c)
+			delete(srv.pullingPool, key)
+		}
 	case "push":
 	case "push":
-		delete(srv.pushingPool, key)
-		break
+		if c, exists := srv.pushingPool[key]; exists {
+			close(c)
+			delete(srv.pushingPool, key)
+		}
 	default:
 	default:
 		return fmt.Errorf("Unknown pool type")
 		return fmt.Errorf("Unknown pool type")
 	}
 	}
@@ -943,7 +946,7 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	if err := srv.poolAdd("pull", localName+":"+tag); err != nil {
+	if _, err := srv.poolAdd("pull", localName+":"+tag); err != nil {
 		return err
 		return err
 	}
 	}
 	defer srv.poolRemove("pull", localName+":"+tag)
 	defer srv.poolRemove("pull", localName+":"+tag)
@@ -1138,7 +1141,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
 
 
 // FIXME: Allow to interrupt current push when new push of same image is done.
 // FIXME: Allow to interrupt current push when new push of same image is done.
 func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string) error {
 func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string) error {
-	if err := srv.poolAdd("push", localName); err != nil {
+	if _, err := srv.poolAdd("push", localName); err != nil {
 		return err
 		return err
 	}
 	}
 	defer srv.poolRemove("push", localName)
 	defer srv.poolRemove("push", localName)
@@ -1769,8 +1772,8 @@ func NewServer(eng *engine.Engine, config *DaemonConfig) (*Server, error) {
 	srv := &Server{
 	srv := &Server{
 		Eng:         eng,
 		Eng:         eng,
 		runtime:     runtime,
 		runtime:     runtime,
-		pullingPool: make(map[string]struct{}),
-		pushingPool: make(map[string]struct{}),
+		pullingPool: make(map[string]chan struct{}),
+		pushingPool: make(map[string]chan struct{}),
 		events:      make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
 		events:      make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
 		listeners:   make(map[string]chan utils.JSONMessage),
 		listeners:   make(map[string]chan utils.JSONMessage),
 		reqFactory:  nil,
 		reqFactory:  nil,
@@ -1807,8 +1810,8 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
 type Server struct {
 type Server struct {
 	sync.Mutex
 	sync.Mutex
 	runtime     *Runtime
 	runtime     *Runtime
-	pullingPool map[string]struct{}
-	pushingPool map[string]struct{}
+	pullingPool map[string]chan struct{}
+	pushingPool map[string]chan struct{}
 	events      []utils.JSONMessage
 	events      []utils.JSONMessage
 	listeners   map[string]chan utils.JSONMessage
 	listeners   map[string]chan utils.JSONMessage
 	reqFactory  *utils.HTTPRequestFactory
 	reqFactory  *utils.HTTPRequestFactory

+ 12 - 23
server_unit_test.go

@@ -8,49 +8,38 @@ import (
 
 
 func TestPools(t *testing.T) {
 func TestPools(t *testing.T) {
 	srv := &Server{
 	srv := &Server{
-		pullingPool: make(map[string]struct{}),
-		pushingPool: make(map[string]struct{}),
+		pullingPool: make(map[string]chan struct{}),
+		pushingPool: make(map[string]chan struct{}),
 	}
 	}
 
 
-	err := srv.poolAdd("pull", "test1")
-	if err != nil {
+	if _, err := srv.poolAdd("pull", "test1"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolAdd("pull", "test2")
-	if err != nil {
+	if _, err := srv.poolAdd("pull", "test2"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolAdd("push", "test1")
-	if err == nil || err.Error() != "pull test1 is already in progress" {
+	if _, err := srv.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
 		t.Fatalf("Expected `pull test1 is already in progress`")
 		t.Fatalf("Expected `pull test1 is already in progress`")
 	}
 	}
-	err = srv.poolAdd("pull", "test1")
-	if err == nil || err.Error() != "pull test1 is already in progress" {
+	if _, err := srv.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
 		t.Fatalf("Expected `pull test1 is already in progress`")
 		t.Fatalf("Expected `pull test1 is already in progress`")
 	}
 	}
-	err = srv.poolAdd("wait", "test3")
-	if err == nil || err.Error() != "Unknown pool type" {
+	if _, err := srv.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
 		t.Fatalf("Expected `Unknown pool type`")
 		t.Fatalf("Expected `Unknown pool type`")
 	}
 	}
-
-	err = srv.poolRemove("pull", "test2")
-	if err != nil {
+	if err := srv.poolRemove("pull", "test2"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolRemove("pull", "test2")
-	if err != nil {
+	if err := srv.poolRemove("pull", "test2"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolRemove("pull", "test1")
-	if err != nil {
+	if err := srv.poolRemove("pull", "test1"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolRemove("push", "test1")
-	if err != nil {
+	if err := srv.poolRemove("push", "test1"); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	err = srv.poolRemove("wait", "test3")
-	if err == nil || err.Error() != "Unknown pool type" {
+	if err := srv.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
 		t.Fatalf("Expected `Unknown pool type`")
 		t.Fatalf("Expected `Unknown pool type`")
 	}
 	}
 }
 }