Explorar o código

Merge pull request #1432 from mrjana/lb

Use complete port configs when plumbing mark rules
Madhu Venugopal %!s(int64=8) %!d(string=hai) anos
pai
achega
3873f01f64
Modificáronse 1 ficheiros con 141 adicións e 43 borrados
  1. 141 43
      libnetwork/service_linux.go

+ 141 - 43
libnetwork/service_linux.go

@@ -26,6 +26,7 @@ import (
 
 func init() {
 	reexec.Register("fwmarker", fwMarker)
+	reexec.Register("redirecter", redirecter)
 }
 
 func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
@@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 	n := ep.getNetwork()
 	eIP := ep.Iface().Address()
 
+	if n.ingress {
+		if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
+			logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
+		}
+	}
+
 	if sb.ingress {
 		// For the ingress sandbox if this is not gateway
 		// endpoint do nothing.
@@ -380,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
 	}
 
 	if addService {
-		var iPorts []*PortConfig
+		var filteredPorts []*PortConfig
 		if sb.ingress {
-			iPorts = filterPortConfigs(ingressPorts, false)
-			if err := programIngress(gwIP, iPorts, false); err != nil {
+			filteredPorts = filterPortConfigs(ingressPorts, false)
+			if err := programIngress(gwIP, filteredPorts, false); err != nil {
 				logrus.Errorf("Failed to add ingress: %v", err)
 				return
 			}
 		}
 
-		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
+		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 			return
 		}
@@ -453,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 			logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
 		}
 
-		var iPorts []*PortConfig
+		var filteredPorts []*PortConfig
 		if sb.ingress {
-			iPorts = filterPortConfigs(ingressPorts, true)
-			if err := programIngress(gwIP, iPorts, true); err != nil {
+			filteredPorts = filterPortConfigs(ingressPorts, true)
+			if err := programIngress(gwIP, filteredPorts, true); err != nil {
 				logrus.Errorf("Failed to delete ingress: %v", err)
 			}
 		}
 
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 		}
 	}
@@ -715,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
 	return nil
 }
 
+func writePortsToFile(ports []*PortConfig) (string, error) {
+	f, err := ioutil.TempFile("", "port_configs")
+	if err != nil {
+		return "", err
+	}
+	defer f.Close()
+
+	buf, err := proto.Marshal(&EndpointRecord{
+		IngressPorts: ports,
+	})
+
+	n, err := f.Write(buf)
+	if err != nil {
+		return "", err
+	}
+
+	if n < len(buf) {
+		return "", io.ErrShortWrite
+	}
+
+	return f.Name(), nil
+}
+
+func readPortsFromFile(fileName string) ([]*PortConfig, error) {
+	buf, err := ioutil.ReadFile(fileName)
+	if err != nil {
+		return nil, err
+	}
+
+	var epRec EndpointRecord
+	err = proto.Unmarshal(buf, &epRec)
+	if err != nil {
+		return nil, err
+	}
+
+	return epRec.IngressPorts, nil
+}
+
 // Invoke fwmarker reexec routine to mark vip destined packets with
 // the passed firewall mark.
 func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
 	var ingressPortsFile string
-	if len(ingressPorts) != 0 {
-		f, err := ioutil.TempFile("", "port_configs")
-		if err != nil {
-			return err
-		}
 
-		buf, err := proto.Marshal(&EndpointRecord{
-			IngressPorts: ingressPorts,
-		})
-
-		n, err := f.Write(buf)
+	if len(ingressPorts) != 0 {
+		var err error
+		ingressPortsFile, err = writePortsToFile(ingressPorts)
 		if err != nil {
-			f.Close()
 			return err
 		}
 
-		if n < len(buf) {
-			f.Close()
-			return io.ErrShortWrite
-		}
-
-		ingressPortsFile = f.Name()
-		f.Close()
+		defer os.Remove(ingressPortsFile)
 	}
 
 	addDelOpt := "-A"
@@ -775,20 +806,12 @@ func fwMarker() {
 
 	var ingressPorts []*PortConfig
 	if os.Args[5] != "" {
-		buf, err := ioutil.ReadFile(os.Args[5])
+		var err error
+		ingressPorts, err = readPortsFromFile(os.Args[5])
 		if err != nil {
-			logrus.Errorf("Failed to read ports config file: %v", err)
+			logrus.Errorf("Failed reading ingress ports file: %v", err)
 			os.Exit(6)
 		}
-
-		var epRec EndpointRecord
-		err = proto.Unmarshal(buf, &epRec)
-		if err != nil {
-			logrus.Errorf("Failed to unmarshal ports config data: %v", err)
-			os.Exit(7)
-		}
-
-		ingressPorts = epRec.IngressPorts
 	}
 
 	vip := os.Args[2]
@@ -801,11 +824,7 @@ func fwMarker() {
 
 	rules := [][]string{}
 	for _, iPort := range ingressPorts {
-		rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
-			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
-		rules = append(rules, rule)
-
-		rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
+		rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
 			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
 		rules = append(rules, rule)
 	}
@@ -852,3 +871,82 @@ func fwMarker() {
 		}
 	}
 }
+
+func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
+	var ingressPortsFile string
+
+	if len(ingressPorts) != 0 {
+		var err error
+		ingressPortsFile, err = writePortsToFile(ingressPorts)
+		if err != nil {
+			return err
+		}
+		defer os.Remove(ingressPortsFile)
+	}
+
+	cmd := &exec.Cmd{
+		Path:   reexec.Self(),
+		Args:   append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
+		Stdout: os.Stdout,
+		Stderr: os.Stderr,
+	}
+
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("reexec failed: %v", err)
+	}
+
+	return nil
+}
+
+// Redirecter reexec function.
+func redirecter() {
+	runtime.LockOSThread()
+	defer runtime.UnlockOSThread()
+
+	if len(os.Args) < 4 {
+		logrus.Error("invalid number of arguments..")
+		os.Exit(1)
+	}
+
+	var ingressPorts []*PortConfig
+	if os.Args[3] != "" {
+		var err error
+		ingressPorts, err = readPortsFromFile(os.Args[3])
+		if err != nil {
+			logrus.Errorf("Failed reading ingress ports file: %v", err)
+			os.Exit(2)
+		}
+	}
+
+	eIP, _, err := net.ParseCIDR(os.Args[2])
+	if err != nil {
+		logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
+		os.Exit(3)
+	}
+
+	rules := [][]string{}
+	for _, iPort := range ingressPorts {
+		rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
+			eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
+		rules = append(rules, rule)
+	}
+
+	ns, err := netns.GetFromPath(os.Args[1])
+	if err != nil {
+		logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
+		os.Exit(4)
+	}
+	defer ns.Close()
+
+	if err := netns.Set(ns); err != nil {
+		logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
+		os.Exit(5)
+	}
+
+	for _, rule := range rules {
+		if err := iptables.RawCombinedOutputNative(rule...); err != nil {
+			logrus.Errorf("setting up rule failed, %v: %v", rule, err)
+			os.Exit(5)
+		}
+	}
+}