@@ -13,6 +13,7 @@ import (
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/engine"
 	"github.com/docker/docker/image"
+	"github.com/docker/docker/pkg/common"
 	"github.com/docker/docker/pkg/tarsum"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/utils"
@@ -170,9 +171,9 @@ func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *
 			// ensure no two downloads of the same image happen at the same time
 			if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
 				if c != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
 					<-c
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Download complete", nil))
 				} else {
 					log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
 				}
@@ -183,12 +184,12 @@ func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *
 			}
 			defer s.poolRemove("pull", "img:"+img.ID)
 
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, repoInfo.CanonicalName), nil))
+			out.Write(sf.FormatProgress(common.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, repoInfo.CanonicalName), nil))
 			success := false
 			var lastErr, err error
 			var is_downloaded bool
 			for _, ep := range repoInfo.Index.Mirrors {
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
+				out.Write(sf.FormatProgress(common.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
 				if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
 					// Don't report errors when pulling from mirrors.
 					log.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err)
@@ -200,12 +201,12 @@ func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *
 			}
 			if !success {
 				for _, ep := range repoData.Endpoints {
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
+					out.Write(sf.FormatProgress(common.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
 					if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
 						// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
 						// As the error is also given to the output stream the user will see the error.
 						lastErr = err
-						out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err), nil))
+						out.Write(sf.FormatProgress(common.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err), nil))
 						continue
 					}
 					layers_downloaded = layers_downloaded || is_downloaded
@@ -215,13 +216,13 @@ func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *
 			}
 			if !success {
 				err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, repoInfo.CanonicalName, lastErr)
-				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
+				out.Write(sf.FormatProgress(common.TruncateID(img.ID), err.Error(), nil))
 				if parallel {
 					errors <- err
 					return
 				}
 			}
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+			out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Download complete", nil))
 
 			if parallel {
 				errors <- nil
@@ -268,7 +269,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 	if err != nil {
 		return false, err
 	}
-	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
+	out.Write(sf.FormatProgress(common.TruncateID(imgID), "Pulling dependent layers", nil))
 	// FIXME: Try to stream the images?
 	// FIXME: Launch the getRemoteImage() in goroutines
 
@@ -284,7 +285,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 		defer s.poolRemove("pull", "layer:"+id)
 
 		if !s.graph.Exists(id) {
-			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
+			out.Write(sf.FormatProgress(common.TruncateID(id), "Pulling metadata", nil))
 			var (
 				imgJSON []byte
 				imgSize int
@@ -295,7 +296,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 			for j := 1; j <= retries; j++ {
 				imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
 				if err != nil && j == retries {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(id), "Error pulling dependent layers", nil))
 					return layers_downloaded, err
 				} else if err != nil {
 					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
@@ -304,7 +305,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 				img, err = image.NewImgJSON(imgJSON)
 				layers_downloaded = true
 				if err != nil && j == retries {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(id), "Error pulling dependent layers", nil))
 					return layers_downloaded, fmt.Errorf("Failed to parse json: %s", err)
 				} else if err != nil {
 					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
@@ -320,7 +321,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 				if j > 1 {
 					status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
 				}
-				out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
+				out.Write(sf.FormatProgress(common.TruncateID(id), status, nil))
 				layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
 				if uerr, ok := err.(*url.Error); ok {
 					err = uerr.Err
@@ -329,26 +330,26 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
 					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
 					continue
 				} else if err != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(id), "Error pulling dependent layers", nil))
 					return layers_downloaded, err
 				}
 				layers_downloaded = true
 				defer layer.Close()
 
 				err = s.graph.Register(img,
-					utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"))
+					utils.ProgressReader(layer, imgSize, out, sf, false, common.TruncateID(id), "Downloading"))
 				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
 					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
 					continue
 				} else if err != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(id), "Error downloading dependent layers", nil))
 					return layers_downloaded, err
 				} else {
 					break
 				}
 			}
 		}
-		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
+		out.Write(sf.FormatProgress(common.TruncateID(id), "Download complete", nil))
 	}
 	return layers_downloaded, nil
 }
@@ -463,16 +464,16 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
 			return false, fmt.Errorf("expected 2 parts in the sumStr, got %#v", chunks)
 		}
 		sumType, checksum := chunks[0], chunks[1]
-		out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling fs layer", nil))
+		out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Pulling fs layer", nil))
 
 		downloadFunc := func(di *downloadInfo) error {
 			log.Debugf("pulling blob %q to V1 img %s", sumStr, img.ID)
 
 			if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
 				if c != nil {
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
 					<-c
-					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+					out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Download complete", nil))
 				} else {
 					log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
 				}
@@ -495,16 +496,16 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
 				return fmt.Errorf("unable to wrap image blob reader with TarSum: %s", err)
 			}
 
-			io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, utils.TruncateID(img.ID), "Downloading"))
+			io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, common.TruncateID(img.ID), "Downloading"))
 
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Verifying Checksum", nil))
+			out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Verifying Checksum", nil))
 
 			if finalChecksum := tarSumReader.Sum(nil); !strings.EqualFold(finalChecksum, sumStr) {
 				log.Infof("Image verification failed: checksum mismatch - expected %q but got %q", sumStr, finalChecksum)
 				verified = false
 			}
 
-			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
+			out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Download complete", nil))
 
 			log.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
 			di.tmpFile = tmpFile
@@ -545,17 +546,17 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
 		d.tmpFile.Seek(0, 0)
 		if d.tmpFile != nil {
 			err = s.graph.Register(d.img,
-				utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, utils.TruncateID(d.img.ID), "Extracting"))
+				utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, common.TruncateID(d.img.ID), "Extracting"))
 			if err != nil {
 				return false, err
 			}
 
 			// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
 		}
-		out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Pull complete", nil))
+		out.Write(sf.FormatProgress(common.TruncateID(d.img.ID), "Pull complete", nil))
 		layersDownloaded = true
 	} else {
-		out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Already exists", nil))
+		out.Write(sf.FormatProgress(common.TruncateID(d.img.ID), "Already exists", nil))
 	}
 
 	}
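
Note: every hunk above is the same mechanical substitution, moving the ID-shortening helper from utils.TruncateID to common.TruncateID in the new github.com/docker/docker/pkg/common package (utils.ProgressReader and the rest of utils are untouched, which is why the utils import remains). As a rough guide to what the call sites expect, here is a minimal sketch of the helper, assuming the conventional 12-character short ID shown in docker's progress output; the actual body in pkg/common may differ:

	package common

	// TruncateID returns a shorthand version of a string identifier
	// for display, e.g. "4e9cef4f5e8e0bff..." -> "4e9cef4f5e8e".
	// Sketch only: the 12-character length is an assumption based on
	// the short IDs docker prints.
	func TruncateID(id string) string {
		shortLen := 12
		if len(id) < shortLen {
			shortLen = len(id)
		}
		return id[:shortLen]
	}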