http2_server.go

/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/internal/syscall"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      []stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instant at which the last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainEvent is initialized when Drain() is called the first time, after
	// which the server writes out the first GoAway (with ID 2^31-1) frame. Then
	// an independent goroutine will be launched to later send the second
	// GoAway. During this time we don't want to write another first GoAway (with
	// ID 2^31-1) frame. Thus a call to Drain() will be a no-op if drainEvent is
	// already initialized, since draining is already underway.
	drainEvent    *grpcsync.Event
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID *channelz.Identifier
	czData     *channelzData
	bufferPool *bufferPool

	connectionID uint64

	// maxStreamMu guards the maximum stream ID.
	// This lock may not be taken if mu is already held.
	maxStreamMu sync.Mutex
	maxStreamID uint32 // max stream ID ever seen

	logger *grpclog.PrefixLogger
}

// NewServerTransport creates a http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before handshaking completed, which can
			// happen naturally from probers. Return these errors directly.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	if config.MaxStreams != math.MaxUint32 {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: config.MaxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}

	done := make(chan struct{})
	t := &http2Server{
		ctx:               setConnection(context.Background(), rawConn),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          authInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        config.MaxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandlers,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.logger = prefixLoggerForServerTransport(t)
	// Add peer information to the http2server context.
	t.ctx = peer.NewContext(t.ctx, t.getPeer())

	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	for _, sh := range t.stats {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	if err != nil {
		return nil, err
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// which performs regular TCP level health checks, the connection is
		// closed immediately by the latter. Returning io.EOF here allows the
		// grpc server implementation to recognize this scenario and suppress
		// logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		t.loopy.run()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}

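// The sketch below is illustrative only (not part of the original file). It
// shows roughly how an accept loop might hand connections to
// NewServerTransport; the names lis, cfg, handleStream, and traceCtx are
// assumed placeholders for a net.Listener, a *ServerConfig, and the two
// callbacks expected by HandleStreams.
//
//	for {
//		rawConn, err := lis.Accept()
//		if err != nil {
//			return
//		}
//		go func() {
//			st, err := NewServerTransport(rawConn, cfg)
//			if err != nil || st == nil {
//				// A nil transport with a nil error means the conn was
//				// closed before the client preface arrived; drop it quietly.
//				return
//			}
//			st.HandleStreams(handleStream, traceCtx)
//		}()
//	}
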
// operateHeaders takes action on the decoded headers. Returns an error if a
// fatal error is encountered and the transport needs to close; otherwise
// returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
	// Acquire max stream ID lock for entire duration.
	t.maxStreamMu.Lock()
	defer t.maxStreamMu.Unlock()

	streamID := frame.Header().StreamID

	// frame.Truncated is set to true when framer detects that the current header
	// list size hits MaxHeaderListSize limit.
	if frame.Truncated {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeFrameSize,
			onWrite:  func() {},
		})
		return nil
	}

	if streamID%2 != 1 || streamID <= t.maxStreamID {
		// illegal gRPC stream id.
		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
	}
	t.maxStreamID = streamID

	buf := newRecvBuffer()
	s := &Stream{
		id:  streamID,
		st:  t,
		buf: buf,
		fc:  &inFlow{limit: uint32(t.initialWindowSize)},
	}
	var (
		// if false, content-type was missing or invalid
		isGRPC      = false
		contentType = ""
		mdata       = make(metadata.MD, len(frame.Fields))
		httpMethod  string
		// these are set if an error is encountered while parsing the headers
		protocolError bool
		headerError   *status.Status

		timeoutSet bool
		timeout    time.Duration
	)

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
			if !validContentType {
				contentType = hf.Value
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			s.contentSubtype = contentSubtype
			isGRPC = true
		case "grpc-accept-encoding":
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			if hf.Value == "" {
				continue
			}
			compressors := hf.Value
			if s.clientAdvertisedCompressors != "" {
				compressors = s.clientAdvertisedCompressors + "," + compressors
			}
			s.clientAdvertisedCompressors = compressors
		case "grpc-encoding":
			s.recvCompress = hf.Value
		case ":method":
			httpMethod = hf.Value
		case ":path":
			s.method = hf.Value
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
			}
		// "Transports must consider requests containing the Connection header
		// as malformed." - A41
		case "connection":
			if t.logger.V(logLevel) {
				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
			}
			protocolError = true
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	// "If multiple Host headers or multiple :authority headers are present, the
	// request must be rejected with an HTTP status code 400 as required by Host
	// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
	// error, this takes precedence over a client not speaking gRPC.
	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
		if t.logger.V(logLevel) {
			t.logger.Infof("Aborting the stream early: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	if protocolError {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeProtocol,
			onWrite:  func() {},
		})
		return nil
	}
	if !isGRPC {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusUnsupportedMediaType,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}
	if headerError != nil {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         headerError,
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	// "If :authority is missing, Host must be renamed to :authority." - A41
	if len(mdata[":authority"]) == 0 {
		// No-op if host isn't present, no eventual :authority header is a valid
		// RPC.
		if host, ok := mdata["host"]; ok {
			mdata[":authority"] = host
			delete(mdata, "host")
		}
	} else {
		// "If :authority is present, Host must be discarded" - A41
		delete(mdata, "host")
	}

	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	if timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}

	// Attach the received metadata to the context.
	if len(mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
		if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
			s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
		}
		if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
			s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return nil
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return nil
	}
	if httpMethod != http.MethodPost {
		t.mu.Unlock()
		errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
		if t.logger.V(logLevel) {
			t.logger.Infof("Aborting the stream early: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     405,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		s.cancel()
		return nil
	}
	if t.inTapHandle != nil {
		var err error
		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
			t.mu.Unlock()
			if t.logger.V(logLevel) {
				t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
			}
			stat, ok := status.FromError(err)
			if !ok {
				stat = status.New(codes.PermissionDenied, err.Error())
			}
			t.controlBuf.put(&earlyAbortStream{
				httpStatus:     200,
				streamID:       s.id,
				contentSubtype: s.contentSubtype,
				status:         stat,
				rst:            !frame.StreamEnded(),
			})
			return nil
		}
	}
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	for _, sh := range t.stats {
		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      mdata.Copy(),
		}
		sh.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return nil
}

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if t.logger.V(logLevel) {
					t.logger.Warningf("Encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close(err)
				return
			}
			t.Close(err)
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
				t.Close(err)
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if t.logger.V(logLevel) {
				t.logger.Infof("Received unsupported frame type %T", frame)
			}
		}
	}
}

func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the window
// can hold.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will be delivered to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.StreamEnded() {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}

func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}

func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(any) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}

const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)

func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset of ping strikes means that we don't need to check for policy
	// violation for this ping, and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active; thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}

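// Illustrative note (not part of the original file): with maxPingStrikes set
// to 2, strikes accumulate as shown below, and the GOAWAY with
// ENHANCE_YOUR_CALM and debug data "too_many_pings" is queued only once the
// counter exceeds the limit, i.e. on the third policy-violating ping received
// since the last data or header frame was sent.
//
//	ping #1 too early -> pingStrikes = 1 (tolerated)
//	ping #2 too early -> pingStrikes = 2 (tolerated)
//	ping #3 too early -> pingStrikes = 3 > maxPingStrikes -> GOAWAY
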
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}

func (t *http2Server) checkForHeaderListSize(it any) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if t.logger.V(logLevel) {
				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}

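// Illustrative note (not part of the original file): hpack.HeaderField.Size
// follows RFC 7541 §4.1, counting len(name) + len(value) plus 32 bytes of
// per-entry overhead, so the check above uses the same accounting as the
// client's SETTINGS_MAX_HEADER_LIST_SIZE. For example:
//
//	f := hpack.HeaderField{Name: "grpc-status", Value: "0"}
//	// f.Size() == 11 + 1 + 32 == 44
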
func (t *http2Server) streamContextErr(s *Stream) error {
	select {
	case <-t.done:
		return ErrConnClosing
	default:
	}
	return ContextErr(s.ctx.Err())
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.getState() == streamDone {
		return t.streamContextErr(s)
	}

	if s.updateHeaderSent() {
		return ErrIllegalHeaderWrite
	}

	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		return status.Convert(err).Err()
	}
	return nil
}

func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}

func (t *http2Server) writeHeaderLocked(s *Stream) error {
	// TODO(mmukhi): Benchmark whether performance improves if we count the
	// metadata and other header fields first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	for _, sh := range t.stats {
		// Note: Headers are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		sh.HandleRPC(s.Context(), outHeader)
	}
	return nil
}

// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark whether performance improves if we count the
	// metadata and other header fields first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}

// Write converts the data into an HTTP2 data frame and sends it out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			return err
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			return t.streamContextErr(s)
		}
	}

	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		d:           data,
		onEachWrite: t.setResetPingStrikes,
	}
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		return t.streamContextErr(s)
	}
	return t.controlBuf.put(df)
}

// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain("max_idle")
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain("max_age")
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}

// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelzID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	for _, sh := range t.stats {
		connEnd := &stats.ConnEnd{}
		sh.HandleConn(t.ctx, connEnd)
	}
}

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}

// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}

func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

func (t *http2Server) Drain(debugData string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainEvent != nil {
		return
	}
	t.drainEvent = grpcsync.NewEvent()
	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32.
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration, send out another GoAway, this
	// time with an ID of the max stream the server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}

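// Illustrative summary (not part of the original file) of the graceful drain
// sequence implemented above:
//
//	1. GOAWAY with last stream ID MaxUint32 ("heads up"; new streams are still accepted)
//	2. PING carrying goAwayPing.data
//	3. wait for the ping ack (drainEvent), a one-minute timer, or transport shutdown
//	4. second GOAWAY carrying t.maxStreamID, after which loopy enters draining mode
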
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:           int64(t.fc.getSize()),
		SocketOptions:                    channelz.GetSocketOption(t.conn),
		LocalAddr:                        t.localAddr,
		RemoteAddr:                       t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}

func (t *http2Server) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo, // Can be nil
	}
}

func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := grpcrand.Int63n(2*r) - r
	return time.Duration(j)
}

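// Worked example (not part of the original file): for v = 30 * time.Second,
// r is 3s and grpcrand.Int63n(2*r) yields a value in [0, 6s), so the returned
// jitter falls in [-3s, +3s). NewServerTransport adds this to
// MaxConnectionAge so connections created at the same time do not all reach
// their age limit simultaneously.
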
type connectionKey struct{}

// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
	return conn
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
	return context.WithValue(ctx, connectionKey{}, conn)
}