transport.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package transport defines and implements message oriented communication
  19. // channel to complete various transactions (e.g., an RPC). It is meant for
  20. // grpc-internal usage and is not intended to be imported directly by users.
  21. package transport
  22. import (
  23. "bytes"
  24. "context"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "net"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/keepalive"
  36. "google.golang.org/grpc/metadata"
  37. "google.golang.org/grpc/resolver"
  38. "google.golang.org/grpc/stats"
  39. "google.golang.org/grpc/status"
  40. "google.golang.org/grpc/tap"
  41. )
  42. const logLevel = 2
  43. type bufferPool struct {
  44. pool sync.Pool
  45. }
  46. func newBufferPool() *bufferPool {
  47. return &bufferPool{
  48. pool: sync.Pool{
  49. New: func() any {
  50. return new(bytes.Buffer)
  51. },
  52. },
  53. }
  54. }
  55. func (p *bufferPool) get() *bytes.Buffer {
  56. return p.pool.Get().(*bytes.Buffer)
  57. }
  58. func (p *bufferPool) put(b *bytes.Buffer) {
  59. p.pool.Put(b)
  60. }
  61. // recvMsg represents the received msg from the transport. All transport
  62. // protocol specific info has been removed.
  63. type recvMsg struct {
  64. buffer *bytes.Buffer
  65. // nil: received some data
  66. // io.EOF: stream is completed. data is nil.
  67. // other non-nil error: transport failure. data is nil.
  68. err error
  69. }
  70. // recvBuffer is an unbounded channel of recvMsg structs.
  71. //
  72. // Note: recvBuffer differs from buffer.Unbounded only in the fact that it
  73. // holds a channel of recvMsg structs instead of objects implementing "item"
  74. // interface. recvBuffer is written to much more often and using strict recvMsg
  75. // structs helps avoid allocation in "recvBuffer.put"
  76. type recvBuffer struct {
  77. c chan recvMsg
  78. mu sync.Mutex
  79. backlog []recvMsg
  80. err error
  81. }
  82. func newRecvBuffer() *recvBuffer {
  83. b := &recvBuffer{
  84. c: make(chan recvMsg, 1),
  85. }
  86. return b
  87. }
  88. func (b *recvBuffer) put(r recvMsg) {
  89. b.mu.Lock()
  90. if b.err != nil {
  91. b.mu.Unlock()
  92. // An error had occurred earlier, don't accept more
  93. // data or errors.
  94. return
  95. }
  96. b.err = r.err
  97. if len(b.backlog) == 0 {
  98. select {
  99. case b.c <- r:
  100. b.mu.Unlock()
  101. return
  102. default:
  103. }
  104. }
  105. b.backlog = append(b.backlog, r)
  106. b.mu.Unlock()
  107. }
  108. func (b *recvBuffer) load() {
  109. b.mu.Lock()
  110. if len(b.backlog) > 0 {
  111. select {
  112. case b.c <- b.backlog[0]:
  113. b.backlog[0] = recvMsg{}
  114. b.backlog = b.backlog[1:]
  115. default:
  116. }
  117. }
  118. b.mu.Unlock()
  119. }
  120. // get returns the channel that receives a recvMsg in the buffer.
  121. //
  122. // Upon receipt of a recvMsg, the caller should call load to send another
  123. // recvMsg onto the channel if there is any.
  124. func (b *recvBuffer) get() <-chan recvMsg {
  125. return b.c
  126. }
  127. // recvBufferReader implements io.Reader interface to read the data from
  128. // recvBuffer.
  129. type recvBufferReader struct {
  130. closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
  131. ctx context.Context
  132. ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
  133. recv *recvBuffer
  134. last *bytes.Buffer // Stores the remaining data in the previous calls.
  135. err error
  136. freeBuffer func(*bytes.Buffer)
  137. }
  138. // Read reads the next len(p) bytes from last. If last is drained, it tries to
  139. // read additional data from recv. It blocks if there no additional data available
  140. // in recv. If Read returns any non-nil error, it will continue to return that error.
  141. func (r *recvBufferReader) Read(p []byte) (n int, err error) {
  142. if r.err != nil {
  143. return 0, r.err
  144. }
  145. if r.last != nil {
  146. // Read remaining data left in last call.
  147. copied, _ := r.last.Read(p)
  148. if r.last.Len() == 0 {
  149. r.freeBuffer(r.last)
  150. r.last = nil
  151. }
  152. return copied, nil
  153. }
  154. if r.closeStream != nil {
  155. n, r.err = r.readClient(p)
  156. } else {
  157. n, r.err = r.read(p)
  158. }
  159. return n, r.err
  160. }
  161. func (r *recvBufferReader) read(p []byte) (n int, err error) {
  162. select {
  163. case <-r.ctxDone:
  164. return 0, ContextErr(r.ctx.Err())
  165. case m := <-r.recv.get():
  166. return r.readAdditional(m, p)
  167. }
  168. }
  169. func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
  170. // If the context is canceled, then closes the stream with nil metadata.
  171. // closeStream writes its error parameter to r.recv as a recvMsg.
  172. // r.readAdditional acts on that message and returns the necessary error.
  173. select {
  174. case <-r.ctxDone:
  175. // Note that this adds the ctx error to the end of recv buffer, and
  176. // reads from the head. This will delay the error until recv buffer is
  177. // empty, thus will delay ctx cancellation in Recv().
  178. //
  179. // It's done this way to fix a race between ctx cancel and trailer. The
  180. // race was, stream.Recv() may return ctx error if ctxDone wins the
  181. // race, but stream.Trailer() may return a non-nil md because the stream
  182. // was not marked as done when trailer is received. This closeStream
  183. // call will mark stream as done, thus fix the race.
  184. //
  185. // TODO: delaying ctx error seems like a unnecessary side effect. What
  186. // we really want is to mark the stream as done, and return ctx error
  187. // faster.
  188. r.closeStream(ContextErr(r.ctx.Err()))
  189. m := <-r.recv.get()
  190. return r.readAdditional(m, p)
  191. case m := <-r.recv.get():
  192. return r.readAdditional(m, p)
  193. }
  194. }
  195. func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
  196. r.recv.load()
  197. if m.err != nil {
  198. return 0, m.err
  199. }
  200. copied, _ := m.buffer.Read(p)
  201. if m.buffer.Len() == 0 {
  202. r.freeBuffer(m.buffer)
  203. r.last = nil
  204. } else {
  205. r.last = m.buffer
  206. }
  207. return copied, nil
  208. }
  209. type streamState uint32
  210. const (
  211. streamActive streamState = iota
  212. streamWriteDone // EndStream sent
  213. streamReadDone // EndStream received
  214. streamDone // the entire stream is finished.
  215. )
  216. // Stream represents an RPC in the transport layer.
  217. type Stream struct {
  218. id uint32
  219. st ServerTransport // nil for client side Stream
  220. ct *http2Client // nil for server side Stream
  221. ctx context.Context // the associated context of the stream
  222. cancel context.CancelFunc // always nil for client side Stream
  223. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
  224. doneFunc func() // invoked at the end of stream on client side.
  225. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
  226. method string // the associated RPC method of the stream
  227. recvCompress string
  228. sendCompress string
  229. buf *recvBuffer
  230. trReader io.Reader
  231. fc *inFlow
  232. wq *writeQuota
  233. // Holds compressor names passed in grpc-accept-encoding metadata from the
  234. // client. This is empty for the client side stream.
  235. clientAdvertisedCompressors string
  236. // Callback to state application's intentions to read data. This
  237. // is used to adjust flow control, if needed.
  238. requestRead func(int)
  239. headerChan chan struct{} // closed to indicate the end of header metadata.
  240. headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  241. // headerValid indicates whether a valid header was received. Only
  242. // meaningful after headerChan is closed (always call waitOnHeader() before
  243. // reading its value). Not valid on server side.
  244. headerValid bool
  245. // hdrMu protects header and trailer metadata on the server-side.
  246. hdrMu sync.Mutex
  247. // On client side, header keeps the received header metadata.
  248. //
  249. // On server side, header keeps the header set by SetHeader(). The complete
  250. // header will merged into this after t.WriteHeader() is called.
  251. header metadata.MD
  252. trailer metadata.MD // the key-value map of trailer metadata.
  253. noHeaders bool // set if the client never received headers (set only after the stream is done).
  254. // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
  255. headerSent uint32
  256. state streamState
  257. // On client-side it is the status error received from the server.
  258. // On server-side it is unused.
  259. status *status.Status
  260. bytesReceived uint32 // indicates whether any bytes have been received on this stream
  261. unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
  262. // contentSubtype is the content-subtype for requests.
  263. // this must be lowercase or the behavior is undefined.
  264. contentSubtype string
  265. }
  266. // isHeaderSent is only valid on the server-side.
  267. func (s *Stream) isHeaderSent() bool {
  268. return atomic.LoadUint32(&s.headerSent) == 1
  269. }
  270. // updateHeaderSent updates headerSent and returns true
  271. // if it was alreay set. It is valid only on server-side.
  272. func (s *Stream) updateHeaderSent() bool {
  273. return atomic.SwapUint32(&s.headerSent, 1) == 1
  274. }
  275. func (s *Stream) swapState(st streamState) streamState {
  276. return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
  277. }
  278. func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
  279. return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
  280. }
  281. func (s *Stream) getState() streamState {
  282. return streamState(atomic.LoadUint32((*uint32)(&s.state)))
  283. }
  284. func (s *Stream) waitOnHeader() {
  285. if s.headerChan == nil {
  286. // On the server headerChan is always nil since a stream originates
  287. // only after having received headers.
  288. return
  289. }
  290. select {
  291. case <-s.ctx.Done():
  292. // Close the stream to prevent headers/trailers from changing after
  293. // this function returns.
  294. s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
  295. // headerChan could possibly not be closed yet if closeStream raced
  296. // with operateHeaders; wait until it is closed explicitly here.
  297. <-s.headerChan
  298. case <-s.headerChan:
  299. }
  300. }
  301. // RecvCompress returns the compression algorithm applied to the inbound
  302. // message. It is empty string if there is no compression applied.
  303. func (s *Stream) RecvCompress() string {
  304. s.waitOnHeader()
  305. return s.recvCompress
  306. }
  307. // SetSendCompress sets the compression algorithm to the stream.
  308. func (s *Stream) SetSendCompress(name string) error {
  309. if s.isHeaderSent() || s.getState() == streamDone {
  310. return errors.New("transport: set send compressor called after headers sent or stream done")
  311. }
  312. s.sendCompress = name
  313. return nil
  314. }
  315. // SendCompress returns the send compressor name.
  316. func (s *Stream) SendCompress() string {
  317. return s.sendCompress
  318. }
  319. // ClientAdvertisedCompressors returns the compressor names advertised by the
  320. // client via grpc-accept-encoding header.
  321. func (s *Stream) ClientAdvertisedCompressors() string {
  322. return s.clientAdvertisedCompressors
  323. }
  324. // Done returns a channel which is closed when it receives the final status
  325. // from the server.
  326. func (s *Stream) Done() <-chan struct{} {
  327. return s.done
  328. }
  329. // Header returns the header metadata of the stream.
  330. //
  331. // On client side, it acquires the key-value pairs of header metadata once it is
  332. // available. It blocks until i) the metadata is ready or ii) there is no header
  333. // metadata or iii) the stream is canceled/expired.
  334. //
  335. // On server side, it returns the out header after t.WriteHeader is called. It
  336. // does not block and must not be called until after WriteHeader.
  337. func (s *Stream) Header() (metadata.MD, error) {
  338. if s.headerChan == nil {
  339. // On server side, return the header in stream. It will be the out
  340. // header after t.WriteHeader is called.
  341. return s.header.Copy(), nil
  342. }
  343. s.waitOnHeader()
  344. if !s.headerValid || s.noHeaders {
  345. return nil, s.status.Err()
  346. }
  347. return s.header.Copy(), nil
  348. }
  349. // TrailersOnly blocks until a header or trailers-only frame is received and
  350. // then returns true if the stream was trailers-only. If the stream ends
  351. // before headers are received, returns true, nil. Client-side only.
  352. func (s *Stream) TrailersOnly() bool {
  353. s.waitOnHeader()
  354. return s.noHeaders
  355. }
  356. // Trailer returns the cached trailer metedata. Note that if it is not called
  357. // after the entire stream is done, it could return an empty MD. Client
  358. // side only.
  359. // It can be safely read only after stream has ended that is either read
  360. // or write have returned io.EOF.
  361. func (s *Stream) Trailer() metadata.MD {
  362. c := s.trailer.Copy()
  363. return c
  364. }
  365. // ContentSubtype returns the content-subtype for a request. For example, a
  366. // content-subtype of "proto" will result in a content-type of
  367. // "application/grpc+proto". This will always be lowercase. See
  368. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  369. // more details.
  370. func (s *Stream) ContentSubtype() string {
  371. return s.contentSubtype
  372. }
  373. // Context returns the context of the stream.
  374. func (s *Stream) Context() context.Context {
  375. return s.ctx
  376. }
  377. // Method returns the method for the stream.
  378. func (s *Stream) Method() string {
  379. return s.method
  380. }
  381. // Status returns the status received from the server.
  382. // Status can be read safely only after the stream has ended,
  383. // that is, after Done() is closed.
  384. func (s *Stream) Status() *status.Status {
  385. return s.status
  386. }
  387. // SetHeader sets the header metadata. This can be called multiple times.
  388. // Server side only.
  389. // This should not be called in parallel to other data writes.
  390. func (s *Stream) SetHeader(md metadata.MD) error {
  391. if md.Len() == 0 {
  392. return nil
  393. }
  394. if s.isHeaderSent() || s.getState() == streamDone {
  395. return ErrIllegalHeaderWrite
  396. }
  397. s.hdrMu.Lock()
  398. s.header = metadata.Join(s.header, md)
  399. s.hdrMu.Unlock()
  400. return nil
  401. }
  402. // SendHeader sends the given header metadata. The given metadata is
  403. // combined with any metadata set by previous calls to SetHeader and
  404. // then written to the transport stream.
  405. func (s *Stream) SendHeader(md metadata.MD) error {
  406. return s.st.WriteHeader(s, md)
  407. }
  408. // SetTrailer sets the trailer metadata which will be sent with the RPC status
  409. // by the server. This can be called multiple times. Server side only.
  410. // This should not be called parallel to other data writes.
  411. func (s *Stream) SetTrailer(md metadata.MD) error {
  412. if md.Len() == 0 {
  413. return nil
  414. }
  415. if s.getState() == streamDone {
  416. return ErrIllegalHeaderWrite
  417. }
  418. s.hdrMu.Lock()
  419. s.trailer = metadata.Join(s.trailer, md)
  420. s.hdrMu.Unlock()
  421. return nil
  422. }
  423. func (s *Stream) write(m recvMsg) {
  424. s.buf.put(m)
  425. }
  426. // Read reads all p bytes from the wire for this stream.
  427. func (s *Stream) Read(p []byte) (n int, err error) {
  428. // Don't request a read if there was an error earlier
  429. if er := s.trReader.(*transportReader).er; er != nil {
  430. return 0, er
  431. }
  432. s.requestRead(len(p))
  433. return io.ReadFull(s.trReader, p)
  434. }
  435. // tranportReader reads all the data available for this Stream from the transport and
  436. // passes them into the decoder, which converts them into a gRPC message stream.
  437. // The error is io.EOF when the stream is done or another non-nil error if
  438. // the stream broke.
  439. type transportReader struct {
  440. reader io.Reader
  441. // The handler to control the window update procedure for both this
  442. // particular stream and the associated transport.
  443. windowHandler func(int)
  444. er error
  445. }
  446. func (t *transportReader) Read(p []byte) (n int, err error) {
  447. n, err = t.reader.Read(p)
  448. if err != nil {
  449. t.er = err
  450. return
  451. }
  452. t.windowHandler(n)
  453. return
  454. }
  455. // BytesReceived indicates whether any bytes have been received on this stream.
  456. func (s *Stream) BytesReceived() bool {
  457. return atomic.LoadUint32(&s.bytesReceived) == 1
  458. }
  459. // Unprocessed indicates whether the server did not process this stream --
  460. // i.e. it sent a refused stream or GOAWAY including this stream ID.
  461. func (s *Stream) Unprocessed() bool {
  462. return atomic.LoadUint32(&s.unprocessed) == 1
  463. }
  464. // GoString is implemented by Stream so context.String() won't
  465. // race when printing %#v.
  466. func (s *Stream) GoString() string {
  467. return fmt.Sprintf("<stream: %p, %v>", s, s.method)
  468. }
  469. // state of transport
  470. type transportState int
  471. const (
  472. reachable transportState = iota
  473. closing
  474. draining
  475. )
  476. // ServerConfig consists of all the configurations to establish a server transport.
  477. type ServerConfig struct {
  478. MaxStreams uint32
  479. ConnectionTimeout time.Duration
  480. Credentials credentials.TransportCredentials
  481. InTapHandle tap.ServerInHandle
  482. StatsHandlers []stats.Handler
  483. KeepaliveParams keepalive.ServerParameters
  484. KeepalivePolicy keepalive.EnforcementPolicy
  485. InitialWindowSize int32
  486. InitialConnWindowSize int32
  487. WriteBufferSize int
  488. ReadBufferSize int
  489. SharedWriteBuffer bool
  490. ChannelzParentID *channelz.Identifier
  491. MaxHeaderListSize *uint32
  492. HeaderTableSize *uint32
  493. }
  494. // ConnectOptions covers all relevant options for communicating with the server.
  495. type ConnectOptions struct {
  496. // UserAgent is the application user agent.
  497. UserAgent string
  498. // Dialer specifies how to dial a network address.
  499. Dialer func(context.Context, string) (net.Conn, error)
  500. // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
  501. FailOnNonTempDialError bool
  502. // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
  503. PerRPCCredentials []credentials.PerRPCCredentials
  504. // TransportCredentials stores the Authenticator required to setup a client
  505. // connection. Only one of TransportCredentials and CredsBundle is non-nil.
  506. TransportCredentials credentials.TransportCredentials
  507. // CredsBundle is the credentials bundle to be used. Only one of
  508. // TransportCredentials and CredsBundle is non-nil.
  509. CredsBundle credentials.Bundle
  510. // KeepaliveParams stores the keepalive parameters.
  511. KeepaliveParams keepalive.ClientParameters
  512. // StatsHandlers stores the handler for stats.
  513. StatsHandlers []stats.Handler
  514. // InitialWindowSize sets the initial window size for a stream.
  515. InitialWindowSize int32
  516. // InitialConnWindowSize sets the initial window size for a connection.
  517. InitialConnWindowSize int32
  518. // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
  519. WriteBufferSize int
  520. // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
  521. ReadBufferSize int
  522. // SharedWriteBuffer indicates whether connections should reuse write buffer
  523. SharedWriteBuffer bool
  524. // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
  525. ChannelzParentID *channelz.Identifier
  526. // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
  527. MaxHeaderListSize *uint32
  528. // UseProxy specifies if a proxy should be used.
  529. UseProxy bool
  530. }
  531. // NewClientTransport establishes the transport with the required ConnectOptions
  532. // and returns it to the caller.
  533. func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
  534. return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
  535. }
  536. // Options provides additional hints and information for message
  537. // transmission.
  538. type Options struct {
  539. // Last indicates whether this write is the last piece for
  540. // this stream.
  541. Last bool
  542. }
  543. // CallHdr carries the information of a particular RPC.
  544. type CallHdr struct {
  545. // Host specifies the peer's host.
  546. Host string
  547. // Method specifies the operation to perform.
  548. Method string
  549. // SendCompress specifies the compression algorithm applied on
  550. // outbound message.
  551. SendCompress string
  552. // Creds specifies credentials.PerRPCCredentials for a call.
  553. Creds credentials.PerRPCCredentials
  554. // ContentSubtype specifies the content-subtype for a request. For example, a
  555. // content-subtype of "proto" will result in a content-type of
  556. // "application/grpc+proto". The value of ContentSubtype must be all
  557. // lowercase, otherwise the behavior is undefined. See
  558. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  559. // for more details.
  560. ContentSubtype string
  561. PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
  562. DoneFunc func() // called when the stream is finished
  563. }
  564. // ClientTransport is the common interface for all gRPC client-side transport
  565. // implementations.
  566. type ClientTransport interface {
  567. // Close tears down this transport. Once it returns, the transport
  568. // should not be accessed any more. The caller must make sure this
  569. // is called only once.
  570. Close(err error)
  571. // GracefulClose starts to tear down the transport: the transport will stop
  572. // accepting new RPCs and NewStream will return error. Once all streams are
  573. // finished, the transport will close.
  574. //
  575. // It does not block.
  576. GracefulClose()
  577. // Write sends the data for the given stream. A nil stream indicates
  578. // the write is to be performed on the transport as a whole.
  579. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  580. // NewStream creates a Stream for an RPC.
  581. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
  582. // CloseStream clears the footprint of a stream when the stream is
  583. // not needed any more. The err indicates the error incurred when
  584. // CloseStream is called. Must be called when a stream is finished
  585. // unless the associated transport is closing.
  586. CloseStream(stream *Stream, err error)
  587. // Error returns a channel that is closed when some I/O error
  588. // happens. Typically the caller should have a goroutine to monitor
  589. // this in order to take action (e.g., close the current transport
  590. // and create a new one) in error case. It should not return nil
  591. // once the transport is initiated.
  592. Error() <-chan struct{}
  593. // GoAway returns a channel that is closed when ClientTransport
  594. // receives the draining signal from the server (e.g., GOAWAY frame in
  595. // HTTP/2).
  596. GoAway() <-chan struct{}
  597. // GetGoAwayReason returns the reason why GoAway frame was received, along
  598. // with a human readable string with debug info.
  599. GetGoAwayReason() (GoAwayReason, string)
  600. // RemoteAddr returns the remote network address.
  601. RemoteAddr() net.Addr
  602. // IncrMsgSent increments the number of message sent through this transport.
  603. IncrMsgSent()
  604. // IncrMsgRecv increments the number of message received through this transport.
  605. IncrMsgRecv()
  606. }
  607. // ServerTransport is the common interface for all gRPC server-side transport
  608. // implementations.
  609. //
  610. // Methods may be called concurrently from multiple goroutines, but
  611. // Write methods for a given Stream will be called serially.
  612. type ServerTransport interface {
  613. // HandleStreams receives incoming streams using the given handler.
  614. HandleStreams(func(*Stream), func(context.Context, string) context.Context)
  615. // WriteHeader sends the header metadata for the given stream.
  616. // WriteHeader may not be called on all streams.
  617. WriteHeader(s *Stream, md metadata.MD) error
  618. // Write sends the data for the given stream.
  619. // Write may not be called on all streams.
  620. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  621. // WriteStatus sends the status of a stream to the client. WriteStatus is
  622. // the final call made on a stream and always occurs.
  623. WriteStatus(s *Stream, st *status.Status) error
  624. // Close tears down the transport. Once it is called, the transport
  625. // should not be accessed any more. All the pending streams and their
  626. // handlers will be terminated asynchronously.
  627. Close(err error)
  628. // RemoteAddr returns the remote network address.
  629. RemoteAddr() net.Addr
  630. // Drain notifies the client this ServerTransport stops accepting new RPCs.
  631. Drain(debugData string)
  632. // IncrMsgSent increments the number of message sent through this transport.
  633. IncrMsgSent()
  634. // IncrMsgRecv increments the number of message received through this transport.
  635. IncrMsgRecv()
  636. }
  637. // connectionErrorf creates an ConnectionError with the specified error description.
  638. func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
  639. return ConnectionError{
  640. Desc: fmt.Sprintf(format, a...),
  641. temp: temp,
  642. err: e,
  643. }
  644. }
  645. // ConnectionError is an error that results in the termination of the
  646. // entire connection and the retry of all the active streams.
  647. type ConnectionError struct {
  648. Desc string
  649. temp bool
  650. err error
  651. }
  652. func (e ConnectionError) Error() string {
  653. return fmt.Sprintf("connection error: desc = %q", e.Desc)
  654. }
  655. // Temporary indicates if this connection error is temporary or fatal.
  656. func (e ConnectionError) Temporary() bool {
  657. return e.temp
  658. }
  659. // Origin returns the original error of this connection error.
  660. func (e ConnectionError) Origin() error {
  661. // Never return nil error here.
  662. // If the original error is nil, return itself.
  663. if e.err == nil {
  664. return e
  665. }
  666. return e.err
  667. }
  668. // Unwrap returns the original error of this connection error or nil when the
  669. // origin is nil.
  670. func (e ConnectionError) Unwrap() error {
  671. return e.err
  672. }
  673. var (
  674. // ErrConnClosing indicates that the transport is closing.
  675. ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
  676. // errStreamDrain indicates that the stream is rejected because the
  677. // connection is draining. This could be caused by goaway or balancer
  678. // removing the address.
  679. errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
  680. // errStreamDone is returned from write at the client side to indiacte application
  681. // layer of an error.
  682. errStreamDone = errors.New("the stream is done")
  683. // StatusGoAway indicates that the server sent a GOAWAY that included this
  684. // stream's ID in unprocessed RPCs.
  685. statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
  686. )
  687. // GoAwayReason contains the reason for the GoAway frame received.
  688. type GoAwayReason uint8
  689. const (
  690. // GoAwayInvalid indicates that no GoAway frame is received.
  691. GoAwayInvalid GoAwayReason = 0
  692. // GoAwayNoReason is the default value when GoAway frame is received.
  693. GoAwayNoReason GoAwayReason = 1
  694. // GoAwayTooManyPings indicates that a GoAway frame with
  695. // ErrCodeEnhanceYourCalm was received and that the debug data said
  696. // "too_many_pings".
  697. GoAwayTooManyPings GoAwayReason = 2
  698. )
  699. // channelzData is used to store channelz related data for http2Client and http2Server.
  700. // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
  701. // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
  702. // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
  703. type channelzData struct {
  704. kpCount int64
  705. // The number of streams that have started, including already finished ones.
  706. streamsStarted int64
  707. // Client side: The number of streams that have ended successfully by receiving
  708. // EoS bit set frame from server.
  709. // Server side: The number of streams that have ended successfully by sending
  710. // frame with EoS bit set.
  711. streamsSucceeded int64
  712. streamsFailed int64
  713. // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
  714. // instead of time.Time since it's more costly to atomically update time.Time variable than int64
  715. // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
  716. lastStreamCreatedTime int64
  717. msgSent int64
  718. msgRecv int64
  719. lastMsgSentTime int64
  720. lastMsgRecvTime int64
  721. }
  722. // ContextErr converts the error from context package into a status error.
  723. func ContextErr(err error) error {
  724. switch err {
  725. case context.DeadlineExceeded:
  726. return status.Error(codes.DeadlineExceeded, err.Error())
  727. case context.Canceled:
  728. return status.Error(codes.Canceled, err.Error())
  729. }
  730. return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
  731. }