http2_client.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package transport
  34. import (
  35. "bytes"
  36. "fmt"
  37. "io"
  38. "math"
  39. "net"
  40. "strings"
  41. "sync"
  42. "time"
  43. "golang.org/x/net/context"
  44. "golang.org/x/net/http2"
  45. "golang.org/x/net/http2/hpack"
  46. "google.golang.org/grpc/codes"
  47. "google.golang.org/grpc/credentials"
  48. "google.golang.org/grpc/grpclog"
  49. "google.golang.org/grpc/metadata"
  50. "google.golang.org/grpc/peer"
  51. )
  52. // http2Client implements the ClientTransport interface with HTTP2.
  53. type http2Client struct {
  54. target string // server name/addr
  55. userAgent string
  56. md interface{}
  57. conn net.Conn // underlying communication channel
  58. authInfo credentials.AuthInfo // auth info about the connection
  59. nextID uint32 // the next stream ID to be used
  60. // writableChan synchronizes write access to the transport.
  61. // A writer acquires the write lock by sending a value on writableChan
  62. // and releases it by receiving from writableChan.
  63. writableChan chan int
  64. // shutdownChan is closed when Close is called.
  65. // Blocking operations should select on shutdownChan to avoid
  66. // blocking forever after Close.
  67. // TODO(zhaoq): Maybe have a channel context?
  68. shutdownChan chan struct{}
  69. // errorChan is closed to notify the I/O error to the caller.
  70. errorChan chan struct{}
  71. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  72. // that the server sent GoAway on this transport.
  73. goAway chan struct{}
  74. framer *framer
  75. hBuf *bytes.Buffer // the buffer for HPACK encoding
  76. hEnc *hpack.Encoder // HPACK encoder
  77. // controlBuf delivers all the control related tasks (e.g., window
  78. // updates, reset streams, and various settings) to the controller.
  79. controlBuf *recvBuffer
  80. fc *inFlow
  81. // sendQuotaPool provides flow control to outbound message.
  82. sendQuotaPool *quotaPool
  83. // streamsQuota limits the max number of concurrent streams.
  84. streamsQuota *quotaPool
  85. // The scheme used: https if TLS is on, http otherwise.
  86. scheme string
  87. creds []credentials.PerRPCCredentials
  88. mu sync.Mutex // guard the following variables
  89. state transportState // the state of underlying connection
  90. activeStreams map[uint32]*Stream
  91. // The max number of concurrent streams
  92. maxStreams int
  93. // the per-stream outbound flow control window size set by the peer.
  94. streamSendQuota uint32
  95. // goAwayID records the Last-Stream-ID in the GoAway frame from the server.
  96. goAwayID uint32
  97. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  98. prevGoAwayID uint32
  99. }
  100. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  101. if fn != nil {
  102. return fn(ctx, addr)
  103. }
  104. return dialContext(ctx, "tcp", addr)
  105. }
  // isTemporary reports whether err describes a condition that may go away
  // when the operation is retried.
  //
  // io.EOF and context.DeadlineExceeded always count as temporary. Any other
  // error is asked through its Temporary() method when it has one; failing
  // that, through its Timeout() method. Errors offering neither are treated
  // as permanent.
  func isTemporary(err error) bool {
  	// A closed connection may be fine on the next attempt. The explicit
  	// DeadlineExceeded comparison is only needed until Go 1.7, where that
  	// error implements Timeout() itself.
  	if err == io.EOF || err == context.DeadlineExceeded {
  		return true
  	}
  	// Temporary() wins over Timeout() when an error implements both.
  	if te, ok := err.(interface {
  		Temporary() bool
  	}); ok {
  		return te.Temporary()
  	}
  	// Timeouts may be resolved by a retry, so they count as temporary.
  	if to, ok := err.(interface {
  		Timeout() bool
  	}); ok {
  		return to.Timeout()
  	}
  	return false
  }
  132. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  133. // and starts to receive messages on it. Non-nil error returns if construction
  134. // fails.
  135. func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
  136. scheme := "http"
  137. conn, err := dial(ctx, opts.Dialer, addr.Addr)
  138. if err != nil {
  139. return nil, connectionErrorf(true, err, "transport: %v", err)
  140. }
  141. // Any further errors will close the underlying connection
  142. defer func(conn net.Conn) {
  143. if err != nil {
  144. conn.Close()
  145. }
  146. }(conn)
  147. var authInfo credentials.AuthInfo
  148. if creds := opts.TransportCredentials; creds != nil {
  149. scheme = "https"
  150. conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
  151. if err != nil {
  152. // Credentials handshake errors are typically considered permanent
  153. // to avoid retrying on e.g. bad certificates.
  154. temp := isTemporary(err)
  155. return nil, connectionErrorf(temp, err, "transport: %v", err)
  156. }
  157. }
  158. ua := primaryUA
  159. if opts.UserAgent != "" {
  160. ua = opts.UserAgent + " " + ua
  161. }
  162. var buf bytes.Buffer
  163. t := &http2Client{
  164. target: addr.Addr,
  165. userAgent: ua,
  166. md: addr.Metadata,
  167. conn: conn,
  168. authInfo: authInfo,
  169. // The client initiated stream id is odd starting from 1.
  170. nextID: 1,
  171. writableChan: make(chan int, 1),
  172. shutdownChan: make(chan struct{}),
  173. errorChan: make(chan struct{}),
  174. goAway: make(chan struct{}),
  175. framer: newFramer(conn),
  176. hBuf: &buf,
  177. hEnc: hpack.NewEncoder(&buf),
  178. controlBuf: newRecvBuffer(),
  179. fc: &inFlow{limit: initialConnWindowSize},
  180. sendQuotaPool: newQuotaPool(defaultWindowSize),
  181. scheme: scheme,
  182. state: reachable,
  183. activeStreams: make(map[uint32]*Stream),
  184. creds: opts.PerRPCCredentials,
  185. maxStreams: math.MaxInt32,
  186. streamSendQuota: defaultWindowSize,
  187. }
  188. // Start the reader goroutine for incoming message. Each transport has
  189. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  190. // dispatches the frame to the corresponding stream entity.
  191. go t.reader()
  192. // Send connection preface to server.
  193. n, err := t.conn.Write(clientPreface)
  194. if err != nil {
  195. t.Close()
  196. return nil, connectionErrorf(true, err, "transport: %v", err)
  197. }
  198. if n != len(clientPreface) {
  199. t.Close()
  200. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  201. }
  202. if initialWindowSize != defaultWindowSize {
  203. err = t.framer.writeSettings(true, http2.Setting{
  204. ID: http2.SettingInitialWindowSize,
  205. Val: uint32(initialWindowSize),
  206. })
  207. } else {
  208. err = t.framer.writeSettings(true)
  209. }
  210. if err != nil {
  211. t.Close()
  212. return nil, connectionErrorf(true, err, "transport: %v", err)
  213. }
  214. // Adjust the connection flow control window if needed.
  215. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
  216. if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
  217. t.Close()
  218. return nil, connectionErrorf(true, err, "transport: %v", err)
  219. }
  220. }
  221. go t.controller()
  222. t.writableChan <- 0
  223. return t, nil
  224. }
  225. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  226. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  227. s := &Stream{
  228. id: t.nextID,
  229. done: make(chan struct{}),
  230. goAway: make(chan struct{}),
  231. method: callHdr.Method,
  232. sendCompress: callHdr.SendCompress,
  233. buf: newRecvBuffer(),
  234. fc: &inFlow{limit: initialWindowSize},
  235. sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
  236. headerChan: make(chan struct{}),
  237. }
  238. t.nextID += 2
  239. s.windowHandler = func(n int) {
  240. t.updateWindow(s, uint32(n))
  241. }
  242. // The client side stream context should have exactly the same life cycle with the user provided context.
  243. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  244. // So we use the original context here instead of creating a copy.
  245. s.ctx = ctx
  246. s.dec = &recvBufferReader{
  247. ctx: s.ctx,
  248. goAway: s.goAway,
  249. recv: s.buf,
  250. }
  251. return s
  252. }
  253. // NewStream creates a stream and register it into the transport as "active"
  254. // streams.
  255. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  256. pr := &peer.Peer{
  257. Addr: t.conn.RemoteAddr(),
  258. }
  259. // Attach Auth info if there is any.
  260. if t.authInfo != nil {
  261. pr.AuthInfo = t.authInfo
  262. }
  263. ctx = peer.NewContext(ctx, pr)
  264. authData := make(map[string]string)
  265. for _, c := range t.creds {
  266. // Construct URI required to get auth request metadata.
  267. var port string
  268. if pos := strings.LastIndex(t.target, ":"); pos != -1 {
  269. // Omit port if it is the default one.
  270. if t.target[pos+1:] != "443" {
  271. port = ":" + t.target[pos+1:]
  272. }
  273. }
  274. pos := strings.LastIndex(callHdr.Method, "/")
  275. if pos == -1 {
  276. return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
  277. }
  278. audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
  279. data, err := c.GetRequestMetadata(ctx, audience)
  280. if err != nil {
  281. return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
  282. }
  283. for k, v := range data {
  284. authData[k] = v
  285. }
  286. }
  287. t.mu.Lock()
  288. if t.activeStreams == nil {
  289. t.mu.Unlock()
  290. return nil, ErrConnClosing
  291. }
  292. if t.state == draining {
  293. t.mu.Unlock()
  294. return nil, ErrStreamDrain
  295. }
  296. if t.state != reachable {
  297. t.mu.Unlock()
  298. return nil, ErrConnClosing
  299. }
  300. checkStreamsQuota := t.streamsQuota != nil
  301. t.mu.Unlock()
  302. if checkStreamsQuota {
  303. sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
  304. if err != nil {
  305. return nil, err
  306. }
  307. // Returns the quota balance back.
  308. if sq > 1 {
  309. t.streamsQuota.add(sq - 1)
  310. }
  311. }
  312. if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
  313. // Return the quota back now because there is no stream returned to the caller.
  314. if _, ok := err.(StreamError); ok && checkStreamsQuota {
  315. t.streamsQuota.add(1)
  316. }
  317. return nil, err
  318. }
  319. t.mu.Lock()
  320. if t.state == draining {
  321. t.mu.Unlock()
  322. if checkStreamsQuota {
  323. t.streamsQuota.add(1)
  324. }
  325. // Need to make t writable again so that the rpc in flight can still proceed.
  326. t.writableChan <- 0
  327. return nil, ErrStreamDrain
  328. }
  329. if t.state != reachable {
  330. t.mu.Unlock()
  331. return nil, ErrConnClosing
  332. }
  333. s := t.newStream(ctx, callHdr)
  334. t.activeStreams[s.id] = s
  335. // This stream is not counted when applySetings(...) initialize t.streamsQuota.
  336. // Reset t.streamsQuota to the right value.
  337. var reset bool
  338. if !checkStreamsQuota && t.streamsQuota != nil {
  339. reset = true
  340. }
  341. t.mu.Unlock()
  342. if reset {
  343. t.streamsQuota.reset(-1)
  344. }
  345. // HPACK encodes various headers. Note that once WriteField(...) is
  346. // called, the corresponding headers/continuation frame has to be sent
  347. // because hpack.Encoder is stateful.
  348. t.hBuf.Reset()
  349. t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
  350. t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  351. t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  352. t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  353. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  354. t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  355. t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
  356. if callHdr.SendCompress != "" {
  357. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  358. }
  359. if dl, ok := ctx.Deadline(); ok {
  360. // Send out timeout regardless its value. The server can detect timeout context by itself.
  361. timeout := dl.Sub(time.Now())
  362. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  363. }
  364. for k, v := range authData {
  365. // Capital header names are illegal in HTTP/2.
  366. k = strings.ToLower(k)
  367. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
  368. }
  369. var (
  370. hasMD bool
  371. endHeaders bool
  372. )
  373. if md, ok := metadata.FromContext(ctx); ok {
  374. hasMD = true
  375. for k, v := range md {
  376. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  377. if isReservedHeader(k) {
  378. continue
  379. }
  380. for _, entry := range v {
  381. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  382. }
  383. }
  384. }
  385. if md, ok := t.md.(*metadata.MD); ok {
  386. for k, v := range *md {
  387. if isReservedHeader(k) {
  388. continue
  389. }
  390. for _, entry := range v {
  391. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  392. }
  393. }
  394. }
  395. first := true
  396. // Sends the headers in a single batch even when they span multiple frames.
  397. for !endHeaders {
  398. size := t.hBuf.Len()
  399. if size > http2MaxFrameLen {
  400. size = http2MaxFrameLen
  401. } else {
  402. endHeaders = true
  403. }
  404. var flush bool
  405. if endHeaders && (hasMD || callHdr.Flush) {
  406. flush = true
  407. }
  408. if first {
  409. // Sends a HeadersFrame to server to start a new stream.
  410. p := http2.HeadersFrameParam{
  411. StreamID: s.id,
  412. BlockFragment: t.hBuf.Next(size),
  413. EndStream: false,
  414. EndHeaders: endHeaders,
  415. }
  416. // Do a force flush for the buffered frames iff it is the last headers frame
  417. // and there is header metadata to be sent. Otherwise, there is flushing until
  418. // the corresponding data frame is written.
  419. err = t.framer.writeHeaders(flush, p)
  420. first = false
  421. } else {
  422. // Sends Continuation frames for the leftover headers.
  423. err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
  424. }
  425. if err != nil {
  426. t.notifyError(err)
  427. return nil, connectionErrorf(true, err, "transport: %v", err)
  428. }
  429. }
  430. t.writableChan <- 0
  431. return s, nil
  432. }
  433. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  434. // This must not be executed in reader's goroutine.
  435. func (t *http2Client) CloseStream(s *Stream, err error) {
  436. var updateStreams bool
  437. t.mu.Lock()
  438. if t.activeStreams == nil {
  439. t.mu.Unlock()
  440. return
  441. }
  442. if t.streamsQuota != nil {
  443. updateStreams = true
  444. }
  445. delete(t.activeStreams, s.id)
  446. if t.state == draining && len(t.activeStreams) == 0 {
  447. // The transport is draining and s is the last live stream on t.
  448. t.mu.Unlock()
  449. t.Close()
  450. return
  451. }
  452. t.mu.Unlock()
  453. if updateStreams {
  454. t.streamsQuota.add(1)
  455. }
  456. s.mu.Lock()
  457. if q := s.fc.resetPendingData(); q > 0 {
  458. if n := t.fc.onRead(q); n > 0 {
  459. t.controlBuf.put(&windowUpdate{0, n})
  460. }
  461. }
  462. if s.state == streamDone {
  463. s.mu.Unlock()
  464. return
  465. }
  466. if !s.headerDone {
  467. close(s.headerChan)
  468. s.headerDone = true
  469. }
  470. s.state = streamDone
  471. s.mu.Unlock()
  472. if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
  473. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
  474. }
  475. }
  476. // Close kicks off the shutdown process of the transport. This should be called
  477. // only once on a transport. Once it is called, the transport should not be
  478. // accessed any more.
  479. func (t *http2Client) Close() (err error) {
  480. t.mu.Lock()
  481. if t.state == closing {
  482. t.mu.Unlock()
  483. return
  484. }
  485. if t.state == reachable || t.state == draining {
  486. close(t.errorChan)
  487. }
  488. t.state = closing
  489. t.mu.Unlock()
  490. close(t.shutdownChan)
  491. err = t.conn.Close()
  492. t.mu.Lock()
  493. streams := t.activeStreams
  494. t.activeStreams = nil
  495. t.mu.Unlock()
  496. // Notify all active streams.
  497. for _, s := range streams {
  498. s.mu.Lock()
  499. if !s.headerDone {
  500. close(s.headerChan)
  501. s.headerDone = true
  502. }
  503. s.mu.Unlock()
  504. s.write(recvMsg{err: ErrConnClosing})
  505. }
  506. return
  507. }
  508. func (t *http2Client) GracefulClose() error {
  509. t.mu.Lock()
  510. switch t.state {
  511. case unreachable:
  512. // The server may close the connection concurrently. t is not available for
  513. // any streams. Close it now.
  514. t.mu.Unlock()
  515. t.Close()
  516. return nil
  517. case closing:
  518. t.mu.Unlock()
  519. return nil
  520. }
  521. // Notify the streams which were initiated after the server sent GOAWAY.
  522. select {
  523. case <-t.goAway:
  524. n := t.prevGoAwayID
  525. if n == 0 && t.nextID > 1 {
  526. n = t.nextID - 2
  527. }
  528. m := t.goAwayID + 2
  529. if m == 2 {
  530. m = 1
  531. }
  532. for i := m; i <= n; i += 2 {
  533. if s, ok := t.activeStreams[i]; ok {
  534. close(s.goAway)
  535. }
  536. }
  537. default:
  538. }
  539. if t.state == draining {
  540. t.mu.Unlock()
  541. return nil
  542. }
  543. t.state = draining
  544. active := len(t.activeStreams)
  545. t.mu.Unlock()
  546. if active == 0 {
  547. return t.Close()
  548. }
  549. return nil
  550. }
  551. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  552. // should proceed only if Write returns nil.
  553. // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
  554. // if it improves the performance.
  555. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
  556. r := bytes.NewBuffer(data)
  557. for {
  558. var p []byte
  559. if r.Len() > 0 {
  560. size := http2MaxFrameLen
  561. s.sendQuotaPool.add(0)
  562. // Wait until the stream has some quota to send the data.
  563. sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
  564. if err != nil {
  565. return err
  566. }
  567. t.sendQuotaPool.add(0)
  568. // Wait until the transport has some quota to send the data.
  569. tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
  570. if err != nil {
  571. if _, ok := err.(StreamError); ok || err == io.EOF {
  572. t.sendQuotaPool.cancel()
  573. }
  574. return err
  575. }
  576. if sq < size {
  577. size = sq
  578. }
  579. if tq < size {
  580. size = tq
  581. }
  582. p = r.Next(size)
  583. ps := len(p)
  584. if ps < sq {
  585. // Overbooked stream quota. Return it back.
  586. s.sendQuotaPool.add(sq - ps)
  587. }
  588. if ps < tq {
  589. // Overbooked transport quota. Return it back.
  590. t.sendQuotaPool.add(tq - ps)
  591. }
  592. }
  593. var (
  594. endStream bool
  595. forceFlush bool
  596. )
  597. if opts.Last && r.Len() == 0 {
  598. endStream = true
  599. }
  600. // Indicate there is a writer who is about to write a data frame.
  601. t.framer.adjustNumWriters(1)
  602. // Got some quota. Try to acquire writing privilege on the transport.
  603. if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
  604. if _, ok := err.(StreamError); ok || err == io.EOF {
  605. // Return the connection quota back.
  606. t.sendQuotaPool.add(len(p))
  607. }
  608. if t.framer.adjustNumWriters(-1) == 0 {
  609. // This writer is the last one in this batch and has the
  610. // responsibility to flush the buffered frames. It queues
  611. // a flush request to controlBuf instead of flushing directly
  612. // in order to avoid the race with other writing or flushing.
  613. t.controlBuf.put(&flushIO{})
  614. }
  615. return err
  616. }
  617. select {
  618. case <-s.ctx.Done():
  619. t.sendQuotaPool.add(len(p))
  620. if t.framer.adjustNumWriters(-1) == 0 {
  621. t.controlBuf.put(&flushIO{})
  622. }
  623. t.writableChan <- 0
  624. return ContextErr(s.ctx.Err())
  625. default:
  626. }
  627. if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
  628. // Do a force flush iff this is last frame for the entire gRPC message
  629. // and the caller is the only writer at this moment.
  630. forceFlush = true
  631. }
  632. // If WriteData fails, all the pending streams will be handled
  633. // by http2Client.Close(). No explicit CloseStream() needs to be
  634. // invoked.
  635. if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
  636. t.notifyError(err)
  637. return connectionErrorf(true, err, "transport: %v", err)
  638. }
  639. if t.framer.adjustNumWriters(-1) == 0 {
  640. t.framer.flushWrite()
  641. }
  642. t.writableChan <- 0
  643. if r.Len() == 0 {
  644. break
  645. }
  646. }
  647. if !opts.Last {
  648. return nil
  649. }
  650. s.mu.Lock()
  651. if s.state != streamDone {
  652. s.state = streamWriteDone
  653. }
  654. s.mu.Unlock()
  655. return nil
  656. }
  657. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  658. t.mu.Lock()
  659. defer t.mu.Unlock()
  660. s, ok := t.activeStreams[f.Header().StreamID]
  661. return s, ok
  662. }
  663. // updateWindow adjusts the inbound quota for the stream and the transport.
  664. // Window updates will deliver to the controller for sending when
  665. // the cumulative quota exceeds the corresponding threshold.
  666. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  667. s.mu.Lock()
  668. defer s.mu.Unlock()
  669. if s.state == streamDone {
  670. return
  671. }
  672. if w := t.fc.onRead(n); w > 0 {
  673. t.controlBuf.put(&windowUpdate{0, w})
  674. }
  675. if w := s.fc.onRead(n); w > 0 {
  676. t.controlBuf.put(&windowUpdate{s.id, w})
  677. }
  678. }
  679. func (t *http2Client) handleData(f *http2.DataFrame) {
  680. size := len(f.Data())
  681. if err := t.fc.onData(uint32(size)); err != nil {
  682. t.notifyError(connectionErrorf(true, err, "%v", err))
  683. return
  684. }
  685. // Select the right stream to dispatch.
  686. s, ok := t.getStream(f)
  687. if !ok {
  688. if w := t.fc.onRead(uint32(size)); w > 0 {
  689. t.controlBuf.put(&windowUpdate{0, w})
  690. }
  691. return
  692. }
  693. if size > 0 {
  694. s.mu.Lock()
  695. if s.state == streamDone {
  696. s.mu.Unlock()
  697. // The stream has been closed. Release the corresponding quota.
  698. if w := t.fc.onRead(uint32(size)); w > 0 {
  699. t.controlBuf.put(&windowUpdate{0, w})
  700. }
  701. return
  702. }
  703. if err := s.fc.onData(uint32(size)); err != nil {
  704. s.state = streamDone
  705. s.statusCode = codes.Internal
  706. s.statusDesc = err.Error()
  707. close(s.done)
  708. s.mu.Unlock()
  709. s.write(recvMsg{err: io.EOF})
  710. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
  711. return
  712. }
  713. s.mu.Unlock()
  714. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  715. // guarantee f.Data() is consumed before the arrival of next frame.
  716. // Can this copy be eliminated?
  717. data := make([]byte, size)
  718. copy(data, f.Data())
  719. s.write(recvMsg{data: data})
  720. }
  721. // The server has closed the stream without sending trailers. Record that
  722. // the read direction is closed, and set the status appropriately.
  723. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  724. s.mu.Lock()
  725. if s.state == streamDone {
  726. s.mu.Unlock()
  727. return
  728. }
  729. s.state = streamDone
  730. s.statusCode = codes.Internal
  731. s.statusDesc = "server closed the stream without sending trailers"
  732. close(s.done)
  733. s.mu.Unlock()
  734. s.write(recvMsg{err: io.EOF})
  735. }
  736. }
  737. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  738. s, ok := t.getStream(f)
  739. if !ok {
  740. return
  741. }
  742. s.mu.Lock()
  743. if s.state == streamDone {
  744. s.mu.Unlock()
  745. return
  746. }
  747. s.state = streamDone
  748. if !s.headerDone {
  749. close(s.headerChan)
  750. s.headerDone = true
  751. }
  752. s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
  753. if !ok {
  754. grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
  755. s.statusCode = codes.Unknown
  756. }
  757. s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
  758. close(s.done)
  759. s.mu.Unlock()
  760. s.write(recvMsg{err: io.EOF})
  761. }
  762. func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
  763. if f.IsAck() {
  764. return
  765. }
  766. var ss []http2.Setting
  767. f.ForeachSetting(func(s http2.Setting) error {
  768. ss = append(ss, s)
  769. return nil
  770. })
  771. // The settings will be applied once the ack is sent.
  772. t.controlBuf.put(&settings{ack: true, ss: ss})
  773. }
  774. func (t *http2Client) handlePing(f *http2.PingFrame) {
  775. if f.IsAck() { // Do nothing.
  776. return
  777. }
  778. pingAck := &ping{ack: true}
  779. copy(pingAck.data[:], f.Data[:])
  780. t.controlBuf.put(pingAck)
  781. }
  782. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  783. t.mu.Lock()
  784. if t.state == reachable || t.state == draining {
  785. if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
  786. t.mu.Unlock()
  787. t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
  788. return
  789. }
  790. select {
  791. case <-t.goAway:
  792. id := t.goAwayID
  793. // t.goAway has been closed (i.e.,multiple GoAways).
  794. if id < f.LastStreamID {
  795. t.mu.Unlock()
  796. t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
  797. return
  798. }
  799. t.prevGoAwayID = id
  800. t.goAwayID = f.LastStreamID
  801. t.mu.Unlock()
  802. return
  803. default:
  804. }
  805. t.goAwayID = f.LastStreamID
  806. close(t.goAway)
  807. }
  808. t.mu.Unlock()
  809. }
  810. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  811. id := f.Header().StreamID
  812. incr := f.Increment
  813. if id == 0 {
  814. t.sendQuotaPool.add(int(incr))
  815. return
  816. }
  817. if s, ok := t.getStream(f); ok {
  818. s.sendQuotaPool.add(int(incr))
  819. }
  820. }
  821. // operateHeaders takes action on the decoded headers.
  822. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  823. s, ok := t.getStream(frame)
  824. if !ok {
  825. return
  826. }
  827. var state decodeState
  828. for _, hf := range frame.Fields {
  829. state.processHeaderField(hf)
  830. }
  831. if state.err != nil {
  832. s.mu.Lock()
  833. if !s.headerDone {
  834. close(s.headerChan)
  835. s.headerDone = true
  836. }
  837. s.mu.Unlock()
  838. s.write(recvMsg{err: state.err})
  839. // Something wrong. Stops reading even when there is remaining.
  840. return
  841. }
  842. endStream := frame.StreamEnded()
  843. s.mu.Lock()
  844. if !endStream {
  845. s.recvCompress = state.encoding
  846. }
  847. if !s.headerDone {
  848. if !endStream && len(state.mdata) > 0 {
  849. s.header = state.mdata
  850. }
  851. close(s.headerChan)
  852. s.headerDone = true
  853. }
  854. if !endStream || s.state == streamDone {
  855. s.mu.Unlock()
  856. return
  857. }
  858. if len(state.mdata) > 0 {
  859. s.trailer = state.mdata
  860. }
  861. s.statusCode = state.statusCode
  862. s.statusDesc = state.statusDesc
  863. close(s.done)
  864. s.state = streamDone
  865. s.mu.Unlock()
  866. s.write(recvMsg{err: io.EOF})
  867. }
  868. func handleMalformedHTTP2(s *Stream, err error) {
  869. s.mu.Lock()
  870. if !s.headerDone {
  871. close(s.headerChan)
  872. s.headerDone = true
  873. }
  874. s.mu.Unlock()
  875. s.write(recvMsg{err: err})
  876. }
  877. // reader runs as a separate goroutine in charge of reading data from network
  878. // connection.
  879. //
  880. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  881. // optimal.
  882. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  883. func (t *http2Client) reader() {
  884. // Check the validity of server preface.
  885. frame, err := t.framer.readFrame()
  886. if err != nil {
  887. t.notifyError(err)
  888. return
  889. }
  890. sf, ok := frame.(*http2.SettingsFrame)
  891. if !ok {
  892. t.notifyError(err)
  893. return
  894. }
  895. t.handleSettings(sf)
  896. // loop to keep reading incoming messages on this transport.
  897. for {
  898. frame, err := t.framer.readFrame()
  899. if err != nil {
  900. // Abort an active stream if the http2.Framer returns a
  901. // http2.StreamError. This can happen only if the server's response
  902. // is malformed http2.
  903. if se, ok := err.(http2.StreamError); ok {
  904. t.mu.Lock()
  905. s := t.activeStreams[se.StreamID]
  906. t.mu.Unlock()
  907. if s != nil {
  908. // use error detail to provide better err message
  909. handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
  910. }
  911. continue
  912. } else {
  913. // Transport error.
  914. t.notifyError(err)
  915. return
  916. }
  917. }
  918. switch frame := frame.(type) {
  919. case *http2.MetaHeadersFrame:
  920. t.operateHeaders(frame)
  921. case *http2.DataFrame:
  922. t.handleData(frame)
  923. case *http2.RSTStreamFrame:
  924. t.handleRSTStream(frame)
  925. case *http2.SettingsFrame:
  926. t.handleSettings(frame)
  927. case *http2.PingFrame:
  928. t.handlePing(frame)
  929. case *http2.GoAwayFrame:
  930. t.handleGoAway(frame)
  931. case *http2.WindowUpdateFrame:
  932. t.handleWindowUpdate(frame)
  933. default:
  934. grpclog.Printf("transport: http2Client.reader got unhandled frame type %v.", frame)
  935. }
  936. }
  937. }
  938. func (t *http2Client) applySettings(ss []http2.Setting) {
  939. for _, s := range ss {
  940. switch s.ID {
  941. case http2.SettingMaxConcurrentStreams:
  942. // TODO(zhaoq): This is a hack to avoid significant refactoring of the
  943. // code to deal with the unrealistic int32 overflow. Probably will try
  944. // to find a better way to handle this later.
  945. if s.Val > math.MaxInt32 {
  946. s.Val = math.MaxInt32
  947. }
  948. t.mu.Lock()
  949. reset := t.streamsQuota != nil
  950. if !reset {
  951. t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
  952. }
  953. ms := t.maxStreams
  954. t.maxStreams = int(s.Val)
  955. t.mu.Unlock()
  956. if reset {
  957. t.streamsQuota.reset(int(s.Val) - ms)
  958. }
  959. case http2.SettingInitialWindowSize:
  960. t.mu.Lock()
  961. for _, stream := range t.activeStreams {
  962. // Adjust the sending quota for each stream.
  963. stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
  964. }
  965. t.streamSendQuota = s.Val
  966. t.mu.Unlock()
  967. }
  968. }
  969. }
  970. // controller running in a separate goroutine takes charge of sending control
  971. // frames (e.g., window update, reset stream, setting, etc.) to the server.
  972. func (t *http2Client) controller() {
  973. for {
  974. select {
  975. case i := <-t.controlBuf.get():
  976. t.controlBuf.load()
  977. select {
  978. case <-t.writableChan:
  979. switch i := i.(type) {
  980. case *windowUpdate:
  981. t.framer.writeWindowUpdate(true, i.streamID, i.increment)
  982. case *settings:
  983. if i.ack {
  984. t.framer.writeSettingsAck(true)
  985. t.applySettings(i.ss)
  986. } else {
  987. t.framer.writeSettings(true, i.ss...)
  988. }
  989. case *resetStream:
  990. t.framer.writeRSTStream(true, i.streamID, i.code)
  991. case *flushIO:
  992. t.framer.flushWrite()
  993. case *ping:
  994. t.framer.writePing(true, i.ack, i.data)
  995. default:
  996. grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
  997. }
  998. t.writableChan <- 0
  999. continue
  1000. case <-t.shutdownChan:
  1001. return
  1002. }
  1003. case <-t.shutdownChan:
  1004. return
  1005. }
  1006. }
  1007. }
  1008. func (t *http2Client) Error() <-chan struct{} {
  1009. return t.errorChan
  1010. }
  1011. func (t *http2Client) GoAway() <-chan struct{} {
  1012. return t.goAway
  1013. }
  1014. func (t *http2Client) notifyError(err error) {
  1015. t.mu.Lock()
  1016. // make sure t.errorChan is closed only once.
  1017. if t.state == draining {
  1018. t.mu.Unlock()
  1019. t.Close()
  1020. return
  1021. }
  1022. if t.state == reachable {
  1023. t.state = unreachable
  1024. close(t.errorChan)
  1025. grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
  1026. }
  1027. t.mu.Unlock()
  1028. }