|
@@ -258,7 +258,8 @@ func (t *Transport) initConnPool() {
|
|
// HTTP/2 server.
|
|
// HTTP/2 server.
|
|
type ClientConn struct {
|
|
type ClientConn struct {
|
|
t *Transport
|
|
t *Transport
|
|
- tconn net.Conn // usually *tls.Conn, except specialized impls
|
|
|
|
|
|
+ tconn net.Conn // usually *tls.Conn, except specialized impls
|
|
|
|
+ tconnClosed bool
|
|
tlsState *tls.ConnectionState // nil only for specialized impls
|
|
tlsState *tls.ConnectionState // nil only for specialized impls
|
|
reused uint32 // whether conn is being reused; atomic
|
|
reused uint32 // whether conn is being reused; atomic
|
|
singleUse bool // whether being used for a single http.Request
|
|
singleUse bool // whether being used for a single http.Request
|
|
@@ -344,8 +345,8 @@ type clientStream struct {
|
|
readErr error // sticky read error; owned by transportResponseBody.Read
|
|
readErr error // sticky read error; owned by transportResponseBody.Read
|
|
|
|
|
|
reqBody io.ReadCloser
|
|
reqBody io.ReadCloser
|
|
- reqBodyContentLength int64 // -1 means unknown
|
|
|
|
- reqBodyClosed bool // body has been closed; guarded by cc.mu
|
|
|
|
|
|
+ reqBodyContentLength int64 // -1 means unknown
|
|
|
|
+ reqBodyClosed chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
|
|
|
|
|
|
// owned by writeRequest:
|
|
// owned by writeRequest:
|
|
sentEndStream bool // sent an END_STREAM flag to the peer
|
|
sentEndStream bool // sent an END_STREAM flag to the peer
|
|
@@ -385,9 +386,8 @@ func (cs *clientStream) abortStreamLocked(err error) {
|
|
cs.abortErr = err
|
|
cs.abortErr = err
|
|
close(cs.abort)
|
|
close(cs.abort)
|
|
})
|
|
})
|
|
- if cs.reqBody != nil && !cs.reqBodyClosed {
|
|
|
|
- cs.reqBody.Close()
|
|
|
|
- cs.reqBodyClosed = true
|
|
|
|
|
|
+ if cs.reqBody != nil {
|
|
|
|
+ cs.closeReqBodyLocked()
|
|
}
|
|
}
|
|
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
|
|
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
|
|
if cs.cc.cond != nil {
|
|
if cs.cc.cond != nil {
|
|
@@ -400,13 +400,24 @@ func (cs *clientStream) abortRequestBodyWrite() {
|
|
cc := cs.cc
|
|
cc := cs.cc
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
defer cc.mu.Unlock()
|
|
defer cc.mu.Unlock()
|
|
- if cs.reqBody != nil && !cs.reqBodyClosed {
|
|
|
|
- cs.reqBody.Close()
|
|
|
|
- cs.reqBodyClosed = true
|
|
|
|
|
|
+ if cs.reqBody != nil && cs.reqBodyClosed == nil {
|
|
|
|
+ cs.closeReqBodyLocked()
|
|
cc.cond.Broadcast()
|
|
cc.cond.Broadcast()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (cs *clientStream) closeReqBodyLocked() {
|
|
|
|
+ if cs.reqBodyClosed != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ cs.reqBodyClosed = make(chan struct{})
|
|
|
|
+ reqBodyClosed := cs.reqBodyClosed
|
|
|
|
+ go func() {
|
|
|
|
+ cs.reqBody.Close()
|
|
|
|
+ close(reqBodyClosed)
|
|
|
|
+ }()
|
|
|
|
+}
|
|
|
|
+
|
|
type stickyErrWriter struct {
|
|
type stickyErrWriter struct {
|
|
conn net.Conn
|
|
conn net.Conn
|
|
timeout time.Duration
|
|
timeout time.Duration
|
|
@@ -921,10 +932,10 @@ func (cc *ClientConn) onIdleTimeout() {
|
|
cc.closeIfIdle()
|
|
cc.closeIfIdle()
|
|
}
|
|
}
|
|
|
|
|
|
-func (cc *ClientConn) closeConn() error {
|
|
|
|
|
|
+func (cc *ClientConn) closeConn() {
|
|
t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
|
|
t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
|
|
defer t.Stop()
|
|
defer t.Stop()
|
|
- return cc.tconn.Close()
|
|
|
|
|
|
+ cc.tconn.Close()
|
|
}
|
|
}
|
|
|
|
|
|
// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
|
|
// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
|
|
@@ -990,7 +1001,8 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
|
|
shutdownEnterWaitStateHook()
|
|
shutdownEnterWaitStateHook()
|
|
select {
|
|
select {
|
|
case <-done:
|
|
case <-done:
|
|
- return cc.closeConn()
|
|
|
|
|
|
+ cc.closeConn()
|
|
|
|
+ return nil
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
// Free the goroutine above
|
|
// Free the goroutine above
|
|
@@ -1027,7 +1039,7 @@ func (cc *ClientConn) sendGoAway() error {
|
|
|
|
|
|
// closes the client connection immediately. In-flight requests are interrupted.
|
|
// closes the client connection immediately. In-flight requests are interrupted.
|
|
// err is sent to streams.
|
|
// err is sent to streams.
|
|
-func (cc *ClientConn) closeForError(err error) error {
|
|
|
|
|
|
+func (cc *ClientConn) closeForError(err error) {
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
cc.closed = true
|
|
cc.closed = true
|
|
for _, cs := range cc.streams {
|
|
for _, cs := range cc.streams {
|
|
@@ -1035,7 +1047,7 @@ func (cc *ClientConn) closeForError(err error) error {
|
|
}
|
|
}
|
|
cc.cond.Broadcast()
|
|
cc.cond.Broadcast()
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
- return cc.closeConn()
|
|
|
|
|
|
+ cc.closeConn()
|
|
}
|
|
}
|
|
|
|
|
|
// Close closes the client connection immediately.
|
|
// Close closes the client connection immediately.
|
|
@@ -1043,16 +1055,17 @@ func (cc *ClientConn) closeForError(err error) error {
|
|
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
|
|
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
|
|
func (cc *ClientConn) Close() error {
|
|
func (cc *ClientConn) Close() error {
|
|
err := errors.New("http2: client connection force closed via ClientConn.Close")
|
|
err := errors.New("http2: client connection force closed via ClientConn.Close")
|
|
- return cc.closeForError(err)
|
|
|
|
|
|
+ cc.closeForError(err)
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
// closes the client connection immediately. In-flight requests are interrupted.
|
|
// closes the client connection immediately. In-flight requests are interrupted.
|
|
-func (cc *ClientConn) closeForLostPing() error {
|
|
|
|
|
|
+func (cc *ClientConn) closeForLostPing() {
|
|
err := errors.New("http2: client connection lost")
|
|
err := errors.New("http2: client connection lost")
|
|
if f := cc.t.CountError; f != nil {
|
|
if f := cc.t.CountError; f != nil {
|
|
f("conn_close_lost_ping")
|
|
f("conn_close_lost_ping")
|
|
}
|
|
}
|
|
- return cc.closeForError(err)
|
|
|
|
|
|
+ cc.closeForError(err)
|
|
}
|
|
}
|
|
|
|
|
|
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
|
|
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
|
|
@@ -1430,11 +1443,19 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
|
|
// and in multiple cases: server replies <=299 and >299
|
|
// and in multiple cases: server replies <=299 and >299
|
|
// while still writing request body
|
|
// while still writing request body
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
|
|
+ mustCloseBody := false
|
|
|
|
+ if cs.reqBody != nil && cs.reqBodyClosed == nil {
|
|
|
|
+ mustCloseBody = true
|
|
|
|
+ cs.reqBodyClosed = make(chan struct{})
|
|
|
|
+ }
|
|
bodyClosed := cs.reqBodyClosed
|
|
bodyClosed := cs.reqBodyClosed
|
|
- cs.reqBodyClosed = true
|
|
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
- if !bodyClosed && cs.reqBody != nil {
|
|
|
|
|
|
+ if mustCloseBody {
|
|
cs.reqBody.Close()
|
|
cs.reqBody.Close()
|
|
|
|
+ close(bodyClosed)
|
|
|
|
+ }
|
|
|
|
+ if bodyClosed != nil {
|
|
|
|
+ <-bodyClosed
|
|
}
|
|
}
|
|
|
|
|
|
if err != nil && cs.sentEndStream {
|
|
if err != nil && cs.sentEndStream {
|
|
@@ -1614,7 +1635,7 @@ func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
|
|
}
|
|
}
|
|
if err != nil {
|
|
if err != nil {
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
- bodyClosed := cs.reqBodyClosed
|
|
|
|
|
|
+ bodyClosed := cs.reqBodyClosed != nil
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
switch {
|
|
switch {
|
|
case bodyClosed:
|
|
case bodyClosed:
|
|
@@ -1709,7 +1730,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
|
|
if cc.closed {
|
|
if cc.closed {
|
|
return 0, errClientConnClosed
|
|
return 0, errClientConnClosed
|
|
}
|
|
}
|
|
- if cs.reqBodyClosed {
|
|
|
|
|
|
+ if cs.reqBodyClosed != nil {
|
|
return 0, errStopReqBodyWrite
|
|
return 0, errStopReqBodyWrite
|
|
}
|
|
}
|
|
select {
|
|
select {
|
|
@@ -2005,7 +2026,7 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
|
|
// wake up RoundTrip if there is a pending request.
|
|
// wake up RoundTrip if there is a pending request.
|
|
cc.cond.Broadcast()
|
|
cc.cond.Broadcast()
|
|
|
|
|
|
- closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives()
|
|
|
|
|
|
+ closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
|
|
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
|
|
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
|
|
if VerboseLogs {
|
|
if VerboseLogs {
|
|
cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
|
|
cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
|
|
@@ -2081,6 +2102,7 @@ func (rl *clientConnReadLoop) cleanup() {
|
|
err = io.ErrUnexpectedEOF
|
|
err = io.ErrUnexpectedEOF
|
|
}
|
|
}
|
|
cc.closed = true
|
|
cc.closed = true
|
|
|
|
+
|
|
for _, cs := range cc.streams {
|
|
for _, cs := range cc.streams {
|
|
select {
|
|
select {
|
|
case <-cs.peerClosed:
|
|
case <-cs.peerClosed:
|
|
@@ -2674,7 +2696,6 @@ func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
|
|
if fn := cc.t.CountError; fn != nil {
|
|
if fn := cc.t.CountError; fn != nil {
|
|
fn("recv_goaway_" + f.ErrCode.stringToken())
|
|
fn("recv_goaway_" + f.ErrCode.stringToken())
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
cc.setGoAway(f)
|
|
cc.setGoAway(f)
|
|
return nil
|
|
return nil
|
|
@@ -3028,7 +3049,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
ci.WasIdle = len(cc.streams) == 0 && reused
|
|
ci.WasIdle = len(cc.streams) == 0 && reused
|
|
if ci.WasIdle && !cc.lastActive.IsZero() {
|
|
if ci.WasIdle && !cc.lastActive.IsZero() {
|
|
- ci.IdleTime = time.Now().Sub(cc.lastActive)
|
|
|
|
|
|
+ ci.IdleTime = time.Since(cc.lastActive)
|
|
}
|
|
}
|
|
cc.mu.Unlock()
|
|
cc.mu.Unlock()
|
|
|
|
|