From 3c81dc3103d9c88cb333c80e0810f80ab80c374e Mon Sep 17 00:00:00 2001
From: Drew Erny
Date: Tue, 7 Aug 2018 13:09:04 -0500
Subject: [PATCH] Block task starting until node attachments are ready

Blocks the execution of tasks during the Prepare phase until there
exists an IP address for every overlay network in use by the task. This
prevents a task from starting before the NetworkAttachment containing
the IP address has been sent down to the node.

Includes a basic test for the correct use case.

Signed-off-by: Drew Erny
---
 daemon/cluster/executor/container/adapter.go  |  52 +++++++
 .../executor/container/adapter_test.go        | 139 ++++++++++++++++++
 .../cluster/executor/container/controller.go  |  23 +++
 daemon/network/settings.go                    |  14 +-
 4 files changed, 227 insertions(+), 1 deletion(-)
 create mode 100644 daemon/cluster/executor/container/adapter_test.go

diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go
index f4b97ce660..720b8447fc 100644
--- a/daemon/cluster/executor/container/adapter.go
+++ b/daemon/cluster/executor/container/adapter.go
@@ -32,6 +32,9 @@ import (
     "golang.org/x/time/rate"
 )
 
+// nodeAttachmentReadyInterval is the interval to poll for node attachments
+const nodeAttachmentReadyInterval = 100 * time.Millisecond
+
 // containerAdapter conducts remote operations for a container. All calls
 // are mostly naked calls to the client API, seeded with information from
 // containerConfig.
@@ -146,6 +149,55 @@ func (c *containerAdapter) pullImage(ctx context.Context) error {
     return nil
 }
 
+// waitNodeAttachments validates that NetworkAttachments exist on this node
+// for every network in use by this task. It blocks until the network
+// attachments are ready, or the context times out. If it returns nil, then the
+// node's network attachments are all there.
+func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
+    // to do this, we're going to get the attachment store and try getting the
+    // IP address for each network. if any network comes back not existing,
+    // we'll wait and try again.
+    attachmentStore := c.backend.GetAttachmentStore()
+    if attachmentStore == nil {
+        return fmt.Errorf("error getting attachment store")
+    }
+
+    // essentially, we're long-polling here. this is really sub-optimal, but a
+    // better solution based on signaling channels would require a more
+    // substantial rearchitecture and probably not be worth our time in terms
+    // of performance gains.
+    poll := time.NewTicker(nodeAttachmentReadyInterval)
+    defer poll.Stop()
+    for {
+        // set a flag ready to true. if we try to get a network IP that doesn't
+        // exist yet, we will set this flag to "false"
+        ready := true
+        for _, attachment := range c.container.networksAttachments {
+            // we only need node attachments (IP address) for overlay networks
+            // TODO(dperny): unsure if this will work with other network
+            // drivers, but i also don't think other network drivers use the
+            // node attachment IP address.
+            if attachment.Network.DriverState.Name == "overlay" {
+                if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
+                    ready = false
+                }
+            }
+        }
+
+        // if everything is ready here, then we can just return no error
+        if ready {
+            return nil
+        }
+
+        // otherwise, try polling again, or wait for the context to be canceled.
+        select {
+        case <-ctx.Done():
+            return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
+        case <-poll.C:
+        }
+    }
+}
+
 func (c *containerAdapter) createNetworks(ctx context.Context) error {
     for name := range c.container.networksAttachments {
         ncr, err := c.container.networkCreateRequest(name)
diff --git a/daemon/cluster/executor/container/adapter_test.go b/daemon/cluster/executor/container/adapter_test.go
new file mode 100644
index 0000000000..c4ef2affbb
--- /dev/null
+++ b/daemon/cluster/executor/container/adapter_test.go
@@ -0,0 +1,139 @@
+package container // import "github.com/docker/docker/daemon/cluster/executor/container"
+
+import (
+    "testing"
+
+    "context"
+    "time"
+
+    "github.com/docker/docker/daemon"
+    "github.com/docker/swarmkit/api"
+)
+
+// TestWaitNodeAttachment tests that the waitNodeAttachments method successfully
+// blocks until the required node attachment becomes available.
+func TestWaitNodeAttachment(t *testing.T) {
+    emptyDaemon := &daemon.Daemon{}
+
+    // the daemon creates an attachment store as an object, which means it's
+    // initialized to an empty store by default. get that attachment store here
+    // and add some attachments to it
+    attachmentStore := emptyDaemon.GetAttachmentStore()
+
+    // create a set of attachments to put into the attachment store
+    attachments := map[string]string{
+        "network1": "10.1.2.3/24",
+    }
+
+    // this shouldn't fail, but check it anyway just in case
+    err := attachmentStore.ResetAttachments(attachments)
+    if err != nil {
+        t.Fatalf("error resetting attachments: %v", err)
+    }
+
+    // create a containerConfig to put in the adapter. we don't need the task,
+    // actually; only the networkAttachments are needed.
+    container := &containerConfig{
+        task: nil,
+        networksAttachments: map[string]*api.NetworkAttachment{
+            // network1 is already present in the attachment store.
+            "network1": {
+                Network: &api.Network{
+                    ID: "network1",
+                    DriverState: &api.Driver{
+                        Name: "overlay",
+                    },
+                },
+            },
+            // network2 is not yet present in the attachment store, and we
+            // should block while waiting for it.
+            "network2": {
+                Network: &api.Network{
+                    ID: "network2",
+                    DriverState: &api.Driver{
+                        Name: "overlay",
+                    },
+                },
+            },
+            // localnetwork is not and will never be in the attachment store,
+            // but we should not block on it, because it is not an overlay
+            // network
+            "localnetwork": {
+                Network: &api.Network{
+                    ID: "localnetwork",
+                    DriverState: &api.Driver{
+                        Name: "bridge",
+                    },
+                },
+            },
+        },
+    }
+
+    // we don't create an adapter using the newContainerAdapter package,
+    // because it does a bunch of checks and validations. instead, create one
+    // "from scratch" so we only have the fields we need.
+    adapter := &containerAdapter{
+        backend:   emptyDaemon,
+        container: container,
+    }
+
+    // create a context to call the method with
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // create a channel to allow the goroutine that we run the method call in
+    // to signal that it's done.
+    doneChan := make(chan struct{})
+
+    // store the error return value of waitNodeAttachments in this variable
+    var waitNodeAttachmentsErr error
+    // NOTE(dperny): be careful running goroutines in test code. if a test
+    // terminates with e.g. t.Fatalf or a failed requirement, runtime.Goexit
+    // gets called, which does run defers but does not clean up child goroutines.
+    // we defer canceling the context here, which should stop this goroutine
+    // from leaking
+    go func() {
+        waitNodeAttachmentsErr = adapter.waitNodeAttachments(ctx)
+        // signal that we've completed
+        close(doneChan)
+    }()
+
+    // wait 200ms to allow the waitNodeAttachments call to spin for a bit
+    time.Sleep(200 * time.Millisecond)
+    select {
+    case <-doneChan:
+        if waitNodeAttachmentsErr == nil {
+            t.Fatalf("waitNodeAttachments exited early with no error")
+        } else {
+            t.Fatalf(
+                "waitNodeAttachments exited early with an error: %v",
+                waitNodeAttachmentsErr,
+            )
+        }
+    default:
+        // allow falling through; this is the desired case
+    }
+
+    // now update the node attachments to include another network attachment
+    attachments["network2"] = "10.3.4.5/24"
+    err = attachmentStore.ResetAttachments(attachments)
+    if err != nil {
+        t.Fatalf("error resetting attachments: %v", err)
+    }
+
+    // now wait 200 ms for waitNodeAttachments to pick up the change
+    time.Sleep(200 * time.Millisecond)
+
+    // and check that waitNodeAttachments has exited with no error
+    select {
+    case <-doneChan:
+        if waitNodeAttachmentsErr != nil {
+            t.Fatalf(
+                "waitNodeAttachments returned an error: %v",
+                waitNodeAttachmentsErr,
+            )
+        }
+    default:
+        t.Fatalf("waitNodeAttachments did not exit yet, but should have")
+    }
+}
diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go
index bcd426e73d..8d070799f3 100644
--- a/daemon/cluster/executor/container/controller.go
+++ b/daemon/cluster/executor/container/controller.go
@@ -23,6 +23,10 @@ import (
 
 const defaultGossipConvergeDelay = 2 * time.Second
 
+// waitNodeAttachmentsTimeout defines the total period of time we should wait
+// for node attachments to be ready before giving up on starting a task
+const waitNodeAttachmentsTimeout = 30 * time.Second
+
 // controller implements agent.Controller against docker's API.
 //
 // Most operations against docker's API are done through the container name,
@@ -98,6 +102,25 @@ func (r *controller) Prepare(ctx context.Context) error {
         return err
     }
 
+    // Before we create networks, we need to make sure that the node has all of
+    // the network attachments that the task needs. This will block until that
+    // is the case or the context has expired.
+    // NOTE(dperny): Prepare doesn't time out on its own (that is, the context
+    // passed in does not expire after any period of time), which means if the
+    // node attachment never arrives (for example, if the network's IP address
+    // space is exhausted), then the tasks on the node will park in PREPARING
+    // forever (or until the node dies). To avoid this case, we create a new
+    // context with a fixed deadline, and give up. In normal operation, a node
+    // update with the node IP address should come in hot on the tail of the
+    // task being assigned to the node, and this should exit on the order of
+    // milliseconds, but to be extra conservative we'll give it 30 seconds to
+    // time out before giving up.
+    waitNodeAttachmentsContext, waitCancel := context.WithTimeout(ctx, waitNodeAttachmentsTimeout)
+    defer waitCancel()
+    if err := r.adapter.waitNodeAttachments(waitNodeAttachmentsContext); err != nil {
+        return err
+    }
+
     // Make sure all the networks that the task needs are created.
     if err := r.adapter.createNetworks(ctx); err != nil {
         return err
diff --git a/daemon/network/settings.go b/daemon/network/settings.go
index b0460ed6ae..7696d40201 100644
--- a/daemon/network/settings.go
+++ b/daemon/network/settings.go
@@ -2,6 +2,7 @@ package network // import "github.com/docker/docker/daemon/network"
 
 import (
     "net"
+    "sync"
 
     networktypes "github.com/docker/docker/api/types/network"
     clustertypes "github.com/docker/docker/daemon/cluster/provider"
@@ -37,6 +38,7 @@ type EndpointSettings struct {
 
 // AttachmentStore stores the load balancer IP address for a network id.
 type AttachmentStore struct {
+    sync.Mutex
     //key: networkd id
     //value: load balancer ip address
     networkToNodeLBIP map[string]net.IP
@@ -45,7 +47,9 @@ type AttachmentStore struct {
 // ResetAttachments clears any existing load balancer IP to network mapping and
 // sets the mapping to the given attachments.
 func (store *AttachmentStore) ResetAttachments(attachments map[string]string) error {
-    store.ClearAttachments()
+    store.Lock()
+    defer store.Unlock()
+    store.clearAttachments()
     for nid, nodeIP := range attachments {
         ip, _, err := net.ParseCIDR(nodeIP)
         if err != nil {
@@ -59,11 +63,19 @@ func (store *AttachmentStore) ResetAttachments(attachments map[string]string) er
 
 // ClearAttachments clears all the mappings of network to load balancer IP Address.
 func (store *AttachmentStore) ClearAttachments() {
+    store.Lock()
+    defer store.Unlock()
+    store.clearAttachments()
+}
+
+func (store *AttachmentStore) clearAttachments() {
     store.networkToNodeLBIP = make(map[string]net.IP)
 }
 
 // GetIPForNetwork return the load balancer IP address for the given network.
 func (store *AttachmentStore) GetIPForNetwork(networkID string) (net.IP, bool) {
+    store.Lock()
+    defer store.Unlock()
     ip, exists := store.networkToNodeLBIP[networkID]
     return ip, exists
 }
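
Editor's note (not part of the patch): the sketch below is a minimal, standalone illustration of the ticker-plus-context polling pattern that waitNodeAttachments uses, bounded by a fixed deadline the way Prepare bounds it with context.WithTimeout. The names waitUntilReady and attachmentsReady are invented for this example and do not exist in the Docker codebase; only the standard library is used.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// waitUntilReady polls check at the given interval until it returns true or
// the context is done. This mirrors the NewTicker/select loop in
// waitNodeAttachments.
func waitUntilReady(ctx context.Context, interval time.Duration, check func() bool) error {
    poll := time.NewTicker(interval)
    defer poll.Stop()
    for {
        if check() {
            return nil
        }
        select {
        case <-ctx.Done():
            return errors.New("timed out waiting for readiness")
        case <-poll.C:
        }
    }
}

func main() {
    // Bound the wait with a fixed deadline, as Prepare does with
    // waitNodeAttachmentsTimeout.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Simulate an attachment that becomes available after roughly one second.
    start := time.Now()
    attachmentsReady := func() bool { return time.Since(start) > time.Second }

    if err := waitUntilReady(ctx, 100*time.Millisecond, attachmentsReady); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("attachments ready after", time.Since(start))
}

The fixed deadline is what turns a permanently missing attachment (for example, an exhausted IP address space) into a task failure rather than a task parked in PREPARING forever.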