http_util.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bufio"
  21. "encoding/base64"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "net/http"
  28. "net/url"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "time"
  33. "unicode/utf8"
  34. "github.com/golang/protobuf/proto"
  35. "golang.org/x/net/http2"
  36. "golang.org/x/net/http2/hpack"
  37. spb "google.golang.org/genproto/googleapis/rpc/status"
  38. "google.golang.org/grpc/codes"
  39. "google.golang.org/grpc/status"
  40. )
  41. const (
  42. // http2MaxFrameLen specifies the max length of a HTTP2 frame.
  43. http2MaxFrameLen = 16384 // 16KB frame
  44. // https://httpwg.org/specs/rfc7540.html#SettingValues
  45. http2InitHeaderTableSize = 4096
  46. )
  47. var (
  48. clientPreface = []byte(http2.ClientPreface)
  49. http2ErrConvTab = map[http2.ErrCode]codes.Code{
  50. http2.ErrCodeNo: codes.Internal,
  51. http2.ErrCodeProtocol: codes.Internal,
  52. http2.ErrCodeInternal: codes.Internal,
  53. http2.ErrCodeFlowControl: codes.ResourceExhausted,
  54. http2.ErrCodeSettingsTimeout: codes.Internal,
  55. http2.ErrCodeStreamClosed: codes.Internal,
  56. http2.ErrCodeFrameSize: codes.Internal,
  57. http2.ErrCodeRefusedStream: codes.Unavailable,
  58. http2.ErrCodeCancel: codes.Canceled,
  59. http2.ErrCodeCompression: codes.Internal,
  60. http2.ErrCodeConnect: codes.Internal,
  61. http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
  62. http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
  63. http2.ErrCodeHTTP11Required: codes.Internal,
  64. }
  65. // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
  66. HTTPStatusConvTab = map[int]codes.Code{
  67. // 400 Bad Request - INTERNAL.
  68. http.StatusBadRequest: codes.Internal,
  69. // 401 Unauthorized - UNAUTHENTICATED.
  70. http.StatusUnauthorized: codes.Unauthenticated,
  71. // 403 Forbidden - PERMISSION_DENIED.
  72. http.StatusForbidden: codes.PermissionDenied,
  73. // 404 Not Found - UNIMPLEMENTED.
  74. http.StatusNotFound: codes.Unimplemented,
  75. // 429 Too Many Requests - UNAVAILABLE.
  76. http.StatusTooManyRequests: codes.Unavailable,
  77. // 502 Bad Gateway - UNAVAILABLE.
  78. http.StatusBadGateway: codes.Unavailable,
  79. // 503 Service Unavailable - UNAVAILABLE.
  80. http.StatusServiceUnavailable: codes.Unavailable,
  81. // 504 Gateway timeout - UNAVAILABLE.
  82. http.StatusGatewayTimeout: codes.Unavailable,
  83. }
  84. )
  85. // isReservedHeader checks whether hdr belongs to HTTP2 headers
  86. // reserved by gRPC protocol. Any other headers are classified as the
  87. // user-specified metadata.
  88. func isReservedHeader(hdr string) bool {
  89. if hdr != "" && hdr[0] == ':' {
  90. return true
  91. }
  92. switch hdr {
  93. case "content-type",
  94. "user-agent",
  95. "grpc-message-type",
  96. "grpc-encoding",
  97. "grpc-message",
  98. "grpc-status",
  99. "grpc-timeout",
  100. "grpc-status-details-bin",
  101. // Intentionally exclude grpc-previous-rpc-attempts and
  102. // grpc-retry-pushback-ms, which are "reserved", but their API
  103. // intentionally works via metadata.
  104. "te":
  105. return true
  106. default:
  107. return false
  108. }
  109. }
  110. // isWhitelistedHeader checks whether hdr should be propagated into metadata
  111. // visible to users, even though it is classified as "reserved", above.
  112. func isWhitelistedHeader(hdr string) bool {
  113. switch hdr {
  114. case ":authority", "user-agent":
  115. return true
  116. default:
  117. return false
  118. }
  119. }
  120. const binHdrSuffix = "-bin"
  121. func encodeBinHeader(v []byte) string {
  122. return base64.RawStdEncoding.EncodeToString(v)
  123. }
  124. func decodeBinHeader(v string) ([]byte, error) {
  125. if len(v)%4 == 0 {
  126. // Input was padded, or padding was not necessary.
  127. return base64.StdEncoding.DecodeString(v)
  128. }
  129. return base64.RawStdEncoding.DecodeString(v)
  130. }
  131. func encodeMetadataHeader(k, v string) string {
  132. if strings.HasSuffix(k, binHdrSuffix) {
  133. return encodeBinHeader(([]byte)(v))
  134. }
  135. return v
  136. }
  137. func decodeMetadataHeader(k, v string) (string, error) {
  138. if strings.HasSuffix(k, binHdrSuffix) {
  139. b, err := decodeBinHeader(v)
  140. return string(b), err
  141. }
  142. return v, nil
  143. }
  144. func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
  145. v, err := decodeBinHeader(rawDetails)
  146. if err != nil {
  147. return nil, err
  148. }
  149. st := &spb.Status{}
  150. if err = proto.Unmarshal(v, st); err != nil {
  151. return nil, err
  152. }
  153. return status.FromProto(st), nil
  154. }
  155. type timeoutUnit uint8
  156. const (
  157. hour timeoutUnit = 'H'
  158. minute timeoutUnit = 'M'
  159. second timeoutUnit = 'S'
  160. millisecond timeoutUnit = 'm'
  161. microsecond timeoutUnit = 'u'
  162. nanosecond timeoutUnit = 'n'
  163. )
  164. func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
  165. switch u {
  166. case hour:
  167. return time.Hour, true
  168. case minute:
  169. return time.Minute, true
  170. case second:
  171. return time.Second, true
  172. case millisecond:
  173. return time.Millisecond, true
  174. case microsecond:
  175. return time.Microsecond, true
  176. case nanosecond:
  177. return time.Nanosecond, true
  178. default:
  179. }
  180. return
  181. }
  182. func decodeTimeout(s string) (time.Duration, error) {
  183. size := len(s)
  184. if size < 2 {
  185. return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
  186. }
  187. if size > 9 {
  188. // Spec allows for 8 digits plus the unit.
  189. return 0, fmt.Errorf("transport: timeout string is too long: %q", s)
  190. }
  191. unit := timeoutUnit(s[size-1])
  192. d, ok := timeoutUnitToDuration(unit)
  193. if !ok {
  194. return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
  195. }
  196. t, err := strconv.ParseInt(s[:size-1], 10, 64)
  197. if err != nil {
  198. return 0, err
  199. }
  200. const maxHours = math.MaxInt64 / int64(time.Hour)
  201. if d == time.Hour && t > maxHours {
  202. // This timeout would overflow math.MaxInt64; clamp it.
  203. return time.Duration(math.MaxInt64), nil
  204. }
  205. return d * time.Duration(t), nil
  206. }
  207. const (
  208. spaceByte = ' '
  209. tildeByte = '~'
  210. percentByte = '%'
  211. )
  212. // encodeGrpcMessage is used to encode status code in header field
  213. // "grpc-message". It does percent encoding and also replaces invalid utf-8
  214. // characters with Unicode replacement character.
  215. //
  216. // It checks to see if each individual byte in msg is an allowable byte, and
  217. // then either percent encoding or passing it through. When percent encoding,
  218. // the byte is converted into hexadecimal notation with a '%' prepended.
  219. func encodeGrpcMessage(msg string) string {
  220. if msg == "" {
  221. return ""
  222. }
  223. lenMsg := len(msg)
  224. for i := 0; i < lenMsg; i++ {
  225. c := msg[i]
  226. if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
  227. return encodeGrpcMessageUnchecked(msg)
  228. }
  229. }
  230. return msg
  231. }
  232. func encodeGrpcMessageUnchecked(msg string) string {
  233. var sb strings.Builder
  234. for len(msg) > 0 {
  235. r, size := utf8.DecodeRuneInString(msg)
  236. for _, b := range []byte(string(r)) {
  237. if size > 1 {
  238. // If size > 1, r is not ascii. Always do percent encoding.
  239. fmt.Fprintf(&sb, "%%%02X", b)
  240. continue
  241. }
  242. // The for loop is necessary even if size == 1. r could be
  243. // utf8.RuneError.
  244. //
  245. // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
  246. if b >= spaceByte && b <= tildeByte && b != percentByte {
  247. sb.WriteByte(b)
  248. } else {
  249. fmt.Fprintf(&sb, "%%%02X", b)
  250. }
  251. }
  252. msg = msg[size:]
  253. }
  254. return sb.String()
  255. }
  256. // decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
  257. func decodeGrpcMessage(msg string) string {
  258. if msg == "" {
  259. return ""
  260. }
  261. lenMsg := len(msg)
  262. for i := 0; i < lenMsg; i++ {
  263. if msg[i] == percentByte && i+2 < lenMsg {
  264. return decodeGrpcMessageUnchecked(msg)
  265. }
  266. }
  267. return msg
  268. }
  269. func decodeGrpcMessageUnchecked(msg string) string {
  270. var sb strings.Builder
  271. lenMsg := len(msg)
  272. for i := 0; i < lenMsg; i++ {
  273. c := msg[i]
  274. if c == percentByte && i+2 < lenMsg {
  275. parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
  276. if err != nil {
  277. sb.WriteByte(c)
  278. } else {
  279. sb.WriteByte(byte(parsed))
  280. i += 2
  281. }
  282. } else {
  283. sb.WriteByte(c)
  284. }
  285. }
  286. return sb.String()
  287. }
  288. type bufWriter struct {
  289. pool *sync.Pool
  290. buf []byte
  291. offset int
  292. batchSize int
  293. conn net.Conn
  294. err error
  295. }
  296. func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
  297. w := &bufWriter{
  298. batchSize: batchSize,
  299. conn: conn,
  300. pool: pool,
  301. }
  302. // this indicates that we should use non shared buf
  303. if pool == nil {
  304. w.buf = make([]byte, batchSize)
  305. }
  306. return w
  307. }
  308. func (w *bufWriter) Write(b []byte) (n int, err error) {
  309. if w.err != nil {
  310. return 0, w.err
  311. }
  312. if w.batchSize == 0 { // Buffer has been disabled.
  313. n, err = w.conn.Write(b)
  314. return n, toIOError(err)
  315. }
  316. if w.buf == nil {
  317. b := w.pool.Get().(*[]byte)
  318. w.buf = *b
  319. }
  320. for len(b) > 0 {
  321. nn := copy(w.buf[w.offset:], b)
  322. b = b[nn:]
  323. w.offset += nn
  324. n += nn
  325. if w.offset >= w.batchSize {
  326. err = w.flushKeepBuffer()
  327. }
  328. }
  329. return n, err
  330. }
  331. func (w *bufWriter) Flush() error {
  332. err := w.flushKeepBuffer()
  333. // Only release the buffer if we are in a "shared" mode
  334. if w.buf != nil && w.pool != nil {
  335. b := w.buf
  336. w.pool.Put(&b)
  337. w.buf = nil
  338. }
  339. return err
  340. }
  341. func (w *bufWriter) flushKeepBuffer() error {
  342. if w.err != nil {
  343. return w.err
  344. }
  345. if w.offset == 0 {
  346. return nil
  347. }
  348. _, w.err = w.conn.Write(w.buf[:w.offset])
  349. w.err = toIOError(w.err)
  350. w.offset = 0
  351. return w.err
  352. }
  353. type ioError struct {
  354. error
  355. }
  356. func (i ioError) Unwrap() error {
  357. return i.error
  358. }
  359. func isIOError(err error) bool {
  360. return errors.As(err, &ioError{})
  361. }
  362. func toIOError(err error) error {
  363. if err == nil {
  364. return nil
  365. }
  366. return ioError{error: err}
  367. }
  368. type framer struct {
  369. writer *bufWriter
  370. fr *http2.Framer
  371. }
  372. var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
  373. var writeBufferMutex sync.Mutex
  374. func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
  375. if writeBufferSize < 0 {
  376. writeBufferSize = 0
  377. }
  378. var r io.Reader = conn
  379. if readBufferSize > 0 {
  380. r = bufio.NewReaderSize(r, readBufferSize)
  381. }
  382. var pool *sync.Pool
  383. if sharedWriteBuffer {
  384. pool = getWriteBufferPool(writeBufferSize)
  385. }
  386. w := newBufWriter(conn, writeBufferSize, pool)
  387. f := &framer{
  388. writer: w,
  389. fr: http2.NewFramer(w, r),
  390. }
  391. f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
  392. // Opt-in to Frame reuse API on framer to reduce garbage.
  393. // Frames aren't safe to read from after a subsequent call to ReadFrame.
  394. f.fr.SetReuseFrames()
  395. f.fr.MaxHeaderListSize = maxHeaderListSize
  396. f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
  397. return f
  398. }
  399. func getWriteBufferPool(writeBufferSize int) *sync.Pool {
  400. writeBufferMutex.Lock()
  401. defer writeBufferMutex.Unlock()
  402. size := writeBufferSize * 2
  403. pool, ok := writeBufferPoolMap[size]
  404. if ok {
  405. return pool
  406. }
  407. pool = &sync.Pool{
  408. New: func() any {
  409. b := make([]byte, size)
  410. return &b
  411. },
  412. }
  413. writeBufferPoolMap[size] = pool
  414. return pool
  415. }
  416. // parseDialTarget returns the network and address to pass to dialer.
  417. func parseDialTarget(target string) (string, string) {
  418. net := "tcp"
  419. m1 := strings.Index(target, ":")
  420. m2 := strings.Index(target, ":/")
  421. // handle unix:addr which will fail with url.Parse
  422. if m1 >= 0 && m2 < 0 {
  423. if n := target[0:m1]; n == "unix" {
  424. return n, target[m1+1:]
  425. }
  426. }
  427. if m2 >= 0 {
  428. t, err := url.Parse(target)
  429. if err != nil {
  430. return net, target
  431. }
  432. scheme := t.Scheme
  433. addr := t.Path
  434. if scheme == "unix" {
  435. if addr == "" {
  436. addr = t.Host
  437. }
  438. return scheme, addr
  439. }
  440. }
  441. return net, target
  442. }