Skip to content

Commit

Permalink
1G Multi receive
Browse files Browse the repository at this point in the history
  • Loading branch information
ami-GS committed Jun 19, 2024
1 parent cc19992 commit 9398cd9
Showing 1 changed file with 103 additions and 33 deletions.
136 changes: 103 additions & 33 deletions src/test/lib/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3796,13 +3796,23 @@ QuicTestStreamReliableResetMultipleSends(
}
#endif // QUIC_PARAM_STREAM_RELIABLE_OFFSET


#define MultiRecvNumSend 18
#define MultiRecvNumSend 3
uint8_t Buffer1G[1000000000] = {};
struct MultiReceiveTestContext {
CxPlatEvent PktRecvd[MultiRecvNumSend] {0};
MsQuicStream* ServerStream {nullptr};
int Recvd {0};
int RecvdSignatures[MultiRecvNumSend] {0};
uint8_t RecvdSignatures[MultiRecvNumSend] {0};
int PseudoProcessingLength {0};
CXPLAT_LOCK Lock;
uint32_t TotalLength {0};

MultiReceiveTestContext() {
CxPlatLockInitialize(&Lock);
}
~MultiReceiveTestContext() {
CxPlatLockUninitialize(&Lock);
}

static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) {
UNREFERENCED_PARAMETER(Stream);
Expand All @@ -3811,7 +3821,14 @@ struct MultiReceiveTestContext {
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
auto TestContext = (MultiReceiveTestContext*)Context;
if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) {
TestContext->RecvdSignatures[TestContext->Recvd] = Event->RECEIVE.Buffers[0].Buffer[Event->RECEIVE.Buffers[0].Length-1];
int64_t TotalLength =Event->RECEIVE.TotalBufferLength;
const QUIC_BUFFER* Buffers = Event->RECEIVE.Buffers;
uint32_t BufferCount = Event->RECEIVE.BufferCount;
TestContext->RecvdSignatures[TestContext->Recvd] = Buffers[BufferCount-1].Buffer[Buffers[BufferCount-1].Length-1];
CxPlatLockAcquire(&TestContext->Lock);
TestContext->PseudoProcessingLength += Event->RECEIVE.TotalBufferLength;
CxPlatLockRelease(&TestContext->Lock);
TestContext->TotalLength += Event->RECEIVE.TotalBufferLength;
if (TestContext->RecvdSignatures[TestContext->Recvd] != 0) {
TestContext->PktRecvd[TestContext->Recvd++].Set();
}
Expand Down Expand Up @@ -3852,9 +3869,10 @@ QuicTestStreamMultiReceive(

// Server side multi receive simple. 3 Sends and Complete at once
{
uint8_t RawBuffer[128] = {};
QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer };
int NumSend = 3;
uint32_t BufferSize = 128;
QUIC_BUFFER Buffer { BufferSize, Buffer1G };
int NumSend = MultiRecvNumSend;

MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
Expand All @@ -3874,11 +3892,11 @@ QuicTestStreamMultiReceive(
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));

for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[sizeof(RawBuffer)-1] = i + 1;
Buffer.Buffer[BufferSize-1] = i + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
TEST_TRUE(Context.PktRecvd[i].WaitTimeout(TestWaitTimeout));
}
Context.ServerStream->ReceiveComplete(sizeof(RawBuffer) * NumSend);
Context.ServerStream->ReceiveComplete(BufferSize * NumSend);

for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == i + 1);
Expand All @@ -3888,8 +3906,10 @@ QuicTestStreamMultiReceive(
// Server side multi receive. MultiRecvNumSend Sends and Complete every 8 sends
// Possible packet split
{
uint8_t RawBuffer[2048] = {};
QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer };
uint32_t BufferSize = 2048;
QUIC_BUFFER Buffer { BufferSize, Buffer1G };
int NumSend = MultiRecvNumSend;

MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
Expand All @@ -3909,29 +3929,31 @@ QuicTestStreamMultiReceive(
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));

int lastCompleted = -1;
for (int i = 0; i < MultiRecvNumSend; i++) {
Buffer.Buffer[sizeof(RawBuffer)-1] = (i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == MultiRecvNumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[BufferSize-1] = (i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
TEST_TRUE(Context.PktRecvd[i].WaitTimeout(TestWaitTimeout));
if ((i + 1) % 8 == 0) { // ReceiveComplete every 8 sends
Context.ServerStream->ReceiveComplete(sizeof(RawBuffer) * (i - lastCompleted));
Context.ServerStream->ReceiveComplete(BufferSize * (i - lastCompleted));
lastCompleted = i;
}
}
if (lastCompleted != MultiRecvNumSend - 1) {
Context.ServerStream->ReceiveComplete(sizeof(RawBuffer) * (MultiRecvNumSend - lastCompleted - 1));
if (lastCompleted != NumSend - 1) {
Context.ServerStream->ReceiveComplete(BufferSize * (NumSend - lastCompleted - 1));
}

for (int i = 0; i < MultiRecvNumSend; i++) {
for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == (i % 255) + 1);
}
}

// Server side multi receive. MultiRecvNumSend Sends and Complete every 8 sends
// handle MAX_STREAM_DATA and STREAM_DATA_BLOCKED
// Server side multi receive.
// simple case of handle MAX_STREAM_DATA and STREAM_DATA_BLOCKED
{
uint8_t RawBuffer[16384] = {};
QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer };
uint32_t BufferSize = 65537;
QUIC_BUFFER Buffer { BufferSize, Buffer1G };
int NumSend = 1;

MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
Expand All @@ -3950,23 +3972,71 @@ QuicTestStreamMultiReceive(
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));

int lastCompleted = -1;
for (int i = 0; i < MultiRecvNumSend; i++) {
Buffer.Buffer[sizeof(RawBuffer)-1] = (i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == MultiRecvNumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[BufferSize-1] = (i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
CxPlatSleep(10);
Context.ServerStream->ReceiveComplete(65536-i);
TEST_TRUE(Context.PktRecvd[i].WaitTimeout(TestWaitTimeout));
if ((i + 1) % 8 == 0) { // ReceiveComplete every 8 sends
Context.ServerStream->ReceiveComplete(sizeof(RawBuffer) * (i - lastCompleted));
lastCompleted = i;
}
Context.ServerStream->ReceiveComplete(1+i);
}
if (lastCompleted != MultiRecvNumSend - 1) {
Context.ServerStream->ReceiveComplete(sizeof(RawBuffer) * (MultiRecvNumSend - lastCompleted - 1));

for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == (i % 255) + 1);
}
}

// Server side multi receive. Send 1G bytes
// handle MAX_STREAM_DATA and STREAM_DATA_BLOCKED
{
uint32_t BufferSize = sizeof(Buffer1G);
QUIC_BUFFER Buffer { BufferSize, Buffer1G };
int NumSend = 1;

for (int i = 0; i < MultiRecvNumSend; i++) {
MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));

MsQuicConnection Connection(Registration);
TEST_QUIC_SUCCEEDED(Connection.GetInitStatus());

TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort()));
TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connection.HandshakeComplete);

MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, MultiReceiveTestContext::ClientStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));

for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[BufferSize-1] = (i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));

int CompletingLength = 0;
while (!(Context.PktRecvd[i].WaitTimeout(1))) {
CxPlatLockAcquire(&Context.Lock);
CompletingLength = Context.PseudoProcessingLength;
Context.PseudoProcessingLength = 0;
CxPlatLockRelease(&Context.Lock);
if (CompletingLength > 0) {
fprintf(stderr, "Complete: %d\n", CompletingLength);
Context.ServerStream->ReceiveComplete(CompletingLength);
}
}
if (Context.PseudoProcessingLength > 0) {
fprintf(stderr, "Complete FIN: %d\n", Context.PseudoProcessingLength);
Context.ServerStream->ReceiveComplete(Context.PseudoProcessingLength);
Context.PseudoProcessingLength = 0;
}
}

for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == (i % 255) + 1);
}
TEST_TRUE(Context.TotalLength == BufferSize * NumSend);
}


Expand Down

0 comments on commit 9398cd9

Please sign in to comment.