Skip to content
This repository has been archived by the owner on Sep 24, 2024. It is now read-only.

Commit

Permalink
change ServerDDS from read to write
Browse files Browse the repository at this point in the history
  • Loading branch information
monkey-sheng committed Jan 10, 2024
1 parent 9c2f8e9 commit 5333854
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
47 changes: 28 additions & 19 deletions DPDPU/AppDisagg/ServerDDS/ServerDDS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ void HandleFirstMsg(SOCKET clientSocket, int* msgSize) {
void ThreadFunc(SOCKET clientSocket, DDS_FrontEnd::DDSFrontEnd *Store, int payloadSize, int batchSize, int queueDepth, int clientIndex) {
// Initialize context and preallocate memory
// total no. of outstanding requests is batch size * queue depth
int responseSize = sizeof(MessageHeader) + payloadSize;

////for read
// int responseSize = sizeof(MessageHeader) + payloadSize;
////for write
int responseSize = sizeof(MessageHeader);

int batchRespSize = responseSize * batchSize;

char* bufferMem = (char*)_aligned_malloc(batchSize * queueDepth * responseSize, 4096);
Expand All @@ -46,14 +51,10 @@ void ThreadFunc(SOCKET clientSocket, DDS_FrontEnd::DDSFrontEnd *Store, int paylo
// Receive the first message
int msgSize = 0;
HandleFirstMsg(clientSocket, &msgSize);
int mbatchSize = msgSize / sizeof(MessageHeader);
/*iResult = recv(clientSocket, (char*)&msgSize, sizeof(int), 0);
if (iResult == SOCKET_ERROR || iResult != sizeof(int)) {
cout << "Error receiving the first message: " << WSAGetLastError() << endl;
closesocket(clientSocket);
WSACleanup();
return;
}*/
////for read
// int mbatchSize = msgSize / sizeof(MessageHeader);
////for write
int mbatchSize = msgSize / (sizeof(MessageHeader) + payloadSize);

// cout << "got payload size: " << msgSize << ", got batch size: " << mbatchSize << endl;
// cout << "using batch size: " << batchSize << endl;
Expand Down Expand Up @@ -117,16 +118,17 @@ void ThreadFunc(SOCKET clientSocket, DDS_FrontEnd::DDSFrontEnd *Store, int paylo


// issue file read
MessageHeader* requestMsg = (MessageHeader*)recvBuffer;
MessageHeader* requestMsg;
for (size_t i = 0; i < batchSize; i++) {
requestMsg = (MessageHeader*)(recvBuffer + i * (sizeof(MessageHeader) + payloadSize));
// cout << "requestMsg batch id " << requestMsg->BatchId << endl;
if (requestMsg->BatchId > batchSize) {
cout << "got wrong batch id from clinet: " << requestMsg->BatchId << endl;
}
int index = (int)(requestMsg->BatchId * batchSize + i);
char* respBuffer = bufferMem + index * responseSize;
memcpy(respBuffer, requestMsg, sizeof(MessageHeader));
respBuffer += sizeof(MessageHeader);
////respBuffer += sizeof(MessageHeader);
ServerContext* respCtx = serverContexts + index;
respCtx->clientIndex = clientIndex;
respCtx->ReadStartTime = high_resolution_clock::now().time_since_epoch().count();
Expand All @@ -145,19 +147,23 @@ void ThreadFunc(SOCKET clientSocket, DDS_FrontEnd::DDSFrontEnd *Store, int paylo
// Store->SetFilePointer(requestMsg->FileId, requestMsg->Offset, FilePointerPosition::BEGIN);
//cout << "set file pointer took: " << high_resolution_clock::now().time_since_epoch().count() - readFileStart << endl;

ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);

////for read
// ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
////for write
ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader),
requestMsg->Offset, payloadSize, &respCtx->BytesWritten, NULL, respCtx);

//cout << "respctx: " << respCtx << endl;
// cout << "issued read file to front end, took: " << high_resolution_clock::now().time_since_epoch().count() - readFileStart << endl;


while (ret == DDS_ERROR_CODE_TOO_MANY_REQUESTS || ret == DDS_ERROR_CODE_REQUEST_RING_FAILURE) {
// just retry issuing the read

ret = Store->ReadFile(requestMsg->FileId, respBuffer, requestMsg->Offset, payloadSize, &respCtx->BytesRead, NULL, respCtx);
// just retry issuing the write
ret = Store->WriteFile(requestMsg->FileId, recvBuffer + i * (sizeof(MessageHeader) + payloadSize) + sizeof(MessageHeader),
requestMsg->Offset, payloadSize, &respCtx->BytesWritten, NULL, respCtx);
}

requestMsg += 1;
////requestMsg += 1;
}
}

Expand Down Expand Up @@ -256,7 +262,10 @@ void HandleCompletions(int payloadSize, int batchSize, int readNum) {
volatile ErrorCodeT iResult2 = 0;
int oneSend = 0;

int batchRespSize = (sizeof(MessageHeader) + payloadSize) * batchSize;
////for read
// int batchRespSize = (sizeof(MessageHeader) + payloadSize) * batchSize;
////for write
int batchRespSize = (sizeof(MessageHeader)) * batchSize;
int remainingBytesToSend = batchRespSize;

DWORD bytesTransferred = 0;
Expand Down Expand Up @@ -399,7 +408,7 @@ int RunServer(int payloadSize, int batchSize, int queueDepth, int readNum, int f
cout << "Failed to change size of file" << endl;
}
else {
cout << "File size changed" << endl;
cout << "File size changed to "<< fileSize << "G" << endl;
}

// setup io completion port
Expand Down
6 changes: 5 additions & 1 deletion DPDPU/AppDisagg/ServerDDS/ServerDDS.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ thread* pollWaitThread;

struct ServerContext
{
FileIOSizeT BytesRead;
union {
FileIOSizeT BytesRead;
FileIOSizeT BytesWritten;
};
//FileIOSizeT BytesRead;
//atomic_bool IsAvailable;
uint16_t BatchId;
atomic_uint16_t* batchCompletionCounts;
Expand Down

0 comments on commit 5333854

Please sign in to comment.