diff --git a/graph/pull_v2.go b/graph/pull_v2.go index bc8440d215..daf906b98a 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -139,6 +139,7 @@ func (p *v2Puller) download(di *downloadInfo) { di.err <- err return } + di.tmpFile = tmpFile blobs := p.repo.Blobs(nil) @@ -187,7 +188,6 @@ func (p *v2Puller) download(di *downloadInfo) { out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name()) - di.tmpFile = tmpFile di.layer = layerDownload di.err <- nil @@ -237,9 +237,9 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name())) - downloads := make([]downloadInfo, len(manifest.FSLayers)) + var downloads []*downloadInfo - layerIDs := []string{} + var layerIDs []string defer func() { p.graph.Release(p.sessionID, layerIDs...) }() @@ -250,66 +250,75 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) logrus.Debugf("error getting image v1 json: %v", err) return false, err } - downloads[i].img = img - downloads[i].digest = manifest.FSLayers[i].BlobSum - p.graph.Retain(p.sessionID, img.ID) layerIDs = append(layerIDs, img.ID) // Check if exists if p.graph.Exists(img.ID) { logrus.Debugf("Image already exists: %s", img.ID) + out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Already exists", nil)) continue } - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil)) - downloads[i].err = make(chan error) - downloads[i].out = pipeWriter - go p.download(&downloads[i]) + d := &downloadInfo{ + img: img, + digest: manifest.FSLayers[i].BlobSum, + // TODO: seems like this chan buffer solved hanging problem in go1.5, + // this can indicate some deeper problem that somehow we never take + // error from channel in loop below + err: make(chan error, 1), + out: pipeWriter, + } + downloads = append(downloads, d) + + go p.download(d) + } + + // run clean for all downloads to prevent leftovers + for _, d := range downloads { + defer func(d *downloadInfo) { + if d.tmpFile != nil { + d.tmpFile.Close() + if err := os.RemoveAll(d.tmpFile.Name()); err != nil { + logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name()) + } + } + }(d) } var tagUpdated bool - for i := len(downloads) - 1; i >= 0; i-- { - d := &downloads[i] - if d.err != nil { - if err := <-d.err; err != nil { - return false, err - } + for _, d := range downloads { + if err := <-d.err; err != nil { + return false, err } - if d.layer != nil { - // if tmpFile is empty assume download and extracted elsewhere - defer os.Remove(d.tmpFile.Name()) - defer d.tmpFile.Close() - d.tmpFile.Seek(0, 0) - if d.tmpFile != nil { - - reader := progressreader.New(progressreader.Config{ - In: d.tmpFile, - Out: out, - Formatter: p.sf, - Size: d.size, - NewLines: false, - ID: stringid.TruncateID(d.img.ID), - Action: "Extracting", - }) - - err = p.graph.Register(d.img, reader) - if err != nil { - return false, err - } - - if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil { - return false, err - } - - // FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted) - } - out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil)) - tagUpdated = true - } else { - out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil)) + if d.layer == nil { + continue } + // if tmpFile is empty assume download and extracted elsewhere + d.tmpFile.Seek(0, 0) + reader := progressreader.New(progressreader.Config{ + In: d.tmpFile, + Out: out, + Formatter: p.sf, + Size: d.size, + NewLines: false, + ID: stringid.TruncateID(d.img.ID), + Action: "Extracting", + }) + + err = p.graph.Register(d.img, reader) + if err != nil { + return false, err + } + + if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil { + return false, err + } + + // FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted) + out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil)) + tagUpdated = true } manifestDigest, _, err := digestFromManifest(manifest, p.repoInfo.LocalName) @@ -336,17 +345,18 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) out.Write(p.sf.FormatStatus(p.repo.Name()+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security.")) } + firstID := layerIDs[len(layerIDs)-1] if utils.DigestReference(tag) { // TODO(stevvooe): Ideally, we should always set the digest so we can // use the digest whether we pull by it or not. Unfortunately, the tag // store treats the digest as a separate tag, meaning there may be an // untagged digest image that would seem to be dangling by a user. - if err = p.SetDigest(p.repoInfo.LocalName, tag, downloads[0].img.ID); err != nil { + if err = p.SetDigest(p.repoInfo.LocalName, tag, firstID); err != nil { return false, err } } else { // only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest) - if err = p.Tag(p.repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil { + if err = p.Tag(p.repoInfo.LocalName, tag, firstID, true); err != nil { return false, err } }