|
@@ -18,6 +18,7 @@ import (
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
"log"
|
|
"log"
|
|
"math"
|
|
"math"
|
|
|
|
+ mathrand "math/rand"
|
|
"net"
|
|
"net"
|
|
"net/http"
|
|
"net/http"
|
|
"sort"
|
|
"sort"
|
|
@@ -86,7 +87,7 @@ type Transport struct {
|
|
|
|
|
|
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
|
|
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
|
|
// send in the initial settings frame. It is how many bytes
|
|
// send in the initial settings frame. It is how many bytes
|
|
- // of response headers are allow. Unlike the http2 spec, zero here
|
|
|
|
|
|
+ // of response headers are allowed. Unlike the http2 spec, zero here
|
|
// means to use a default limit (currently 10MB). If you actually
|
|
// means to use a default limit (currently 10MB). If you actually
|
|
// want to advertise an ulimited value to the peer, Transport
|
|
// want to advertise an ulimited value to the peer, Transport
|
|
// interprets the highest possible value here (0xffffffff or 1<<32-1)
|
|
// interprets the highest possible value here (0xffffffff or 1<<32-1)
|
|
@@ -164,15 +165,17 @@ type ClientConn struct {
|
|
goAwayDebug string // goAway frame's debug data, retained as a string
|
|
goAwayDebug string // goAway frame's debug data, retained as a string
|
|
streams map[uint32]*clientStream // client-initiated
|
|
streams map[uint32]*clientStream // client-initiated
|
|
nextStreamID uint32
|
|
nextStreamID uint32
|
|
|
|
+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
|
|
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
|
|
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
|
|
bw *bufio.Writer
|
|
bw *bufio.Writer
|
|
br *bufio.Reader
|
|
br *bufio.Reader
|
|
fr *Framer
|
|
fr *Framer
|
|
lastActive time.Time
|
|
lastActive time.Time
|
|
// Settings from peer: (also guarded by mu)
|
|
// Settings from peer: (also guarded by mu)
|
|
- maxFrameSize uint32
|
|
|
|
- maxConcurrentStreams uint32
|
|
|
|
- initialWindowSize uint32
|
|
|
|
|
|
+ maxFrameSize uint32
|
|
|
|
+ maxConcurrentStreams uint32
|
|
|
|
+ peerMaxHeaderListSize uint64
|
|
|
|
+ initialWindowSize uint32
|
|
|
|
|
|
hbuf bytes.Buffer // HPACK encoder writes into this
|
|
hbuf bytes.Buffer // HPACK encoder writes into this
|
|
henc *hpack.Encoder
|
|
henc *hpack.Encoder
|
|
@@ -216,35 +219,45 @@ type clientStream struct {
|
|
resTrailer *http.Header // client's Response.Trailer
|
|
resTrailer *http.Header // client's Response.Trailer
|
|
}
|
|
}
|
|
|
|
|
|
-// awaitRequestCancel runs in its own goroutine and waits for the user
|
|
|
|
-// to cancel a RoundTrip request, its context to expire, or for the
|
|
|
|
-// request to be done (any way it might be removed from the cc.streams
|
|
|
|
-// map: peer reset, successful completion, TCP connection breakage,
|
|
|
|
-// etc)
|
|
|
|
-func (cs *clientStream) awaitRequestCancel(req *http.Request) {
|
|
|
|
|
|
+// awaitRequestCancel waits for the user to cancel a request or for the done
|
|
|
|
+// channel to be signaled. A non-nil error is returned only if the request was
|
|
|
|
+// canceled.
|
|
|
|
+func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
|
|
ctx := reqContext(req)
|
|
ctx := reqContext(req)
|
|
if req.Cancel == nil && ctx.Done() == nil {
|
|
if req.Cancel == nil && ctx.Done() == nil {
|
|
- return
|
|
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
select {
|
|
select {
|
|
case <-req.Cancel:
|
|
case <-req.Cancel:
|
|
- cs.cancelStream()
|
|
|
|
- cs.bufPipe.CloseWithError(errRequestCanceled)
|
|
|
|
|
|
+ return errRequestCanceled
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
|
|
+ return ctx.Err()
|
|
|
|
+ case <-done:
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// awaitRequestCancel waits for the user to cancel a request, its context to
|
|
|
|
+// expire, or for the request to be done (any way it might be removed from the
|
|
|
|
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
|
|
|
|
+// etc). If the request is canceled, then cs will be canceled and closed.
|
|
|
|
+func (cs *clientStream) awaitRequestCancel(req *http.Request) {
|
|
|
|
+ if err := awaitRequestCancel(req, cs.done); err != nil {
|
|
cs.cancelStream()
|
|
cs.cancelStream()
|
|
- cs.bufPipe.CloseWithError(ctx.Err())
|
|
|
|
- case <-cs.done:
|
|
|
|
|
|
+ cs.bufPipe.CloseWithError(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func (cs *clientStream) cancelStream() {
|
|
func (cs *clientStream) cancelStream() {
|
|
- cs.cc.mu.Lock()
|
|
|
|
|
|
+ cc := cs.cc
|
|
|
|
+ cc.mu.Lock()
|
|
didReset := cs.didReset
|
|
didReset := cs.didReset
|
|
cs.didReset = true
|
|
cs.didReset = true
|
|
- cs.cc.mu.Unlock()
|
|
|
|
|
|
+ cc.mu.Unlock()
|
|
|
|
|
|
if !didReset {
|
|
if !didReset {
|
|
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
|
|
|
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -329,7 +342,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
|
|
}
|
|
}
|
|
|
|
|
|
addr := authorityAddr(req.URL.Scheme, req.URL.Host)
|
|
addr := authorityAddr(req.URL.Scheme, req.URL.Host)
|
|
- for {
|
|
|
|
|
|
+ for retry := 0; ; retry++ {
|
|
cc, err := t.connPool().GetClientConn(req, addr)
|
|
cc, err := t.connPool().GetClientConn(req, addr)
|
|
if err != nil {
|
|
if err != nil {
|
|
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
|
|
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
|
|
@@ -337,9 +350,25 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
|
|
}
|
|
}
|
|
traceGotConn(req, cc)
|
|
traceGotConn(req, cc)
|
|
res, err := cc.RoundTrip(req)
|
|
res, err := cc.RoundTrip(req)
|
|
- if err != nil {
|
|
|
|
- if req, err = shouldRetryRequest(req, err); err == nil {
|
|
|
|
- continue
|
|
|
|
|
|
+ if err != nil && retry <= 6 {
|
|
|
|
+ afterBodyWrite := false
|
|
|
|
+ if e, ok := err.(afterReqBodyWriteError); ok {
|
|
|
|
+ err = e
|
|
|
|
+ afterBodyWrite = true
|
|
|
|
+ }
|
|
|
|
+ if req, err = shouldRetryRequest(req, err, afterBodyWrite); err == nil {
|
|
|
|
+ // After the first retry, do exponential backoff with 10% jitter.
|
|
|
|
+ if retry == 0 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ backoff := float64(uint(1) << (uint(retry) - 1))
|
|
|
|
+ backoff += backoff * (0.1 * mathrand.Float64())
|
|
|
|
+ select {
|
|
|
|
+ case <-time.After(time.Second * time.Duration(backoff)):
|
|
|
|
+ continue
|
|
|
|
+ case <-reqContext(req).Done():
|
|
|
|
+ return nil, reqContext(req).Err()
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -360,43 +389,60 @@ func (t *Transport) CloseIdleConnections() {
|
|
}
|
|
}
|
|
|
|
|
|
var (
|
|
var (
|
|
- errClientConnClosed = errors.New("http2: client conn is closed")
|
|
|
|
- errClientConnUnusable = errors.New("http2: client conn not usable")
|
|
|
|
-
|
|
|
|
- errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
|
|
|
|
- errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
|
|
|
|
|
|
+ errClientConnClosed = errors.New("http2: client conn is closed")
|
|
|
|
+ errClientConnUnusable = errors.New("http2: client conn not usable")
|
|
|
|
+ errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip.
|
|
|
|
+// It is used to signal that err happened after part of Request.Body was sent to the server.
|
|
|
|
+type afterReqBodyWriteError struct {
|
|
|
|
+ err error
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (e afterReqBodyWriteError) Error() string {
|
|
|
|
+ return e.err.Error() + "; some request body already written"
|
|
|
|
+}
|
|
|
|
+
|
|
// shouldRetryRequest is called by RoundTrip when a request fails to get
|
|
// shouldRetryRequest is called by RoundTrip when a request fails to get
|
|
// response headers. It is always called with a non-nil error.
|
|
// response headers. It is always called with a non-nil error.
|
|
// It returns either a request to retry (either the same request, or a
|
|
// It returns either a request to retry (either the same request, or a
|
|
// modified clone), or an error if the request can't be replayed.
|
|
// modified clone), or an error if the request can't be replayed.
|
|
-func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
|
|
|
|
- switch err {
|
|
|
|
- default:
|
|
|
|
|
|
+func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) {
|
|
|
|
+ if !canRetryError(err) {
|
|
return nil, err
|
|
return nil, err
|
|
- case errClientConnUnusable, errClientConnGotGoAway:
|
|
|
|
|
|
+ }
|
|
|
|
+ if !afterBodyWrite {
|
|
return req, nil
|
|
return req, nil
|
|
- case errClientConnGotGoAwayAfterSomeReqBody:
|
|
|
|
- // If the Body is nil (or http.NoBody), it's safe to reuse
|
|
|
|
- // this request and its Body.
|
|
|
|
- if req.Body == nil || reqBodyIsNoBody(req.Body) {
|
|
|
|
- return req, nil
|
|
|
|
- }
|
|
|
|
- // Otherwise we depend on the Request having its GetBody
|
|
|
|
- // func defined.
|
|
|
|
- getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
|
|
|
|
- if getBody == nil {
|
|
|
|
- return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
|
|
|
|
- }
|
|
|
|
- body, err := getBody()
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- newReq := *req
|
|
|
|
- newReq.Body = body
|
|
|
|
- return &newReq, nil
|
|
|
|
}
|
|
}
|
|
|
|
+ // If the Body is nil (or http.NoBody), it's safe to reuse
|
|
|
|
+ // this request and its Body.
|
|
|
|
+ if req.Body == nil || reqBodyIsNoBody(req.Body) {
|
|
|
|
+ return req, nil
|
|
|
|
+ }
|
|
|
|
+ // Otherwise we depend on the Request having its GetBody
|
|
|
|
+ // func defined.
|
|
|
|
+ getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
|
|
|
|
+ if getBody == nil {
|
|
|
|
+ return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
|
|
|
|
+ }
|
|
|
|
+ body, err := getBody()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ newReq := *req
|
|
|
|
+ newReq.Body = body
|
|
|
|
+ return &newReq, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func canRetryError(err error) bool {
|
|
|
|
+ if err == errClientConnUnusable || err == errClientConnGotGoAway {
|
|
|
|
+ return true
|
|
|
|
+ }
|
|
|
|
+ if se, ok := err.(StreamError); ok {
|
|
|
|
+ return se.Code == ErrCodeRefusedStream
|
|
|
|
+ }
|
|
|
|
+ return false
|
|
}
|
|
}
|
|
|
|
|
|
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
|
|
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
|
|
@@ -474,17 +520,18 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
|
|
|
|
|
|
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
|
|
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
|
|
cc := &ClientConn{
|
|
cc := &ClientConn{
|
|
- t: t,
|
|
|
|
- tconn: c,
|
|
|
|
- readerDone: make(chan struct{}),
|
|
|
|
- nextStreamID: 1,
|
|
|
|
- maxFrameSize: 16 << 10, // spec default
|
|
|
|
- initialWindowSize: 65535, // spec default
|
|
|
|
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
|
|
|
|
- streams: make(map[uint32]*clientStream),
|
|
|
|
- singleUse: singleUse,
|
|
|
|
- wantSettingsAck: true,
|
|
|
|
- pings: make(map[[8]byte]chan struct{}),
|
|
|
|
|
|
+ t: t,
|
|
|
|
+ tconn: c,
|
|
|
|
+ readerDone: make(chan struct{}),
|
|
|
|
+ nextStreamID: 1,
|
|
|
|
+ maxFrameSize: 16 << 10, // spec default
|
|
|
|
+ initialWindowSize: 65535, // spec default
|
|
|
|
+ maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
|
|
|
|
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
|
|
|
|
+ streams: make(map[uint32]*clientStream),
|
|
|
|
+ singleUse: singleUse,
|
|
|
|
+ wantSettingsAck: true,
|
|
|
|
+ pings: make(map[[8]byte]chan struct{}),
|
|
}
|
|
}
|
|
if d := t.idleConnTimeout(); d != 0 {
|
|
if d := t.idleConnTimeout(); d != 0 {
|
|
cc.idleTimeout = d
|
|
cc.idleTimeout = d
|
|
@@ -560,6 +607,8 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// CanTakeNewRequest reports whether the connection can take a new request,
|
|
|
|
+// meaning it has not been closed or received or sent a GOAWAY.
|
|
func (cc *ClientConn) CanTakeNewRequest() bool {
|
|
func (cc *ClientConn) CanTakeNewRequest() bool {
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
defer cc.mu.Unlock()
|
|
defer cc.mu.Unlock()
|
|
@@ -571,8 +620,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
return cc.goAway == nil && !cc.closed &&
|
|
return cc.goAway == nil && !cc.closed &&
|
|
- int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
|
|
|
|
- cc.nextStreamID < math.MaxInt32
|
|
|
|
|
|
+ int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
|
|
}
|
|
}
|
|
|
|
|
|
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
|
|
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
|
|
@@ -694,7 +742,7 @@ func checkConnHeaders(req *http.Request) error {
|
|
// req.ContentLength, where 0 actually means zero (not unknown) and -1
|
|
// req.ContentLength, where 0 actually means zero (not unknown) and -1
|
|
// means unknown.
|
|
// means unknown.
|
|
func actualContentLength(req *http.Request) int64 {
|
|
func actualContentLength(req *http.Request) int64 {
|
|
- if req.Body == nil {
|
|
|
|
|
|
+ if req.Body == nil || reqBodyIsNoBody(req.Body) {
|
|
return 0
|
|
return 0
|
|
}
|
|
}
|
|
if req.ContentLength != 0 {
|
|
if req.ContentLength != 0 {
|
|
@@ -718,15 +766,14 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
hasTrailers := trailers != ""
|
|
hasTrailers := trailers != ""
|
|
|
|
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
- cc.lastActive = time.Now()
|
|
|
|
- if cc.closed || !cc.canTakeNewRequestLocked() {
|
|
|
|
|
|
+ if err := cc.awaitOpenSlotForRequest(req); err != nil {
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
- return nil, errClientConnUnusable
|
|
|
|
|
|
+ return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
body := req.Body
|
|
body := req.Body
|
|
- hasBody := body != nil
|
|
|
|
contentLen := actualContentLength(req)
|
|
contentLen := actualContentLength(req)
|
|
|
|
+ hasBody := contentLen != 0
|
|
|
|
|
|
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
|
|
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
|
|
var requestedGzip bool
|
|
var requestedGzip bool
|
|
@@ -816,14 +863,13 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
cs.abortRequestBodyWrite(errStopReqBodyWrite)
|
|
cs.abortRequestBodyWrite(errStopReqBodyWrite)
|
|
}
|
|
}
|
|
if re.err != nil {
|
|
if re.err != nil {
|
|
- if re.err == errClientConnGotGoAway {
|
|
|
|
- cc.mu.Lock()
|
|
|
|
- if cs.startedWrite {
|
|
|
|
- re.err = errClientConnGotGoAwayAfterSomeReqBody
|
|
|
|
- }
|
|
|
|
- cc.mu.Unlock()
|
|
|
|
- }
|
|
|
|
|
|
+ cc.mu.Lock()
|
|
|
|
+ afterBodyWrite := cs.startedWrite
|
|
|
|
+ cc.mu.Unlock()
|
|
cc.forgetStreamID(cs.ID)
|
|
cc.forgetStreamID(cs.ID)
|
|
|
|
+ if afterBodyWrite {
|
|
|
|
+ return nil, afterReqBodyWriteError{re.err}
|
|
|
|
+ }
|
|
return nil, re.err
|
|
return nil, re.err
|
|
}
|
|
}
|
|
res.Request = req
|
|
res.Request = req
|
|
@@ -836,31 +882,31 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
case re := <-readLoopResCh:
|
|
case re := <-readLoopResCh:
|
|
return handleReadLoopResponse(re)
|
|
return handleReadLoopResponse(re)
|
|
case <-respHeaderTimer:
|
|
case <-respHeaderTimer:
|
|
- cc.forgetStreamID(cs.ID)
|
|
|
|
if !hasBody || bodyWritten {
|
|
if !hasBody || bodyWritten {
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
} else {
|
|
} else {
|
|
bodyWriter.cancel()
|
|
bodyWriter.cancel()
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
}
|
|
}
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
return nil, errTimeout
|
|
return nil, errTimeout
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
- cc.forgetStreamID(cs.ID)
|
|
|
|
if !hasBody || bodyWritten {
|
|
if !hasBody || bodyWritten {
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
} else {
|
|
} else {
|
|
bodyWriter.cancel()
|
|
bodyWriter.cancel()
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
}
|
|
}
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
return nil, ctx.Err()
|
|
return nil, ctx.Err()
|
|
case <-req.Cancel:
|
|
case <-req.Cancel:
|
|
- cc.forgetStreamID(cs.ID)
|
|
|
|
if !hasBody || bodyWritten {
|
|
if !hasBody || bodyWritten {
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
} else {
|
|
} else {
|
|
bodyWriter.cancel()
|
|
bodyWriter.cancel()
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
}
|
|
}
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
return nil, errRequestCanceled
|
|
return nil, errRequestCanceled
|
|
case <-cs.peerReset:
|
|
case <-cs.peerReset:
|
|
// processResetStream already removed the
|
|
// processResetStream already removed the
|
|
@@ -887,6 +933,45 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
|
|
|
|
+// Must hold cc.mu.
|
|
|
|
+func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
|
|
|
|
+ var waitingForConn chan struct{}
|
|
|
|
+ var waitingForConnErr error // guarded by cc.mu
|
|
|
|
+ for {
|
|
|
|
+ cc.lastActive = time.Now()
|
|
|
|
+ if cc.closed || !cc.canTakeNewRequestLocked() {
|
|
|
|
+ return errClientConnUnusable
|
|
|
|
+ }
|
|
|
|
+ if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
|
|
|
|
+ if waitingForConn != nil {
|
|
|
|
+ close(waitingForConn)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ // Unfortunately, we cannot wait on a condition variable and channel at
|
|
|
|
+ // the same time, so instead, we spin up a goroutine to check if the
|
|
|
|
+ // request is canceled while we wait for a slot to open in the connection.
|
|
|
|
+ if waitingForConn == nil {
|
|
|
|
+ waitingForConn = make(chan struct{})
|
|
|
|
+ go func() {
|
|
|
|
+ if err := awaitRequestCancel(req, waitingForConn); err != nil {
|
|
|
|
+ cc.mu.Lock()
|
|
|
|
+ waitingForConnErr = err
|
|
|
|
+ cc.cond.Broadcast()
|
|
|
|
+ cc.mu.Unlock()
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+ }
|
|
|
|
+ cc.pendingRequests++
|
|
|
|
+ cc.cond.Wait()
|
|
|
|
+ cc.pendingRequests--
|
|
|
|
+ if waitingForConnErr != nil {
|
|
|
|
+ return waitingForConnErr
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
// requires cc.wmu be held
|
|
// requires cc.wmu be held
|
|
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
|
|
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
|
|
first := true // first frame written (HEADERS is first, then CONTINUATION)
|
|
first := true // first frame written (HEADERS is first, then CONTINUATION)
|
|
@@ -1002,8 +1087,13 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
|
|
var trls []byte
|
|
var trls []byte
|
|
if hasTrailers {
|
|
if hasTrailers {
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
- defer cc.mu.Unlock()
|
|
|
|
- trls = cc.encodeTrailers(req)
|
|
|
|
|
|
+ trls, err = cc.encodeTrailers(req)
|
|
|
|
+ cc.mu.Unlock()
|
|
|
|
+ if err != nil {
|
|
|
|
+ cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
cc.wmu.Lock()
|
|
cc.wmu.Lock()
|
|
@@ -1106,62 +1196,86 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // 8.1.2.3 Request Pseudo-Header Fields
|
|
|
|
- // The :path pseudo-header field includes the path and query parts of the
|
|
|
|
- // target URI (the path-absolute production and optionally a '?' character
|
|
|
|
- // followed by the query production (see Sections 3.3 and 3.4 of
|
|
|
|
- // [RFC3986]).
|
|
|
|
- cc.writeHeader(":authority", host)
|
|
|
|
- cc.writeHeader(":method", req.Method)
|
|
|
|
- if req.Method != "CONNECT" {
|
|
|
|
- cc.writeHeader(":path", path)
|
|
|
|
- cc.writeHeader(":scheme", req.URL.Scheme)
|
|
|
|
- }
|
|
|
|
- if trailers != "" {
|
|
|
|
- cc.writeHeader("trailer", trailers)
|
|
|
|
- }
|
|
|
|
|
|
+ enumerateHeaders := func(f func(name, value string)) {
|
|
|
|
+ // 8.1.2.3 Request Pseudo-Header Fields
|
|
|
|
+ // The :path pseudo-header field includes the path and query parts of the
|
|
|
|
+ // target URI (the path-absolute production and optionally a '?' character
|
|
|
|
+ // followed by the query production (see Sections 3.3 and 3.4 of
|
|
|
|
+ // [RFC3986]).
|
|
|
|
+ f(":authority", host)
|
|
|
|
+ f(":method", req.Method)
|
|
|
|
+ if req.Method != "CONNECT" {
|
|
|
|
+ f(":path", path)
|
|
|
|
+ f(":scheme", req.URL.Scheme)
|
|
|
|
+ }
|
|
|
|
+ if trailers != "" {
|
|
|
|
+ f("trailer", trailers)
|
|
|
|
+ }
|
|
|
|
|
|
- var didUA bool
|
|
|
|
- for k, vv := range req.Header {
|
|
|
|
- lowKey := strings.ToLower(k)
|
|
|
|
- switch lowKey {
|
|
|
|
- case "host", "content-length":
|
|
|
|
- // Host is :authority, already sent.
|
|
|
|
- // Content-Length is automatic, set below.
|
|
|
|
- continue
|
|
|
|
- case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
|
|
|
|
- // Per 8.1.2.2 Connection-Specific Header
|
|
|
|
- // Fields, don't send connection-specific
|
|
|
|
- // fields. We have already checked if any
|
|
|
|
- // are error-worthy so just ignore the rest.
|
|
|
|
- continue
|
|
|
|
- case "user-agent":
|
|
|
|
- // Match Go's http1 behavior: at most one
|
|
|
|
- // User-Agent. If set to nil or empty string,
|
|
|
|
- // then omit it. Otherwise if not mentioned,
|
|
|
|
- // include the default (below).
|
|
|
|
- didUA = true
|
|
|
|
- if len(vv) < 1 {
|
|
|
|
|
|
+ var didUA bool
|
|
|
|
+ for k, vv := range req.Header {
|
|
|
|
+ if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
|
|
|
|
+ // Host is :authority, already sent.
|
|
|
|
+ // Content-Length is automatic, set below.
|
|
continue
|
|
continue
|
|
- }
|
|
|
|
- vv = vv[:1]
|
|
|
|
- if vv[0] == "" {
|
|
|
|
|
|
+ } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
|
|
|
|
+ strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
|
|
|
|
+ strings.EqualFold(k, "keep-alive") {
|
|
|
|
+ // Per 8.1.2.2 Connection-Specific Header
|
|
|
|
+ // Fields, don't send connection-specific
|
|
|
|
+ // fields. We have already checked if any
|
|
|
|
+ // are error-worthy so just ignore the rest.
|
|
continue
|
|
continue
|
|
|
|
+ } else if strings.EqualFold(k, "user-agent") {
|
|
|
|
+ // Match Go's http1 behavior: at most one
|
|
|
|
+ // User-Agent. If set to nil or empty string,
|
|
|
|
+ // then omit it. Otherwise if not mentioned,
|
|
|
|
+ // include the default (below).
|
|
|
|
+ didUA = true
|
|
|
|
+ if len(vv) < 1 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ vv = vv[:1]
|
|
|
|
+ if vv[0] == "" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, v := range vv {
|
|
|
|
+ f(k, v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- for _, v := range vv {
|
|
|
|
- cc.writeHeader(lowKey, v)
|
|
|
|
|
|
+ if shouldSendReqContentLength(req.Method, contentLength) {
|
|
|
|
+ f("content-length", strconv.FormatInt(contentLength, 10))
|
|
|
|
+ }
|
|
|
|
+ if addGzipHeader {
|
|
|
|
+ f("accept-encoding", "gzip")
|
|
|
|
+ }
|
|
|
|
+ if !didUA {
|
|
|
|
+ f("user-agent", defaultUserAgent)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if shouldSendReqContentLength(req.Method, contentLength) {
|
|
|
|
- cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10))
|
|
|
|
- }
|
|
|
|
- if addGzipHeader {
|
|
|
|
- cc.writeHeader("accept-encoding", "gzip")
|
|
|
|
- }
|
|
|
|
- if !didUA {
|
|
|
|
- cc.writeHeader("user-agent", defaultUserAgent)
|
|
|
|
|
|
+
|
|
|
|
+ // Do a first pass over the headers counting bytes to ensure
|
|
|
|
+ // we don't exceed cc.peerMaxHeaderListSize. This is done as a
|
|
|
|
+ // separate pass before encoding the headers to prevent
|
|
|
|
+ // modifying the hpack state.
|
|
|
|
+ hlSize := uint64(0)
|
|
|
|
+ enumerateHeaders(func(name, value string) {
|
|
|
|
+ hf := hpack.HeaderField{Name: name, Value: value}
|
|
|
|
+ hlSize += uint64(hf.Size())
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ if hlSize > cc.peerMaxHeaderListSize {
|
|
|
|
+ return nil, errRequestHeaderListSize
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Header list size is ok. Write the headers.
|
|
|
|
+ enumerateHeaders(func(name, value string) {
|
|
|
|
+ cc.writeHeader(strings.ToLower(name), value)
|
|
|
|
+ })
|
|
|
|
+
|
|
return cc.hbuf.Bytes(), nil
|
|
return cc.hbuf.Bytes(), nil
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1188,17 +1302,29 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
|
|
}
|
|
}
|
|
|
|
|
|
// requires cc.mu be held.
|
|
// requires cc.mu be held.
|
|
-func (cc *ClientConn) encodeTrailers(req *http.Request) []byte {
|
|
|
|
|
|
+func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
|
|
cc.hbuf.Reset()
|
|
cc.hbuf.Reset()
|
|
|
|
+
|
|
|
|
+ hlSize := uint64(0)
|
|
|
|
+ for k, vv := range req.Trailer {
|
|
|
|
+ for _, v := range vv {
|
|
|
|
+ hf := hpack.HeaderField{Name: k, Value: v}
|
|
|
|
+ hlSize += uint64(hf.Size())
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if hlSize > cc.peerMaxHeaderListSize {
|
|
|
|
+ return nil, errRequestHeaderListSize
|
|
|
|
+ }
|
|
|
|
+
|
|
for k, vv := range req.Trailer {
|
|
for k, vv := range req.Trailer {
|
|
- // Transfer-Encoding, etc.. have already been filter at the
|
|
|
|
|
|
+ // Transfer-Encoding, etc.. have already been filtered at the
|
|
// start of RoundTrip
|
|
// start of RoundTrip
|
|
lowKey := strings.ToLower(k)
|
|
lowKey := strings.ToLower(k)
|
|
for _, v := range vv {
|
|
for _, v := range vv {
|
|
cc.writeHeader(lowKey, v)
|
|
cc.writeHeader(lowKey, v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return cc.hbuf.Bytes()
|
|
|
|
|
|
+ return cc.hbuf.Bytes(), nil
|
|
}
|
|
}
|
|
|
|
|
|
func (cc *ClientConn) writeHeader(name, value string) {
|
|
func (cc *ClientConn) writeHeader(name, value string) {
|
|
@@ -1246,7 +1372,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
|
|
cc.idleTimer.Reset(cc.idleTimeout)
|
|
cc.idleTimer.Reset(cc.idleTimeout)
|
|
}
|
|
}
|
|
close(cs.done)
|
|
close(cs.done)
|
|
- cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
|
|
|
|
|
|
+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and
|
|
|
|
+ // wake up RoundTrip if there is a pending request.
|
|
|
|
+ cc.cond.Broadcast()
|
|
}
|
|
}
|
|
return cs
|
|
return cs
|
|
}
|
|
}
|
|
@@ -1345,8 +1473,9 @@ func (rl *clientConnReadLoop) run() error {
|
|
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
|
|
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
|
|
}
|
|
}
|
|
if se, ok := err.(StreamError); ok {
|
|
if se, ok := err.(StreamError); ok {
|
|
- if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
|
|
|
|
|
|
+ if cs := cc.streamByID(se.StreamID, false); cs != nil {
|
|
cs.cc.writeStreamReset(cs.ID, se.Code, err)
|
|
cs.cc.writeStreamReset(cs.ID, se.Code, err)
|
|
|
|
+ cs.cc.forgetStreamID(cs.ID)
|
|
if se.Cause == nil {
|
|
if se.Cause == nil {
|
|
se.Cause = cc.fr.errDetail
|
|
se.Cause = cc.fr.errDetail
|
|
}
|
|
}
|
|
@@ -1407,7 +1536,17 @@ func (rl *clientConnReadLoop) run() error {
|
|
|
|
|
|
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
|
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
|
cc := rl.cc
|
|
cc := rl.cc
|
|
- cs := cc.streamByID(f.StreamID, f.StreamEnded())
|
|
|
|
|
|
+ if f.StreamEnded() {
|
|
|
|
+ // Issue 20521: If the stream has ended, streamByID() causes
|
|
|
|
+ // clientStream.done to be closed, which causes the request's bodyWriter
|
|
|
|
+ // to be closed with an errStreamClosed, which may be received by
|
|
|
|
+ // clientConn.RoundTrip before the result of processing these headers.
|
|
|
|
+ // Deferring stream closure allows the header processing to occur first.
|
|
|
|
+ // clientConn.RoundTrip may still receive the bodyWriter error first, but
|
|
|
|
+ // the fix for issue 16102 prioritises any response.
|
|
|
|
+ defer cc.streamByID(f.StreamID, true)
|
|
|
|
+ }
|
|
|
|
+ cs := cc.streamByID(f.StreamID, false)
|
|
if cs == nil {
|
|
if cs == nil {
|
|
// We'd get here if we canceled a request while the
|
|
// We'd get here if we canceled a request while the
|
|
// server had its response still in flight. So if this
|
|
// server had its response still in flight. So if this
|
|
@@ -1668,6 +1807,7 @@ func (b transportResponseBody) Close() error {
|
|
}
|
|
}
|
|
|
|
|
|
cs.bufPipe.BreakWithError(errClosedResponseBody)
|
|
cs.bufPipe.BreakWithError(errClosedResponseBody)
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1702,6 +1842,14 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
+ if !cs.firstByte {
|
|
|
|
+ cc.logf("protocol error: received DATA before a HEADERS frame")
|
|
|
|
+ rl.endStreamError(cs, StreamError{
|
|
|
|
+ StreamID: f.StreamID,
|
|
|
|
+ Code: ErrCodeProtocol,
|
|
|
|
+ })
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
if f.Length > 0 {
|
|
if f.Length > 0 {
|
|
// Check connection-level flow control.
|
|
// Check connection-level flow control.
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
@@ -1713,16 +1861,27 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
}
|
|
}
|
|
// Return any padded flow control now, since we won't
|
|
// Return any padded flow control now, since we won't
|
|
// refund it later on body reads.
|
|
// refund it later on body reads.
|
|
- if pad := int32(f.Length) - int32(len(data)); pad > 0 {
|
|
|
|
- cs.inflow.add(pad)
|
|
|
|
- cc.inflow.add(pad)
|
|
|
|
|
|
+ var refund int
|
|
|
|
+ if pad := int(f.Length) - len(data); pad > 0 {
|
|
|
|
+ refund += pad
|
|
|
|
+ }
|
|
|
|
+ // Return len(data) now if the stream is already closed,
|
|
|
|
+ // since data will never be read.
|
|
|
|
+ didReset := cs.didReset
|
|
|
|
+ if didReset {
|
|
|
|
+ refund += len(data)
|
|
|
|
+ }
|
|
|
|
+ if refund > 0 {
|
|
|
|
+ cc.inflow.add(int32(refund))
|
|
cc.wmu.Lock()
|
|
cc.wmu.Lock()
|
|
- cc.fr.WriteWindowUpdate(0, uint32(pad))
|
|
|
|
- cc.fr.WriteWindowUpdate(cs.ID, uint32(pad))
|
|
|
|
|
|
+ cc.fr.WriteWindowUpdate(0, uint32(refund))
|
|
|
|
+ if !didReset {
|
|
|
|
+ cs.inflow.add(int32(refund))
|
|
|
|
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
|
|
|
|
+ }
|
|
cc.bw.Flush()
|
|
cc.bw.Flush()
|
|
cc.wmu.Unlock()
|
|
cc.wmu.Unlock()
|
|
}
|
|
}
|
|
- didReset := cs.didReset
|
|
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
|
|
|
|
if len(data) > 0 && !didReset {
|
|
if len(data) > 0 && !didReset {
|
|
@@ -1805,6 +1964,8 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
|
|
cc.maxFrameSize = s.Val
|
|
cc.maxFrameSize = s.Val
|
|
case SettingMaxConcurrentStreams:
|
|
case SettingMaxConcurrentStreams:
|
|
cc.maxConcurrentStreams = s.Val
|
|
cc.maxConcurrentStreams = s.Val
|
|
|
|
+ case SettingMaxHeaderListSize:
|
|
|
|
+ cc.peerMaxHeaderListSize = uint64(s.Val)
|
|
case SettingInitialWindowSize:
|
|
case SettingInitialWindowSize:
|
|
// Values above the maximum flow-control
|
|
// Values above the maximum flow-control
|
|
// window size of 2^31-1 MUST be treated as a
|
|
// window size of 2^31-1 MUST be treated as a
|
|
@@ -1971,6 +2132,7 @@ func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error)
|
|
|
|
|
|
var (
|
|
var (
|
|
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
|
|
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
|
|
|
|
+ errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
|
|
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
|
|
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
|
|
)
|
|
)
|
|
|
|
|