From 572f008e892b06262963ccb75c631f2f5d6c6fcd Mon Sep 17 00:00:00 2001 From: Sam Abed Date: Tue, 25 Aug 2015 14:17:42 -0700 Subject: [PATCH 1/5] Show pull progress in terminal for inflight pull requests Based on #12874 from Sam Abed . His original commit was brought up to date by manually porting the changes in pull.go into the new code in pull_v1.go and pull_v2.go. Fixes #8385 Signed-off-by: Aaron Lehmann --- graph/load.go | 12 ++--- graph/pools_test.go | 5 +- graph/pull_v1.go | 60 ++++++++++++----------- graph/pull_v2.go | 41 +++++++--------- graph/tags.go | 35 +++++++------- pkg/progressreader/progressstatus.go | 72 ++++++++++++++++++++++++++++ 6 files changed, 146 insertions(+), 79 deletions(-) create mode 100644 pkg/progressreader/progressstatus.go diff --git a/graph/load.go b/graph/load.go index 8f7efa6af5..d09b04a0e2 100644 --- a/graph/load.go +++ b/graph/load.go @@ -106,14 +106,10 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error { } // ensure no two downloads of the same layer happen at the same time - if c, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { - if c != nil { - logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) - <-c - return nil - } - - return err + if ps, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { + logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) + ps.Wait(nil, nil) + return nil } defer s.poolRemove("pull", "layer:"+img.ID) diff --git a/graph/pools_test.go b/graph/pools_test.go index 129a5e1fec..b0ee3b0337 100644 --- a/graph/pools_test.go +++ b/graph/pools_test.go @@ -3,6 +3,7 @@ package graph import ( "testing" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/reexec" ) @@ -12,8 +13,8 @@ func init() { func TestPools(t *testing.T) { s := &TagStore{ - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), + pullingPool: make(map[string]*progressreader.ProgressStatus), + pushingPool: make(map[string]*progressreader.ProgressStatus), } if _, err := s.poolAdd("pull", "test1"); err != nil { diff --git a/graph/pull_v1.go b/graph/pull_v1.go index 0280f1f950..ab1e4a57bd 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -3,6 +3,7 @@ package graph import ( "errors" "fmt" + "io" "net" "net/url" "strings" @@ -137,31 +138,29 @@ func (p *v1Puller) pullRepository(askedTag string) error { } // ensure no two downloads of the same image happen at the same time - if c, err := p.poolAdd("pull", "img:"+img.ID); err != nil { - if c != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) - <-c - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) - } else { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) - } + ps, err := p.poolAdd("pull", "img:"+img.ID) + if err != nil { + msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. 
Waiting.", nil) + ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil return } + ps.AddObserver(out) defer p.poolRemove("pull", "img:"+img.ID) // we need to retain it until tagging p.graph.Retain(sessionID, img.ID) imgIDs = append(imgIDs, img.ID) - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) success := false - var lastErr, err error + var lastErr error var isDownloaded bool for _, ep := range p.repoInfo.Index.Mirrors { ep += "v1/" - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { // Don't report errors when pulling from mirrors. logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err) continue @@ -172,12 +171,12 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { for _, ep := range repoData.Endpoints { - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { // It's not ideal that only the last error is returned, it would be better to concatenate the errors. // As the error is also given to the output stream the user will see the error. 
lastErr = err - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) continue } layersDownloaded = layersDownloaded || isDownloaded @@ -187,11 +186,11 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName, lastErr) - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) errors <- err return } - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil } @@ -226,12 +225,11 @@ func (p *v1Puller) pullRepository(askedTag string) error { return nil } -func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, error) { +func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []string) (bool, error) { history, err := p.session.GetRemoteHistory(imgID, endpoint) if err != nil { return false, err } - out := p.config.OutStream out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil)) // FIXME: Try to stream the images? // FIXME: Launch the getRemoteImage() in goroutines @@ -246,14 +244,18 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro id := history[i] // ensure no two downloads of the same layer happen at the same time - if c, err := p.poolAdd("pull", "layer:"+id); err != nil { + ps, err := p.poolAdd("pull", "layer:"+id) + if err != nil { logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) - <-c + msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. 
Waiting.", nil) + ps.Wait(out, msg) + } else { + ps.AddObserver(out) } defer p.poolRemove("pull", "layer:"+id) if !p.graph.Exists(id) { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) var ( imgJSON []byte imgSize int64 @@ -264,7 +266,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro for j := 1; j <= retries; j++ { imgJSON, imgSize, err = p.session.GetRemoteImageJSON(id, endpoint) if err != nil && j == retries { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -273,7 +275,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro img, err = image.NewImgJSON(imgJSON) layersDownloaded = true if err != nil && j == retries { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err) } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -289,7 +291,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro if j > 1 { status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) } - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) layer, err := p.session.GetRemoteImageLayer(img.ID, endpoint, imgSize) if uerr, ok := err.(*url.Error); ok { err = uerr.Err @@ -298,7 +300,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } layersDownloaded = true @@ -307,7 +309,7 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro err = p.graph.Register(img, progressreader.New(progressreader.Config{ In: layer, - Out: out, + Out: ps, Formatter: p.sf, Size: imgSize, NewLines: false, @@ -318,14 +320,14 @@ func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, erro time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) return layersDownloaded, err } else { break } } } - out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) } return layersDownloaded, nil } diff --git a/graph/pull_v2.go b/graph/pull_v2.go index 0b21c1f75a..ee9aee4c0e 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -73,30 +73,28 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { } - c, err := p.poolAdd("pull", taggedName) + ps, err := p.poolAdd("pull", taggedName) if err != nil { - if c != nil { - // Another pull of the same repository is already 
taking place; just wait for it to finish - p.config.OutStream.Write(p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)) - <-c - return nil - } - return err + // Another pull of the same repository is already taking place; just wait for it to finish + msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName) + ps.Wait(p.config.OutStream, msg) + return nil } defer p.poolRemove("pull", taggedName) + ps.AddObserver(p.config.OutStream) var layersDownloaded bool for _, tag := range tags { // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus? - pulledNew, err := p.pullV2Tag(tag, taggedName) + pulledNew, err := p.pullV2Tag(ps, tag, taggedName) if err != nil { return err } layersDownloaded = layersDownloaded || pulledNew } - writeStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded) + writeStatus(taggedName, ps, p.sf, layersDownloaded) return nil } @@ -121,18 +119,16 @@ func (p *v2Puller) download(di *downloadInfo) { out := di.out - if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil { - if c != nil { - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)) - <-c - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) - } else { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err) - } + ps, err := p.poolAdd("pull", "img:"+di.img.ID) + if err != nil { + msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. 
Waiting.", nil) + ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) di.err <- nil return } + ps.AddObserver(out) defer p.poolRemove("pull", "img:"+di.img.ID) tmpFile, err := ioutil.TempFile("", "GetImageBlob") if err != nil { @@ -167,7 +163,7 @@ func (p *v2Puller) download(di *downloadInfo) { reader := progressreader.New(progressreader.Config{ In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)), - Out: out, + Out: ps, Formatter: p.sf, Size: di.size, NewLines: false, @@ -176,7 +172,7 @@ func (p *v2Puller) download(di *downloadInfo) { }) io.Copy(tmpFile, reader) - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) if !verifier.Verified() { err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest) @@ -185,7 +181,7 @@ func (p *v2Puller) download(di *downloadInfo) { return } - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) + ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name()) di.layer = layerDownload @@ -193,9 +189,8 @@ func (p *v2Puller) download(di *downloadInfo) { di.err <- nil } -func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) { +func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (verified bool, err error) { logrus.Debugf("Pulling tag from V2 registry: %q", tag) - out := p.config.OutStream manSvc, err := p.repo.Manifests(context.Background()) if err != nil { diff --git a/graph/tags.go b/graph/tags.go index 9deb463735..1485be6705 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -17,6 +17,7 @@ import ( "github.com/docker/docker/graph/tags" "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/registry" "github.com/docker/docker/trust" @@ -36,8 +37,8 @@ type TagStore struct { sync.Mutex // FIXME: move push/pull-related fields // to a helper type - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} + pullingPool map[string]*progressreader.ProgressStatus + pushingPool map[string]*progressreader.ProgressStatus registryService *registry.Service eventsService *events.Events trustService *trust.TrustStore @@ -93,8 +94,8 @@ func NewTagStore(path string, cfg *TagStoreConfig) (*TagStore, error) { graph: cfg.Graph, trustKey: cfg.Key, Repositories: make(map[string]Repository), - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), + pullingPool: make(map[string]*progressreader.ProgressStatus), + pushingPool: make(map[string]*progressreader.ProgressStatus), registryService: cfg.Registry, eventsService: cfg.Events, trustService: cfg.Trust, @@ -427,27 +428,27 @@ func validateDigest(dgst string) error { return nil } -func (store *TagStore) poolAdd(kind, key string) (chan struct{}, error) { +func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, error) { store.Lock() defer store.Unlock() - if c, exists := store.pullingPool[key]; exists { - return c, fmt.Errorf("pull %s is already in progress", key) + if p, exists := store.pullingPool[key]; exists { + return p, fmt.Errorf("pull %s is already in progress", key) } - if c, exists := store.pushingPool[key]; exists { - return c, 
fmt.Errorf("push %s is already in progress", key) + if p, exists := store.pushingPool[key]; exists { + return p, fmt.Errorf("push %s is already in progress", key) } - c := make(chan struct{}) + ps := progressreader.NewProgressStatus() switch kind { case "pull": - store.pullingPool[key] = c + store.pullingPool[key] = ps case "push": - store.pushingPool[key] = c + store.pushingPool[key] = ps default: return nil, fmt.Errorf("Unknown pool type") } - return c, nil + return ps, nil } func (store *TagStore) poolRemove(kind, key string) error { @@ -455,13 +456,13 @@ func (store *TagStore) poolRemove(kind, key string) error { defer store.Unlock() switch kind { case "pull": - if c, exists := store.pullingPool[key]; exists { - close(c) + if ps, exists := store.pullingPool[key]; exists { + ps.Done() delete(store.pullingPool, key) } case "push": - if c, exists := store.pushingPool[key]; exists { - close(c) + if ps, exists := store.pushingPool[key]; exists { + ps.Done() delete(store.pushingPool, key) } default: diff --git a/pkg/progressreader/progressstatus.go b/pkg/progressreader/progressstatus.go new file mode 100644 index 0000000000..f536b84053 --- /dev/null +++ b/pkg/progressreader/progressstatus.go @@ -0,0 +1,72 @@ +package progressreader + +import ( + "bytes" + "io" + "sync" + + "github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus" +) + +type ProgressStatus struct { + sync.Mutex + c chan struct{} + observers []io.Writer + history bytes.Buffer +} + +func NewProgressStatus() *ProgressStatus { + return &ProgressStatus{ + c: make(chan struct{}), + observers: []io.Writer{}, + } +} + +func (ps *ProgressStatus) Write(p []byte) (n int, err error) { + ps.Lock() + defer ps.Unlock() + ps.history.Write(p) + for _, w := range ps.observers { + // copy paste from MultiWriter, replaced return with continue + n, err = w.Write(p) + if err != nil { + continue + } + if n != len(p) { + err = io.ErrShortWrite + continue + } + } + return len(p), nil +} + +func (ps *ProgressStatus) AddObserver(w io.Writer) { + ps.Lock() + defer ps.Unlock() + w.Write(ps.history.Bytes()) + ps.observers = append(ps.observers, w) +} + +func (ps *ProgressStatus) Done() { + ps.Lock() + close(ps.c) + ps.history.Reset() + ps.Unlock() +} + +func (ps *ProgressStatus) Wait(w io.Writer, msg []byte) error { + ps.Lock() + channel := ps.c + ps.Unlock() + + if channel == nil { + // defensive + logrus.Debugf("Channel is nil ") + } + if w != nil { + w.Write(msg) + ps.AddObserver(w) + } + <-channel + return nil +} From 80513d85cfc0e46f8202fc3030f11052bbfeea7a Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 11 Aug 2015 09:44:50 -0700 Subject: [PATCH 2/5] Change poolAdd to return a boolean instead of an error Previously, its other return value was used even when it returned an error. This is awkward and goes against the convention. It also could have resulted in a nil pointer dereference when an error was returned because of an unknown pool type. This changes the unknown pool type error to a panic (since the pool types are hardcoded at call sites and must always be "push" or "pull"), and returns a "found" boolean instead of an error. 
Signed-off-by: Aaron Lehmann --- graph/load.go | 4 ++-- graph/pools_test.go | 22 ++++++++-------------- graph/pull_v1.go | 12 ++++++------ graph/pull_v2.go | 8 ++++---- graph/push_v1.go | 5 ++--- graph/push_v2.go | 4 ++-- graph/tags.go | 15 ++++++++++----- 7 files changed, 34 insertions(+), 36 deletions(-) diff --git a/graph/load.go b/graph/load.go index d09b04a0e2..f1dfce7a1c 100644 --- a/graph/load.go +++ b/graph/load.go @@ -106,8 +106,8 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error { } // ensure no two downloads of the same layer happen at the same time - if ps, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { - logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) + if ps, found := s.poolAdd("pull", "layer:"+img.ID); found { + logrus.Debugf("Image (id: %s) load is already running, waiting", img.ID) ps.Wait(nil, nil) return nil } diff --git a/graph/pools_test.go b/graph/pools_test.go index b0ee3b0337..f88a1cf15b 100644 --- a/graph/pools_test.go +++ b/graph/pools_test.go @@ -17,20 +17,17 @@ func TestPools(t *testing.T) { pushingPool: make(map[string]*progressreader.ProgressStatus), } - if _, err := s.poolAdd("pull", "test1"); err != nil { - t.Fatal(err) + if _, found := s.poolAdd("pull", "test1"); found { + t.Fatal("Expected pull test1 not to be in progress") } - if _, err := s.poolAdd("pull", "test2"); err != nil { - t.Fatal(err) + if _, found := s.poolAdd("pull", "test2"); found { + t.Fatal("Expected pull test2 not to be in progress") } - if _, err := s.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") + if _, found := s.poolAdd("push", "test1"); !found { + t.Fatalf("Expected pull test1 to be in progress`") } - if _, err := s.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") - } - if _, err := s.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") + if _, found := s.poolAdd("pull", "test1"); !found { + t.Fatalf("Expected pull test1 to be in progress`") } if err := s.poolRemove("pull", "test2"); err != nil { t.Fatal(err) @@ -44,7 +41,4 @@ func TestPools(t *testing.T) { if err := s.poolRemove("push", "test1"); err != nil { t.Fatal(err) } - if err := s.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") - } } diff --git a/graph/pull_v1.go b/graph/pull_v1.go index ab1e4a57bd..d5f9492790 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -138,8 +138,8 @@ func (p *v1Puller) pullRepository(askedTag string) error { } // ensure no two downloads of the same image happen at the same time - ps, err := p.poolAdd("pull", "img:"+img.ID) - if err != nil { + ps, found := p.poolAdd("pull", "img:"+img.ID) + if found { msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. 
Waiting.", nil) ps.Wait(out, msg) out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) @@ -155,7 +155,7 @@ func (p *v1Puller) pullRepository(askedTag string) error { ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) success := false - var lastErr error + var lastErr, err error var isDownloaded bool for _, ep := range p.repoInfo.Index.Mirrors { ep += "v1/" @@ -244,9 +244,9 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri id := history[i] // ensure no two downloads of the same layer happen at the same time - ps, err := p.poolAdd("pull", "layer:"+id) - if err != nil { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) + ps, found := p.poolAdd("pull", "layer:"+id) + if found { + logrus.Debugf("Image (id: %s) pull is already running, skipping", id) msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil) ps.Wait(out, msg) } else { diff --git a/graph/pull_v2.go b/graph/pull_v2.go index ee9aee4c0e..ed5605befb 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -73,8 +73,8 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { } - ps, err := p.poolAdd("pull", taggedName) - if err != nil { + ps, found := p.poolAdd("pull", taggedName) + if found { // Another pull of the same repository is already taking place; just wait for it to finish msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName) ps.Wait(p.config.OutStream, msg) @@ -119,8 +119,8 @@ func (p *v2Puller) download(di *downloadInfo) { out := di.out - ps, err := p.poolAdd("pull", "img:"+di.img.ID) - if err != nil { + ps, found := p.poolAdd("pull", "img:"+di.img.ID) + if found { msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil) ps.Wait(out, msg) out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) diff --git a/graph/push_v1.go b/graph/push_v1.go index 6de293403b..9769ac711f 100644 --- a/graph/push_v1.go +++ b/graph/push_v1.go @@ -214,7 +214,6 @@ func (p *v1Pusher) pushImageToEndpoint(endpoint string, imageIDs []string, tags // pushRepository pushes layers that do not already exist on the registry. 
func (p *v1Pusher) pushRepository(tag string) error { - logrus.Debugf("Local repo: %s", p.localRepo) p.out = ioutils.NewWriteFlusher(p.config.OutStream) imgList, tags, err := p.getImageList(tag) @@ -229,8 +228,8 @@ func (p *v1Pusher) pushRepository(tag string) error { logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag) } - if _, err := p.poolAdd("push", p.repoInfo.LocalName); err != nil { - return err + if _, found := p.poolAdd("push", p.repoInfo.LocalName); found { + return fmt.Errorf("push or pull %s is already in progress", p.repoInfo.LocalName) } defer p.poolRemove("push", p.repoInfo.LocalName) diff --git a/graph/push_v2.go b/graph/push_v2.go index 6823ac5928..7d5ca44d96 100644 --- a/graph/push_v2.go +++ b/graph/push_v2.go @@ -57,8 +57,8 @@ func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) { func (p *v2Pusher) pushV2Repository(tag string) error { localName := p.repoInfo.LocalName - if _, err := p.poolAdd("push", localName); err != nil { - return err + if _, found := p.poolAdd("push", localName); found { + return fmt.Errorf("push or pull %s is already in progress", localName) } defer p.poolRemove("push", localName) diff --git a/graph/tags.go b/graph/tags.go index 1485be6705..51b19babbb 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -428,27 +428,32 @@ func validateDigest(dgst string) error { return nil } -func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, error) { +// poolAdd checks if a push or pull is already running, and returns (ps, true) +// if a running operation is found. Otherwise, it creates a new one and returns +// (ps, false). +func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, bool) { store.Lock() defer store.Unlock() if p, exists := store.pullingPool[key]; exists { - return p, fmt.Errorf("pull %s is already in progress", key) + return p, true } if p, exists := store.pushingPool[key]; exists { - return p, fmt.Errorf("push %s is already in progress", key) + return p, true } ps := progressreader.NewProgressStatus() + switch kind { case "pull": store.pullingPool[key] = ps case "push": store.pushingPool[key] = ps default: - return nil, fmt.Errorf("Unknown pool type") + panic("Unknown pool type") } - return ps, nil + + return ps, false } func (store *TagStore) poolRemove(kind, key string) error { From 26c9b585042ac7dce8db83478a69fd01a4b003d7 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 11 Aug 2015 10:12:47 -0700 Subject: [PATCH 3/5] Clean up ProgressStatus - Rename to Broadcaster - Document exported types - Change Wait function to just wait. Writing a message to the writer and adding the writer to the observers list are now handled by separate function calls. - Avoid importing logrus (the condition where it was used should never happen, anyway). 
- Make writes non-blocking Signed-off-by: Aaron Lehmann --- graph/load.go | 2 +- graph/pools_test.go | 4 +- graph/pull_v1.go | 50 ++++----- graph/pull_v2.go | 28 ++--- graph/tags.go | 28 ++--- pkg/progressreader/broadcaster.go | 146 +++++++++++++++++++++++++++ pkg/progressreader/progressstatus.go | 72 ------------- 7 files changed, 204 insertions(+), 126 deletions(-) create mode 100644 pkg/progressreader/broadcaster.go delete mode 100644 pkg/progressreader/progressstatus.go diff --git a/graph/load.go b/graph/load.go index f1dfce7a1c..a58c5a3cf9 100644 --- a/graph/load.go +++ b/graph/load.go @@ -108,7 +108,7 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error { // ensure no two downloads of the same layer happen at the same time if ps, found := s.poolAdd("pull", "layer:"+img.ID); found { logrus.Debugf("Image (id: %s) load is already running, waiting", img.ID) - ps.Wait(nil, nil) + ps.Wait() return nil } diff --git a/graph/pools_test.go b/graph/pools_test.go index f88a1cf15b..a7b27271b7 100644 --- a/graph/pools_test.go +++ b/graph/pools_test.go @@ -13,8 +13,8 @@ func init() { func TestPools(t *testing.T) { s := &TagStore{ - pullingPool: make(map[string]*progressreader.ProgressStatus), - pushingPool: make(map[string]*progressreader.ProgressStatus), + pullingPool: make(map[string]*progressreader.Broadcaster), + pushingPool: make(map[string]*progressreader.Broadcaster), } if _, found := s.poolAdd("pull", "test1"); found { diff --git a/graph/pull_v1.go b/graph/pull_v1.go index d5f9492790..d36e9b712a 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -138,29 +138,30 @@ func (p *v1Puller) pullRepository(askedTag string) error { } // ensure no two downloads of the same image happen at the same time - ps, found := p.poolAdd("pull", "img:"+img.ID) + broadcaster, found := p.poolAdd("pull", "img:"+img.ID) if found { - msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil) - ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) + broadcaster.Add(out) + broadcaster.Wait() out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil return } - ps.AddObserver(out) + broadcaster.Add(out) defer p.poolRemove("pull", "img:"+img.ID) // we need to retain it until tagging p.graph.Retain(sessionID, img.ID) imgIDs = append(imgIDs, img.ID) - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) success := false var lastErr, err error var isDownloaded bool for _, ep := range p.repoInfo.Index.Mirrors { ep += "v1/" - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(broadcaster, img.ID, ep, repoData.Tokens); err != nil { // Don't report errors when pulling from mirrors. 
logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err) continue @@ -171,12 +172,12 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { for _, ep := range repoData.Endpoints { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) - if isDownloaded, err = p.pullImage(ps, img.ID, ep, repoData.Tokens); err != nil { + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) + if isDownloaded, err = p.pullImage(broadcaster, img.ID, ep, repoData.Tokens); err != nil { // It's not ideal that only the last error is returned, it would be better to concatenate the errors. // As the error is also given to the output stream the user will see the error. lastErr = err - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) continue } layersDownloaded = layersDownloaded || isDownloaded @@ -186,11 +187,11 @@ func (p *v1Puller) pullRepository(askedTag string) error { } if !success { err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName, lastErr) - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) errors <- err return } - ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) errors <- nil } @@ -244,18 +245,19 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri id := history[i] // ensure no two downloads of the same layer happen at the same time - ps, found := p.poolAdd("pull", "layer:"+id) + broadcaster, found := p.poolAdd("pull", "layer:"+id) if found { logrus.Debugf("Image (id: %s) pull is already running, skipping", id) - msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil) - ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. 
Waiting.", nil)) + broadcaster.Add(out) + broadcaster.Wait() } else { - ps.AddObserver(out) + broadcaster.Add(out) } defer p.poolRemove("pull", "layer:"+id) if !p.graph.Exists(id) { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) var ( imgJSON []byte imgSize int64 @@ -266,7 +268,7 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri for j := 1; j <= retries; j++ { imgJSON, imgSize, err = p.session.GetRemoteImageJSON(id, endpoint) if err != nil && j == retries { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -275,7 +277,7 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri img, err = image.NewImgJSON(imgJSON) layersDownloaded = true if err != nil && j == retries { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err) } else if err != nil { time.Sleep(time.Duration(j) * 500 * time.Millisecond) @@ -291,7 +293,7 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri if j > 1 { status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) } - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) layer, err := p.session.GetRemoteImageLayer(img.ID, endpoint, imgSize) if uerr, ok := err.(*url.Error); ok { err = uerr.Err @@ -300,7 +302,7 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) return layersDownloaded, err } layersDownloaded = true @@ -309,7 +311,7 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri err = p.graph.Register(img, progressreader.New(progressreader.Config{ In: layer, - Out: ps, + Out: broadcaster, Formatter: p.sf, Size: imgSize, NewLines: false, @@ -320,14 +322,14 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue } else if err != nil { - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) return layersDownloaded, err } else { break } } } - ps.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) } return layersDownloaded, nil } diff --git a/graph/pull_v2.go b/graph/pull_v2.go index ed5605befb..afbda480ce 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -73,28 +73,29 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { } - ps, found := p.poolAdd("pull", taggedName) + broadcaster, 
found := p.poolAdd("pull", taggedName) if found { // Another pull of the same repository is already taking place; just wait for it to finish - msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName) - ps.Wait(p.config.OutStream, msg) + p.config.OutStream.Write(p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)) + broadcaster.Add(p.config.OutStream) + broadcaster.Wait() return nil } defer p.poolRemove("pull", taggedName) - ps.AddObserver(p.config.OutStream) + broadcaster.Add(p.config.OutStream) var layersDownloaded bool for _, tag := range tags { // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus? - pulledNew, err := p.pullV2Tag(ps, tag, taggedName) + pulledNew, err := p.pullV2Tag(broadcaster, tag, taggedName) if err != nil { return err } layersDownloaded = layersDownloaded || pulledNew } - writeStatus(taggedName, ps, p.sf, layersDownloaded) + writeStatus(taggedName, broadcaster, p.sf, layersDownloaded) return nil } @@ -119,16 +120,17 @@ func (p *v2Puller) download(di *downloadInfo) { out := di.out - ps, found := p.poolAdd("pull", "img:"+di.img.ID) + broadcaster, found := p.poolAdd("pull", "img:"+di.img.ID) if found { - msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil) - ps.Wait(out, msg) + out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)) + broadcaster.Add(out) + broadcaster.Wait() out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) di.err <- nil return } - ps.AddObserver(out) + broadcaster.Add(out) defer p.poolRemove("pull", "img:"+di.img.ID) tmpFile, err := ioutil.TempFile("", "GetImageBlob") if err != nil { @@ -163,7 +165,7 @@ func (p *v2Puller) download(di *downloadInfo) { reader := progressreader.New(progressreader.Config{ In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)), - Out: ps, + Out: broadcaster, Formatter: p.sf, Size: di.size, NewLines: false, @@ -172,7 +174,7 @@ func (p *v2Puller) download(di *downloadInfo) { }) io.Copy(tmpFile, reader) - ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil)) if !verifier.Verified() { err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest) @@ -181,7 +183,7 @@ func (p *v2Puller) download(di *downloadInfo) { return } - ps.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) + broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name()) di.layer = layerDownload diff --git a/graph/tags.go b/graph/tags.go index 51b19babbb..d3da607824 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -37,8 +37,8 @@ type TagStore struct { sync.Mutex // FIXME: move push/pull-related fields // to a helper type - pullingPool map[string]*progressreader.ProgressStatus - pushingPool map[string]*progressreader.ProgressStatus + pullingPool map[string]*progressreader.Broadcaster + pushingPool map[string]*progressreader.Broadcaster registryService *registry.Service eventsService *events.Events trustService 
*trust.TrustStore @@ -94,8 +94,8 @@ func NewTagStore(path string, cfg *TagStoreConfig) (*TagStore, error) { graph: cfg.Graph, trustKey: cfg.Key, Repositories: make(map[string]Repository), - pullingPool: make(map[string]*progressreader.ProgressStatus), - pushingPool: make(map[string]*progressreader.ProgressStatus), + pullingPool: make(map[string]*progressreader.Broadcaster), + pushingPool: make(map[string]*progressreader.Broadcaster), registryService: cfg.Registry, eventsService: cfg.Events, trustService: cfg.Trust, @@ -428,10 +428,10 @@ func validateDigest(dgst string) error { return nil } -// poolAdd checks if a push or pull is already running, and returns (ps, true) -// if a running operation is found. Otherwise, it creates a new one and returns -// (ps, false). -func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, bool) { +// poolAdd checks if a push or pull is already running, and returns +// (broadcaster, true) if a running operation is found. Otherwise, it creates a +// new one and returns (broadcaster, false). +func (store *TagStore) poolAdd(kind, key string) (*progressreader.Broadcaster, bool) { store.Lock() defer store.Unlock() @@ -442,18 +442,18 @@ func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus return p, true } - ps := progressreader.NewProgressStatus() + broadcaster := progressreader.NewBroadcaster() switch kind { case "pull": - store.pullingPool[key] = ps + store.pullingPool[key] = broadcaster case "push": - store.pushingPool[key] = ps + store.pushingPool[key] = broadcaster default: panic("Unknown pool type") } - return ps, false + return broadcaster, false } func (store *TagStore) poolRemove(kind, key string) error { @@ -462,12 +462,12 @@ func (store *TagStore) poolRemove(kind, key string) error { switch kind { case "pull": if ps, exists := store.pullingPool[key]; exists { - ps.Done() + ps.Close() delete(store.pullingPool, key) } case "push": if ps, exists := store.pushingPool[key]; exists { - ps.Done() + ps.Close() delete(store.pushingPool, key) } default: diff --git a/pkg/progressreader/broadcaster.go b/pkg/progressreader/broadcaster.go new file mode 100644 index 0000000000..4b08ce405d --- /dev/null +++ b/pkg/progressreader/broadcaster.go @@ -0,0 +1,146 @@ +package progressreader + +import ( + "bytes" + "errors" + "io" + "sync" +) + +// Broadcaster keeps track of one or more observers watching the progress +// of an operation. For example, if multiple clients are trying to pull an +// image, they share a Broadcaster for the download operation. +type Broadcaster struct { + sync.Mutex + // c is a channel that observers block on, waiting for the operation + // to finish. + c chan struct{} + // cond is a condition variable used to wake up observers when there's + // new data available. + cond *sync.Cond + // history is a buffer of the progress output so far, so a new observer + // can catch up. + history bytes.Buffer + // wg is a WaitGroup used to wait for all writes to finish on Close + wg sync.WaitGroup + // isClosed is set to true when Close is called to avoid closing c + // multiple times. 
+ isClosed bool +} + +// NewBroadcaster returns a Broadcaster structure +func NewBroadcaster() *Broadcaster { + b := &Broadcaster{ + c: make(chan struct{}), + } + b.cond = sync.NewCond(b) + return b +} + +// closed returns true if and only if the broadcaster has been closed +func (broadcaster *Broadcaster) closed() bool { + select { + case <-broadcaster.c: + return true + default: + return false + } +} + +// receiveWrites runs as a goroutine so that writes don't block the Write +// function. It writes the new data in broadcaster.history each time there's +// activity on the broadcaster.cond condition variable. +func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { + n := 0 + + broadcaster.Lock() + + // The condition variable wait is at the end of this loop, so that the + // first iteration will write the history so far. + for { + newData := broadcaster.history.Bytes()[n:] + // Make a copy of newData so we can release the lock + sendData := make([]byte, len(newData), len(newData)) + copy(sendData, newData) + broadcaster.Unlock() + + if len(sendData) > 0 { + written, err := observer.Write(sendData) + if err != nil { + broadcaster.wg.Done() + return + } + n += written + } + + broadcaster.Lock() + + // detect closure of the broadcast writer + if broadcaster.closed() { + broadcaster.Unlock() + broadcaster.wg.Done() + return + } + + if broadcaster.history.Len() == n { + broadcaster.cond.Wait() + } + + // Mutex is still locked as the loop continues + } +} + +// Write adds data to the history buffer, and also writes it to all current +// observers. +func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { + broadcaster.Lock() + defer broadcaster.Unlock() + + // Is the broadcaster closed? If so, the write should fail. + if broadcaster.closed() { + return 0, errors.New("attempted write to closed progressreader Broadcaster") + } + + broadcaster.history.Write(p) + broadcaster.cond.Broadcast() + + return len(p), nil +} + +// Add adds an observer to the Broadcaster. The new observer receives the +// data from the history buffer, and also all subsequent data. +func (broadcaster *Broadcaster) Add(w io.Writer) error { + // The lock is acquired here so that Add can't race with Close + broadcaster.Lock() + defer broadcaster.Unlock() + + if broadcaster.closed() { + return errors.New("attempted to add observer to closed progressreader Broadcaster") + } + + broadcaster.wg.Add(1) + go broadcaster.receiveWrites(w) + + return nil +} + +// Close signals to all observers that the operation has finished. +func (broadcaster *Broadcaster) Close() { + broadcaster.Lock() + if broadcaster.isClosed { + broadcaster.Unlock() + return + } + broadcaster.isClosed = true + close(broadcaster.c) + broadcaster.cond.Broadcast() + broadcaster.Unlock() + + // Don't return from Close until all writers have caught up. + broadcaster.wg.Wait() +} + +// Wait blocks until the operation is marked as completed by the Done method. 
+func (broadcaster *Broadcaster) Wait() { + <-broadcaster.c +} diff --git a/pkg/progressreader/progressstatus.go b/pkg/progressreader/progressstatus.go deleted file mode 100644 index f536b84053..0000000000 --- a/pkg/progressreader/progressstatus.go +++ /dev/null @@ -1,72 +0,0 @@ -package progressreader - -import ( - "bytes" - "io" - "sync" - - "github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus" -) - -type ProgressStatus struct { - sync.Mutex - c chan struct{} - observers []io.Writer - history bytes.Buffer -} - -func NewProgressStatus() *ProgressStatus { - return &ProgressStatus{ - c: make(chan struct{}), - observers: []io.Writer{}, - } -} - -func (ps *ProgressStatus) Write(p []byte) (n int, err error) { - ps.Lock() - defer ps.Unlock() - ps.history.Write(p) - for _, w := range ps.observers { - // copy paste from MultiWriter, replaced return with continue - n, err = w.Write(p) - if err != nil { - continue - } - if n != len(p) { - err = io.ErrShortWrite - continue - } - } - return len(p), nil -} - -func (ps *ProgressStatus) AddObserver(w io.Writer) { - ps.Lock() - defer ps.Unlock() - w.Write(ps.history.Bytes()) - ps.observers = append(ps.observers, w) -} - -func (ps *ProgressStatus) Done() { - ps.Lock() - close(ps.c) - ps.history.Reset() - ps.Unlock() -} - -func (ps *ProgressStatus) Wait(w io.Writer, msg []byte) error { - ps.Lock() - channel := ps.c - ps.Unlock() - - if channel == nil { - // defensive - logrus.Debugf("Channel is nil ") - } - if w != nil { - w.Write(msg) - ps.AddObserver(w) - } - <-channel - return nil -} From 9b9d70ad271b46a67bde57b8a807121f6e85b75f Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 25 Aug 2015 13:03:35 -0700 Subject: [PATCH 4/5] Remove "...already being pulled" messages These don't get seen, so they are unnecessary. Signed-off-by: Aaron Lehmann --- graph/pull_v1.go | 2 -- graph/pull_v2.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/graph/pull_v1.go b/graph/pull_v1.go index d36e9b712a..13741292fd 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -140,7 +140,6 @@ func (p *v1Puller) pullRepository(askedTag string) error { // ensure no two downloads of the same image happen at the same time broadcaster, found := p.poolAdd("pull", "img:"+img.ID) if found { - out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) broadcaster.Add(out) broadcaster.Wait() out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) @@ -248,7 +247,6 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri broadcaster, found := p.poolAdd("pull", "layer:"+id) if found { logrus.Debugf("Image (id: %s) pull is already running, skipping", id) - out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil)) broadcaster.Add(out) broadcaster.Wait() } else { diff --git a/graph/pull_v2.go b/graph/pull_v2.go index afbda480ce..96f47b873a 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -76,7 +76,6 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { broadcaster, found := p.poolAdd("pull", taggedName) if found { // Another pull of the same repository is already taking place; just wait for it to finish - p.config.OutStream.Write(p.sf.FormatStatus("", "Repository %s already being pulled by another client. 
Waiting.", p.repoInfo.CanonicalName)) broadcaster.Add(p.config.OutStream) broadcaster.Wait() return nil @@ -122,7 +121,6 @@ func (p *v2Puller) download(di *downloadInfo) { broadcaster, found := p.poolAdd("pull", "img:"+di.img.ID) if found { - out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil)) broadcaster.Add(out) broadcaster.Wait() out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) From 317a5462e428fab6248fe3b028822250c8c9ff7f Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Fri, 28 Aug 2015 10:09:00 -0700 Subject: [PATCH 5/5] Make the broadcaster write messages to the observers in the same units they were written to the broadcaster This means the writing to a WriteFlusher will flush in the same places as it would if the broadcaster wasn't sitting in front of it. Signed-off-by: Aaron Lehmann --- pkg/progressreader/broadcaster.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/progressreader/broadcaster.go b/pkg/progressreader/broadcaster.go index 4b08ce405d..429b1d0f1b 100644 --- a/pkg/progressreader/broadcaster.go +++ b/pkg/progressreader/broadcaster.go @@ -1,7 +1,6 @@ package progressreader import ( - "bytes" "errors" "io" "sync" @@ -19,8 +18,10 @@ type Broadcaster struct { // new data available. cond *sync.Cond // history is a buffer of the progress output so far, so a new observer - // can catch up. - history bytes.Buffer + // can catch up. The history is stored as a slice of separate byte + // slices, so that if the writer is a WriteFlusher, the flushes will + // happen in the right places. + history [][]byte // wg is a WaitGroup used to wait for all writes to finish on Close wg sync.WaitGroup // isClosed is set to true when Close is called to avoid closing c @@ -58,19 +59,20 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { // The condition variable wait is at the end of this loop, so that the // first iteration will write the history so far. for { - newData := broadcaster.history.Bytes()[n:] + newData := broadcaster.history[n:] // Make a copy of newData so we can release the lock - sendData := make([]byte, len(newData), len(newData)) + sendData := make([][]byte, len(newData), len(newData)) copy(sendData, newData) broadcaster.Unlock() - if len(sendData) > 0 { - written, err := observer.Write(sendData) + for len(sendData) > 0 { + _, err := observer.Write(sendData[0]) if err != nil { broadcaster.wg.Done() return } - n += written + n++ + sendData = sendData[1:] } broadcaster.Lock() @@ -82,7 +84,7 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { return } - if broadcaster.history.Len() == n { + if len(broadcaster.history) == n { broadcaster.cond.Wait() } @@ -101,7 +103,11 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { return 0, errors.New("attempted write to closed progressreader Broadcaster") } - broadcaster.history.Write(p) + // Add message in p to the history slice + newEntry := make([]byte, len(p), len(p)) + copy(newEntry, p) + broadcaster.history = append(broadcaster.history, newEntry) + broadcaster.cond.Broadcast() return len(p), nil