Explorar o código

Set east-west load balancing to use direct routing

Modify the loadbalancing for east-west traffic to use direct routing
rather than NAT and update tasks to use direct service return under
linux.  This avoids hiding the source address of the sender and improves
the performance in single-client/single-server tests.

Signed-off-by: Chris Telfer <ctelfer@docker.com>
Chris Telfer %!s(int64=6) %!d(string=hai) anos
pai
achega
9a2464f436

+ 20 - 0
libnetwork/ipvs/constants.go

@@ -145,3 +145,23 @@ const (
 	// addresses.
 	// addresses.
 	SourceHashing = "sh"
 	SourceHashing = "sh"
 )
 )
+
+const (
+	// ConnFwdMask is a mask for the fwd methods
+	ConnFwdMask = 0x0007
+
+	// ConnFwdMasq denotes forwarding via masquerading/NAT
+	ConnFwdMasq = 0x0000
+
+	// ConnFwdLocalNode denotes forwarding to a local node
+	ConnFwdLocalNode = 0x0001
+
+	// ConnFwdTunnel denotes forwarding via a tunnel
+	ConnFwdTunnel = 0x0002
+
+	// ConnFwdDirectRoute denotes forwarding via direct routing
+	ConnFwdDirectRoute = 0x0003
+
+	// ConnFwdBypass denotes forwarding while bypassing the cache
+	ConnFwdBypass = 0x0004
+)

+ 30 - 0
libnetwork/osl/namespace_linux.go

@@ -384,6 +384,36 @@ func (n *networkNamespace) RemoveAliasIP(ifName string, ip *net.IPNet) error {
 	return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
 	return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
 }
 }
 
 
+func (n *networkNamespace) DisableARPForVIP(srcName string) (Err error) {
+	dstName := ""
+	for _, i := range n.Interfaces() {
+		if i.SrcName() == srcName {
+			dstName = i.DstName()
+			break
+		}
+	}
+	if dstName == "" {
+		return fmt.Errorf("failed to find interface %s in sandbox", srcName)
+	}
+
+	err := n.InvokeFunc(func() {
+		path := filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_ignore")
+		if err := ioutil.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil {
+			Err = fmt.Errorf("Failed to set %s to 1: %v", path, err)
+			return
+		}
+		path = filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_announce")
+		if err := ioutil.WriteFile(path, []byte{'2', '\n'}, 0644); err != nil {
+			Err = fmt.Errorf("Failed to set %s to 2: %v", path, err)
+			return
+		}
+	})
+	if err != nil {
+		return err
+	}
+	return
+}
+
 func (n *networkNamespace) InvokeFunc(f func()) error {
 func (n *networkNamespace) InvokeFunc(f func()) error {
 	return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
 	return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
 		f()
 		f()

+ 4 - 0
libnetwork/osl/sandbox.go

@@ -51,6 +51,10 @@ type Sandbox interface {
 	// RemoveAliasIP removes the passed IP address from the named interface
 	// RemoveAliasIP removes the passed IP address from the named interface
 	RemoveAliasIP(ifName string, ip *net.IPNet) error
 	RemoveAliasIP(ifName string, ip *net.IPNet) error
 
 
+	// DisableARPForVIP disables ARP replies and requests for VIP addresses
+	// on a particular interface
+	DisableARPForVIP(ifName string) error
+
 	// Add a static route to the sandbox.
 	// Add a static route to the sandbox.
 	AddStaticRoute(*types.StaticRoute) error
 	AddStaticRoute(*types.StaticRoute) error
 
 

+ 25 - 3
libnetwork/sandbox.go

@@ -730,7 +730,7 @@ func (sb *sandbox) DisableService() (err error) {
 	return nil
 	return nil
 }
 }
 
 
-func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
+func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint, ingress bool) {
 	for _, i := range osSbox.Info().Interfaces() {
 	for _, i := range osSbox.Info().Interfaces() {
 		// Only remove the interfaces owned by this endpoint from the sandbox.
 		// Only remove the interfaces owned by this endpoint from the sandbox.
 		if ep.hasInterface(i.SrcName()) {
 		if ep.hasInterface(i.SrcName()) {
@@ -742,8 +742,16 @@ func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
 
 
 	ep.Lock()
 	ep.Lock()
 	joinInfo := ep.joinInfo
 	joinInfo := ep.joinInfo
+	vip := ep.virtualIP
 	ep.Unlock()
 	ep.Unlock()
 
 
+	if len(vip) > 0 && !ingress {
+		ipNet := &net.IPNet{IP: vip, Mask: net.CIDRMask(32, 32)}
+		if err := osSbox.RemoveAliasIP(osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
+			logrus.WithError(err).Debugf("failed to remove virtual ip %v to loopback", ipNet)
+		}
+	}
+
 	if joinInfo == nil {
 	if joinInfo == nil {
 		return
 		return
 	}
 	}
@@ -760,6 +768,7 @@ func (sb *sandbox) releaseOSSbox() {
 	sb.Lock()
 	sb.Lock()
 	osSbox := sb.osSbox
 	osSbox := sb.osSbox
 	sb.osSbox = nil
 	sb.osSbox = nil
+	ingress := sb.ingress
 	sb.Unlock()
 	sb.Unlock()
 
 
 	if osSbox == nil {
 	if osSbox == nil {
@@ -767,7 +776,7 @@ func (sb *sandbox) releaseOSSbox() {
 	}
 	}
 
 
 	for _, ep := range sb.getConnectedEndpoints() {
 	for _, ep := range sb.getConnectedEndpoints() {
-		releaseOSSboxResources(osSbox, ep)
+		releaseOSSboxResources(osSbox, ep, ingress)
 	}
 	}
 
 
 	osSbox.Destroy()
 	osSbox.Destroy()
@@ -854,6 +863,18 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
 		if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
 		if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
 			return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
 			return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
 		}
 		}
+
+		if len(ep.virtualIP) > 0 && !sb.ingress {
+			if sb.loadBalancerNID == "" {
+				if err := sb.osSbox.DisableARPForVIP(i.srcName); err != nil {
+					return fmt.Errorf("failed disable ARP for VIP: %v", err)
+				}
+			}
+			ipNet := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
+			if err := sb.osSbox.AddAliasIP(sb.osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
+				return fmt.Errorf("failed to add virtual ip %v to loopback: %v", ipNet, err)
+			}
+		}
 	}
 	}
 
 
 	if joinInfo != nil {
 	if joinInfo != nil {
@@ -904,9 +925,10 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
 	sb.Lock()
 	sb.Lock()
 	osSbox := sb.osSbox
 	osSbox := sb.osSbox
 	inDelete := sb.inDelete
 	inDelete := sb.inDelete
+	ingress := sb.ingress
 	sb.Unlock()
 	sb.Unlock()
 	if osSbox != nil {
 	if osSbox != nil {
-		releaseOSSboxResources(osSbox, ep)
+		releaseOSSboxResources(osSbox, ep, ingress)
 	}
 	}
 
 
 	sb.Lock()
 	sb.Lock()

+ 18 - 6
libnetwork/service_linux.go

@@ -142,7 +142,7 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
 		}
 		}
 
 
 		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
 		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
-		if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false); err != nil {
+		if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.ingress); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
 			logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
 			return
 			return
 		}
 		}
@@ -158,6 +158,9 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
 		Address:       ip,
 		Address:       ip,
 		Weight:        1,
 		Weight:        1,
 	}
 	}
+	if !n.ingress {
+		d.ConnectionFlags = ipvs.ConnFwdDirectRoute
+	}
 
 
 	// Remove the sched name before using the service to add
 	// Remove the sched name before using the service to add
 	// destination.
 	// destination.
@@ -203,6 +206,9 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
 		Address:       ip,
 		Address:       ip,
 		Weight:        1,
 		Weight:        1,
 	}
 	}
+	if !n.ingress {
+		d.ConnectionFlags = ipvs.ConnFwdDirectRoute
+	}
 
 
 	if fullRemove {
 	if fullRemove {
 		if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
 		if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
@@ -231,7 +237,7 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
 			}
 			}
 		}
 		}
 
 
-		if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true); err != nil {
+		if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.ingress); err != nil {
 			logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
 			logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
 		}
 		}
 
 
@@ -566,7 +572,7 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {
 
 
 // Invoke fwmarker reexec routine to mark vip destined packets with
 // Invoke fwmarker reexec routine to mark vip destined packets with
 // the passed firewall mark.
 // the passed firewall mark.
-func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
+func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, isIngress bool) error {
 	var ingressPortsFile string
 	var ingressPortsFile string
 
 
 	if len(ingressPorts) != 0 {
 	if len(ingressPorts) != 0 {
@@ -584,9 +590,14 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
 		addDelOpt = "-D"
 		addDelOpt = "-D"
 	}
 	}
 
 
+	isIngressOpt := "false"
+	if isIngress {
+		isIngressOpt = "true"
+	}
+
 	cmd := &exec.Cmd{
 	cmd := &exec.Cmd{
 		Path:   reexec.Self(),
 		Path:   reexec.Self(),
-		Args:   append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
+		Args:   append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), isIngressOpt),
 		Stdout: os.Stdout,
 		Stdout: os.Stdout,
 		Stderr: os.Stderr,
 		Stderr: os.Stderr,
 	}
 	}
@@ -603,7 +614,7 @@ func fwMarker() {
 	runtime.LockOSThread()
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()
 	defer runtime.UnlockOSThread()
 
 
-	if len(os.Args) < 7 {
+	if len(os.Args) < 8 {
 		logrus.Error("invalid number of arguments..")
 		logrus.Error("invalid number of arguments..")
 		os.Exit(1)
 		os.Exit(1)
 	}
 	}
@@ -645,7 +656,8 @@ func fwMarker() {
 		os.Exit(5)
 		os.Exit(5)
 	}
 	}
 
 
-	if addDelOpt == "-A" {
+	isIngressOpt := os.Args[7]
+	if addDelOpt == "-A" && isIngressOpt == "true" {
 		eIP, subnet, err := net.ParseCIDR(os.Args[6])
 		eIP, subnet, err := net.ParseCIDR(os.Args[6])
 		if err != nil {
 		if err != nil {
 			logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
 			logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)