Sfoglia il codice sorgente

Merge pull request #4168 from crosbymichael/add-listenbuffer

Hold connections until the daemon has fully loaded
Victor Vieux 11 anni fa
parent
commit
4187f4e750
4 ha cambiato i file con 118 aggiunte e 21 eliminazioni
  1. 23 3
      api/api.go
  2. 29 18
      docker/docker.go
  3. 5 0
      integration/runtime_test.go
  4. 61 0
      pkg/listenbuffer/buffer.go

+ 23 - 3
api/api.go

@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"github.com/dotcloud/docker/auth"
 	"github.com/dotcloud/docker/engine"
+	"github.com/dotcloud/docker/pkg/listenbuffer"
 	"github.com/dotcloud/docker/pkg/systemd"
 	"github.com/dotcloud/docker/utils"
 	"github.com/gorilla/mux"
@@ -25,6 +26,7 @@ import (
 	"strconv"
 	"strings"
 	"syscall"
+	"time"
 )
 
 // FIXME: move code common to client and server to common.go
@@ -34,6 +36,10 @@ const (
 	DEFAULTUNIXSOCKET = "/var/run/docker.sock"
 )
 
+var (
+	activationLock chan struct{}
+)
+
 func ValidateHost(val string) (string, error) {
 	host, err := utils.ParseHost(DEFAULTHTTPHOST, DEFAULTUNIXSOCKET, val)
 	if err != nil {
@@ -1159,7 +1165,7 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors
 		}
 	}
 
-	l, err := net.Listen(proto, addr)
+	l, err := listenbuffer.NewListenBuffer(proto, addr, activationLock, 15*time.Minute)
 	if err != nil {
 		return err
 	}
@@ -1201,8 +1207,15 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors
 // ServeApi loops through all of the protocols sent in to docker and spawns
 // off a go routine to setup a serving http.Server for each.
 func ServeApi(job *engine.Job) engine.Status {
-	protoAddrs := job.Args
-	chErrors := make(chan error, len(protoAddrs))
+	var (
+		protoAddrs = job.Args
+		chErrors   = make(chan error, len(protoAddrs))
+	)
+	activationLock = make(chan struct{})
+
+	if err := job.Eng.Register("acceptconnections", AcceptConnections); err != nil {
+		return job.Error(err)
+	}
 
 	for _, protoAddr := range protoAddrs {
 		protoAddrParts := strings.SplitN(protoAddr, "://", 2)
@@ -1219,8 +1232,15 @@ func ServeApi(job *engine.Job) engine.Status {
 		}
 	}
 
+	return engine.StatusOK
+}
+
+func AcceptConnections(job *engine.Job) engine.Status {
 	// Tell the init daemon we are accepting requests
 	go systemd.SdNotify("READY=1")
 
+	// close the lock so the listeners start accepting connections
+	close(activationLock)
+
 	return engine.StatusOK
 }

+ 29 - 18
docker/docker.go

@@ -81,25 +81,36 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
-		// Load plugin: httpapi
-		job := eng.Job("initserver")
-		job.Setenv("Pidfile", *pidfile)
-		job.Setenv("Root", *flRoot)
-		job.SetenvBool("AutoRestart", *flAutoRestart)
-		job.SetenvList("Dns", flDns.GetAll())
-		job.SetenvBool("EnableIptables", *flEnableIptables)
-		job.SetenvBool("EnableIpForward", *flEnableIpForward)
-		job.Setenv("BridgeIface", *bridgeName)
-		job.Setenv("BridgeIP", *bridgeIp)
-		job.Setenv("DefaultIp", *flDefaultIp)
-		job.SetenvBool("InterContainerCommunication", *flInterContainerComm)
-		job.Setenv("GraphDriver", *flGraphDriver)
-		job.SetenvInt("Mtu", *flMtu)
-		if err := job.Run(); err != nil {
-			log.Fatal(err)
-		}
+		// load the daemon in the background so we can immediately start
+		// the http api so that connections don't fail while the daemon
+		// is booting
+		go func() {
+			// Load plugin: httpapi
+			job := eng.Job("initserver")
+			job.Setenv("Pidfile", *pidfile)
+			job.Setenv("Root", *flRoot)
+			job.SetenvBool("AutoRestart", *flAutoRestart)
+			job.SetenvList("Dns", flDns.GetAll())
+			job.SetenvBool("EnableIptables", *flEnableIptables)
+			job.SetenvBool("EnableIpForward", *flEnableIpForward)
+			job.Setenv("BridgeIface", *bridgeName)
+			job.Setenv("BridgeIP", *bridgeIp)
+			job.Setenv("DefaultIp", *flDefaultIp)
+			job.SetenvBool("InterContainerCommunication", *flInterContainerComm)
+			job.Setenv("GraphDriver", *flGraphDriver)
+			job.SetenvInt("Mtu", *flMtu)
+			if err := job.Run(); err != nil {
+				log.Fatal(err)
+			}
+			// after the daemon is done setting up we can tell the api to start
+			// accepting connections
+			if err := eng.Job("acceptconnections").Run(); err != nil {
+				log.Fatal(err)
+			}
+		}()
+
 		// Serve api
-		job = eng.Job("serveapi", flHosts.GetAll()...)
+		job := eng.Job("serveapi", flHosts.GetAll()...)
 		job.SetenvBool("Logging", true)
 		job.SetenvBool("EnableCors", *flEnableCors)
 		job.Setenv("Version", dockerversion.VERSION)

+ 5 - 0
integration/runtime_test.go

@@ -171,9 +171,14 @@ func spawnGlobalDaemon() {
 			log.Fatalf("Unable to spawn the test daemon: %s", err)
 		}
 	}()
+
 	// Give some time to ListenAndServer to actually start
 	// FIXME: use inmem transports instead of tcp
 	time.Sleep(time.Second)
+
+	if err := eng.Job("acceptconnections").Run(); err != nil {
+		log.Fatalf("Unable to accept connections for test api: %s", err)
+	}
 }
 
 // FIXME: test that ImagePull(json=true) send correct json output

+ 61 - 0
pkg/listenbuffer/buffer.go

@@ -0,0 +1,61 @@
+/*
+   Package to allow go applications to immediately start
+   listening on a socket, unix, tcp, udp but hold connections
+   until the application has booted and is ready to accept them
+*/
+package listenbuffer
+
+import (
+	"fmt"
+	"net"
+	"time"
+)
+
+// NewListenBuffer returns a listener listening on addr with the protocol.  It sets the
+// timeout to wait on first connection before an error is returned
+func NewListenBuffer(proto, addr string, activate chan struct{}, timeout time.Duration) (net.Listener, error) {
+	wrapped, err := net.Listen(proto, addr)
+	if err != nil {
+		return nil, err
+	}
+
+	return &defaultListener{
+		wrapped:  wrapped,
+		activate: activate,
+		timeout:  timeout,
+	}, nil
+}
+
+type defaultListener struct {
+	wrapped  net.Listener // the real listener to wrap
+	ready    bool         // is the listner ready to start accpeting connections
+	activate chan struct{}
+	timeout  time.Duration // how long to wait before we consider this an error
+}
+
+func (l *defaultListener) Close() error {
+	return l.wrapped.Close()
+}
+
+func (l *defaultListener) Addr() net.Addr {
+	return l.wrapped.Addr()
+}
+
+func (l *defaultListener) Accept() (net.Conn, error) {
+	// if the listen has been told it is ready then we can go ahead and
+	// start returning connections
+	if l.ready {
+		return l.wrapped.Accept()
+	}
+
+	select {
+	case <-time.After(l.timeout):
+		// close the connection so any clients are disconnected
+		l.Close()
+		return nil, fmt.Errorf("timeout (%s) reached waiting for listener to become ready", l.timeout.String())
+	case <-l.activate:
+		l.ready = true
+		return l.Accept()
+	}
+	panic("unreachable")
+}