Skip to content
This repository has been archived by the owner on Sep 24, 2024. It is now read-only.

Commit

Permalink
Cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
qizzz authored Aug 18, 2024
1 parent aed1e1d commit be889b4
Show file tree
Hide file tree
Showing 25 changed files with 6 additions and 5,617 deletions.
441 changes: 0 additions & 441 deletions AppDisagg/ClientLinux/Client.cpp

This file was deleted.

10 changes: 0 additions & 10 deletions AppDisagg/ClientLinux/Client.h

This file was deleted.

13 changes: 0 additions & 13 deletions AppDisagg/ClientLinux/Makefile

This file was deleted.

30 changes: 0 additions & 30 deletions AppDisagg/DisaggStore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "BackEndLib", "..\StorageEng
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "DDSFrontEnd", "..\StorageEngine\DDSFrontEnd\DDSFrontEnd.vcxproj", "{BEEF5F46-922D-49AE-B1A7-A9096EC926DB}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "ServerQueued", "ServerNonPort\ServerNonPort.vcxproj", "{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "ServerDDSQueued", "ServerDDSQueued\ServerDDSQueued.vcxproj", "{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "ServerDDSPortBatched", "ServerDDSPortBatched\ServerDDSPortBatched.vcxproj", "{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Expand Down Expand Up @@ -103,30 +97,6 @@ Global
{BEEF5F46-922D-49AE-B1A7-A9096EC926DB}.Release|x64.Build.0 = Release|x64
{BEEF5F46-922D-49AE-B1A7-A9096EC926DB}.Release|x86.ActiveCfg = Release|Win32
{BEEF5F46-922D-49AE-B1A7-A9096EC926DB}.Release|x86.Build.0 = Release|Win32
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Debug|x64.ActiveCfg = Debug|x64
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Debug|x64.Build.0 = Debug|x64
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Debug|x86.ActiveCfg = Debug|Win32
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Debug|x86.Build.0 = Debug|Win32
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Release|x64.ActiveCfg = Release|x64
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Release|x64.Build.0 = Release|x64
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Release|x86.ActiveCfg = Release|Win32
{247BA9D3-21DF-4FCC-B6C2-44D9D6E9089D}.Release|x86.Build.0 = Release|Win32
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Debug|x64.ActiveCfg = Debug|x64
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Debug|x64.Build.0 = Debug|x64
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Debug|x86.ActiveCfg = Debug|Win32
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Debug|x86.Build.0 = Debug|Win32
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Release|x64.ActiveCfg = Release|x64
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Release|x64.Build.0 = Release|x64
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Release|x86.ActiveCfg = Release|Win32
{ADF62960-0AA9-42A8-B6A9-C7D4FEDD2A5D}.Release|x86.Build.0 = Release|Win32
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Debug|x64.ActiveCfg = Debug|x64
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Debug|x64.Build.0 = Debug|x64
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Debug|x86.ActiveCfg = Debug|Win32
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Debug|x86.Build.0 = Debug|Win32
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Release|x64.ActiveCfg = Release|x64
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Release|x64.Build.0 = Release|x64
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Release|x86.ActiveCfg = Release|Win32
{D41008BF-2BA6-44B2-A1E4-D08E86A4F2EE}.Release|x86.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
45 changes: 1 addition & 44 deletions AppDisagg/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe

int iResult = -1;

// cout << "Thread for client " << clientIndex << " Waiting for the first message..." << endl;

// Receive the first message
int msgSize = 0;
HandleFirstMsg(clientSocket, &msgSize);
Expand All @@ -60,14 +58,9 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
//for write
int mbatchSize = msgSize / (sizeof(MessageHeader)+ payloadSize);

// cout << "got payload size: " << msgSize << ", got batch size: " << mbatchSize << endl;
// cout << "using batch size: " << batchSize << endl;
if (mbatchSize != batchSize) {
cerr << "ERROR: batch size MISMATCH on server and client" << endl;
}
else {
// cout << "got batch size from client: " << mbatchSize << endl;
}

char* recvBuffer = new char[msgSize];
memset(recvBuffer, 0, msgSize);
Expand All @@ -84,7 +77,6 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
// Receive message from client and send response

while (oneReceive < msgSize) {
// cout << "receiving message..." << endl;
iResult = recv(clientSocket, recvBuffer + oneReceive, remainingBytesToReceive, 0);

if (iResult == SOCKET_ERROR) {
Expand All @@ -95,27 +87,12 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
else {
cout << "recv from client but client is finished and closed (conn rest)" << endl;
}

// int ret = closesocket(clientSocket); // close at the end
// if (ret != 0) {
// int err = WSAGetLastError();
// // already done, but remote closed forcibly 10054: conn reset
// if (err == 10054 && (int64_t) msgProcessedCount[clientIndex] == readNum) {
// cout << "client " << clientIndex << " connection closed" << endl;
// }
// else {
// cout << "error closing socket for client " << clientIndex << endl;
// }
// }
// else {
// cout << "closing socket completed for client " << clientIndex << endl;
// }

finished = true;
break;
}
else if (iResult == 0) {
finished = true;
// cout << "client closed socket" << endl;
break;
}

Expand All @@ -132,10 +109,8 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe


// issue file read
//MessageHeader *requestMsg = (MessageHeader *) recvBuffer;
for (size_t i = 0; i < batchSize; i++) {
MessageHeader* requestMsg = (MessageHeader*)(recvBuffer + i * (sizeof(MessageHeader) + payloadSize));
// cout << "requestMsg batch id " << requestMsg->BatchId << endl;
int index = requestMsg->BatchId * batchSize + i;
char *respBuffer = bufferMem + index * responseSize;
memcpy(respBuffer, requestMsg, sizeof(MessageHeader));
Expand All @@ -145,10 +120,7 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
respCtx->ReadStartTime = high_resolution_clock::now().time_since_epoch().count();
respCtx->hSocket = clientSocket;
respCtx->socketMutex = socketMutex;
// respCtx->Index = index;
respCtx->BatchId = requestMsg->BatchId;
/* respCtx->Overlapped.Offset = requestMsg->Offset;
respCtx->Overlapped.OffsetHigh = 0; */
respCtx->Overlapped.Pointer = (PVOID) requestMsg->Offset;
respCtx->Overlapped.hEvent = NULL;
respCtx->bufferMem = bufferMem;
Expand All @@ -163,7 +135,6 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
if (err != ERROR_IO_PENDING) {
cerr << "something wrong when ReadFile(), carry on regardless..., err: " << err << endl;
cerr << "offset: " << requestMsg->Offset << endl;
// cerr << "respBuffer: " << (void *)respBuffer << endl;
}
// else it's async
}
Expand Down Expand Up @@ -232,8 +203,6 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
allClientsSendBackStats[clientIndex] = stats;
allClientsSendBackPercentiles[clientIndex] = PercentileStats;

// cout << "Measurement completes for thread " << clientIndex << endl;

// Clear buffer
delete[] recvBuffer;

Expand All @@ -242,9 +211,6 @@ void ThreadFunc(SOCKET clientSocket, int payloadSize, int batchSize, int queueDe
if (ret) {
cout << "closing socket err: " << WSAGetLastError() << endl;;
}
else {
// cout << "Thread " << clientIndex << " closing client socket completed" << endl;
}
}


Expand Down Expand Up @@ -377,10 +343,8 @@ void HandleCompletions(int payloadSize, int batchSize, int readNum) {
auto sendStart = high_resolution_clock::now().time_since_epoch().count();
ctx->socketMutex->lock(); // can't have concurrent sends, other pollers may also send on same socket
while (oneSend < batchRespSize) {
//cout << "remanining: " << remainingBytesToSend << " batchRespSize " << batchRespSize << endl;
iResult = send(ctx->hSocket, buf + oneSend, remainingBytesToSend, 0);
iResult = remainingBytesToSend;
//cout << "sent " << iResult << "remaining: " << remainingBytesToSend << endl;
if (iResult == SOCKET_ERROR) {
cerr << "Error sending response: " << WSAGetLastError() << endl;
closesocket(ctx->hSocket);
Expand All @@ -397,8 +361,6 @@ void HandleCompletions(int payloadSize, int batchSize, int readNum) {
oneSend = 0;
remainingBytesToSend = batchRespSize;
}
// msgProcessedCount[ctx->clientIndex]++; // this is racy!!!
// else nothing to do
}
}

Expand Down Expand Up @@ -462,7 +424,6 @@ int RunServer(int payloadSize, int batchSize, int queueDepth, int readNum, int f
cout << "Error creating completion port: " << GetLastError() << endl;
return -1;
}
// cout << "IO completion port created" << endl;
for (size_t i = 1; i < nFile; i++) {
HANDLE ret = CreateIoCompletionPort(fileHandles[i], ioCompletionPort, NULL, nCompletionThreads);
if (ret == NULL) {
Expand Down Expand Up @@ -588,7 +549,6 @@ int RunServer(int payloadSize, int batchSize, int queueDepth, int readNum, int f
cout << "closing listen socket err: " << WSAGetLastError() << endl;;
}
WSACleanup();
// cout << "closing listen socket completed" << endl;

// close files opened
for (int i = 0; i < nFile; i++) {
Expand All @@ -597,14 +557,12 @@ int RunServer(int payloadSize, int batchSize, int queueDepth, int readNum, int f
cerr << "closing file " << i << " failed" << endl;
}
}
// cout << "closed files" << endl;

// close completion port
ret = CloseHandle(ioCompletionPort);
if (ret == 0) {
cerr << "closing completion port failed" << endl;
}
// cout << "closed completion port" << endl;

return 0;
}
Expand All @@ -623,7 +581,6 @@ int main(
int nFile = stoi(args[6]);
int nConnections = stoi(args[7]);
int nCompletionThreads = stoi(args[8]);
// cout << "queue depth: " << queueDepth << endl;
return RunServer(readSize, batchSize, queueDepth, readNum, fileSize, nFile, nConnections, nCompletionThreads);
}
else {
Expand Down
Loading

0 comments on commit be889b4

Please sign in to comment.