@@ -581,9 +581,11 @@ type serverConn struct {
 	advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
 	curClientStreams            uint32 // number of open streams initiated by the client
 	curPushedStreams            uint32 // number of open streams initiated by server push
+	curHandlers                 uint32 // number of running handler goroutines
 	maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
 	maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
 	streams                     map[uint32]*stream
+	unstartedHandlers           []unstartedHandler
 	initialStreamSendWindowSize int32
 	maxFrameSize                int32
 	peerMaxHeaderListSize       uint32 // zero means unknown (default)
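
The two added fields drive a simple bounded-dispatch scheme: curHandlers counts handler goroutines that are currently running, and unstartedHandlers queues requests whose handlers cannot start yet because the advertised SETTINGS_MAX_CONCURRENT_STREAMS cap is already in use. A stripped-down sketch of that bookkeeping, with hypothetical names (handlerQueue, limit, notifyDone) standing in for the serverConn fields:

// handlerQueue is an illustrative stand-in for the serverConn bookkeeping:
// limit plays the role of advMaxStreams, running of curHandlers, and
// waiting of unstartedHandlers. All methods are meant to be called from a
// single owner goroutine, as serverConn's serve loop does.
type handlerQueue struct {
	limit      int
	running    int
	waiting    []func()
	notifyDone func() // called by a handler goroutine when it returns
}

func (q *handlerQueue) schedule(fn func()) {
	if q.running < q.limit {
		q.running++
		go func() { defer q.notifyDone(); fn() }()
		return
	}
	q.waiting = append(q.waiting, fn) // park it; handlerDone starts it later
}

func (q *handlerQueue) handlerDone() {
	q.running--
	for q.running < q.limit && len(q.waiting) > 0 {
		fn := q.waiting[0]
		q.waiting = q.waiting[1:]
		q.running++
		go func() { defer q.notifyDone(); fn() }()
	}
}
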
@@ -981,6 +983,8 @@ func (sc *serverConn) serve() {
 					return
 				case gracefulShutdownMsg:
 					sc.startGracefulShutdownInternal()
+				case handlerDoneMsg:
+					sc.handlerDone()
 				default:
 					panic("unknown timer")
 				}
@@ -1012,14 +1016,6 @@ func (sc *serverConn) serve() {
 	}
 }
 
-func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
-	select {
-	case <-sc.doneServing:
-	case <-sharedCh:
-		close(privateCh)
-	}
-}
-
 type serverMessage int
 
 // Message values sent to serveMsgCh.
@@ -1028,6 +1024,7 @@ var (
 	idleTimerMsg        = new(serverMessage)
 	shutdownTimerMsg    = new(serverMessage)
 	gracefulShutdownMsg = new(serverMessage)
+	handlerDoneMsg      = new(serverMessage)
 )
 
 func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
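
The new handlerDoneMsg reuses the file's existing message pattern: handler goroutines never touch serveConn state directly, they only notify the serve goroutine (via sendServeMsg, as onSettingsTimer does above), and the serve loop then mutates curHandlers and unstartedHandlers single-threaded. A self-contained sketch of that ownership pattern, with hypothetical names rather than the package's actual plumbing:

package main

import "fmt"

// message is a toy analogue of serverMessage; doneMsg plays the role of handlerDoneMsg.
type message int

const doneMsg message = iota

func main() {
	msgs := make(chan message)
	const handlers = 3
	for i := 0; i < handlers; i++ {
		go func(i int) {
			fmt.Println("handler", i, "finished")
			msgs <- doneMsg // like: defer sc.sendServeMsg(handlerDoneMsg)
		}(i)
	}
	// The "serve loop" alone owns the mutable state; workers only send messages.
	running := handlers
	for running > 0 {
		switch <-msgs {
		case doneMsg:
			running-- // like: sc.handlerDone()
		}
	}
}
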
@@ -1900,9 +1897,11 @@ func (st *stream) copyTrailersToHandlerRequest() {
 // onReadTimeout is run on its own goroutine (from time.AfterFunc)
 // when the stream's ReadTimeout has fired.
 func (st *stream) onReadTimeout() {
-	// Wrap the ErrDeadlineExceeded to avoid callers depending on us
-	// returning the bare error.
-	st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
+	if st.body != nil {
+		// Wrap the ErrDeadlineExceeded to avoid callers depending on us
+		// returning the bare error.
+		st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
+	}
 }
 
 // onWriteTimeout is run on its own goroutine (from time.AfterFunc)
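
The nil check guards streams that have no request body, and the wrapping still lets handlers detect the timeout: the error surfaced from the body is not the bare os.ErrDeadlineExceeded value, but it matches it under errors.Is. An illustrative handler-side check, not part of this change:

package main

import (
	"errors"
	"io"
	"log"
	"net/http"
	"os"
)

// timeoutAwareHandler is illustrative only: it shows that the wrapped
// deadline error still matches os.ErrDeadlineExceeded via errors.Is.
func timeoutAwareHandler(w http.ResponseWriter, r *http.Request) {
	if _, err := io.ReadAll(r.Body); errors.Is(err, os.ErrDeadlineExceeded) {
		// The per-stream ReadTimeout fired while the body was being read.
		http.Error(w, "request body read timed out", http.StatusRequestTimeout)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	http.HandleFunc("/", timeoutAwareHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
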
@@ -2020,13 +2019,10 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
 	// (in Go 1.8), though. That's a more sane option anyway.
 	if sc.hs.ReadTimeout != 0 {
 		sc.conn.SetReadDeadline(time.Time{})
-		if st.body != nil {
-			st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
-		}
+		st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
 	}
 
-	go sc.runHandler(rw, req, handler)
-	return nil
+	return sc.scheduleHandler(id, rw, req, handler)
 }
 
 func (sc *serverConn) upgradeRequest(req *http.Request) {
@@ -2046,6 +2042,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
 		sc.conn.SetReadDeadline(time.Time{})
 	}
 
+	// This is the first request on the connection,
+	// so start the handler directly rather than going
+	// through scheduleHandler.
+	sc.curHandlers++
 	go sc.runHandler(rw, req, sc.handler.ServeHTTP)
 }
 
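
upgradeRequest handles the first request on a connection that entered HTTP/2 via an HTTP/1 upgrade (the cleartext h2c path), which is why it can bump curHandlers and start the handler directly instead of calling scheduleHandler. For orientation, a minimal h2c server sketch, assuming the golang.org/x/net/http2/h2c helper (not part of this change):

package main

import (
	"fmt"
	"log"
	"net/http"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "proto: %s\n", r.Proto) // reports HTTP/2.0 once the upgrade succeeds
	})
	// h2c.NewHandler serves cleartext HTTP/2, including requests that arrive
	// as HTTP/1.1 with "Upgrade: h2c" and are then handed to the HTTP/2 server.
	log.Fatal(http.ListenAndServe(":8080", h2c.NewHandler(mux, &http2.Server{})))
}
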
@@ -2286,8 +2286,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
 	return &responseWriter{rws: rws}
 }
 
+type unstartedHandler struct {
+	streamID uint32
+	rw       *responseWriter
+	req      *http.Request
+	handler  func(http.ResponseWriter, *http.Request)
+}
+
+// scheduleHandler starts a handler goroutine,
+// or schedules one to start as soon as an existing handler finishes.
+func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
+	sc.serveG.check()
+	maxHandlers := sc.advMaxStreams
+	if sc.curHandlers < maxHandlers {
+		sc.curHandlers++
+		go sc.runHandler(rw, req, handler)
+		return nil
+	}
+	if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
+		return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
+	}
+	sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
+		streamID: streamID,
+		rw:       rw,
+		req:      req,
+		handler:  handler,
+	})
+	return nil
+}
+
+func (sc *serverConn) handlerDone() {
+	sc.serveG.check()
+	sc.curHandlers--
+	i := 0
+	maxHandlers := sc.advMaxStreams
+	for ; i < len(sc.unstartedHandlers); i++ {
+		u := sc.unstartedHandlers[i]
+		if sc.streams[u.streamID] == nil {
+			// This stream was reset before its goroutine had a chance to start.
+			continue
+		}
+		if sc.curHandlers >= maxHandlers {
+			break
+		}
+		sc.curHandlers++
+		go sc.runHandler(u.rw, u.req, u.handler)
+		sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
+	}
+	sc.unstartedHandlers = sc.unstartedHandlers[i:]
+	if len(sc.unstartedHandlers) == 0 {
+		sc.unstartedHandlers = nil
+	}
+}
+
 // Run on its own goroutine.
 func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
+	defer sc.sendServeMsg(handlerDoneMsg)
 	didPanic := true
 	defer func() {
 		rw.rws.stream.cancelCtx()
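
The cap that scheduleHandler enforces is the advertised SETTINGS_MAX_CONCURRENT_STREAMS (advMaxStreams above), which server owners control through Server.MaxConcurrentStreams; with this change the same value also bounds running handler goroutines per connection. A configuration sketch, assuming golang.org/x/net/http2 and placeholder certificate paths:

package main

import (
	"log"
	"net/http"

	"golang.org/x/net/http2"
)

func main() {
	srv := &http.Server{Addr: ":8443", Handler: http.DefaultServeMux}
	// MaxConcurrentStreams is what the server advertises in SETTINGS; with this
	// change it also caps concurrently running handler goroutines per connection.
	// Zero means the package default.
	if err := http2.ConfigureServer(srv, &http2.Server{MaxConcurrentStreams: 250}); err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem")) // placeholder cert/key paths
}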