diff --git a/graph/load.go b/graph/load.go index 8f7efa6af5..d09b04a0e2 100644 --- a/graph/load.go +++ b/graph/load.go @@ -106,14 +106,10 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error { } // ensure no two downloads of the same layer happen at the same time - if c, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { - if c != nil { - logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) - <-c - return nil - } - - return err + if ps, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { + logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) + ps.Wait(nil, nil) + return nil } defer s.poolRemove("pull", "layer:"+img.ID) diff --git a/graph/pools_test.go b/graph/pools_test.go index 129a5e1fec..b0ee3b0337 100644 --- a/graph/pools_test.go +++ b/graph/pools_test.go @@ -3,6 +3,7 @@ package graph import ( "testing" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/reexec" ) @@ -12,8 +13,8 @@ func init() { func TestPools(t *testing.T) { s := &TagStore{ - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), + pullingPool: make(map[string]*progressreader.ProgressStatus), + pushingPool: make(map[string]*progressreader.ProgressStatus), } if _, err := s.poolAdd("pull", "test1"); err != nil { diff --git a/graph/pull_v1.go b/graph/pull_v1.go index 0280f1f950..ab1e4a57bd 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -3,6 +3,7 @@ package graph import ( "errors" "fmt" + "io" "net" "net/url" "strings" @@ -137,31 +138,29 @@ func (p *v1Puller) pullRepository(askedTag string) error { } // ensure no two downloads of the same image happen at the same time - if c, err := p.poolAdd("pull", "img:"+img.ID); err != nil { - if c != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) - <-c - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) - } else { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) - } + ps, err := p.poolAdd("pull", "img:"+img.ID) + if err != nil { + msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil) + ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil return } + ps.AddObserver(out) defer p.poolRemove("pull", "img:"+img.ID) // we need to retain it until tagging p.graph.Retain(sessionID, img.ID) imgIDs = append(imgIDs, img.ID) - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) success := false - var lastErr, err error + var lastErr error var isDownloaded bool for _, ep := range p.repoInfo.Index.Mirrors { ep += "v1/" - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { // Don't report errors when pulling from mirrors. logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err) continue @@ -172,12 +171,12 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { for _, ep := range repoData.Endpoints { - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { // It's not ideal that only the last error is returned, it would be better to concatenate the errors. // As the error is also given to the output stream the user will see the error. lastErr = err - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) continue } layersDownloaded = layersDownloaded || isDownloaded @@ -187,11 +186,11 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName, lastErr) - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) errors <- err return } - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil } @@ -226,12 +225,11 @@ func (p *v1Puller) pullRepository(askedTag string) error { return nil } -func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, error) { +func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []string) (bool, error) { history, err := p.session.GetRemoteHistory(imgID, endpoint) if err != nil { return false, err } - out := p.config.OutStream out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil)) // FIXME: Try to stream the images? // FIXME: Launch the getRemoteImage() in goroutines @@ -246,14 +244,18 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro id := history[i] // ensure no two downloads of the same layer happen at the same time - if c, err := p.poolAdd("pull", "layer:"+id); err != nil { + ps, err := p.poolAdd("pull", "layer:"+id) + if err != nil { logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) - <-c + msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil) + ps.Wait(out, msg) + } else { + ps.AddObserver(out) } defer p.poolRemove("pull", "layer:"+id) if !p.graph.Exists(id) { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) var ( imgJSON []byte imgSize int64 @@ -264,7 +266,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro for j := 1; j <= retries; j++ { imgJSON, imgSize, err = p.session.GetRemoteImageJSON(id, endpoint) if err != nil && j == retries { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -273,7 +275,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro img, err = image.NewImgJSON(imgJSON) layersDownloaded = true if err != nil && j == retries { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err) } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -289,7 +291,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro if j > 1 { status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) } - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) layer, err := p.session.GetRemoteImageLayer(img.ID, endpoint, imgSize) if uerr, ok := err.(*url.Error); ok { err = uerr.Err @@ -298,7 +300,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } layersDownloaded = true @@ -307,7 +309,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro err = p.graph.Register(img, progressreader.New(progressreader.Config{ In: layer, - Out: out, + Out: ps, Formatter: p.sf, Size: imgSize, NewLines: false, @@ -318,14 +320,14 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) return layersDownloaded, err } else { break } } } - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) } return layersDownloaded, nil } diff --git a/graph/pull_v2.go b/graph/pull_v2.go index 0b21c1f75a..ee9aee4c0e 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -73,30 +73,28 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { } - c, err := p.poolAdd("pull", taggedName) + ps, err := p.poolAdd("pull", taggedName) if err != nil { - if c != nil { - // Another pull of the same repository is already taking place; just wait for it to finish - p.config.OutStream.Write(p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)) - <-c - return nil - } - return err + // Another pull of the same repository is already taking place; just wait for it to finish + msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName) + ps.Wait(p.config.OutStream, msg) + return nil } defer p.poolRemove("pull", taggedName) + ps.AddObserver(p.config.OutStream) var layersDownloaded bool for _, tag := range tags { // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus? - pulledNew, err := p.pullV2Tag(tag, taggedName) + pulledNew, err := p.pullV2Tag(ps, tag, taggedName) if err != nil { return err } layersDownloaded = layersDownloaded || pulledNew } - writeStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded) + writeStatus(taggedName, ps, p.sf, layersDownloaded) return nil } @@ -121,18 +119,16 @@ func (p *v2Puller) download(di *downloadInfo) { out := di.out - if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil { - if c != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)) - <-c - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) - } else { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err) - } + ps, err := p.poolAdd("pull", "img:"+di.img.ID) + if err != nil { + msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil) + ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) di.err <- nil return } + ps.AddObserver(out) defer p.poolRemove("pull", "img:"+di.img.ID) tmpFile, err := ioutil.TempFile("", "GetImageBlob") if err != nil { @@ -167,7 +163,7 @@ func (p *v2Puller) download(di *downloadInfo) { reader := progressreader.New(progressreader.Config{ In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)), - Out: out, + Out: ps, Formatter: p.sf, Size: di.size, NewLines: false, @@ -176,7 +172,7 @@ func (p *v2Puller) download(di *downloadInfo) { }) io.Copy(tmpFile, reader) - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) if !verifier.Verified() { err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest) @@ -185,7 +181,7 @@ func (p *v2Puller) download(di *downloadInfo) { return } - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name()) di.layer = layerDownload @@ -193,9 +189,8 @@ func (p *v2Puller) download(di *downloadInfo) { di.err <- nil } -func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) { +func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (verified bool, err error) { logrus.Debugf("Pulling tag from V2 registry: %q", tag) - out := p.config.OutStream manSvc, err := p.repo.Manifests(context.Background()) if err != nil { diff --git a/graph/tags.go b/graph/tags.go index 9deb463735..1485be6705 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -17,6 +17,7 @@ import ( "github.com/docker/docker/graph/tags" "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/registry" "github.com/docker/docker/trust" @@ -36,8 +37,8 @@ type TagStore struct { sync.Mutex // FIXME: move push/pull-related fields // to a helper type - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} + pullingPool map[string]*progressreader.ProgressStatus + pushingPool map[string]*progressreader.ProgressStatus registryService *registry.Service eventsService *events.Events trustService *trust.TrustStore @@ -93,8 +94,8 @@ func NewTagStore(path string, cfg *TagStoreConfig) (*TagStore, error) { graph: cfg.Graph, trustKey: cfg.Key, Repositories: make(map[string]Repository), - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), + pullingPool: make(map[string]*progressreader.ProgressStatus), + pushingPool: make(map[string]*progressreader.ProgressStatus), registryService: cfg.Registry, eventsService: cfg.Events, trustService: cfg.Trust, @@ -427,27 +428,27 @@ func validateDigest(dgst string) error { return nil } -func (store *TagStore) poolAdd(kind, key string) (chan struct{}, error) { +func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, error) { store.Lock() defer store.Unlock() - if c, exists := store.pullingPool[key]; exists { - return c, fmt.Errorf("pull %s is already in progress", key) + if p, exists := store.pullingPool[key]; exists { + return p, fmt.Errorf("pull %s is already in progress", key) } - if c, exists := store.pushingPool[key]; exists { - return c, fmt.Errorf("push %s is already in progress", key) + if p, exists := store.pushingPool[key]; exists { + return p, fmt.Errorf("push %s is already in progress", key) } - c := make(chan struct{}) + ps := progressreader.NewProgressStatus() switch kind { case "pull": - store.pullingPool[key] = c + store.pullingPool[key] = ps case "push": - store.pushingPool[key] = c + store.pushingPool[key] = ps default: return nil, fmt.Errorf("Unknown pool type") } - return c, nil + return ps, nil } func (store *TagStore) poolRemove(kind, key string) error { @@ -455,13 +456,13 @@ func (store *TagStore) poolRemove(kind, key string) error { defer store.Unlock() switch kind { case "pull": - if c, exists := store.pullingPool[key]; exists { - close(c) + if ps, exists := store.pullingPool[key]; exists { + ps.Done() delete(store.pullingPool, key) } case "push": - if c, exists := store.pushingPool[key]; exists { - close(c) + if ps, exists := store.pushingPool[key]; exists { + ps.Done() delete(store.pushingPool, key) } default: diff --git a/pkg/progressreader/progressstatus.go b/pkg/progressreader/progressstatus.go new file mode 100644 index 0000000000..f536b84053 --- /dev/null +++ b/pkg/progressreader/progressstatus.go @@ -0,0 +1,72 @@ +package progressreader + +import ( + "bytes" + "io" + "sync" + + "github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus" +) + +type ProgressStatus struct { + sync.Mutex + c chan struct{} + observers []io.Writer + history bytes.Buffer +} + +func NewProgressStatus() *ProgressStatus { + return &ProgressStatus{ + c: make(chan struct{}), + observers: []io.Writer{}, + } +} + +func (ps *ProgressStatus) Write(p []byte) (n int, err error) { + ps.Lock() + defer ps.Unlock() + ps.history.Write(p) + for _, w := range ps.observers { + // copy paste from MultiWriter, replaced return with continue + n, err = w.Write(p) + if err != nil { + continue + } + if n != len(p) { + err = io.ErrShortWrite + continue + } + } + return len(p), nil +} + +func (ps *ProgressStatus) AddObserver(w io.Writer) { + ps.Lock() + defer ps.Unlock() + w.Write(ps.history.Bytes()) + ps.observers = append(ps.observers, w) +} + +func (ps *ProgressStatus) Done() { + ps.Lock() + close(ps.c) + ps.history.Reset() + ps.Unlock() +} + +func (ps *ProgressStatus) Wait(w io.Writer, msg []byte) error { + ps.Lock() + channel := ps.c + ps.Unlock() + + if channel == nil { + // defensive + logrus.Debugf("Channel is nil ") + } + if w != nil { + w.Write(msg) + ps.AddObserver(w) + } + <-channel + return nil +}