Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle new streams when server is quiescing #481

Merged
merged 8 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,16 @@ struct ConnectionStreamState {
streamID: HTTP2StreamID,
ignoreRecentlyReset: Bool,
ignoreClosed: Bool = false,
isQuiescing: Bool = false,
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect
) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
return StateMachineResultWithStreamEffect(
result: self.streamMissing(
streamID: streamID,
ignoreRecentlyReset: ignoreRecentlyReset,
ignoreClosed: ignoreClosed
ignoreClosed: ignoreClosed,
isQuiescing: isQuiescing
),
effect: nil
)
Expand Down Expand Up @@ -218,11 +220,28 @@ struct ConnectionStreamState {
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect
) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
// We never ignore recently reset streams here, as this should only ever be used when *sending* frames.
return StateMachineResultWithStreamEffect(
result: self.streamMissing(streamID: streamID, ignoreRecentlyReset: false, ignoreClosed: false),
effect: nil
)
// This state can be reached when a RST_STREAM frame is sent by this peer but the stream
// doesn't exist. This can happen for a few reasons, including:
//
// 1. The RST_STREAM frame is being sent because the stream wasn't accepted (i.e. the
// client opening the stream raced with the server sending a GOAWAY frame). In this
// case the state machine doesn't know about the stream but does need to send
// a RST_STREAM frame.
// 2. An implementation or user error where the stream genuinely doesn't exist and
// attempting to send a RST_STREAM frame is an error.
if streamID.isClientInitiated, streamID > self.lastClientStreamID {
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
return StateMachineResultWithStreamEffect(result: .succeed, effect: nil)
} else {
return StateMachineResultWithStreamEffect(
result: self.streamMissing(
streamID: streamID,
ignoreRecentlyReset: false,
ignoreClosed: false,
isQuiescing: true
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
),
effect: nil
)
}
}

guard let effect = result.effect, effect.closedStream else {
Expand Down Expand Up @@ -324,7 +343,8 @@ struct ConnectionStreamState {
private func streamMissing(
streamID: HTTP2StreamID,
ignoreRecentlyReset: Bool,
ignoreClosed: Bool
ignoreClosed: Bool,
isQuiescing: Bool
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
) -> StateMachineResult {
if ignoreRecentlyReset && self.recentlyResetStreams.contains(streamID) {
return .ignoreFrame
Expand All @@ -333,11 +353,20 @@ struct ConnectionStreamState {
switch streamID.mayBeInitiatedBy(.client) {
case true where streamID > self.lastClientStreamID,
false where streamID > self.lastServerStreamID:
// The stream in question is idle.
return .connectionError(
underlyingError: NIOHTTP2Errors.noSuchStream(streamID: streamID),
type: .protocolError
)
if isQuiescing {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flag isn't quite specific enough. I'm not even entirely sure we want to sink this logic into this function: we really only want this flow to happen on receipt of stream creation frames, which might suggest a better name for the parameter at least.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I wasn't really happy with this whole approach. It's all a bit awkward.

The issue is that we receive HEADERS in a state where it's valid to receive them if the stream already exists but is a stream error if it doesn't. The problem with is that we return the stream error out of the state machine and back into the channel handler, which will then call back into the state machine to send the RST_STREAM frame as a result of the stream error. From the POV of the state machine, that stream doesn't exist, and that's the awkward bit.

It makes me wonder whether we should modify StateMachineResult so that we can represent the notion of rejecting a stream (as opposed to a stream error, for which we assume the stream already exists). WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's an interesting issue. Perhaps the actual fix is that we should tolerate sending RST_STREAM frames on streams that don't exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially ruled that out because there are tests which validate that you can't reset a stream twice (or something similar). This would certainly be the simplest fix assuming we're okay with that small regression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to that small regression, yeah.

// This peer quiescing and the remote peer opening a stream raced. Reject the stream.
return .streamError(
streamID: streamID,
underlyingError: NIOHTTP2Errors.streamError(streamID: streamID, baseError: NIOHTTP2Errors.createdStreamAfterGoaway()),
type: .refusedStream
)
} else {
// The stream in question is idle.
return .connectionError(
underlyingError: NIOHTTP2Errors.noSuchStream(streamID: streamID),
type: .protocolError
)
}
default:
// This stream must have already been closed.
if ignoreClosed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ extension ReceivingHeadersState where Self: LocallyQuiescingState {
}

// At this stage we've quiesced, so the remote peer is not allowed to create new streams.
let result = self.streamState.modifyStreamState(streamID: streamID, ignoreRecentlyReset: true) {
let result = self.streamState.modifyStreamState(streamID: streamID, ignoreRecentlyReset: true, isQuiescing: true) {
$0.receiveHeaders(
headers: headers,
validateHeaderBlock: validateHeaderBlock,
Expand Down
54 changes: 54 additions & 0 deletions Tests/NIOHTTP2Tests/ConnectionStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,60 @@ class ConnectionStateMachineTests: XCTestCase {
assertSucceeds(temporaryClient.receiveRstStream(streamID: streamOne, reason: .refusedStream))
}

func testClientInitiatesNewStreamBeforeReceivingAlreadySentGoawayWhenServerLocallyQuiesced() {
// Tests the behaviour of the server when it has open streams, sends a GOAWAY frame (so is
// in locally quiescing state), and then receives HEADERS for a stream whose ID is less than
// the last stream ID sent in the GOAWAY frame.
//
// The expected behaviour is that the server should refuse the stream (by sending RST_STREAM
// frame) and emit a stream error.
let streamOne = HTTP2StreamID(1)
let streamThree = HTTP2StreamID(3)

self.exchangePreamble()

// Open stream one.
assertSucceeds(
self.client.sendHeaders(
streamID: streamOne,
headers: ConnectionStateMachineTests.requestHeaders,
isEndStreamSet: false
)
)
assertSucceeds(
self.server.receiveHeaders(
streamID: streamOne,
headers: ConnectionStateMachineTests.requestHeaders,
isEndStreamSet: false
)
)

// Server begins quiescing. It enters the locally quiesced state.
assertSucceeds(self.server.sendGoaway(lastStreamID: .maxID))

// Client opens stream three (important: it hasn't received the GOAWAY frame).
assertSucceeds(
self.client.sendHeaders(
streamID: streamThree,
headers: ConnectionStateMachineTests.requestHeaders,
isEndStreamSet: false
)
)
// Server receives headers for stream three. It should throw a stream error and as a
// result, respond with a RST_STREAM frame.
assertStreamError(
type: .refusedStream,
self.server.receiveHeaders(
streamID: streamThree,
headers: ConnectionStateMachineTests.requestHeaders,
isEndStreamSet: false
)
)

assertSucceeds(self.server.sendRstStream(streamID: streamOne, reason: .refusedStream))
assertSucceeds(self.client.receiveRstStream(streamID: streamOne, reason: .refusedStream))
}

func testHeadersOnClosedStreamAfterClientGoaway() {
let streamOne = HTTP2StreamID(1)
let streamTwo = HTTP2StreamID(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,70 @@ class SimpleClientServerInlineStreamMultiplexerTests: XCTestCase {
XCTAssertNoThrow(try self.serverChannel.finish())
}

func testOpenStreamBeforeReceivingGoAwayWhenServerLocallyQuiesced() throws {
let serverFrameRecorder = InboundFramePayloadRecorder()
try self.basicHTTP2Connection { channel in
channel.pipeline.addHandler(serverFrameRecorder)
}

let http2Handler = self.clientChannel.pipeline.handler(type: NIOHTTP2Handler.self)
let multiplexer = try http2Handler.flatMap { $0.multiplexer }.wait()

let clientFrameRecorder = InboundFramePayloadRecorder()
let streamOneFuture = multiplexer.createStreamChannel { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(clientFrameRecorder)
}
}
self.clientChannel.embeddedEventLoop.run()
let streamOne = try streamOneFuture.wait()

let headers: HPACKHeaders = [
":path": "/",
":method": "GET",
":scheme": "http",
":authority": "localhost",
]
try streamOne.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: headers))).wait()
self.interactInMemory(self.clientChannel, self.serverChannel)
serverFrameRecorder.receivedFrames.assertFramePayloadsMatch([.headers(.init(headers: headers))])

// Create stream two, but don't write on it (yet).
let streamTwoFuture = multiplexer.createStreamChannel { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(clientFrameRecorder)
}
}
self.clientChannel.embeddedEventLoop.run()
let streamTwo = try streamTwoFuture.wait()

// Send the GOAWAY from the server.
let goAway = HTTP2Frame(
streamID: .rootStream,
payload: .goAway(lastStreamID: .maxID, errorCode: .noError, opaqueData: nil)
)
try self.serverChannel.writeAndFlush(goAway).wait()
// Send HEADERS on the second client stream.
try streamTwo.writeAndFlush(HTTP2Frame.FramePayload.headers(.init(headers: headers))).wait()

// Interacting in memory to exchange frames.
self.interactInMemory(self.clientChannel, self.serverChannel, expectError: true) { error in
if let error = error as? NIOHTTP2Errors.StreamError {
XCTAssert(error.baseError is NIOHTTP2Errors.CreatedStreamAfterGoaway)
} else {
XCTFail("Expected error to be of type StreamError, got error of type \(type(of: error)).")
}
}

// Client receives GOAWAY and RST_STREAM frames.
try self.clientChannel.assertReceivedFrame().assertGoAwayFrame(
lastStreamID: .maxID,
errorCode: 0,
opaqueData: nil
)
clientFrameRecorder.receivedFrames.assertFramePayloadsMatch([.rstStream(.refusedStream)])
}

func testSuccessfullyReceiveAndSendPingEvenWhenConnectionIsFullyQuiesced() throws {
let serverHandler = InboundFramePayloadRecorder()
try self.basicHTTP2Connection { channel in
Expand Down
Loading