From f35fec92ec9213ee211cf45f451a5970386f7978 Mon Sep 17 00:00:00 2001
From: Damien Neil
Date: Wed, 2 Oct 2024 16:45:27 -0700
Subject: [PATCH] http2: detect hung client connections by confirming stream
 resets

Consider the case of an unresponsive client connection: one where the
server has stopped responding. We send requests to the connection one
after another, each with a timeout. Each request counts against the
concurrency limit for the connection while active, but when a request
times out we send a RST_STREAM and free up the concurrency slot it was
using.

We continue to try to send requests to the connection forever (or until
the kernel closes the underlying TCP connection, or until
ReadIdleTimeout/WriteByteTimeout results in us closing the connection).

Defend against this scenario by counting a canceled request against the
connection concurrency limit until we confirm the server is responding.

Specifically:

Track the number of in-flight request cancellations in cc.pendingResets.
This total counts against the connection concurrency limit.

When sending a RST_STREAM for a canceled request, increment
cc.pendingResets. Send a PING frame to the server, unless a PING is
already in flight.

When receiving a PING response, set cc.pendingResets to 0.

A hung connection will be used for at most
SETTINGS_MAX_CONCURRENT_STREAMS requests.

When StrictMaxConcurrentStreams is false, we will create a new
connection after reaching the concurrency limit for a hung one.

When StrictMaxConcurrentStreams is true, we will continue to wait for
the existing connection until some timeout closes it or it becomes
responsive again.

For golang/go#59690

Change-Id: I0151f9a594af14b32bcb6005a239fa19eb103704
Reviewed-on: https://go-review.googlesource.com/c/net/+/617655
LUCI-TryBot-Result: Go LUCI
Reviewed-by: Brad Fitzpatrick
Reviewed-by: Jonathan Amsterdam
Reviewed-by: Carlos Amedee
---
 http2/http2_test.go     |   8 +++
 http2/transport.go      |  71 +++++++++++++++++++---
 http2/transport_test.go | 126 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/http2/http2_test.go b/http2/http2_test.go
index b7c946b98..b1e71f153 100644
--- a/http2/http2_test.go
+++ b/http2/http2_test.go
@@ -283,3 +283,11 @@ func TestNoUnicodeStrings(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// must returns v if err is nil, or panics otherwise.
+func must[T any](v T, err error) T {
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
diff --git a/http2/transport.go b/http2/transport.go
index e989bd19e..5d198baa5 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -364,6 +364,14 @@ type ClientConn struct {
 	readIdleTimeout time.Duration
 	pingTimeout     time.Duration
 
+	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
+	// without confirming that the peer has received them. When we send a RST_STREAM,
+	// we bundle it with a PING frame, unless a PING is already in flight. We count
+	// the reset stream against the connection's concurrency limit until we get
+	// a PING response. This limits the number of requests we'll try to send to a
+	// completely unresponsive connection.
+	pendingResets int
+
 	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
 	// Write to reqHeaderMu to lock it, read from it to unlock.
 	// Lock reqmu BEFORE mu or wmu.
@@ -960,7 +968,7 @@ func (cc *ClientConn) State() ClientConnState {
 	return ClientConnState{
 		Closed:          cc.closed,
 		Closing:         cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
-		StreamsActive:   len(cc.streams),
+		StreamsActive:   len(cc.streams) + cc.pendingResets,
 		StreamsReserved: cc.streamsReserved,
 		StreamsPending:  cc.pendingRequests,
 		LastIdle:        cc.lastIdle,
@@ -992,7 +1000,13 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
 		// writing it.
 		maxConcurrentOkay = true
 	} else {
-		maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
+		// We can take a new request if the total of
+		//   - active streams;
+		//   - reservation slots for new streams; and
+		//   - streams for which we have sent a RST_STREAM and a PING,
+		//     but received no subsequent frame
+		// is less than the concurrency limit.
+		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
 	}
 
 	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
@@ -1002,6 +1016,12 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
 	return
 }
 
+// currentRequestCountLocked reports the number of concurrency slots currently in use,
+// including active streams, reserved slots, and reset streams waiting for acknowledgement.
+func (cc *ClientConn) currentRequestCountLocked() int {
+	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
+}
+
 func (cc *ClientConn) canTakeNewRequestLocked() bool {
 	st := cc.idleStateLocked()
 	return st.canTakeNewRequest
@@ -1578,6 +1598,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
 			cs.reqBodyClosed = make(chan struct{})
 		}
 		bodyClosed := cs.reqBodyClosed
+		closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
 		cc.mu.Unlock()
 		if mustCloseBody {
 			cs.reqBody.Close()
@@ -1602,16 +1623,40 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
 		if cs.sentHeaders {
 			if se, ok := err.(StreamError); ok {
 				if se.Cause != errFromPeer {
-					cc.writeStreamReset(cs.ID, se.Code, err)
+					cc.writeStreamReset(cs.ID, se.Code, false, err)
 				}
 			} else {
-				cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
+				// We're cancelling an in-flight request.
+				//
+				// This could be due to the server becoming unresponsive.
+				// To avoid sending too many requests on a dead connection,
+				// we let the request continue to consume a concurrency slot
+				// until we can confirm the server is still responding.
+				// We do this by sending a PING frame along with the RST_STREAM
+				// (unless a ping is already in flight).
+				//
+				// For simplicity, we don't bother tracking the PING payload:
+				// We reset cc.pendingResets any time we receive a PING ACK.
+				//
+				// We skip this if the conn is going to be closed on idle,
+				// because it's short lived and will probably be closed before
+				// we get the ping response.
+				ping := false
+				if !closeOnIdle {
+					cc.mu.Lock()
+					if cc.pendingResets == 0 {
+						ping = true
+					}
+					cc.pendingResets++
+					cc.mu.Unlock()
+				}
+				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
 			}
 		}
 		cs.bufPipe.CloseWithError(err) // no-op if already closed
 	} else {
 		if cs.sentHeaders && !cs.sentEndStream {
-			cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
+			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
 		}
 		cs.bufPipe.CloseWithError(errRequestCanceled)
 	}
@@ -1638,7 +1683,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
 			return errClientConnUnusable
 		}
 		cc.lastIdle = time.Time{}
-		if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
+		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
 			return nil
 		}
 		cc.pendingRequests++
@@ -3065,6 +3110,11 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
 			close(c)
 			delete(cc.pings, f.Data)
 		}
+		if cc.pendingResets > 0 {
+			// See clientStream.cleanupWriteRequest.
+			cc.pendingResets = 0
+			cc.cond.Broadcast()
+		}
 		return nil
 	}
 	cc := rl.cc
@@ -3087,13 +3137,20 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
 	return ConnectionError(ErrCodeProtocol)
 }
 
-func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
+// writeStreamReset sends a RST_STREAM frame.
+// When ping is true, it also sends a PING frame with a random payload.
+func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
 	// TODO: map err to more interesting error codes, once the
 	// HTTP community comes up with some. But currently for
 	// RST_STREAM there's no equivalent to GOAWAY frame's debug
 	// data, and the error codes are all pretty vague ("cancel").
 	cc.wmu.Lock()
 	cc.fr.WriteRSTStream(streamID, code)
+	if ping {
+		var payload [8]byte
+		rand.Read(payload[:])
+		cc.fr.WritePing(false, payload)
+	}
 	cc.bw.Flush()
 	cc.wmu.Unlock()
 }
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 757a45a7a..f6ef295a4 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2559,6 +2559,9 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
 			}
 			return true
 		},
+		func(f *PingFrame) bool {
+			return true
+		},
 		func(f *WindowUpdateFrame) bool {
 			if !oneDataFrame && !sentAdditionalData {
 				t.Fatalf("Got WindowUpdateFrame, don't expect one yet")
@@ -5512,3 +5515,126 @@ func TestTransport1xxLimits(t *testing.T) {
 		})
 	}
 }
+
+func TestTransportSendPingWithReset(t *testing.T) {
+	tc := newTestClientConn(t, func(tr *Transport) {
+		tr.StrictMaxConcurrentStreams = true
+	})
+
+	const maxConcurrent = 3
+	tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+
+	// Start several requests.
+	var rts []*testRoundTrip
+	for i := 0; i < maxConcurrent+1; i++ {
+		req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+		rt := tc.roundTrip(req)
+		if i >= maxConcurrent {
+			tc.wantIdle()
+			continue
+		}
+		tc.wantFrameType(FrameHeaders)
+		tc.writeHeaders(HeadersFrameParam{
+			StreamID:   rt.streamID(),
+			EndHeaders: true,
+			BlockFragment: tc.makeHeaderBlockFragment(
+				":status", "200",
+			),
+		})
+		rt.wantStatus(200)
+		rts = append(rts, rt)
+	}
+
+	// Cancel one request. We send a PING frame along with the RST_STREAM.
+	rts[0].response().Body.Close()
+	tc.wantRSTStream(rts[0].streamID(), ErrCodeCancel)
+	pf := readFrame[*PingFrame](t, tc)
+	tc.wantIdle()
+
+	// Cancel another request. No PING frame, since one is in flight.
+	rts[1].response().Body.Close()
+	tc.wantRSTStream(rts[1].streamID(), ErrCodeCancel)
+	tc.wantIdle()
+
+	// Respond to the PING.
+	// This finalizes the previous resets, and allows the pending request to be sent.
+	tc.writePing(true, pf.Data)
+	tc.wantFrameType(FrameHeaders)
+	tc.wantIdle()
+
+	// Cancel the last request. We send another PING, since none are in flight.
+	rts[2].response().Body.Close()
+	tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
+	tc.wantFrameType(FramePing)
+	tc.wantIdle()
+}
+
+func TestTransportConnBecomesUnresponsive(t *testing.T) {
+	// We send a number of requests in series to an unresponsive connection.
+	// Each request is canceled or times out without a response.
+	// Eventually, we open a new connection rather than trying to use the old one.
+	tt := newTestTransport(t)
+
+	const maxConcurrent = 3
+
+	t.Logf("first request opens a new connection and succeeds")
+	req1 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+	rt1 := tt.roundTrip(req1)
+	tc1 := tt.getConn()
+	tc1.wantFrameType(FrameSettings)
+	tc1.wantFrameType(FrameWindowUpdate)
+	hf1 := readFrame[*HeadersFrame](t, tc1)
+	tc1.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+	tc1.wantFrameType(FrameSettings) // ack
+	tc1.writeHeaders(HeadersFrameParam{
+		StreamID:   hf1.StreamID,
+		EndHeaders: true,
+		EndStream:  true,
+		BlockFragment: tc1.makeHeaderBlockFragment(
+			":status", "200",
+		),
+	})
+	rt1.wantStatus(200)
+	rt1.response().Body.Close()
+
+	// Send more requests.
+	// None receive a response.
+	// Each is canceled.
+	for i := 0; i < maxConcurrent; i++ {
+		t.Logf("request %v receives no response and is canceled", i)
+		ctx, cancel := context.WithCancel(context.Background())
+		req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil))
+		tt.roundTrip(req)
+		if tt.hasConn() {
+			t.Fatalf("new connection created; expect existing conn to be reused")
+		}
+		tc1.wantFrameType(FrameHeaders)
+		cancel()
+		tc1.wantFrameType(FrameRSTStream)
+		if i == 0 {
+			tc1.wantFrameType(FramePing)
+		}
+		tc1.wantIdle()
+	}
+
+	// The conn has hit its concurrency limit.
+	// The next request is sent on a new conn.
+	req2 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
+	rt2 := tt.roundTrip(req2)
+	tc2 := tt.getConn()
+	tc2.wantFrameType(FrameSettings)
+	tc2.wantFrameType(FrameWindowUpdate)
+	hf := readFrame[*HeadersFrame](t, tc2)
+	tc2.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
+	tc2.wantFrameType(FrameSettings) // ack
+	tc2.writeHeaders(HeadersFrameParam{
+		StreamID:   hf.StreamID,
+		EndHeaders: true,
+		EndStream:  true,
+		BlockFragment: tc2.makeHeaderBlockFragment(
+			":status", "200",
+		),
+	})
+	rt2.wantStatus(200)
+	rt2.response().Body.Close()
+}
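Usage note (illustrative, not part of the patch): the commit message
refers to the Transport fields that interact with this defense --
StrictMaxConcurrentStreams, ReadIdleTimeout/PingTimeout, and
WriteByteTimeout. Below is a minimal sketch of a client configured with
those knobs, assuming the golang.org/x/net/http2 package with this
change applied; the durations and URL are arbitrary placeholders.

	package main

	import (
		"net/http"
		"time"

		"golang.org/x/net/http2"
	)

	func main() {
		tr := &http2.Transport{
			// With StrictMaxConcurrentStreams set, a hung connection is
			// used for at most SETTINGS_MAX_CONCURRENT_STREAMS requests
			// (counting unacknowledged resets) and further requests wait;
			// when unset, the Transport dials a new connection instead.
			StrictMaxConcurrentStreams: true,

			// Health-check an idle connection with a PING after 30s of
			// inactivity, and close it if no response arrives within
			// PingTimeout.
			ReadIdleTimeout: 30 * time.Second,
			PingTimeout:     15 * time.Second,

			// Close the connection if writing a frame stalls this long.
			WriteByteTimeout: 15 * time.Second,
		}
		client := &http.Client{Transport: tr}

		resp, err := client.Get("https://example.com/") // placeholder URL
		if err != nil {
			// Requests on a dead connection eventually fail here once one
			// of the timeouts above closes the connection.
			return
		}
		resp.Body.Close()
	}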