Przeglądaj źródła

Merge pull request #26844 from mrjana/net

Vendoring libnetwork @6caf9022fa09
Brian Goff 8 lat temu
rodzic
commit
2c41c020d1

+ 7 - 0
daemon/cluster/cluster.go

@@ -719,6 +719,13 @@ func (c *Cluster) GetLocalAddress() string {
 	return c.actualLocalAddr
 }
 
+// GetListenAddress returns the listen address.
+func (c *Cluster) GetListenAddress() string {
+	c.RLock()
+	defer c.RUnlock()
+	return c.listenAddr
+}
+
 // GetAdvertiseAddress returns the remotely reachable address of this node.
 func (c *Cluster) GetAdvertiseAddress() string {
 	c.RLock()

+ 1 - 1
hack/vendor.sh

@@ -70,7 +70,7 @@ clone git github.com/RackSec/srslog 365bf33cd9acc21ae1c355209865f17228ca534e
 clone git github.com/imdario/mergo 0.2.1
 
 #get libnetwork packages
-clone git github.com/docker/libnetwork e69621c5fb6882627f83187ebefe7709a7211277
+clone git github.com/docker/libnetwork 6caf9022fa093e0247f9f4b572edca868c27ece3
 clone git github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
 clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

+ 6 - 3
vendor/src/github.com/docker/libnetwork/agent.go

@@ -172,10 +172,12 @@ func (c *controller) agentSetup() error {
 	advAddr := clusterProvider.GetAdvertiseAddress()
 	remote := clusterProvider.GetRemoteAddress()
 	remoteAddr, _, _ := net.SplitHostPort(remote)
+	listen := clusterProvider.GetListenAddress()
+	listenAddr, _, _ := net.SplitHostPort(listen)
 
-	logrus.Infof("Initializing Libnetwork Agent Local-addr=%s Adv-addr=%s Remote-addr =%s", bindAddr, advAddr, remoteAddr)
+	logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr)
 	if advAddr != "" && c.agent == nil {
-		if err := c.agentInit(bindAddr, advAddr); err != nil {
+		if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil {
 			logrus.Errorf("Error in agentInit : %v", err)
 		} else {
 			c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
@@ -236,7 +238,7 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
 	return keys[1].Key, keys[1].LamportTime, nil
 }
 
-func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error {
+func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error {
 	if !c.isAgent() {
 		return nil
 	}
@@ -252,6 +254,7 @@ func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error
 	logrus.Info("Gossip cluster hostname ", nodeName)
 
 	nDB, err := networkdb.New(&networkdb.Config{
+		BindAddr:      listenAddr,
 		AdvertiseAddr: advertiseAddr,
 		NodeName:      nodeName,
 		Keys:          keys,

+ 1 - 0
vendor/src/github.com/docker/libnetwork/cluster/provider.go

@@ -10,6 +10,7 @@ type Provider interface {
 	IsManager() bool
 	IsAgent() bool
 	GetLocalAddress() string
+	GetListenAddress() string
 	GetAdvertiseAddress() string
 	GetRemoteAddress() string
 	ListenClusterEvents() <-chan struct{}

+ 15 - 0
vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go

@@ -12,6 +12,13 @@ const globalChain = "DOCKER-OVERLAY"
 
 var filterOnce sync.Once
 
+var filterChan = make(chan struct{}, 1)
+
+func filterWait() func() {
+	filterChan <- struct{}{}
+	return func() { <-filterChan }
+}
+
 func chainExists(cname string) bool {
 	if _, err := iptables.Raw("-L", cname); err != nil {
 		return false
@@ -69,10 +76,14 @@ func setNetworkChain(cname string, remove bool) error {
 }
 
 func addNetworkChain(cname string) error {
+	defer filterWait()()
+
 	return setNetworkChain(cname, false)
 }
 
 func removeNetworkChain(cname string) error {
+	defer filterWait()()
+
 	return setNetworkChain(cname, true)
 }
 
@@ -119,9 +130,13 @@ func setFilters(cname, brName string, remove bool) error {
 }
 
 func addFilters(cname, brName string) error {
+	defer filterWait()()
+
 	return setFilters(cname, brName, false)
 }
 
 func removeFilters(cname, brName string) error {
+	defer filterWait()()
+
 	return setFilters(cname, brName, true)
 }

+ 1 - 0
vendor/src/github.com/docker/libnetwork/networkdb/cluster.go

@@ -86,6 +86,7 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
 func (nDB *NetworkDB) clusterInit() error {
 	config := memberlist.DefaultLANConfig()
 	config.Name = nDB.config.NodeName
+	config.BindAddr = nDB.config.BindAddr
 	config.AdvertiseAddr = nDB.config.AdvertiseAddr
 
 	if nDB.config.BindPort != 0 {

+ 4 - 0
vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go

@@ -121,6 +121,10 @@ type Config struct {
 	// NodeName is the cluster wide unique name for this node.
 	NodeName string
 
+	// BindAddr is the IP on which networkdb listens. It can be
+	// 0.0.0.0 to listen on all addresses on the host.
+	BindAddr string
+
 	// AdvertiseAddr is the node's IP address that we advertise for
 	// cluster communication.
 	AdvertiseAddr string

+ 7 - 5
vendor/src/github.com/docker/libnetwork/sandbox.go

@@ -421,8 +421,7 @@ func (sb *sandbox) ResolveIP(ip string) string {
 }
 
 func (sb *sandbox) ExecFunc(f func()) error {
-	sb.osSbox.InvokeFunc(f)
-	return nil
+	return sb.osSbox.InvokeFunc(f)
 }
 
 func (sb *sandbox) ResolveService(name string) ([]*net.SRV, []net.IP) {
@@ -639,9 +638,12 @@ func (sb *sandbox) SetKey(basePath string) error {
 	if oldosSbox != nil && sb.resolver != nil {
 		sb.resolver.Stop()
 
-		sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0))
-		if err := sb.resolver.Start(); err != nil {
-			log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err)
+		if err := sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)); err == nil {
+			if err := sb.resolver.Start(); err != nil {
+				log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err)
+			}
+		} else {
+			log.Errorf("Resolver Setup Function failed for container %s, %q", sb.ContainerID(), err)
 		}
 	}
 

+ 6 - 2
vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go

@@ -46,9 +46,13 @@ func (sb *sandbox) startResolver(restore bool) {
 		}
 		sb.resolver.SetExtServers(sb.extDNS)
 
-		sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0))
+		if err = sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)); err != nil {
+			log.Errorf("Resolver Setup function failed for container %s, %q", sb.ContainerID(), err)
+			return
+		}
+
 		if err = sb.resolver.Start(); err != nil {
-			log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err)
+			log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err)
 		}
 	})
 }

+ 141 - 43
vendor/src/github.com/docker/libnetwork/service_linux.go

@@ -26,6 +26,7 @@ import (
 
 func init() {
 	reexec.Register("fwmarker", fwMarker)
+	reexec.Register("redirecter", redirecter)
 }
 
 func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
@@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 	n := ep.getNetwork()
 	eIP := ep.Iface().Address()
 
+	if n.ingress {
+		if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
+			logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
+		}
+	}
+
 	if sb.ingress {
 		// For the ingress sandbox if this is not gateway
 		// endpoint do nothing.
@@ -380,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
 	}
 
 	if addService {
-		var iPorts []*PortConfig
+		var filteredPorts []*PortConfig
 		if sb.ingress {
-			iPorts = filterPortConfigs(ingressPorts, false)
-			if err := programIngress(gwIP, iPorts, false); err != nil {
+			filteredPorts = filterPortConfigs(ingressPorts, false)
+			if err := programIngress(gwIP, filteredPorts, false); err != nil {
 				logrus.Errorf("Failed to add ingress: %v", err)
 				return
 			}
 		}
 
-		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
+		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 			return
 		}
@@ -453,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 			logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
 		}
 
-		var iPorts []*PortConfig
+		var filteredPorts []*PortConfig
 		if sb.ingress {
-			iPorts = filterPortConfigs(ingressPorts, true)
-			if err := programIngress(gwIP, iPorts, true); err != nil {
+			filteredPorts = filterPortConfigs(ingressPorts, true)
+			if err := programIngress(gwIP, filteredPorts, true); err != nil {
 				logrus.Errorf("Failed to delete ingress: %v", err)
 			}
 		}
 
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 		}
 	}
@@ -715,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
 	return nil
 }
 
+func writePortsToFile(ports []*PortConfig) (string, error) {
+	f, err := ioutil.TempFile("", "port_configs")
+	if err != nil {
+		return "", err
+	}
+	defer f.Close()
+
+	buf, err := proto.Marshal(&EndpointRecord{
+		IngressPorts: ports,
+	})
+
+	n, err := f.Write(buf)
+	if err != nil {
+		return "", err
+	}
+
+	if n < len(buf) {
+		return "", io.ErrShortWrite
+	}
+
+	return f.Name(), nil
+}
+
+func readPortsFromFile(fileName string) ([]*PortConfig, error) {
+	buf, err := ioutil.ReadFile(fileName)
+	if err != nil {
+		return nil, err
+	}
+
+	var epRec EndpointRecord
+	err = proto.Unmarshal(buf, &epRec)
+	if err != nil {
+		return nil, err
+	}
+
+	return epRec.IngressPorts, nil
+}
+
 // Invoke fwmarker reexec routine to mark vip destined packets with
 // the passed firewall mark.
 func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
 	var ingressPortsFile string
-	if len(ingressPorts) != 0 {
-		f, err := ioutil.TempFile("", "port_configs")
-		if err != nil {
-			return err
-		}
 
-		buf, err := proto.Marshal(&EndpointRecord{
-			IngressPorts: ingressPorts,
-		})
-
-		n, err := f.Write(buf)
+	if len(ingressPorts) != 0 {
+		var err error
+		ingressPortsFile, err = writePortsToFile(ingressPorts)
 		if err != nil {
-			f.Close()
 			return err
 		}
 
-		if n < len(buf) {
-			f.Close()
-			return io.ErrShortWrite
-		}
-
-		ingressPortsFile = f.Name()
-		f.Close()
+		defer os.Remove(ingressPortsFile)
 	}
 
 	addDelOpt := "-A"
@@ -775,20 +806,12 @@ func fwMarker() {
 
 	var ingressPorts []*PortConfig
 	if os.Args[5] != "" {
-		buf, err := ioutil.ReadFile(os.Args[5])
+		var err error
+		ingressPorts, err = readPortsFromFile(os.Args[5])
 		if err != nil {
-			logrus.Errorf("Failed to read ports config file: %v", err)
+			logrus.Errorf("Failed reading ingress ports file: %v", err)
 			os.Exit(6)
 		}
-
-		var epRec EndpointRecord
-		err = proto.Unmarshal(buf, &epRec)
-		if err != nil {
-			logrus.Errorf("Failed to unmarshal ports config data: %v", err)
-			os.Exit(7)
-		}
-
-		ingressPorts = epRec.IngressPorts
 	}
 
 	vip := os.Args[2]
@@ -801,11 +824,7 @@ func fwMarker() {
 
 	rules := [][]string{}
 	for _, iPort := range ingressPorts {
-		rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
-			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
-		rules = append(rules, rule)
-
-		rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
+		rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
 			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
 		rules = append(rules, rule)
 	}
@@ -852,3 +871,82 @@ func fwMarker() {
 		}
 	}
 }
+
+func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
+	var ingressPortsFile string
+
+	if len(ingressPorts) != 0 {
+		var err error
+		ingressPortsFile, err = writePortsToFile(ingressPorts)
+		if err != nil {
+			return err
+		}
+		defer os.Remove(ingressPortsFile)
+	}
+
+	cmd := &exec.Cmd{
+		Path:   reexec.Self(),
+		Args:   append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
+		Stdout: os.Stdout,
+		Stderr: os.Stderr,
+	}
+
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("reexec failed: %v", err)
+	}
+
+	return nil
+}
+
+// Redirecter reexec function.
+func redirecter() {
+	runtime.LockOSThread()
+	defer runtime.UnlockOSThread()
+
+	if len(os.Args) < 4 {
+		logrus.Error("invalid number of arguments..")
+		os.Exit(1)
+	}
+
+	var ingressPorts []*PortConfig
+	if os.Args[3] != "" {
+		var err error
+		ingressPorts, err = readPortsFromFile(os.Args[3])
+		if err != nil {
+			logrus.Errorf("Failed reading ingress ports file: %v", err)
+			os.Exit(2)
+		}
+	}
+
+	eIP, _, err := net.ParseCIDR(os.Args[2])
+	if err != nil {
+		logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
+		os.Exit(3)
+	}
+
+	rules := [][]string{}
+	for _, iPort := range ingressPorts {
+		rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
+			eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
+		rules = append(rules, rule)
+	}
+
+	ns, err := netns.GetFromPath(os.Args[1])
+	if err != nil {
+		logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
+		os.Exit(4)
+	}
+	defer ns.Close()
+
+	if err := netns.Set(ns); err != nil {
+		logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
+		os.Exit(5)
+	}
+
+	for _, rule := range rules {
+		if err := iptables.RawCombinedOutputNative(rule...); err != nil {
+			logrus.Errorf("setting up rule failed, %v: %v", rule, err)
+			os.Exit(5)
+		}
+	}
+}