moby/distribution/xfer/upload.go
Yong Tang 7368e41c07 Docker pull/push with max concurrency limits.

This fix tries to address the issues raised in #20936 and #22443,
where `docker pull` or `docker push` fails because concurrent
connections fail.

Currently, the maximum number of concurrent connections is
controlled by `maxDownloadConcurrency` and `maxUploadConcurrency`,
which are hardcoded to 3 and 5 respectively. Therefore, in
situations where the network does not support multiple concurrent
downloads/uploads, `docker push` or `docker pull` may fail.

This fix makes `maxDownloadConcurrency` and `maxUploadConcurrency`
adjustable by passing `--max-concurrent-uploads` and
`--max-concurrent-downloads` to the `docker daemon` command, as
sketched below.
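
To make the flow concrete, a rough sketch of the wiring inside the
daemon (the configuration field and variable names here are
hypothetical, shown only for illustration):

    // Value comes from the --max-concurrent-uploads daemon flag.
    uploadManager := xfer.NewLayerUploadManager(config.MaxConcurrentUploads)

    // If the daemon configuration is later reloaded with a new limit:
    uploadManager.SetConcurrency(newMaxConcurrentUploads)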

The documentation related to the docker daemon has been updated.

Additional test cases have been added to cover the changes in this fix.

This fix fixes #20936. This fix fixes #22443.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
2016-05-11 19:44:54 -07:00

package xfer

import (
	"errors"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution"
	"github.com/docker/docker/layer"
	"github.com/docker/docker/pkg/progress"
	"golang.org/x/net/context"
)
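
// maxUploadAttempts is the maximum number of attempts made to upload a
// single layer before the transfer is reported as failed.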
const maxUploadAttempts = 5

// LayerUploadManager provides task management and progress reporting for
// uploads.
type LayerUploadManager struct {
	tm TransferManager
}

// SetConcurrency sets the max concurrent uploads for each push.
func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
	lum.tm.SetConcurrency(concurrency)
}

// NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
	return &LayerUploadManager{
		tm: NewTransferManager(concurrencyLimit),
	}
}

type uploadTransfer struct {
	Transfer

	// remoteDescriptor holds the descriptor returned by the registry once
	// the upload succeeds.
	remoteDescriptor distribution.Descriptor
	// err records why the upload ultimately failed, if it did.
	err error
}

// An UploadDescriptor references a layer that may need to be uploaded.
type UploadDescriptor interface {
	// Key returns the key used to deduplicate uploads.
	Key() string
	// ID returns the ID for display purposes.
	ID() string
	// DiffID should return the DiffID for this layer.
	DiffID() layer.DiffID
	// Upload is called to perform the Upload.
	Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
	// SetRemoteDescriptor provides the distribution.Descriptor that was
	// returned by Upload. This descriptor is not to be confused with
	// the UploadDescriptor interface, which is used for internally
	// identifying layers that are being uploaded.
	SetRemoteDescriptor(descriptor distribution.Descriptor)
}
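
// Nothing in this file implements UploadDescriptor; concrete implementations
// are supplied by the code driving the push, which wraps each layer to be
// uploaded so that Upload can stream it to the registry.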

// Upload is a blocking function which ensures the listed layers are present on
// the remote registry. It uses the string returned by the Key method to
// deduplicate uploads.
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
	var (
		uploads          []*uploadTransfer
		dedupDescriptors = make(map[string]*uploadTransfer)
	)

	for _, descriptor := range layers {
		progress.Update(progressOutput, descriptor.ID(), "Preparing")

		key := descriptor.Key()
		if _, present := dedupDescriptors[key]; present {
			continue
		}

		xferFunc := lum.makeUploadFunc(descriptor)
		upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
		defer upload.Release(watcher)
		uploads = append(uploads, upload.(*uploadTransfer))
		dedupDescriptors[key] = upload.(*uploadTransfer)
	}
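
	// Wait for each upload to complete (or the caller's context to be
	// cancelled); if any layer upload failed, fail the whole operation
	// with that error.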
	for _, upload := range uploads {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-upload.Transfer.Done():
			if upload.err != nil {
				return upload.err
			}
		}
	}
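
	// All uploads succeeded; hand the registry's descriptor for each layer
	// back to the corresponding UploadDescriptor.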
	for _, l := range layers {
		l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
	}

	return nil
}
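
// makeUploadFunc returns a DoFunc for uploading the layer described by
// descriptor. The returned transfer retries failed uploads with a growing
// delay, up to maxUploadAttempts, and stops early if the transfer's context
// is cancelled or the error is marked DoNotRetry.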
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
		u := &uploadTransfer{
			Transfer: NewTransfer(),
		}

		go func() {
			defer func() {
				close(progressChan)
			}()

			progressOutput := progress.ChanOutput(progressChan)
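
			// Don't begin the upload until the transfer manager signals
			// start; while queued, report "Waiting" for this layer.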
			select {
			case <-start:
			default:
				progress.Update(progressOutput, descriptor.ID(), "Waiting")
				<-start
			}

			retries := 0
			for {
				remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
				if err == nil {
					u.remoteDescriptor = remoteDescriptor
					break
				}

				// If an error was returned because the context
				// was cancelled, we shouldn't retry.
				select {
				case <-u.Transfer.Context().Done():
					u.err = err
					return
				default:
				}

				retries++
				if _, isDNR := err.(DoNotRetry); isDNR || retries == maxUploadAttempts {
					logrus.Errorf("Upload failed: %v", err)
					u.err = err
					return
				}

				logrus.Errorf("Upload failed, retrying: %v", err)
				delay := retries * 5
				ticker := time.NewTicker(time.Second)
			selectLoop:
				for {
					progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
					select {
					case <-ticker.C:
						delay--
						if delay == 0 {
							ticker.Stop()
							break selectLoop
						}
					case <-u.Transfer.Context().Done():
						ticker.Stop()
						u.err = errors.New("upload cancelled during retry delay")
						return
					}
				}
			}
		}()

		return u
	}
}