server.go 59 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal"
  40. "google.golang.org/grpc/internal/binarylog"
  41. "google.golang.org/grpc/internal/channelz"
  42. "google.golang.org/grpc/internal/grpcrand"
  43. "google.golang.org/grpc/internal/grpcsync"
  44. "google.golang.org/grpc/internal/transport"
  45. "google.golang.org/grpc/keepalive"
  46. "google.golang.org/grpc/metadata"
  47. "google.golang.org/grpc/peer"
  48. "google.golang.org/grpc/stats"
  49. "google.golang.org/grpc/status"
  50. "google.golang.org/grpc/tap"
  51. )
  52. const (
  53. defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  54. defaultServerMaxSendMessageSize = math.MaxInt32
  55. // Server transports are tracked in a map which is keyed on listener
  56. // address. For regular gRPC traffic, connections are accepted in Serve()
  57. // through a call to Accept(), and we use the actual listener address as key
  58. // when we add it to the map. But for connections received through
  59. // ServeHTTP(), we do not have a listener and hence use this dummy value.
  60. listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
  61. )
  62. func init() {
  63. internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
  64. return srv.opts.creds
  65. }
  66. internal.DrainServerTransports = func(srv *Server, addr string) {
  67. srv.drainServerTransports(addr)
  68. }
  69. internal.AddExtraServerOptions = func(opt ...ServerOption) {
  70. extraServerOptions = opt
  71. }
  72. internal.ClearExtraServerOptions = func() {
  73. extraServerOptions = nil
  74. }
  75. }
  76. var statusOK = status.New(codes.OK, "")
  77. var logger = grpclog.Component("core")
  78. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  79. // MethodDesc represents an RPC service's method specification.
  80. type MethodDesc struct {
  81. MethodName string
  82. Handler methodHandler
  83. }
  84. // ServiceDesc represents an RPC service's specification.
  85. type ServiceDesc struct {
  86. ServiceName string
  87. // The pointer to the service interface. Used to check whether the user
  88. // provided implementation satisfies the interface requirements.
  89. HandlerType interface{}
  90. Methods []MethodDesc
  91. Streams []StreamDesc
  92. Metadata interface{}
  93. }
  94. // serviceInfo wraps information about a service. It is very similar to
  95. // ServiceDesc and is constructed from it for internal purposes.
  96. type serviceInfo struct {
  97. // Contains the implementation for the methods in this service.
  98. serviceImpl interface{}
  99. methods map[string]*MethodDesc
  100. streams map[string]*StreamDesc
  101. mdata interface{}
  102. }
  103. type serverWorkerData struct {
  104. st transport.ServerTransport
  105. wg *sync.WaitGroup
  106. stream *transport.Stream
  107. }
  108. // Server is a gRPC server to serve RPC requests.
  109. type Server struct {
  110. opts serverOptions
  111. mu sync.Mutex // guards following
  112. lis map[net.Listener]bool
  113. // conns contains all active server transports. It is a map keyed on a
  114. // listener address with the value being the set of active transports
  115. // belonging to that listener.
  116. conns map[string]map[transport.ServerTransport]bool
  117. serve bool
  118. drain bool
  119. cv *sync.Cond // signaled when connections close for GracefulStop
  120. services map[string]*serviceInfo // service name -> service info
  121. events trace.EventLog
  122. quit *grpcsync.Event
  123. done *grpcsync.Event
  124. channelzRemoveOnce sync.Once
  125. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  126. channelzID *channelz.Identifier
  127. czData *channelzData
  128. serverWorkerChannels []chan *serverWorkerData
  129. }
  130. type serverOptions struct {
  131. creds credentials.TransportCredentials
  132. codec baseCodec
  133. cp Compressor
  134. dc Decompressor
  135. unaryInt UnaryServerInterceptor
  136. streamInt StreamServerInterceptor
  137. chainUnaryInts []UnaryServerInterceptor
  138. chainStreamInts []StreamServerInterceptor
  139. inTapHandle tap.ServerInHandle
  140. statsHandlers []stats.Handler
  141. maxConcurrentStreams uint32
  142. maxReceiveMessageSize int
  143. maxSendMessageSize int
  144. unknownStreamDesc *StreamDesc
  145. keepaliveParams keepalive.ServerParameters
  146. keepalivePolicy keepalive.EnforcementPolicy
  147. initialWindowSize int32
  148. initialConnWindowSize int32
  149. writeBufferSize int
  150. readBufferSize int
  151. connectionTimeout time.Duration
  152. maxHeaderListSize *uint32
  153. headerTableSize *uint32
  154. numServerWorkers uint32
  155. }
  156. var defaultServerOptions = serverOptions{
  157. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  158. maxSendMessageSize: defaultServerMaxSendMessageSize,
  159. connectionTimeout: 120 * time.Second,
  160. writeBufferSize: defaultWriteBufSize,
  161. readBufferSize: defaultReadBufSize,
  162. }
  163. var extraServerOptions []ServerOption
  164. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  165. type ServerOption interface {
  166. apply(*serverOptions)
  167. }
  168. // EmptyServerOption does not alter the server configuration. It can be embedded
  169. // in another structure to build custom server options.
  170. //
  171. // Experimental
  172. //
  173. // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  174. // later release.
  175. type EmptyServerOption struct{}
  176. func (EmptyServerOption) apply(*serverOptions) {}
  177. // funcServerOption wraps a function that modifies serverOptions into an
  178. // implementation of the ServerOption interface.
  179. type funcServerOption struct {
  180. f func(*serverOptions)
  181. }
  182. func (fdo *funcServerOption) apply(do *serverOptions) {
  183. fdo.f(do)
  184. }
  185. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  186. return &funcServerOption{
  187. f: f,
  188. }
  189. }
  190. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  191. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  192. // The default value for this buffer is 32KB.
  193. // Zero will disable the write buffer such that each write will be on underlying connection.
  194. // Note: A Send call may not directly translate to a write.
  195. func WriteBufferSize(s int) ServerOption {
  196. return newFuncServerOption(func(o *serverOptions) {
  197. o.writeBufferSize = s
  198. })
  199. }
  200. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  201. // for one read syscall.
  202. // The default value for this buffer is 32KB.
  203. // Zero will disable read buffer for a connection so data framer can access the underlying
  204. // conn directly.
  205. func ReadBufferSize(s int) ServerOption {
  206. return newFuncServerOption(func(o *serverOptions) {
  207. o.readBufferSize = s
  208. })
  209. }
  210. // InitialWindowSize returns a ServerOption that sets window size for stream.
  211. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  212. func InitialWindowSize(s int32) ServerOption {
  213. return newFuncServerOption(func(o *serverOptions) {
  214. o.initialWindowSize = s
  215. })
  216. }
  217. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  218. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  219. func InitialConnWindowSize(s int32) ServerOption {
  220. return newFuncServerOption(func(o *serverOptions) {
  221. o.initialConnWindowSize = s
  222. })
  223. }
  224. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  225. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  226. if kp.Time > 0 && kp.Time < time.Second {
  227. logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  228. kp.Time = time.Second
  229. }
  230. return newFuncServerOption(func(o *serverOptions) {
  231. o.keepaliveParams = kp
  232. })
  233. }
  234. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  235. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  236. return newFuncServerOption(func(o *serverOptions) {
  237. o.keepalivePolicy = kep
  238. })
  239. }
  240. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  241. //
  242. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  243. //
  244. // Deprecated: register codecs using encoding.RegisterCodec. The server will
  245. // automatically use registered codecs based on the incoming requests' headers.
  246. // See also
  247. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  248. // Will be supported throughout 1.x.
  249. func CustomCodec(codec Codec) ServerOption {
  250. return newFuncServerOption(func(o *serverOptions) {
  251. o.codec = codec
  252. })
  253. }
  254. // ForceServerCodec returns a ServerOption that sets a codec for message
  255. // marshaling and unmarshaling.
  256. //
  257. // This will override any lookups by content-subtype for Codecs registered
  258. // with RegisterCodec.
  259. //
  260. // See Content-Type on
  261. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  262. // more details. Also see the documentation on RegisterCodec and
  263. // CallContentSubtype for more details on the interaction between encoding.Codec
  264. // and content-subtype.
  265. //
  266. // This function is provided for advanced users; prefer to register codecs
  267. // using encoding.RegisterCodec.
  268. // The server will automatically use registered codecs based on the incoming
  269. // requests' headers. See also
  270. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  271. // Will be supported throughout 1.x.
  272. //
  273. // Experimental
  274. //
  275. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  276. // later release.
  277. func ForceServerCodec(codec encoding.Codec) ServerOption {
  278. return newFuncServerOption(func(o *serverOptions) {
  279. o.codec = codec
  280. })
  281. }
  282. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  283. // messages. For backward compatibility, all outbound messages will be sent
  284. // using this compressor, regardless of incoming message compression. By
  285. // default, server messages will be sent using the same compressor with which
  286. // request messages were sent.
  287. //
  288. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  289. // throughout 1.x.
  290. func RPCCompressor(cp Compressor) ServerOption {
  291. return newFuncServerOption(func(o *serverOptions) {
  292. o.cp = cp
  293. })
  294. }
  295. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  296. // messages. It has higher priority than decompressors registered via
  297. // encoding.RegisterCompressor.
  298. //
  299. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  300. // throughout 1.x.
  301. func RPCDecompressor(dc Decompressor) ServerOption {
  302. return newFuncServerOption(func(o *serverOptions) {
  303. o.dc = dc
  304. })
  305. }
  306. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  307. // If this is not set, gRPC uses the default limit.
  308. //
  309. // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
  310. func MaxMsgSize(m int) ServerOption {
  311. return MaxRecvMsgSize(m)
  312. }
  313. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  314. // If this is not set, gRPC uses the default 4MB.
  315. func MaxRecvMsgSize(m int) ServerOption {
  316. return newFuncServerOption(func(o *serverOptions) {
  317. o.maxReceiveMessageSize = m
  318. })
  319. }
  320. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  321. // If this is not set, gRPC uses the default `math.MaxInt32`.
  322. func MaxSendMsgSize(m int) ServerOption {
  323. return newFuncServerOption(func(o *serverOptions) {
  324. o.maxSendMessageSize = m
  325. })
  326. }
  327. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  328. // of concurrent streams to each ServerTransport.
  329. func MaxConcurrentStreams(n uint32) ServerOption {
  330. return newFuncServerOption(func(o *serverOptions) {
  331. o.maxConcurrentStreams = n
  332. })
  333. }
  334. // Creds returns a ServerOption that sets credentials for server connections.
  335. func Creds(c credentials.TransportCredentials) ServerOption {
  336. return newFuncServerOption(func(o *serverOptions) {
  337. o.creds = c
  338. })
  339. }
  340. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  341. // server. Only one unary interceptor can be installed. The construction of multiple
  342. // interceptors (e.g., chaining) can be implemented at the caller.
  343. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  344. return newFuncServerOption(func(o *serverOptions) {
  345. if o.unaryInt != nil {
  346. panic("The unary server interceptor was already set and may not be reset.")
  347. }
  348. o.unaryInt = i
  349. })
  350. }
  351. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  352. // for unary RPCs. The first interceptor will be the outer most,
  353. // while the last interceptor will be the inner most wrapper around the real call.
  354. // All unary interceptors added by this method will be chained.
  355. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  356. return newFuncServerOption(func(o *serverOptions) {
  357. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  358. })
  359. }
  360. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  361. // server. Only one stream interceptor can be installed.
  362. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  363. return newFuncServerOption(func(o *serverOptions) {
  364. if o.streamInt != nil {
  365. panic("The stream server interceptor was already set and may not be reset.")
  366. }
  367. o.streamInt = i
  368. })
  369. }
  370. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  371. // for streaming RPCs. The first interceptor will be the outer most,
  372. // while the last interceptor will be the inner most wrapper around the real call.
  373. // All stream interceptors added by this method will be chained.
  374. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  375. return newFuncServerOption(func(o *serverOptions) {
  376. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  377. })
  378. }
  379. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  380. // transport to be created. Only one can be installed.
  381. //
  382. // Experimental
  383. //
  384. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  385. // later release.
  386. func InTapHandle(h tap.ServerInHandle) ServerOption {
  387. return newFuncServerOption(func(o *serverOptions) {
  388. if o.inTapHandle != nil {
  389. panic("The tap handle was already set and may not be reset.")
  390. }
  391. o.inTapHandle = h
  392. })
  393. }
  394. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  395. func StatsHandler(h stats.Handler) ServerOption {
  396. return newFuncServerOption(func(o *serverOptions) {
  397. o.statsHandlers = append(o.statsHandlers, h)
  398. })
  399. }
  400. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  401. // unknown service handler. The provided method is a bidi-streaming RPC service
  402. // handler that will be invoked instead of returning the "unimplemented" gRPC
  403. // error whenever a request is received for an unregistered service or method.
  404. // The handling function and stream interceptor (if set) have full access to
  405. // the ServerStream, including its Context.
  406. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  407. return newFuncServerOption(func(o *serverOptions) {
  408. o.unknownStreamDesc = &StreamDesc{
  409. StreamName: "unknown_service_handler",
  410. Handler: streamHandler,
  411. // We need to assume that the users of the streamHandler will want to use both.
  412. ClientStreams: true,
  413. ServerStreams: true,
  414. }
  415. })
  416. }
  417. // ConnectionTimeout returns a ServerOption that sets the timeout for
  418. // connection establishment (up to and including HTTP/2 handshaking) for all
  419. // new connections. If this is not set, the default is 120 seconds. A zero or
  420. // negative value will result in an immediate timeout.
  421. //
  422. // Experimental
  423. //
  424. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  425. // later release.
  426. func ConnectionTimeout(d time.Duration) ServerOption {
  427. return newFuncServerOption(func(o *serverOptions) {
  428. o.connectionTimeout = d
  429. })
  430. }
  431. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  432. // of header list that the server is prepared to accept.
  433. func MaxHeaderListSize(s uint32) ServerOption {
  434. return newFuncServerOption(func(o *serverOptions) {
  435. o.maxHeaderListSize = &s
  436. })
  437. }
  438. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  439. // header table for stream.
  440. //
  441. // Experimental
  442. //
  443. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  444. // later release.
  445. func HeaderTableSize(s uint32) ServerOption {
  446. return newFuncServerOption(func(o *serverOptions) {
  447. o.headerTableSize = &s
  448. })
  449. }
  450. // NumStreamWorkers returns a ServerOption that sets the number of worker
  451. // goroutines that should be used to process incoming streams. Setting this to
  452. // zero (default) will disable workers and spawn a new goroutine for each
  453. // stream.
  454. //
  455. // Experimental
  456. //
  457. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  458. // later release.
  459. func NumStreamWorkers(numServerWorkers uint32) ServerOption {
  460. // TODO: If/when this API gets stabilized (i.e. stream workers become the
  461. // only way streams are processed), change the behavior of the zero value to
  462. // a sane default. Preliminary experiments suggest that a value equal to the
  463. // number of CPUs available is most performant; requires thorough testing.
  464. return newFuncServerOption(func(o *serverOptions) {
  465. o.numServerWorkers = numServerWorkers
  466. })
  467. }
  468. // serverWorkerResetThreshold defines how often the stack must be reset. Every
  469. // N requests, by spawning a new goroutine in its place, a worker can reset its
  470. // stack so that large stacks don't live in memory forever. 2^16 should allow
  471. // each goroutine stack to live for at least a few seconds in a typical
  472. // workload (assuming a QPS of a few thousand requests/sec).
  473. const serverWorkerResetThreshold = 1 << 16
  474. // serverWorkers blocks on a *transport.Stream channel forever and waits for
  475. // data to be fed by serveStreams. This allows different requests to be
  476. // processed by the same goroutine, removing the need for expensive stack
  477. // re-allocations (see the runtime.morestack problem [1]).
  478. //
  479. // [1] https://github.com/golang/go/issues/18138
  480. func (s *Server) serverWorker(ch chan *serverWorkerData) {
  481. // To make sure all server workers don't reset at the same time, choose a
  482. // random number of iterations before resetting.
  483. threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
  484. for completed := 0; completed < threshold; completed++ {
  485. data, ok := <-ch
  486. if !ok {
  487. return
  488. }
  489. s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
  490. data.wg.Done()
  491. }
  492. go s.serverWorker(ch)
  493. }
  494. // initServerWorkers creates worker goroutines and channels to process incoming
  495. // connections to reduce the time spent overall on runtime.morestack.
  496. func (s *Server) initServerWorkers() {
  497. s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
  498. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  499. s.serverWorkerChannels[i] = make(chan *serverWorkerData)
  500. go s.serverWorker(s.serverWorkerChannels[i])
  501. }
  502. }
  503. func (s *Server) stopServerWorkers() {
  504. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  505. close(s.serverWorkerChannels[i])
  506. }
  507. }
  508. // NewServer creates a gRPC server which has no service registered and has not
  509. // started to accept requests yet.
  510. func NewServer(opt ...ServerOption) *Server {
  511. opts := defaultServerOptions
  512. for _, o := range extraServerOptions {
  513. o.apply(&opts)
  514. }
  515. for _, o := range opt {
  516. o.apply(&opts)
  517. }
  518. s := &Server{
  519. lis: make(map[net.Listener]bool),
  520. opts: opts,
  521. conns: make(map[string]map[transport.ServerTransport]bool),
  522. services: make(map[string]*serviceInfo),
  523. quit: grpcsync.NewEvent(),
  524. done: grpcsync.NewEvent(),
  525. czData: new(channelzData),
  526. }
  527. chainUnaryServerInterceptors(s)
  528. chainStreamServerInterceptors(s)
  529. s.cv = sync.NewCond(&s.mu)
  530. if EnableTracing {
  531. _, file, line, _ := runtime.Caller(1)
  532. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  533. }
  534. if s.opts.numServerWorkers > 0 {
  535. s.initServerWorkers()
  536. }
  537. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  538. channelz.Info(logger, s.channelzID, "Server created")
  539. return s
  540. }
  541. // printf records an event in s's event log, unless s has been stopped.
  542. // REQUIRES s.mu is held.
  543. func (s *Server) printf(format string, a ...interface{}) {
  544. if s.events != nil {
  545. s.events.Printf(format, a...)
  546. }
  547. }
  548. // errorf records an error in s's event log, unless s has been stopped.
  549. // REQUIRES s.mu is held.
  550. func (s *Server) errorf(format string, a ...interface{}) {
  551. if s.events != nil {
  552. s.events.Errorf(format, a...)
  553. }
  554. }
  555. // ServiceRegistrar wraps a single method that supports service registration. It
  556. // enables users to pass concrete types other than grpc.Server to the service
  557. // registration methods exported by the IDL generated code.
  558. type ServiceRegistrar interface {
  559. // RegisterService registers a service and its implementation to the
  560. // concrete type implementing this interface. It may not be called
  561. // once the server has started serving.
  562. // desc describes the service and its methods and handlers. impl is the
  563. // service implementation which is passed to the method handlers.
  564. RegisterService(desc *ServiceDesc, impl interface{})
  565. }
  566. // RegisterService registers a service and its implementation to the gRPC
  567. // server. It is called from the IDL generated code. This must be called before
  568. // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
  569. // ensure it implements sd.HandlerType.
  570. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  571. if ss != nil {
  572. ht := reflect.TypeOf(sd.HandlerType).Elem()
  573. st := reflect.TypeOf(ss)
  574. if !st.Implements(ht) {
  575. logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  576. }
  577. }
  578. s.register(sd, ss)
  579. }
  580. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  581. s.mu.Lock()
  582. defer s.mu.Unlock()
  583. s.printf("RegisterService(%q)", sd.ServiceName)
  584. if s.serve {
  585. logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  586. }
  587. if _, ok := s.services[sd.ServiceName]; ok {
  588. logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  589. }
  590. info := &serviceInfo{
  591. serviceImpl: ss,
  592. methods: make(map[string]*MethodDesc),
  593. streams: make(map[string]*StreamDesc),
  594. mdata: sd.Metadata,
  595. }
  596. for i := range sd.Methods {
  597. d := &sd.Methods[i]
  598. info.methods[d.MethodName] = d
  599. }
  600. for i := range sd.Streams {
  601. d := &sd.Streams[i]
  602. info.streams[d.StreamName] = d
  603. }
  604. s.services[sd.ServiceName] = info
  605. }
  606. // MethodInfo contains the information of an RPC including its method name and type.
  607. type MethodInfo struct {
  608. // Name is the method name only, without the service name or package name.
  609. Name string
  610. // IsClientStream indicates whether the RPC is a client streaming RPC.
  611. IsClientStream bool
  612. // IsServerStream indicates whether the RPC is a server streaming RPC.
  613. IsServerStream bool
  614. }
  615. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  616. type ServiceInfo struct {
  617. Methods []MethodInfo
  618. // Metadata is the metadata specified in ServiceDesc when registering service.
  619. Metadata interface{}
  620. }
  621. // GetServiceInfo returns a map from service names to ServiceInfo.
  622. // Service names include the package names, in the form of <package>.<service>.
  623. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  624. ret := make(map[string]ServiceInfo)
  625. for n, srv := range s.services {
  626. methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
  627. for m := range srv.methods {
  628. methods = append(methods, MethodInfo{
  629. Name: m,
  630. IsClientStream: false,
  631. IsServerStream: false,
  632. })
  633. }
  634. for m, d := range srv.streams {
  635. methods = append(methods, MethodInfo{
  636. Name: m,
  637. IsClientStream: d.ClientStreams,
  638. IsServerStream: d.ServerStreams,
  639. })
  640. }
  641. ret[n] = ServiceInfo{
  642. Methods: methods,
  643. Metadata: srv.mdata,
  644. }
  645. }
  646. return ret
  647. }
  648. // ErrServerStopped indicates that the operation is now illegal because of
  649. // the server being stopped.
  650. var ErrServerStopped = errors.New("grpc: the server has been stopped")
  651. type listenSocket struct {
  652. net.Listener
  653. channelzID *channelz.Identifier
  654. }
  655. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  656. return &channelz.SocketInternalMetric{
  657. SocketOptions: channelz.GetSocketOption(l.Listener),
  658. LocalAddr: l.Listener.Addr(),
  659. }
  660. }
  661. func (l *listenSocket) Close() error {
  662. err := l.Listener.Close()
  663. channelz.RemoveEntry(l.channelzID)
  664. channelz.Info(logger, l.channelzID, "ListenSocket deleted")
  665. return err
  666. }
  667. // Serve accepts incoming connections on the listener lis, creating a new
  668. // ServerTransport and service goroutine for each. The service goroutines
  669. // read gRPC requests and then call the registered handlers to reply to them.
  670. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  671. // this method returns.
  672. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  673. func (s *Server) Serve(lis net.Listener) error {
  674. s.mu.Lock()
  675. s.printf("serving")
  676. s.serve = true
  677. if s.lis == nil {
  678. // Serve called after Stop or GracefulStop.
  679. s.mu.Unlock()
  680. lis.Close()
  681. return ErrServerStopped
  682. }
  683. s.serveWG.Add(1)
  684. defer func() {
  685. s.serveWG.Done()
  686. if s.quit.HasFired() {
  687. // Stop or GracefulStop called; block until done and return nil.
  688. <-s.done.Done()
  689. }
  690. }()
  691. ls := &listenSocket{Listener: lis}
  692. s.lis[ls] = true
  693. defer func() {
  694. s.mu.Lock()
  695. if s.lis != nil && s.lis[ls] {
  696. ls.Close()
  697. delete(s.lis, ls)
  698. }
  699. s.mu.Unlock()
  700. }()
  701. var err error
  702. ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
  703. if err != nil {
  704. s.mu.Unlock()
  705. return err
  706. }
  707. s.mu.Unlock()
  708. channelz.Info(logger, ls.channelzID, "ListenSocket created")
  709. var tempDelay time.Duration // how long to sleep on accept failure
  710. for {
  711. rawConn, err := lis.Accept()
  712. if err != nil {
  713. if ne, ok := err.(interface {
  714. Temporary() bool
  715. }); ok && ne.Temporary() {
  716. if tempDelay == 0 {
  717. tempDelay = 5 * time.Millisecond
  718. } else {
  719. tempDelay *= 2
  720. }
  721. if max := 1 * time.Second; tempDelay > max {
  722. tempDelay = max
  723. }
  724. s.mu.Lock()
  725. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  726. s.mu.Unlock()
  727. timer := time.NewTimer(tempDelay)
  728. select {
  729. case <-timer.C:
  730. case <-s.quit.Done():
  731. timer.Stop()
  732. return nil
  733. }
  734. continue
  735. }
  736. s.mu.Lock()
  737. s.printf("done serving; Accept = %v", err)
  738. s.mu.Unlock()
  739. if s.quit.HasFired() {
  740. return nil
  741. }
  742. return err
  743. }
  744. tempDelay = 0
  745. // Start a new goroutine to deal with rawConn so we don't stall this Accept
  746. // loop goroutine.
  747. //
  748. // Make sure we account for the goroutine so GracefulStop doesn't nil out
  749. // s.conns before this conn can be added.
  750. s.serveWG.Add(1)
  751. go func() {
  752. s.handleRawConn(lis.Addr().String(), rawConn)
  753. s.serveWG.Done()
  754. }()
  755. }
  756. }
  757. // handleRawConn forks a goroutine to handle a just-accepted connection that
  758. // has not had any I/O performed on it yet.
  759. func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
  760. if s.quit.HasFired() {
  761. rawConn.Close()
  762. return
  763. }
  764. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  765. // Finish handshaking (HTTP2)
  766. st := s.newHTTP2Transport(rawConn)
  767. rawConn.SetDeadline(time.Time{})
  768. if st == nil {
  769. return
  770. }
  771. if !s.addConn(lisAddr, st) {
  772. return
  773. }
  774. go func() {
  775. s.serveStreams(st)
  776. s.removeConn(lisAddr, st)
  777. }()
  778. }
  779. func (s *Server) drainServerTransports(addr string) {
  780. s.mu.Lock()
  781. conns := s.conns[addr]
  782. for st := range conns {
  783. st.Drain()
  784. }
  785. s.mu.Unlock()
  786. }
  787. // newHTTP2Transport sets up a http/2 transport (using the
  788. // gRPC http2 server transport in transport/http2_server.go).
  789. func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
  790. config := &transport.ServerConfig{
  791. MaxStreams: s.opts.maxConcurrentStreams,
  792. ConnectionTimeout: s.opts.connectionTimeout,
  793. Credentials: s.opts.creds,
  794. InTapHandle: s.opts.inTapHandle,
  795. StatsHandlers: s.opts.statsHandlers,
  796. KeepaliveParams: s.opts.keepaliveParams,
  797. KeepalivePolicy: s.opts.keepalivePolicy,
  798. InitialWindowSize: s.opts.initialWindowSize,
  799. InitialConnWindowSize: s.opts.initialConnWindowSize,
  800. WriteBufferSize: s.opts.writeBufferSize,
  801. ReadBufferSize: s.opts.readBufferSize,
  802. ChannelzParentID: s.channelzID,
  803. MaxHeaderListSize: s.opts.maxHeaderListSize,
  804. HeaderTableSize: s.opts.headerTableSize,
  805. }
  806. st, err := transport.NewServerTransport(c, config)
  807. if err != nil {
  808. s.mu.Lock()
  809. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  810. s.mu.Unlock()
  811. // ErrConnDispatched means that the connection was dispatched away from
  812. // gRPC; those connections should be left open.
  813. if err != credentials.ErrConnDispatched {
  814. // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
  815. if err != io.EOF {
  816. channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  817. }
  818. c.Close()
  819. }
  820. return nil
  821. }
  822. return st
  823. }
  824. func (s *Server) serveStreams(st transport.ServerTransport) {
  825. defer st.Close()
  826. var wg sync.WaitGroup
  827. var roundRobinCounter uint32
  828. st.HandleStreams(func(stream *transport.Stream) {
  829. wg.Add(1)
  830. if s.opts.numServerWorkers > 0 {
  831. data := &serverWorkerData{st: st, wg: &wg, stream: stream}
  832. select {
  833. case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
  834. default:
  835. // If all stream workers are busy, fallback to the default code path.
  836. go func() {
  837. s.handleStream(st, stream, s.traceInfo(st, stream))
  838. wg.Done()
  839. }()
  840. }
  841. } else {
  842. go func() {
  843. defer wg.Done()
  844. s.handleStream(st, stream, s.traceInfo(st, stream))
  845. }()
  846. }
  847. }, func(ctx context.Context, method string) context.Context {
  848. if !EnableTracing {
  849. return ctx
  850. }
  851. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  852. return trace.NewContext(ctx, tr)
  853. })
  854. wg.Wait()
  855. }
  856. var _ http.Handler = (*Server)(nil)
  857. // ServeHTTP implements the Go standard library's http.Handler
  858. // interface by responding to the gRPC request r, by looking up
  859. // the requested gRPC method in the gRPC server s.
  860. //
  861. // The provided HTTP request must have arrived on an HTTP/2
  862. // connection. When using the Go standard library's server,
  863. // practically this means that the Request must also have arrived
  864. // over TLS.
  865. //
  866. // To share one port (such as 443 for https) between gRPC and an
  867. // existing http.Handler, use a root http.Handler such as:
  868. //
  869. // if r.ProtoMajor == 2 && strings.HasPrefix(
  870. // r.Header.Get("Content-Type"), "application/grpc") {
  871. // grpcServer.ServeHTTP(w, r)
  872. // } else {
  873. // yourMux.ServeHTTP(w, r)
  874. // }
  875. //
  876. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  877. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  878. // between the two paths. ServeHTTP does not support some gRPC features
  879. // available through grpc-go's HTTP/2 server.
  880. //
  881. // Experimental
  882. //
  883. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  884. // later release.
  885. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  886. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
  887. if err != nil {
  888. http.Error(w, err.Error(), http.StatusInternalServerError)
  889. return
  890. }
  891. if !s.addConn(listenerAddressForServeHTTP, st) {
  892. return
  893. }
  894. defer s.removeConn(listenerAddressForServeHTTP, st)
  895. s.serveStreams(st)
  896. }
  897. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  898. // If tracing is not enabled, it returns nil.
  899. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  900. if !EnableTracing {
  901. return nil
  902. }
  903. tr, ok := trace.FromContext(stream.Context())
  904. if !ok {
  905. return nil
  906. }
  907. trInfo = &traceInfo{
  908. tr: tr,
  909. firstLine: firstLine{
  910. client: false,
  911. remoteAddr: st.RemoteAddr(),
  912. },
  913. }
  914. if dl, ok := stream.Context().Deadline(); ok {
  915. trInfo.firstLine.deadline = time.Until(dl)
  916. }
  917. return trInfo
  918. }
  919. func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
  920. s.mu.Lock()
  921. defer s.mu.Unlock()
  922. if s.conns == nil {
  923. st.Close()
  924. return false
  925. }
  926. if s.drain {
  927. // Transport added after we drained our existing conns: drain it
  928. // immediately.
  929. st.Drain()
  930. }
  931. if s.conns[addr] == nil {
  932. // Create a map entry if this is the first connection on this listener.
  933. s.conns[addr] = make(map[transport.ServerTransport]bool)
  934. }
  935. s.conns[addr][st] = true
  936. return true
  937. }
  938. func (s *Server) removeConn(addr string, st transport.ServerTransport) {
  939. s.mu.Lock()
  940. defer s.mu.Unlock()
  941. conns := s.conns[addr]
  942. if conns != nil {
  943. delete(conns, st)
  944. if len(conns) == 0 {
  945. // If the last connection for this address is being removed, also
  946. // remove the map entry corresponding to the address. This is used
  947. // in GracefulStop() when waiting for all connections to be closed.
  948. delete(s.conns, addr)
  949. }
  950. s.cv.Broadcast()
  951. }
  952. }
  953. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  954. return &channelz.ServerInternalMetric{
  955. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  956. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  957. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  958. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  959. }
  960. }
  961. func (s *Server) incrCallsStarted() {
  962. atomic.AddInt64(&s.czData.callsStarted, 1)
  963. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  964. }
  965. func (s *Server) incrCallsSucceeded() {
  966. atomic.AddInt64(&s.czData.callsSucceeded, 1)
  967. }
  968. func (s *Server) incrCallsFailed() {
  969. atomic.AddInt64(&s.czData.callsFailed, 1)
  970. }
  971. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  972. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  973. if err != nil {
  974. channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
  975. return err
  976. }
  977. compData, err := compress(data, cp, comp)
  978. if err != nil {
  979. channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
  980. return err
  981. }
  982. hdr, payload := msgHeader(data, compData)
  983. // TODO(dfawley): should we be checking len(data) instead?
  984. if len(payload) > s.opts.maxSendMessageSize {
  985. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  986. }
  987. err = t.Write(stream, hdr, payload, opts)
  988. if err == nil {
  989. for _, sh := range s.opts.statsHandlers {
  990. sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  991. }
  992. }
  993. return err
  994. }
  995. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  996. func chainUnaryServerInterceptors(s *Server) {
  997. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  998. // be executed before any other chained interceptors.
  999. interceptors := s.opts.chainUnaryInts
  1000. if s.opts.unaryInt != nil {
  1001. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  1002. }
  1003. var chainedInt UnaryServerInterceptor
  1004. if len(interceptors) == 0 {
  1005. chainedInt = nil
  1006. } else if len(interceptors) == 1 {
  1007. chainedInt = interceptors[0]
  1008. } else {
  1009. chainedInt = chainUnaryInterceptors(interceptors)
  1010. }
  1011. s.opts.unaryInt = chainedInt
  1012. }
  1013. func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
  1014. return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  1015. // the struct ensures the variables are allocated together, rather than separately, since we
  1016. // know they should be garbage collected together. This saves 1 allocation and decreases
  1017. // time/call by about 10% on the microbenchmark.
  1018. var state struct {
  1019. i int
  1020. next UnaryHandler
  1021. }
  1022. state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
  1023. if state.i == len(interceptors)-1 {
  1024. return interceptors[state.i](ctx, req, info, handler)
  1025. }
  1026. state.i++
  1027. return interceptors[state.i-1](ctx, req, info, state.next)
  1028. }
  1029. return state.next(ctx, req)
  1030. }
  1031. }
  1032. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
  1033. shs := s.opts.statsHandlers
  1034. if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
  1035. if channelz.IsOn() {
  1036. s.incrCallsStarted()
  1037. }
  1038. var statsBegin *stats.Begin
  1039. for _, sh := range shs {
  1040. beginTime := time.Now()
  1041. statsBegin = &stats.Begin{
  1042. BeginTime: beginTime,
  1043. IsClientStream: false,
  1044. IsServerStream: false,
  1045. }
  1046. sh.HandleRPC(stream.Context(), statsBegin)
  1047. }
  1048. if trInfo != nil {
  1049. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1050. }
  1051. // The deferred error handling for tracing, stats handler and channelz are
  1052. // combined into one function to reduce stack usage -- a defer takes ~56-64
  1053. // bytes on the stack, so overflowing the stack will require a stack
  1054. // re-allocation, which is expensive.
  1055. //
  1056. // To maintain behavior similar to separate deferred statements, statements
  1057. // should be executed in the reverse order. That is, tracing first, stats
  1058. // handler second, and channelz last. Note that panics *within* defers will
  1059. // lead to different behavior, but that's an acceptable compromise; that
  1060. // would be undefined behavior territory anyway.
  1061. defer func() {
  1062. if trInfo != nil {
  1063. if err != nil && err != io.EOF {
  1064. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1065. trInfo.tr.SetError()
  1066. }
  1067. trInfo.tr.Finish()
  1068. }
  1069. for _, sh := range shs {
  1070. end := &stats.End{
  1071. BeginTime: statsBegin.BeginTime,
  1072. EndTime: time.Now(),
  1073. }
  1074. if err != nil && err != io.EOF {
  1075. end.Error = toRPCErr(err)
  1076. }
  1077. sh.HandleRPC(stream.Context(), end)
  1078. }
  1079. if channelz.IsOn() {
  1080. if err != nil && err != io.EOF {
  1081. s.incrCallsFailed()
  1082. } else {
  1083. s.incrCallsSucceeded()
  1084. }
  1085. }
  1086. }()
  1087. }
  1088. binlog := binarylog.GetMethodLogger(stream.Method())
  1089. if binlog != nil {
  1090. ctx := stream.Context()
  1091. md, _ := metadata.FromIncomingContext(ctx)
  1092. logEntry := &binarylog.ClientHeader{
  1093. Header: md,
  1094. MethodName: stream.Method(),
  1095. PeerAddr: nil,
  1096. }
  1097. if deadline, ok := ctx.Deadline(); ok {
  1098. logEntry.Timeout = time.Until(deadline)
  1099. if logEntry.Timeout < 0 {
  1100. logEntry.Timeout = 0
  1101. }
  1102. }
  1103. if a := md[":authority"]; len(a) > 0 {
  1104. logEntry.Authority = a[0]
  1105. }
  1106. if peer, ok := peer.FromContext(ctx); ok {
  1107. logEntry.PeerAddr = peer.Addr
  1108. }
  1109. binlog.Log(logEntry)
  1110. }
  1111. // comp and cp are used for compression. decomp and dc are used for
  1112. // decompression. If comp and decomp are both set, they are the same;
  1113. // however they are kept separate to ensure that at most one of the
  1114. // compressor/decompressor variable pairs are set for use later.
  1115. var comp, decomp encoding.Compressor
  1116. var cp Compressor
  1117. var dc Decompressor
  1118. // If dc is set and matches the stream's compression, use it. Otherwise, try
  1119. // to find a matching registered compressor for decomp.
  1120. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1121. dc = s.opts.dc
  1122. } else if rc != "" && rc != encoding.Identity {
  1123. decomp = encoding.GetCompressor(rc)
  1124. if decomp == nil {
  1125. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1126. t.WriteStatus(stream, st)
  1127. return st.Err()
  1128. }
  1129. }
  1130. // If cp is set, use it. Otherwise, attempt to compress the response using
  1131. // the incoming message compression method.
  1132. //
  1133. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1134. if s.opts.cp != nil {
  1135. cp = s.opts.cp
  1136. stream.SetSendCompress(cp.Type())
  1137. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1138. // Legacy compressor not specified; attempt to respond with same encoding.
  1139. comp = encoding.GetCompressor(rc)
  1140. if comp != nil {
  1141. stream.SetSendCompress(rc)
  1142. }
  1143. }
  1144. var payInfo *payloadInfo
  1145. if len(shs) != 0 || binlog != nil {
  1146. payInfo = &payloadInfo{}
  1147. }
  1148. d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
  1149. if err != nil {
  1150. if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
  1151. channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
  1152. }
  1153. return err
  1154. }
  1155. if channelz.IsOn() {
  1156. t.IncrMsgRecv()
  1157. }
  1158. df := func(v interface{}) error {
  1159. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
  1160. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  1161. }
  1162. for _, sh := range shs {
  1163. sh.HandleRPC(stream.Context(), &stats.InPayload{
  1164. RecvTime: time.Now(),
  1165. Payload: v,
  1166. WireLength: payInfo.wireLength + headerLen,
  1167. Data: d,
  1168. Length: len(d),
  1169. })
  1170. }
  1171. if binlog != nil {
  1172. binlog.Log(&binarylog.ClientMessage{
  1173. Message: d,
  1174. })
  1175. }
  1176. if trInfo != nil {
  1177. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  1178. }
  1179. return nil
  1180. }
  1181. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  1182. reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
  1183. if appErr != nil {
  1184. appStatus, ok := status.FromError(appErr)
  1185. if !ok {
  1186. // Convert non-status application error to a status error with code
  1187. // Unknown, but handle context errors specifically.
  1188. appStatus = status.FromContextError(appErr)
  1189. appErr = appStatus.Err()
  1190. }
  1191. if trInfo != nil {
  1192. trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1193. trInfo.tr.SetError()
  1194. }
  1195. if e := t.WriteStatus(stream, appStatus); e != nil {
  1196. channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1197. }
  1198. if binlog != nil {
  1199. if h, _ := stream.Header(); h.Len() > 0 {
  1200. // Only log serverHeader if there was header. Otherwise it can
  1201. // be trailer only.
  1202. binlog.Log(&binarylog.ServerHeader{
  1203. Header: h,
  1204. })
  1205. }
  1206. binlog.Log(&binarylog.ServerTrailer{
  1207. Trailer: stream.Trailer(),
  1208. Err: appErr,
  1209. })
  1210. }
  1211. return appErr
  1212. }
  1213. if trInfo != nil {
  1214. trInfo.tr.LazyLog(stringer("OK"), false)
  1215. }
  1216. opts := &transport.Options{Last: true}
  1217. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
  1218. if err == io.EOF {
  1219. // The entire stream is done (for unary RPC only).
  1220. return err
  1221. }
  1222. if sts, ok := status.FromError(err); ok {
  1223. if e := t.WriteStatus(stream, sts); e != nil {
  1224. channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1225. }
  1226. } else {
  1227. switch st := err.(type) {
  1228. case transport.ConnectionError:
  1229. // Nothing to do here.
  1230. default:
  1231. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  1232. }
  1233. }
  1234. if binlog != nil {
  1235. h, _ := stream.Header()
  1236. binlog.Log(&binarylog.ServerHeader{
  1237. Header: h,
  1238. })
  1239. binlog.Log(&binarylog.ServerTrailer{
  1240. Trailer: stream.Trailer(),
  1241. Err: appErr,
  1242. })
  1243. }
  1244. return err
  1245. }
  1246. if binlog != nil {
  1247. h, _ := stream.Header()
  1248. binlog.Log(&binarylog.ServerHeader{
  1249. Header: h,
  1250. })
  1251. binlog.Log(&binarylog.ServerMessage{
  1252. Message: reply,
  1253. })
  1254. }
  1255. if channelz.IsOn() {
  1256. t.IncrMsgSent()
  1257. }
  1258. if trInfo != nil {
  1259. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  1260. }
  1261. // TODO: Should we be logging if writing status failed here, like above?
  1262. // Should the logging be in WriteStatus? Should we ignore the WriteStatus
  1263. // error or allow the stats handler to see it?
  1264. err = t.WriteStatus(stream, statusOK)
  1265. if binlog != nil {
  1266. binlog.Log(&binarylog.ServerTrailer{
  1267. Trailer: stream.Trailer(),
  1268. Err: appErr,
  1269. })
  1270. }
  1271. return err
  1272. }
  1273. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1274. func chainStreamServerInterceptors(s *Server) {
  1275. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1276. // be executed before any other chained interceptors.
  1277. interceptors := s.opts.chainStreamInts
  1278. if s.opts.streamInt != nil {
  1279. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1280. }
  1281. var chainedInt StreamServerInterceptor
  1282. if len(interceptors) == 0 {
  1283. chainedInt = nil
  1284. } else if len(interceptors) == 1 {
  1285. chainedInt = interceptors[0]
  1286. } else {
  1287. chainedInt = chainStreamInterceptors(interceptors)
  1288. }
  1289. s.opts.streamInt = chainedInt
  1290. }
  1291. func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
  1292. return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1293. // the struct ensures the variables are allocated together, rather than separately, since we
  1294. // know they should be garbage collected together. This saves 1 allocation and decreases
  1295. // time/call by about 10% on the microbenchmark.
  1296. var state struct {
  1297. i int
  1298. next StreamHandler
  1299. }
  1300. state.next = func(srv interface{}, ss ServerStream) error {
  1301. if state.i == len(interceptors)-1 {
  1302. return interceptors[state.i](srv, ss, info, handler)
  1303. }
  1304. state.i++
  1305. return interceptors[state.i-1](srv, ss, info, state.next)
  1306. }
  1307. return state.next(srv, ss)
  1308. }
  1309. }
  1310. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
  1311. if channelz.IsOn() {
  1312. s.incrCallsStarted()
  1313. }
  1314. shs := s.opts.statsHandlers
  1315. var statsBegin *stats.Begin
  1316. if len(shs) != 0 {
  1317. beginTime := time.Now()
  1318. statsBegin = &stats.Begin{
  1319. BeginTime: beginTime,
  1320. IsClientStream: sd.ClientStreams,
  1321. IsServerStream: sd.ServerStreams,
  1322. }
  1323. for _, sh := range shs {
  1324. sh.HandleRPC(stream.Context(), statsBegin)
  1325. }
  1326. }
  1327. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  1328. ss := &serverStream{
  1329. ctx: ctx,
  1330. t: t,
  1331. s: stream,
  1332. p: &parser{r: stream},
  1333. codec: s.getCodec(stream.ContentSubtype()),
  1334. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  1335. maxSendMessageSize: s.opts.maxSendMessageSize,
  1336. trInfo: trInfo,
  1337. statsHandler: shs,
  1338. }
  1339. if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
  1340. // See comment in processUnaryRPC on defers.
  1341. defer func() {
  1342. if trInfo != nil {
  1343. ss.mu.Lock()
  1344. if err != nil && err != io.EOF {
  1345. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1346. ss.trInfo.tr.SetError()
  1347. }
  1348. ss.trInfo.tr.Finish()
  1349. ss.trInfo.tr = nil
  1350. ss.mu.Unlock()
  1351. }
  1352. if len(shs) != 0 {
  1353. end := &stats.End{
  1354. BeginTime: statsBegin.BeginTime,
  1355. EndTime: time.Now(),
  1356. }
  1357. if err != nil && err != io.EOF {
  1358. end.Error = toRPCErr(err)
  1359. }
  1360. for _, sh := range shs {
  1361. sh.HandleRPC(stream.Context(), end)
  1362. }
  1363. }
  1364. if channelz.IsOn() {
  1365. if err != nil && err != io.EOF {
  1366. s.incrCallsFailed()
  1367. } else {
  1368. s.incrCallsSucceeded()
  1369. }
  1370. }
  1371. }()
  1372. }
  1373. ss.binlog = binarylog.GetMethodLogger(stream.Method())
  1374. if ss.binlog != nil {
  1375. md, _ := metadata.FromIncomingContext(ctx)
  1376. logEntry := &binarylog.ClientHeader{
  1377. Header: md,
  1378. MethodName: stream.Method(),
  1379. PeerAddr: nil,
  1380. }
  1381. if deadline, ok := ctx.Deadline(); ok {
  1382. logEntry.Timeout = time.Until(deadline)
  1383. if logEntry.Timeout < 0 {
  1384. logEntry.Timeout = 0
  1385. }
  1386. }
  1387. if a := md[":authority"]; len(a) > 0 {
  1388. logEntry.Authority = a[0]
  1389. }
  1390. if peer, ok := peer.FromContext(ss.Context()); ok {
  1391. logEntry.PeerAddr = peer.Addr
  1392. }
  1393. ss.binlog.Log(logEntry)
  1394. }
  1395. // If dc is set and matches the stream's compression, use it. Otherwise, try
  1396. // to find a matching registered compressor for decomp.
  1397. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1398. ss.dc = s.opts.dc
  1399. } else if rc != "" && rc != encoding.Identity {
  1400. ss.decomp = encoding.GetCompressor(rc)
  1401. if ss.decomp == nil {
  1402. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1403. t.WriteStatus(ss.s, st)
  1404. return st.Err()
  1405. }
  1406. }
  1407. // If cp is set, use it. Otherwise, attempt to compress the response using
  1408. // the incoming message compression method.
  1409. //
  1410. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1411. if s.opts.cp != nil {
  1412. ss.cp = s.opts.cp
  1413. stream.SetSendCompress(s.opts.cp.Type())
  1414. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1415. // Legacy compressor not specified; attempt to respond with same encoding.
  1416. ss.comp = encoding.GetCompressor(rc)
  1417. if ss.comp != nil {
  1418. stream.SetSendCompress(rc)
  1419. }
  1420. }
  1421. ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
  1422. if trInfo != nil {
  1423. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1424. }
  1425. var appErr error
  1426. var server interface{}
  1427. if info != nil {
  1428. server = info.serviceImpl
  1429. }
  1430. if s.opts.streamInt == nil {
  1431. appErr = sd.Handler(server, ss)
  1432. } else {
  1433. info := &StreamServerInfo{
  1434. FullMethod: stream.Method(),
  1435. IsClientStream: sd.ClientStreams,
  1436. IsServerStream: sd.ServerStreams,
  1437. }
  1438. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  1439. }
  1440. if appErr != nil {
  1441. appStatus, ok := status.FromError(appErr)
  1442. if !ok {
  1443. // Convert non-status application error to a status error with code
  1444. // Unknown, but handle context errors specifically.
  1445. appStatus = status.FromContextError(appErr)
  1446. appErr = appStatus.Err()
  1447. }
  1448. if trInfo != nil {
  1449. ss.mu.Lock()
  1450. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1451. ss.trInfo.tr.SetError()
  1452. ss.mu.Unlock()
  1453. }
  1454. t.WriteStatus(ss.s, appStatus)
  1455. if ss.binlog != nil {
  1456. ss.binlog.Log(&binarylog.ServerTrailer{
  1457. Trailer: ss.s.Trailer(),
  1458. Err: appErr,
  1459. })
  1460. }
  1461. // TODO: Should we log an error from WriteStatus here and below?
  1462. return appErr
  1463. }
  1464. if trInfo != nil {
  1465. ss.mu.Lock()
  1466. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  1467. ss.mu.Unlock()
  1468. }
  1469. err = t.WriteStatus(ss.s, statusOK)
  1470. if ss.binlog != nil {
  1471. ss.binlog.Log(&binarylog.ServerTrailer{
  1472. Trailer: ss.s.Trailer(),
  1473. Err: appErr,
  1474. })
  1475. }
  1476. return err
  1477. }
  1478. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1479. sm := stream.Method()
  1480. if sm != "" && sm[0] == '/' {
  1481. sm = sm[1:]
  1482. }
  1483. pos := strings.LastIndex(sm, "/")
  1484. if pos == -1 {
  1485. if trInfo != nil {
  1486. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1487. trInfo.tr.SetError()
  1488. }
  1489. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1490. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1491. if trInfo != nil {
  1492. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1493. trInfo.tr.SetError()
  1494. }
  1495. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1496. }
  1497. if trInfo != nil {
  1498. trInfo.tr.Finish()
  1499. }
  1500. return
  1501. }
  1502. service := sm[:pos]
  1503. method := sm[pos+1:]
  1504. srv, knownService := s.services[service]
  1505. if knownService {
  1506. if md, ok := srv.methods[method]; ok {
  1507. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1508. return
  1509. }
  1510. if sd, ok := srv.streams[method]; ok {
  1511. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1512. return
  1513. }
  1514. }
  1515. // Unknown service, or known server unknown method.
  1516. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1517. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1518. return
  1519. }
  1520. var errDesc string
  1521. if !knownService {
  1522. errDesc = fmt.Sprintf("unknown service %v", service)
  1523. } else {
  1524. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1525. }
  1526. if trInfo != nil {
  1527. trInfo.tr.LazyPrintf("%s", errDesc)
  1528. trInfo.tr.SetError()
  1529. }
  1530. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1531. if trInfo != nil {
  1532. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1533. trInfo.tr.SetError()
  1534. }
  1535. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1536. }
  1537. if trInfo != nil {
  1538. trInfo.tr.Finish()
  1539. }
  1540. }
  1541. // The key to save ServerTransportStream in the context.
  1542. type streamKey struct{}
  1543. // NewContextWithServerTransportStream creates a new context from ctx and
  1544. // attaches stream to it.
  1545. //
  1546. // Experimental
  1547. //
  1548. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1549. // later release.
  1550. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1551. return context.WithValue(ctx, streamKey{}, stream)
  1552. }
  1553. // ServerTransportStream is a minimal interface that a transport stream must
  1554. // implement. This can be used to mock an actual transport stream for tests of
  1555. // handler code that use, for example, grpc.SetHeader (which requires some
  1556. // stream to be in context).
  1557. //
  1558. // See also NewContextWithServerTransportStream.
  1559. //
  1560. // Experimental
  1561. //
  1562. // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  1563. // later release.
  1564. type ServerTransportStream interface {
  1565. Method() string
  1566. SetHeader(md metadata.MD) error
  1567. SendHeader(md metadata.MD) error
  1568. SetTrailer(md metadata.MD) error
  1569. }
  1570. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1571. // ctx. Returns nil if the given context has no stream associated with it
  1572. // (which implies it is not an RPC invocation context).
  1573. //
  1574. // Experimental
  1575. //
  1576. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1577. // later release.
  1578. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1579. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1580. return s
  1581. }
  1582. // Stop stops the gRPC server. It immediately closes all open
  1583. // connections and listeners.
  1584. // It cancels all active RPCs on the server side and the corresponding
  1585. // pending RPCs on the client side will get notified by connection
  1586. // errors.
  1587. func (s *Server) Stop() {
  1588. s.quit.Fire()
  1589. defer func() {
  1590. s.serveWG.Wait()
  1591. s.done.Fire()
  1592. }()
  1593. s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
  1594. s.mu.Lock()
  1595. listeners := s.lis
  1596. s.lis = nil
  1597. conns := s.conns
  1598. s.conns = nil
  1599. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1600. s.cv.Broadcast()
  1601. s.mu.Unlock()
  1602. for lis := range listeners {
  1603. lis.Close()
  1604. }
  1605. for _, cs := range conns {
  1606. for st := range cs {
  1607. st.Close()
  1608. }
  1609. }
  1610. if s.opts.numServerWorkers > 0 {
  1611. s.stopServerWorkers()
  1612. }
  1613. s.mu.Lock()
  1614. if s.events != nil {
  1615. s.events.Finish()
  1616. s.events = nil
  1617. }
  1618. s.mu.Unlock()
  1619. }
  1620. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1621. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1622. // finished.
  1623. func (s *Server) GracefulStop() {
  1624. s.quit.Fire()
  1625. defer s.done.Fire()
  1626. s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
  1627. s.mu.Lock()
  1628. if s.conns == nil {
  1629. s.mu.Unlock()
  1630. return
  1631. }
  1632. for lis := range s.lis {
  1633. lis.Close()
  1634. }
  1635. s.lis = nil
  1636. if !s.drain {
  1637. for _, conns := range s.conns {
  1638. for st := range conns {
  1639. st.Drain()
  1640. }
  1641. }
  1642. s.drain = true
  1643. }
  1644. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1645. // new conns will be created.
  1646. s.mu.Unlock()
  1647. s.serveWG.Wait()
  1648. s.mu.Lock()
  1649. for len(s.conns) != 0 {
  1650. s.cv.Wait()
  1651. }
  1652. s.conns = nil
  1653. if s.events != nil {
  1654. s.events.Finish()
  1655. s.events = nil
  1656. }
  1657. s.mu.Unlock()
  1658. }
  1659. // contentSubtype must be lowercase
  1660. // cannot return nil
  1661. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1662. if s.opts.codec != nil {
  1663. return s.opts.codec
  1664. }
  1665. if contentSubtype == "" {
  1666. return encoding.GetCodec(proto.Name)
  1667. }
  1668. codec := encoding.GetCodec(contentSubtype)
  1669. if codec == nil {
  1670. return encoding.GetCodec(proto.Name)
  1671. }
  1672. return codec
  1673. }
  1674. // SetHeader sets the header metadata to be sent from the server to the client.
  1675. // The context provided must be the context passed to the server's handler.
  1676. //
  1677. // Streaming RPCs should prefer the SetHeader method of the ServerStream.
  1678. //
  1679. // When called multiple times, all the provided metadata will be merged. All
  1680. // the metadata will be sent out when one of the following happens:
  1681. //
  1682. // - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
  1683. // - The first response message is sent. For unary handlers, this occurs when
  1684. // the handler returns; for streaming handlers, this can happen when stream's
  1685. // SendMsg method is called.
  1686. // - An RPC status is sent out (error or success). This occurs when the handler
  1687. // returns.
  1688. //
  1689. // SetHeader will fail if called after any of the events above.
  1690. //
  1691. // The error returned is compatible with the status package. However, the
  1692. // status code will often not match the RPC status as seen by the client
  1693. // application, and therefore, should not be relied upon for this purpose.
  1694. func SetHeader(ctx context.Context, md metadata.MD) error {
  1695. if md.Len() == 0 {
  1696. return nil
  1697. }
  1698. stream := ServerTransportStreamFromContext(ctx)
  1699. if stream == nil {
  1700. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1701. }
  1702. return stream.SetHeader(md)
  1703. }
  1704. // SendHeader sends header metadata. It may be called at most once, and may not
  1705. // be called after any event that causes headers to be sent (see SetHeader for
  1706. // a complete list). The provided md and headers set by SetHeader() will be
  1707. // sent.
  1708. //
  1709. // The error returned is compatible with the status package. However, the
  1710. // status code will often not match the RPC status as seen by the client
  1711. // application, and therefore, should not be relied upon for this purpose.
  1712. func SendHeader(ctx context.Context, md metadata.MD) error {
  1713. stream := ServerTransportStreamFromContext(ctx)
  1714. if stream == nil {
  1715. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1716. }
  1717. if err := stream.SendHeader(md); err != nil {
  1718. return toRPCErr(err)
  1719. }
  1720. return nil
  1721. }
  1722. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1723. // When called more than once, all the provided metadata will be merged.
  1724. //
  1725. // The error returned is compatible with the status package. However, the
  1726. // status code will often not match the RPC status as seen by the client
  1727. // application, and therefore, should not be relied upon for this purpose.
  1728. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1729. if md.Len() == 0 {
  1730. return nil
  1731. }
  1732. stream := ServerTransportStreamFromContext(ctx)
  1733. if stream == nil {
  1734. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1735. }
  1736. return stream.SetTrailer(md)
  1737. }
  1738. // Method returns the method string for the server context. The returned
  1739. // string is in the format of "/service/method".
  1740. func Method(ctx context.Context) (string, bool) {
  1741. s := ServerTransportStreamFromContext(ctx)
  1742. if s == nil {
  1743. return "", false
  1744. }
  1745. return s.Method(), true
  1746. }
  1747. type channelzServer struct {
  1748. s *Server
  1749. }
  1750. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1751. return c.s.channelzMetric()
  1752. }