Merge pull request #7548 from crosbymichael/proxy-exec

Move userland proxies out of daemon process
This commit is contained in:
Michael Crosby 2014-08-13 14:03:40 -07:00
commit fd1b563b4f
6 changed files with 178 additions and 59 deletions

View file

@ -1,14 +1,19 @@
package bridge
import (
"fmt"
"net"
"strconv"
"testing"
"github.com/docker/docker/daemon/networkdriver/portmapper"
"github.com/docker/docker/engine"
)
func init() {
// reset the new proxy command for mocking out the userland proxy in tests
portmapper.NewProxy = portmapper.NewMockProxyCommand
}
func findFreePort(t *testing.T) int {
l, err := net.Listen("tcp", ":0")
if err != nil {
@ -61,46 +66,3 @@ func TestAllocatePortDetection(t *testing.T) {
t.Fatal("Duplicate port allocation granted by AllocatePort")
}
}
func TestAllocatePortReclaim(t *testing.T) {
eng := engine.New()
eng.Logging = false
freePort := findFreePort(t)
// Init driver
job := eng.Job("initdriver")
if res := InitDriver(job); res != engine.StatusOK {
t.Fatal("Failed to initialize network driver")
}
// Allocate interface
job = eng.Job("allocate_interface", "container_id")
if res := Allocate(job); res != engine.StatusOK {
t.Fatal("Failed to allocate network interface")
}
// Occupy port
listenAddr := fmt.Sprintf(":%d", freePort)
tcpListenAddr, err := net.ResolveTCPAddr("tcp", listenAddr)
if err != nil {
t.Fatalf("Failed to resolve TCP address '%s'", listenAddr)
}
l, err := net.ListenTCP("tcp", tcpListenAddr)
if err != nil {
t.Fatalf("Fail to listen on port %d", freePort)
}
// Allocate port, expect failure
job = newPortAllocationJob(eng, freePort)
if res := AllocatePort(job); res == engine.StatusOK {
t.Fatal("Successfully allocated currently used port")
}
// Reclaim port, retry allocation
l.Close()
if res := AllocatePort(job); res != engine.StatusOK {
t.Fatal("Failed to allocate previously reclaimed port")
}
}

View file

@ -8,12 +8,11 @@ import (
"github.com/docker/docker/daemon/networkdriver/portallocator"
"github.com/docker/docker/pkg/iptables"
"github.com/docker/docker/pkg/proxy"
)
type mapping struct {
proto string
userlandProxy proxy.Proxy
userlandProxy UserlandProxy
host net.Addr
container net.Addr
}
@ -24,7 +23,8 @@ var (
// udp:ip:port
currentMappings = make(map[string]*mapping)
newProxy = proxy.NewProxy
NewProxy = NewProxyCommand
)
var (
@ -45,6 +45,7 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
m *mapping
proto string
allocatedHostPort int
proxy UserlandProxy
)
switch container.(type) {
@ -53,21 +54,27 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
return nil, err
}
m = &mapping{
proto: proto,
host: &net.TCPAddr{IP: hostIP, Port: allocatedHostPort},
container: container,
}
proxy = NewProxy(proto, hostIP, allocatedHostPort, container.(*net.TCPAddr).IP, container.(*net.TCPAddr).Port)
case *net.UDPAddr:
proto = "udp"
if allocatedHostPort, err = portallocator.RequestPort(hostIP, proto, hostPort); err != nil {
return nil, err
}
m = &mapping{
proto: proto,
host: &net.UDPAddr{IP: hostIP, Port: allocatedHostPort},
container: container,
}
proxy = NewProxy(proto, hostIP, allocatedHostPort, container.(*net.UDPAddr).IP, container.(*net.UDPAddr).Port)
default:
return nil, ErrUnknownBackendAddressType
}
@ -89,17 +96,15 @@ func Map(container net.Addr, hostIP net.IP, hostPort int) (host net.Addr, err er
return nil, err
}
p, err := newProxy(m.host, m.container)
if err != nil {
// need to undo the iptables rules before we return
forward(iptables.Delete, m.proto, hostIP, allocatedHostPort, containerIP.String(), containerPort)
return nil, err
}
m.userlandProxy = p
m.userlandProxy = proxy
currentMappings[key] = m
go p.Run()
if err := proxy.Start(); err != nil {
// need to undo the iptables rules before we return
forward(iptables.Delete, m.proto, hostIP, allocatedHostPort, containerIP.String(), containerPort)
return nil, err
}
return m.host, nil
}
@ -114,7 +119,8 @@ func Unmap(host net.Addr) error {
return ErrPortNotMapped
}
data.userlandProxy.Close()
data.userlandProxy.Stop()
delete(currentMappings, key)
containerIP, containerPort := getIPAndPort(data.container)

View file

@ -6,12 +6,11 @@ import (
"github.com/docker/docker/daemon/networkdriver/portallocator"
"github.com/docker/docker/pkg/iptables"
"github.com/docker/docker/pkg/proxy"
)
func init() {
// override this func to mock out the proxy server
newProxy = proxy.NewStubProxy
NewProxy = NewMockProxyCommand
}
func reset() {

View file

@ -0,0 +1,18 @@
package portmapper
import "net"
func NewMockProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) UserlandProxy {
return &mockProxyCommand{}
}
type mockProxyCommand struct {
}
func (p *mockProxyCommand) Start() error {
return nil
}
func (p *mockProxyCommand) Stop() error {
return nil
}

View file

@ -0,0 +1,119 @@
package portmapper
import (
"flag"
"log"
"net"
"os"
"os/exec"
"os/signal"
"strconv"
"syscall"
"github.com/docker/docker/pkg/proxy"
"github.com/docker/docker/reexec"
)
const userlandProxyCommandName = "docker-proxy"
func init() {
reexec.Register(userlandProxyCommandName, execProxy)
}
type UserlandProxy interface {
Start() error
Stop() error
}
// proxyCommand wraps an exec.Cmd to run the userland TCP and UDP
// proxies as separate processes.
type proxyCommand struct {
cmd *exec.Cmd
}
// execProxy is the reexec function that is registered to start the userland proxies
func execProxy() {
host, container := parseHostContainerAddrs()
p, err := proxy.NewProxy(host, container)
if err != nil {
log.Fatal(err)
}
go handleStopSignals(p)
// Run will block until the proxy stops
p.Run()
}
// parseHostContainerAddrs parses the flags passed on reexec to create the TCP or UDP
// net.Addrs to map the host and container ports
func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
var (
proto = flag.String("proto", "tcp", "proxy protocol")
hostIP = flag.String("host-ip", "", "host ip")
hostPort = flag.Int("host-port", -1, "host port")
containerIP = flag.String("container-ip", "", "container ip")
containerPort = flag.Int("container-port", -1, "container port")
)
flag.Parse()
switch *proto {
case "tcp":
host = &net.TCPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
container = &net.TCPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
case "udp":
host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
default:
log.Fatalf("unsupported protocol %s", *proto)
}
return host, container
}
func handleStopSignals(p proxy.Proxy) {
s := make(chan os.Signal, 10)
signal.Notify(s, os.Interrupt, syscall.SIGTERM, syscall.SIGSTOP)
for _ = range s {
p.Close()
os.Exit(0)
}
}
func NewProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) UserlandProxy {
args := []string{
userlandProxyCommandName,
"-proto", proto,
"-host-ip", hostIP.String(),
"-host-port", strconv.Itoa(hostPort),
"-container-ip", containerIP.String(),
"-container-port", strconv.Itoa(containerPort),
}
return &proxyCommand{
cmd: &exec.Cmd{
Path: reexec.Self(),
Args: args,
Stdout: os.Stdout,
Stderr: os.Stderr,
SysProcAttr: &syscall.SysProcAttr{
Pdeathsig: syscall.SIGTERM, // send a sigterm to the proxy if the daemon process dies
},
},
}
}
func (p *proxyCommand) Start() error {
return p.cmd.Start()
}
func (p *proxyCommand) Stop() error {
err := p.cmd.Process.Signal(os.Interrupt)
p.cmd.Wait()
return err
}

View file

@ -3,6 +3,8 @@ package reexec
import (
"fmt"
"os"
"os/exec"
"path/filepath"
)
var registeredInitializers = make(map[string]func())
@ -28,3 +30,16 @@ func Init() bool {
return false
}
// Self returns the path to the current processes binary
func Self() string {
name := os.Args[0]
if filepath.Base(name) == name {
if lp, err := exec.LookPath(name); err == nil {
name = lp
}
}
return name
}