|
@@ -147,6 +147,12 @@ type Transport struct {
|
|
|
// waiting for their turn.
|
|
|
StrictMaxConcurrentStreams bool
|
|
|
|
|
|
+ // IdleConnTimeout is the maximum amount of time an idle
|
|
|
+ // (keep-alive) connection will remain idle before closing
|
|
|
+ // itself.
|
|
|
+ // Zero means no limit.
|
|
|
+ IdleConnTimeout time.Duration
|
|
|
+
|
|
|
// ReadIdleTimeout is the timeout after which a health check using ping
|
|
|
// frame will be carried out if no frame is received on the connection.
|
|
|
// Note that a ping response will is considered a received frame, so if
|
|
@@ -178,6 +184,8 @@ type Transport struct {
|
|
|
|
|
|
connPoolOnce sync.Once
|
|
|
connPoolOrDef ClientConnPool // non-nil version of ConnPool
|
|
|
+
|
|
|
+ syncHooks *testSyncHooks
|
|
|
}
|
|
|
|
|
|
func (t *Transport) maxHeaderListSize() uint32 {
|
|
@@ -302,7 +310,7 @@ type ClientConn struct {
|
|
|
readerErr error // set before readerDone is closed
|
|
|
|
|
|
idleTimeout time.Duration // or 0 for never
|
|
|
- idleTimer *time.Timer
|
|
|
+ idleTimer timer
|
|
|
|
|
|
mu sync.Mutex // guards following
|
|
|
cond *sync.Cond // hold mu; broadcast on flow/closed changes
|
|
@@ -344,6 +352,60 @@ type ClientConn struct {
|
|
|
werr error // first write error that has occurred
|
|
|
hbuf bytes.Buffer // HPACK encoder writes into this
|
|
|
henc *hpack.Encoder
|
|
|
+
|
|
|
+ syncHooks *testSyncHooks // can be nil
|
|
|
+}
|
|
|
+
|
|
|
+// Hook points used for testing.
|
|
|
+// Outside of tests, cc.syncHooks is nil and these all have minimal implementations.
|
|
|
+// Inside tests, see the testSyncHooks function docs.
|
|
|
+
|
|
|
+// goRun starts a new goroutine.
|
|
|
+func (cc *ClientConn) goRun(f func()) {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.goRun(f)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ go f()
|
|
|
+}
|
|
|
+
|
|
|
+// condBroadcast is cc.cond.Broadcast.
|
|
|
+func (cc *ClientConn) condBroadcast() {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.condBroadcast(cc.cond)
|
|
|
+ }
|
|
|
+ cc.cond.Broadcast()
|
|
|
+}
|
|
|
+
|
|
|
+// condWait is cc.cond.Wait.
|
|
|
+func (cc *ClientConn) condWait() {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.condWait(cc.cond)
|
|
|
+ }
|
|
|
+ cc.cond.Wait()
|
|
|
+}
|
|
|
+
|
|
|
+// newTimer creates a new time.Timer, or a synthetic timer in tests.
|
|
|
+func (cc *ClientConn) newTimer(d time.Duration) timer {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ return cc.syncHooks.newTimer(d)
|
|
|
+ }
|
|
|
+ return newTimeTimer(d)
|
|
|
+}
|
|
|
+
|
|
|
+// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
|
|
|
+func (cc *ClientConn) afterFunc(d time.Duration, f func()) timer {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ return cc.syncHooks.afterFunc(d, f)
|
|
|
+ }
|
|
|
+ return newTimeAfterFunc(d, f)
|
|
|
+}
|
|
|
+
|
|
|
+func (cc *ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ return cc.syncHooks.contextWithTimeout(ctx, d)
|
|
|
+ }
|
|
|
+ return context.WithTimeout(ctx, d)
|
|
|
}
|
|
|
|
|
|
// clientStream is the state for a single HTTP/2 stream. One of these
|
|
@@ -425,7 +487,7 @@ func (cs *clientStream) abortStreamLocked(err error) {
|
|
|
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
|
|
|
if cs.cc.cond != nil {
|
|
|
// Wake up writeRequestBody if it is waiting on flow control.
|
|
|
- cs.cc.cond.Broadcast()
|
|
|
+ cs.cc.condBroadcast()
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -435,7 +497,7 @@ func (cs *clientStream) abortRequestBodyWrite() {
|
|
|
defer cc.mu.Unlock()
|
|
|
if cs.reqBody != nil && cs.reqBodyClosed == nil {
|
|
|
cs.closeReqBodyLocked()
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -445,10 +507,10 @@ func (cs *clientStream) closeReqBodyLocked() {
|
|
|
}
|
|
|
cs.reqBodyClosed = make(chan struct{})
|
|
|
reqBodyClosed := cs.reqBodyClosed
|
|
|
- go func() {
|
|
|
+ cs.cc.goRun(func() {
|
|
|
cs.reqBody.Close()
|
|
|
close(reqBodyClosed)
|
|
|
- }()
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
type stickyErrWriter struct {
|
|
@@ -537,15 +599,6 @@ func authorityAddr(scheme string, authority string) (addr string) {
|
|
|
return net.JoinHostPort(host, port)
|
|
|
}
|
|
|
|
|
|
-var retryBackoffHook func(time.Duration) *time.Timer
|
|
|
-
|
|
|
-func backoffNewTimer(d time.Duration) *time.Timer {
|
|
|
- if retryBackoffHook != nil {
|
|
|
- return retryBackoffHook(d)
|
|
|
- }
|
|
|
- return time.NewTimer(d)
|
|
|
-}
|
|
|
-
|
|
|
// RoundTripOpt is like RoundTrip, but takes options.
|
|
|
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
|
|
|
if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
|
|
@@ -573,13 +626,27 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
|
|
|
backoff := float64(uint(1) << (uint(retry) - 1))
|
|
|
backoff += backoff * (0.1 * mathrand.Float64())
|
|
|
d := time.Second * time.Duration(backoff)
|
|
|
- timer := backoffNewTimer(d)
|
|
|
+ var tm timer
|
|
|
+ if t.syncHooks != nil {
|
|
|
+ tm = t.syncHooks.newTimer(d)
|
|
|
+ t.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case <-tm.C():
|
|
|
+ case <-req.Context().Done():
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ tm = newTimeTimer(d)
|
|
|
+ }
|
|
|
select {
|
|
|
- case <-timer.C:
|
|
|
+ case <-tm.C():
|
|
|
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
|
|
|
continue
|
|
|
case <-req.Context().Done():
|
|
|
- timer.Stop()
|
|
|
+ tm.Stop()
|
|
|
err = req.Context().Err()
|
|
|
}
|
|
|
}
|
|
@@ -658,6 +725,9 @@ func canRetryError(err error) bool {
|
|
|
}
|
|
|
|
|
|
func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
|
|
|
+ if t.syncHooks != nil {
|
|
|
+ return t.newClientConn(nil, singleUse, t.syncHooks)
|
|
|
+ }
|
|
|
host, _, err := net.SplitHostPort(addr)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -666,7 +736,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- return t.newClientConn(tconn, singleUse)
|
|
|
+ return t.newClientConn(tconn, singleUse, nil)
|
|
|
}
|
|
|
|
|
|
func (t *Transport) newTLSConfig(host string) *tls.Config {
|
|
@@ -732,10 +802,10 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 {
|
|
|
}
|
|
|
|
|
|
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
|
|
|
- return t.newClientConn(c, t.disableKeepAlives())
|
|
|
+ return t.newClientConn(c, t.disableKeepAlives(), nil)
|
|
|
}
|
|
|
|
|
|
-func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
|
|
|
+func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHooks) (*ClientConn, error) {
|
|
|
cc := &ClientConn{
|
|
|
t: t,
|
|
|
tconn: c,
|
|
@@ -750,10 +820,15 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
wantSettingsAck: true,
|
|
|
pings: make(map[[8]byte]chan struct{}),
|
|
|
reqHeaderMu: make(chan struct{}, 1),
|
|
|
+ syncHooks: hooks,
|
|
|
+ }
|
|
|
+ if hooks != nil {
|
|
|
+ hooks.newclientconn(cc)
|
|
|
+ c = cc.tconn
|
|
|
}
|
|
|
if d := t.idleConnTimeout(); d != 0 {
|
|
|
cc.idleTimeout = d
|
|
|
- cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
|
|
|
+ cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout)
|
|
|
}
|
|
|
if VerboseLogs {
|
|
|
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
|
|
@@ -818,7 +893,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
|
|
|
return nil, cc.werr
|
|
|
}
|
|
|
|
|
|
- go cc.readLoop()
|
|
|
+ cc.goRun(cc.readLoop)
|
|
|
return cc, nil
|
|
|
}
|
|
|
|
|
@@ -826,7 +901,7 @@ func (cc *ClientConn) healthCheck() {
|
|
|
pingTimeout := cc.t.pingTimeout()
|
|
|
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
|
|
|
// trigger the healthCheck again if there is no frame received.
|
|
|
- ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
|
|
|
+ ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout)
|
|
|
defer cancel()
|
|
|
cc.vlogf("http2: Transport sending health check")
|
|
|
err := cc.Ping(ctx)
|
|
@@ -1056,7 +1131,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
|
|
// Wait for all in-flight streams to complete or connection to close
|
|
|
done := make(chan struct{})
|
|
|
cancelled := false // guarded by cc.mu
|
|
|
- go func() {
|
|
|
+ cc.goRun(func() {
|
|
|
cc.mu.Lock()
|
|
|
defer cc.mu.Unlock()
|
|
|
for {
|
|
@@ -1068,9 +1143,9 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
|
|
if cancelled {
|
|
|
break
|
|
|
}
|
|
|
- cc.cond.Wait()
|
|
|
+ cc.condWait()
|
|
|
}
|
|
|
- }()
|
|
|
+ })
|
|
|
shutdownEnterWaitStateHook()
|
|
|
select {
|
|
|
case <-done:
|
|
@@ -1080,7 +1155,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
|
|
cc.mu.Lock()
|
|
|
// Free the goroutine above
|
|
|
cancelled = true
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
cc.mu.Unlock()
|
|
|
return ctx.Err()
|
|
|
}
|
|
@@ -1118,7 +1193,7 @@ func (cc *ClientConn) closeForError(err error) {
|
|
|
for _, cs := range cc.streams {
|
|
|
cs.abortStreamLocked(err)
|
|
|
}
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
cc.mu.Unlock()
|
|
|
cc.closeConn()
|
|
|
}
|
|
@@ -1215,6 +1290,10 @@ func (cc *ClientConn) decrStreamReservationsLocked() {
|
|
|
}
|
|
|
|
|
|
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
+ return cc.roundTrip(req, nil)
|
|
|
+}
|
|
|
+
|
|
|
+func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
|
|
|
ctx := req.Context()
|
|
|
cs := &clientStream{
|
|
|
cc: cc,
|
|
@@ -1229,9 +1308,23 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
respHeaderRecv: make(chan struct{}),
|
|
|
donec: make(chan struct{}),
|
|
|
}
|
|
|
- go cs.doRequest(req)
|
|
|
+ cc.goRun(func() {
|
|
|
+ cs.doRequest(req)
|
|
|
+ })
|
|
|
|
|
|
waitDone := func() error {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case <-cs.donec:
|
|
|
+ case <-ctx.Done():
|
|
|
+ case <-cs.reqCancel:
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ }
|
|
|
select {
|
|
|
case <-cs.donec:
|
|
|
return nil
|
|
@@ -1292,7 +1385,24 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ if streamf != nil {
|
|
|
+ streamf(cs)
|
|
|
+ }
|
|
|
+
|
|
|
for {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case <-cs.respHeaderRecv:
|
|
|
+ case <-cs.abort:
|
|
|
+ case <-ctx.Done():
|
|
|
+ case <-cs.reqCancel:
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ }
|
|
|
select {
|
|
|
case <-cs.respHeaderRecv:
|
|
|
return handleResponseHeaders()
|
|
@@ -1348,6 +1458,21 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
|
|
|
if cc.reqHeaderMu == nil {
|
|
|
panic("RoundTrip on uninitialized ClientConn") // for tests
|
|
|
}
|
|
|
+ var newStreamHook func(*clientStream)
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ newStreamHook = cc.syncHooks.newstream
|
|
|
+ cc.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case cc.reqHeaderMu <- struct{}{}:
|
|
|
+ <-cc.reqHeaderMu
|
|
|
+ case <-cs.reqCancel:
|
|
|
+ case <-ctx.Done():
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ }
|
|
|
select {
|
|
|
case cc.reqHeaderMu <- struct{}{}:
|
|
|
case <-cs.reqCancel:
|
|
@@ -1372,6 +1497,10 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
|
|
|
}
|
|
|
cc.mu.Unlock()
|
|
|
|
|
|
+ if newStreamHook != nil {
|
|
|
+ newStreamHook(cs)
|
|
|
+ }
|
|
|
+
|
|
|
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
|
|
|
if !cc.t.disableCompression() &&
|
|
|
req.Header.Get("Accept-Encoding") == "" &&
|
|
@@ -1452,15 +1581,30 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
|
|
|
var respHeaderTimer <-chan time.Time
|
|
|
var respHeaderRecv chan struct{}
|
|
|
if d := cc.responseHeaderTimeout(); d != 0 {
|
|
|
- timer := time.NewTimer(d)
|
|
|
+ timer := cc.newTimer(d)
|
|
|
defer timer.Stop()
|
|
|
- respHeaderTimer = timer.C
|
|
|
+ respHeaderTimer = timer.C()
|
|
|
respHeaderRecv = cs.respHeaderRecv
|
|
|
}
|
|
|
// Wait until the peer half-closes its end of the stream,
|
|
|
// or until the request is aborted (via context, error, or otherwise),
|
|
|
// whichever comes first.
|
|
|
for {
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case <-cs.peerClosed:
|
|
|
+ case <-respHeaderTimer:
|
|
|
+ case <-respHeaderRecv:
|
|
|
+ case <-cs.abort:
|
|
|
+ case <-ctx.Done():
|
|
|
+ case <-cs.reqCancel:
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ }
|
|
|
select {
|
|
|
case <-cs.peerClosed:
|
|
|
return nil
|
|
@@ -1609,7 +1753,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
|
|
|
return nil
|
|
|
}
|
|
|
cc.pendingRequests++
|
|
|
- cc.cond.Wait()
|
|
|
+ cc.condWait()
|
|
|
cc.pendingRequests--
|
|
|
select {
|
|
|
case <-cs.abort:
|
|
@@ -1871,8 +2015,24 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
|
|
|
cs.flow.take(take)
|
|
|
return take, nil
|
|
|
}
|
|
|
- cc.cond.Wait()
|
|
|
+ cc.condWait()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func validateHeaders(hdrs http.Header) string {
|
|
|
+ for k, vv := range hdrs {
|
|
|
+ if !httpguts.ValidHeaderFieldName(k) {
|
|
|
+ return fmt.Sprintf("name %q", k)
|
|
|
+ }
|
|
|
+ for _, v := range vv {
|
|
|
+ if !httpguts.ValidHeaderFieldValue(v) {
|
|
|
+ // Don't include the value in the error,
|
|
|
+ // because it may be sensitive.
|
|
|
+ return fmt.Sprintf("value for header %q", k)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ return ""
|
|
|
}
|
|
|
|
|
|
var errNilRequestURL = errors.New("http2: Request.URI is nil")
|
|
@@ -1912,19 +2072,14 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Check for any invalid headers and return an error before we
|
|
|
+ // Check for any invalid headers+trailers and return an error before we
|
|
|
// potentially pollute our hpack state. (We want to be able to
|
|
|
// continue to reuse the hpack encoder for future requests)
|
|
|
- for k, vv := range req.Header {
|
|
|
- if !httpguts.ValidHeaderFieldName(k) {
|
|
|
- return nil, fmt.Errorf("invalid HTTP header name %q", k)
|
|
|
- }
|
|
|
- for _, v := range vv {
|
|
|
- if !httpguts.ValidHeaderFieldValue(v) {
|
|
|
- // Don't include the value in the error, because it may be sensitive.
|
|
|
- return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
|
|
|
- }
|
|
|
- }
|
|
|
+ if err := validateHeaders(req.Header); err != "" {
|
|
|
+ return nil, fmt.Errorf("invalid HTTP header %s", err)
|
|
|
+ }
|
|
|
+ if err := validateHeaders(req.Trailer); err != "" {
|
|
|
+ return nil, fmt.Errorf("invalid HTTP trailer %s", err)
|
|
|
}
|
|
|
|
|
|
enumerateHeaders := func(f func(name, value string)) {
|
|
@@ -2143,7 +2298,7 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
|
|
|
}
|
|
|
// Wake up writeRequestBody via clientStream.awaitFlowControl and
|
|
|
// wake up RoundTrip if there is a pending request.
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
|
|
|
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
|
|
|
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
|
|
@@ -2231,7 +2386,7 @@ func (rl *clientConnReadLoop) cleanup() {
|
|
|
cs.abortStreamLocked(err)
|
|
|
}
|
|
|
}
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
cc.mu.Unlock()
|
|
|
}
|
|
|
|
|
@@ -2266,10 +2421,9 @@ func (rl *clientConnReadLoop) run() error {
|
|
|
cc := rl.cc
|
|
|
gotSettings := false
|
|
|
readIdleTimeout := cc.t.ReadIdleTimeout
|
|
|
- var t *time.Timer
|
|
|
+ var t timer
|
|
|
if readIdleTimeout != 0 {
|
|
|
- t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
|
|
|
- defer t.Stop()
|
|
|
+ t = cc.afterFunc(readIdleTimeout, cc.healthCheck)
|
|
|
}
|
|
|
for {
|
|
|
f, err := cc.fr.ReadFrame()
|
|
@@ -2684,7 +2838,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
|
})
|
|
|
return nil
|
|
|
}
|
|
|
- if !cs.firstByte {
|
|
|
+ if !cs.pastHeaders {
|
|
|
cc.logf("protocol error: received DATA before a HEADERS frame")
|
|
|
rl.endStreamError(cs, StreamError{
|
|
|
StreamID: f.StreamID,
|
|
@@ -2867,7 +3021,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
|
|
|
for _, cs := range cc.streams {
|
|
|
cs.flow.add(delta)
|
|
|
}
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
|
|
|
cc.initialWindowSize = s.Val
|
|
|
case SettingHeaderTableSize:
|
|
@@ -2911,9 +3065,18 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
|
|
|
fl = &cs.flow
|
|
|
}
|
|
|
if !fl.add(int32(f.Increment)) {
|
|
|
+ // For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
|
|
|
+ if cs != nil {
|
|
|
+ rl.endStreamError(cs, StreamError{
|
|
|
+ StreamID: f.StreamID,
|
|
|
+ Code: ErrCodeFlowControl,
|
|
|
+ })
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
return ConnectionError(ErrCodeFlowControl)
|
|
|
}
|
|
|
- cc.cond.Broadcast()
|
|
|
+ cc.condBroadcast()
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -2955,24 +3118,38 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
|
|
|
}
|
|
|
cc.mu.Unlock()
|
|
|
}
|
|
|
- errc := make(chan error, 1)
|
|
|
- go func() {
|
|
|
+ var pingError error
|
|
|
+ errc := make(chan struct{})
|
|
|
+ cc.goRun(func() {
|
|
|
cc.wmu.Lock()
|
|
|
defer cc.wmu.Unlock()
|
|
|
- if err := cc.fr.WritePing(false, p); err != nil {
|
|
|
- errc <- err
|
|
|
+ if pingError = cc.fr.WritePing(false, p); pingError != nil {
|
|
|
+ close(errc)
|
|
|
return
|
|
|
}
|
|
|
- if err := cc.bw.Flush(); err != nil {
|
|
|
- errc <- err
|
|
|
+ if pingError = cc.bw.Flush(); pingError != nil {
|
|
|
+ close(errc)
|
|
|
return
|
|
|
}
|
|
|
- }()
|
|
|
+ })
|
|
|
+ if cc.syncHooks != nil {
|
|
|
+ cc.syncHooks.blockUntil(func() bool {
|
|
|
+ select {
|
|
|
+ case <-c:
|
|
|
+ case <-errc:
|
|
|
+ case <-ctx.Done():
|
|
|
+ case <-cc.readerDone:
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ }
|
|
|
select {
|
|
|
case <-c:
|
|
|
return nil
|
|
|
- case err := <-errc:
|
|
|
- return err
|
|
|
+ case <-errc:
|
|
|
+ return pingError
|
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|
|
|
case <-cc.readerDone:
|
|
@@ -3141,9 +3318,17 @@ func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, err
|
|
|
}
|
|
|
|
|
|
func (t *Transport) idleConnTimeout() time.Duration {
|
|
|
+ // to keep things backwards compatible, we use non-zero values of
|
|
|
+ // IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
|
|
|
+ // http1 transport, followed by 0
|
|
|
+ if t.IdleConnTimeout != 0 {
|
|
|
+ return t.IdleConnTimeout
|
|
|
+ }
|
|
|
+
|
|
|
if t.t1 != nil {
|
|
|
return t.t1.IdleConnTimeout
|
|
|
}
|
|
|
+
|
|
|
return 0
|
|
|
}
|
|
|
|