daemon: make config reloading more transactional

Config reloading has interleaved validations and other fallible
operations with mutating the live daemon configuration. The daemon
configuration could be left in a partially-reloaded state if any of the
operations returns an error. Mutating a copy of the configuration and
atomically swapping the config struct on success is not currently an
option as config values are not copyable due to the presence of
sync.Mutex fields. Introduce a two-phase commit protocol to defer any
mutations of the daemon state until after all fallible operations have
succeeded.

Reload transactions are not yet entirely hermetic. The platform
reloading logic for custom runtimes on *nix could still leave the
directory of generated runtime wrapper scripts in an indeterminate state
if an error is encountered.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2022-08-17 13:33:21 -04:00
parent 038449467e
commit 742ac6e275
6 changed files with 139 additions and 176 deletions

View file

@ -397,18 +397,21 @@ func newRouterOptions(ctx context.Context, config *config.Config, d *daemon.Daem
func (cli *DaemonCli) reloadConfig() {
reload := func(c *config.Config) {
// Revalidate and reload the authorization plugins
if err := validateAuthzPlugins(c.AuthorizationPlugins, cli.d.PluginStore); err != nil {
logrus.Fatalf("Error validating authorization plugin: %v", err)
return
}
cli.authzMiddleware.SetPlugins(c.AuthorizationPlugins)
if err := cli.d.Reload(c); err != nil {
logrus.Errorf("Error reconfiguring the daemon: %v", err)
return
}
// Apply our own configuration only after the daemon reload has succeeded. We
// don't want to partially apply the config if the daemon is unhappy with it.
cli.authzMiddleware.SetPlugins(c.AuthorizationPlugins)
if c.IsValueSet("debug") {
debugEnabled := debug.IsEnabled()
switch {

View file

@ -9,8 +9,9 @@ import (
"github.com/sirupsen/logrus"
)
// Reload reads configuration changes and modifies the
// daemon according to those changes.
// Reload modifies the live daemon configuration from conf.
// conf is assumed to be a validated configuration.
//
// These are the settings that Reload changes:
// - Platform runtime
// - Daemon debug log level
@ -48,31 +49,52 @@ func (daemon *Daemon) Reload(conf *config.Config) (err error) {
}
}()
if err := daemon.reloadPlatform(conf, attributes); err != nil {
return err
// Ideally reloading should be transactional: the reload either completes
// successfully, or the daemon config and state are left untouched. We use a
// simplified two-phase commit protocol to achieve this. Any fallible reload
// operation is split into two phases. The first phase performs all the fallible
// operations without mutating daemon state and returns a closure: its second
// phase. The second phase applies the changes to the daemon state. If any
// first-phase returns an error, the reload transaction is "rolled back" by
// discarding the second-phase closures.
type TxnCommitter = func(attributes map[string]string)
var txns []TxnCommitter
for _, prepare := range []func(*config.Config) (TxnCommitter, error){
daemon.reloadPlatform,
daemon.reloadRegistryConfig,
} {
commit, err := prepare(conf)
if err != nil {
return err
}
txns = append(txns, commit)
}
daemon.reloadDebug(conf, attributes)
daemon.reloadMaxConcurrentDownloadsAndUploads(conf, attributes)
daemon.reloadMaxDownloadAttempts(conf, attributes)
daemon.reloadShutdownTimeout(conf, attributes)
daemon.reloadFeatures(conf, attributes)
daemon.reloadLabels(conf, attributes)
daemon.reloadLiveRestore(conf, attributes)
daemon.reloadNetworkDiagnosticPort(conf, attributes)
if err := daemon.reloadLabels(conf, attributes); err != nil {
return err
for _, tx := range txns {
tx(attributes)
}
if err := daemon.reloadAllowNondistributableArtifacts(conf, attributes); err != nil {
return err
return nil
}
func marshalAttributeSlice(v []string) string {
if v == nil {
return "[]"
}
if err := daemon.reloadRegistryMirrors(conf, attributes); err != nil {
return err
b, err := json.Marshal(v)
if err != nil {
panic(err) // Should never happen as the input type is fixed.
}
if err := daemon.reloadInsecureRegistries(conf, attributes); err != nil {
return err
}
if err := daemon.reloadLiveRestore(conf, attributes); err != nil {
return err
}
return daemon.reloadNetworkDiagnosticPort(conf, attributes)
return string(b)
}
// reloadDebug updates configuration with Debug option
@ -142,104 +164,50 @@ func (daemon *Daemon) reloadShutdownTimeout(conf *config.Config, attributes map[
// reloadLabels updates configuration with engine labels
// and updates the passed attributes
func (daemon *Daemon) reloadLabels(conf *config.Config, attributes map[string]string) error {
func (daemon *Daemon) reloadLabels(conf *config.Config, attributes map[string]string) {
// update corresponding configuration
if conf.IsValueSet("labels") {
daemon.configStore.Labels = conf.Labels
}
// prepare reload event attributes with updatable configurations
if daemon.configStore.Labels != nil {
labels, err := json.Marshal(daemon.configStore.Labels)
if err != nil {
return err
}
attributes["labels"] = string(labels)
} else {
attributes["labels"] = "[]"
}
return nil
attributes["labels"] = marshalAttributeSlice(daemon.configStore.Labels)
}
// reloadAllowNondistributableArtifacts updates the configuration with allow-nondistributable-artifacts options
// reloadRegistryConfig updates the configuration with registry options
// and updates the passed attributes.
func (daemon *Daemon) reloadAllowNondistributableArtifacts(conf *config.Config, attributes map[string]string) error {
func (daemon *Daemon) reloadRegistryConfig(conf *config.Config) (func(map[string]string), error) {
// Update corresponding configuration.
opts := daemon.configStore.ServiceOptions
if conf.IsValueSet("allow-nondistributable-artifacts") {
daemon.configStore.AllowNondistributableArtifacts = conf.AllowNondistributableArtifacts
if err := daemon.registryService.LoadAllowNondistributableArtifacts(conf.AllowNondistributableArtifacts); err != nil {
return err
}
opts.AllowNondistributableArtifacts = conf.AllowNondistributableArtifacts
}
// Prepare reload event attributes with updatable configurations.
if daemon.configStore.AllowNondistributableArtifacts != nil {
v, err := json.Marshal(daemon.configStore.AllowNondistributableArtifacts)
if err != nil {
return err
}
attributes["allow-nondistributable-artifacts"] = string(v)
} else {
attributes["allow-nondistributable-artifacts"] = "[]"
}
return nil
}
// reloadInsecureRegistries updates configuration with insecure registry option
// and updates the passed attributes
func (daemon *Daemon) reloadInsecureRegistries(conf *config.Config, attributes map[string]string) error {
// update corresponding configuration
if conf.IsValueSet("insecure-registries") {
daemon.configStore.InsecureRegistries = conf.InsecureRegistries
if err := daemon.registryService.LoadInsecureRegistries(conf.InsecureRegistries); err != nil {
return err
}
opts.InsecureRegistries = conf.InsecureRegistries
}
// prepare reload event attributes with updatable configurations
if daemon.configStore.InsecureRegistries != nil {
insecureRegistries, err := json.Marshal(daemon.configStore.InsecureRegistries)
if err != nil {
return err
}
attributes["insecure-registries"] = string(insecureRegistries)
} else {
attributes["insecure-registries"] = "[]"
}
return nil
}
// reloadRegistryMirrors updates configuration with registry mirror options
// and updates the passed attributes
func (daemon *Daemon) reloadRegistryMirrors(conf *config.Config, attributes map[string]string) error {
// update corresponding configuration
if conf.IsValueSet("registry-mirrors") {
daemon.configStore.Mirrors = conf.Mirrors
if err := daemon.registryService.LoadMirrors(conf.Mirrors); err != nil {
return err
}
opts.Mirrors = conf.Mirrors
}
// prepare reload event attributes with updatable configurations
if daemon.configStore.Mirrors != nil {
mirrors, err := json.Marshal(daemon.configStore.Mirrors)
if err != nil {
return err
}
attributes["registry-mirrors"] = string(mirrors)
} else {
attributes["registry-mirrors"] = "[]"
commit, err := daemon.registryService.ReplaceConfig(opts)
if err != nil {
return nil, err
}
return nil
return func(attributes map[string]string) {
commit()
daemon.configStore.ServiceOptions = opts
// Prepare reload event attributes with updatable configurations.
attributes["allow-nondistributable-artifacts"] = marshalAttributeSlice(daemon.configStore.AllowNondistributableArtifacts)
attributes["insecure-registries"] = marshalAttributeSlice(daemon.configStore.InsecureRegistries)
attributes["registry-mirrors"] = marshalAttributeSlice(daemon.configStore.Mirrors)
}, nil
}
// reloadLiveRestore updates configuration with live restore option
// and updates the passed attributes
func (daemon *Daemon) reloadLiveRestore(conf *config.Config, attributes map[string]string) error {
func (daemon *Daemon) reloadLiveRestore(conf *config.Config, attributes map[string]string) {
// update corresponding configuration
if conf.IsValueSet("live-restore") {
daemon.configStore.LiveRestoreEnabled = conf.LiveRestoreEnabled
@ -247,24 +215,21 @@ func (daemon *Daemon) reloadLiveRestore(conf *config.Config, attributes map[stri
// prepare reload event attributes with updatable configurations
attributes["live-restore"] = strconv.FormatBool(daemon.configStore.LiveRestoreEnabled)
return nil
}
// reloadNetworkDiagnosticPort updates the network controller starting the diagnostic if the config is valid
func (daemon *Daemon) reloadNetworkDiagnosticPort(conf *config.Config, attributes map[string]string) error {
func (daemon *Daemon) reloadNetworkDiagnosticPort(conf *config.Config, attributes map[string]string) {
if conf == nil || daemon.netController == nil || !conf.IsValueSet("network-diagnostic-port") ||
conf.NetworkDiagnosticPort < 1 || conf.NetworkDiagnosticPort > 65535 {
// If there is no config make sure that the diagnostic is off
if daemon.netController != nil {
daemon.netController.StopDiagnostic()
}
return nil
return
}
// Enable the network diagnostic if the flag is set with a valid port within the range
logrus.WithFields(logrus.Fields{"port": conf.NetworkDiagnosticPort, "ip": "127.0.0.1"}).Warn("Starting network diagnostic server")
daemon.netController.StartDiagnostic(conf.NetworkDiagnosticPort)
return nil
}
// reloadFeatures updates configuration with enabled/disabled features

View file

@ -19,15 +19,24 @@ func muteLogs() {
logrus.SetLevel(logrus.ErrorLevel)
}
func TestDaemonReloadLabels(t *testing.T) {
func newDaemonForReloadT(t *testing.T, cfg *config.Config) *Daemon {
t.Helper()
daemon := &Daemon{
configStore: &config.Config{
CommonConfig: config.CommonConfig{
Labels: []string{"foo:bar"},
},
},
configStore: cfg,
imageService: images.NewImageService(images.ImageServiceConfig{}),
}
var err error
daemon.registryService, err = registry.NewService(registry.ServiceOptions{})
assert.Assert(t, err)
return daemon
}
func TestDaemonReloadLabels(t *testing.T) {
daemon := newDaemonForReloadT(t, &config.Config{
CommonConfig: config.CommonConfig{
Labels: []string{"foo:bar"},
},
})
muteLogs()
valuesSets := make(map[string]interface{})
@ -50,10 +59,7 @@ func TestDaemonReloadLabels(t *testing.T) {
}
func TestDaemonReloadAllowNondistributableArtifacts(t *testing.T) {
daemon := &Daemon{
configStore: &config.Config{},
imageService: images.NewImageService(images.ImageServiceConfig{}),
}
daemon := newDaemonForReloadT(t, &config.Config{})
muteLogs()
var err error
@ -308,17 +314,13 @@ func TestDaemonReloadInsecureRegistries(t *testing.T) {
}
func TestDaemonReloadNotAffectOthers(t *testing.T) {
daemon := &Daemon{
imageService: images.NewImageService(images.ImageServiceConfig{}),
}
muteLogs()
daemon.configStore = &config.Config{
daemon := newDaemonForReloadT(t, &config.Config{
CommonConfig: config.CommonConfig{
Labels: []string{"foo:bar"},
Debug: true,
},
}
})
muteLogs()
valuesSets := make(map[string]interface{})
valuesSets["labels"] = "foo:baz"
@ -347,10 +349,7 @@ func TestDaemonReloadNetworkDiagnosticPort(t *testing.T) {
if os.Getuid() != 0 {
t.Skip("root required")
}
daemon := &Daemon{
imageService: images.NewImageService(images.ImageServiceConfig{}),
configStore: &config.Config{},
}
daemon := newDaemonForReloadT(t, &config.Config{})
enableConfig := &config.Config{
CommonConfig: config.CommonConfig{

View file

@ -12,50 +12,56 @@ import (
// reloadPlatform updates configuration with platform specific options
// and updates the passed attributes
func (daemon *Daemon) reloadPlatform(conf *config.Config, attributes map[string]string) error {
if err := conf.ValidatePlatformConfig(); err != nil {
return err
}
func (daemon *Daemon) reloadPlatform(conf *config.Config) (func(attributes map[string]string), error) {
var txns []func()
if conf.IsValueSet("runtimes") {
// Always set the default one
conf.Runtimes[config.StockRuntimeName] = types.Runtime{Path: config.DefaultRuntimeBinary}
if err := daemon.initRuntimes(conf.Runtimes); err != nil {
return err
return nil, err
}
daemon.configStore.Runtimes = conf.Runtimes
txns = append(txns, func() {
daemon.configStore.Runtimes = conf.Runtimes
})
}
if conf.DefaultRuntime != "" {
daemon.configStore.DefaultRuntime = conf.DefaultRuntime
txns = append(txns, func() {
daemon.configStore.DefaultRuntime = conf.DefaultRuntime
})
}
if conf.IsValueSet("default-shm-size") {
daemon.configStore.ShmSize = conf.ShmSize
}
if conf.CgroupNamespaceMode != "" {
daemon.configStore.CgroupNamespaceMode = conf.CgroupNamespaceMode
}
if conf.IpcMode != "" {
daemon.configStore.IpcMode = conf.IpcMode
}
// Update attributes
var runtimeList bytes.Buffer
for name, rt := range daemon.configStore.Runtimes {
if runtimeList.Len() > 0 {
runtimeList.WriteRune(' ')
return func(attributes map[string]string) {
for _, commit := range txns {
commit()
}
runtimeList.WriteString(name + ":" + rt.Path)
}
attributes["runtimes"] = runtimeList.String()
attributes["default-runtime"] = daemon.configStore.DefaultRuntime
attributes["default-shm-size"] = strconv.FormatInt(int64(daemon.configStore.ShmSize), 10)
attributes["default-ipc-mode"] = daemon.configStore.IpcMode
attributes["default-cgroupns-mode"] = daemon.configStore.CgroupNamespaceMode
if conf.IsValueSet("default-shm-size") {
daemon.configStore.ShmSize = conf.ShmSize
}
return nil
if conf.CgroupNamespaceMode != "" {
daemon.configStore.CgroupNamespaceMode = conf.CgroupNamespaceMode
}
if conf.IpcMode != "" {
daemon.configStore.IpcMode = conf.IpcMode
}
// Update attributes
var runtimeList bytes.Buffer
for name, rt := range daemon.configStore.Runtimes {
if runtimeList.Len() > 0 {
runtimeList.WriteRune(' ')
}
runtimeList.WriteString(name + ":" + rt.Path)
}
attributes["runtimes"] = runtimeList.String()
attributes["default-runtime"] = daemon.configStore.DefaultRuntime
attributes["default-shm-size"] = strconv.FormatInt(int64(daemon.configStore.ShmSize), 10)
attributes["default-ipc-mode"] = daemon.configStore.IpcMode
attributes["default-cgroupns-mode"] = daemon.configStore.CgroupNamespaceMode
}, nil
}

View file

@ -4,6 +4,6 @@ import "github.com/docker/docker/daemon/config"
// reloadPlatform updates configuration with platform specific options
// and updates the passed attributes
func (daemon *Daemon) reloadPlatform(config *config.Config, attributes map[string]string) error {
return nil
func (daemon *Daemon) reloadPlatform(conf *config.Config) (func(attributes map[string]string), error) {
return func(map[string]string) {}, nil
}

View file

@ -35,28 +35,18 @@ func (s *Service) ServiceConfig() *registry.ServiceConfig {
return s.config.copy()
}
// LoadAllowNondistributableArtifacts loads allow-nondistributable-artifacts registries for Service.
func (s *Service) LoadAllowNondistributableArtifacts(registries []string) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.config.loadAllowNondistributableArtifacts(registries)
}
// LoadMirrors loads registry mirrors for Service
func (s *Service) LoadMirrors(mirrors []string) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.config.loadMirrors(mirrors)
}
// LoadInsecureRegistries loads insecure registries for Service
func (s *Service) LoadInsecureRegistries(registries []string) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.config.loadInsecureRegistries(registries)
// ReplaceConfig prepares a transaction which will atomically replace the
// registry service's configuration when the returned commit function is called.
func (s *Service) ReplaceConfig(options ServiceOptions) (commit func(), err error) {
config, err := newServiceConfig(options)
if err != nil {
return nil, err
}
return func() {
s.mu.Lock()
defer s.mu.Unlock()
s.config = config
}, nil
}
// Auth contacts the public registry with the provided credentials,