http2_server.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package transport
  34. import (
  35. "bytes"
  36. "errors"
  37. "io"
  38. "math"
  39. "net"
  40. "strconv"
  41. "sync"
  42. "golang.org/x/net/context"
  43. "golang.org/x/net/http2"
  44. "golang.org/x/net/http2/hpack"
  45. "google.golang.org/grpc/codes"
  46. "google.golang.org/grpc/credentials"
  47. "google.golang.org/grpc/grpclog"
  48. "google.golang.org/grpc/metadata"
  49. "google.golang.org/grpc/peer"
  50. )
  51. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  52. // the stream's state.
  53. var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  54. // http2Server implements the ServerTransport interface with HTTP2.
  55. type http2Server struct {
  56. conn net.Conn
  57. maxStreamID uint32 // max stream ID ever seen
  58. authInfo credentials.AuthInfo // auth info about the connection
  59. // writableChan synchronizes write access to the transport.
  60. // A writer acquires the write lock by receiving a value on writableChan
  61. // and releases it by sending on writableChan.
  62. writableChan chan int
  63. // shutdownChan is closed when Close is called.
  64. // Blocking operations should select on shutdownChan to avoid
  65. // blocking forever after Close.
  66. shutdownChan chan struct{}
  67. framer *framer
  68. hBuf *bytes.Buffer // the buffer for HPACK encoding
  69. hEnc *hpack.Encoder // HPACK encoder
  70. // The max number of concurrent streams.
  71. maxStreams uint32
  72. // controlBuf delivers all the control related tasks (e.g., window
  73. // updates, reset streams, and various settings) to the controller.
  74. controlBuf *recvBuffer
  75. fc *inFlow
  76. // sendQuotaPool provides flow control to outbound message.
  77. sendQuotaPool *quotaPool
  78. mu sync.Mutex // guard the following
  79. state transportState
  80. activeStreams map[uint32]*Stream
  81. // the per-stream outbound flow control window size set by the peer.
  82. streamSendQuota uint32
  83. }
  84. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  85. // returned if something goes wrong.
  86. func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
  87. framer := newFramer(conn)
  88. // Send initial settings as connection preface to client.
  89. var settings []http2.Setting
  90. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  91. // permitted in the HTTP2 spec.
  92. if maxStreams == 0 {
  93. maxStreams = math.MaxUint32
  94. } else {
  95. settings = append(settings, http2.Setting{
  96. ID: http2.SettingMaxConcurrentStreams,
  97. Val: maxStreams,
  98. })
  99. }
  100. if initialWindowSize != defaultWindowSize {
  101. settings = append(settings, http2.Setting{
  102. ID: http2.SettingInitialWindowSize,
  103. Val: uint32(initialWindowSize)})
  104. }
  105. if err := framer.writeSettings(true, settings...); err != nil {
  106. return nil, connectionErrorf(true, err, "transport: %v", err)
  107. }
  108. // Adjust the connection flow control window if needed.
  109. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
  110. if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
  111. return nil, connectionErrorf(true, err, "transport: %v", err)
  112. }
  113. }
  114. var buf bytes.Buffer
  115. t := &http2Server{
  116. conn: conn,
  117. authInfo: authInfo,
  118. framer: framer,
  119. hBuf: &buf,
  120. hEnc: hpack.NewEncoder(&buf),
  121. maxStreams: maxStreams,
  122. controlBuf: newRecvBuffer(),
  123. fc: &inFlow{limit: initialConnWindowSize},
  124. sendQuotaPool: newQuotaPool(defaultWindowSize),
  125. state: reachable,
  126. writableChan: make(chan int, 1),
  127. shutdownChan: make(chan struct{}),
  128. activeStreams: make(map[uint32]*Stream),
  129. streamSendQuota: defaultWindowSize,
  130. }
  131. go t.controller()
  132. t.writableChan <- 0
  133. return t, nil
  134. }
  135. // operateHeader takes action on the decoded headers.
  136. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
  137. buf := newRecvBuffer()
  138. s := &Stream{
  139. id: frame.Header().StreamID,
  140. st: t,
  141. buf: buf,
  142. fc: &inFlow{limit: initialWindowSize},
  143. }
  144. var state decodeState
  145. for _, hf := range frame.Fields {
  146. state.processHeaderField(hf)
  147. }
  148. if err := state.err; err != nil {
  149. if se, ok := err.(StreamError); ok {
  150. t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
  151. }
  152. return
  153. }
  154. if frame.StreamEnded() {
  155. // s is just created by the caller. No lock needed.
  156. s.state = streamReadDone
  157. }
  158. s.recvCompress = state.encoding
  159. if state.timeoutSet {
  160. s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
  161. } else {
  162. s.ctx, s.cancel = context.WithCancel(context.TODO())
  163. }
  164. pr := &peer.Peer{
  165. Addr: t.conn.RemoteAddr(),
  166. }
  167. // Attach Auth info if there is any.
  168. if t.authInfo != nil {
  169. pr.AuthInfo = t.authInfo
  170. }
  171. s.ctx = peer.NewContext(s.ctx, pr)
  172. // Cache the current stream to the context so that the server application
  173. // can find out. Required when the server wants to send some metadata
  174. // back to the client (unary call only).
  175. s.ctx = newContextWithStream(s.ctx, s)
  176. // Attach the received metadata to the context.
  177. if len(state.mdata) > 0 {
  178. s.ctx = metadata.NewContext(s.ctx, state.mdata)
  179. }
  180. s.dec = &recvBufferReader{
  181. ctx: s.ctx,
  182. recv: s.buf,
  183. }
  184. s.recvCompress = state.encoding
  185. s.method = state.method
  186. t.mu.Lock()
  187. if t.state != reachable {
  188. t.mu.Unlock()
  189. return
  190. }
  191. if uint32(len(t.activeStreams)) >= t.maxStreams {
  192. t.mu.Unlock()
  193. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
  194. return
  195. }
  196. if s.id%2 != 1 || s.id <= t.maxStreamID {
  197. t.mu.Unlock()
  198. // illegal gRPC stream id.
  199. grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
  200. return true
  201. }
  202. t.maxStreamID = s.id
  203. s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
  204. t.activeStreams[s.id] = s
  205. t.mu.Unlock()
  206. s.windowHandler = func(n int) {
  207. t.updateWindow(s, uint32(n))
  208. }
  209. handle(s)
  210. return
  211. }
  212. // HandleStreams receives incoming streams using the given handler. This is
  213. // typically run in a separate goroutine.
  214. func (t *http2Server) HandleStreams(handle func(*Stream)) {
  215. // Check the validity of client preface.
  216. preface := make([]byte, len(clientPreface))
  217. if _, err := io.ReadFull(t.conn, preface); err != nil {
  218. grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  219. t.Close()
  220. return
  221. }
  222. if !bytes.Equal(preface, clientPreface) {
  223. grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  224. t.Close()
  225. return
  226. }
  227. frame, err := t.framer.readFrame()
  228. if err == io.EOF || err == io.ErrUnexpectedEOF {
  229. t.Close()
  230. return
  231. }
  232. if err != nil {
  233. grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  234. t.Close()
  235. return
  236. }
  237. sf, ok := frame.(*http2.SettingsFrame)
  238. if !ok {
  239. grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  240. t.Close()
  241. return
  242. }
  243. t.handleSettings(sf)
  244. for {
  245. frame, err := t.framer.readFrame()
  246. if err != nil {
  247. if se, ok := err.(http2.StreamError); ok {
  248. t.mu.Lock()
  249. s := t.activeStreams[se.StreamID]
  250. t.mu.Unlock()
  251. if s != nil {
  252. t.closeStream(s)
  253. }
  254. t.controlBuf.put(&resetStream{se.StreamID, se.Code})
  255. continue
  256. }
  257. if err == io.EOF || err == io.ErrUnexpectedEOF {
  258. t.Close()
  259. return
  260. }
  261. grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  262. t.Close()
  263. return
  264. }
  265. switch frame := frame.(type) {
  266. case *http2.MetaHeadersFrame:
  267. if t.operateHeaders(frame, handle) {
  268. t.Close()
  269. break
  270. }
  271. case *http2.DataFrame:
  272. t.handleData(frame)
  273. case *http2.RSTStreamFrame:
  274. t.handleRSTStream(frame)
  275. case *http2.SettingsFrame:
  276. t.handleSettings(frame)
  277. case *http2.PingFrame:
  278. t.handlePing(frame)
  279. case *http2.WindowUpdateFrame:
  280. t.handleWindowUpdate(frame)
  281. case *http2.GoAwayFrame:
  282. // TODO: Handle GoAway from the client appropriately.
  283. default:
  284. grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  285. }
  286. }
  287. }
  288. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  289. t.mu.Lock()
  290. defer t.mu.Unlock()
  291. if t.activeStreams == nil {
  292. // The transport is closing.
  293. return nil, false
  294. }
  295. s, ok := t.activeStreams[f.Header().StreamID]
  296. if !ok {
  297. // The stream is already done.
  298. return nil, false
  299. }
  300. return s, true
  301. }
  302. // updateWindow adjusts the inbound quota for the stream and the transport.
  303. // Window updates will deliver to the controller for sending when
  304. // the cumulative quota exceeds the corresponding threshold.
  305. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  306. s.mu.Lock()
  307. defer s.mu.Unlock()
  308. if s.state == streamDone {
  309. return
  310. }
  311. if w := t.fc.onRead(n); w > 0 {
  312. t.controlBuf.put(&windowUpdate{0, w})
  313. }
  314. if w := s.fc.onRead(n); w > 0 {
  315. t.controlBuf.put(&windowUpdate{s.id, w})
  316. }
  317. }
  318. func (t *http2Server) handleData(f *http2.DataFrame) {
  319. size := len(f.Data())
  320. if err := t.fc.onData(uint32(size)); err != nil {
  321. grpclog.Printf("transport: http2Server %v", err)
  322. t.Close()
  323. return
  324. }
  325. // Select the right stream to dispatch.
  326. s, ok := t.getStream(f)
  327. if !ok {
  328. if w := t.fc.onRead(uint32(size)); w > 0 {
  329. t.controlBuf.put(&windowUpdate{0, w})
  330. }
  331. return
  332. }
  333. if size > 0 {
  334. s.mu.Lock()
  335. if s.state == streamDone {
  336. s.mu.Unlock()
  337. // The stream has been closed. Release the corresponding quota.
  338. if w := t.fc.onRead(uint32(size)); w > 0 {
  339. t.controlBuf.put(&windowUpdate{0, w})
  340. }
  341. return
  342. }
  343. if err := s.fc.onData(uint32(size)); err != nil {
  344. s.mu.Unlock()
  345. t.closeStream(s)
  346. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
  347. return
  348. }
  349. s.mu.Unlock()
  350. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  351. // guarantee f.Data() is consumed before the arrival of next frame.
  352. // Can this copy be eliminated?
  353. data := make([]byte, size)
  354. copy(data, f.Data())
  355. s.write(recvMsg{data: data})
  356. }
  357. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  358. // Received the end of stream from the client.
  359. s.mu.Lock()
  360. if s.state != streamDone {
  361. s.state = streamReadDone
  362. }
  363. s.mu.Unlock()
  364. s.write(recvMsg{err: io.EOF})
  365. }
  366. }
  367. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  368. s, ok := t.getStream(f)
  369. if !ok {
  370. return
  371. }
  372. t.closeStream(s)
  373. }
  374. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  375. if f.IsAck() {
  376. return
  377. }
  378. var ss []http2.Setting
  379. f.ForeachSetting(func(s http2.Setting) error {
  380. ss = append(ss, s)
  381. return nil
  382. })
  383. // The settings will be applied once the ack is sent.
  384. t.controlBuf.put(&settings{ack: true, ss: ss})
  385. }
  386. func (t *http2Server) handlePing(f *http2.PingFrame) {
  387. if f.IsAck() { // Do nothing.
  388. return
  389. }
  390. pingAck := &ping{ack: true}
  391. copy(pingAck.data[:], f.Data[:])
  392. t.controlBuf.put(pingAck)
  393. }
  394. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  395. id := f.Header().StreamID
  396. incr := f.Increment
  397. if id == 0 {
  398. t.sendQuotaPool.add(int(incr))
  399. return
  400. }
  401. if s, ok := t.getStream(f); ok {
  402. s.sendQuotaPool.add(int(incr))
  403. }
  404. }
  405. func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
  406. first := true
  407. endHeaders := false
  408. var err error
  409. // Sends the headers in a single batch.
  410. for !endHeaders {
  411. size := t.hBuf.Len()
  412. if size > http2MaxFrameLen {
  413. size = http2MaxFrameLen
  414. } else {
  415. endHeaders = true
  416. }
  417. if first {
  418. p := http2.HeadersFrameParam{
  419. StreamID: s.id,
  420. BlockFragment: b.Next(size),
  421. EndStream: endStream,
  422. EndHeaders: endHeaders,
  423. }
  424. err = t.framer.writeHeaders(endHeaders, p)
  425. first = false
  426. } else {
  427. err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
  428. }
  429. if err != nil {
  430. t.Close()
  431. return connectionErrorf(true, err, "transport: %v", err)
  432. }
  433. }
  434. return nil
  435. }
  436. // WriteHeader sends the header metedata md back to the client.
  437. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  438. s.mu.Lock()
  439. if s.headerOk || s.state == streamDone {
  440. s.mu.Unlock()
  441. return ErrIllegalHeaderWrite
  442. }
  443. s.headerOk = true
  444. if md.Len() > 0 {
  445. if s.header.Len() > 0 {
  446. s.header = metadata.Join(s.header, md)
  447. } else {
  448. s.header = md
  449. }
  450. }
  451. md = s.header
  452. s.mu.Unlock()
  453. if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
  454. return err
  455. }
  456. t.hBuf.Reset()
  457. t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
  458. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  459. if s.sendCompress != "" {
  460. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  461. }
  462. for k, v := range md {
  463. if isReservedHeader(k) {
  464. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  465. continue
  466. }
  467. for _, entry := range v {
  468. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  469. }
  470. }
  471. if err := t.writeHeaders(s, t.hBuf, false); err != nil {
  472. return err
  473. }
  474. t.writableChan <- 0
  475. return nil
  476. }
  477. // WriteStatus sends stream status to the client and terminates the stream.
  478. // There is no further I/O operations being able to perform on this stream.
  479. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  480. // OK is adopted.
  481. func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
  482. var headersSent, hasHeader bool
  483. s.mu.Lock()
  484. if s.state == streamDone {
  485. s.mu.Unlock()
  486. return nil
  487. }
  488. if s.headerOk {
  489. headersSent = true
  490. }
  491. if s.header.Len() > 0 {
  492. hasHeader = true
  493. }
  494. s.mu.Unlock()
  495. if !headersSent && hasHeader {
  496. t.WriteHeader(s, nil)
  497. headersSent = true
  498. }
  499. if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
  500. return err
  501. }
  502. t.hBuf.Reset()
  503. if !headersSent {
  504. t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
  505. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  506. }
  507. t.hEnc.WriteField(
  508. hpack.HeaderField{
  509. Name: "grpc-status",
  510. Value: strconv.Itoa(int(statusCode)),
  511. })
  512. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
  513. // Attach the trailer metadata.
  514. for k, v := range s.trailer {
  515. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  516. if isReservedHeader(k) {
  517. continue
  518. }
  519. for _, entry := range v {
  520. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  521. }
  522. }
  523. if err := t.writeHeaders(s, t.hBuf, true); err != nil {
  524. t.Close()
  525. return err
  526. }
  527. t.closeStream(s)
  528. t.writableChan <- 0
  529. return nil
  530. }
  531. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  532. // is returns if it fails (e.g., framing error, transport error).
  533. func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
  534. // TODO(zhaoq): Support multi-writers for a single stream.
  535. var writeHeaderFrame bool
  536. s.mu.Lock()
  537. if s.state == streamDone {
  538. s.mu.Unlock()
  539. return streamErrorf(codes.Unknown, "the stream has been done")
  540. }
  541. if !s.headerOk {
  542. writeHeaderFrame = true
  543. }
  544. s.mu.Unlock()
  545. if writeHeaderFrame {
  546. t.WriteHeader(s, nil)
  547. }
  548. r := bytes.NewBuffer(data)
  549. for {
  550. if r.Len() == 0 {
  551. return nil
  552. }
  553. size := http2MaxFrameLen
  554. s.sendQuotaPool.add(0)
  555. // Wait until the stream has some quota to send the data.
  556. sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
  557. if err != nil {
  558. return err
  559. }
  560. t.sendQuotaPool.add(0)
  561. // Wait until the transport has some quota to send the data.
  562. tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
  563. if err != nil {
  564. if _, ok := err.(StreamError); ok {
  565. t.sendQuotaPool.cancel()
  566. }
  567. return err
  568. }
  569. if sq < size {
  570. size = sq
  571. }
  572. if tq < size {
  573. size = tq
  574. }
  575. p := r.Next(size)
  576. ps := len(p)
  577. if ps < sq {
  578. // Overbooked stream quota. Return it back.
  579. s.sendQuotaPool.add(sq - ps)
  580. }
  581. if ps < tq {
  582. // Overbooked transport quota. Return it back.
  583. t.sendQuotaPool.add(tq - ps)
  584. }
  585. t.framer.adjustNumWriters(1)
  586. // Got some quota. Try to acquire writing privilege on the
  587. // transport.
  588. if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
  589. if _, ok := err.(StreamError); ok {
  590. // Return the connection quota back.
  591. t.sendQuotaPool.add(ps)
  592. }
  593. if t.framer.adjustNumWriters(-1) == 0 {
  594. // This writer is the last one in this batch and has the
  595. // responsibility to flush the buffered frames. It queues
  596. // a flush request to controlBuf instead of flushing directly
  597. // in order to avoid the race with other writing or flushing.
  598. t.controlBuf.put(&flushIO{})
  599. }
  600. return err
  601. }
  602. select {
  603. case <-s.ctx.Done():
  604. t.sendQuotaPool.add(ps)
  605. if t.framer.adjustNumWriters(-1) == 0 {
  606. t.controlBuf.put(&flushIO{})
  607. }
  608. t.writableChan <- 0
  609. return ContextErr(s.ctx.Err())
  610. default:
  611. }
  612. var forceFlush bool
  613. if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
  614. forceFlush = true
  615. }
  616. if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
  617. t.Close()
  618. return connectionErrorf(true, err, "transport: %v", err)
  619. }
  620. if t.framer.adjustNumWriters(-1) == 0 {
  621. t.framer.flushWrite()
  622. }
  623. t.writableChan <- 0
  624. }
  625. }
  626. func (t *http2Server) applySettings(ss []http2.Setting) {
  627. for _, s := range ss {
  628. if s.ID == http2.SettingInitialWindowSize {
  629. t.mu.Lock()
  630. defer t.mu.Unlock()
  631. for _, stream := range t.activeStreams {
  632. stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
  633. }
  634. t.streamSendQuota = s.Val
  635. }
  636. }
  637. }
  638. // controller running in a separate goroutine takes charge of sending control
  639. // frames (e.g., window update, reset stream, setting, etc.) to the server.
  640. func (t *http2Server) controller() {
  641. for {
  642. select {
  643. case i := <-t.controlBuf.get():
  644. t.controlBuf.load()
  645. select {
  646. case <-t.writableChan:
  647. switch i := i.(type) {
  648. case *windowUpdate:
  649. t.framer.writeWindowUpdate(true, i.streamID, i.increment)
  650. case *settings:
  651. if i.ack {
  652. t.framer.writeSettingsAck(true)
  653. t.applySettings(i.ss)
  654. } else {
  655. t.framer.writeSettings(true, i.ss...)
  656. }
  657. case *resetStream:
  658. t.framer.writeRSTStream(true, i.streamID, i.code)
  659. case *goAway:
  660. t.mu.Lock()
  661. if t.state == closing {
  662. t.mu.Unlock()
  663. // The transport is closing.
  664. return
  665. }
  666. sid := t.maxStreamID
  667. t.state = draining
  668. t.mu.Unlock()
  669. t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
  670. case *flushIO:
  671. t.framer.flushWrite()
  672. case *ping:
  673. t.framer.writePing(true, i.ack, i.data)
  674. default:
  675. grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
  676. }
  677. t.writableChan <- 0
  678. continue
  679. case <-t.shutdownChan:
  680. return
  681. }
  682. case <-t.shutdownChan:
  683. return
  684. }
  685. }
  686. }
  687. // Close starts shutting down the http2Server transport.
  688. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  689. // could cause some resource issue. Revisit this later.
  690. func (t *http2Server) Close() (err error) {
  691. t.mu.Lock()
  692. if t.state == closing {
  693. t.mu.Unlock()
  694. return errors.New("transport: Close() was already called")
  695. }
  696. t.state = closing
  697. streams := t.activeStreams
  698. t.activeStreams = nil
  699. t.mu.Unlock()
  700. close(t.shutdownChan)
  701. err = t.conn.Close()
  702. // Cancel all active streams.
  703. for _, s := range streams {
  704. s.cancel()
  705. }
  706. return
  707. }
  708. // closeStream clears the footprint of a stream when the stream is not needed
  709. // any more.
  710. func (t *http2Server) closeStream(s *Stream) {
  711. t.mu.Lock()
  712. delete(t.activeStreams, s.id)
  713. if t.state == draining && len(t.activeStreams) == 0 {
  714. defer t.Close()
  715. }
  716. t.mu.Unlock()
  717. // In case stream sending and receiving are invoked in separate
  718. // goroutines (e.g., bi-directional streaming), cancel needs to be
  719. // called to interrupt the potential blocking on other goroutines.
  720. s.cancel()
  721. s.mu.Lock()
  722. if q := s.fc.resetPendingData(); q > 0 {
  723. if w := t.fc.onRead(q); w > 0 {
  724. t.controlBuf.put(&windowUpdate{0, w})
  725. }
  726. }
  727. if s.state == streamDone {
  728. s.mu.Unlock()
  729. return
  730. }
  731. s.state = streamDone
  732. s.mu.Unlock()
  733. }
  734. func (t *http2Server) RemoteAddr() net.Addr {
  735. return t.conn.RemoteAddr()
  736. }
  737. func (t *http2Server) Drain() {
  738. t.controlBuf.put(&goAway{})
  739. }