From 2f890200feb3082cb7d9fda7034e4dea0040c372 Mon Sep 17 00:00:00 2001 From: Jason Hu Date: Wed, 10 Jan 2024 16:00:34 +0000 Subject: [PATCH] change ServerDDSQueued to write --- .../ServerDDSQueued/ServerDDSQueued.cpp | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/DPDPU/AppDisagg/ServerDDSQueued/ServerDDSQueued.cpp b/DPDPU/AppDisagg/ServerDDSQueued/ServerDDSQueued.cpp index 180e590..e6a664e 100644 --- a/DPDPU/AppDisagg/ServerDDSQueued/ServerDDSQueued.cpp +++ b/DPDPU/AppDisagg/ServerDDSQueued/ServerDDSQueued.cpp @@ -91,8 +91,11 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo ErrorCodeT ret = -1; // Initialize context and preallocate memory // total no. of outstanding requests is batch size * queue depth - int responseSize = sizeof(MessageHeader) + payloadSize; - int batchRespSize = responseSize * batchSize; + + ////int responseSize = sizeof(MessageHeader) + payloadSize; + int responseSize = sizeof(MessageHeader); + ////int batchRespSize = responseSize * batchSize; + int batchRespSize = (sizeof(MessageHeader)) * batchSize; char* bufferMem = (char*)_aligned_malloc(batchSize * queueDepth * responseSize, 4096); ServerContext* serverContexts = (ServerContext*)malloc(batchSize * queueDepth * sizeof(ServerContext)); @@ -105,7 +108,9 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo // Receive the first message int msgSize = 0; HandleFirstMsg(clientSocket, &msgSize); - int mbatchSize = msgSize / sizeof(MessageHeader); + + ////int mbatchSize = msgSize / sizeof(MessageHeader); + int mbatchSize = msgSize / (sizeof(MessageHeader) + payloadSize); // cout << "got payload size: " << msgSize << ", got batch size: " << mbatchSize << endl; // cout << "using batch size: " << batchSize << endl; @@ -175,8 +180,9 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo remainingBytesToReceive = msgSize; // issue file read - requestMsg = (MessageHeader*)recvBuffer; + ////requestMsg = (MessageHeader*)recvBuffer; for (size_t i = 0; i < batchSize; i++) { + requestMsg = (MessageHeader*)(recvBuffer + i * (sizeof(MessageHeader) + payloadSize)); // cout << "requestMsg batch id " << requestMsg->BatchId << endl; int index = requestMsg->BatchId * batchSize + i; char* respBuffer = bufferMem + index * responseSize; @@ -192,16 +198,19 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo // respCtx->serverContexts = serverContexts; respCtx->batchCompletionCounts = batchCompletionCounts; - ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx); + ////ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx); + ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader), + requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx); while (ret == DDS_ERROR_CODE_TOO_MANY_REQUESTS || ret == DDS_ERROR_CODE_REQUEST_RING_FAILURE) { // TODO: back off strategy, how? perhaps use another dedicated thread that runs this tight loop for all callers of readfile? // just retry issuing the read - ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx); + ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader), + requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx); } - requestMsg += 1; + ////requestMsg += 1; } POP_COMPLETIONS: