Enabling ILB/ELB on windows using per-node, per-network LB endpoint.

Signed-off-by: Pradip Dhara <pradipd@microsoft.com>
This commit is contained in:
Pradip Dhara 2017-08-28 23:49:26 -07:00
parent 0300fa7f80
commit 9bed0883e7
7 changed files with 284 additions and 36 deletions

View file

@ -15,6 +15,7 @@ import (
swarmtypes "github.com/docker/docker/api/types/swarm"
containerpkg "github.com/docker/docker/container"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
networkSettings "github.com/docker/docker/daemon/network"
"github.com/docker/docker/plugin"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/cluster"
@ -61,4 +62,5 @@ type Backend interface {
LookupImage(name string) (*types.ImageInspect, error)
PluginManager() *plugin.Manager
PluginGetter() *plugin.Store
GetLBAttachmentStore() *networkSettings.LBAttachmentStore
}

View file

@ -136,23 +136,32 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
}
func (e *executor) Configure(ctx context.Context, node *api.Node) error {
na := node.Attachment
if na == nil {
var ingressNA *api.NetworkAttachment
lbAttachments := make(map[string]string)
for _, na := range node.LbAttachments {
if na.Network.Spec.Ingress {
ingressNA = na
}
lbAttachments[na.Network.ID] = na.Addresses[0]
}
if ingressNA == nil {
e.backend.ReleaseIngress()
return nil
return e.backend.GetLBAttachmentStore().ResetLBAttachments(lbAttachments)
}
options := types.NetworkCreate{
Driver: na.Network.DriverState.Name,
Driver: ingressNA.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: na.Network.IPAM.Driver.Name,
Driver: ingressNA.Network.IPAM.Driver.Name,
},
Options: na.Network.DriverState.Options,
Options: ingressNA.Network.DriverState.Options,
Ingress: true,
CheckDuplicate: true,
}
for _, ic := range na.Network.IPAM.Configs {
for _, ic := range ingressNA.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
@ -162,14 +171,17 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {
}
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
ID: na.Network.ID,
ID: ingressNA.Network.ID,
NetworkCreateRequest: types.NetworkCreateRequest{
Name: na.Network.Spec.Annotations.Name,
Name: ingressNA.Network.Spec.Annotations.Name,
NetworkCreate: options,
},
}, na.Addresses[0])
}, ingressNA.Addresses[0])
if err != nil {
return err
}
return err
return e.backend.GetLBAttachmentStore().ResetLBAttachments(lbAttachments)
}
// Controller returns a docker container runner.

View file

@ -28,6 +28,7 @@ import (
"github.com/docker/docker/daemon/events"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/network"
"github.com/sirupsen/logrus"
// register graph drivers
_ "github.com/docker/docker/daemon/graphdriver/register"
@ -121,6 +122,8 @@ type Daemon struct {
pruneRunning int32
hosts map[string]bool // hosts stores the addresses the daemon is listening on
startupDone chan struct{}
lbAttachmentStore network.LBAttachmentStore
}
// StoreHosts stores the addresses the daemon is listening on
@ -488,6 +491,8 @@ func (daemon *Daemon) DaemonLeavesCluster() {
} else {
logrus.Warnf("failed to initiate ingress network removal: %v", err)
}
daemon.lbAttachmentStore.ClearLBAttachments()
}
// setClusterProvider sets a component for querying the current cluster state.
@ -1242,3 +1247,8 @@ func fixMemorySwappiness(resources *containertypes.Resources) {
resources.MemorySwappiness = nil
}
}
// GetLBAttachmentStore returns current load balancer store associated with the daemon
func (daemon *Daemon) GetLBAttachmentStore() *network.LBAttachmentStore {
return &daemon.lbAttachmentStore
}

View file

@ -182,27 +182,8 @@ func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip
logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
}
sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
if err != nil {
if _, ok := err.(networktypes.ForbiddenError); !ok {
logrus.Errorf("Failed creating ingress sandbox: %v", err)
}
return
}
ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
if err != nil {
logrus.Errorf("Failed creating ingress endpoint: %v", err)
return
}
if err := ep.Join(sb, nil); err != nil {
logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
return
}
if err := sb.EnableService(); err != nil {
logrus.Errorf("Failed enabling service for ingress sandbox")
if err = daemon.createLoadBalancerSandbox("ingress", create.ID, ip, n, libnetwork.OptionIngress()); err != nil {
logrus.Errorf("Failed creating load balancer sandbox for ingress network: %v", err)
}
}
@ -283,6 +264,34 @@ func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.N
return resp, err
}
func (daemon *Daemon) createLoadBalancerSandbox(prefix, id string, ip net.IP, n libnetwork.Network, options ...libnetwork.SandboxOption) error {
c := daemon.netController
sandboxName := prefix + "-sbox"
sb, err := c.NewSandbox(sandboxName, options...)
if err != nil {
if _, ok := err.(networktypes.ForbiddenError); !ok {
return errors.Wrapf(err, "Failed creating %s sandbox", sandboxName)
}
return nil
}
endpointName := prefix + "-endpoint"
ep, err := n.CreateEndpoint(endpointName, libnetwork.CreateOptionIpam(ip, nil, nil, nil), libnetwork.CreateOptionLoadBalancer())
if err != nil {
return errors.Wrapf(err, "Failed creating %s in sandbox %s", endpointName, sandboxName)
}
if err := ep.Join(sb, nil); err != nil {
return errors.Wrapf(err, "Failed joining %s to sandbox %s", endpointName, sandboxName)
}
if err := sb.EnableService(); err != nil {
return errors.Wrapf(err, "Failed enabling service in %s sandbox", sandboxName)
}
return nil
}
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
@ -360,6 +369,18 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string
}
daemon.LogNetworkEvent(n, "create")
if agent && !n.Info().Ingress() && n.Type() == "overlay" {
nodeIP, exists := daemon.GetLBAttachmentStore().GetLBIPForNetwork(id)
if !exists {
return nil, fmt.Errorf("Failed to find a load balancer IP to use for network: %v", id)
}
if err := daemon.createLoadBalancerSandbox(create.Name, id, nodeIP, n); err != nil {
return nil, err
}
}
return &types.NetworkCreateResponse{
ID: n.ID(),
Warning: warning,
@ -496,6 +517,31 @@ func (daemon *Daemon) DeleteNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, false)
}
func (daemon *Daemon) deleteLoadBalancerSandbox(n libnetwork.Network) {
controller := daemon.netController
//The only endpoint left should be the LB endpoint (nw.Name() + "-endpoint")
endpoints := n.Endpoints()
if len(endpoints) == 1 {
sandboxName := n.Name() + "-sbox"
if err := endpoints[0].Info().Sandbox().DisableService(); err != nil {
logrus.Errorf("Failed to disable service on sandbox %s: %v", sandboxName, err)
//Ignore error and attempt to delete the load balancer endpoint
}
if err := endpoints[0].Delete(true); err != nil {
logrus.Errorf("Failed to delete endpoint %s (%s) in %s: %v", endpoints[0].Name(), endpoints[0].ID(), sandboxName, err)
//Ignore error and attempt to delete the sandbox.
}
if err := controller.SandboxDestroy(sandboxName); err != nil {
logrus.Errorf("Failed to delete %s sandbox: %v", sandboxName, err)
//Ignore error and attempt to delete the network.
}
}
}
func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
nw, err := daemon.FindNetwork(networkID)
if err != nil {
@ -517,6 +563,10 @@ func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
return notAllowedError{err}
}
if !nw.Info().Ingress() && nw.Type() == "overlay" {
daemon.deleteLoadBalancerSandbox(nw)
}
if err := nw.Delete(); err != nil {
return err
}

View file

@ -1,9 +1,12 @@
package network
import (
"net"
networktypes "github.com/docker/docker/api/types/network"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/go-connections/nat"
"github.com/pkg/errors"
)
// Settings stores configuration details about the daemon network config
@ -31,3 +34,36 @@ type EndpointSettings struct {
*networktypes.EndpointSettings
IPAMOperational bool
}
// LBAttachmentStore stores the load balancer IP address for a network id.
type LBAttachmentStore struct {
//key: networkd id
//value: load balancer ip address
networkToNodeLBIP map[string]net.IP
}
// ResetLBAttachments clears any exsiting load balancer IP to network mapping and
// sets the mapping to the given lbAttachments.
func (lbStore *LBAttachmentStore) ResetLBAttachments(lbAttachments map[string]string) error {
lbStore.ClearLBAttachments()
for nid, nodeIP := range lbAttachments {
ip, _, err := net.ParseCIDR(nodeIP)
if err != nil {
lbStore.networkToNodeLBIP = make(map[string]net.IP)
return errors.Wrapf(err, "Failed to parse load balancer address %s", nodeIP)
}
lbStore.networkToNodeLBIP[nid] = ip
}
return nil
}
// ClearLBAttachments clears all the mappings of network to load balancer IP Address.
func (lbStore *LBAttachmentStore) ClearLBAttachments() {
lbStore.networkToNodeLBIP = make(map[string]net.IP)
}
// GetLBIPForNetwork return the load balancer IP address for the given network.
func (lbStore *LBAttachmentStore) GetLBIPForNetwork(networkID string) (net.IP, bool) {
ip, exists := lbStore.networkToNodeLBIP[networkID]
return ip, exists
}

View file

@ -19,13 +19,21 @@ import (
func pruneNetworkAndVerify(c *check.C, d *daemon.Swarm, kept, pruned []string) {
_, err := d.Cmd("network", "prune", "--force")
c.Assert(err, checker.IsNil)
out, err := d.Cmd("network", "ls", "--format", "{{.Name}}")
c.Assert(err, checker.IsNil)
for _, s := range kept {
c.Assert(out, checker.Contains, s)
waitAndAssert(c, defaultReconciliationTimeout, func(*check.C) (interface{}, check.CommentInterface) {
out, err := d.Cmd("network", "ls", "--format", "{{.Name}}")
c.Assert(err, checker.IsNil)
return out, nil
}, checker.Contains, s)
}
for _, s := range pruned {
c.Assert(out, checker.Not(checker.Contains), s)
waitAndAssert(c, defaultReconciliationTimeout, func(*check.C) (interface{}, check.CommentInterface) {
out, err := d.Cmd("network", "ls", "--format", "{{.Name}}")
c.Assert(err, checker.IsNil)
return out, nil
}, checker.Not(checker.Contains), s)
}
}
@ -64,6 +72,7 @@ func (s *DockerSwarmSuite) TestPruneNetwork(c *check.C) {
_, err = d.Cmd("service", "rm", serviceName)
c.Assert(err, checker.IsNil)
waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 0)
pruneNetworkAndVerify(c, d, []string{}, []string{"n1", "n3"})
}

View file

@ -0,0 +1,129 @@
package service
import (
"testing"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/request"
"github.com/gotestyourself/gotestyourself/poll"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
func TestCreateWithLBSandbox(t *testing.T) {
defer setupTest(t)()
d := newSwarm(t)
defer d.Stop(t)
client, err := request.NewClientForHost(d.Sock())
require.NoError(t, err)
overlayName := "overlay1"
networkCreate := types.NetworkCreate{
CheckDuplicate: true,
Driver: "overlay",
}
netResp, err := client.NetworkCreate(context.Background(), overlayName, networkCreate)
require.NoError(t, err)
overlayID := netResp.ID
var instances uint64 = 1
serviceSpec := swarmServiceSpec("TestService", instances)
serviceSpec.TaskTemplate.Networks = append(serviceSpec.TaskTemplate.Networks, swarm.NetworkAttachmentConfig{Target: overlayName})
serviceResp, err := client.ServiceCreate(context.Background(), serviceSpec, types.ServiceCreateOptions{
QueryRegistry: false,
})
require.NoError(t, err)
serviceID := serviceResp.ID
poll.WaitOn(t, serviceRunningTasksCount(client, serviceID, instances))
_, _, err = client.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{})
require.NoError(t, err)
network, err := client.NetworkInspect(context.Background(), overlayID, types.NetworkInspectOptions{})
require.NoError(t, err)
assert.Contains(t, network.Containers, overlayName+"-sbox")
err = client.ServiceRemove(context.Background(), serviceID)
require.NoError(t, err)
poll.WaitOn(t, serviceIsRemoved(client, serviceID))
err = client.NetworkRemove(context.Background(), overlayID)
require.NoError(t, err)
poll.WaitOn(t, networkIsRemoved(client, overlayID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
}
func swarmServiceSpec(name string, replicas uint64) swarm.ServiceSpec {
return swarm.ServiceSpec{
Annotations: swarm.Annotations{
Name: name,
},
TaskTemplate: swarm.TaskSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: "busybox:latest",
Command: []string{"/bin/top"},
},
},
Mode: swarm.ServiceMode{
Replicated: &swarm.ReplicatedService{
Replicas: &replicas,
},
},
}
}
func serviceRunningTasksCount(client client.ServiceAPIClient, serviceID string, instances uint64) func(log poll.LogT) poll.Result {
return func(log poll.LogT) poll.Result {
filter := filters.NewArgs()
filter.Add("service", serviceID)
tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
Filters: filter,
})
switch {
case err != nil:
return poll.Error(err)
case len(tasks) == int(instances):
for _, task := range tasks {
if task.Status.State != swarm.TaskStateRunning {
return poll.Continue("waiting for tasks to enter run state")
}
}
return poll.Success()
default:
return poll.Continue("task count at %d waiting for %d", len(tasks), instances)
}
}
}
func serviceIsRemoved(client client.ServiceAPIClient, serviceID string) func(log poll.LogT) poll.Result {
return func(log poll.LogT) poll.Result {
filter := filters.NewArgs()
filter.Add("service", serviceID)
_, err := client.TaskList(context.Background(), types.TaskListOptions{
Filters: filter,
})
if err == nil {
return poll.Continue("waiting for service %s to be deleted", serviceID)
}
return poll.Success()
}
}
func networkIsRemoved(client client.NetworkAPIClient, networkID string) func(log poll.LogT) poll.Result {
return func(log poll.LogT) poll.Result {
_, err := client.NetworkInspect(context.Background(), networkID, types.NetworkInspectOptions{})
if err == nil {
return poll.Continue("waiting for network %s to be removed", networkID)
}
return poll.Success()
}
}