|
@@ -622,7 +622,9 @@ type stream struct {
|
|
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
|
|
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
|
|
gotTrailerHeader bool // HEADER frame for trailers was seen
|
|
gotTrailerHeader bool // HEADER frame for trailers was seen
|
|
wroteHeaders bool // whether we wrote headers (not status 100)
|
|
wroteHeaders bool // whether we wrote headers (not status 100)
|
|
|
|
+ readDeadline *time.Timer // nil if unused
|
|
writeDeadline *time.Timer // nil if unused
|
|
writeDeadline *time.Timer // nil if unused
|
|
|
|
+ closeErr error // set before cw is closed
|
|
|
|
|
|
trailer http.Header // accumulated trailers
|
|
trailer http.Header // accumulated trailers
|
|
reqTrailer http.Header // handler's Request.Trailer
|
|
reqTrailer http.Header // handler's Request.Trailer
|
|
@@ -869,7 +871,9 @@ func (sc *serverConn) serve() {
|
|
|
|
|
|
// Each connection starts with initialWindowSize inflow tokens.
|
|
// Each connection starts with initialWindowSize inflow tokens.
|
|
// If a higher value is configured, we add more tokens.
|
|
// If a higher value is configured, we add more tokens.
|
|
- sc.sendWindowUpdate(nil)
|
|
|
|
|
|
+ if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
|
|
|
|
+ sc.sendWindowUpdate(nil, int(diff))
|
|
|
|
+ }
|
|
|
|
|
|
if err := sc.readPreface(); err != nil {
|
|
if err := sc.readPreface(); err != nil {
|
|
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
@@ -946,6 +950,8 @@ func (sc *serverConn) serve() {
|
|
}
|
|
}
|
|
case *startPushRequest:
|
|
case *startPushRequest:
|
|
sc.startPush(v)
|
|
sc.startPush(v)
|
|
|
|
+ case func(*serverConn):
|
|
|
|
+ v(sc)
|
|
default:
|
|
default:
|
|
panic(fmt.Sprintf("unexpected type %T", v))
|
|
panic(fmt.Sprintf("unexpected type %T", v))
|
|
}
|
|
}
|
|
@@ -1459,6 +1465,21 @@ func (sc *serverConn) processFrame(f Frame) error {
|
|
sc.sawFirstSettings = true
|
|
sc.sawFirstSettings = true
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Discard frames for streams initiated after the identified last
|
|
|
|
+ // stream sent in a GOAWAY, or all frames after sending an error.
|
|
|
|
+ // We still need to return connection-level flow control for DATA frames.
|
|
|
|
+ // RFC 9113 Section 6.8.
|
|
|
|
+ if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {
|
|
|
|
+
|
|
|
|
+ if f, ok := f.(*DataFrame); ok {
|
|
|
|
+ if sc.inflow.available() < int32(f.Length) {
|
|
|
|
+ return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
|
|
|
|
+ }
|
|
|
|
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
switch f := f.(type) {
|
|
switch f := f.(type) {
|
|
case *SettingsFrame:
|
|
case *SettingsFrame:
|
|
return sc.processSettings(f)
|
|
return sc.processSettings(f)
|
|
@@ -1501,9 +1522,6 @@ func (sc *serverConn) processPing(f *PingFrame) error {
|
|
// PROTOCOL_ERROR."
|
|
// PROTOCOL_ERROR."
|
|
return sc.countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
|
|
return sc.countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
|
|
}
|
|
}
|
|
- if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
|
|
sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
@@ -1565,6 +1583,9 @@ func (sc *serverConn) closeStream(st *stream, err error) {
|
|
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
|
|
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
|
|
}
|
|
}
|
|
st.state = stateClosed
|
|
st.state = stateClosed
|
|
|
|
+ if st.readDeadline != nil {
|
|
|
|
+ st.readDeadline.Stop()
|
|
|
|
+ }
|
|
if st.writeDeadline != nil {
|
|
if st.writeDeadline != nil {
|
|
st.writeDeadline.Stop()
|
|
st.writeDeadline.Stop()
|
|
}
|
|
}
|
|
@@ -1586,10 +1607,18 @@ func (sc *serverConn) closeStream(st *stream, err error) {
|
|
if p := st.body; p != nil {
|
|
if p := st.body; p != nil {
|
|
// Return any buffered unread bytes worth of conn-level flow control.
|
|
// Return any buffered unread bytes worth of conn-level flow control.
|
|
// See golang.org/issue/16481
|
|
// See golang.org/issue/16481
|
|
- sc.sendWindowUpdate(nil)
|
|
|
|
|
|
+ sc.sendWindowUpdate(nil, p.Len())
|
|
|
|
|
|
p.CloseWithError(err)
|
|
p.CloseWithError(err)
|
|
}
|
|
}
|
|
|
|
+ if e, ok := err.(StreamError); ok {
|
|
|
|
+ if e.Cause != nil {
|
|
|
|
+ err = e.Cause
|
|
|
|
+ } else {
|
|
|
|
+ err = errStreamClosed
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ st.closeErr = err
|
|
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
|
|
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
|
|
sc.writeSched.CloseStream(st.id)
|
|
sc.writeSched.CloseStream(st.id)
|
|
}
|
|
}
|
|
@@ -1686,16 +1715,6 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
|
|
func (sc *serverConn) processData(f *DataFrame) error {
|
|
func (sc *serverConn) processData(f *DataFrame) error {
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
id := f.Header().StreamID
|
|
id := f.Header().StreamID
|
|
- if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || id > sc.maxClientStreamID) {
|
|
|
|
- // Discard all DATA frames if the GOAWAY is due to an
|
|
|
|
- // error, or:
|
|
|
|
- //
|
|
|
|
- // Section 6.8: After sending a GOAWAY frame, the sender
|
|
|
|
- // can discard frames for streams initiated by the
|
|
|
|
- // receiver with identifiers higher than the identified
|
|
|
|
- // last stream.
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
|
|
|
|
data := f.Data()
|
|
data := f.Data()
|
|
state, st := sc.state(id)
|
|
state, st := sc.state(id)
|
|
@@ -1734,7 +1753,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
// sendWindowUpdate, which also schedules sending the
|
|
// sendWindowUpdate, which also schedules sending the
|
|
// frames.
|
|
// frames.
|
|
sc.inflow.take(int32(f.Length))
|
|
sc.inflow.take(int32(f.Length))
|
|
- sc.sendWindowUpdate(nil) // conn-level
|
|
|
|
|
|
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
|
|
if st != nil && st.resetQueued {
|
|
if st != nil && st.resetQueued {
|
|
// Already have a stream error in flight. Don't send another.
|
|
// Already have a stream error in flight. Don't send another.
|
|
@@ -1752,7 +1771,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
|
|
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
|
|
}
|
|
}
|
|
sc.inflow.take(int32(f.Length))
|
|
sc.inflow.take(int32(f.Length))
|
|
- sc.sendWindowUpdate(nil) // conn-level
|
|
|
|
|
|
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
|
|
|
|
|
|
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
|
|
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
|
|
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
|
|
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
|
|
@@ -1770,7 +1789,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
if len(data) > 0 {
|
|
if len(data) > 0 {
|
|
wrote, err := st.body.Write(data)
|
|
wrote, err := st.body.Write(data)
|
|
if err != nil {
|
|
if err != nil {
|
|
- sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
|
|
|
|
|
|
+ sc.sendWindowUpdate(nil, int(f.Length)-wrote)
|
|
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
|
|
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
|
|
}
|
|
}
|
|
if wrote != len(data) {
|
|
if wrote != len(data) {
|
|
@@ -1838,19 +1857,27 @@ func (st *stream) copyTrailersToHandlerRequest() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// onReadTimeout is run on its own goroutine (from time.AfterFunc)
|
|
|
|
+// when the stream's ReadTimeout has fired.
|
|
|
|
+func (st *stream) onReadTimeout() {
|
|
|
|
+ // Wrap the ErrDeadlineExceeded to avoid callers depending on us
|
|
|
|
+ // returning the bare error.
|
|
|
|
+ st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
|
|
|
|
+}
|
|
|
|
+
|
|
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
|
|
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
|
|
// when the stream's WriteTimeout has fired.
|
|
// when the stream's WriteTimeout has fired.
|
|
func (st *stream) onWriteTimeout() {
|
|
func (st *stream) onWriteTimeout() {
|
|
- st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
|
|
|
|
|
|
+ st.sc.writeFrameFromHandler(FrameWriteRequest{write: StreamError{
|
|
|
|
+ StreamID: st.id,
|
|
|
|
+ Code: ErrCodeInternal,
|
|
|
|
+ Cause: os.ErrDeadlineExceeded,
|
|
|
|
+ }})
|
|
}
|
|
}
|
|
|
|
|
|
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
|
|
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
id := f.StreamID
|
|
id := f.StreamID
|
|
- if sc.inGoAway {
|
|
|
|
- // Ignore.
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
// http://tools.ietf.org/html/rfc7540#section-5.1.1
|
|
// http://tools.ietf.org/html/rfc7540#section-5.1.1
|
|
// Streams initiated by a client MUST use odd-numbered stream
|
|
// Streams initiated by a client MUST use odd-numbered stream
|
|
// identifiers. [...] An endpoint that receives an unexpected
|
|
// identifiers. [...] An endpoint that receives an unexpected
|
|
@@ -1953,6 +1980,9 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
|
|
// (in Go 1.8), though. That's a more sane option anyway.
|
|
// (in Go 1.8), though. That's a more sane option anyway.
|
|
if sc.hs.ReadTimeout != 0 {
|
|
if sc.hs.ReadTimeout != 0 {
|
|
sc.conn.SetReadDeadline(time.Time{})
|
|
sc.conn.SetReadDeadline(time.Time{})
|
|
|
|
+ if st.body != nil {
|
|
|
|
+ st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
go sc.runHandler(rw, req, handler)
|
|
go sc.runHandler(rw, req, handler)
|
|
@@ -2021,9 +2051,6 @@ func (sc *serverConn) checkPriority(streamID uint32, p PriorityParam) error {
|
|
}
|
|
}
|
|
|
|
|
|
func (sc *serverConn) processPriority(f *PriorityFrame) error {
|
|
func (sc *serverConn) processPriority(f *PriorityFrame) error {
|
|
- if sc.inGoAway {
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
if err := sc.checkPriority(f.StreamID, f.PriorityParam); err != nil {
|
|
if err := sc.checkPriority(f.StreamID, f.PriorityParam); err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -2322,39 +2349,24 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
|
|
|
|
|
|
func (sc *serverConn) noteBodyRead(st *stream, n int) {
|
|
func (sc *serverConn) noteBodyRead(st *stream, n int) {
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
- sc.sendWindowUpdate(nil) // conn-level
|
|
|
|
|
|
+ sc.sendWindowUpdate(nil, n) // conn-level
|
|
if st.state != stateHalfClosedRemote && st.state != stateClosed {
|
|
if st.state != stateHalfClosedRemote && st.state != stateClosed {
|
|
// Don't send this WINDOW_UPDATE if the stream is closed
|
|
// Don't send this WINDOW_UPDATE if the stream is closed
|
|
// remotely.
|
|
// remotely.
|
|
- sc.sendWindowUpdate(st)
|
|
|
|
|
|
+ sc.sendWindowUpdate(st, n)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// st may be nil for conn-level
|
|
// st may be nil for conn-level
|
|
-func (sc *serverConn) sendWindowUpdate(st *stream) {
|
|
|
|
|
|
+func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
-
|
|
|
|
- var n int32
|
|
|
|
- if st == nil {
|
|
|
|
- if avail, windowSize := sc.inflow.available(), sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
|
|
|
|
- return
|
|
|
|
- } else {
|
|
|
|
- n = windowSize - avail
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- if avail, windowSize := st.inflow.available(), sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
|
|
|
|
- return
|
|
|
|
- } else {
|
|
|
|
- n = windowSize - avail
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
// "The legal range for the increment to the flow control
|
|
// "The legal range for the increment to the flow control
|
|
// window is 1 to 2^31-1 (2,147,483,647) octets."
|
|
// window is 1 to 2^31-1 (2,147,483,647) octets."
|
|
// A Go Read call on 64-bit machines could in theory read
|
|
// A Go Read call on 64-bit machines could in theory read
|
|
// a larger Read than this. Very unlikely, but we handle it here
|
|
// a larger Read than this. Very unlikely, but we handle it here
|
|
// rather than elsewhere for now.
|
|
// rather than elsewhere for now.
|
|
const maxUint31 = 1<<31 - 1
|
|
const maxUint31 = 1<<31 - 1
|
|
- for n >= maxUint31 {
|
|
|
|
|
|
+ for n > maxUint31 {
|
|
sc.sendWindowUpdate32(st, maxUint31)
|
|
sc.sendWindowUpdate32(st, maxUint31)
|
|
n -= maxUint31
|
|
n -= maxUint31
|
|
}
|
|
}
|
|
@@ -2474,7 +2486,15 @@ type responseWriterState struct {
|
|
|
|
|
|
type chunkWriter struct{ rws *responseWriterState }
|
|
type chunkWriter struct{ rws *responseWriterState }
|
|
|
|
|
|
-func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
|
|
|
|
|
|
+func (cw chunkWriter) Write(p []byte) (n int, err error) {
|
|
|
|
+ n, err = cw.rws.writeChunk(p)
|
|
|
|
+ if err == errStreamClosed {
|
|
|
|
+ // If writing failed because the stream has been closed,
|
|
|
|
+ // return the reason it was closed.
|
|
|
|
+ err = cw.rws.stream.closeErr
|
|
|
|
+ }
|
|
|
|
+ return n, err
|
|
|
|
+}
|
|
|
|
|
|
func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
|
|
func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
|
|
|
|
|
|
@@ -2668,23 +2688,85 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
|
|
|
|
+ st := w.rws.stream
|
|
|
|
+ if !deadline.IsZero() && deadline.Before(time.Now()) {
|
|
|
|
+ // If we're setting a deadline in the past, reset the stream immediately
|
|
|
|
+ // so writes after SetWriteDeadline returns will fail.
|
|
|
|
+ st.onReadTimeout()
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ w.rws.conn.sendServeMsg(func(sc *serverConn) {
|
|
|
|
+ if st.readDeadline != nil {
|
|
|
|
+ if !st.readDeadline.Stop() {
|
|
|
|
+ // Deadline already exceeded, or stream has been closed.
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if deadline.IsZero() {
|
|
|
|
+ st.readDeadline = nil
|
|
|
|
+ } else if st.readDeadline == nil {
|
|
|
|
+ st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
|
|
|
|
+ } else {
|
|
|
|
+ st.readDeadline.Reset(deadline.Sub(time.Now()))
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
|
|
|
|
+ st := w.rws.stream
|
|
|
|
+ if !deadline.IsZero() && deadline.Before(time.Now()) {
|
|
|
|
+ // If we're setting a deadline in the past, reset the stream immediately
|
|
|
|
+ // so writes after SetWriteDeadline returns will fail.
|
|
|
|
+ st.onWriteTimeout()
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ w.rws.conn.sendServeMsg(func(sc *serverConn) {
|
|
|
|
+ if st.writeDeadline != nil {
|
|
|
|
+ if !st.writeDeadline.Stop() {
|
|
|
|
+ // Deadline already exceeded, or stream has been closed.
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if deadline.IsZero() {
|
|
|
|
+ st.writeDeadline = nil
|
|
|
|
+ } else if st.writeDeadline == nil {
|
|
|
|
+ st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
|
|
|
|
+ } else {
|
|
|
|
+ st.writeDeadline.Reset(deadline.Sub(time.Now()))
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
func (w *responseWriter) Flush() {
|
|
func (w *responseWriter) Flush() {
|
|
|
|
+ w.FlushError()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (w *responseWriter) FlushError() error {
|
|
rws := w.rws
|
|
rws := w.rws
|
|
if rws == nil {
|
|
if rws == nil {
|
|
panic("Header called after Handler finished")
|
|
panic("Header called after Handler finished")
|
|
}
|
|
}
|
|
|
|
+ var err error
|
|
if rws.bw.Buffered() > 0 {
|
|
if rws.bw.Buffered() > 0 {
|
|
- if err := rws.bw.Flush(); err != nil {
|
|
|
|
- // Ignore the error. The frame writer already knows.
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ err = rws.bw.Flush()
|
|
} else {
|
|
} else {
|
|
// The bufio.Writer won't call chunkWriter.Write
|
|
// The bufio.Writer won't call chunkWriter.Write
|
|
// (writeChunk with zero bytes, so we have to do it
|
|
// (writeChunk with zero bytes, so we have to do it
|
|
// ourselves to force the HTTP response header and/or
|
|
// ourselves to force the HTTP response header and/or
|
|
// final DATA frame (with END_STREAM) to be sent.
|
|
// final DATA frame (with END_STREAM) to be sent.
|
|
- rws.writeChunk(nil)
|
|
|
|
|
|
+ _, err = chunkWriter{rws}.Write(nil)
|
|
|
|
+ if err == nil {
|
|
|
|
+ select {
|
|
|
|
+ case <-rws.stream.cw:
|
|
|
|
+ err = rws.stream.closeErr
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ return err
|
|
}
|
|
}
|
|
|
|
|
|
func (w *responseWriter) CloseNotify() <-chan bool {
|
|
func (w *responseWriter) CloseNotify() <-chan bool {
|