Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple Receive API Support #4182

Merged
merged 53 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
453b37f
WIP
nibanks Mar 5, 2024
202512f
Merge branch 'main' into nibanks/multi-recv-api
nibanks Mar 6, 2024
2e47f2c
Fixes
nibanks Mar 6, 2024
8368753
Simplify
nibanks Mar 6, 2024
43b80ab
fix
nibanks Mar 7, 2024
d102d91
Merge branch 'main' into nibanks/multi-recv-api
nibanks Mar 8, 2024
bb35f68
Fix
nibanks Mar 8, 2024
9860108
Fix more bugs
nibanks Mar 8, 2024
238416b
Fix clog and .net
nibanks Mar 8, 2024
db8a078
Fixes
nibanks Mar 8, 2024
2682233
Improvements
nibanks Mar 12, 2024
b88c126
Merge branch 'main' into nibanks/multi-recv-api
nibanks Apr 26, 2024
176312e
Undo a merge issue
nibanks Apr 26, 2024
d0261be
Another merge issue
nibanks Apr 26, 2024
db66cc8
Merge branch 'main' into nibanks/multi-recv-api
nibanks May 8, 2024
f09c55d
Merge branch 'main' into nibanks/multi-recv-api
ami-GS Jun 12, 2024
3781772
Simple test
ami-GS Jun 18, 2024
70e000c
remove fprintf
ami-GS Jun 18, 2024
cc19992
fix signature to upto 255. add Large send case
ami-GS Jun 18, 2024
a1f2339
1G Multi receive
ami-GS Jun 19, 2024
0bc4c4f
fix drain bug
ami-GS Jun 21, 2024
62e3558
kernel test
ami-GS Jun 21, 2024
b6a70e5
Merge branch 'main' into nibanks/multi-recv-api
ami-GS Jun 21, 2024
c0a292e
fix
ami-GS Jun 21, 2024
3f41455
fix
ami-GS Jun 21, 2024
783dd75
unused variable
ami-GS Jun 21, 2024
c97b52f
compare data
ami-GS Jun 21, 2024
dc1b8ab
tmp
ami-GS Jun 27, 2024
393130d
test cases
ami-GS Jun 28, 2024
338a1d2
95% works
ami-GS Jul 1, 2024
aa7a04c
Merge branch 'dev/daiki/multi-recv-debug' into nibanks/multi-recv-api
ami-GS Jul 1, 2024
c731ceb
remove fprintf in core
ami-GS Jul 1, 2024
ebc8e52
Fix Range copy
ami-GS Jul 1, 2024
f5e7950
fix type mimatch
ami-GS Jul 1, 2024
ba4a479
cleanup
ami-GS Jul 1, 2024
b3115e0
Merge branch 'main' into nibanks/multi-recv-api
ami-GS Jul 3, 2024
5972aa3
Fix stall issue
ami-GS Jul 5, 2024
1e7fa64
rollback
ami-GS Jul 5, 2024
58f7c3a
retry if length is 0
ami-GS Jul 6, 2024
052be96
remove continue to reset Readpending in QuicStreamReceiveComplete
ami-GS Jul 6, 2024
7246587
stop enablling send with pending data (race condition exists)
ami-GS Jul 6, 2024
2fcb778
fix QuicRecvBufferHasUnreadData
ami-GS Jul 8, 2024
42d885d
rollback for recv_buff lock bug then return earlier not to indicate F…
ami-GS Jul 8, 2024
714042c
fix
ami-GS Jul 8, 2024
8893063
fix
ami-GS Jul 8, 2024
79ce0b7
reduce test buffer size for CI speed
ami-GS Jul 10, 2024
d4f990c
Merge branch 'main' into nibanks/multi-recv-api
ami-GS Jul 31, 2024
0c405f8
update document
ami-GS Aug 1, 2024
e765cf3
add back buffer count caution
ami-GS Aug 1, 2024
399b926
Merge branch 'main' into nibanks/multi-recv-api
ami-GS Aug 5, 2024
ff1a339
Update docs/api/StreamReceiveComplete.md
ami-GS Aug 5, 2024
657540b
Update docs/api/StreamReceiveComplete.md
ami-GS Aug 5, 2024
9ba15d2
logical conflicts
ami-GS Aug 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/Settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The following settings are available via registry as well as via [QUIC_SETTINGS]
| Stateless Operation Expiration | uint16_t | StatelessOperationExpirationMs | 100 | The time limit between operations for the same endpoint, in milliseconds. |
| Congestion Control Algorithm | uint16_t | CongestionControlAlgorithm | 0 (Cubic) | The congestion control algorithm used for the connection. |
| ECN | uint8_t | EcnEnabled | 0 (FALSE) | Enable sender-side ECN support. |
| Stream Multi Receive | uint8_t | StreamMultiReceiveEnabled | 0 (FALSE) | Enable multi receive support |

The types map to registry types as follows:
- `uint64_t` is a `REG_QWORD`.
Expand Down
12 changes: 10 additions & 2 deletions docs/Streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Typically, the buffer count is one, which means that most events will include a

When the buffer count is 0, it signifies the reception of a QUIC frame with empty data, which also indicates the end of stream data.

Currently, the maximum buffer count is 2 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 2. This caution is important because future releases may incorporate multiple circular buffers to enhance performance, leading to potential changes in the buffer count limit.
Currently, the maximum buffer count is 3 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 3. This caution is important because future releases may change internal algorithm, leading to potential changes in the buffer count limit.

The app then may respond to the event in a number of ways:

Expand All @@ -100,4 +100,12 @@ Any value less than or equal to the initial **TotalBufferLength** value is allow

Whenever a receive isn't fully accepted by the app, additional receive events are immediately disabled. The app is assumed to be at capacity and not able to consume more until further indication. To re-enable receive callbacks, the app must call [StreamReceiveSetEnabled](api/StreamReceiveSetEnabled.md).

There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.
There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.

## Multi Receive mode

Setting [`StreamMultiReceiveEnabled`](./Settings.md) an app can continue getting indicated by `QUIC_STREAM_EVENT_RECEIVE` without returning `QUIC_STATUS_SUCCESS` nor calling [StreamReceiveComplete](api/StreamReceiveComplete.md).

This changes internal receive buffer more efficient for continuous receiving.

The app need to keep track of total `TotalBufferLength` to later call [StreamReceiveComplete](api/StreamReceiveComplete.md) appropriately.
12 changes: 10 additions & 2 deletions docs/api/QUIC_SETTINGS.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 22;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 21;
#else
uint64_t RESERVED : 26;
#endif
Expand Down Expand Up @@ -104,7 +105,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t ReservedFlags : 59;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t ReservedFlags : 58;
#else
uint64_t ReservedFlags : 63;
#endif
Expand Down Expand Up @@ -351,6 +353,12 @@ Initial stream receive flow control window size for remotely initiated unidirect

**Default value:** 0 (no overwrite)

`StreamMultiReceiveEnabled`

Enable multi receive mode. An app can continue receiving stream data without calling `StreamReceiveComplete` for each `QUIC_STREAM_EVENT_RECEIVE` indication.

**Default value:** 0 (`FALSE`)

# Remarks

When setting new values for the settings, the app must set the corresponding `.IsSet.*` parameter for each actual parameter that is being set or updated. For example:
Expand Down
3 changes: 2 additions & 1 deletion docs/api/StreamReceiveComplete.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ void
# Remarks

This is an asynchronous API but can run inline if called in a callback.
The application must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
The application without setting `StreamMultiReceiveEnabled` must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
Duplicate `StreamReceiveComplete` calls after one `QUIC_STREAM_EVENT_RECEIVE` event are ignored silently even with different `BufferLength`.
`StreamMultiReceiveEnabled` modes doesn't follow this rule. Multiple `QUIC_STREAM_EVENT_RECEIVE` can be released at once by `StreamReceiveComplete`. The application need to keep track of accumulated `TotalBufferLength` with this mode.

# See Also

Expand Down
6 changes: 6 additions & 0 deletions src/core/quicdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ CXPLAT_STATIC_ASSERT(
//
#define QUIC_DEFAULT_NET_STATS_EVENT_ENABLED FALSE

//
// The default settings for using multiple parallel receives for streams.
//
#define QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED FALSE

//
// The number of rounds in Cubic Slow Start to sample RTT.
//
Expand Down Expand Up @@ -633,6 +638,7 @@ CXPLAT_STATIC_ASSERT(
#define QUIC_SETTING_RELIABLE_RESET_ENABLED "ReliableResetEnabled"
#define QUIC_SETTING_ONE_WAY_DELAY_ENABLED "OneWayDelayEnabled"
#define QUIC_SETTING_NET_STATS_EVENT_ENABLED "NetStatsEventEnabled"
#define QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED "StreamMultiReceiveEnabled"

#define QUIC_SETTING_INITIAL_WINDOW_PACKETS "InitialWindowPackets"
#define QUIC_SETTING_SEND_IDLE_TIMEOUT_MS "SendIdleTimeoutMs"
Expand Down
41 changes: 41 additions & 0 deletions src/core/settings.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
if (!Settings->IsSet.NetStatsEventEnabled) {
Settings->NetStatsEventEnabled = QUIC_DEFAULT_NET_STATS_EVENT_ENABLED;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Settings->StreamMultiReceiveEnabled = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -330,6 +333,9 @@
if (!Destination->IsSet.NetStatsEventEnabled) {
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
}
if (!Destination->IsSet.StreamMultiReceiveEnabled) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -700,6 +706,11 @@
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
Destination->IsSet.NetStatsEventEnabled = TRUE;
}

if (Source->IsSet.StreamMultiReceiveEnabled && (!Destination->IsSet.StreamMultiReceiveEnabled || OverWrite)) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
Destination->IsSet.StreamMultiReceiveEnabled = TRUE;
}
return TRUE;
}

Expand Down Expand Up @@ -1358,6 +1369,16 @@
&ValueLen);
Settings->NetStatsEventEnabled = !!Value;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Value = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
ValueLen = sizeof(Value);
CxPlatStorageReadValue(
Storage,
QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED,
(uint8_t*)&Value,
&ValueLen);
Settings->StreamMultiReceiveEnabled = !!Value;
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -1426,6 +1447,7 @@
QuicTraceLogVerbose(SettingReliableResetEnabled, "[sett] ReliableResetEnabled = %hhu", Settings->ReliableResetEnabled);
QuicTraceLogVerbose(SettingOneWayDelayEnabled, "[sett] OneWayDelayEnabled = %hhu", Settings->OneWayDelayEnabled);
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
QuicTraceLogVerbose(SettingsStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled= %hhu", Settings->StreamMultiReceiveEnabled);
}

_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down Expand Up @@ -1587,6 +1609,9 @@
if (Settings->IsSet.NetStatsEventEnabled) {
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
}
if (Settings->IsSet.StreamMultiReceiveEnabled) {
QuicTraceLogVerbose(SettingStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled = %hhu", Settings->StreamMultiReceiveEnabled);

Check warning on line 1613 in src/core/settings.c

View check run for this annotation

Codecov / codecov/patch

src/core/settings.c#L1613

Added line #L1613 was not covered by tests
}
}

#define SETTINGS_SIZE_THRU_FIELD(SettingsType, Field) \
Expand Down Expand Up @@ -1843,6 +1868,14 @@
SettingsSize,
InternalSettings);

SETTING_COPY_FLAG_TO_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
SettingsSize,
InternalSettings);

return QUIC_STATUS_SUCCESS;
}

Expand Down Expand Up @@ -2004,6 +2037,14 @@
*SettingsLength,
InternalSettings);

SETTING_COPY_FLAG_FROM_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
*SettingsLength,
InternalSettings);

*SettingsLength = CXPLAT_MIN(*SettingsLength, sizeof(QUIC_SETTINGS));

return QUIC_STATUS_SUCCESS;
Expand Down
4 changes: 3 additions & 1 deletion src/core/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 17;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 16;
} IsSet;
};

Expand Down Expand Up @@ -111,6 +112,7 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint8_t ReliableResetEnabled : 1;
uint8_t OneWayDelayEnabled : 1;
uint8_t NetStatsEventEnabled : 1;
uint8_t StreamMultiReceiveEnabled : 1;
uint8_t MtuDiscoveryMissingProbeCount;

} QUIC_SETTINGS_INTERNAL;
Expand Down
4 changes: 3 additions & 1 deletion src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ QuicStreamInitialize(
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Stream->Flags.ReceiveMultiple = Connection->Settings.StreamMultiReceiveEnabled;
Stream->RecvMaxLength = UINT64_MAX;
Stream->RefCount = 1;
Stream->SendRequestsTail = &Stream->SendRequests;
Expand Down Expand Up @@ -131,7 +132,8 @@ QuicStreamInitialize(
&Stream->RecvBuffer,
InitialRecvBufferLength,
FlowControlWindowSize,
QUIC_RECV_BUF_MODE_CIRCULAR,
Stream->Flags.ReceiveMultiple ?
QUIC_RECV_BUF_MODE_MULTIPLE : QUIC_RECV_BUF_MODE_CIRCULAR,
PreallocatedRecvChunk);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down
1 change: 1 addition & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ typedef union QUIC_STREAM_FLAGS {

BOOLEAN SendEnabled : 1; // Application is allowed to send data.
BOOLEAN ReceiveEnabled : 1; // Application is ready for receive callbacks.
BOOLEAN ReceiveMultiple : 1; // The app supports multiple parallel receive indications.
BOOLEAN ReceiveFlushQueued : 1; // The receive flush operation is queued.
BOOLEAN ReceiveDataPending : 1; // Data (or FIN) is queued and ready for delivery.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
Expand Down
25 changes: 13 additions & 12 deletions src/core/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ QuicStreamRecvQueueFlush(
// The caller has indicated data is ready to be indicated to the
// application. Queue a FLUSH_RECV if one isn't already queued.
//

if (Stream->Flags.ReceiveEnabled &&
Stream->Flags.ReceiveDataPending &&
Stream->RecvPendingLength == 0) {
if (Stream->Flags.ReceiveEnabled && Stream->Flags.ReceiveDataPending) {

if (AllowInlineFlush) {
QuicStreamRecvFlush(Stream);
Expand Down Expand Up @@ -541,7 +538,9 @@ QuicStreamProcessStreamFrame(
}
}

if (ReadyToDeliver) {
if (ReadyToDeliver &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
Stream->Flags.ReceiveDataPending = TRUE;
QuicStreamRecvQueueFlush(
Stream,
Expand Down Expand Up @@ -870,8 +869,6 @@ QuicStreamRecvFlush(
return;
}

CXPLAT_TEL_ASSERT(!Stream->RecvPendingLength); // N.B. - Will be an invalid assert once we support multiple receives

BOOLEAN FlushRecv = TRUE;
while (FlushRecv) {
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
Expand Down Expand Up @@ -924,9 +921,10 @@ QuicStreamRecvFlush(
Event.RECEIVE.Flags |= QUIC_RECEIVE_FLAG_FIN; // TODO - 0-RTT flag?
}

Stream->Flags.ReceiveEnabled = FALSE;
Stream->Flags.ReceiveEnabled = Stream->Flags.ReceiveMultiple;
Stream->Flags.ReceiveCallActive = TRUE;
Stream->RecvPendingLength += Event.RECEIVE.TotalBufferLength;
CXPLAT_DBG_ASSERT(Stream->RecvPendingLength <= Stream->RecvBuffer.ReadPendingLength);

QuicTraceEvent(
StreamAppReceive,
Expand Down Expand Up @@ -1056,7 +1054,7 @@ QuicStreamReceiveComplete(
//
Stream->Flags.ReceiveEnabled = TRUE;

} else {
} else if (!Stream->Flags.ReceiveMultiple) {
//
// The app didn't drain all the data, so we will need to wait for them
// to request a new receive.
Expand All @@ -1080,9 +1078,10 @@ QuicStreamReceiveComplete(
if (Stream->Flags.ReceiveDataPending) {
//
// There is still more data for the app to process and it still has
// receive callbacks enabled, so do another recv flush.
// receive callbacks enabled, so do another recv flush (if not already
// doing multi-receive mode).
//
return TRUE;
return !Stream->Flags.ReceiveMultiple;
}

if (Stream->RecvBuffer.BaseOffset == Stream->RecvMaxLength) {
Expand Down Expand Up @@ -1157,7 +1156,9 @@ QuicStreamRecvSetEnabledState(
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
Stream->Flags.ReceiveEnabled = NewRecvEnabled;

if (Stream->Flags.Started && NewRecvEnabled) {
if (Stream->Flags.Started && NewRecvEnabled &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
//
// The application just resumed receive callbacks. Queue a
// flush receive operation to start draining the receive buffer.
Expand Down
2 changes: 2 additions & 0 deletions src/core/unittest/SettingsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ TEST(SettingsTest, TestAllSettingsFieldsSet)
SETTINGS_FEATURE_SET_TEST(ReliableResetEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(OneWayDelayEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(NetStatsEventEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(StreamMultiReceiveEnabled, QuicSettingsSettingsToInternal);

Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;
Expand Down Expand Up @@ -209,6 +210,7 @@ TEST(SettingsTest, TestAllSettingsFieldsGet)
SETTINGS_FEATURE_GET_TEST(ReliableResetEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(OneWayDelayEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(NetStatsEventEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(StreamMultiReceiveEnabled, QuicSettingsGetSettings);

Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;
Expand Down
Loading
Loading