[graph] Use a pipe for downloads to write progress

The process of pulling an image spawns a new goroutine for each layer in the
image manifest. If any of these downloads fail we would stop everything and
return the error, even though other goroutines would still be running and
writing output through a progress reader which is attached to an http response
writer. Since the request handler had already returned from the first error,
the http server panics when one of these download goroutines makes a write to
the response writer buffer.

This patch prevents this crash in the daemon http server by waiting for all of
the download goroutines to complete, even if one of them fails. Only then does
it return, terminating the request handler.

Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)

(cherry picked from commit d80c4244d3)
This commit is contained in:
Josh Hawn 2015-08-05 17:47:37 -07:00 committed by David Calavera
parent 2c875215b1
commit 74df05ccaa
2 changed files with 28 additions and 4 deletions

View file

@ -1,6 +1,7 @@
package graph package graph
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -108,6 +109,7 @@ type downloadInfo struct {
layer distribution.ReadSeekCloser layer distribution.ReadSeekCloser
size int64 size int64
err chan error err chan error
out io.Writer // Download progress is written here.
} }
type errVerification struct{} type errVerification struct{}
@ -117,7 +119,7 @@ func (errVerification) Error() string { return "verification failed" }
func (p *v2Puller) download(di *downloadInfo) { func (p *v2Puller) download(di *downloadInfo) {
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID) logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
out := p.config.OutStream out := di.out
if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil { if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
if c != nil { if c != nil {
@ -191,7 +193,7 @@ func (p *v2Puller) download(di *downloadInfo) {
di.err <- nil di.err <- nil
} }
func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) {
logrus.Debugf("Pulling tag from V2 registry: %q", tag) logrus.Debugf("Pulling tag from V2 registry: %q", tag)
out := p.config.OutStream out := p.config.OutStream
@ -204,7 +206,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
verified, err := p.validateManifest(manifest, tag) verified, err = p.validateManifest(manifest, tag)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -212,6 +214,27 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
logrus.Printf("Image manifest for %s has been verified", taggedName) logrus.Printf("Image manifest for %s has been verified", taggedName)
} }
// By using a pipeWriter for each of the downloads to write their progress
// to, we can avoid an issue where this function returns an error but
// leaves behind running download goroutines. By splitting the writer
// with a pipe, we can close the pipe if there is any error, consequently
// causing each download to cancel due to an error writing to this pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
if _, err := io.Copy(out, pipeReader); err != nil {
logrus.Errorf("error copying from layer download progress reader: %s", err)
}
}()
defer func() {
if err != nil {
// All operations on the pipe are synchronous. This call will wait
// until all current readers/writers are done using the pipe then
// set the error. All successive reads/writes will return with this
// error.
pipeWriter.CloseWithError(errors.New("download canceled"))
}
}()
out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name())) out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
downloads := make([]downloadInfo, len(manifest.FSLayers)) downloads := make([]downloadInfo, len(manifest.FSLayers))
@ -242,6 +265,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil)) out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
downloads[i].err = make(chan error) downloads[i].err = make(chan error)
downloads[i].out = pipeWriter
go p.download(&downloads[i]) go p.download(&downloads[i])
} }

View file

@ -446,7 +446,7 @@ func (s *DockerRegistrySuite) TestPullFailsWithAlteredManifest(c *check.C) {
imageReference := fmt.Sprintf("%s@%s", repoName, manifestDigest) imageReference := fmt.Sprintf("%s@%s", repoName, manifestDigest)
out, exitStatus, _ := dockerCmdWithError(c, "pull", imageReference) out, exitStatus, _ := dockerCmdWithError(c, "pull", imageReference)
if exitStatus == 0 { if exitStatus == 0 {
c.Fatalf("expected a zero exit status but got %d: %s", exitStatus, out) c.Fatalf("expected a non-zero exit status but got %d: %s", exitStatus, out)
} }
expectedErrorMsg := fmt.Sprintf("image verification failed for digest %s", manifestDigest) expectedErrorMsg := fmt.Sprintf("image verification failed for digest %s", manifestDigest)