diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index 121d980467..e3775db9f5 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -108,7 +108,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima var ( topLayer layer.Layer topDownload *downloadTransfer - watcher *Watcher + watcher *watcher missingLayer bool transferKey = "" downloadsByKey = make(map[string]*downloadTransfer) diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index 68ee9c3ae1..6f85592449 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -19,8 +19,8 @@ func (e DoNotRetry) Error() string { return e.Err.Error() } -// Watcher is returned by Watch and can be passed to Release to stop watching. -type Watcher struct { +// watcher is returned by Watch and can be passed to Release to stop watching. +type watcher struct { // signalChan is used to signal to the watcher goroutine that // new progress information is available, or that the transfer // has finished. @@ -36,8 +36,8 @@ type Watcher struct { // transfer represents an in-progress transfer. type transfer interface { - Watch(progressOutput progress.Output) *Watcher - Release(*Watcher) + Watch(progressOutput progress.Output) *watcher + Release(*watcher) Context() context.Context Close() Done() <-chan struct{} @@ -53,7 +53,7 @@ type xfer struct { // watchers keeps track of the goroutines monitoring progress output, // indexed by the channels that release them. - watchers map[chan struct{}]*Watcher + watchers map[chan struct{}]*watcher // lastProgress is the most recently received progress event. lastProgress progress.Progress @@ -80,7 +80,7 @@ type xfer struct { // newTransfer creates a new transfer. func newTransfer() transfer { t := &xfer{ - watchers: make(map[chan struct{}]*Watcher), + watchers: make(map[chan struct{}]*watcher), running: make(chan struct{}), released: make(chan struct{}), broadcastSyncChan: make(chan struct{}), @@ -137,11 +137,11 @@ func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) { // Watch adds a watcher to the transfer. The supplied channel gets progress // updates and is closed when the transfer finishes. -func (t *xfer) Watch(progressOutput progress.Output) *Watcher { +func (t *xfer) Watch(progressOutput progress.Output) *watcher { t.mu.Lock() defer t.mu.Unlock() - w := &Watcher{ + w := &watcher{ releaseChan: make(chan struct{}), signalChan: make(chan struct{}), running: make(chan struct{}), @@ -205,7 +205,7 @@ func (t *xfer) Watch(progressOutput progress.Output) *Watcher { // to be notified about the progress of the transfer. All calls to Watch must // be paired with later calls to Release so that the lifecycle of the transfer // is properly managed. -func (t *xfer) Release(watcher *Watcher) { +func (t *xfer) Release(watcher *watcher) { t.mu.Lock() delete(t.watchers, watcher.releaseChan) @@ -301,7 +301,7 @@ func (tm *transferManager) setConcurrency(concurrency int) { // transfer checks if a transfer matching the given key is in progress. If not, // it starts one by calling xferFunc. The caller supplies a channel which // receives progress output from the transfer. -func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *Watcher) { +func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *watcher) { tm.mu.Lock() defer tm.mu.Unlock() diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go index 6fd3c6dd87..bae9299767 100644 --- a/distribution/xfer/transfer_test.go +++ b/distribution/xfer/transfer_test.go @@ -48,7 +48,7 @@ func TestTransfer(t *testing.T) { // Start a few transfers ids := []string{"id1", "id2", "id3"} xfers := make([]transfer, len(ids)) - watchers := make([]*Watcher, len(ids)) + watchers := make([]*watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } @@ -106,7 +106,7 @@ func TestConcurrencyLimit(t *testing.T) { // Start more transfers than the concurrency limit ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} xfers := make([]transfer, len(ids)) - watchers := make([]*Watcher, len(ids)) + watchers := make([]*watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } @@ -167,7 +167,7 @@ func TestInactiveJobs(t *testing.T) { // Start more transfers than the concurrency limit ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} xfers := make([]transfer, len(ids)) - watchers := make([]*Watcher, len(ids)) + watchers := make([]*watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } @@ -214,7 +214,7 @@ func TestWatchRelease(t *testing.T) { tm := newTransferManager(5) type watcherInfo struct { - watcher *Watcher + watcher *watcher progressChan chan progress.Progress progressDone chan struct{} receivedFirstProgress chan struct{} @@ -293,7 +293,7 @@ func TestWatchFinishedTransfer(t *testing.T) { tm := newTransferManager(5) // Start a transfer - watchers := make([]*Watcher, 3) + watchers := make([]*watcher, 3) var xfer transfer xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) @@ -347,7 +347,7 @@ func TestDuplicateTransfer(t *testing.T) { type transferInfo struct { xfer transfer - watcher *Watcher + watcher *watcher progressChan chan progress.Progress progressDone chan struct{} receivedFirstProgress chan struct{}