瀏覽代碼

Merge pull request #6682 from erikh/6476-persistent_port_allocation

fix persistent port allocation
Tibor Vass 11 年之前
父節點
當前提交
fef1e0a708

+ 45 - 50
daemon/networkdriver/bridge/driver.go

@@ -20,7 +20,8 @@ import (
 )
 
 const (
-	DefaultNetworkBridge = "docker0"
+	DefaultNetworkBridge     = "docker0"
+	MaxAllocatedPortAttempts = 10
 )
 
 // Network interface represents the networking stack of a container
@@ -354,9 +355,6 @@ func Release(job *engine.Job) engine.Status {
 	var (
 		id                 = job.Args[0]
 		containerInterface = currentInterfaces.Get(id)
-		ip                 net.IP
-		port               int
-		proto              string
 	)
 
 	if containerInterface == nil {
@@ -367,22 +365,6 @@ func Release(job *engine.Job) engine.Status {
 		if err := portmapper.Unmap(nat); err != nil {
 			log.Printf("Unable to unmap port %s: %s", nat, err)
 		}
-
-		// this is host mappings
-		switch a := nat.(type) {
-		case *net.TCPAddr:
-			proto = "tcp"
-			ip = a.IP
-			port = a.Port
-		case *net.UDPAddr:
-			proto = "udp"
-			ip = a.IP
-			port = a.Port
-		}
-
-		if err := portallocator.ReleasePort(ip, proto, port); err != nil {
-			log.Printf("Unable to release port %s", nat)
-		}
 	}
 
 	if err := ipallocator.ReleaseIP(bridgeNetwork, &containerInterface.IP); err != nil {
@@ -399,7 +381,7 @@ func AllocatePort(job *engine.Job) engine.Status {
 		ip            = defaultBindingIP
 		id            = job.Args[0]
 		hostIP        = job.Getenv("HostIP")
-		origHostPort  = job.GetenvInt("HostPort")
+		hostPort      = job.GetenvInt("HostPort")
 		containerPort = job.GetenvInt("ContainerPort")
 		proto         = job.Getenv("Proto")
 		network       = currentInterfaces.Get(id)
@@ -409,39 +391,46 @@ func AllocatePort(job *engine.Job) engine.Status {
 		ip = net.ParseIP(hostIP)
 	}
 
-	var (
-		hostPort  int
-		container net.Addr
-		host      net.Addr
-	)
-
-	/*
-	 Try up to 10 times to get a port that's not already allocated.
+	// host ip, proto, and host port
+	var container net.Addr
+	switch proto {
+	case "tcp":
+		container = &net.TCPAddr{IP: network.IP, Port: containerPort}
+	case "udp":
+		container = &net.UDPAddr{IP: network.IP, Port: containerPort}
+	default:
+		return job.Errorf("unsupported address type %s", proto)
+	}
 
-	 In the event of failure to bind, return the error that portmapper.Map
-	 yields.
-	*/
-	for i := 0; i < 10; i++ {
-		// host ip, proto, and host port
-		hostPort, err = portallocator.RequestPort(ip, proto, origHostPort)
+	//
+	// Try up to 10 times to get a port that's not already allocated.
+	//
+	// In the event of failure to bind, return the error that portmapper.Map
+	// yields.
+	//
 
-		if err != nil {
-			return job.Error(err)
+	var host net.Addr
+	for i := 0; i < MaxAllocatedPortAttempts; i++ {
+		if host, err = portmapper.Map(container, ip, hostPort); err == nil {
+			break
 		}
 
-		if proto == "tcp" {
-			host = &net.TCPAddr{IP: ip, Port: hostPort}
-			container = &net.TCPAddr{IP: network.IP, Port: containerPort}
-		} else {
-			host = &net.UDPAddr{IP: ip, Port: hostPort}
-			container = &net.UDPAddr{IP: network.IP, Port: containerPort}
-		}
+		switch allocerr := err.(type) {
+		case portallocator.ErrPortAlreadyAllocated:
+			// There is no point in immediately retrying to map an explicitly
+			// chosen port.
+			if hostPort != 0 {
+				job.Logf("Failed to bind %s for container address %s: %s", allocerr.IPPort(), container.String(), allocerr.Error())
+				break
+			}
 
-		if err = portmapper.Map(container, ip, hostPort); err == nil {
+			// Automatically chosen 'free' port failed to bind: move on the next.
+			job.Logf("Failed to bind %s for container address %s. Trying another port.", allocerr.IPPort(), container.String())
+		default:
+			// some other error during mapping
+			job.Logf("Received an unexpected error during port allocation: %s", err.Error())
 			break
 		}
-
-		job.Logf("Failed to bind %s:%d for container address %s:%d. Trying another port.", ip.String(), hostPort, network.IP.String(), containerPort)
 	}
 
 	if err != nil {
@@ -451,12 +440,18 @@ func AllocatePort(job *engine.Job) engine.Status {
 	network.PortMappings = append(network.PortMappings, host)
 
 	out := engine.Env{}
-	out.Set("HostIP", ip.String())
-	out.SetInt("HostPort", hostPort)
-
+	switch netAddr := host.(type) {
+	case *net.TCPAddr:
+		out.Set("HostIP", netAddr.IP.String())
+		out.SetInt("HostPort", netAddr.Port)
+	case *net.UDPAddr:
+		out.Set("HostIP", netAddr.IP.String())
+		out.SetInt("HostPort", netAddr.Port)
+	}
 	if _, err := out.WriteTo(job.Stdout); err != nil {
 		return job.Error(err)
 	}
+
 	return engine.StatusOK
 }
 

+ 106 - 0
daemon/networkdriver/bridge/driver_test.go

@@ -0,0 +1,106 @@
+package bridge
+
+import (
+	"fmt"
+	"net"
+	"strconv"
+	"testing"
+
+	"github.com/dotcloud/docker/engine"
+)
+
+func findFreePort(t *testing.T) int {
+	l, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatal("Failed to find a free port")
+	}
+	defer l.Close()
+
+	result, err := net.ResolveTCPAddr("tcp", l.Addr().String())
+	if err != nil {
+		t.Fatal("Failed to resolve address to identify free port")
+	}
+	return result.Port
+}
+
+func newPortAllocationJob(eng *engine.Engine, port int) (job *engine.Job) {
+	strPort := strconv.Itoa(port)
+
+	job = eng.Job("allocate_port", "container_id")
+	job.Setenv("HostIP", "127.0.0.1")
+	job.Setenv("HostPort", strPort)
+	job.Setenv("Proto", "tcp")
+	job.Setenv("ContainerPort", strPort)
+	return
+}
+
+func TestAllocatePortDetection(t *testing.T) {
+	eng := engine.New()
+	eng.Logging = false
+
+	freePort := findFreePort(t)
+
+	// Init driver
+	job := eng.Job("initdriver")
+	if res := InitDriver(job); res != engine.StatusOK {
+		t.Fatal("Failed to initialize network driver")
+	}
+
+	// Allocate interface
+	job = eng.Job("allocate_interface", "container_id")
+	if res := Allocate(job); res != engine.StatusOK {
+		t.Fatal("Failed to allocate network interface")
+	}
+
+	// Allocate same port twice, expect failure on second call
+	job = newPortAllocationJob(eng, freePort)
+	if res := AllocatePort(job); res != engine.StatusOK {
+		t.Fatal("Failed to find a free port to allocate")
+	}
+	if res := AllocatePort(job); res == engine.StatusOK {
+		t.Fatal("Duplicate port allocation granted by AllocatePort")
+	}
+}
+
+func TestAllocatePortReclaim(t *testing.T) {
+	eng := engine.New()
+	eng.Logging = false
+
+	freePort := findFreePort(t)
+
+	// Init driver
+	job := eng.Job("initdriver")
+	if res := InitDriver(job); res != engine.StatusOK {
+		t.Fatal("Failed to initialize network driver")
+	}
+
+	// Allocate interface
+	job = eng.Job("allocate_interface", "container_id")
+	if res := Allocate(job); res != engine.StatusOK {
+		t.Fatal("Failed to allocate network interface")
+	}
+
+	// Occupy port
+	listenAddr := fmt.Sprintf(":%d", freePort)
+	tcpListenAddr, err := net.ResolveTCPAddr("tcp", listenAddr)
+	if err != nil {
+		t.Fatalf("Failed to resolve TCP address '%s'", listenAddr)
+	}
+
+	l, err := net.ListenTCP("tcp", tcpListenAddr)
+	if err != nil {
+		t.Fatalf("Fail to listen on port %d", freePort)
+	}
+
+	// Allocate port, expect failure
+	job = newPortAllocationJob(eng, freePort)
+	if res := AllocatePort(job); res == engine.StatusOK {
+		t.Fatal("Successfully allocated currently used port")
+	}
+
+	// Reclaim port, retry allocation
+	l.Close()
+	if res := AllocatePort(job); res != engine.StatusOK {
+		t.Fatal("Failed to allocate previously reclaimed port")
+	}
+}

+ 60 - 21
daemon/networkdriver/portallocator/portallocator.go

@@ -2,13 +2,18 @@ package portallocator
 
 import (
 	"errors"
+	"fmt"
 	"net"
 	"sync"
 )
 
+type portMap struct {
+	p    map[int]struct{}
+	last int
+}
+
 type (
-	portMap     map[int]bool
-	protocolMap map[string]portMap
+	protocolMap map[string]*portMap
 	ipMapping   map[string]protocolMap
 )
 
@@ -18,9 +23,8 @@ const (
 )
 
 var (
-	ErrAllPortsAllocated    = errors.New("all ports are allocated")
-	ErrPortAlreadyAllocated = errors.New("port has already been allocated")
-	ErrUnknownProtocol      = errors.New("unknown protocol")
+	ErrAllPortsAllocated = errors.New("all ports are allocated")
+	ErrUnknownProtocol   = errors.New("unknown protocol")
 )
 
 var (
@@ -30,6 +34,34 @@ var (
 	globalMap = ipMapping{}
 )
 
+type ErrPortAlreadyAllocated struct {
+	ip   string
+	port int
+}
+
+func NewErrPortAlreadyAllocated(ip string, port int) ErrPortAlreadyAllocated {
+	return ErrPortAlreadyAllocated{
+		ip:   ip,
+		port: port,
+	}
+}
+
+func (e ErrPortAlreadyAllocated) IP() string {
+	return e.ip
+}
+
+func (e ErrPortAlreadyAllocated) Port() int {
+	return e.port
+}
+
+func (e ErrPortAlreadyAllocated) IPPort() string {
+	return fmt.Sprintf("%s:%d", e.ip, e.port)
+}
+
+func (e ErrPortAlreadyAllocated) Error() string {
+	return fmt.Sprintf("Bind for %s:%d failed: port is already allocated", e.ip, e.port)
+}
+
 func RequestPort(ip net.IP, proto string, port int) (int, error) {
 	mutex.Lock()
 	defer mutex.Unlock()
@@ -43,11 +75,11 @@ func RequestPort(ip net.IP, proto string, port int) (int, error) {
 	mapping := getOrCreate(ip)
 
 	if port > 0 {
-		if !mapping[proto][port] {
-			mapping[proto][port] = true
+		if _, ok := mapping[proto].p[port]; !ok {
+			mapping[proto].p[port] = struct{}{}
 			return port, nil
 		} else {
-			return 0, ErrPortAlreadyAllocated
+			return 0, NewErrPortAlreadyAllocated(ip.String(), port)
 		}
 	} else {
 		port, err := findPort(ip, proto)
@@ -66,8 +98,8 @@ func ReleasePort(ip net.IP, proto string, port int) error {
 
 	ip = getDefault(ip)
 
-	mapping := getOrCreate(ip)
-	delete(mapping[proto], port)
+	mapping := getOrCreate(ip)[proto]
+	delete(mapping.p, port)
 
 	return nil
 }
@@ -86,8 +118,8 @@ func getOrCreate(ip net.IP) protocolMap {
 
 	if _, ok := globalMap[ipstr]; !ok {
 		globalMap[ipstr] = protocolMap{
-			"tcp": portMap{},
-			"udp": portMap{},
+			"tcp": &portMap{p: map[int]struct{}{}, last: 0},
+			"udp": &portMap{p: map[int]struct{}{}, last: 0},
 		}
 	}
 
@@ -95,21 +127,28 @@ func getOrCreate(ip net.IP) protocolMap {
 }
 
 func findPort(ip net.IP, proto string) (int, error) {
-	port := BeginPortRange
-
-	mapping := getOrCreate(ip)
+	mapping := getOrCreate(ip)[proto]
 
-	for mapping[proto][port] {
-		port++
+	if mapping.last == 0 {
+		mapping.p[BeginPortRange] = struct{}{}
+		mapping.last = BeginPortRange
+		return BeginPortRange, nil
+	}
 
+	for port := mapping.last + 1; port != mapping.last; port++ {
 		if port > EndPortRange {
-			return 0, ErrAllPortsAllocated
+			port = BeginPortRange
+		}
+
+		if _, ok := mapping.p[port]; !ok {
+			mapping.p[port] = struct{}{}
+			mapping.last = port
+			return port, nil
 		}
-	}
 
-	mapping[proto][port] = true
+	}
 
-	return port, nil
+	return 0, ErrAllPortsAllocated
 }
 
 func getDefault(ip net.IP) net.IP {

+ 5 - 2
daemon/networkdriver/portallocator/portallocator_test.go

@@ -83,8 +83,11 @@ func TestReleaseUnreadledPort(t *testing.T) {
 	}
 
 	port, err = RequestPort(defaultIP, "tcp", 5000)
-	if err != ErrPortAlreadyAllocated {
-		t.Fatalf("Expected error %s got %s", ErrPortAlreadyAllocated, err)
+
+	switch err.(type) {
+	case ErrPortAlreadyAllocated:
+	default:
+		t.Fatalf("Expected port allocation error got %s", err)
 	}
 }
 

+ 53 - 16
daemon/networkdriver/portmapper/mapper.go

@@ -3,10 +3,12 @@ package portmapper
 import (
 	"errors"
 	"fmt"
-	"github.com/dotcloud/docker/pkg/iptables"
-	"github.com/dotcloud/docker/pkg/proxy"
 	"net"
 	"sync"
+
+	"github.com/dotcloud/docker/daemon/networkdriver/portallocator"
+	"github.com/dotcloud/docker/pkg/iptables"
+	"github.com/dotcloud/docker/pkg/proxy"
 )
 
 type mapping struct {
@@ -35,43 +37,66 @@ func SetIptablesChain(c *iptables.Chain) {
 	chain = c
 }
 
-func Map(container net.Addr, hostIP net.IP, hostPort int) error {
+func Map(container net.Addr, hostIP net.IP, hostPort int) (net.Addr, error) {
 	lock.Lock()
 	defer lock.Unlock()
 
-	var m *mapping
+	var (
+		m                 *mapping
+		err               error
+		proto             string
+		allocatedHostPort int
+	)
+
+	// release the port on any error during return.
+	defer func() {
+		if err != nil {
+			portallocator.ReleasePort(hostIP, proto, allocatedHostPort)
+		}
+	}()
+
 	switch container.(type) {
 	case *net.TCPAddr:
+		proto = "tcp"
+		if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
+			return nil, err
+		}
 		m = &mapping{
-			proto:     "tcp",
-			host:      &net.TCPAddr{IP: hostIP, Port: hostPort},
+			proto:     proto,
+			host:      &net.TCPAddr{IP: hostIP, Port: allocatedHostPort},
 			container: container,
 		}
 	case *net.UDPAddr:
+		proto = "udp"
+		if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
+			return nil, err
+		}
 		m = &mapping{
-			proto:     "udp",
-			host:      &net.UDPAddr{IP: hostIP, Port: hostPort},
+			proto:     proto,
+			host:      &net.UDPAddr{IP: hostIP, Port: allocatedHostPort},
 			container: container,
 		}
 	default:
-		return ErrUnknownBackendAddressType
+		err = ErrUnknownBackendAddressType
+		return nil, err
 	}
 
 	key := getKey(m.host)
 	if _, exists := currentMappings[key]; exists {
-		return ErrPortMappedForIP
+		err = ErrPortMappedForIP
+		return nil, err
 	}
 
 	containerIP, containerPort := getIPAndPort(m.container)
-	if err := forward(iptables.Add, m.proto, hostIP, hostPort, containerIP.String(), containerPort); err != nil {
-		return err
+	if err := forward(iptables.Add, m.proto, hostIP, allocatedHostPort, containerIP.String(), containerPort); err != nil {
+		return nil, err
 	}
 
 	p, err := newProxy(m.host, m.container)
 	if err != nil {
-		// need to undo the iptables rules before we reutrn
-		forward(iptables.Delete, m.proto, hostIP, hostPort, containerIP.String(), containerPort)
-		return err
+		// need to undo the iptables rules before we return
+		forward(iptables.Delete, m.proto, hostIP, allocatedHostPort, containerIP.String(), containerPort)
+		return nil, err
 	}
 
 	m.userlandProxy = p
@@ -79,7 +104,7 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) error {
 
 	go p.Run()
 
-	return nil
+	return m.host, nil
 }
 
 func Unmap(host net.Addr) error {
@@ -100,6 +125,18 @@ func Unmap(host net.Addr) error {
 	if err := forward(iptables.Delete, data.proto, hostIP, hostPort, containerIP.String(), containerPort); err != nil {
 		return err
 	}
+
+	switch a := host.(type) {
+	case *net.TCPAddr:
+		if err := portallocator.ReleasePort(a.IP, "tcp", a.Port); err != nil {
+			return err
+		}
+	case *net.UDPAddr:
+		if err := portallocator.ReleasePort(a.IP, "udp", a.Port); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 

+ 49 - 4
daemon/networkdriver/portmapper/mapper_test.go

@@ -1,6 +1,7 @@
 package portmapper
 
 import (
+	"github.com/dotcloud/docker/daemon/networkdriver/portallocator"
 	"github.com/dotcloud/docker/pkg/iptables"
 	"github.com/dotcloud/docker/pkg/proxy"
 	"net"
@@ -44,19 +45,26 @@ func TestMapPorts(t *testing.T) {
 	srcAddr1 := &net.TCPAddr{Port: 1080, IP: net.ParseIP("172.16.0.1")}
 	srcAddr2 := &net.TCPAddr{Port: 1080, IP: net.ParseIP("172.16.0.2")}
 
-	if err := Map(srcAddr1, dstIp1, 80); err != nil {
+	addrEqual := func(addr1, addr2 net.Addr) bool {
+		return (addr1.Network() == addr2.Network()) && (addr1.String() == addr2.String())
+	}
+
+	if host, err := Map(srcAddr1, dstIp1, 80); err != nil {
 		t.Fatalf("Failed to allocate port: %s", err)
+	} else if !addrEqual(dstAddr1, host) {
+		t.Fatalf("Incorrect mapping result: expected %s:%s, got %s:%s",
+			dstAddr1.String(), dstAddr1.Network(), host.String(), host.Network())
 	}
 
-	if Map(srcAddr1, dstIp1, 80) == nil {
+	if _, err := Map(srcAddr1, dstIp1, 80); err == nil {
 		t.Fatalf("Port is in use - mapping should have failed")
 	}
 
-	if Map(srcAddr2, dstIp1, 80) == nil {
+	if _, err := Map(srcAddr2, dstIp1, 80); err == nil {
 		t.Fatalf("Port is in use - mapping should have failed")
 	}
 
-	if err := Map(srcAddr2, dstIp2, 80); err != nil {
+	if _, err := Map(srcAddr2, dstIp2, 80); err != nil {
 		t.Fatalf("Failed to allocate port: %s", err)
 	}
 
@@ -105,3 +113,40 @@ func TestGetUDPIPAndPort(t *testing.T) {
 		t.Fatalf("expected port %d got %d", ep, port)
 	}
 }
+
+func TestMapAllPortsSingleInterface(t *testing.T) {
+	dstIp1 := net.ParseIP("0.0.0.0")
+	srcAddr1 := &net.TCPAddr{Port: 1080, IP: net.ParseIP("172.16.0.1")}
+
+	hosts := []net.Addr{}
+	var host net.Addr
+	var err error
+
+	defer func() {
+		for _, val := range hosts {
+			Unmap(val)
+		}
+	}()
+
+	for i := 0; i < 10; i++ {
+		for i := portallocator.BeginPortRange; i < portallocator.EndPortRange; i++ {
+			if host, err = Map(srcAddr1, dstIp1, 0); err != nil {
+				t.Fatal(err)
+			}
+
+			hosts = append(hosts, host)
+		}
+
+		if _, err := Map(srcAddr1, dstIp1, portallocator.BeginPortRange); err == nil {
+			t.Fatal("Port %d should be bound but is not", portallocator.BeginPortRange)
+		}
+
+		for _, val := range hosts {
+			if err := Unmap(val); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		hosts = []net.Addr{}
+	}
+}