controlbuf.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "runtime"
  24. "strconv"
  25. "sync"
  26. "sync/atomic"
  27. "golang.org/x/net/http2"
  28. "golang.org/x/net/http2/hpack"
  29. "google.golang.org/grpc/internal/grpcutil"
  30. "google.golang.org/grpc/status"
  31. )
  32. var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  33. e.SetMaxDynamicTableSizeLimit(v)
  34. }
  35. type itemNode struct {
  36. it interface{}
  37. next *itemNode
  38. }
  39. type itemList struct {
  40. head *itemNode
  41. tail *itemNode
  42. }
  43. func (il *itemList) enqueue(i interface{}) {
  44. n := &itemNode{it: i}
  45. if il.tail == nil {
  46. il.head, il.tail = n, n
  47. return
  48. }
  49. il.tail.next = n
  50. il.tail = n
  51. }
  52. // peek returns the first item in the list without removing it from the
  53. // list.
  54. func (il *itemList) peek() interface{} {
  55. return il.head.it
  56. }
  57. func (il *itemList) dequeue() interface{} {
  58. if il.head == nil {
  59. return nil
  60. }
  61. i := il.head.it
  62. il.head = il.head.next
  63. if il.head == nil {
  64. il.tail = nil
  65. }
  66. return i
  67. }
  68. func (il *itemList) dequeueAll() *itemNode {
  69. h := il.head
  70. il.head, il.tail = nil, nil
  71. return h
  72. }
  73. func (il *itemList) isEmpty() bool {
  74. return il.head == nil
  75. }
  76. // The following defines various control items which could flow through
  77. // the control buffer of transport. They represent different aspects of
  78. // control tasks, e.g., flow control, settings, streaming resetting, etc.
  79. // maxQueuedTransportResponseFrames is the most queued "transport response"
  80. // frames we will buffer before preventing new reads from occurring on the
  81. // transport. These are control frames sent in response to client requests,
  82. // such as RST_STREAM due to bad headers or settings acks.
  83. const maxQueuedTransportResponseFrames = 50
  84. type cbItem interface {
  85. isTransportResponseFrame() bool
  86. }
  87. // registerStream is used to register an incoming stream with loopy writer.
  88. type registerStream struct {
  89. streamID uint32
  90. wq *writeQuota
  91. }
  92. func (*registerStream) isTransportResponseFrame() bool { return false }
  93. // headerFrame is also used to register stream on the client-side.
  94. type headerFrame struct {
  95. streamID uint32
  96. hf []hpack.HeaderField
  97. endStream bool // Valid on server side.
  98. initStream func(uint32) error // Used only on the client side.
  99. onWrite func()
  100. wq *writeQuota // write quota for the stream created.
  101. cleanup *cleanupStream // Valid on the server side.
  102. onOrphaned func(error) // Valid on client-side
  103. }
  104. func (h *headerFrame) isTransportResponseFrame() bool {
  105. return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
  106. }
  107. type cleanupStream struct {
  108. streamID uint32
  109. rst bool
  110. rstCode http2.ErrCode
  111. onWrite func()
  112. }
  113. func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
  114. type earlyAbortStream struct {
  115. httpStatus uint32
  116. streamID uint32
  117. contentSubtype string
  118. status *status.Status
  119. rst bool
  120. }
  121. func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
  122. type dataFrame struct {
  123. streamID uint32
  124. endStream bool
  125. h []byte
  126. d []byte
  127. // onEachWrite is called every time
  128. // a part of d is written out.
  129. onEachWrite func()
  130. }
  131. func (*dataFrame) isTransportResponseFrame() bool { return false }
  132. type incomingWindowUpdate struct {
  133. streamID uint32
  134. increment uint32
  135. }
  136. func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
  137. type outgoingWindowUpdate struct {
  138. streamID uint32
  139. increment uint32
  140. }
  141. func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
  142. return false // window updates are throttled by thresholds
  143. }
  144. type incomingSettings struct {
  145. ss []http2.Setting
  146. }
  147. func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
  148. type outgoingSettings struct {
  149. ss []http2.Setting
  150. }
  151. func (*outgoingSettings) isTransportResponseFrame() bool { return false }
  152. type incomingGoAway struct {
  153. }
  154. func (*incomingGoAway) isTransportResponseFrame() bool { return false }
  155. type goAway struct {
  156. code http2.ErrCode
  157. debugData []byte
  158. headsUp bool
  159. closeConn bool
  160. }
  161. func (*goAway) isTransportResponseFrame() bool { return false }
  162. type ping struct {
  163. ack bool
  164. data [8]byte
  165. }
  166. func (*ping) isTransportResponseFrame() bool { return true }
  167. type outFlowControlSizeRequest struct {
  168. resp chan uint32
  169. }
  170. func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  171. type outStreamState int
  172. const (
  173. active outStreamState = iota
  174. empty
  175. waitingOnStreamQuota
  176. )
  177. type outStream struct {
  178. id uint32
  179. state outStreamState
  180. itl *itemList
  181. bytesOutStanding int
  182. wq *writeQuota
  183. next *outStream
  184. prev *outStream
  185. }
  186. func (s *outStream) deleteSelf() {
  187. if s.prev != nil {
  188. s.prev.next = s.next
  189. }
  190. if s.next != nil {
  191. s.next.prev = s.prev
  192. }
  193. s.next, s.prev = nil, nil
  194. }
  195. type outStreamList struct {
  196. // Following are sentinel objects that mark the
  197. // beginning and end of the list. They do not
  198. // contain any item lists. All valid objects are
  199. // inserted in between them.
  200. // This is needed so that an outStream object can
  201. // deleteSelf() in O(1) time without knowing which
  202. // list it belongs to.
  203. head *outStream
  204. tail *outStream
  205. }
  206. func newOutStreamList() *outStreamList {
  207. head, tail := new(outStream), new(outStream)
  208. head.next = tail
  209. tail.prev = head
  210. return &outStreamList{
  211. head: head,
  212. tail: tail,
  213. }
  214. }
  215. func (l *outStreamList) enqueue(s *outStream) {
  216. e := l.tail.prev
  217. e.next = s
  218. s.prev = e
  219. s.next = l.tail
  220. l.tail.prev = s
  221. }
  222. // remove from the beginning of the list.
  223. func (l *outStreamList) dequeue() *outStream {
  224. b := l.head.next
  225. if b == l.tail {
  226. return nil
  227. }
  228. b.deleteSelf()
  229. return b
  230. }
  231. // controlBuffer is a way to pass information to loopy.
  232. // Information is passed as specific struct types called control frames.
  233. // A control frame not only represents data, messages or headers to be sent out
  234. // but can also be used to instruct loopy to update its internal state.
  235. // It shouldn't be confused with an HTTP2 frame, although some of the control frames
  236. // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
  237. type controlBuffer struct {
  238. ch chan struct{}
  239. done <-chan struct{}
  240. mu sync.Mutex
  241. consumerWaiting bool
  242. list *itemList
  243. err error
  244. // transportResponseFrames counts the number of queued items that represent
  245. // the response of an action initiated by the peer. trfChan is created
  246. // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
  247. // closed and nilled when transportResponseFrames drops below the
  248. // threshold. Both fields are protected by mu.
  249. transportResponseFrames int
  250. trfChan atomic.Value // chan struct{}
  251. }
  252. func newControlBuffer(done <-chan struct{}) *controlBuffer {
  253. return &controlBuffer{
  254. ch: make(chan struct{}, 1),
  255. list: &itemList{},
  256. done: done,
  257. }
  258. }
  259. // throttle blocks if there are too many incomingSettings/cleanupStreams in the
  260. // controlbuf.
  261. func (c *controlBuffer) throttle() {
  262. ch, _ := c.trfChan.Load().(chan struct{})
  263. if ch != nil {
  264. select {
  265. case <-ch:
  266. case <-c.done:
  267. }
  268. }
  269. }
  270. func (c *controlBuffer) put(it cbItem) error {
  271. _, err := c.executeAndPut(nil, it)
  272. return err
  273. }
  274. func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
  275. var wakeUp bool
  276. c.mu.Lock()
  277. if c.err != nil {
  278. c.mu.Unlock()
  279. return false, c.err
  280. }
  281. if f != nil {
  282. if !f(it) { // f wasn't successful
  283. c.mu.Unlock()
  284. return false, nil
  285. }
  286. }
  287. if c.consumerWaiting {
  288. wakeUp = true
  289. c.consumerWaiting = false
  290. }
  291. c.list.enqueue(it)
  292. if it.isTransportResponseFrame() {
  293. c.transportResponseFrames++
  294. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  295. // We are adding the frame that puts us over the threshold; create
  296. // a throttling channel.
  297. c.trfChan.Store(make(chan struct{}))
  298. }
  299. }
  300. c.mu.Unlock()
  301. if wakeUp {
  302. select {
  303. case c.ch <- struct{}{}:
  304. default:
  305. }
  306. }
  307. return true, nil
  308. }
  309. // Note argument f should never be nil.
  310. func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
  311. c.mu.Lock()
  312. if c.err != nil {
  313. c.mu.Unlock()
  314. return false, c.err
  315. }
  316. if !f(it) { // f wasn't successful
  317. c.mu.Unlock()
  318. return false, nil
  319. }
  320. c.mu.Unlock()
  321. return true, nil
  322. }
  323. func (c *controlBuffer) get(block bool) (interface{}, error) {
  324. for {
  325. c.mu.Lock()
  326. if c.err != nil {
  327. c.mu.Unlock()
  328. return nil, c.err
  329. }
  330. if !c.list.isEmpty() {
  331. h := c.list.dequeue().(cbItem)
  332. if h.isTransportResponseFrame() {
  333. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  334. // We are removing the frame that put us over the
  335. // threshold; close and clear the throttling channel.
  336. ch := c.trfChan.Load().(chan struct{})
  337. close(ch)
  338. c.trfChan.Store((chan struct{})(nil))
  339. }
  340. c.transportResponseFrames--
  341. }
  342. c.mu.Unlock()
  343. return h, nil
  344. }
  345. if !block {
  346. c.mu.Unlock()
  347. return nil, nil
  348. }
  349. c.consumerWaiting = true
  350. c.mu.Unlock()
  351. select {
  352. case <-c.ch:
  353. case <-c.done:
  354. return nil, ErrConnClosing
  355. }
  356. }
  357. }
  358. func (c *controlBuffer) finish() {
  359. c.mu.Lock()
  360. if c.err != nil {
  361. c.mu.Unlock()
  362. return
  363. }
  364. c.err = ErrConnClosing
  365. // There may be headers for streams in the control buffer.
  366. // These streams need to be cleaned out since the transport
  367. // is still not aware of these yet.
  368. for head := c.list.dequeueAll(); head != nil; head = head.next {
  369. hdr, ok := head.it.(*headerFrame)
  370. if !ok {
  371. continue
  372. }
  373. if hdr.onOrphaned != nil { // It will be nil on the server-side.
  374. hdr.onOrphaned(ErrConnClosing)
  375. }
  376. }
  377. // In case throttle() is currently in flight, it needs to be unblocked.
  378. // Otherwise, the transport may not close, since the transport is closed by
  379. // the reader encountering the connection error.
  380. ch, _ := c.trfChan.Load().(chan struct{})
  381. if ch != nil {
  382. close(ch)
  383. }
  384. c.trfChan.Store((chan struct{})(nil))
  385. c.mu.Unlock()
  386. }
  387. type side int
  388. const (
  389. clientSide side = iota
  390. serverSide
  391. )
  392. // Loopy receives frames from the control buffer.
  393. // Each frame is handled individually; most of the work done by loopy goes
  394. // into handling data frames. Loopy maintains a queue of active streams, and each
  395. // stream maintains a queue of data frames; as loopy receives data frames
  396. // it gets added to the queue of the relevant stream.
  397. // Loopy goes over this list of active streams by processing one node every iteration,
  398. // thereby closely resemebling to a round-robin scheduling over all streams. While
  399. // processing a stream, loopy writes out data bytes from this stream capped by the min
  400. // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
  401. type loopyWriter struct {
  402. side side
  403. cbuf *controlBuffer
  404. sendQuota uint32
  405. oiws uint32 // outbound initial window size.
  406. // estdStreams is map of all established streams that are not cleaned-up yet.
  407. // On client-side, this is all streams whose headers were sent out.
  408. // On server-side, this is all streams whose headers were received.
  409. estdStreams map[uint32]*outStream // Established streams.
  410. // activeStreams is a linked-list of all streams that have data to send and some
  411. // stream-level flow control quota.
  412. // Each of these streams internally have a list of data items(and perhaps trailers
  413. // on the server-side) to be sent out.
  414. activeStreams *outStreamList
  415. framer *framer
  416. hBuf *bytes.Buffer // The buffer for HPACK encoding.
  417. hEnc *hpack.Encoder // HPACK encoder.
  418. bdpEst *bdpEstimator
  419. draining bool
  420. // Side-specific handlers
  421. ssGoAwayHandler func(*goAway) (bool, error)
  422. }
  423. func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
  424. var buf bytes.Buffer
  425. l := &loopyWriter{
  426. side: s,
  427. cbuf: cbuf,
  428. sendQuota: defaultWindowSize,
  429. oiws: defaultWindowSize,
  430. estdStreams: make(map[uint32]*outStream),
  431. activeStreams: newOutStreamList(),
  432. framer: fr,
  433. hBuf: &buf,
  434. hEnc: hpack.NewEncoder(&buf),
  435. bdpEst: bdpEst,
  436. }
  437. return l
  438. }
  439. const minBatchSize = 1000
  440. // run should be run in a separate goroutine.
  441. // It reads control frames from controlBuf and processes them by:
  442. // 1. Updating loopy's internal state, or/and
  443. // 2. Writing out HTTP2 frames on the wire.
  444. //
  445. // Loopy keeps all active streams with data to send in a linked-list.
  446. // All streams in the activeStreams linked-list must have both:
  447. // 1. Data to send, and
  448. // 2. Stream level flow control quota available.
  449. //
  450. // In each iteration of run loop, other than processing the incoming control
  451. // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
  452. // This results in writing of HTTP2 frames into an underlying write buffer.
  453. // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
  454. // As an optimization, to increase the batch size for each flush, loopy yields the processor, once
  455. // if the batch size is too low to give stream goroutines a chance to fill it up.
  456. func (l *loopyWriter) run() (err error) {
  457. defer func() {
  458. if err == ErrConnClosing {
  459. // Don't log ErrConnClosing as error since it happens
  460. // 1. When the connection is closed by some other known issue.
  461. // 2. User closed the connection.
  462. // 3. A graceful close of connection.
  463. if logger.V(logLevel) {
  464. logger.Infof("transport: loopyWriter.run returning. %v", err)
  465. }
  466. err = nil
  467. }
  468. }()
  469. for {
  470. it, err := l.cbuf.get(true)
  471. if err != nil {
  472. return err
  473. }
  474. if err = l.handle(it); err != nil {
  475. return err
  476. }
  477. if _, err = l.processData(); err != nil {
  478. return err
  479. }
  480. gosched := true
  481. hasdata:
  482. for {
  483. it, err := l.cbuf.get(false)
  484. if err != nil {
  485. return err
  486. }
  487. if it != nil {
  488. if err = l.handle(it); err != nil {
  489. return err
  490. }
  491. if _, err = l.processData(); err != nil {
  492. return err
  493. }
  494. continue hasdata
  495. }
  496. isEmpty, err := l.processData()
  497. if err != nil {
  498. return err
  499. }
  500. if !isEmpty {
  501. continue hasdata
  502. }
  503. if gosched {
  504. gosched = false
  505. if l.framer.writer.offset < minBatchSize {
  506. runtime.Gosched()
  507. continue hasdata
  508. }
  509. }
  510. l.framer.writer.Flush()
  511. break hasdata
  512. }
  513. }
  514. }
  515. func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
  516. return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
  517. }
  518. func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
  519. // Otherwise update the quota.
  520. if w.streamID == 0 {
  521. l.sendQuota += w.increment
  522. return nil
  523. }
  524. // Find the stream and update it.
  525. if str, ok := l.estdStreams[w.streamID]; ok {
  526. str.bytesOutStanding -= int(w.increment)
  527. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  528. str.state = active
  529. l.activeStreams.enqueue(str)
  530. return nil
  531. }
  532. }
  533. return nil
  534. }
  535. func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
  536. return l.framer.fr.WriteSettings(s.ss...)
  537. }
  538. func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
  539. if err := l.applySettings(s.ss); err != nil {
  540. return err
  541. }
  542. return l.framer.fr.WriteSettingsAck()
  543. }
  544. func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
  545. str := &outStream{
  546. id: h.streamID,
  547. state: empty,
  548. itl: &itemList{},
  549. wq: h.wq,
  550. }
  551. l.estdStreams[h.streamID] = str
  552. return nil
  553. }
  554. func (l *loopyWriter) headerHandler(h *headerFrame) error {
  555. if l.side == serverSide {
  556. str, ok := l.estdStreams[h.streamID]
  557. if !ok {
  558. if logger.V(logLevel) {
  559. logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
  560. }
  561. return nil
  562. }
  563. // Case 1.A: Server is responding back with headers.
  564. if !h.endStream {
  565. return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
  566. }
  567. // else: Case 1.B: Server wants to close stream.
  568. if str.state != empty { // either active or waiting on stream quota.
  569. // add it str's list of items.
  570. str.itl.enqueue(h)
  571. return nil
  572. }
  573. if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
  574. return err
  575. }
  576. return l.cleanupStreamHandler(h.cleanup)
  577. }
  578. // Case 2: Client wants to originate stream.
  579. str := &outStream{
  580. id: h.streamID,
  581. state: empty,
  582. itl: &itemList{},
  583. wq: h.wq,
  584. }
  585. str.itl.enqueue(h)
  586. return l.originateStream(str)
  587. }
  588. func (l *loopyWriter) originateStream(str *outStream) error {
  589. hdr := str.itl.dequeue().(*headerFrame)
  590. if err := hdr.initStream(str.id); err != nil {
  591. if err == ErrConnClosing {
  592. return err
  593. }
  594. // Other errors(errStreamDrain) need not close transport.
  595. return nil
  596. }
  597. if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
  598. return err
  599. }
  600. l.estdStreams[str.id] = str
  601. return nil
  602. }
  603. func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  604. if onWrite != nil {
  605. onWrite()
  606. }
  607. l.hBuf.Reset()
  608. for _, f := range hf {
  609. if err := l.hEnc.WriteField(f); err != nil {
  610. if logger.V(logLevel) {
  611. logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
  612. }
  613. }
  614. }
  615. var (
  616. err error
  617. endHeaders, first bool
  618. )
  619. first = true
  620. for !endHeaders {
  621. size := l.hBuf.Len()
  622. if size > http2MaxFrameLen {
  623. size = http2MaxFrameLen
  624. } else {
  625. endHeaders = true
  626. }
  627. if first {
  628. first = false
  629. err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  630. StreamID: streamID,
  631. BlockFragment: l.hBuf.Next(size),
  632. EndStream: endStream,
  633. EndHeaders: endHeaders,
  634. })
  635. } else {
  636. err = l.framer.fr.WriteContinuation(
  637. streamID,
  638. endHeaders,
  639. l.hBuf.Next(size),
  640. )
  641. }
  642. if err != nil {
  643. return err
  644. }
  645. }
  646. return nil
  647. }
  648. func (l *loopyWriter) preprocessData(df *dataFrame) error {
  649. str, ok := l.estdStreams[df.streamID]
  650. if !ok {
  651. return nil
  652. }
  653. // If we got data for a stream it means that
  654. // stream was originated and the headers were sent out.
  655. str.itl.enqueue(df)
  656. if str.state == empty {
  657. str.state = active
  658. l.activeStreams.enqueue(str)
  659. }
  660. return nil
  661. }
  662. func (l *loopyWriter) pingHandler(p *ping) error {
  663. if !p.ack {
  664. l.bdpEst.timesnap(p.data)
  665. }
  666. return l.framer.fr.WritePing(p.ack, p.data)
  667. }
  668. func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
  669. o.resp <- l.sendQuota
  670. return nil
  671. }
  672. func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
  673. c.onWrite()
  674. if str, ok := l.estdStreams[c.streamID]; ok {
  675. // On the server side it could be a trailers-only response or
  676. // a RST_STREAM before stream initialization thus the stream might
  677. // not be established yet.
  678. delete(l.estdStreams, c.streamID)
  679. str.deleteSelf()
  680. }
  681. if c.rst { // If RST_STREAM needs to be sent.
  682. if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
  683. return err
  684. }
  685. }
  686. if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
  687. return ErrConnClosing
  688. }
  689. return nil
  690. }
  691. func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
  692. if l.side == clientSide {
  693. return errors.New("earlyAbortStream not handled on client")
  694. }
  695. // In case the caller forgets to set the http status, default to 200.
  696. if eas.httpStatus == 0 {
  697. eas.httpStatus = 200
  698. }
  699. headerFields := []hpack.HeaderField{
  700. {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
  701. {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
  702. {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
  703. {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
  704. }
  705. if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
  706. return err
  707. }
  708. if eas.rst {
  709. if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
  710. return err
  711. }
  712. }
  713. return nil
  714. }
  715. func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
  716. if l.side == clientSide {
  717. l.draining = true
  718. if len(l.estdStreams) == 0 {
  719. return ErrConnClosing
  720. }
  721. }
  722. return nil
  723. }
  724. func (l *loopyWriter) goAwayHandler(g *goAway) error {
  725. // Handling of outgoing GoAway is very specific to side.
  726. if l.ssGoAwayHandler != nil {
  727. draining, err := l.ssGoAwayHandler(g)
  728. if err != nil {
  729. return err
  730. }
  731. l.draining = draining
  732. }
  733. return nil
  734. }
  735. func (l *loopyWriter) handle(i interface{}) error {
  736. switch i := i.(type) {
  737. case *incomingWindowUpdate:
  738. return l.incomingWindowUpdateHandler(i)
  739. case *outgoingWindowUpdate:
  740. return l.outgoingWindowUpdateHandler(i)
  741. case *incomingSettings:
  742. return l.incomingSettingsHandler(i)
  743. case *outgoingSettings:
  744. return l.outgoingSettingsHandler(i)
  745. case *headerFrame:
  746. return l.headerHandler(i)
  747. case *registerStream:
  748. return l.registerStreamHandler(i)
  749. case *cleanupStream:
  750. return l.cleanupStreamHandler(i)
  751. case *earlyAbortStream:
  752. return l.earlyAbortStreamHandler(i)
  753. case *incomingGoAway:
  754. return l.incomingGoAwayHandler(i)
  755. case *dataFrame:
  756. return l.preprocessData(i)
  757. case *ping:
  758. return l.pingHandler(i)
  759. case *goAway:
  760. return l.goAwayHandler(i)
  761. case *outFlowControlSizeRequest:
  762. return l.outFlowControlSizeRequestHandler(i)
  763. default:
  764. return fmt.Errorf("transport: unknown control message type %T", i)
  765. }
  766. }
  767. func (l *loopyWriter) applySettings(ss []http2.Setting) error {
  768. for _, s := range ss {
  769. switch s.ID {
  770. case http2.SettingInitialWindowSize:
  771. o := l.oiws
  772. l.oiws = s.Val
  773. if o < l.oiws {
  774. // If the new limit is greater make all depleted streams active.
  775. for _, stream := range l.estdStreams {
  776. if stream.state == waitingOnStreamQuota {
  777. stream.state = active
  778. l.activeStreams.enqueue(stream)
  779. }
  780. }
  781. }
  782. case http2.SettingHeaderTableSize:
  783. updateHeaderTblSize(l.hEnc, s.Val)
  784. }
  785. }
  786. return nil
  787. }
  788. // processData removes the first stream from active streams, writes out at most 16KB
  789. // of its data and then puts it at the end of activeStreams if there's still more data
  790. // to be sent and stream has some stream-level flow control.
  791. func (l *loopyWriter) processData() (bool, error) {
  792. if l.sendQuota == 0 {
  793. return true, nil
  794. }
  795. str := l.activeStreams.dequeue() // Remove the first stream.
  796. if str == nil {
  797. return true, nil
  798. }
  799. dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
  800. // A data item is represented by a dataFrame, since it later translates into
  801. // multiple HTTP2 data frames.
  802. // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
  803. // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
  804. // maximum possilbe HTTP2 frame size.
  805. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
  806. // Client sends out empty data frame with endStream = true
  807. if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
  808. return false, err
  809. }
  810. str.itl.dequeue() // remove the empty data item from stream
  811. if str.itl.isEmpty() {
  812. str.state = empty
  813. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
  814. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  815. return false, err
  816. }
  817. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  818. return false, nil
  819. }
  820. } else {
  821. l.activeStreams.enqueue(str)
  822. }
  823. return false, nil
  824. }
  825. var (
  826. buf []byte
  827. )
  828. // Figure out the maximum size we can send
  829. maxSize := http2MaxFrameLen
  830. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
  831. str.state = waitingOnStreamQuota
  832. return false, nil
  833. } else if maxSize > strQuota {
  834. maxSize = strQuota
  835. }
  836. if maxSize > int(l.sendQuota) { // connection-level flow control.
  837. maxSize = int(l.sendQuota)
  838. }
  839. // Compute how much of the header and data we can send within quota and max frame length
  840. hSize := min(maxSize, len(dataItem.h))
  841. dSize := min(maxSize-hSize, len(dataItem.d))
  842. if hSize != 0 {
  843. if dSize == 0 {
  844. buf = dataItem.h
  845. } else {
  846. // We can add some data to grpc message header to distribute bytes more equally across frames.
  847. // Copy on the stack to avoid generating garbage
  848. var localBuf [http2MaxFrameLen]byte
  849. copy(localBuf[:hSize], dataItem.h)
  850. copy(localBuf[hSize:], dataItem.d[:dSize])
  851. buf = localBuf[:hSize+dSize]
  852. }
  853. } else {
  854. buf = dataItem.d
  855. }
  856. size := hSize + dSize
  857. // Now that outgoing flow controls are checked we can replenish str's write quota
  858. str.wq.replenish(size)
  859. var endStream bool
  860. // If this is the last data message on this stream and all of it can be written in this iteration.
  861. if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
  862. endStream = true
  863. }
  864. if dataItem.onEachWrite != nil {
  865. dataItem.onEachWrite()
  866. }
  867. if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
  868. return false, err
  869. }
  870. str.bytesOutStanding += size
  871. l.sendQuota -= uint32(size)
  872. dataItem.h = dataItem.h[hSize:]
  873. dataItem.d = dataItem.d[dSize:]
  874. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
  875. str.itl.dequeue()
  876. }
  877. if str.itl.isEmpty() {
  878. str.state = empty
  879. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
  880. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  881. return false, err
  882. }
  883. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  884. return false, err
  885. }
  886. } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
  887. str.state = waitingOnStreamQuota
  888. } else { // Otherwise add it back to the list of active streams.
  889. l.activeStreams.enqueue(str)
  890. }
  891. return false, nil
  892. }
  893. func min(a, b int) int {
  894. if a < b {
  895. return a
  896. }
  897. return b
  898. }