Merge pull request #43183 from thaJeztah/cleanup_distribution

distribution/xfer: refactor to reduce public api/interface
This commit is contained in:
Akihiro Suda 2022-02-26 23:50:03 +09:00 committed by GitHub
commit d809ad98e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 198 additions and 202 deletions

View file

@ -23,21 +23,21 @@ const maxDownloadAttempts = 5
// layers.
type LayerDownloadManager struct {
layerStore layer.Store
tm TransferManager
tm *transferManager
waitDuration time.Duration
maxDownloadAttempts int
}
// SetConcurrency sets the max concurrent downloads for each pull
func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
ldm.tm.SetConcurrency(concurrency)
ldm.tm.setConcurrency(concurrency)
}
// NewLayerDownloadManager returns a new LayerDownloadManager.
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...DownloadOption) *LayerDownloadManager {
manager := LayerDownloadManager{
layerStore: layerStore,
tm: NewTransferManager(concurrencyLimit),
tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second,
maxDownloadAttempts: maxDownloadAttempts,
}
@ -47,16 +47,19 @@ func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, optio
return &manager
}
// DownloadOption set options for the LayerDownloadManager.
type DownloadOption func(*LayerDownloadManager)
// WithMaxDownloadAttempts configures the maximum number of download
// attempts for a download manager.
func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
func WithMaxDownloadAttempts(max int) DownloadOption {
return func(dlm *LayerDownloadManager) {
dlm.maxDownloadAttempts = max
}
}
type downloadTransfer struct {
Transfer
transfer
layerStore layer.Store
layer layer.Layer
@ -87,13 +90,17 @@ type DownloadDescriptor interface {
Close()
}
// DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
// additional Registered method which gets called after a downloaded layer is
// registered. This allows the user of the download manager to know the DiffID
// of each registered layer. This method is called if a cast to
// DownloadDescriptorWithRegistered is successful.
type DownloadDescriptorWithRegistered interface {
DownloadDescriptor
// DigestRegisterer can be implemented by a DownloadDescriptor, and provides a
// Registered method which gets called after a downloaded layer is registered.
// This allows the user of the download manager to know the DiffID of each
// registered layer. This method is called if a cast to DigestRegisterer is
// successful.
type DigestRegisterer interface {
// TODO existing implementations in distribution and builder-next swallow errors
// when registering the diffID. Consider changing the Registered signature
// to return the error.
Registered(diffID layer.DiffID)
}
@ -108,7 +115,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
var (
topLayer layer.Layer
topDownload *downloadTransfer
watcher *Watcher
watcher *watcher
missingLayer bool
transferKey = ""
downloadsByKey = make(map[string]*downloadTransfer)
@ -137,8 +144,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
missingLayer = false
rootFS.Append(diffID)
// Register this repository as a source of this layer.
withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
if hasRegistered { // As layerstore may set the driver
if withRegistered, ok := descriptor.(DigestRegisterer); ok { // As layerstore may set the driver
withRegistered.Registered(diffID)
}
continue
@ -148,11 +154,11 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// Does this layer have the same data as a previous layer in
// the stack? If so, avoid downloading it more than once.
var topDownloadUncasted Transfer
var topDownloadUncasted transfer
if existingDownload, ok := downloadsByKey[key]; ok {
xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
defer topDownload.Transfer.Release(watcher)
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
defer topDownload.transfer.release(watcher)
topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer)
continue
}
@ -160,14 +166,14 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// Layer is not known to exist - download and register it.
progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
var xferFunc DoFunc
var xferFunc doFunc
if topDownload != nil {
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
defer topDownload.Transfer.Release(watcher)
defer topDownload.transfer.release(watcher)
} else {
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
}
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer)
downloadsByKey[key] = topDownload
}
@ -192,15 +198,15 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
select {
case <-ctx.Done():
topDownload.Transfer.Release(watcher)
topDownload.transfer.release(watcher)
return rootFS, func() {}, ctx.Err()
case <-topDownload.Done():
case <-topDownload.done():
break
}
l, err := topDownload.result()
if err != nil {
topDownload.Transfer.Release(watcher)
topDownload.transfer.release(watcher)
return rootFS, func() {}, err
}
@ -208,13 +214,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// base layer on Windows.
for range layers {
if l == nil {
topDownload.Transfer.Release(watcher)
topDownload.transfer.release(watcher)
return rootFS, func() {}, errors.New("internal error: too few parent layers")
}
rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
l = l.Parent()
}
return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
return rootFS, func() { topDownload.transfer.release(watcher) }, err
}
// makeDownloadFunc returns a function that performs the layer download and
@ -222,10 +228,10 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// complete before the registration step, and registers the downloaded data
// on top of parentDownload's resulting layer. Otherwise, it registers the
// layer on top of the ChainID given by parentLayer.
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
d := &downloadTransfer{
Transfer: NewTransfer(),
transfer: newTransfer(),
layerStore: ldm.layerStore,
}
@ -247,7 +253,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// Did the parent download already fail or get
// cancelled?
select {
case <-parentDownload.Done():
case <-parentDownload.done():
_, err := parentDownload.result()
if err != nil {
d.err = err
@ -267,7 +273,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
defer descriptor.Close()
for {
downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
downloadReader, size, err = descriptor.Download(d.transfer.context(), progressOutput)
if err == nil {
break
}
@ -275,7 +281,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// If an error was returned because the context
// was cancelled, we shouldn't retry.
select {
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
d.err = err
return
default:
@ -302,7 +308,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
ticker.Stop()
break selectLoop
}
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
ticker.Stop()
d.err = errors.New("download cancelled during retry delay")
return
@ -315,11 +321,11 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
if parentDownload != nil {
select {
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
downloadReader.Close()
return
case <-parentDownload.Done():
case <-parentDownload.done():
}
l, err := parentDownload.result()
@ -331,7 +337,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
parentLayer = l.ChainID()
}
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
defer reader.Close()
inflatedLayerData, err := archive.DecompressStream(reader)
@ -351,7 +357,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
}
if err != nil {
select {
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
default:
d.err = fmt.Errorf("failed to register layer: %v", err)
@ -360,15 +366,15 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
}
progress.Update(progressOutput, descriptor.ID(), "Pull complete")
withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
if hasRegistered {
if withRegistered, ok := descriptor.(DigestRegisterer); ok {
withRegistered.Registered(d.layer.DiffID())
}
// Doesn't actually need to be its own goroutine, but
// done like this so we can defer close(c).
go func() {
<-d.Transfer.Released()
<-d.transfer.released()
if d.layer != nil {
layer.ReleaseAndLog(d.layerStore, d.layer)
}
@ -386,10 +392,10 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
// parentDownload. This function does not log progress output because it would
// interfere with the progress reporting for sourceDownload, which has the same
// Key.
func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
d := &downloadTransfer{
Transfer: NewTransfer(),
transfer: newTransfer(),
layerStore: ldm.layerStore,
}
@ -403,10 +409,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
close(inactive)
select {
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
return
case <-parentDownload.Done():
case <-parentDownload.done():
}
l, err := parentDownload.result()
@ -420,10 +426,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
// parentDownload finished, but wait for it explicitly
// to be sure.
select {
case <-d.Transfer.Context().Done():
case <-d.transfer.context().Done():
d.err = errors.New("layer registration cancelled")
return
case <-sourceDownload.Done():
case <-sourceDownload.done():
}
l, err = sourceDownload.result()
@ -453,15 +459,14 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
return
}
withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
if hasRegistered {
if withRegistered, ok := descriptor.(DigestRegisterer); ok {
withRegistered.Registered(d.layer.DiffID())
}
// Doesn't actually need to be its own goroutine, but
// done like this so we can defer close(c).
go func() {
<-d.Transfer.Released()
<-d.transfer.released()
if d.layer != nil {
layer.ReleaseAndLog(d.layerStore, d.layer)
}

View file

@ -19,8 +19,8 @@ func (e DoNotRetry) Error() string {
return e.Err.Error()
}
// Watcher is returned by Watch and can be passed to Release to stop watching.
type Watcher struct {
// watcher is returned by Watch and can be passed to Release to stop watching.
type watcher struct {
// signalChan is used to signal to the watcher goroutine that
// new progress information is available, or that the transfer
// has finished.
@ -34,18 +34,18 @@ type Watcher struct {
running chan struct{}
}
// Transfer represents an in-progress transfer.
type Transfer interface {
Watch(progressOutput progress.Output) *Watcher
Release(*Watcher)
Context() context.Context
Close()
Done() <-chan struct{}
Released() <-chan struct{}
Broadcast(mainProgressChan <-chan progress.Progress)
// transfer represents an in-progress transfer.
type transfer interface {
watch(progressOutput progress.Output) *watcher
release(*watcher)
context() context.Context
close()
done() <-chan struct{}
released() <-chan struct{}
broadcast(mainProgressChan <-chan progress.Progress)
}
type transfer struct {
type xfer struct {
mu sync.Mutex
ctx context.Context
@ -53,7 +53,7 @@ type transfer struct {
// watchers keeps track of the goroutines monitoring progress output,
// indexed by the channels that release them.
watchers map[chan struct{}]*Watcher
watchers map[chan struct{}]*watcher
// lastProgress is the most recently received progress event.
lastProgress progress.Progress
@ -62,9 +62,9 @@ type transfer struct {
// running remains open as long as the transfer is in progress.
running chan struct{}
// released stays open until all watchers release the transfer and
// the transfer is no longer tracked by the transfer manager.
released chan struct{}
// releasedChan stays open until all watchers release the transfer and
// the transfer is no longer tracked by the transferManager.
releasedChan chan struct{}
// broadcastDone is true if the main progress channel has closed.
broadcastDone bool
@ -77,12 +77,12 @@ type transfer struct {
broadcastSyncChan chan struct{}
}
// NewTransfer creates a new transfer.
func NewTransfer() Transfer {
t := &transfer{
watchers: make(map[chan struct{}]*Watcher),
// newTransfer creates a new transfer.
func newTransfer() transfer {
t := &xfer{
watchers: make(map[chan struct{}]*watcher),
running: make(chan struct{}),
released: make(chan struct{}),
releasedChan: make(chan struct{}),
broadcastSyncChan: make(chan struct{}),
}
@ -95,7 +95,7 @@ func NewTransfer() Transfer {
}
// Broadcast copies the progress and error output to all viewers.
func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) {
for {
var (
p progress.Progress
@ -137,11 +137,11 @@ func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
// Watch adds a watcher to the transfer. The supplied channel gets progress
// updates and is closed when the transfer finishes.
func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
func (t *xfer) watch(progressOutput progress.Output) *watcher {
t.mu.Lock()
defer t.mu.Unlock()
w := &Watcher{
w := &watcher{
releaseChan: make(chan struct{}),
signalChan: make(chan struct{}),
running: make(chan struct{}),
@ -205,7 +205,7 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
// to be notified about the progress of the transfer. All calls to Watch must
// be paired with later calls to Release so that the lifecycle of the transfer
// is properly managed.
func (t *transfer) Release(watcher *Watcher) {
func (t *xfer) release(watcher *watcher) {
t.mu.Lock()
delete(t.watchers, watcher.releaseChan)
@ -216,9 +216,9 @@ func (t *transfer) Release(watcher *Watcher) {
// while waiting for a previous watcher goroutine to
// finish.
select {
case <-t.released:
case <-t.releasedChan:
default:
close(t.released)
close(t.releasedChan)
}
} else {
t.cancel()
@ -233,7 +233,7 @@ func (t *transfer) Release(watcher *Watcher) {
// Done returns a channel which is closed if the transfer completes or is
// cancelled. Note that having 0 watchers causes a transfer to be cancelled.
func (t *transfer) Done() <-chan struct{} {
func (t *xfer) done() <-chan struct{} {
// Note that this doesn't return t.ctx.Done() because that channel will
// be closed the moment Cancel is called, and we need to return a
// channel that blocks until a cancellation is actually acknowledged by
@ -242,75 +242,66 @@ func (t *transfer) Done() <-chan struct{} {
}
// Released returns a channel which is closed once all watchers release the
// transfer AND the transfer is no longer tracked by the transfer manager.
func (t *transfer) Released() <-chan struct{} {
return t.released
// transfer AND the transfer is no longer tracked by the transferManager.
func (t *xfer) released() <-chan struct{} {
return t.releasedChan
}
// Context returns the context associated with the transfer.
func (t *transfer) Context() context.Context {
func (t *xfer) context() context.Context {
return t.ctx
}
// Close is called by the transfer manager when the transfer is no longer
// Close is called by the transferManager when the transfer is no longer
// being tracked.
func (t *transfer) Close() {
func (t *xfer) close() {
t.mu.Lock()
t.closed = true
if len(t.watchers) == 0 {
close(t.released)
close(t.releasedChan)
}
t.mu.Unlock()
}
// DoFunc is a function called by the transfer manager to actually perform
// doFunc is a function called by the transferManager to actually perform
// a transfer. It should be non-blocking. It should wait until the start channel
// is closed before transferring any data. If the function closes inactive, that
// signals to the transfer manager that the job is no longer actively moving
// signals to the transferManager that the job is no longer actively moving
// data - for example, it may be waiting for a dependent transfer to finish.
// This prevents it from taking up a slot.
type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer
// TransferManager is used by LayerDownloadManager and LayerUploadManager to
// schedule and deduplicate transfers. It is up to the TransferManager
// implementation to make the scheduling and concurrency decisions.
type TransferManager interface {
// Transfer checks if a transfer with the given key is in progress. If
// so, it returns progress and error output from that transfer.
// Otherwise, it will call xferFunc to initiate the transfer.
Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
// SetConcurrency set the concurrencyLimit so that it is adjustable daemon reload
SetConcurrency(concurrency int)
}
type doFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer
// transferManager is used by LayerDownloadManager and LayerUploadManager to
// schedule and deduplicate transfers. It is up to the transferManager
// to make the scheduling and concurrency decisions.
type transferManager struct {
mu sync.Mutex
concurrencyLimit int
activeTransfers int
transfers map[string]Transfer
transfers map[string]transfer
waitingTransfers []chan struct{}
}
// NewTransferManager returns a new TransferManager.
func NewTransferManager(concurrencyLimit int) TransferManager {
// newTransferManager returns a new transferManager.
func newTransferManager(concurrencyLimit int) *transferManager {
return &transferManager{
concurrencyLimit: concurrencyLimit,
transfers: make(map[string]Transfer),
transfers: make(map[string]transfer),
}
}
// SetConcurrency sets the concurrencyLimit
func (tm *transferManager) SetConcurrency(concurrency int) {
// setConcurrency sets the concurrencyLimit
func (tm *transferManager) setConcurrency(concurrency int) {
tm.mu.Lock()
tm.concurrencyLimit = concurrency
tm.mu.Unlock()
}
// Transfer checks if a transfer matching the given key is in progress. If not,
// transfer checks if a transfer matching the given key is in progress. If not,
// it starts one by calling xferFunc. The caller supplies a channel which
// receives progress output from the transfer.
func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) {
func (tm *transferManager) transfer(key string, xferFunc doFunc, progressOutput progress.Output) (transfer, *watcher) {
tm.mu.Lock()
defer tm.mu.Unlock()
@ -319,24 +310,24 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
if !present {
break
}
// Transfer is already in progress.
watcher := xfer.Watch(progressOutput)
// transfer is already in progress.
watcher := xfer.watch(progressOutput)
select {
case <-xfer.Context().Done():
case <-xfer.context().Done():
// We don't want to watch a transfer that has been cancelled.
// Wait for it to be removed from the map and try again.
xfer.Release(watcher)
xfer.release(watcher)
tm.mu.Unlock()
// The goroutine that removes this transfer from the
// map is also waiting for xfer.Done(), so yield to it.
// This could be avoided by adding a Closed method
// to Transfer to allow explicitly waiting for it to be
// to transfer to allow explicitly waiting for it to be
// removed the map, but forcing a scheduling round in
// this very rare case seems better than bloating the
// interface definition.
runtime.Gosched()
<-xfer.Done()
<-xfer.done()
tm.mu.Lock()
default:
return xfer, watcher
@ -355,8 +346,8 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
mainProgressChan := make(chan progress.Progress)
xfer := xferFunc(mainProgressChan, start, inactive)
watcher := xfer.Watch(progressOutput)
go xfer.Broadcast(mainProgressChan)
watcher := xfer.watch(progressOutput)
go xfer.broadcast(mainProgressChan)
tm.transfers[key] = xfer
// When the transfer is finished, remove from the map.
@ -368,14 +359,14 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
tm.inactivate(start)
tm.mu.Unlock()
inactive = nil
case <-xfer.Done():
case <-xfer.done():
tm.mu.Lock()
if inactive != nil {
tm.inactivate(start)
}
delete(tm.transfers, key)
tm.mu.Unlock()
xfer.Close()
xfer.close()
return
}
}

View file

@ -9,15 +9,15 @@ import (
)
func TestTransfer(t *testing.T) {
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
select {
case <-start:
default:
t.Errorf("%s: transfer function not started even though concurrency limit not reached", id)
}
xfer := NewTransfer()
xfer := newTransfer()
go func() {
for i := 0; i <= 10; i++ {
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
@ -29,7 +29,7 @@ func TestTransfer(t *testing.T) {
}
}
tm := NewTransferManager(5)
tm := newTransferManager(5)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@ -47,15 +47,15 @@ func TestTransfer(t *testing.T) {
// Start a few transfers
ids := []string{"id1", "id2", "id3"}
xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids))
xfers := make([]transfer, len(ids))
watchers := make([]*watcher, len(ids))
for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
}
for i, xfer := range xfers {
<-xfer.Done()
xfer.Release(watchers[i])
<-xfer.done()
xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@ -68,12 +68,12 @@ func TestTransfer(t *testing.T) {
}
func TestConcurrencyLimit(t *testing.T) {
concurrencyLimit := 3
const concurrencyLimit = 3
var runningJobs int32
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
xfer := NewTransfer()
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
xfer := newTransfer()
go func() {
<-start
totalJobs := atomic.AddInt32(&runningJobs, 1)
@ -91,7 +91,7 @@ func TestConcurrencyLimit(t *testing.T) {
}
}
tm := NewTransferManager(concurrencyLimit)
tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@ -105,15 +105,15 @@ func TestConcurrencyLimit(t *testing.T) {
// Start more transfers than the concurrency limit
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids))
xfers := make([]transfer, len(ids))
watchers := make([]*watcher, len(ids))
for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
}
for i, xfer := range xfers {
<-xfer.Done()
xfer.Release(watchers[i])
<-xfer.done()
xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@ -126,13 +126,13 @@ func TestConcurrencyLimit(t *testing.T) {
}
func TestInactiveJobs(t *testing.T) {
concurrencyLimit := 3
const concurrencyLimit = 3
var runningJobs int32
testDone := make(chan struct{})
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
xfer := NewTransfer()
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
xfer := newTransfer()
go func() {
<-start
totalJobs := atomic.AddInt32(&runningJobs, 1)
@ -152,7 +152,7 @@ func TestInactiveJobs(t *testing.T) {
}
}
tm := NewTransferManager(concurrencyLimit)
tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
@ -166,16 +166,16 @@ func TestInactiveJobs(t *testing.T) {
// Start more transfers than the concurrency limit
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids))
xfers := make([]transfer, len(ids))
watchers := make([]*watcher, len(ids))
for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
}
close(testDone)
for i, xfer := range xfers {
<-xfer.Done()
xfer.Release(watchers[i])
<-xfer.done()
xfer.release(watchers[i])
}
close(progressChan)
<-progressDone
@ -190,9 +190,9 @@ func TestInactiveJobs(t *testing.T) {
func TestWatchRelease(t *testing.T) {
ready := make(chan struct{})
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
xfer := NewTransfer()
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
xfer := newTransfer()
go func() {
defer func() {
close(progressChan)
@ -201,7 +201,7 @@ func TestWatchRelease(t *testing.T) {
for i := int64(0); ; i++ {
select {
case <-time.After(10 * time.Millisecond):
case <-xfer.Context().Done():
case <-xfer.context().Done():
return
}
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@ -211,10 +211,10 @@ func TestWatchRelease(t *testing.T) {
}
}
tm := NewTransferManager(5)
tm := newTransferManager(5)
type watcherInfo struct {
watcher *Watcher
watcher *watcher
progressChan chan progress.Progress
progressDone chan struct{}
receivedFirstProgress chan struct{}
@ -233,11 +233,11 @@ func TestWatchRelease(t *testing.T) {
// Start a transfer
watchers := make([]watcherInfo, 5)
var xfer Transfer
var xfer transfer
watchers[0].progressChan = make(chan progress.Progress)
watchers[0].progressDone = make(chan struct{})
watchers[0].receivedFirstProgress = make(chan struct{})
xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
xfer, watchers[0].watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
go progressConsumer(watchers[0])
// Give it multiple watchers
@ -245,7 +245,7 @@ func TestWatchRelease(t *testing.T) {
watchers[i].progressChan = make(chan progress.Progress)
watchers[i].progressDone = make(chan struct{})
watchers[i].receivedFirstProgress = make(chan struct{})
watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
watchers[i].watcher = xfer.watch(progress.ChanOutput(watchers[i].progressChan))
go progressConsumer(watchers[i])
}
@ -260,17 +260,17 @@ func TestWatchRelease(t *testing.T) {
// Release one watcher every 5ms
for _, w := range watchers {
xfer.Release(w.watcher)
xfer.release(w.watcher)
<-time.After(5 * time.Millisecond)
}
// Now that all watchers have been released, Released() should
// return a closed channel.
<-xfer.Released()
<-xfer.released()
// Done() should return a closed channel because the xfer func returned
// due to cancellation.
<-xfer.Done()
<-xfer.done()
for _, w := range watchers {
close(w.progressChan)
@ -279,9 +279,9 @@ func TestWatchRelease(t *testing.T) {
}
func TestWatchFinishedTransfer(t *testing.T) {
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
xfer := NewTransfer()
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
xfer := newTransfer()
go func() {
// Finish immediately
close(progressChan)
@ -290,30 +290,30 @@ func TestWatchFinishedTransfer(t *testing.T) {
}
}
tm := NewTransferManager(5)
tm := newTransferManager(5)
// Start a transfer
watchers := make([]*Watcher, 3)
var xfer Transfer
xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
watchers := make([]*watcher, 3)
var xfer transfer
xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
// Give it a watcher immediately
watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
watchers[1] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
// Wait for the transfer to complete
<-xfer.Done()
<-xfer.done()
// Set up another watcher
watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
watchers[2] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
// Release the watchers
for _, w := range watchers {
xfer.Release(w)
xfer.release(w)
}
// Now that all watchers have been released, Released() should
// return a closed channel.
<-xfer.Released()
<-xfer.released()
}
func TestDuplicateTransfer(t *testing.T) {
@ -321,10 +321,10 @@ func TestDuplicateTransfer(t *testing.T) {
var xferFuncCalls int32
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
makeXferFunc := func(id string) doFunc {
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
atomic.AddInt32(&xferFuncCalls, 1)
xfer := NewTransfer()
xfer := newTransfer()
go func() {
defer func() {
close(progressChan)
@ -333,7 +333,7 @@ func TestDuplicateTransfer(t *testing.T) {
for i := int64(0); ; i++ {
select {
case <-time.After(10 * time.Millisecond):
case <-xfer.Context().Done():
case <-xfer.context().Done():
return
}
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@ -343,11 +343,11 @@ func TestDuplicateTransfer(t *testing.T) {
}
}
tm := NewTransferManager(5)
tm := newTransferManager(5)
type transferInfo struct {
xfer Transfer
watcher *Watcher
xfer transfer
watcher *watcher
progressChan chan progress.Progress
progressDone chan struct{}
receivedFirstProgress chan struct{}
@ -371,7 +371,7 @@ func TestDuplicateTransfer(t *testing.T) {
t.progressChan = make(chan progress.Progress)
t.progressDone = make(chan struct{})
t.receivedFirstProgress = make(chan struct{})
t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
t.xfer, t.watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
go progressConsumer(*t)
}
@ -390,17 +390,17 @@ func TestDuplicateTransfer(t *testing.T) {
// Release one watcher every 5ms
for _, t := range transfers {
t.xfer.Release(t.watcher)
t.xfer.release(t.watcher)
<-time.After(5 * time.Millisecond)
}
for _, t := range transfers {
// Now that all watchers have been released, Released() should
// return a closed channel.
<-t.xfer.Released()
<-t.xfer.released()
// Done() should return a closed channel because the xfer func returned
// due to cancellation.
<-t.xfer.Done()
<-t.xfer.done()
}
for _, t := range transfers {

View file

@ -16,19 +16,19 @@ const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for
// uploads.
type LayerUploadManager struct {
tm TransferManager
tm *transferManager
waitDuration time.Duration
}
// SetConcurrency sets the max concurrent uploads for each push
func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
lum.tm.SetConcurrency(concurrency)
lum.tm.setConcurrency(concurrency)
}
// NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
manager := LayerUploadManager{
tm: NewTransferManager(concurrencyLimit),
tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second,
}
for _, option := range options {
@ -38,7 +38,7 @@ func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadMan
}
type uploadTransfer struct {
Transfer
transfer
remoteDescriptor distribution.Descriptor
err error
@ -79,8 +79,8 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
}
xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.Release(watcher)
upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.release(watcher)
uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer)
}
@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
select {
case <-ctx.Done():
return ctx.Err()
case <-upload.Transfer.Done():
case <-upload.transfer.done():
if upload.err != nil {
return upload.err
}
@ -102,10 +102,10 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
return nil
}
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
u := &uploadTransfer{
Transfer: NewTransfer(),
transfer: newTransfer(),
}
go func() {
@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
retries := 0
for {
remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
remoteDescriptor, err := descriptor.Upload(u.transfer.context(), progressOutput)
if err == nil {
u.remoteDescriptor = remoteDescriptor
break
@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
// If an error was returned because the context
// was cancelled, we shouldn't retry.
select {
case <-u.Transfer.Context().Done():
case <-u.transfer.context().Done():
u.err = err
return
default:
@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
ticker.Stop()
break selectLoop
}
case <-u.Transfer.Context().Done():
case <-u.transfer.context().Done():
ticker.Stop()
u.err = errors.New("upload cancelled during retry delay")
return

View file

@ -69,12 +69,12 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
return []UploadDescriptor{
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"), 0},
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:1515325234325236634634608943609283523908626098235490238423902343"), 0},
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:6929356290463485374960346430698374523437683470934634534953453453"), 0},
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"), 0},
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:8159352387436803946235346346368745389534789534897538734598734987"), 1},
&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:4637863963478346897346987346987346789346789364879364897364987346"), 0},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:1515325234325236634634608943609283523908626098235490238423902343"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:6929356290463485374960346430698374523437683470934634534953453453"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:8159352387436803946235346346368745389534789534897538734598734987", simulateRetries: 1},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:4637863963478346897346987346987346789346789364879364897364987346"},
}
}