From 4c7dc9806cb88d829b63ea90689e7e630fc59ba2 Mon Sep 17 00:00:00 2001 From: Sebastiaan van Stijn Date: Fri, 18 Feb 2022 14:03:35 +0100 Subject: [PATCH] distribution/xfer: un-export transfer interface accessors This interface is internal to the package, so there's no need to export it's methods. Signed-off-by: Sebastiaan van Stijn --- distribution/xfer/download.go | 42 +++++++++++----------- distribution/xfer/transfer.go | 58 +++++++++++++++--------------- distribution/xfer/transfer_test.go | 40 ++++++++++----------- distribution/xfer/upload.go | 10 +++--- 4 files changed, 75 insertions(+), 75 deletions(-) diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index e3775db9f5..ff417ea99d 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -151,7 +151,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima var topDownloadUncasted transfer if existingDownload, ok := downloadsByKey[key]; ok { xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload) - defer topDownload.transfer.Release(watcher) + defer topDownload.transfer.release(watcher) topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput) topDownload = topDownloadUncasted.(*downloadTransfer) continue @@ -163,7 +163,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima var xferFunc DoFunc if topDownload != nil { xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload) - defer topDownload.transfer.Release(watcher) + defer topDownload.transfer.release(watcher) } else { xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) } @@ -192,15 +192,15 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima select { case <-ctx.Done(): - topDownload.transfer.Release(watcher) + topDownload.transfer.release(watcher) return rootFS, func() {}, ctx.Err() - case <-topDownload.Done(): + case <-topDownload.done(): break } l, err := topDownload.result() if err != nil { - topDownload.transfer.Release(watcher) + topDownload.transfer.release(watcher) return rootFS, func() {}, err } @@ -208,13 +208,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima // base layer on Windows. for range layers { if l == nil { - topDownload.transfer.Release(watcher) + topDownload.transfer.release(watcher) return rootFS, func() {}, errors.New("internal error: too few parent layers") } rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...) l = l.Parent() } - return rootFS, func() { topDownload.transfer.Release(watcher) }, err + return rootFS, func() { topDownload.transfer.release(watcher) }, err } // makeDownloadFunc returns a function that performs the layer download and @@ -247,7 +247,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // Did the parent download already fail or get // cancelled? select { - case <-parentDownload.Done(): + case <-parentDownload.done(): _, err := parentDownload.result() if err != nil { d.err = err @@ -267,7 +267,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, defer descriptor.Close() for { - downloadReader, size, err = descriptor.Download(d.transfer.Context(), progressOutput) + downloadReader, size, err = descriptor.Download(d.transfer.context(), progressOutput) if err == nil { break } @@ -275,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // If an error was returned because the context // was cancelled, we shouldn't retry. select { - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): d.err = err return default: @@ -302,7 +302,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, ticker.Stop() break selectLoop } - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): ticker.Stop() d.err = errors.New("download cancelled during retry delay") return @@ -315,11 +315,11 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, if parentDownload != nil { select { - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): d.err = errors.New("layer registration cancelled") downloadReader.Close() return - case <-parentDownload.Done(): + case <-parentDownload.done(): } l, err := parentDownload.result() @@ -331,7 +331,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer = l.ChainID() } - reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting") + reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting") defer reader.Close() inflatedLayerData, err := archive.DecompressStream(reader) @@ -351,7 +351,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, } if err != nil { select { - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): d.err = errors.New("layer registration cancelled") default: d.err = fmt.Errorf("failed to register layer: %v", err) @@ -368,7 +368,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // Doesn't actually need to be its own goroutine, but // done like this so we can defer close(c). go func() { - <-d.transfer.Released() + <-d.transfer.released() if d.layer != nil { layer.ReleaseAndLog(d.layerStore, d.layer) } @@ -403,10 +403,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa close(inactive) select { - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): d.err = errors.New("layer registration cancelled") return - case <-parentDownload.Done(): + case <-parentDownload.done(): } l, err := parentDownload.result() @@ -420,10 +420,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa // parentDownload finished, but wait for it explicitly // to be sure. select { - case <-d.transfer.Context().Done(): + case <-d.transfer.context().Done(): d.err = errors.New("layer registration cancelled") return - case <-sourceDownload.Done(): + case <-sourceDownload.done(): } l, err = sourceDownload.result() @@ -461,7 +461,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa // Doesn't actually need to be its own goroutine, but // done like this so we can defer close(c). go func() { - <-d.transfer.Released() + <-d.transfer.released() if d.layer != nil { layer.ReleaseAndLog(d.layerStore, d.layer) } diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index 6f85592449..1693708778 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -36,13 +36,13 @@ type watcher struct { // transfer represents an in-progress transfer. type transfer interface { - Watch(progressOutput progress.Output) *watcher - Release(*watcher) - Context() context.Context - Close() - Done() <-chan struct{} - Released() <-chan struct{} - Broadcast(mainProgressChan <-chan progress.Progress) + watch(progressOutput progress.Output) *watcher + release(*watcher) + context() context.Context + close() + done() <-chan struct{} + released() <-chan struct{} + broadcast(mainProgressChan <-chan progress.Progress) } type xfer struct { @@ -62,9 +62,9 @@ type xfer struct { // running remains open as long as the transfer is in progress. running chan struct{} - // released stays open until all watchers release the transfer and + // releasedChan stays open until all watchers release the transfer and // the transfer is no longer tracked by the transferManager. - released chan struct{} + releasedChan chan struct{} // broadcastDone is true if the main progress channel has closed. broadcastDone bool @@ -82,7 +82,7 @@ func newTransfer() transfer { t := &xfer{ watchers: make(map[chan struct{}]*watcher), running: make(chan struct{}), - released: make(chan struct{}), + releasedChan: make(chan struct{}), broadcastSyncChan: make(chan struct{}), } @@ -95,7 +95,7 @@ func newTransfer() transfer { } // Broadcast copies the progress and error output to all viewers. -func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) { +func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) { for { var ( p progress.Progress @@ -137,7 +137,7 @@ func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) { // Watch adds a watcher to the transfer. The supplied channel gets progress // updates and is closed when the transfer finishes. -func (t *xfer) Watch(progressOutput progress.Output) *watcher { +func (t *xfer) watch(progressOutput progress.Output) *watcher { t.mu.Lock() defer t.mu.Unlock() @@ -205,7 +205,7 @@ func (t *xfer) Watch(progressOutput progress.Output) *watcher { // to be notified about the progress of the transfer. All calls to Watch must // be paired with later calls to Release so that the lifecycle of the transfer // is properly managed. -func (t *xfer) Release(watcher *watcher) { +func (t *xfer) release(watcher *watcher) { t.mu.Lock() delete(t.watchers, watcher.releaseChan) @@ -216,9 +216,9 @@ func (t *xfer) Release(watcher *watcher) { // while waiting for a previous watcher goroutine to // finish. select { - case <-t.released: + case <-t.releasedChan: default: - close(t.released) + close(t.releasedChan) } } else { t.cancel() @@ -233,7 +233,7 @@ func (t *xfer) Release(watcher *watcher) { // Done returns a channel which is closed if the transfer completes or is // cancelled. Note that having 0 watchers causes a transfer to be cancelled. -func (t *xfer) Done() <-chan struct{} { +func (t *xfer) done() <-chan struct{} { // Note that this doesn't return t.ctx.Done() because that channel will // be closed the moment Cancel is called, and we need to return a // channel that blocks until a cancellation is actually acknowledged by @@ -243,22 +243,22 @@ func (t *xfer) Done() <-chan struct{} { // Released returns a channel which is closed once all watchers release the // transfer AND the transfer is no longer tracked by the transferManager. -func (t *xfer) Released() <-chan struct{} { - return t.released +func (t *xfer) released() <-chan struct{} { + return t.releasedChan } // Context returns the context associated with the transfer. -func (t *xfer) Context() context.Context { +func (t *xfer) context() context.Context { return t.ctx } // Close is called by the transferManager when the transfer is no longer // being tracked. -func (t *xfer) Close() { +func (t *xfer) close() { t.mu.Lock() t.closed = true if len(t.watchers) == 0 { - close(t.released) + close(t.releasedChan) } t.mu.Unlock() } @@ -311,13 +311,13 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput break } // transfer is already in progress. - watcher := xfer.Watch(progressOutput) + watcher := xfer.watch(progressOutput) select { - case <-xfer.Context().Done(): + case <-xfer.context().Done(): // We don't want to watch a transfer that has been cancelled. // Wait for it to be removed from the map and try again. - xfer.Release(watcher) + xfer.release(watcher) tm.mu.Unlock() // The goroutine that removes this transfer from the // map is also waiting for xfer.Done(), so yield to it. @@ -327,7 +327,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput // this very rare case seems better than bloating the // interface definition. runtime.Gosched() - <-xfer.Done() + <-xfer.done() tm.mu.Lock() default: return xfer, watcher @@ -346,8 +346,8 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput mainProgressChan := make(chan progress.Progress) xfer := xferFunc(mainProgressChan, start, inactive) - watcher := xfer.Watch(progressOutput) - go xfer.Broadcast(mainProgressChan) + watcher := xfer.watch(progressOutput) + go xfer.broadcast(mainProgressChan) tm.transfers[key] = xfer // When the transfer is finished, remove from the map. @@ -359,14 +359,14 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput tm.inactivate(start) tm.mu.Unlock() inactive = nil - case <-xfer.Done(): + case <-xfer.done(): tm.mu.Lock() if inactive != nil { tm.inactivate(start) } delete(tm.transfers, key) tm.mu.Unlock() - xfer.Close() + xfer.close() return } } diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go index bae9299767..9faecb8766 100644 --- a/distribution/xfer/transfer_test.go +++ b/distribution/xfer/transfer_test.go @@ -54,8 +54,8 @@ func TestTransfer(t *testing.T) { } for i, xfer := range xfers { - <-xfer.Done() - xfer.Release(watchers[i]) + <-xfer.done() + xfer.release(watchers[i]) } close(progressChan) <-progressDone @@ -112,8 +112,8 @@ func TestConcurrencyLimit(t *testing.T) { } for i, xfer := range xfers { - <-xfer.Done() - xfer.Release(watchers[i]) + <-xfer.done() + xfer.release(watchers[i]) } close(progressChan) <-progressDone @@ -174,8 +174,8 @@ func TestInactiveJobs(t *testing.T) { close(testDone) for i, xfer := range xfers { - <-xfer.Done() - xfer.Release(watchers[i]) + <-xfer.done() + xfer.release(watchers[i]) } close(progressChan) <-progressDone @@ -201,7 +201,7 @@ func TestWatchRelease(t *testing.T) { for i := int64(0); ; i++ { select { case <-time.After(10 * time.Millisecond): - case <-xfer.Context().Done(): + case <-xfer.context().Done(): return } progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10} @@ -245,7 +245,7 @@ func TestWatchRelease(t *testing.T) { watchers[i].progressChan = make(chan progress.Progress) watchers[i].progressDone = make(chan struct{}) watchers[i].receivedFirstProgress = make(chan struct{}) - watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan)) + watchers[i].watcher = xfer.watch(progress.ChanOutput(watchers[i].progressChan)) go progressConsumer(watchers[i]) } @@ -260,17 +260,17 @@ func TestWatchRelease(t *testing.T) { // Release one watcher every 5ms for _, w := range watchers { - xfer.Release(w.watcher) + xfer.release(w.watcher) <-time.After(5 * time.Millisecond) } // Now that all watchers have been released, Released() should // return a closed channel. - <-xfer.Released() + <-xfer.released() // Done() should return a closed channel because the xfer func returned // due to cancellation. - <-xfer.Done() + <-xfer.done() for _, w := range watchers { close(w.progressChan) @@ -298,22 +298,22 @@ func TestWatchFinishedTransfer(t *testing.T) { xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) // Give it a watcher immediately - watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) + watchers[1] = xfer.watch(progress.ChanOutput(make(chan progress.Progress))) // Wait for the transfer to complete - <-xfer.Done() + <-xfer.done() // Set up another watcher - watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) + watchers[2] = xfer.watch(progress.ChanOutput(make(chan progress.Progress))) // Release the watchers for _, w := range watchers { - xfer.Release(w) + xfer.release(w) } // Now that all watchers have been released, Released() should // return a closed channel. - <-xfer.Released() + <-xfer.released() } func TestDuplicateTransfer(t *testing.T) { @@ -333,7 +333,7 @@ func TestDuplicateTransfer(t *testing.T) { for i := int64(0); ; i++ { select { case <-time.After(10 * time.Millisecond): - case <-xfer.Context().Done(): + case <-xfer.context().Done(): return } progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10} @@ -390,17 +390,17 @@ func TestDuplicateTransfer(t *testing.T) { // Release one watcher every 5ms for _, t := range transfers { - t.xfer.Release(t.watcher) + t.xfer.release(t.watcher) <-time.After(5 * time.Millisecond) } for _, t := range transfers { // Now that all watchers have been released, Released() should // return a closed channel. - <-t.xfer.Released() + <-t.xfer.released() // Done() should return a closed channel because the xfer func returned // due to cancellation. - <-t.xfer.Done() + <-t.xfer.done() } for _, t := range transfers { diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index b97a2d8c9a..9f870afb6e 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -80,7 +80,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri xferFunc := lum.makeUploadFunc(descriptor) upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput) - defer upload.Release(watcher) + defer upload.release(watcher) uploads = append(uploads, upload.(*uploadTransfer)) dedupDescriptors[key] = upload.(*uploadTransfer) } @@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri select { case <-ctx.Done(): return ctx.Err() - case <-upload.transfer.Done(): + case <-upload.transfer.done(): if upload.err != nil { return upload.err } @@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun retries := 0 for { - remoteDescriptor, err := descriptor.Upload(u.transfer.Context(), progressOutput) + remoteDescriptor, err := descriptor.Upload(u.transfer.context(), progressOutput) if err == nil { u.remoteDescriptor = remoteDescriptor break @@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun // If an error was returned because the context // was cancelled, we shouldn't retry. select { - case <-u.transfer.Context().Done(): + case <-u.transfer.context().Done(): u.err = err return default: @@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun ticker.Stop() break selectLoop } - case <-u.transfer.Context().Done(): + case <-u.transfer.context().Done(): ticker.Stop() u.err = errors.New("upload cancelled during retry delay") return