Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Delaying Stream ID FC Updates to StreamClose #3665

Merged
merged 22 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api/QUIC_CONNECTION_EVENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | A unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | The stream was received in 0-RTT.

If a server wishes to use `QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES` for the newly started stream, it may append this flag to `Flags` before it returns from the callback.
nibanks marked this conversation as resolved.
Show resolved Hide resolved

## QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE

This event indicates the number of streams the peer is willing to accept has changed.
Expand Down
1 change: 1 addition & 0 deletions docs/api/StreamOpen.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_NONE**<br>0 | No special behavior. Defaults to bidirectional stream.
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | Opens a unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | Indicates that the stream may be sent in 0-RTT.
**QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES**<br>4 | Indicates stream flow control limit updates for the connection should be delayed to StreamClose.

`Handler`

Expand Down
8 changes: 1 addition & 7 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,13 +658,7 @@ MsQuicStreamOpen(
goto Error;
}

Status =
QuicStreamInitialize(
Connection,
FALSE,
!!(Flags & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL),
!!(Flags & QUIC_STREAM_OPEN_FLAG_0_RTT),
(QUIC_STREAM**)NewStream);
Status = QuicStreamInitialize(Connection, FALSE, Flags, (QUIC_STREAM**)NewStream);
if (QUIC_FAILED(Status)) {
goto Error;
}
Expand Down
30 changes: 23 additions & 7 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ QUIC_STATUS
QuicStreamInitialize(
_In_ QUIC_CONNECTION* Connection,
_In_ BOOLEAN OpenedRemotely,
_In_ BOOLEAN Unidirectional,
_In_ BOOLEAN Opened0Rtt,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_Outptr_ _At_(*NewStream, __drv_allocatesMem(Mem))
QUIC_STREAM** NewStream
)
Expand Down Expand Up @@ -55,8 +54,15 @@ QuicStreamInitialize(
Stream->Type = QUIC_HANDLE_TYPE_STREAM;
Stream->Connection = Connection;
Stream->ID = UINT64_MAX;
Stream->Flags.Unidirectional = Unidirectional;
Stream->Flags.Opened0Rtt = Opened0Rtt;
Stream->Flags.Unidirectional = !!(Flags & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL);
Stream->Flags.Opened0Rtt = !!(Flags & QUIC_STREAM_OPEN_FLAG_0_RTT);
Stream->Flags.DelayFCUpdate = !!(Flags & QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES);
if (Stream->Flags.DelayFCUpdate) {
QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
}
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Expand All @@ -73,7 +79,7 @@ QuicStreamInitialize(
Stream->RefTypeCount[QUIC_STREAM_REF_APP] = 1;
#endif

if (Unidirectional) {
if (Stream->Flags.Unidirectional) {
if (!OpenedRemotely) {

//
Expand Down Expand Up @@ -354,7 +360,7 @@ QuicStreamClose(

if (!Stream->Flags.ShutdownComplete) {

if (Stream->Flags.Started) {
if (Stream->Flags.Started && !Stream->Flags.HandleShutdown) {
csujedihy marked this conversation as resolved.
Show resolved Hide resolved
//
// TODO - If the stream hasn't been aborted already, then this is a
// fatal error for the connection. The QUIC transport cannot "just
Expand Down Expand Up @@ -384,9 +390,17 @@ QuicStreamClose(
// since nothing else can be done with it now.
//
Stream->Flags.ShutdownComplete = TRUE;
CXPLAT_DBG_ASSERT(!Stream->Flags.InStreamTable);
}
}

if (Stream->Flags.DelayFCUpdate && Stream->Flags.ShutdownComplete) {
//
// Indicate the stream is completely shut down to the connection.
//
QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
}

Stream->ClientCallbackHandler = NULL;

QuicStreamRelease(Stream, QUIC_STREAM_REF_APP);
Expand Down Expand Up @@ -593,7 +607,9 @@ QuicStreamTryCompleteShutdown(
//
// Indicate the stream is completely shut down to the connection.
//
QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
if (!Stream->Flags.DelayFCUpdate || Stream->Flags.HandleClosed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream->Flags.HandleClosed

Why is this condition needed?

Copy link
Contributor

@csujedihy csujedihy Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If HandleClose == true, this function, QuicStreamTryCompleteShutdown, is called by QuicStreamClose, right? QuicStreamClose guarantees that QuicStreamSetReleaseStream will be called if DelayFCUpdate is set. Wouldn't we end up calling QuicStreamSetReleaseStream twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is tricky. If an app shuts down (abortively) a stream using the QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE flag then we will immediately give them the shutdown complete event and they can close the handle, but MsQuic will continue to track the stream until we've delivered (and received the ACK for) the corresponding stream frames over the wire. In this case, the QuicStreamClose call will run before this code path, but we don't want to release the stream there. Later, when everything is complete, this code path will be run finally, but since the handle has already been closed, we need to release the stream now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN Freed : 1; // Freed after last ref count released. Used for Debugging.

BOOLEAN InStreamTable : 1; // The stream is currently in the connection's table.
BOOLEAN DelayFCUpdate : 1; // Delay stream ID FC updates to StreamClose.
};
} QUIC_STREAM_FLAGS;

Expand Down Expand Up @@ -530,8 +531,7 @@ QUIC_STATUS
QuicStreamInitialize(
_In_ QUIC_CONNECTION* Connection,
_In_ BOOLEAN OpenedRemotely,
_In_ BOOLEAN Unidirectional,
_In_ BOOLEAN Opened0Rtt,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_Outptr_ _At_(*Stream, __drv_allocatesMem(Mem))
QUIC_STREAM** Stream
);
Expand Down
21 changes: 15 additions & 6 deletions src/core/stream_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,16 @@ QuicStreamSetGetStreamForPeer(
// Calculate the next Stream ID.
//
uint64_t NewStreamId = StreamType + (Info->TotalStreamCount << 2);
QUIC_STREAM_OPEN_FLAGS OpenFlags = QUIC_STREAM_OPEN_FLAG_NONE;
if (STREAM_ID_IS_UNI_DIR(StreamId)) {
OpenFlags |= QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL;
}
if (FrameIn0Rtt) {
OpenFlags |= QUIC_STREAM_OPEN_FLAG_0_RTT;
}

QUIC_STATUS Status =
QuicStreamInitialize(
Connection,
TRUE,
STREAM_ID_IS_UNI_DIR(StreamId), // Unidirectional
FrameIn0Rtt, // Opened0Rtt
&Stream);
QuicStreamInitialize(Connection, TRUE, OpenFlags, &Stream);
if (QUIC_FAILED(Status)) {
*FatalError = TRUE;
QuicConnTransportError(Connection, QUIC_ERROR_INTERNAL_ERROR);
Expand Down Expand Up @@ -721,6 +723,13 @@ QuicStreamSetGetStreamForPeer(
CXPLAT_FRE_ASSERTMSG(
Stream->ClientCallbackHandler != NULL,
"App MUST set callback handler!");
if (Event.PEER_STREAM_STARTED.Flags & QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES) {
Stream->Flags.DelayFCUpdate = TRUE;
QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
}
}

} while (Info->TotalStreamCount != StreamCount);
Expand Down
1 change: 1 addition & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ internal enum QUIC_STREAM_OPEN_FLAGS
NONE = 0x0000,
UNIDIRECTIONAL = 0x0001,
ZERO_RTT = 0x0002,
DELAY_FC_UPDATES = 0x0004,
}

[System.Flags]
Expand Down
18 changes: 18 additions & 0 deletions src/generated/linux/stream.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ tracepoint(CLOG_STREAM_C, UpdatePriority , arg1, arg3);\



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedFC
// [strm][%p] Configured for delayed FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_ConfiguredForDelayedFC
#define _clog_3_ARGS_TRACE_ConfiguredForDelayedFC(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_C, ConfiguredForDelayedFC , arg1);\

#endif




/*----------------------------------------------------------
// Decoder Ring for IndicateStartComplete
// [strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/stream.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_C, UpdatePriority,



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedFC
// [strm][%p] Configured for delayed FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_C, ConfiguredForDelayedFC,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for IndicateStartComplete
// [strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]
Expand Down
18 changes: 18 additions & 0 deletions src/generated/linux/stream_set.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ tracepoint(CLOG_STREAM_SET_C, IndicatePeerAccepted , arg1);\



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedFC
// [strm][%p] Configured for delayed FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_ConfiguredForDelayedFC
#define _clog_3_ARGS_TRACE_ConfiguredForDelayedFC(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_SET_C, ConfiguredForDelayedFC , arg1);\

#endif




/*----------------------------------------------------------
// Decoder Ring for MaxStreamCountUpdated
// [conn][%p] App configured max stream count of %hu (type=%hhu).
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/stream_set.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_SET_C, IndicatePeerAccepted,



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedFC
// [strm][%p] Configured for delayed FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedFC,
Stream,
"Configured for delayed FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_SET_C, ConfiguredForDelayedFC,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for MaxStreamCountUpdated
// [conn][%p] App configured max stream count of %hu (type=%hhu).
Expand Down
2 changes: 2 additions & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ typedef enum QUIC_STREAM_OPEN_FLAGS {
QUIC_STREAM_OPEN_FLAG_NONE = 0x0000,
QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL = 0x0001, // Indicates the stream is unidirectional.
QUIC_STREAM_OPEN_FLAG_0_RTT = 0x0002, // The stream was opened via a 0-RTT packet.
QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES = 0x0004, // Indicates stream flow control limit updates for the connection
// should be delayed to StreamClose.
} QUIC_STREAM_OPEN_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_OPEN_FLAGS)
Expand Down
17 changes: 17 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,18 @@
],
"macroName": "QuicTraceLogInfo"
},
"ConfiguredForDelayedFC": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Configured for delayed FC updates",
nibanks marked this conversation as resolved.
Show resolved Hide resolved
"UniqueId": "ConfiguredForDelayedFC",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
}
],
"macroName": "QuicTraceLogStreamVerbose"
},
"ConnAppShutdown": {
"ModuleProperites": {},
"TraceString": "[conn][%p] App Shutdown: %llu (Remote=%hhu)",
Expand Down Expand Up @@ -12766,6 +12778,11 @@
"TraceID": "ConfigurationSettingsUpdated",
"EncodingString": "[cnfg][%p] Settings %p Updated"
},
{
"UniquenessHash": "c80d64df-1584-30c7-6659-f2bba46a5fbb",
"TraceID": "ConfiguredForDelayedFC",
"EncodingString": "[strm][%p] Configured for delayed FC updates"
},
{
"UniquenessHash": "905ba7f9-0427-2d53-7d13-88f2bb532bf0",
"TraceID": "ConnAppShutdown",
Expand Down
1 change: 1 addition & 0 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN Freed : 1; // Freed after last ref count released. Used for Debugging.

BOOLEAN InStreamTable : 1; // The stream is currently in the connection's table.
BOOLEAN DelayFCUpdate : 1; // Delay stream ID FC updates to StreamClose.
};
} QUIC_STREAM_FLAGS;

Expand Down
5 changes: 4 additions & 1 deletion src/tools/spin/spinquic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ QUIC_STATUS QUIC_API SpinQuicHandleConnectionEvent(HQUIC Connection, void* , QUI
MsQuic.StreamClose(Event->PEER_STREAM_STARTED.Stream);
return QUIC_STATUS_SUCCESS;
}
if (GetRandom(2) == 0) {
Event->PEER_STREAM_STARTED.Flags |= QUIC_STREAM_OPEN_FLAG_DELAY_FC_UPDATES;
}
auto StreamCtx = new SpinQuicStream(*ctx, Event->PEER_STREAM_STARTED.Stream);
MsQuic.SetCallbackHandler(Event->PEER_STREAM_STARTED.Stream, (void *)SpinQuicHandleStreamEvent, StreamCtx);
ctx->AddStream(Event->PEER_STREAM_STARTED.Stream);
Expand Down Expand Up @@ -748,7 +751,7 @@ void Spin(Gbs& Gb, LockableVector<HQUIC>& Connections, std::vector<HQUIC>* Liste
BAIL_ON_NULL_CONNECTION(Connection);
HQUIC Stream;
auto ctx = new SpinQuicStream(*SpinQuicConnection::Get(Connection));
QUIC_STATUS Status = MsQuic.StreamOpen(Connection, (QUIC_STREAM_OPEN_FLAGS)GetRandom(4), SpinQuicHandleStreamEvent, ctx, &Stream);
QUIC_STATUS Status = MsQuic.StreamOpen(Connection, (QUIC_STREAM_OPEN_FLAGS)GetRandom(8), SpinQuicHandleStreamEvent, ctx, &Stream);
if (QUIC_SUCCEEDED(Status)) {
ctx->Handle = Stream;
SpinQuicGetRandomParam(Stream, ThreadID);
Expand Down