Add long-running client session endpoint

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2017-05-15 12:59:15 -07:00
parent f88626b270
commit ec7b6238c3
17 changed files with 660 additions and 50 deletions

View file

@ -27,9 +27,9 @@ type Backend struct {
}
// NewBackend creates a new build backend from components
func NewBackend(components ImageComponent, builderBackend builder.Backend, idMappings *idtools.IDMappings) *Backend {
manager := dockerfile.NewBuildManager(builderBackend, idMappings)
return &Backend{imageComponent: components, manager: manager}
func NewBackend(components ImageComponent, builderBackend builder.Backend, sg dockerfile.SessionGetter, idMappings *idtools.IDMappings) (*Backend, error) {
manager := dockerfile.NewBuildManager(builderBackend, sg, idMappings)
return &Backend{imageComponent: components, manager: manager}, nil
}
// Build builds an image from a Source

View file

@ -127,6 +127,7 @@ func newImageBuildOptions(ctx context.Context, r *http.Request) (*types.ImageBui
}
options.CacheFrom = cacheFrom
}
options.SessionID = r.FormValue("session")
return options, nil
}

View file

@ -0,0 +1,12 @@
package session
import (
"net/http"
"golang.org/x/net/context"
)
// Backend abstracts an session receiver from an http request.
type Backend interface {
HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

View file

@ -0,0 +1,29 @@
package session
import "github.com/docker/docker/api/server/router"
// sessionRouter is a router to talk with the session controller
type sessionRouter struct {
backend Backend
routes []router.Route
}
// NewRouter initializes a new session router
func NewRouter(b Backend) router.Router {
r := &sessionRouter{
backend: b,
}
r.initRoutes()
return r
}
// Routes returns the available routers to the session controller
func (r *sessionRouter) Routes() []router.Route {
return r.routes
}
func (r *sessionRouter) initRoutes() {
r.routes = []router.Route{
router.Experimental(router.NewPostRoute("/session", r.startSession)),
}
}

View file

@ -0,0 +1,16 @@
package session
import (
"net/http"
apierrors "github.com/docker/docker/api/errors"
"golang.org/x/net/context"
)
func (sr *sessionRouter) startSession(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
err := sr.backend.HandleHTTPRequest(ctx, w, r)
if err != nil {
return apierrors.NewBadRequestError(err)
}
return nil
}

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/go-units"
units "github.com/docker/go-units"
)
// CheckpointCreateOptions holds parameters to create a checkpoint from a container
@ -178,6 +178,7 @@ type ImageBuildOptions struct {
SecurityOpt []string
ExtraHosts []string // List of extra hosts
Target string
SessionID string
// TODO @jhowardmsft LCOW Support: This will require extending to include
// `Platform string`, but is ommited for now as it's hard-coded temporarily

View file

@ -16,6 +16,7 @@ import (
"github.com/docker/docker/builder/dockerfile/command"
"github.com/docker/docker/builder/dockerfile/parser"
"github.com/docker/docker/builder/remotecontext"
"github.com/docker/docker/client/session"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/idtools"
@ -40,18 +41,25 @@ var validCommitCommands = map[string]bool{
"workdir": true,
}
// SessionGetter is object used to get access to a session by uuid
type SessionGetter interface {
Get(ctx context.Context, uuid string) (session.Caller, error)
}
// BuildManager is shared across all Builder objects
type BuildManager struct {
archiver *archive.Archiver
backend builder.Backend
pathCache pathCache // TODO: make this persistent
sg SessionGetter
}
// NewBuildManager creates a BuildManager
func NewBuildManager(b builder.Backend, idMappings *idtools.IDMappings) *BuildManager {
func NewBuildManager(b builder.Backend, sg SessionGetter, idMappings *idtools.IDMappings) *BuildManager {
return &BuildManager{
backend: b,
pathCache: &syncmap.Map{},
sg: sg,
archiver: chrootarchive.NewArchiver(idMappings),
}
}
@ -84,6 +92,13 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) (
}
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := bm.initializeClientSession(ctx, cancel, config.Options); err != nil {
return nil, err
}
builderOptions := builderOptions{
Options: config.Options,
ProgressWriter: config.ProgressWriter,
@ -96,6 +111,22 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) (
return newBuilder(ctx, builderOptions).build(source, dockerfile)
}
func (bm *BuildManager) initializeClientSession(ctx context.Context, cancel func(), options *types.ImageBuildOptions) error {
if options.SessionID == "" || bm.sg == nil {
return nil
}
logrus.Debug("client is session enabled")
c, err := bm.sg.Get(ctx, options.SessionID)
if err != nil {
return err
}
go func() {
<-c.Context().Done()
cancel()
}()
return nil
}
// builderOptions are the dependencies required by the builder
type builderOptions struct {
Options *types.ImageBuildOptions

View file

@ -1,11 +1,9 @@
package client
import (
"bytes"
"bufio"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
@ -16,6 +14,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/tlsconfig"
"github.com/docker/go-connections/sockets"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -48,49 +47,12 @@ func (cli *Client) postHijacked(ctx context.Context, path string, query url.Valu
}
req = cli.addHeaders(req, headers)
req.Host = cli.addr
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", "tcp")
conn, err := dial(cli.proto, cli.addr, resolveTLSConfig(cli.client.Transport))
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
return types.HijackedResponse{}, fmt.Errorf("Cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
}
return types.HijackedResponse{}, err
}
// When we set up a TCP connection for hijack, there could be long periods
// of inactivity (a long running command with no output) that in certain
// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
// state. Setting TCP KeepAlive on the socket connection will prohibit
// ECONNTIMEOUT unless the socket connection truly is broken
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
clientconn := httputil.NewClientConn(conn, nil)
defer clientconn.Close()
// Server hijacks the connection, error 'connection closed' expected
resp, err := clientconn.Do(req)
conn, err := cli.setupHijackConn(req, "tcp")
if err != nil {
return types.HijackedResponse{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusSwitchingProtocols:
rwc, br := clientconn.Hijack()
return types.HijackedResponse{Conn: rwc, Reader: br}, err
}
errbody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return types.HijackedResponse{}, err
}
return types.HijackedResponse{}, fmt.Errorf("Error response from daemon: %s", bytes.TrimSpace(errbody))
return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
}
func tlsDial(network, addr string, config *tls.Config) (net.Conn, error) {
@ -189,3 +151,56 @@ func dial(proto, addr string, tlsConfig *tls.Config) (net.Conn, error) {
}
return net.Dial(proto, addr)
}
func (cli *Client) setupHijackConn(req *http.Request, proto string) (net.Conn, error) {
req.Host = cli.addr
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", proto)
conn, err := dial(cli.proto, cli.addr, resolveTLSConfig(cli.client.Transport))
if err != nil {
return nil, errors.Wrap(err, "cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
}
// When we set up a TCP connection for hijack, there could be long periods
// of inactivity (a long running command with no output) that in certain
// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
// state. Setting TCP KeepAlive on the socket connection will prohibit
// ECONNTIMEOUT unless the socket connection truly is broken
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
clientconn := httputil.NewClientConn(conn, nil)
defer clientconn.Close()
// Server hijacks the connection, error 'connection closed' expected
resp, err := clientconn.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusSwitchingProtocols {
resp.Body.Close()
return nil, fmt.Errorf("unable to upgrade to %s, received %d", proto, resp.StatusCode)
}
c, br := clientconn.Hijack()
if br.Buffered() > 0 {
// If there is buffered content, wrap the connection
c = &hijackedConn{c, br}
} else {
br.Reset(nil)
}
return c, nil
}
type hijackedConn struct {
net.Conn
r *bufio.Reader
}
func (c *hijackedConn) Read(b []byte) (int, error) {
return c.r.Read(b)
}

View file

@ -120,6 +120,9 @@ func (cli *Client) imageBuildOptionsToQuery(options types.ImageBuildOptions) (ur
return query, err
}
query.Set("cachefrom", string(cacheFromJSON))
if options.SessionID != "" {
query.Set("session", options.SessionID)
}
return query, nil
}

View file

@ -2,6 +2,7 @@ package client
import (
"io"
"net"
"time"
"github.com/docker/docker/api/types"
@ -35,6 +36,7 @@ type CommonAPIClient interface {
ServerVersion(ctx context.Context) (types.Version, error)
NegotiateAPIVersion(ctx context.Context)
NegotiateAPIVersionPing(types.Ping)
DialSession(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
}
// ContainerAPIClient defines API client methods for the containers

19
client/session.go Normal file
View file

@ -0,0 +1,19 @@
package client
import (
"net"
"net/http"
"golang.org/x/net/context"
)
// DialSession returns a connection that can be used communication with daemon
func (cli *Client) DialSession(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
req, err := http.NewRequest("POST", "/session", nil)
if err != nil {
return nil, err
}
req = cli.addHeaders(req, meta)
return cli.setupHijackConn(req, proto)
}

62
client/session/grpc.go Normal file
View file

@ -0,0 +1,62 @@
package session
import (
"net"
"time"
"github.com/Sirupsen/logrus"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
go func() {
<-ctx.Done()
conn.Close()
}()
logrus.Debugf("serving grpc connection")
(&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
}
func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) {
dialOpt := grpc.WithDialer(func(addr string, d time.Duration) (net.Conn, error) {
return conn, nil
})
cc, err := grpc.DialContext(ctx, "", dialOpt, grpc.WithInsecure())
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create grpc client")
}
ctx, cancel := context.WithCancel(ctx)
go monitorHealth(ctx, cc, cancel)
return ctx, cc, nil
}
func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) {
defer cancelConn()
defer cc.Close()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
healthClient := grpc_health_v1.NewHealthClient(cc)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
<-ticker.C
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
cancel()
if err != nil {
return
}
}
}
}

187
client/session/manager.go Normal file
View file

@ -0,0 +1,187 @@
package session
import (
"net/http"
"strings"
"sync"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// Caller can invoke requests on the session
type Caller interface {
Context() context.Context
Supports(method string) bool
Conn() *grpc.ClientConn
Name() string
SharedKey() string
}
type client struct {
Session
cc *grpc.ClientConn
supported map[string]struct{}
}
// Manager is a controller for accessing currently active sessions
type Manager struct {
sessions map[string]*client
mu sync.Mutex
updateCondition *sync.Cond
}
// NewManager returns a new Manager
func NewManager() (*Manager, error) {
sm := &Manager{
sessions: make(map[string]*client),
}
sm.updateCondition = sync.NewCond(&sm.mu)
return sm, nil
}
// HandleHTTPRequest handles an incoming HTTP request
func (sm *Manager) HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
hijacker, ok := w.(http.Hijacker)
if !ok {
return errors.New("handler does not support hijack")
}
uuid := r.Header.Get(headerSessionUUID)
name := r.Header.Get(headerSessionName)
sharedKey := r.Header.Get(headerSessionSharedKey)
proto := r.Header.Get("Upgrade")
sm.mu.Lock()
if _, ok := sm.sessions[uuid]; ok {
sm.mu.Unlock()
return errors.Errorf("session %s already exists", uuid)
}
if proto == "" {
sm.mu.Unlock()
return errors.New("no upgrade proto in request")
}
if proto != "h2c" {
sm.mu.Unlock()
return errors.Errorf("protocol %s not supported", proto)
}
conn, _, err := hijacker.Hijack()
if err != nil {
sm.mu.Unlock()
return errors.Wrap(err, "failed to hijack connection")
}
resp := &http.Response{
StatusCode: http.StatusSwitchingProtocols,
ProtoMajor: 1,
ProtoMinor: 1,
Header: http.Header{},
}
resp.Header.Set("Connection", "Upgrade")
resp.Header.Set("Upgrade", proto)
// set raw mode
conn.Write([]byte{})
resp.Write(conn)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cc, err := grpcClientConn(ctx, conn)
if err != nil {
sm.mu.Unlock()
return err
}
c := &client{
Session: Session{
uuid: uuid,
name: name,
sharedKey: sharedKey,
ctx: ctx,
cancelCtx: cancel,
done: make(chan struct{}),
},
cc: cc,
supported: make(map[string]struct{}),
}
for _, m := range r.Header[headerSessionMethod] {
c.supported[strings.ToLower(m)] = struct{}{}
}
sm.sessions[uuid] = c
sm.updateCondition.Broadcast()
sm.mu.Unlock()
defer func() {
sm.mu.Lock()
delete(sm.sessions, uuid)
sm.mu.Unlock()
}()
<-c.ctx.Done()
conn.Close()
close(c.done)
return nil
}
// Get returns a session by UUID
func (sm *Manager) Get(ctx context.Context, uuid string) (Caller, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-ctx.Done():
sm.updateCondition.Broadcast()
}
}()
var c *client
sm.mu.Lock()
for {
select {
case <-ctx.Done():
sm.mu.Unlock()
return nil, errors.Wrapf(ctx.Err(), "no active session for %s", uuid)
default:
}
var ok bool
c, ok = sm.sessions[uuid]
if !ok || c.closed() {
sm.updateCondition.Wait()
continue
}
sm.mu.Unlock()
break
}
return c, nil
}
func (c *client) Context() context.Context {
return c.context()
}
func (c *client) Name() string {
return c.name
}
func (c *client) SharedKey() string {
return c.sharedKey
}
func (c *client) Supports(url string) bool {
_, ok := c.supported[strings.ToLower(url)]
return ok
}
func (c *client) Conn() *grpc.ClientConn {
return c.cc
}

117
client/session/session.go Normal file
View file

@ -0,0 +1,117 @@
package session
import (
"net"
"github.com/docker/docker/pkg/stringid"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
const (
headerSessionUUID = "X-Docker-Expose-Session-Uuid"
headerSessionName = "X-Docker-Expose-Session-Name"
headerSessionSharedKey = "X-Docker-Expose-Session-Sharedkey"
headerSessionMethod = "X-Docker-Expose-Session-Grpc-Method"
)
// Dialer returns a connection that can be used by the session
type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
// Attachable defines a feature that can be expsed on a session
type Attachable interface {
Register(*grpc.Server)
}
// Session is a long running connection between client and a daemon
type Session struct {
uuid string
name string
sharedKey string
ctx context.Context
cancelCtx func()
done chan struct{}
grpcServer *grpc.Server
}
// NewSession returns a new long running session
func NewSession(name, sharedKey string) (*Session, error) {
uuid := stringid.GenerateRandomID()
s := &Session{
uuid: uuid,
name: name,
sharedKey: sharedKey,
grpcServer: grpc.NewServer(),
}
grpc_health_v1.RegisterHealthServer(s.grpcServer, health.NewServer())
return s, nil
}
// Allow enable a given service to be reachable through the grpc session
func (s *Session) Allow(a Attachable) {
a.Register(s.grpcServer)
}
// UUID returns unique identifier for the session
func (s *Session) UUID() string {
return s.uuid
}
// Run activates the session
func (s *Session) Run(ctx context.Context, dialer Dialer) error {
ctx, cancel := context.WithCancel(ctx)
s.cancelCtx = cancel
s.done = make(chan struct{})
defer cancel()
defer close(s.done)
meta := make(map[string][]string)
meta[headerSessionUUID] = []string{s.uuid}
meta[headerSessionName] = []string{s.name}
meta[headerSessionSharedKey] = []string{s.sharedKey}
for name, svc := range s.grpcServer.GetServiceInfo() {
for _, method := range svc.Methods {
meta[headerSessionMethod] = append(meta[headerSessionMethod], MethodURL(name, method.Name))
}
}
conn, err := dialer(ctx, "h2c", meta)
if err != nil {
return errors.Wrap(err, "failed to dial gRPC")
}
serve(ctx, s.grpcServer, conn)
return nil
}
// Close closes the session
func (s *Session) Close() error {
if s.cancelCtx != nil && s.done != nil {
s.cancelCtx()
<-s.done
}
return nil
}
func (s *Session) context() context.Context {
return s.ctx
}
func (s *Session) closed() bool {
select {
case <-s.context().Done():
return true
default:
return false
}
}
// MethodURL returns a gRPC method URL for service and method name
func MethodURL(s, m string) string {
return "/" + s + "/" + m
}

View file

@ -23,10 +23,12 @@ import (
"github.com/docker/docker/api/server/router/image"
"github.com/docker/docker/api/server/router/network"
pluginrouter "github.com/docker/docker/api/server/router/plugin"
sessionrouter "github.com/docker/docker/api/server/router/session"
swarmrouter "github.com/docker/docker/api/server/router/swarm"
systemrouter "github.com/docker/docker/api/server/router/system"
"github.com/docker/docker/api/server/router/volume"
"github.com/docker/docker/cli/debug"
"github.com/docker/docker/client/session"
"github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/cluster"
"github.com/docker/docker/daemon/config"
@ -46,6 +48,7 @@ import (
"github.com/docker/docker/runconfig"
"github.com/docker/go-connections/tlsconfig"
swarmapi "github.com/docker/swarmkit/api"
"github.com/pkg/errors"
"github.com/spf13/pflag"
)
@ -215,6 +218,11 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
logrus.Warnln("LCOW support is enabled - this feature is incomplete")
}
sm, err := session.NewManager()
if err != nil {
return errors.Wrap(err, "failed to create sessionmanager")
}
d, err := daemon.NewDaemon(cli.Config, registryService, containerdRemote, pluginStore)
if err != nil {
return fmt.Errorf("Error starting daemon: %v", err)
@ -260,6 +268,11 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
logrus.Fatalf("Error starting cluster component: %v", err)
}
bb, err := buildbackend.NewBackend(d, d, sm, d.IDMappings())
if err != nil {
return errors.Wrap(err, "failed to create buildmanager")
}
// Restart all autostart containers which has a swarm endpoint
// and is not yet running now that we have successfully
// initialized the cluster.
@ -269,7 +282,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
cli.d = d
initRouter(api, d, c)
initRouter(api, d, c, sm, bb)
// process cluster change notifications
watchCtx, cancel := context.WithCancel(context.Background())
@ -442,7 +455,7 @@ func loadDaemonCliConfig(opts *daemonOptions) (*config.Config, error) {
return conf, nil
}
func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *session.Manager, bb *buildbackend.Backend) {
decoder := runconfig.ContainerDecoder{}
routers := []router.Router{
@ -452,7 +465,8 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
image.NewRouter(d, decoder),
systemrouter.NewRouter(d, c),
volume.NewRouter(d),
build.NewRouter(buildbackend.NewBackend(d, d, d.IDMappings()), d),
build.NewRouter(bb, d),
sessionrouter.NewRouter(sm),
swarmrouter.NewRouter(c),
pluginrouter.NewRouter(d.PluginManager()),
distributionrouter.NewRouter(d),

View file

@ -0,0 +1,49 @@
package main
import (
"net/http"
"github.com/docker/docker/integration-cli/checker"
"github.com/docker/docker/integration-cli/request"
"github.com/docker/docker/pkg/testutil"
"github.com/go-check/check"
)
func (s *DockerSuite) TestSessionCreate(c *check.C) {
testRequires(c, ExperimentalDaemon)
res, body, err := request.Post("/session", func(r *http.Request) error {
r.Header.Set("X-Docker-Expose-Session-Uuid", "testsessioncreate") // so we don't block default name if something else is using it
r.Header.Set("Upgrade", "h2c")
return nil
})
c.Assert(err, checker.IsNil)
c.Assert(res.StatusCode, checker.Equals, http.StatusSwitchingProtocols)
c.Assert(res.Header.Get("Upgrade"), checker.Equals, "h2c")
c.Assert(body.Close(), checker.IsNil)
}
func (s *DockerSuite) TestSessionCreateWithBadUpgrade(c *check.C) {
testRequires(c, ExperimentalDaemon)
res, body, err := request.Post("/session")
c.Assert(err, checker.IsNil)
c.Assert(res.StatusCode, checker.Equals, http.StatusBadRequest)
buf, err := testutil.ReadBody(body)
c.Assert(err, checker.IsNil)
out := string(buf)
c.Assert(out, checker.Contains, "no upgrade")
res, body, err = request.Post("/session", func(r *http.Request) error {
r.Header.Set("Upgrade", "foo")
return nil
})
c.Assert(err, checker.IsNil)
c.Assert(res.StatusCode, checker.Equals, http.StatusBadRequest)
buf, err = testutil.ReadBody(body)
c.Assert(err, checker.IsNil)
out = string(buf)
c.Assert(out, checker.Contains, "not supported")
}

52
vendor/google.golang.org/grpc/health/health.go generated vendored Normal file
View file

@ -0,0 +1,52 @@
// Package health provides some utility functions to health-check a server. The implementation
// is based on protobuf. Users need to write their own implementations if other IDLs are used.
package health
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// Server implements `service Health`.
type Server struct {
mu sync.Mutex
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
}
}
// Check implements `service Health`.
func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if in.Service == "" {
// check the server overall health status.
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_SERVING,
}, nil
}
if status, ok := s.statusMap[in.Service]; ok {
return &healthpb.HealthCheckResponse{
Status: status,
}, nil
}
return nil, grpc.Errorf(codes.NotFound, "unknown service")
}
// SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap.
func (s *Server) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock()
s.statusMap[service] = status
s.mu.Unlock()
}