distribution/xfer: un-export transfer interface accessors

This interface is internal to the package, so there's no need to export
its methods. The xfer struct's released field is renamed to releasedChan
along the way: a Go struct cannot have a field and a method with the same
name, so the new unexported released() accessor would otherwise clash
with it.
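
For context, a minimal self-contained sketch of the pattern being applied
(reusing names from the diff below; everything else is elided): once an
interface's methods are unexported, no type outside the declaring package
can satisfy it, and other packages cannot call the methods either, so the
compiler enforces that the interface stays internal.

package xfer

import "context"

// transfer, trimmed to three of its methods: with unexported method
// names, only types declared in this package can implement the
// interface, which makes the internal-only intent machine-checked.
type transfer interface {
	context() context.Context
	done() <-chan struct{}
	release(*watcher)
}

type watcher struct {
	releaseChan chan struct{}
}

// xfer is the package-private implementation (fields reduced to the
// minimum this sketch needs).
type xfer struct {
	ctx      context.Context
	doneChan chan struct{}
}

func (t *xfer) context() context.Context { return t.ctx }
func (t *xfer) done() <-chan struct{}    { return t.doneChan }
func (t *xfer) release(w *watcher)       { close(w.releaseChan) }

Within the package nothing else changes: call sites simply switch from
t.Context() to t.context(), which is what the hunks below do.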

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 2022-02-18 14:03:35 +01:00
parent 849d8c2d02
commit 4c7dc9806c
4 changed files with 75 additions and 75 deletions

distribution/xfer/download.go

@@ -151,7 +151,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
var topDownloadUncasted transfer
if existingDownload, ok := downloadsByKey[key]; ok {
xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
-defer topDownload.transfer.Release(watcher)
+defer topDownload.transfer.release(watcher)
topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer)
continue
@@ -163,7 +163,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
var xferFunc DoFunc
if topDownload != nil {
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
-defer topDownload.transfer.Release(watcher)
+defer topDownload.transfer.release(watcher)
} else {
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
}
@@ -192,15 +192,15 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
select {
case <-ctx.Done():
-topDownload.transfer.Release(watcher)
+topDownload.transfer.release(watcher)
return rootFS, func() {}, ctx.Err()
-case <-topDownload.Done():
+case <-topDownload.done():
break
}
l, err := topDownload.result()
if err != nil {
-topDownload.transfer.Release(watcher)
+topDownload.transfer.release(watcher)
return rootFS, func() {}, err
}
@@ -208,13 +208,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// base layer on Windows.
for range layers {
if l == nil {
-topDownload.transfer.Release(watcher)
+topDownload.transfer.release(watcher)
return rootFS, func() {}, errors.New("internal error: too few parent layers")
}
rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
l = l.Parent()
}
-return rootFS, func() { topDownload.transfer.Release(watcher) }, err
+return rootFS, func() { topDownload.transfer.release(watcher) }, err
}
// makeDownloadFunc returns a function that performs the layer download and
@@ -247,7 +247,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// Did the parent download already fail or get
// cancelled?
select {
-case <-parentDownload.Done():
+case <-parentDownload.done():
_, err := parentDownload.result()
if err != nil {
d.err = err
@@ -267,7 +267,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
defer descriptor.Close()
for {
-downloadReader, size, err = descriptor.Download(d.transfer.Context(), progressOutput)
+downloadReader, size, err = descriptor.Download(d.transfer.context(), progressOutput)
if err == nil {
break
}
@@ -275,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// If an error was returned because the context
// was cancelled, we shouldn't retry.
select {
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
d.err = err
return
default:
@@ -302,7 +302,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
ticker.Stop()
break selectLoop
}
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
ticker.Stop()
d.err = errors.New("download cancelled during retry delay")
return
@@ -315,11 +315,11 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
if parentDownload != nil {
select {
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
downloadReader.Close()
return
-case <-parentDownload.Done():
+case <-parentDownload.done():
}
l, err := parentDownload.result()
@@ -331,7 +331,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
parentLayer = l.ChainID()
}
-reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
+reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
defer reader.Close()
inflatedLayerData, err := archive.DecompressStream(reader)
@@ -351,7 +351,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
}
if err != nil {
select {
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
default:
d.err = fmt.Errorf("failed to register layer: %v", err)
@@ -368,7 +368,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// Doesn't actually need to be its own goroutine, but
// done like this so we can defer close(c).
go func() {
-<-d.transfer.Released()
+<-d.transfer.released()
if d.layer != nil {
layer.ReleaseAndLog(d.layerStore, d.layer)
}
@@ -403,10 +403,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
close(inactive)
select {
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
return
-case <-parentDownload.Done():
+case <-parentDownload.done():
}
l, err := parentDownload.result()
@@ -420,10 +420,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
// parentDownload finished, but wait for it explicitly
// to be sure.
select {
-case <-d.transfer.Context().Done():
+case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
return
-case <-sourceDownload.Done():
+case <-sourceDownload.done():
}
l, err = sourceDownload.result()
@@ -461,7 +461,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
// Doesn't actually need to be its own goroutine, but
// done like this so we can defer close(c).
go func() {
-<-d.transfer.Released()
+<-d.transfer.released()
if d.layer != nil {
layer.ReleaseAndLog(d.layerStore, d.layer)
}

distribution/xfer/transfer.go

@@ -36,13 +36,13 @@ type watcher struct {
// transfer represents an in-progress transfer.
type transfer interface {
-Watch(progressOutput progress.Output) *watcher
-Release(*watcher)
-Context() context.Context
-Close()
-Done() <-chan struct{}
-Released() <-chan struct{}
-Broadcast(mainProgressChan <-chan progress.Progress)
+watch(progressOutput progress.Output) *watcher
+release(*watcher)
+context() context.Context
+close()
+done() <-chan struct{}
+released() <-chan struct{}
+broadcast(mainProgressChan <-chan progress.Progress)
}
type xfer struct {
@@ -62,9 +62,9 @@ type xfer struct {
// running remains open as long as the transfer is in progress.
running chan struct{}
-// released stays open until all watchers release the transfer and
+// releasedChan stays open until all watchers release the transfer and
// the transfer is no longer tracked by the transferManager.
-released chan struct{}
+releasedChan chan struct{}
// broadcastDone is true if the main progress channel has closed.
broadcastDone bool
@@ -82,7 +82,7 @@ func newTransfer() transfer {
t := &xfer{
watchers: make(map[chan struct{}]*watcher),
running: make(chan struct{}),
-released: make(chan struct{}),
+releasedChan: make(chan struct{}),
broadcastSyncChan: make(chan struct{}),
}
@@ -95,7 +95,7 @@ func newTransfer() transfer {
}
// Broadcast copies the progress and error output to all viewers.
-func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) {
+func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) {
for {
var (
p progress.Progress
@@ -137,7 +137,7 @@ func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) {
// Watch adds a watcher to the transfer. The supplied channel gets progress
// updates and is closed when the transfer finishes.
-func (t *xfer) Watch(progressOutput progress.Output) *watcher {
+func (t *xfer) watch(progressOutput progress.Output) *watcher {
t.mu.Lock()
defer t.mu.Unlock()
@@ -205,7 +205,7 @@ func (t *xfer) Watch(progressOutput progress.Output) *watcher {
// to be notified about the progress of the transfer. All calls to Watch must
// be paired with later calls to Release so that the lifecycle of the transfer
// is properly managed.
-func (t *xfer) Release(watcher *watcher) {
+func (t *xfer) release(watcher *watcher) {
t.mu.Lock()
delete(t.watchers, watcher.releaseChan)
@@ -216,9 +216,9 @@ func (t *xfer) Release(watcher *watcher) {
// while waiting for a previous watcher goroutine to
// finish.
select {
-case <-t.released:
+case <-t.releasedChan:
default:
-close(t.released)
+close(t.releasedChan)
}
} else {
t.cancel()
@@ -233,7 +233,7 @@ func (t *xfer) Release(watcher *watcher) {
// Done returns a channel which is closed if the transfer completes or is
// cancelled. Note that having 0 watchers causes a transfer to be cancelled.
-func (t *xfer) Done() <-chan struct{} {
+func (t *xfer) done() <-chan struct{} {
// Note that this doesn't return t.ctx.Done() because that channel will
// be closed the moment Cancel is called, and we need to return a
// channel that blocks until a cancellation is actually acknowledged by
@@ -243,22 +243,22 @@ func (t *xfer) Done() <-chan struct{} {
// Released returns a channel which is closed once all watchers release the
// transfer AND the transfer is no longer tracked by the transferManager.
-func (t *xfer) Released() <-chan struct{} {
-return t.released
+func (t *xfer) released() <-chan struct{} {
+return t.releasedChan
}
// Context returns the context associated with the transfer.
-func (t *xfer) Context() context.Context {
+func (t *xfer) context() context.Context {
return t.ctx
}
// Close is called by the transferManager when the transfer is no longer
// being tracked.
-func (t *xfer) Close() {
+func (t *xfer) close() {
t.mu.Lock()
t.closed = true
if len(t.watchers) == 0 {
-close(t.released)
+close(t.releasedChan)
}
t.mu.Unlock()
}
@@ -311,13 +311,13 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
break
}
// transfer is already in progress.
-watcher := xfer.Watch(progressOutput)
+watcher := xfer.watch(progressOutput)
select {
-case <-xfer.Context().Done():
+case <-xfer.context().Done():
// We don't want to watch a transfer that has been cancelled.
// Wait for it to be removed from the map and try again.
-xfer.Release(watcher)
+xfer.release(watcher)
tm.mu.Unlock()
// The goroutine that removes this transfer from the
// map is also waiting for xfer.Done(), so yield to it.
@@ -327,7 +327,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
// this very rare case seems better than bloating the
// interface definition.
runtime.Gosched()
-<-xfer.Done()
+<-xfer.done()
tm.mu.Lock()
default:
return xfer, watcher
@@ -346,8 +346,8 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
mainProgressChan := make(chan progress.Progress)
xfer := xferFunc(mainProgressChan, start, inactive)
-watcher := xfer.Watch(progressOutput)
-go xfer.Broadcast(mainProgressChan)
+watcher := xfer.watch(progressOutput)
+go xfer.broadcast(mainProgressChan)
tm.transfers[key] = xfer
// When the transfer is finished, remove from the map.
@@ -359,14 +359,14 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
tm.inactivate(start)
tm.mu.Unlock()
inactive = nil
-case <-xfer.Done():
+case <-xfer.done():
tm.mu.Lock()
if inactive != nil {
tm.inactivate(start)
}
delete(tm.transfers, key)
tm.mu.Unlock()
-xfer.Close()
+xfer.close()
return
}
}

distribution/xfer/transfer_test.go

@@ -54,8 +54,8 @@ func TestTransfer(t *testing.T) {
}
for i, xfer := range xfers {
-<-xfer.Done()
-xfer.Release(watchers[i])
+<-xfer.done()
+xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@@ -112,8 +112,8 @@ func TestConcurrencyLimit(t *testing.T) {
}
for i, xfer := range xfers {
-<-xfer.Done()
-xfer.Release(watchers[i])
+<-xfer.done()
+xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@@ -174,8 +174,8 @@ func TestInactiveJobs(t *testing.T) {
close(testDone)
for i, xfer := range xfers {
-<-xfer.Done()
-xfer.Release(watchers[i])
+<-xfer.done()
+xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@@ -201,7 +201,7 @@ func TestWatchRelease(t *testing.T) {
for i := int64(0); ; i++ {
select {
case <-time.After(10 * time.Millisecond):
-case <-xfer.Context().Done():
+case <-xfer.context().Done():
return
}
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@@ -245,7 +245,7 @@ func TestWatchRelease(t *testing.T) {
watchers[i].progressChan = make(chan progress.Progress)
watchers[i].progressDone = make(chan struct{})
watchers[i].receivedFirstProgress = make(chan struct{})
-watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
+watchers[i].watcher = xfer.watch(progress.ChanOutput(watchers[i].progressChan))
go progressConsumer(watchers[i])
}
@@ -260,17 +260,17 @@ func TestWatchRelease(t *testing.T) {
// Release one watcher every 5ms
for _, w := range watchers {
-xfer.Release(w.watcher)
+xfer.release(w.watcher)
<-time.After(5 * time.Millisecond)
}
// Now that all watchers have been released, Released() should
// return a closed channel.
-<-xfer.Released()
+<-xfer.released()
// Done() should return a closed channel because the xfer func returned
// due to cancellation.
-<-xfer.Done()
+<-xfer.done()
for _, w := range watchers {
close(w.progressChan)
@@ -298,22 +298,22 @@ func TestWatchFinishedTransfer(t *testing.T) {
xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
// Give it a watcher immediately
-watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
+watchers[1] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
// Wait for the transfer to complete
-<-xfer.Done()
+<-xfer.done()
// Set up another watcher
-watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
+watchers[2] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
// Release the watchers
for _, w := range watchers {
-xfer.Release(w)
+xfer.release(w)
}
// Now that all watchers have been released, Released() should
// return a closed channel.
-<-xfer.Released()
+<-xfer.released()
}
func TestDuplicateTransfer(t *testing.T) {
@@ -333,7 +333,7 @@ func TestDuplicateTransfer(t *testing.T) {
for i := int64(0); ; i++ {
select {
case <-time.After(10 * time.Millisecond):
-case <-xfer.Context().Done():
+case <-xfer.context().Done():
return
}
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@@ -390,17 +390,17 @@ func TestDuplicateTransfer(t *testing.T) {
// Release one watcher every 5ms
for _, t := range transfers {
-t.xfer.Release(t.watcher)
+t.xfer.release(t.watcher)
<-time.After(5 * time.Millisecond)
}
for _, t := range transfers {
// Now that all watchers have been released, Released() should
// return a closed channel.
-<-t.xfer.Released()
+<-t.xfer.released()
// Done() should return a closed channel because the xfer func returned
// due to cancellation.
-<-t.xfer.Done()
+<-t.xfer.done()
}
for _, t := range transfers {

distribution/xfer/upload.go

@@ -80,7 +80,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput)
-defer upload.Release(watcher)
+defer upload.release(watcher)
uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer)
}
@@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
select {
case <-ctx.Done():
return ctx.Err()
-case <-upload.transfer.Done():
+case <-upload.transfer.done():
if upload.err != nil {
return upload.err
}
@@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
retries := 0
for {
-remoteDescriptor, err := descriptor.Upload(u.transfer.Context(), progressOutput)
+remoteDescriptor, err := descriptor.Upload(u.transfer.context(), progressOutput)
if err == nil {
u.remoteDescriptor = remoteDescriptor
break
@@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
// If an error was returned because the context
// was cancelled, we shouldn't retry.
select {
-case <-u.transfer.Context().Done():
+case <-u.transfer.context().Done():
u.err = err
return
default:
@@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
ticker.Stop()
break selectLoop
}
-case <-u.transfer.Context().Done():
+case <-u.transfer.context().Done():
ticker.Stop()
u.err = errors.New("upload cancelled during retry delay")
return