Skip to content

Commit

Permalink
Allows blocking socket simulation on multiple threads simultaneously
Browse files Browse the repository at this point in the history
  • Loading branch information
anr2me committed Sep 8, 2020
1 parent d8d2fd8 commit 871707c
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions Core/HLE/sceNetAdhoc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ int IsAdhocctlInCB = 0;
int adhocctlNotifyEvent = -1;
int adhocSocketNotifyEvent = -1;
std::map<int, AdhocctlRequest> adhocctlRequests;
std::map<int, AdhocSocketRequest> adhocSocketRequests;
std::map<int, AdhocSendTargets> sendTargetPeers;
std::map<u64, AdhocSocketRequest> adhocSocketRequests;
std::map<u64, AdhocSendTargets> sendTargetPeers;

u32 dummyThreadHackAddr = 0;
u32_le dummyThreadCode[3];
Expand Down Expand Up @@ -546,27 +546,27 @@ static void __AdhocSocketNotify(u64 userdata, int cyclesLate) {
return;

// Socket not found?! Should never happened! but if it ever happen should we just exit here or need to wake the thread first?
if (adhocSocketRequests.find(uid) == adhocSocketRequests.end()) {
WARN_LOG(SCENET, "sceNetAdhoc Socket WaitID(%i) not found!", uid);
if (adhocSocketRequests.find(userdata) == adhocSocketRequests.end()) {
WARN_LOG(SCENET, "sceNetAdhoc Socket WaitID(%i) on Thread(%i) not found!", uid, threadID);
//__KernelResumeThreadFromWait(threadID, ERROR_NET_ADHOC_TIMEOUT);
return;
}

AdhocSocketRequest req = adhocSocketRequests[uid];
AdhocSocketRequest req = adhocSocketRequests[userdata];

switch (req.type) {
case PDP_SEND:
if (sendTargetPeers.find(uid) == sendTargetPeers.end()) {
if (sendTargetPeers.find(userdata) == sendTargetPeers.end()) {
// No destination peers?
result = 0;
break;
}
if (DoBlockingPdpSend(uid, req, result, sendTargetPeers[uid])) {
if (DoBlockingPdpSend(uid, req, result, sendTargetPeers[userdata])) {
// Try again in another 0.5ms until data available or timedout.
CoreTiming::ScheduleEvent(usToCycles(delayUS) - cyclesLate, adhocSocketNotifyEvent, userdata);
return;
}
sendTargetPeers.erase(uid);
sendTargetPeers.erase(userdata);
break;

case PDP_RECV:
Expand Down Expand Up @@ -622,12 +622,14 @@ static void __AdhocSocketNotify(u64 userdata, int cyclesLate) {
DEBUG_LOG(SCENET, "Returning (WaitID: %d, error: %d) Result (%08x) of sceNetAdhoc - SocketID: %d", waitID, error, (int)result, req.id);

// We are done with this socket
adhocSocketRequests.erase(uid);
adhocSocketRequests.erase(userdata);
}

int WaitBlockingAdhocSocket(int socketId, int type, int pspSocketId, void* buffer, s32_le* len, u32 timeoutUS, SceNetEtherAddr* remoteMAC, u16_le* remotePort, const char* reason) {
if (adhocSocketRequests.find(socketId) != adhocSocketRequests.end()) {
WARN_LOG(SCENET, "sceNetAdhoc - WaitID[%d] already existed, Socket[%d] is busy!", socketId, pspSocketId);
// input threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socketId;
int WaitBlockingAdhocSocket(u64 threadSocketId, int type, int pspSocketId, void* buffer, s32_le* len, u32 timeoutUS, SceNetEtherAddr* remoteMAC, u16_le* remotePort, const char* reason) {
int uid = (int)(threadSocketId & 0xFFFFFFFF);
if (adhocSocketRequests.find(threadSocketId) != adhocSocketRequests.end()) {
WARN_LOG(SCENET, "sceNetAdhoc - WaitID[%d] already existed, Socket[%d] is busy!", uid, pspSocketId);
return ERROR_NET_ADHOC_BUSY;
}

Expand All @@ -640,12 +642,11 @@ int WaitBlockingAdhocSocket(int socketId, int type, int pspSocketId, void* buffe
if (tmout > 0)
tmout = std::max(tmout, minSocketTimeoutUS);

u64 param = ((u64)__KernelGetCurThread()) << 32 | socketId;
u64 startTime = (u64)(real_time_now() * 1000000.0);
adhocSocketRequests[socketId] = { type, pspSocketId, buffer, len, tmout, startTime, remoteMAC, remotePort };
adhocSocketRequests[threadSocketId] = { type, pspSocketId, buffer, len, tmout, startTime, remoteMAC, remotePort };
// Some games (ie. Power Stone Collection) are using as small as 100 usec timeout
CoreTiming::ScheduleEvent(usToCycles(100), adhocSocketNotifyEvent, param);
__KernelWaitCurThread(WAITTYPE_NET, socketId, 0, 0, false, reason);
CoreTiming::ScheduleEvent(usToCycles(100), adhocSocketNotifyEvent, threadSocketId);
__KernelWaitCurThread(WAITTYPE_NET, uid, 0, 0, false, reason);

// Fallback return value
return ERROR_NET_ADHOC_TIMEOUT;
Expand Down Expand Up @@ -1114,15 +1115,16 @@ static int sceNetAdhocPdpSend(int id, const char *mac, u32 port, void *data, int
if (sent == SOCKET_ERROR) {
// Simulate blocking behaviour with non-blocking socket
if (!flag && (error == EAGAIN || error == EWOULDBLOCK || error == ETIMEDOUT)) {
if (sendTargetPeers.find(socket->id) != sendTargetPeers.end()) {
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
if (sendTargetPeers.find(threadSocketId) != sendTargetPeers.end()) {
DEBUG_LOG(SCENET, "sceNetAdhocPdpSend[%i:%u]: Socket(%d) is Busy!", id, getLocalPort(socket->id), socket->id);
return ERROR_NET_ADHOC_BUSY;
}

AdhocSendTargets dest = { len, {}, false };
dest.peers.push_back({ target.sin_addr.s_addr, dport });
sendTargetPeers[socket->id] = dest;
return WaitBlockingAdhocSocket(socket->id, PDP_SEND, id, data, nullptr, timeout, nullptr, nullptr, "pdp send");
sendTargetPeers[threadSocketId] = dest;
return WaitBlockingAdhocSocket(threadSocketId, PDP_SEND, id, data, nullptr, timeout, nullptr, nullptr, "pdp send");
}

DEBUG_LOG(SCENET, "Socket Error (%i) on sceNetAdhocPdpSend[%i:%u->%u] (size=%i)", error, id, getLocalPort(socket->id), ntohs(target.sin_port), len);
Expand Down Expand Up @@ -1187,13 +1189,14 @@ static int sceNetAdhocPdpSend(int id, const char *mac, u32 port, void *data, int
// Send Data
// Simulate blocking behaviour with non-blocking socket
if (!flag) {
if (sendTargetPeers.find(socket->id) != sendTargetPeers.end()) {
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
if (sendTargetPeers.find(threadSocketId) != sendTargetPeers.end()) {
DEBUG_LOG(SCENET, "sceNetAdhocPdpSend[%i:%u](BC): Socket(%d) is Busy!", id, getLocalPort(socket->id), socket->id);
return ERROR_NET_ADHOC_BUSY;
}

sendTargetPeers[socket->id] = dest;
return WaitBlockingAdhocSocket(socket->id, PDP_SEND, id, data, nullptr, timeout, nullptr, nullptr, "pdp send broadcast");
sendTargetPeers[threadSocketId] = dest;
return WaitBlockingAdhocSocket(threadSocketId, PDP_SEND, id, data, nullptr, timeout, nullptr, nullptr, "pdp send broadcast");
}
// Non-blocking
else {
Expand Down Expand Up @@ -1361,7 +1364,8 @@ static int sceNetAdhocPdpRecv(int id, void *addr, void * port, void *buf, void *
if (received == SOCKET_ERROR) {
if (flag == 0) {
// Simulate blocking behaviour with non-blocking socket
return WaitBlockingAdhocSocket(socket->id, PDP_RECV, id, buf, len, timeout, saddr, sport, "pdp recv");
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
return WaitBlockingAdhocSocket(threadSocketId, PDP_RECV, id, buf, len, timeout, saddr, sport, "pdp recv");
}

VERBOSE_LOG(SCENET, "Socket Error (%i) on sceNetAdhocPdpRecv[%i:%u] [size=%i]", error, id, socket->lport, *len);
Expand Down Expand Up @@ -1568,11 +1572,13 @@ int sceNetAdhocPollSocket(u32 socketStructAddr, int count, int timeout, int nonb
int affectedsockets = 0;
if (nonblock)
affectedsockets = PollAdhocSocket(sds, count, timeout);
else
else {
// Simulate blocking behaviour with non-blocking socket
// Borrowing some arguments to pass some parameters. The dummy WaitID(count+1) might not be unique thus have duplicate possibilities if there are multiple thread trying to poll the same numbers of socket at the same time
return WaitBlockingAdhocSocket(count+1, ADHOC_POLL_SOCKET, count, sds, nullptr, timeout, nullptr, nullptr, "adhoc pollsocket");

u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | (count + 1);
return WaitBlockingAdhocSocket(threadSocketId, ADHOC_POLL_SOCKET, count, sds, nullptr, timeout, nullptr, nullptr, "adhoc pollsocket");
}

// Free Network Lock
//freeNetworkLock();

Expand Down Expand Up @@ -2819,7 +2825,8 @@ static int sceNetAdhocPtpAccept(int id, u32 peerMacAddrPtr, u32 peerPortPtr, int
if (newsocket == SOCKET_ERROR) {
if (flag == 0) {
// Simulate blocking behaviour with non-blocking socket
return WaitBlockingAdhocSocket(socket->id, PTP_ACCEPT, id, nullptr, nullptr, timeout, addr, port, "ptp accept");
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
return WaitBlockingAdhocSocket(threadSocketId, PTP_ACCEPT, id, nullptr, nullptr, timeout, addr, port, "ptp accept");
}
// Prevent spamming Debug Log with retries of non-bocking socket
else {
Expand Down Expand Up @@ -2935,7 +2942,8 @@ static int sceNetAdhocPtpConnect(int id, int timeout, int flag) {
// Blocking Mode
else {
// Simulate blocking behaviour with non-blocking socket
return WaitBlockingAdhocSocket(socket->id, PTP_CONNECT, id, nullptr, nullptr, timeout, nullptr, nullptr, "ptp connect");
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
return WaitBlockingAdhocSocket(threadSocketId, PTP_CONNECT, id, nullptr, nullptr, timeout, nullptr, nullptr, "ptp connect");
}
}
}
Expand Down Expand Up @@ -3240,7 +3248,8 @@ static int sceNetAdhocPtpSend(int id, u32 dataAddr, u32 dataSizeAddr, int timeou
return ERROR_NET_ADHOC_WOULD_BLOCK;

// Simulate blocking behaviour with non-blocking socket
return WaitBlockingAdhocSocket(socket->id, PTP_SEND, id, (void*)data, len, timeout, nullptr, nullptr, "ptp send");
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
return WaitBlockingAdhocSocket(threadSocketId, PTP_SEND, id, (void*)data, len, timeout, nullptr, nullptr, "ptp send");
}

// Change Socket State
Expand Down Expand Up @@ -3312,7 +3321,8 @@ static int sceNetAdhocPtpRecv(int id, u32 dataAddr, u32 dataSizeAddr, int timeou
if (received == SOCKET_ERROR) {
if (flag == 0) {
// Simulate blocking behaviour with non-blocking socket
return WaitBlockingAdhocSocket(socket->id, PTP_RECV, id, buf, len, timeout, nullptr, nullptr, "ptp recv");
u64 threadSocketId = ((u64)__KernelGetCurThread()) << 32 | socket->id;
return WaitBlockingAdhocSocket(threadSocketId, PTP_RECV, id, buf, len, timeout, nullptr, nullptr, "ptp recv");
}

VERBOSE_LOG(SCENET, "Socket Error (%i) on sceNetAdhocPtpRecv[%i:%u] [size=%i]", error, id, socket->lport, *len);
Expand Down

0 comments on commit 871707c

Please sign in to comment.