http_util.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package transport
  34. import (
  35. "bufio"
  36. "bytes"
  37. "fmt"
  38. "io"
  39. "net"
  40. "strconv"
  41. "strings"
  42. "sync/atomic"
  43. "time"
  44. "golang.org/x/net/http2"
  45. "golang.org/x/net/http2/hpack"
  46. "google.golang.org/grpc/codes"
  47. "google.golang.org/grpc/grpclog"
  48. "google.golang.org/grpc/metadata"
  49. )
  50. const (
  51. // The primary user agent
  52. primaryUA = "grpc-go/1.0"
  53. // http2MaxFrameLen specifies the max length of a HTTP2 frame.
  54. http2MaxFrameLen = 16384 // 16KB frame
  55. // http://http2.github.io/http2-spec/#SettingValues
  56. http2InitHeaderTableSize = 4096
  57. // http2IOBufSize specifies the buffer size for sending frames.
  58. http2IOBufSize = 32 * 1024
  59. )
  60. var (
  61. clientPreface = []byte(http2.ClientPreface)
  62. http2ErrConvTab = map[http2.ErrCode]codes.Code{
  63. http2.ErrCodeNo: codes.Internal,
  64. http2.ErrCodeProtocol: codes.Internal,
  65. http2.ErrCodeInternal: codes.Internal,
  66. http2.ErrCodeFlowControl: codes.ResourceExhausted,
  67. http2.ErrCodeSettingsTimeout: codes.Internal,
  68. http2.ErrCodeStreamClosed: codes.Internal,
  69. http2.ErrCodeFrameSize: codes.Internal,
  70. http2.ErrCodeRefusedStream: codes.Unavailable,
  71. http2.ErrCodeCancel: codes.Canceled,
  72. http2.ErrCodeCompression: codes.Internal,
  73. http2.ErrCodeConnect: codes.Internal,
  74. http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
  75. http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
  76. http2.ErrCodeHTTP11Required: codes.FailedPrecondition,
  77. }
  78. statusCodeConvTab = map[codes.Code]http2.ErrCode{
  79. codes.Internal: http2.ErrCodeInternal,
  80. codes.Canceled: http2.ErrCodeCancel,
  81. codes.Unavailable: http2.ErrCodeRefusedStream,
  82. codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
  83. codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
  84. }
  85. )
  86. // Records the states during HPACK decoding. Must be reset once the
  87. // decoding of the entire headers are finished.
  88. type decodeState struct {
  89. err error // first error encountered decoding
  90. encoding string
  91. // statusCode caches the stream status received from the trailer
  92. // the server sent. Client side only.
  93. statusCode codes.Code
  94. statusDesc string
  95. // Server side only fields.
  96. timeoutSet bool
  97. timeout time.Duration
  98. method string
  99. // key-value metadata map from the peer.
  100. mdata map[string][]string
  101. }
  102. // isReservedHeader checks whether hdr belongs to HTTP2 headers
  103. // reserved by gRPC protocol. Any other headers are classified as the
  104. // user-specified metadata.
  105. func isReservedHeader(hdr string) bool {
  106. if hdr != "" && hdr[0] == ':' {
  107. return true
  108. }
  109. switch hdr {
  110. case "content-type",
  111. "grpc-message-type",
  112. "grpc-encoding",
  113. "grpc-message",
  114. "grpc-status",
  115. "grpc-timeout",
  116. "te":
  117. return true
  118. default:
  119. return false
  120. }
  121. }
  122. // isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders
  123. // that should be propagated into metadata visible to users.
  124. func isWhitelistedPseudoHeader(hdr string) bool {
  125. switch hdr {
  126. case ":authority":
  127. return true
  128. default:
  129. return false
  130. }
  131. }
  132. func (d *decodeState) setErr(err error) {
  133. if d.err == nil {
  134. d.err = err
  135. }
  136. }
  137. func validContentType(t string) bool {
  138. e := "application/grpc"
  139. if !strings.HasPrefix(t, e) {
  140. return false
  141. }
  142. // Support variations on the content-type
  143. // (e.g. "application/grpc+blah", "application/grpc;blah").
  144. if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' {
  145. return false
  146. }
  147. return true
  148. }
  149. func (d *decodeState) processHeaderField(f hpack.HeaderField) {
  150. switch f.Name {
  151. case "content-type":
  152. if !validContentType(f.Value) {
  153. d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
  154. return
  155. }
  156. case "grpc-encoding":
  157. d.encoding = f.Value
  158. case "grpc-status":
  159. code, err := strconv.Atoi(f.Value)
  160. if err != nil {
  161. d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
  162. return
  163. }
  164. d.statusCode = codes.Code(code)
  165. case "grpc-message":
  166. d.statusDesc = decodeGrpcMessage(f.Value)
  167. case "grpc-timeout":
  168. d.timeoutSet = true
  169. var err error
  170. d.timeout, err = decodeTimeout(f.Value)
  171. if err != nil {
  172. d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
  173. return
  174. }
  175. case ":path":
  176. d.method = f.Value
  177. default:
  178. if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
  179. if f.Name == "user-agent" {
  180. i := strings.LastIndex(f.Value, " ")
  181. if i == -1 {
  182. // There is no application user agent string being set.
  183. return
  184. }
  185. // Extract the application user agent string.
  186. f.Value = f.Value[:i]
  187. }
  188. if d.mdata == nil {
  189. d.mdata = make(map[string][]string)
  190. }
  191. k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
  192. if err != nil {
  193. grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
  194. return
  195. }
  196. d.mdata[k] = append(d.mdata[k], v)
  197. }
  198. }
  199. }
  200. type timeoutUnit uint8
  201. const (
  202. hour timeoutUnit = 'H'
  203. minute timeoutUnit = 'M'
  204. second timeoutUnit = 'S'
  205. millisecond timeoutUnit = 'm'
  206. microsecond timeoutUnit = 'u'
  207. nanosecond timeoutUnit = 'n'
  208. )
  209. func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
  210. switch u {
  211. case hour:
  212. return time.Hour, true
  213. case minute:
  214. return time.Minute, true
  215. case second:
  216. return time.Second, true
  217. case millisecond:
  218. return time.Millisecond, true
  219. case microsecond:
  220. return time.Microsecond, true
  221. case nanosecond:
  222. return time.Nanosecond, true
  223. default:
  224. }
  225. return
  226. }
  227. const maxTimeoutValue int64 = 100000000 - 1
  228. // div does integer division and round-up the result. Note that this is
  229. // equivalent to (d+r-1)/r but has less chance to overflow.
  230. func div(d, r time.Duration) int64 {
  231. if m := d % r; m > 0 {
  232. return int64(d/r + 1)
  233. }
  234. return int64(d / r)
  235. }
  236. // TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
  237. func encodeTimeout(t time.Duration) string {
  238. if t <= 0 {
  239. return "0n"
  240. }
  241. if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
  242. return strconv.FormatInt(d, 10) + "n"
  243. }
  244. if d := div(t, time.Microsecond); d <= maxTimeoutValue {
  245. return strconv.FormatInt(d, 10) + "u"
  246. }
  247. if d := div(t, time.Millisecond); d <= maxTimeoutValue {
  248. return strconv.FormatInt(d, 10) + "m"
  249. }
  250. if d := div(t, time.Second); d <= maxTimeoutValue {
  251. return strconv.FormatInt(d, 10) + "S"
  252. }
  253. if d := div(t, time.Minute); d <= maxTimeoutValue {
  254. return strconv.FormatInt(d, 10) + "M"
  255. }
  256. // Note that maxTimeoutValue * time.Hour > MaxInt64.
  257. return strconv.FormatInt(div(t, time.Hour), 10) + "H"
  258. }
  259. func decodeTimeout(s string) (time.Duration, error) {
  260. size := len(s)
  261. if size < 2 {
  262. return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
  263. }
  264. unit := timeoutUnit(s[size-1])
  265. d, ok := timeoutUnitToDuration(unit)
  266. if !ok {
  267. return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
  268. }
  269. t, err := strconv.ParseInt(s[:size-1], 10, 64)
  270. if err != nil {
  271. return 0, err
  272. }
  273. return d * time.Duration(t), nil
  274. }
  275. const (
  276. spaceByte = ' '
  277. tildaByte = '~'
  278. percentByte = '%'
  279. )
  280. // encodeGrpcMessage is used to encode status code in header field
  281. // "grpc-message".
  282. // It checks to see if each individual byte in msg is an
  283. // allowable byte, and then either percent encoding or passing it through.
  284. // When percent encoding, the byte is converted into hexadecimal notation
  285. // with a '%' prepended.
  286. func encodeGrpcMessage(msg string) string {
  287. if msg == "" {
  288. return ""
  289. }
  290. lenMsg := len(msg)
  291. for i := 0; i < lenMsg; i++ {
  292. c := msg[i]
  293. if !(c >= spaceByte && c < tildaByte && c != percentByte) {
  294. return encodeGrpcMessageUnchecked(msg)
  295. }
  296. }
  297. return msg
  298. }
  299. func encodeGrpcMessageUnchecked(msg string) string {
  300. var buf bytes.Buffer
  301. lenMsg := len(msg)
  302. for i := 0; i < lenMsg; i++ {
  303. c := msg[i]
  304. if c >= spaceByte && c < tildaByte && c != percentByte {
  305. buf.WriteByte(c)
  306. } else {
  307. buf.WriteString(fmt.Sprintf("%%%02X", c))
  308. }
  309. }
  310. return buf.String()
  311. }
  312. // decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
  313. func decodeGrpcMessage(msg string) string {
  314. if msg == "" {
  315. return ""
  316. }
  317. lenMsg := len(msg)
  318. for i := 0; i < lenMsg; i++ {
  319. if msg[i] == percentByte && i+2 < lenMsg {
  320. return decodeGrpcMessageUnchecked(msg)
  321. }
  322. }
  323. return msg
  324. }
  325. func decodeGrpcMessageUnchecked(msg string) string {
  326. var buf bytes.Buffer
  327. lenMsg := len(msg)
  328. for i := 0; i < lenMsg; i++ {
  329. c := msg[i]
  330. if c == percentByte && i+2 < lenMsg {
  331. parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
  332. if err != nil {
  333. buf.WriteByte(c)
  334. } else {
  335. buf.WriteByte(byte(parsed))
  336. i += 2
  337. }
  338. } else {
  339. buf.WriteByte(c)
  340. }
  341. }
  342. return buf.String()
  343. }
  344. type framer struct {
  345. numWriters int32
  346. reader io.Reader
  347. writer *bufio.Writer
  348. fr *http2.Framer
  349. }
  350. func newFramer(conn net.Conn) *framer {
  351. f := &framer{
  352. reader: bufio.NewReaderSize(conn, http2IOBufSize),
  353. writer: bufio.NewWriterSize(conn, http2IOBufSize),
  354. }
  355. f.fr = http2.NewFramer(f.writer, f.reader)
  356. f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
  357. return f
  358. }
  359. func (f *framer) adjustNumWriters(i int32) int32 {
  360. return atomic.AddInt32(&f.numWriters, i)
  361. }
  362. // The following writeXXX functions can only be called when the caller gets
  363. // unblocked from writableChan channel (i.e., owns the privilege to write).
  364. func (f *framer) writeContinuation(forceFlush bool, streamID uint32, endHeaders bool, headerBlockFragment []byte) error {
  365. if err := f.fr.WriteContinuation(streamID, endHeaders, headerBlockFragment); err != nil {
  366. return err
  367. }
  368. if forceFlush {
  369. return f.writer.Flush()
  370. }
  371. return nil
  372. }
  373. func (f *framer) writeData(forceFlush bool, streamID uint32, endStream bool, data []byte) error {
  374. if err := f.fr.WriteData(streamID, endStream, data); err != nil {
  375. return err
  376. }
  377. if forceFlush {
  378. return f.writer.Flush()
  379. }
  380. return nil
  381. }
  382. func (f *framer) writeGoAway(forceFlush bool, maxStreamID uint32, code http2.ErrCode, debugData []byte) error {
  383. if err := f.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
  384. return err
  385. }
  386. if forceFlush {
  387. return f.writer.Flush()
  388. }
  389. return nil
  390. }
  391. func (f *framer) writeHeaders(forceFlush bool, p http2.HeadersFrameParam) error {
  392. if err := f.fr.WriteHeaders(p); err != nil {
  393. return err
  394. }
  395. if forceFlush {
  396. return f.writer.Flush()
  397. }
  398. return nil
  399. }
  400. func (f *framer) writePing(forceFlush, ack bool, data [8]byte) error {
  401. if err := f.fr.WritePing(ack, data); err != nil {
  402. return err
  403. }
  404. if forceFlush {
  405. return f.writer.Flush()
  406. }
  407. return nil
  408. }
  409. func (f *framer) writePriority(forceFlush bool, streamID uint32, p http2.PriorityParam) error {
  410. if err := f.fr.WritePriority(streamID, p); err != nil {
  411. return err
  412. }
  413. if forceFlush {
  414. return f.writer.Flush()
  415. }
  416. return nil
  417. }
  418. func (f *framer) writePushPromise(forceFlush bool, p http2.PushPromiseParam) error {
  419. if err := f.fr.WritePushPromise(p); err != nil {
  420. return err
  421. }
  422. if forceFlush {
  423. return f.writer.Flush()
  424. }
  425. return nil
  426. }
  427. func (f *framer) writeRSTStream(forceFlush bool, streamID uint32, code http2.ErrCode) error {
  428. if err := f.fr.WriteRSTStream(streamID, code); err != nil {
  429. return err
  430. }
  431. if forceFlush {
  432. return f.writer.Flush()
  433. }
  434. return nil
  435. }
  436. func (f *framer) writeSettings(forceFlush bool, settings ...http2.Setting) error {
  437. if err := f.fr.WriteSettings(settings...); err != nil {
  438. return err
  439. }
  440. if forceFlush {
  441. return f.writer.Flush()
  442. }
  443. return nil
  444. }
  445. func (f *framer) writeSettingsAck(forceFlush bool) error {
  446. if err := f.fr.WriteSettingsAck(); err != nil {
  447. return err
  448. }
  449. if forceFlush {
  450. return f.writer.Flush()
  451. }
  452. return nil
  453. }
  454. func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error {
  455. if err := f.fr.WriteWindowUpdate(streamID, incr); err != nil {
  456. return err
  457. }
  458. if forceFlush {
  459. return f.writer.Flush()
  460. }
  461. return nil
  462. }
  463. func (f *framer) flushWrite() error {
  464. return f.writer.Flush()
  465. }
  466. func (f *framer) readFrame() (http2.Frame, error) {
  467. return f.fr.ReadFrame()
  468. }
  469. func (f *framer) errorDetail() error {
  470. return f.fr.ErrorDetail()
  471. }