|
@@ -73,12 +73,14 @@ func init() {
|
|
|
internal.DrainServerTransports = func(srv *Server, addr string) {
|
|
|
srv.drainServerTransports(addr)
|
|
|
}
|
|
|
- internal.AddExtraServerOptions = func(opt ...ServerOption) {
|
|
|
- extraServerOptions = opt
|
|
|
+ internal.AddGlobalServerOptions = func(opt ...ServerOption) {
|
|
|
+ extraServerOptions = append(extraServerOptions, opt...)
|
|
|
}
|
|
|
- internal.ClearExtraServerOptions = func() {
|
|
|
+ internal.ClearGlobalServerOptions = func() {
|
|
|
extraServerOptions = nil
|
|
|
}
|
|
|
+ internal.BinaryLogger = binaryLogger
|
|
|
+ internal.JoinServerOptions = newJoinServerOption
|
|
|
}
|
|
|
|
|
|
var statusOK = status.New(codes.OK, "")
|
|
@@ -155,6 +157,7 @@ type serverOptions struct {
|
|
|
streamInt StreamServerInterceptor
|
|
|
chainUnaryInts []UnaryServerInterceptor
|
|
|
chainStreamInts []StreamServerInterceptor
|
|
|
+ binaryLogger binarylog.Logger
|
|
|
inTapHandle tap.ServerInHandle
|
|
|
statsHandlers []stats.Handler
|
|
|
maxConcurrentStreams uint32
|
|
@@ -190,7 +193,7 @@ type ServerOption interface {
|
|
|
// EmptyServerOption does not alter the server configuration. It can be embedded
|
|
|
// in another structure to build custom server options.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -214,6 +217,22 @@ func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// joinServerOption provides a way to combine arbitrary number of server
|
|
|
+// options into one.
|
|
|
+type joinServerOption struct {
|
|
|
+ opts []ServerOption
|
|
|
+}
|
|
|
+
|
|
|
+func (mdo *joinServerOption) apply(do *serverOptions) {
|
|
|
+ for _, opt := range mdo.opts {
|
|
|
+ opt.apply(do)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func newJoinServerOption(opts ...ServerOption) ServerOption {
|
|
|
+ return &joinServerOption{opts: opts}
|
|
|
+}
|
|
|
+
|
|
|
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
|
|
|
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
|
|
|
// The default value for this buffer is 32KB.
|
|
@@ -305,7 +324,7 @@ func CustomCodec(codec Codec) ServerOption {
|
|
|
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
|
|
|
// Will be supported throughout 1.x.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -426,7 +445,7 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
|
|
|
// InTapHandle returns a ServerOption that sets the tap handle for all the server
|
|
|
// transport to be created. Only one can be installed.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -442,10 +461,24 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
|
|
|
// StatsHandler returns a ServerOption that sets the stats handler for the server.
|
|
|
func StatsHandler(h stats.Handler) ServerOption {
|
|
|
return newFuncServerOption(func(o *serverOptions) {
|
|
|
+ if h == nil {
|
|
|
+ logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
|
|
|
+ // Do not allow a nil stats handler, which would otherwise cause
|
|
|
+ // panics.
|
|
|
+ return
|
|
|
+ }
|
|
|
o.statsHandlers = append(o.statsHandlers, h)
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+// binaryLogger returns a ServerOption that can set the binary logger for the
|
|
|
+// server.
|
|
|
+func binaryLogger(bl binarylog.Logger) ServerOption {
|
|
|
+ return newFuncServerOption(func(o *serverOptions) {
|
|
|
+ o.binaryLogger = bl
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
|
|
|
// unknown service handler. The provided method is a bidi-streaming RPC service
|
|
|
// handler that will be invoked instead of returning the "unimplemented" gRPC
|
|
@@ -469,7 +502,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
|
|
|
// new connections. If this is not set, the default is 120 seconds. A zero or
|
|
|
// negative value will result in an immediate timeout.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -490,7 +523,7 @@ func MaxHeaderListSize(s uint32) ServerOption {
|
|
|
// HeaderTableSize returns a ServerOption that sets the size of dynamic
|
|
|
// header table for stream.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -505,7 +538,7 @@ func HeaderTableSize(s uint32) ServerOption {
|
|
|
// zero (default) will disable workers and spawn a new goroutine for each
|
|
|
// stream.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -898,7 +931,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
|
|
|
if err != credentials.ErrConnDispatched {
|
|
|
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
|
|
|
if err != io.EOF {
|
|
|
- channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
|
|
|
+ channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
|
|
|
}
|
|
|
c.Close()
|
|
|
}
|
|
@@ -956,19 +989,19 @@ var _ http.Handler = (*Server)(nil)
|
|
|
// To share one port (such as 443 for https) between gRPC and an
|
|
|
// existing http.Handler, use a root http.Handler such as:
|
|
|
//
|
|
|
-// if r.ProtoMajor == 2 && strings.HasPrefix(
|
|
|
-// r.Header.Get("Content-Type"), "application/grpc") {
|
|
|
-// grpcServer.ServeHTTP(w, r)
|
|
|
-// } else {
|
|
|
-// yourMux.ServeHTTP(w, r)
|
|
|
-// }
|
|
|
+// if r.ProtoMajor == 2 && strings.HasPrefix(
|
|
|
+// r.Header.Get("Content-Type"), "application/grpc") {
|
|
|
+// grpcServer.ServeHTTP(w, r)
|
|
|
+// } else {
|
|
|
+// yourMux.ServeHTTP(w, r)
|
|
|
+// }
|
|
|
//
|
|
|
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
|
|
|
// separate from grpc-go's HTTP/2 server. Performance and features may vary
|
|
|
// between the two paths. ServeHTTP does not support some gRPC features
|
|
|
// available through grpc-go's HTTP/2 server.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -1193,9 +1226,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
-
|
|
|
- binlog := binarylog.GetMethodLogger(stream.Method())
|
|
|
- if binlog != nil {
|
|
|
+ var binlogs []binarylog.MethodLogger
|
|
|
+ if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
|
|
|
+ binlogs = append(binlogs, ml)
|
|
|
+ }
|
|
|
+ if s.opts.binaryLogger != nil {
|
|
|
+ if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
|
|
|
+ binlogs = append(binlogs, ml)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(binlogs) != 0 {
|
|
|
ctx := stream.Context()
|
|
|
md, _ := metadata.FromIncomingContext(ctx)
|
|
|
logEntry := &binarylog.ClientHeader{
|
|
@@ -1215,7 +1255,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
if peer, ok := peer.FromContext(ctx); ok {
|
|
|
logEntry.PeerAddr = peer.Addr
|
|
|
}
|
|
|
- binlog.Log(logEntry)
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(logEntry)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// comp and cp are used for compression. decomp and dc are used for
|
|
@@ -1255,7 +1297,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
}
|
|
|
|
|
|
var payInfo *payloadInfo
|
|
|
- if len(shs) != 0 || binlog != nil {
|
|
|
+ if len(shs) != 0 || len(binlogs) != 0 {
|
|
|
payInfo = &payloadInfo{}
|
|
|
}
|
|
|
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
|
|
@@ -1281,10 +1323,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
Length: len(d),
|
|
|
})
|
|
|
}
|
|
|
- if binlog != nil {
|
|
|
- binlog.Log(&binarylog.ClientMessage{
|
|
|
+ if len(binlogs) != 0 {
|
|
|
+ cm := &binarylog.ClientMessage{
|
|
|
Message: d,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(cm)
|
|
|
+ }
|
|
|
}
|
|
|
if trInfo != nil {
|
|
|
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
|
|
@@ -1308,18 +1353,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
if e := t.WriteStatus(stream, appStatus); e != nil {
|
|
|
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
|
|
|
}
|
|
|
- if binlog != nil {
|
|
|
+ if len(binlogs) != 0 {
|
|
|
if h, _ := stream.Header(); h.Len() > 0 {
|
|
|
// Only log serverHeader if there was header. Otherwise it can
|
|
|
// be trailer only.
|
|
|
- binlog.Log(&binarylog.ServerHeader{
|
|
|
+ sh := &binarylog.ServerHeader{
|
|
|
Header: h,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(sh)
|
|
|
+ }
|
|
|
}
|
|
|
- binlog.Log(&binarylog.ServerTrailer{
|
|
|
+ st := &binarylog.ServerTrailer{
|
|
|
Trailer: stream.Trailer(),
|
|
|
Err: appErr,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(st)
|
|
|
+ }
|
|
|
}
|
|
|
return appErr
|
|
|
}
|
|
@@ -1345,26 +1396,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
|
|
|
}
|
|
|
}
|
|
|
- if binlog != nil {
|
|
|
+ if len(binlogs) != 0 {
|
|
|
h, _ := stream.Header()
|
|
|
- binlog.Log(&binarylog.ServerHeader{
|
|
|
+ sh := &binarylog.ServerHeader{
|
|
|
Header: h,
|
|
|
- })
|
|
|
- binlog.Log(&binarylog.ServerTrailer{
|
|
|
+ }
|
|
|
+ st := &binarylog.ServerTrailer{
|
|
|
Trailer: stream.Trailer(),
|
|
|
Err: appErr,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(sh)
|
|
|
+ binlog.Log(st)
|
|
|
+ }
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
|
- if binlog != nil {
|
|
|
+ if len(binlogs) != 0 {
|
|
|
h, _ := stream.Header()
|
|
|
- binlog.Log(&binarylog.ServerHeader{
|
|
|
+ sh := &binarylog.ServerHeader{
|
|
|
Header: h,
|
|
|
- })
|
|
|
- binlog.Log(&binarylog.ServerMessage{
|
|
|
+ }
|
|
|
+ sm := &binarylog.ServerMessage{
|
|
|
Message: reply,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(sh)
|
|
|
+ binlog.Log(sm)
|
|
|
+ }
|
|
|
}
|
|
|
if channelz.IsOn() {
|
|
|
t.IncrMsgSent()
|
|
@@ -1376,11 +1435,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|
|
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
|
|
|
// error or allow the stats handler to see it?
|
|
|
err = t.WriteStatus(stream, statusOK)
|
|
|
- if binlog != nil {
|
|
|
- binlog.Log(&binarylog.ServerTrailer{
|
|
|
+ if len(binlogs) != 0 {
|
|
|
+ st := &binarylog.ServerTrailer{
|
|
|
Trailer: stream.Trailer(),
|
|
|
Err: appErr,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range binlogs {
|
|
|
+ binlog.Log(st)
|
|
|
+ }
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
@@ -1493,8 +1555,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
- ss.binlog = binarylog.GetMethodLogger(stream.Method())
|
|
|
- if ss.binlog != nil {
|
|
|
+ if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
|
|
|
+ ss.binlogs = append(ss.binlogs, ml)
|
|
|
+ }
|
|
|
+ if s.opts.binaryLogger != nil {
|
|
|
+ if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
|
|
|
+ ss.binlogs = append(ss.binlogs, ml)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(ss.binlogs) != 0 {
|
|
|
md, _ := metadata.FromIncomingContext(ctx)
|
|
|
logEntry := &binarylog.ClientHeader{
|
|
|
Header: md,
|
|
@@ -1513,7 +1582,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|
|
if peer, ok := peer.FromContext(ss.Context()); ok {
|
|
|
logEntry.PeerAddr = peer.Addr
|
|
|
}
|
|
|
- ss.binlog.Log(logEntry)
|
|
|
+ for _, binlog := range ss.binlogs {
|
|
|
+ binlog.Log(logEntry)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// If dc is set and matches the stream's compression, use it. Otherwise, try
|
|
@@ -1579,11 +1650,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|
|
ss.mu.Unlock()
|
|
|
}
|
|
|
t.WriteStatus(ss.s, appStatus)
|
|
|
- if ss.binlog != nil {
|
|
|
- ss.binlog.Log(&binarylog.ServerTrailer{
|
|
|
+ if len(ss.binlogs) != 0 {
|
|
|
+ st := &binarylog.ServerTrailer{
|
|
|
Trailer: ss.s.Trailer(),
|
|
|
Err: appErr,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range ss.binlogs {
|
|
|
+ binlog.Log(st)
|
|
|
+ }
|
|
|
}
|
|
|
// TODO: Should we log an error from WriteStatus here and below?
|
|
|
return appErr
|
|
@@ -1594,11 +1668,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|
|
ss.mu.Unlock()
|
|
|
}
|
|
|
err = t.WriteStatus(ss.s, statusOK)
|
|
|
- if ss.binlog != nil {
|
|
|
- ss.binlog.Log(&binarylog.ServerTrailer{
|
|
|
+ if len(ss.binlogs) != 0 {
|
|
|
+ st := &binarylog.ServerTrailer{
|
|
|
Trailer: ss.s.Trailer(),
|
|
|
Err: appErr,
|
|
|
- })
|
|
|
+ }
|
|
|
+ for _, binlog := range ss.binlogs {
|
|
|
+ binlog.Log(st)
|
|
|
+ }
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
@@ -1674,7 +1751,7 @@ type streamKey struct{}
|
|
|
// NewContextWithServerTransportStream creates a new context from ctx and
|
|
|
// attaches stream to it.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -1689,7 +1766,7 @@ func NewContextWithServerTransportStream(ctx context.Context, stream ServerTrans
|
|
|
//
|
|
|
// See also NewContextWithServerTransportStream.
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -1704,7 +1781,7 @@ type ServerTransportStream interface {
|
|
|
// ctx. Returns nil if the given context has no stream associated with it
|
|
|
// (which implies it is not an RPC invocation context).
|
|
|
//
|
|
|
-// Experimental
|
|
|
+// # Experimental
|
|
|
//
|
|
|
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
|
|
// later release.
|
|
@@ -1825,12 +1902,12 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {
|
|
|
// When called multiple times, all the provided metadata will be merged. All
|
|
|
// the metadata will be sent out when one of the following happens:
|
|
|
//
|
|
|
-// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
|
|
|
-// - The first response message is sent. For unary handlers, this occurs when
|
|
|
-// the handler returns; for streaming handlers, this can happen when stream's
|
|
|
-// SendMsg method is called.
|
|
|
-// - An RPC status is sent out (error or success). This occurs when the handler
|
|
|
-// returns.
|
|
|
+// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
|
|
|
+// - The first response message is sent. For unary handlers, this occurs when
|
|
|
+// the handler returns; for streaming handlers, this can happen when stream's
|
|
|
+// SendMsg method is called.
|
|
|
+// - An RPC status is sent out (error or success). This occurs when the handler
|
|
|
+// returns.
|
|
|
//
|
|
|
// SetHeader will fail if called after any of the events above.
|
|
|
//
|