Skip to content

Commit

Permalink
Support Delaying Stream ID FC Updates to StreamClose (#3665) (#3684)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Jun 9, 2023
1 parent c0281d1 commit b7b2d5e
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 23 deletions.
2 changes: 2 additions & 0 deletions docs/api/QUIC_CONNECTION_EVENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | A unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | The stream was received in 0-RTT.

If a server wishes to use `QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES` for the newly started stream, it may append this flag to `Flags` before it returns from the callback.

## QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE

This event indicates the number of streams the peer is willing to accept has changed.
Expand Down
1 change: 1 addition & 0 deletions docs/api/StreamOpen.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_NONE**<br>0 | No special behavior. Defaults to bidirectional stream.
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | Opens a unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | Indicates that the stream may be sent in 0-RTT.
**QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES**<br>4 | Indicates stream ID flow control limit updates for the connection should be delayed to StreamClose.
`Handler`
Expand Down
8 changes: 1 addition & 7 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,13 +658,7 @@ MsQuicStreamOpen(
goto Error;
}

Status =
QuicStreamInitialize(
Connection,
FALSE,
!!(Flags & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL),
!!(Flags & QUIC_STREAM_OPEN_FLAG_0_RTT),
(QUIC_STREAM**)NewStream);
Status = QuicStreamInitialize(Connection, FALSE, Flags, (QUIC_STREAM**)NewStream);
if (QUIC_FAILED(Status)) {
goto Error;
}
Expand Down
30 changes: 23 additions & 7 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ QUIC_STATUS
QuicStreamInitialize(
_In_ QUIC_CONNECTION* Connection,
_In_ BOOLEAN OpenedRemotely,
_In_ BOOLEAN Unidirectional,
_In_ BOOLEAN Opened0Rtt,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_Outptr_ _At_(*NewStream, __drv_allocatesMem(Mem))
QUIC_STREAM** NewStream
)
Expand Down Expand Up @@ -55,8 +54,15 @@ QuicStreamInitialize(
Stream->Type = QUIC_HANDLE_TYPE_STREAM;
Stream->Connection = Connection;
Stream->ID = UINT64_MAX;
Stream->Flags.Unidirectional = Unidirectional;
Stream->Flags.Opened0Rtt = Opened0Rtt;
Stream->Flags.Unidirectional = !!(Flags & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL);
Stream->Flags.Opened0Rtt = !!(Flags & QUIC_STREAM_OPEN_FLAG_0_RTT);
Stream->Flags.DelayIdFcUpdate = !!(Flags & QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES);
if (Stream->Flags.DelayIdFcUpdate) {
QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
}
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Expand All @@ -73,7 +79,7 @@ QuicStreamInitialize(
Stream->RefTypeCount[QUIC_STREAM_REF_APP] = 1;
#endif

if (Unidirectional) {
if (Stream->Flags.Unidirectional) {
if (!OpenedRemotely) {

//
Expand Down Expand Up @@ -354,7 +360,7 @@ QuicStreamClose(

if (!Stream->Flags.ShutdownComplete) {

if (Stream->Flags.Started) {
if (Stream->Flags.Started && !Stream->Flags.HandleShutdown) {
//
// TODO - If the stream hasn't been aborted already, then this is a
// fatal error for the connection. The QUIC transport cannot "just
Expand Down Expand Up @@ -384,9 +390,17 @@ QuicStreamClose(
// since nothing else can be done with it now.
//
Stream->Flags.ShutdownComplete = TRUE;
CXPLAT_DBG_ASSERT(!Stream->Flags.InStreamTable);
}
}

if (Stream->Flags.DelayIdFcUpdate && Stream->Flags.ShutdownComplete) {
//
// Indicate the stream is completely shut down to the connection.
//
QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
}

Stream->ClientCallbackHandler = NULL;

QuicStreamRelease(Stream, QUIC_STREAM_REF_APP);
Expand Down Expand Up @@ -593,7 +607,9 @@ QuicStreamTryCompleteShutdown(
//
// Indicate the stream is completely shut down to the connection.
//
QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
if (!Stream->Flags.DelayIdFcUpdate || Stream->Flags.HandleClosed) {
QuicStreamSetReleaseStream(&Stream->Connection->Streams, Stream);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN Freed : 1; // Freed after last ref count released. Used for Debugging.

BOOLEAN InStreamTable : 1; // The stream is currently in the connection's table.
BOOLEAN DelayIdFcUpdate : 1; // Delay stream ID FC updates to StreamClose.
};
} QUIC_STREAM_FLAGS;

Expand Down Expand Up @@ -530,8 +531,7 @@ QUIC_STATUS
QuicStreamInitialize(
_In_ QUIC_CONNECTION* Connection,
_In_ BOOLEAN OpenedRemotely,
_In_ BOOLEAN Unidirectional,
_In_ BOOLEAN Opened0Rtt,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_Outptr_ _At_(*Stream, __drv_allocatesMem(Mem))
QUIC_STREAM** Stream
);
Expand Down
21 changes: 15 additions & 6 deletions src/core/stream_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,16 @@ QuicStreamSetGetStreamForPeer(
// Calculate the next Stream ID.
//
uint64_t NewStreamId = StreamType + (Info->TotalStreamCount << 2);
QUIC_STREAM_OPEN_FLAGS OpenFlags = QUIC_STREAM_OPEN_FLAG_NONE;
if (STREAM_ID_IS_UNI_DIR(StreamId)) {
OpenFlags |= QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL;
}
if (FrameIn0Rtt) {
OpenFlags |= QUIC_STREAM_OPEN_FLAG_0_RTT;
}

QUIC_STATUS Status =
QuicStreamInitialize(
Connection,
TRUE,
STREAM_ID_IS_UNI_DIR(StreamId), // Unidirectional
FrameIn0Rtt, // Opened0Rtt
&Stream);
QuicStreamInitialize(Connection, TRUE, OpenFlags, &Stream);
if (QUIC_FAILED(Status)) {
*FatalError = TRUE;
QuicConnTransportError(Connection, QUIC_ERROR_INTERNAL_ERROR);
Expand Down Expand Up @@ -721,6 +723,13 @@ QuicStreamSetGetStreamForPeer(
CXPLAT_FRE_ASSERTMSG(
Stream->ClientCallbackHandler != NULL,
"App MUST set callback handler!");
if (Event.PEER_STREAM_STARTED.Flags & QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES) {
Stream->Flags.DelayIdFcUpdate = TRUE;
QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
}
}

} while (Info->TotalStreamCount != StreamCount);
Expand Down
1 change: 1 addition & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ internal enum QUIC_STREAM_OPEN_FLAGS
NONE = 0x0000,
UNIDIRECTIONAL = 0x0001,
ZERO_RTT = 0x0002,
DELAY_ID_FC_UPDATES = 0x0004,
}

[System.Flags]
Expand Down
18 changes: 18 additions & 0 deletions src/generated/linux/stream.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ tracepoint(CLOG_STREAM_C, UpdatePriority , arg1, arg3);\



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedIDFC
// [strm][%p] Configured for delayed ID FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_ConfiguredForDelayedIDFC
#define _clog_3_ARGS_TRACE_ConfiguredForDelayedIDFC(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_C, ConfiguredForDelayedIDFC , arg1);\

#endif




/*----------------------------------------------------------
// Decoder Ring for IndicateStartComplete
// [strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/stream.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_C, UpdatePriority,



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedIDFC
// [strm][%p] Configured for delayed ID FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_C, ConfiguredForDelayedIDFC,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for IndicateStartComplete
// [strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]
Expand Down
18 changes: 18 additions & 0 deletions src/generated/linux/stream_set.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ tracepoint(CLOG_STREAM_SET_C, IndicatePeerAccepted , arg1);\



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedIDFC
// [strm][%p] Configured for delayed ID FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_ConfiguredForDelayedIDFC
#define _clog_3_ARGS_TRACE_ConfiguredForDelayedIDFC(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_SET_C, ConfiguredForDelayedIDFC , arg1);\

#endif




/*----------------------------------------------------------
// Decoder Ring for MaxStreamCountUpdated
// [conn][%p] App configured max stream count of %hu (type=%hhu).
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/stream_set.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_SET_C, IndicatePeerAccepted,



/*----------------------------------------------------------
// Decoder Ring for ConfiguredForDelayedIDFC
// [strm][%p] Configured for delayed ID FC updates
// QuicTraceLogStreamVerbose(
ConfiguredForDelayedIDFC,
Stream,
"Configured for delayed ID FC updates");
// arg1 = arg1 = Stream = arg1
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_SET_C, ConfiguredForDelayedIDFC,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for MaxStreamCountUpdated
// [conn][%p] App configured max stream count of %hu (type=%hhu).
Expand Down
2 changes: 2 additions & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ typedef enum QUIC_STREAM_OPEN_FLAGS {
QUIC_STREAM_OPEN_FLAG_NONE = 0x0000,
QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL = 0x0001, // Indicates the stream is unidirectional.
QUIC_STREAM_OPEN_FLAG_0_RTT = 0x0002, // The stream was opened via a 0-RTT packet.
QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES = 0x0004, // Indicates stream ID flow control limit updates for the
// connection should be delayed to StreamClose.
} QUIC_STREAM_OPEN_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_OPEN_FLAGS)
Expand Down
17 changes: 17 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,18 @@
],
"macroName": "QuicTraceLogInfo"
},
"ConfiguredForDelayedIDFC": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Configured for delayed ID FC updates",
"UniqueId": "ConfiguredForDelayedIDFC",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
}
],
"macroName": "QuicTraceLogStreamVerbose"
},
"ConnAppShutdown": {
"ModuleProperites": {},
"TraceString": "[conn][%p] App Shutdown: %llu (Remote=%hhu)",
Expand Down Expand Up @@ -12670,6 +12682,11 @@
"TraceID": "ConfigurationSettingsUpdated",
"EncodingString": "[cnfg][%p] Settings %p Updated"
},
{
"UniquenessHash": "fd83d3d8-994b-ccc3-13b9-cf8de77990ca",
"TraceID": "ConfiguredForDelayedIDFC",
"EncodingString": "[strm][%p] Configured for delayed ID FC updates"
},
{
"UniquenessHash": "905ba7f9-0427-2d53-7d13-88f2bb532bf0",
"TraceID": "ConnAppShutdown",
Expand Down
1 change: 1 addition & 0 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN Freed : 1; // Freed after last ref count released. Used for Debugging.

BOOLEAN InStreamTable : 1; // The stream is currently in the connection's table.
BOOLEAN DelayIdFcUpdate : 1; // Delay stream ID FC updates to StreamClose.
};
} QUIC_STREAM_FLAGS;

Expand Down
5 changes: 4 additions & 1 deletion src/tools/spin/spinquic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ QUIC_STATUS QUIC_API SpinQuicHandleConnectionEvent(HQUIC Connection, void* , QUI
MsQuic.StreamClose(Event->PEER_STREAM_STARTED.Stream);
return QUIC_STATUS_SUCCESS;
}
if (GetRandom(2) == 0) {
Event->PEER_STREAM_STARTED.Flags |= QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES;
}
auto StreamCtx = new SpinQuicStream(*ctx, Event->PEER_STREAM_STARTED.Stream);
MsQuic.SetCallbackHandler(Event->PEER_STREAM_STARTED.Stream, (void *)SpinQuicHandleStreamEvent, StreamCtx);
ctx->AddStream(Event->PEER_STREAM_STARTED.Stream);
Expand Down Expand Up @@ -748,7 +751,7 @@ void Spin(Gbs& Gb, LockableVector<HQUIC>& Connections, std::vector<HQUIC>* Liste
BAIL_ON_NULL_CONNECTION(Connection);
HQUIC Stream;
auto ctx = new SpinQuicStream(*SpinQuicConnection::Get(Connection));
QUIC_STATUS Status = MsQuic.StreamOpen(Connection, (QUIC_STREAM_OPEN_FLAGS)GetRandom(4), SpinQuicHandleStreamEvent, ctx, &Stream);
QUIC_STATUS Status = MsQuic.StreamOpen(Connection, (QUIC_STREAM_OPEN_FLAGS)GetRandom(8), SpinQuicHandleStreamEvent, ctx, &Stream);
if (QUIC_SUCCEEDED(Status)) {
ctx->Handle = Stream;
SpinQuicGetRandomParam(Stream, ThreadID);
Expand Down

0 comments on commit b7b2d5e

Please sign in to comment.