Skip to content
This repository has been archived by the owner on Sep 24, 2024. It is now read-only.

Commit

Permalink
change ServerDDSQueued to write
Browse files Browse the repository at this point in the history
  • Loading branch information
monkey-sheng committed Jan 10, 2024
1 parent 5333854 commit 2f89020
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions DPDPU/AppDisagg/ServerDDSQueued/ServerDDSQueued.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo
ErrorCodeT ret = -1;
// Initialize context and preallocate memory
// total no. of outstanding requests is batch size * queue depth
int responseSize = sizeof(MessageHeader) + payloadSize;
int batchRespSize = responseSize * batchSize;

////int responseSize = sizeof(MessageHeader) + payloadSize;
int responseSize = sizeof(MessageHeader);
////int batchRespSize = responseSize * batchSize;
int batchRespSize = (sizeof(MessageHeader)) * batchSize;

char* bufferMem = (char*)_aligned_malloc(batchSize * queueDepth * responseSize, 4096);
ServerContext* serverContexts = (ServerContext*)malloc(batchSize * queueDepth * sizeof(ServerContext));
Expand All @@ -105,7 +108,9 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo
// Receive the first message
int msgSize = 0;
HandleFirstMsg(clientSocket, &msgSize);
int mbatchSize = msgSize / sizeof(MessageHeader);

////int mbatchSize = msgSize / sizeof(MessageHeader);
int mbatchSize = msgSize / (sizeof(MessageHeader) + payloadSize);

// cout << "got payload size: " << msgSize << ", got batch size: " << mbatchSize << endl;
// cout << "using batch size: " << batchSize << endl;
Expand Down Expand Up @@ -175,8 +180,9 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo
remainingBytesToReceive = msgSize;

// issue file read
requestMsg = (MessageHeader*)recvBuffer;
////requestMsg = (MessageHeader*)recvBuffer;
for (size_t i = 0; i < batchSize; i++) {
requestMsg = (MessageHeader*)(recvBuffer + i * (sizeof(MessageHeader) + payloadSize));
// cout << "requestMsg batch id " << requestMsg->BatchId << endl;
int index = requestMsg->BatchId * batchSize + i;
char* respBuffer = bufferMem + index * responseSize;
Expand All @@ -192,16 +198,19 @@ void ThreadFunc(DDS_FrontEnd::DDSFrontEnd *Store, SOCKET clientSocket, int paylo
// respCtx->serverContexts = serverContexts;
respCtx->batchCompletionCounts = batchCompletionCounts;

ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
////ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader),
requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);

while (ret == DDS_ERROR_CODE_TOO_MANY_REQUESTS || ret == DDS_ERROR_CODE_REQUEST_RING_FAILURE) {
// TODO: back off strategy, how? perhaps use another dedicated thread that runs this tight loop for all callers of readfile?
// just retry issuing the read

ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader),
requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
}

requestMsg += 1;
////requestMsg += 1;
}

POP_COMPLETIONS:
Expand Down

0 comments on commit 2f89020

Please sign in to comment.