Merge pull request #39949 from thaJeztah/carry_39413

Adding ability to change max download attempts (carry 39413)
This commit is contained in:
Sebastiaan van Stijn 2019-09-24 18:22:12 +02:00 committed by GitHub
commit 30c5ec4365
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 216 additions and 46 deletions

View file

@ -20,7 +20,7 @@ const (
// installCommonConfigFlags adds flags to the pflag.FlagSet to configure the daemon
func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
var maxConcurrentDownloads, maxConcurrentUploads int
var maxConcurrentDownloads, maxConcurrentUploads, maxDownloadAttempts int
defaultPidFile, err := getDefaultPidFile()
if err != nil {
return err
@ -73,6 +73,7 @@ func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
flags.StringVar(&conf.CorsHeaders, "api-cors-header", "", "Set CORS headers in the Engine API")
flags.IntVar(&maxConcurrentDownloads, "max-concurrent-downloads", config.DefaultMaxConcurrentDownloads, "Set the max concurrent downloads for each pull")
flags.IntVar(&maxConcurrentUploads, "max-concurrent-uploads", config.DefaultMaxConcurrentUploads, "Set the max concurrent uploads for each push")
flags.IntVar(&maxDownloadAttempts, "max-download-attempts", config.DefaultDownloadAttempts, "Set the max download attempts for each pull")
flags.IntVar(&conf.ShutdownTimeout, "shutdown-timeout", defaultShutdownTimeout, "Set the default shutdown timeout")
flags.IntVar(&conf.NetworkDiagnosticPort, "network-diagnostic-port", 0, "TCP port number of the network diagnostic server")
_ = flags.MarkHidden("network-diagnostic-port")
@ -87,6 +88,7 @@ func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) error {
conf.MaxConcurrentDownloads = &maxConcurrentDownloads
conf.MaxConcurrentUploads = &maxConcurrentUploads
conf.MaxDownloadAttempts = &maxDownloadAttempts
flags.StringVar(&conf.ContainerdNamespace, "containerd-namespace", daemon.ContainersNamespace, "Containerd namespace to use")
if err := flags.MarkHidden("containerd-namespace"); err != nil {

View file

@ -31,6 +31,10 @@ const (
// maximum number of uploads that
// may take place at a time for each push.
DefaultMaxConcurrentUploads = 5
// DefaultDownloadAttempts is the default value for
// maximum number of attempts that
// may take place at a time for each pull when the connection is lost.
DefaultDownloadAttempts = 5
// StockRuntimeName is the reserved name/alias used to represent the
// OCI runtime being shipped with the docker daemon package.
StockRuntimeName = "runc"
@ -172,6 +176,10 @@ type CommonConfig struct {
// may take place at a time for each push.
MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"`
// MaxDownloadAttempts is the maximum number of attempts that
// may take place at a time for each push.
MaxDownloadAttempts *int `json:"max-download-attempts,omitempty"`
// ShutdownTimeout is the timeout value (in seconds) the daemon will wait for the container
// to stop when daemon is being shutdown
ShutdownTimeout int `json:"shutdown-timeout,omitempty"`
@ -534,7 +542,7 @@ func findConfigurationConflicts(config map[string]interface{}, flags *pflag.Flag
// Validate validates some specific configs.
// such as config.DNS, config.Labels, config.DNSSearch,
// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads.
// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads and config.MaxDownloadAttempts.
func Validate(config *Config) error {
// validate DNS
for _, dns := range config.DNS {
@ -564,6 +572,9 @@ func Validate(config *Config) error {
if config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 {
return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads)
}
if err := ValidateMaxDownloadAttempts(config); err != nil {
return err
}
// validate that "default" runtime is not reset
if runtimes := config.GetAllRuntimes(); len(runtimes) > 0 {
@ -587,6 +598,14 @@ func Validate(config *Config) error {
return config.ValidatePlatformConfig()
}
// ValidateMaxDownloadAttempts validates if the max-download-attempts is within the valid range
func ValidateMaxDownloadAttempts(config *Config) error {
if config.MaxDownloadAttempts != nil && *config.MaxDownloadAttempts <= 0 {
return fmt.Errorf("invalid max download attempts: %d", *config.MaxDownloadAttempts)
}
return nil
}
// ModifiedDiscoverySettings returns whether the discovery configuration has been modified or not.
func ModifiedDiscoverySettings(config *Config, backendType, advertise string, clusterOpts map[string]string) bool {
if config.ClusterStore != backendType || config.ClusterAdvertise != advertise {

View file

@ -223,25 +223,33 @@ func TestValidateReservedNamespaceLabels(t *testing.T) {
}
func TestValidateConfigurationErrors(t *testing.T) {
minusNumber := -10
intPtr := func(i int) *int { return &i }
testCases := []struct {
config *Config
name string
config *Config
expectedErr string
}{
{
name: "single label without value",
config: &Config{
CommonConfig: CommonConfig{
Labels: []string{"one"},
},
},
expectedErr: "bad attribute format: one",
},
{
name: "multiple label without value",
config: &Config{
CommonConfig: CommonConfig{
Labels: []string{"foo=bar", "one"},
},
},
expectedErr: "bad attribute format: one",
},
{
name: "single DNS, invalid IP-address",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -249,8 +257,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
},
},
},
expectedErr: "1.1.1.1o is not an ip address",
},
{
name: "multiple DNS, invalid IP-address",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -258,8 +268,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
},
},
},
expectedErr: "1.1.1.1o is not an ip address",
},
{
name: "single DNSSearch",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -267,8 +279,10 @@ func TestValidateConfigurationErrors(t *testing.T) {
},
},
},
expectedErr: "123456 is not a valid domain",
},
{
name: "multiple DNSSearch",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -276,58 +290,80 @@ func TestValidateConfigurationErrors(t *testing.T) {
},
},
},
expectedErr: "123456 is not a valid domain",
},
{
name: "negative max-concurrent-downloads",
config: &Config{
CommonConfig: CommonConfig{
MaxConcurrentDownloads: &minusNumber,
// This is weird...
ValuesSet: map[string]interface{}{
"max-concurrent-downloads": -1,
},
MaxConcurrentDownloads: intPtr(-10),
},
},
expectedErr: "invalid max concurrent downloads: -10",
},
{
name: "negative max-concurrent-uploads",
config: &Config{
CommonConfig: CommonConfig{
MaxConcurrentUploads: &minusNumber,
// This is weird...
ValuesSet: map[string]interface{}{
"max-concurrent-uploads": -1,
},
MaxConcurrentUploads: intPtr(-10),
},
},
expectedErr: "invalid max concurrent uploads: -10",
},
{
name: "negative max-download-attempts",
config: &Config{
CommonConfig: CommonConfig{
MaxDownloadAttempts: intPtr(-10),
},
},
expectedErr: "invalid max download attempts: -10",
},
{
name: "zero max-download-attempts",
config: &Config{
CommonConfig: CommonConfig{
MaxDownloadAttempts: intPtr(0),
},
},
expectedErr: "invalid max download attempts: 0",
},
{
name: "generic resource without =",
config: &Config{
CommonConfig: CommonConfig{
NodeGenericResources: []string{"foo"},
},
},
expectedErr: "could not parse GenericResource: incorrect term foo, missing '=' or malformed expression",
},
{
name: "generic resource mixed named and discrete",
config: &Config{
CommonConfig: CommonConfig{
NodeGenericResources: []string{"foo=bar", "foo=1"},
},
},
expectedErr: "could not parse GenericResource: mixed discrete and named resources in expression 'foo=[bar 1]'",
},
}
for _, tc := range testCases {
err := Validate(tc.config)
if err == nil {
t.Fatalf("expected error, got nil for config %v", tc.config)
}
t.Run(tc.name, func(t *testing.T) {
err := Validate(tc.config)
assert.Error(t, err, tc.expectedErr)
})
}
}
func TestValidateConfiguration(t *testing.T) {
minusNumber := 4
intPtr := func(i int) *int { return &i }
testCases := []struct {
name string
config *Config
}{
{
name: "with label",
config: &Config{
CommonConfig: CommonConfig{
Labels: []string{"one=two"},
@ -335,6 +371,7 @@ func TestValidateConfiguration(t *testing.T) {
},
},
{
name: "with dns",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -344,6 +381,7 @@ func TestValidateConfiguration(t *testing.T) {
},
},
{
name: "with dns-search",
config: &Config{
CommonConfig: CommonConfig{
DNSConfig: DNSConfig{
@ -353,28 +391,31 @@ func TestValidateConfiguration(t *testing.T) {
},
},
{
name: "with max-concurrent-downloads",
config: &Config{
CommonConfig: CommonConfig{
MaxConcurrentDownloads: &minusNumber,
// This is weird...
ValuesSet: map[string]interface{}{
"max-concurrent-downloads": -1,
},
MaxConcurrentDownloads: intPtr(4),
},
},
},
{
name: "with max-concurrent-uploads",
config: &Config{
CommonConfig: CommonConfig{
MaxConcurrentUploads: &minusNumber,
// This is weird...
ValuesSet: map[string]interface{}{
"max-concurrent-uploads": -1,
},
MaxConcurrentUploads: intPtr(4),
},
},
},
{
name: "with max-download-attempts",
config: &Config{
CommonConfig: CommonConfig{
MaxDownloadAttempts: intPtr(4),
},
},
},
{
name: "with multiple node generic resources",
config: &Config{
CommonConfig: CommonConfig{
NodeGenericResources: []string{"foo=bar", "foo=baz"},
@ -382,6 +423,7 @@ func TestValidateConfiguration(t *testing.T) {
},
},
{
name: "with node generic resources",
config: &Config{
CommonConfig: CommonConfig{
NodeGenericResources: []string{"foo=1"},
@ -390,10 +432,10 @@ func TestValidateConfiguration(t *testing.T) {
},
}
for _, tc := range testCases {
err := Validate(tc.config)
if err != nil {
t.Fatalf("expected no error, got error %v", err)
}
t.Run(tc.name, func(t *testing.T) {
err := Validate(tc.config)
assert.NilError(t, err)
})
}
}

View file

@ -1048,6 +1048,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
LayerStores: layerStores,
MaxConcurrentDownloads: *config.MaxConcurrentDownloads,
MaxConcurrentUploads: *config.MaxConcurrentUploads,
MaxDownloadAttempts: *config.MaxDownloadAttempts,
ReferenceStore: rs,
RegistryService: registryService,
TrustKey: trustKey,

View file

@ -38,6 +38,7 @@ type ImageServiceConfig struct {
LayerStores map[string]layer.Store
MaxConcurrentDownloads int
MaxConcurrentUploads int
MaxDownloadAttempts int
ReferenceStore dockerreference.Store
RegistryService registry.Service
TrustKey libtrust.PrivateKey
@ -47,10 +48,11 @@ type ImageServiceConfig struct {
func NewImageService(config ImageServiceConfig) *ImageService {
logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads)
logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads)
logrus.Debugf("Max Download Attempts: %d", config.MaxDownloadAttempts)
return &ImageService{
containers: config.ContainerStore,
distributionMetadataStore: config.DistributionMetadataStore,
downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads),
downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
eventsService: config.EventsService,
imageStore: config.ImageStore,
layerStores: config.LayerStores,

View file

@ -16,6 +16,7 @@ import (
// - Daemon debug log level
// - Daemon max concurrent downloads
// - Daemon max concurrent uploads
// - Daemon max download attempts
// - Daemon shutdown timeout (in seconds)
// - Cluster discovery (reconfigure and restart)
// - Daemon labels
@ -44,6 +45,9 @@ func (daemon *Daemon) Reload(conf *config.Config) (err error) {
}
daemon.reloadDebug(conf, attributes)
daemon.reloadMaxConcurrentDownloadsAndUploads(conf, attributes)
if err := daemon.reloadMaxDownloadAttempts(conf, attributes); err != nil {
return err
}
daemon.reloadShutdownTimeout(conf, attributes)
daemon.reloadFeatures(conf, attributes)
@ -110,6 +114,27 @@ func (daemon *Daemon) reloadMaxConcurrentDownloadsAndUploads(conf *config.Config
attributes["max-concurrent-uploads"] = fmt.Sprintf("%d", *daemon.configStore.MaxConcurrentUploads)
}
// reloadMaxDownloadAttempts updates configuration with max concurrent
// download attempts when a connection is lost and updates the passed attributes
func (daemon *Daemon) reloadMaxDownloadAttempts(conf *config.Config, attributes map[string]string) error {
if err := config.ValidateMaxDownloadAttempts(conf); err != nil {
return err
}
// If no value is set for max-download-attempts we assume it is the default value
// We always "reset" as the cost is lightweight and easy to maintain.
maxDownloadAttempts := config.DefaultDownloadAttempts
if conf.IsValueSet("max-download-attempts") && conf.MaxDownloadAttempts != nil {
maxDownloadAttempts = *conf.MaxDownloadAttempts
}
daemon.configStore.MaxDownloadAttempts = &maxDownloadAttempts
logrus.Debugf("Reset Max Download Attempts: %d", *daemon.configStore.MaxDownloadAttempts)
// prepare reload event attributes with updatable configurations
attributes["max-download-attempts"] = fmt.Sprintf("%d", *daemon.configStore.MaxDownloadAttempts)
return nil
}
// reloadShutdownTimeout updates configuration with daemon shutdown timeout option
// and updates the passed attributes
func (daemon *Daemon) reloadShutdownTimeout(conf *config.Config, attributes map[string]string) {

View file

@ -24,9 +24,10 @@ const maxDownloadAttempts = 5
// registers and downloads those, taking into account dependencies between
// layers.
type LayerDownloadManager struct {
layerStores map[string]layer.Store
tm TransferManager
waitDuration time.Duration
layerStores map[string]layer.Store
tm TransferManager
waitDuration time.Duration
maxDownloadAttempts int
}
// SetConcurrency sets the max concurrent downloads for each pull
@ -37,9 +38,10 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
// NewLayerDownloadManager returns a new LayerDownloadManager.
func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
manager := LayerDownloadManager{
layerStores: layerStores,
tm: NewTransferManager(concurrencyLimit),
waitDuration: time.Second,
layerStores: layerStores,
tm: NewTransferManager(concurrencyLimit),
waitDuration: time.Second,
maxDownloadAttempts: maxDownloadAttempts,
}
for _, option := range options {
option(&manager)
@ -47,6 +49,14 @@ func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimi
return &manager
}
// WithMaxDownloadAttempts configures the maximum number of download
// attempts for a download manager.
func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
return func(dlm *LayerDownloadManager) {
dlm.maxDownloadAttempts = max
}
}
type downloadTransfer struct {
Transfer
@ -283,13 +293,13 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
}
retries++
if _, isDNR := err.(DoNotRetry); isDNR || retries == maxDownloadAttempts {
logrus.Errorf("Download failed: %v", err)
if _, isDNR := err.(DoNotRetry); isDNR || retries > ldm.maxDownloadAttempts {
logrus.Errorf("Download failed after %d attempts: %v", retries, err)
d.err = err
return
}
logrus.Errorf("Download failed, retrying: %v", err)
logrus.Infof("Download failed, retrying (%d/%d): %v", retries, ldm.maxDownloadAttempts, err)
delay := retries * 5
ticker := time.NewTicker(ldm.waitDuration)

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
digest "github.com/opencontainers/go-digest"
"gotest.tools/assert"
)
const maxDownloadConcurrency = 3
@ -160,6 +161,7 @@ type mockDownloadDescriptor struct {
registeredDiffID layer.DiffID
expectedDiffID layer.DiffID
simulateRetries int
retries int
}
// Key returns the key used to deduplicate downloads.
@ -213,9 +215,9 @@ func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput pr
}
}
if d.simulateRetries != 0 {
d.simulateRetries--
return nil, 0, errors.New("simulating retry")
if d.retries < d.simulateRetries {
d.retries++
return nil, 0, fmt.Errorf("simulating download attempt %d/%d", d.retries, d.simulateRetries)
}
return d.mockTarStream(), 0, nil
@ -359,3 +361,70 @@ func TestCancelledDownload(t *testing.T) {
close(progressChan)
<-progressDone
}
func TestMaxDownloadAttempts(t *testing.T) {
tests := []struct {
name string
simulateRetries int
maxDownloadAttempts int
expectedErr string
}{
{
name: "max-attempts=5, succeed at 2nd attempt",
simulateRetries: 2,
maxDownloadAttempts: 5,
},
{
name: "max-attempts=5, succeed at 5th attempt",
simulateRetries: 5,
maxDownloadAttempts: 5,
},
{
name: "max-attempts=5, fail at 6th attempt",
simulateRetries: 6,
maxDownloadAttempts: 5,
expectedErr: "simulating download attempt 5/6",
},
{
name: "max-attempts=0, fail after 1 attempt",
simulateRetries: 1,
maxDownloadAttempts: 0,
expectedErr: "simulating download attempt 1/1",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
lsMap := make(map[string]layer.Store)
lsMap[runtime.GOOS] = layerStore
ldm := NewLayerDownloadManager(
lsMap,
maxDownloadConcurrency,
func(m *LayerDownloadManager) {
m.waitDuration = time.Millisecond
m.maxDownloadAttempts = tc.maxDownloadAttempts
})
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
go func() {
for range progressChan {
}
close(progressDone)
}()
var currentDownloads int32
descriptors := downloadDescriptors(&currentDownloads)
descriptors[4].(*mockDownloadDescriptor).simulateRetries = tc.simulateRetries
_, _, err := ldm.Download(context.Background(), *image.NewRootFS(), runtime.GOOS, descriptors, progress.ChanOutput(progressChan))
if tc.expectedErr == "" {
assert.NilError(t, err)
} else {
assert.Error(t, err, tc.expectedErr)
}
})
}
}