فهرست منبع

Make API server datastructure

Added daemon field to it, will use it later for acces to daemon from
handlers

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Alexander Morozov 10 سال پیش
والد
کامیت
d9ed316522
8فایلهای تغییر یافته به همراه131 افزوده شده و 105 حذف شده
  1. 74 48
      api/server/server.go
  2. 13 19
      api/server/server_linux.go
  3. 16 14
      api/server/server_windows.go
  4. 2 2
      api/server/tcp_socket.go
  5. 2 2
      api/server/unix_socket.go
  6. 5 3
      docker/daemon.go
  7. 15 13
      integration/runtime_test.go
  8. 4 4
      pkg/listenbuffer/buffer.go

+ 74 - 48
api/server/server.go

@@ -40,10 +40,6 @@ import (
 	"github.com/docker/docker/utils"
 	"github.com/docker/docker/utils"
 )
 )
 
 
-var (
-	activationLock = make(chan struct{})
-)
-
 type ServerConfig struct {
 type ServerConfig struct {
 	Logging     bool
 	Logging     bool
 	EnableCors  bool
 	EnableCors  bool
@@ -57,6 +53,80 @@ type ServerConfig struct {
 	TlsKey      string
 	TlsKey      string
 }
 }
 
 
+type Server struct {
+	daemon *daemon.Daemon
+	cfg    *ServerConfig
+	router *mux.Router
+	start  chan struct{}
+
+	// TODO: delete engine
+	eng *engine.Engine
+}
+
+func New(cfg *ServerConfig, eng *engine.Engine) *Server {
+	r := createRouter(
+		eng,
+		cfg.Logging,
+		cfg.EnableCors,
+		cfg.CorsHeaders,
+		cfg.Version,
+	)
+	return &Server{
+		cfg:    cfg,
+		router: r,
+		start:  make(chan struct{}),
+		eng:    eng,
+	}
+}
+
+func (s *Server) SetDaemon(d *daemon.Daemon) {
+	s.daemon = d
+}
+
+type serverCloser interface {
+	Serve() error
+	Close() error
+}
+
+// ServeApi loops through all of the protocols sent in to docker and spawns
+// off a go routine to setup a serving http.Server for each.
+func (s *Server) ServeApi(protoAddrs []string) error {
+	var chErrors = make(chan error, len(protoAddrs))
+
+	for _, protoAddr := range protoAddrs {
+		protoAddrParts := strings.SplitN(protoAddr, "://", 2)
+		if len(protoAddrParts) != 2 {
+			return fmt.Errorf("bad format, expected PROTO://ADDR")
+		}
+		go func(proto, addr string) {
+			logrus.Infof("Listening for HTTP on %s (%s)", proto, addr)
+			srv, err := s.newServer(proto, addr)
+			if err != nil {
+				chErrors <- err
+				return
+			}
+			s.eng.OnShutdown(func() {
+				if err := srv.Close(); err != nil {
+					logrus.Error(err)
+				}
+			})
+			if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
+				err = nil
+			}
+			chErrors <- err
+		}(protoAddrParts[0], protoAddrParts[1])
+	}
+
+	for i := 0; i < len(protoAddrs); i++ {
+		err := <-chErrors
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type HttpServer struct {
 type HttpServer struct {
 	srv *http.Server
 	srv *http.Server
 	l   net.Listener
 	l   net.Listener
@@ -1632,50 +1702,6 @@ func allocateDaemonPort(addr string) error {
 	return nil
 	return nil
 }
 }
 
 
-type Server interface {
-	Serve() error
-	Close() error
-}
-
-// ServeApi loops through all of the protocols sent in to docker and spawns
-// off a go routine to setup a serving http.Server for each.
-func ServeApi(protoAddrs []string, conf *ServerConfig, eng *engine.Engine) error {
-	var chErrors = make(chan error, len(protoAddrs))
-
-	for _, protoAddr := range protoAddrs {
-		protoAddrParts := strings.SplitN(protoAddr, "://", 2)
-		if len(protoAddrParts) != 2 {
-			return fmt.Errorf("bad format, expected PROTO://ADDR")
-		}
-		go func() {
-			logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1])
-			srv, err := NewServer(protoAddrParts[0], protoAddrParts[1], conf, eng)
-			if err != nil {
-				chErrors <- err
-				return
-			}
-			eng.OnShutdown(func() {
-				if err := srv.Close(); err != nil {
-					logrus.Error(err)
-				}
-			})
-			if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
-				err = nil
-			}
-			chErrors <- err
-		}()
-	}
-
-	for i := 0; i < len(protoAddrs); i++ {
-		err := <-chErrors
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 func toBool(s string) bool {
 func toBool(s string) bool {
 	s = strings.ToLower(strings.TrimSpace(s))
 	s = strings.ToLower(strings.TrimSpace(s))
 	return !(s == "" || s == "0" || s == "no" || s == "false" || s == "none")
 	return !(s == "" || s == "0" || s == "no" || s == "false" || s == "none")

+ 13 - 19
api/server/server_linux.go

@@ -8,22 +8,15 @@ import (
 	"net/http"
 	"net/http"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/docker/engine"
+	"github.com/docker/docker/daemon"
 	"github.com/docker/docker/pkg/systemd"
 	"github.com/docker/docker/pkg/systemd"
 )
 )
 
 
-// NewServer sets up the required Server and does protocol specific checking.
-func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Server, error) {
+// newServer sets up the required serverCloser and does protocol specific checking.
+func (s *Server) newServer(proto, addr string) (serverCloser, error) {
 	var (
 	var (
 		err error
 		err error
 		l   net.Listener
 		l   net.Listener
-		r   = createRouter(
-			eng,
-			conf.Logging,
-			conf.EnableCors,
-			conf.CorsHeaders,
-			conf.Version,
-		)
 	)
 	)
 	switch proto {
 	switch proto {
 	case "fd":
 	case "fd":
@@ -35,13 +28,13 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv
 		// We don't want to start serving on these sockets until the
 		// We don't want to start serving on these sockets until the
 		// daemon is initialized and installed. Otherwise required handlers
 		// daemon is initialized and installed. Otherwise required handlers
 		// won't be ready.
 		// won't be ready.
-		<-activationLock
+		<-s.start
 		// Since ListenFD will return one or more sockets we have
 		// Since ListenFD will return one or more sockets we have
 		// to create a go func to spawn off multiple serves
 		// to create a go func to spawn off multiple serves
 		for i := range ls {
 		for i := range ls {
 			listener := ls[i]
 			listener := ls[i]
 			go func() {
 			go func() {
-				httpSrv := http.Server{Handler: r}
+				httpSrv := http.Server{Handler: s.router}
 				chErrors <- httpSrv.Serve(listener)
 				chErrors <- httpSrv.Serve(listener)
 			}()
 			}()
 		}
 		}
@@ -52,17 +45,17 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv
 		}
 		}
 		return nil, nil
 		return nil, nil
 	case "tcp":
 	case "tcp":
-		if !conf.TlsVerify {
+		if !s.cfg.TlsVerify {
 			logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 			logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 		}
 		}
-		if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(conf)); err != nil {
+		if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(s.cfg), s.start); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 		if err := allocateDaemonPort(addr); err != nil {
 		if err := allocateDaemonPort(addr); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 	case "unix":
 	case "unix":
-		if l, err = NewUnixSocket(addr, conf.SocketGroup); err != nil {
+		if l, err = NewUnixSocket(addr, s.cfg.SocketGroup, s.start); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 	default:
 	default:
@@ -71,19 +64,20 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv
 	return &HttpServer{
 	return &HttpServer{
 		&http.Server{
 		&http.Server{
 			Addr:    addr,
 			Addr:    addr,
-			Handler: r,
+			Handler: s.router,
 		},
 		},
 		l,
 		l,
 	}, nil
 	}, nil
 }
 }
 
 
-func AcceptConnections() {
+func (s *Server) AcceptConnections(d *daemon.Daemon) {
 	// Tell the init daemon we are accepting requests
 	// Tell the init daemon we are accepting requests
+	s.daemon = d
 	go systemd.SdNotify("READY=1")
 	go systemd.SdNotify("READY=1")
 	// close the lock so the listeners start accepting connections
 	// close the lock so the listeners start accepting connections
 	select {
 	select {
-	case <-activationLock:
+	case <-s.start:
 	default:
 	default:
-		close(activationLock)
+		close(s.start)
 	}
 	}
 }
 }

+ 16 - 14
api/server/server_windows.go

@@ -5,30 +5,24 @@ package server
 import (
 import (
 	"errors"
 	"errors"
 	"net"
 	"net"
+	"net/http"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/docker/engine"
+	"github.com/docker/docker/daemon"
 )
 )
 
 
 // NewServer sets up the required Server and does protocol specific checking.
 // NewServer sets up the required Server and does protocol specific checking.
-func NewServer(proto, addr string, job *engine.Job) (Server, error) {
+func (s *Server) newServer(proto, addr string) (Server, error) {
 	var (
 	var (
 		err error
 		err error
 		l   net.Listener
 		l   net.Listener
-		r   = createRouter(
-			job.Eng,
-			job.GetenvBool("Logging"),
-			job.GetenvBool("EnableCors"),
-			job.Getenv("CorsHeaders"),
-			job.Getenv("Version"),
-		)
 	)
 	)
 	switch proto {
 	switch proto {
 	case "tcp":
 	case "tcp":
-		if !job.GetenvBool("TlsVerify") {
+		if !s.cfg.TlsVerify {
 			logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 			logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 		}
 		}
-		if l, err = NewTcpSocket(addr, tlsConfigFromJob(job)); err != nil {
+		if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(s.cfg)); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 		if err := allocateDaemonPort(addr); err != nil {
 		if err := allocateDaemonPort(addr); err != nil {
@@ -37,13 +31,21 @@ func NewServer(proto, addr string, job *engine.Job) (Server, error) {
 	default:
 	default:
 		return nil, errors.New("Invalid protocol format. Windows only supports tcp.")
 		return nil, errors.New("Invalid protocol format. Windows only supports tcp.")
 	}
 	}
+	return &HttpServer{
+		&http.Server{
+			Addr:    addr,
+			Handler: s.router,
+		},
+		l,
+	}, nil
 }
 }
 
 
-func AcceptConnections() {
+func (s *Server) AcceptConnections(d *daemon.Daemon) {
+	s.daemon = d
 	// close the lock so the listeners start accepting connections
 	// close the lock so the listeners start accepting connections
 	select {
 	select {
-	case <-activationLock:
+	case <-s.start:
 	default:
 	default:
-		close(activationLock)
+		close(s.start)
 	}
 	}
 }
 }

+ 2 - 2
api/server/tcp_socket.go

@@ -31,8 +31,8 @@ func tlsConfigFromServerConfig(conf *ServerConfig) *tlsConfig {
 	}
 	}
 }
 }
 
 
-func NewTcpSocket(addr string, config *tlsConfig) (net.Listener, error) {
-	l, err := listenbuffer.NewListenBuffer("tcp", addr, activationLock)
+func NewTcpSocket(addr string, config *tlsConfig, activate <-chan struct{}) (net.Listener, error) {
+	l, err := listenbuffer.NewListenBuffer("tcp", addr, activate)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 2 - 2
api/server/unix_socket.go

@@ -12,13 +12,13 @@ import (
 	"github.com/docker/libcontainer/user"
 	"github.com/docker/libcontainer/user"
 )
 )
 
 
-func NewUnixSocket(path, group string) (net.Listener, error) {
+func NewUnixSocket(path, group string, activate <-chan struct{}) (net.Listener, error) {
 	if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
 	if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
 		return nil, err
 		return nil, err
 	}
 	}
 	mask := syscall.Umask(0777)
 	mask := syscall.Umask(0777)
 	defer syscall.Umask(mask)
 	defer syscall.Umask(mask)
-	l, err := listenbuffer.NewListenBuffer("unix", path, activationLock)
+	l, err := listenbuffer.NewListenBuffer("unix", path, activate)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 5 - 3
docker/daemon.go

@@ -105,12 +105,14 @@ func mainDaemon() {
 		TlsKey:      *flKey,
 		TlsKey:      *flKey,
 	}
 	}
 
 
+	api := apiserver.New(serverConfig, eng)
+
 	// The serve API routine never exits unless an error occurs
 	// The serve API routine never exits unless an error occurs
 	// We need to start it as a goroutine and wait on it so
 	// We need to start it as a goroutine and wait on it so
 	// daemon doesn't exit
 	// daemon doesn't exit
 	serveAPIWait := make(chan error)
 	serveAPIWait := make(chan error)
 	go func() {
 	go func() {
-		if err := apiserver.ServeApi(flHosts, serverConfig, eng); err != nil {
+		if err := api.ServeApi(flHosts); err != nil {
 			logrus.Errorf("ServeAPI error: %v", err)
 			logrus.Errorf("ServeAPI error: %v", err)
 			serveAPIWait <- err
 			serveAPIWait <- err
 			return
 			return
@@ -143,8 +145,8 @@ func mainDaemon() {
 	b.Install()
 	b.Install()
 
 
 	// after the daemon is done setting up we can tell the api to start
 	// after the daemon is done setting up we can tell the api to start
-	// accepting connections
-	apiserver.AcceptConnections()
+	// accepting connections with specified daemon
+	api.AcceptConnections(d)
 
 
 	// Daemon is fully initialized and handling API traffic
 	// Daemon is fully initialized and handling API traffic
 	// Wait for serve API job to complete
 	// Wait for serve API job to complete

+ 15 - 13
integration/runtime_test.go

@@ -156,6 +156,8 @@ func spawnGlobalDaemon() {
 	globalEngine = eng
 	globalEngine = eng
 	globalDaemon = mkDaemonFromEngine(eng, t)
 	globalDaemon = mkDaemonFromEngine(eng, t)
 
 
+	serverConfig := &apiserver.ServerConfig{Logging: true}
+	api := apiserver.New(serverConfig, eng)
 	// Spawn a Daemon
 	// Spawn a Daemon
 	go func() {
 	go func() {
 		logrus.Debugf("Spawning global daemon for integration tests")
 		logrus.Debugf("Spawning global daemon for integration tests")
@@ -164,8 +166,7 @@ func spawnGlobalDaemon() {
 			Host:   testDaemonAddr,
 			Host:   testDaemonAddr,
 		}
 		}
 
 
-		serverConfig := &apiserver.ServerConfig{Logging: true}
-		if err := apiserver.ServeApi([]string{listenURL.String()}, serverConfig, eng); err != nil {
+		if err := api.ServeApi([]string{listenURL.String()}); err != nil {
 			logrus.Fatalf("Unable to spawn the test daemon: %s", err)
 			logrus.Fatalf("Unable to spawn the test daemon: %s", err)
 		}
 		}
 	}()
 	}()
@@ -174,7 +175,7 @@ func spawnGlobalDaemon() {
 	// FIXME: use inmem transports instead of tcp
 	// FIXME: use inmem transports instead of tcp
 	time.Sleep(time.Second)
 	time.Sleep(time.Second)
 
 
-	apiserver.AcceptConnections()
+	api.AcceptConnections(getDaemon(eng))
 }
 }
 
 
 func spawnLegitHttpsDaemon() {
 func spawnLegitHttpsDaemon() {
@@ -204,6 +205,15 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine {
 
 
 	eng := newTestEngine(t, true, root)
 	eng := newTestEngine(t, true, root)
 
 
+	serverConfig := &apiserver.ServerConfig{
+		Logging:   true,
+		Tls:       true,
+		TlsVerify: true,
+		TlsCa:     cacert,
+		TlsCert:   cert,
+		TlsKey:    key,
+	}
+	api := apiserver.New(serverConfig, eng)
 	// Spawn a Daemon
 	// Spawn a Daemon
 	go func() {
 	go func() {
 		logrus.Debugf("Spawning https daemon for integration tests")
 		logrus.Debugf("Spawning https daemon for integration tests")
@@ -211,15 +221,7 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine {
 			Scheme: testDaemonHttpsProto,
 			Scheme: testDaemonHttpsProto,
 			Host:   addr,
 			Host:   addr,
 		}
 		}
-		serverConfig := &apiserver.ServerConfig{
-			Logging:   true,
-			Tls:       true,
-			TlsVerify: true,
-			TlsCa:     cacert,
-			TlsCert:   cert,
-			TlsKey:    key,
-		}
-		if err := apiserver.ServeApi([]string{listenURL.String()}, serverConfig, eng); err != nil {
+		if err := api.ServeApi([]string{listenURL.String()}); err != nil {
 			logrus.Fatalf("Unable to spawn the test daemon: %s", err)
 			logrus.Fatalf("Unable to spawn the test daemon: %s", err)
 		}
 		}
 	}()
 	}()
@@ -227,7 +229,7 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine {
 	// Give some time to ListenAndServer to actually start
 	// Give some time to ListenAndServer to actually start
 	time.Sleep(time.Second)
 	time.Sleep(time.Second)
 
 
-	apiserver.AcceptConnections()
+	api.AcceptConnections(getDaemon(eng))
 
 
 	return eng
 	return eng
 }
 }

+ 4 - 4
pkg/listenbuffer/buffer.go

@@ -32,7 +32,7 @@ import "net"
 // NewListenBuffer returns a net.Listener listening on addr with the protocol
 // NewListenBuffer returns a net.Listener listening on addr with the protocol
 // passed. The channel passed is used to activate the listenbuffer when the
 // passed. The channel passed is used to activate the listenbuffer when the
 // caller is ready to accept connections.
 // caller is ready to accept connections.
-func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener, error) {
+func NewListenBuffer(proto, addr string, activate <-chan struct{}) (net.Listener, error) {
 	wrapped, err := net.Listen(proto, addr)
 	wrapped, err := net.Listen(proto, addr)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -46,9 +46,9 @@ func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener,
 
 
 // defaultListener is the buffered wrapper around the net.Listener
 // defaultListener is the buffered wrapper around the net.Listener
 type defaultListener struct {
 type defaultListener struct {
-	wrapped  net.Listener  // The net.Listener wrapped by listenbuffer
-	ready    bool          // Whether the listenbuffer has been activated
-	activate chan struct{} // Channel to control activation of the listenbuffer
+	wrapped  net.Listener    // The net.Listener wrapped by listenbuffer
+	ready    bool            // Whether the listenbuffer has been activated
+	activate <-chan struct{} // Channel to control activation of the listenbuffer
 }
 }
 
 
 // Close closes the wrapped socket.
 // Close closes the wrapped socket.