فهرست منبع

Add ID to JSONMessage in pull
Use goroutines to pull in parallel
If multiple images pulled at the same time, each progress is displayed on a new line

Victor Vieux 12 سال پیش
والد
کامیت
0e71e368a8
4فایلهای تغییر یافته به همراه48 افزوده شده و 30 حذف شده
  1. 1 1
      commands.go
  2. 1 1
      graph.go
  3. 37 24
      server.go
  4. 9 4
      utils/utils.go

+ 1 - 1
commands.go

@@ -196,7 +196,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
 	// FIXME: ProgressReader shouldn't be this annoyning to use
 	if context != nil {
 		sf := utils.NewStreamFormatter(false)
-		body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("Uploading context", "%v bytes%0.0s%0.0s"), sf)
+		body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("Uploading context", "%v bytes%0.0s%0.0s", ""), sf)
 	}
 	// Upload the build context
 	v := &url.Values{}

+ 1 - 1
graph.go

@@ -175,7 +175,7 @@ func (graph *Graph) TempLayerArchive(id string, compression Compression, sf *uti
 	if err != nil {
 		return nil, err
 	}
-	return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("Buffering to disk", "%v/%v (%v)"), sf), tmp.Root)
+	return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("Buffering to disk", "%v/%v (%v)", ""), sf), tmp.Root)
 }
 
 // Mktemp creates a temporary sub-directory inside the graph's filesystem.

+ 37 - 24
server.go

@@ -145,7 +145,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.
 		return "", err
 	}
 
-	if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), path); err != nil {
+	if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("Downloading", "%8v/%v (%v)", ""), sf), path); err != nil {
 		return "", err
 	}
 	// FIXME: Handle custom repo, tag comment, author
@@ -425,7 +425,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
 				return err
 			}
 			defer layer.Close()
-			if err := srv.runtime.graph.Register(utils.ProgressReader(layer, imgSize, out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), false, img); err != nil {
+			if err := srv.runtime.graph.Register(utils.ProgressReader(layer, imgSize, out, sf.FormatProgress("Downloading", "%8v/%v (%v)", id), sf), false, img); err != nil {
 				return err
 			}
 		}
@@ -477,30 +477,43 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
 		repoData.ImgList[id].Tag = askedTag
 	}
 
-	for _, img := range repoData.ImgList {
-		if askedTag != "" && img.Tag != askedTag {
-			utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
-			continue
-		}
+	errors := make(chan error)
+	for _, image := range repoData.ImgList {
+		go func(img *registry.ImgData) {
+			if askedTag != "" && img.Tag != askedTag {
+				utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
+				errors <- nil
+				return
+			}
 
-		if img.Tag == "" {
-			utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
-			continue
-		}
-		out.Write(sf.FormatStatus("Pulling image %s (%s) from %s", img.ID, img.Tag, localName))
-		success := false
-		for _, ep := range repoData.Endpoints {
-			if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
-				out.Write(sf.FormatStatus("Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err))
-				continue
+			if img.Tag == "" {
+				utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
+				errors <- nil
+				return
 			}
-			success = true
-			break
-		}
-		if !success {
-			return fmt.Errorf("Could not find repository on any of the indexed registries.")
+			out.Write(sf.FormatStatus("Pulling image %s (%s) from %s", img.ID, img.Tag, localName))
+			success := false
+			for _, ep := range repoData.Endpoints {
+				if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
+					out.Write(sf.FormatStatus("Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err))
+					continue
+				}
+				success = true
+				break
+			}
+			if !success {
+				errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
+			}
+			errors <- nil
+		}(image)
+	}
+
+	for i := 0; i < len(repoData.ImgList); i++ {
+		if err := <-errors; err != nil {
+			return err
 		}
 	}
+
 	for tag, id := range tagsList {
 		if askedTag != "" && tag != askedTag {
 			continue
@@ -748,7 +761,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
 	}
 
 	// Send the layer
-	if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)"), sf), ep, token); err != nil {
+	if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)", ""), sf), ep, token); err != nil {
 		return err
 	}
 	return nil
@@ -818,7 +831,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
 		if err != nil {
 			return err
 		}
-		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("Importing", "%8v/%v (%v)"), sf)
+		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("Importing", "%8v/%v (%v)", ""), sf)
 	}
 	img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
 	if err != nil {

+ 9 - 4
utils/utils.go

@@ -107,7 +107,7 @@ func (r *progressReader) Close() error {
 func ProgressReader(r io.ReadCloser, size int, output io.Writer, template []byte, sf *StreamFormatter) *progressReader {
 	tpl := string(template)
 	if tpl == "" {
-		tpl = string(sf.FormatProgress("", "%8v/%v (%v)"))
+		tpl = string(sf.FormatProgress("", "%8v/%v (%v)", ""))
 	}
 	return &progressReader{r, NewWriteFlusher(output), size, 0, 0, tpl, sf}
 }
@@ -587,11 +587,14 @@ type NopFlusher struct{}
 func (f *NopFlusher) Flush() {}
 
 type WriteFlusher struct {
+	sync.Mutex
 	w       io.Writer
 	flusher http.Flusher
 }
 
 func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
+	wf.Lock()
+	defer wf.Unlock()
 	n, err = wf.w.Write(b)
 	wf.flusher.Flush()
 	return n, err
@@ -619,7 +622,9 @@ func (jm *JSONMessage) Display(out io.Writer) (error) {
 	if jm.Time != 0 {
 		fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
 	}
-	if jm.Progress != "" {
+	if jm.Progress != "" && jm.ID != ""{
+		fmt.Fprintf(out, "\n%s %s %s\r", jm.Status, jm.ID, jm.Progress)
+	} else if jm.Progress != "" {
 		fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress)
 	} else if jm.Error != "" {
 		return fmt.Errorf(jm.Error)
@@ -665,10 +670,10 @@ func (sf *StreamFormatter) FormatError(err error) []byte {
 	return []byte("Error: " + err.Error() + "\r\n")
 }
 
-func (sf *StreamFormatter) FormatProgress(action, str string) []byte {
+func (sf *StreamFormatter) FormatProgress(action, str, id string) []byte {
 	sf.used = true
 	if sf.json {
-		b, err := json.Marshal(&JSONMessage{Status: action, Progress: str})
+		b, err := json.Marshal(&JSONMessage{Status: action, Progress: str, ID:id})
 		if err != nil {
 			return nil
 		}