Skip to content

Commit

Permalink
[test] fix hang in macOS variants of reverse diagnostics server tests (
Browse files Browse the repository at this point in the history
…#40225)

Co-authored-by: Noah Falk <[email protected]>
  • Loading branch information
John Salem and noahfalk authored Aug 5, 2020
1 parent 05e2cfb commit 5c29e14
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 27 deletions.
17 changes: 10 additions & 7 deletions src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,28 +246,31 @@ int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_
{
if (pollfds[i].revents != 0)
{
if (callback != nullptr)
callback("IpcStream::DiagnosticsIpc::Poll - poll revents", (uint32_t)pollfds[i].revents);
// error check FIRST
if (pollfds[i].revents & POLLHUP)
{
// check for hangup first because a closed socket
// will technically meet the requirements for POLLIN
// i.e., a call to recv/read won't block
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP;
delete[] pollfds;
return -1;
}
else if ((pollfds[i].revents & (POLLERR|POLLNVAL)))
{
if (callback != nullptr)
callback("Poll error", (uint32_t)pollfds[i].revents);
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR;
delete[] pollfds;
return -1;
}
else if (pollfds[i].revents & (POLLIN|POLLPRI))
{
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::SIGNALED;
break;
}
else
{
rgIpcPollHandles[i].revents = (uint8_t)PollEvents::UNKNOWN;
if (callback != nullptr)
callback("unkown poll response", (uint32_t)pollfds[i].revents);
}
}
}
Expand Down Expand Up @@ -341,7 +344,7 @@ bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nByt
pfd.fd = _clientSocket;
pfd.events = POLLIN;
int retval = poll(&pfd, 1, timeoutMs);
if (retval <= 0 || pfd.revents != POLLIN)
if (retval <= 0 || !(pfd.revents & POLLIN))
{
// timeout or error
return false;
Expand Down Expand Up @@ -382,7 +385,7 @@ bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32
pfd.fd = _clientSocket;
pfd.events = POLLOUT;
int retval = poll(&pfd, 1, timeoutMs);
if (retval <= 0 || pfd.revents != POLLOUT)
if (retval <= 0 || !(pfd.revents & POLLOUT))
{
// timeout or error
return false;
Expand Down
7 changes: 5 additions & 2 deletions src/coreclr/src/debug/inc/diagnosticsipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ typedef void (*ErrorCallback)(const char *szMessage, uint32_t code);

class IpcStream final
{
friend class IpcStreamFactory;
public:
static constexpr int32_t InfiniteTimeout = -1;
~IpcStream();
Expand All @@ -26,6 +27,7 @@ class IpcStream final

class DiagnosticsIpc final
{
friend class IpcStreamFactory;
public:
enum ConnectionMode
{
Expand All @@ -38,7 +40,8 @@ class IpcStream final
NONE = 0x00, // no events
SIGNALED = 0x01, // ready for use
HANGUP = 0x02, // connection remotely closed
ERR = 0x04 // other error
ERR = 0x04, // error
UNKNOWN = 0x80 // unknown state
};

// The bookeeping struct used for polling on server and client structs
Expand Down Expand Up @@ -125,7 +128,7 @@ class IpcStream final
private:
#ifdef TARGET_UNIX
int _clientSocket = -1;
IpcStream(int clientSocket, int serverSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
IpcStream(int clientSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER)
: _clientSocket(clientSocket), _mode(mode) {}
#else
HANDLE _hPipe = INVALID_HANDLE_VALUE;
Expand Down
2 changes: 2 additions & 0 deletions src/coreclr/src/vm/diagnosticserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID)
continue;
}

STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "DiagnosticServer - received IPC message with command set (%d) and command id (%d)\n", message.GetHeader().CommandSet, message.GetHeader().CommandId);

switch ((DiagnosticsIpc::DiagnosticServerCommandSet)message.GetHeader().CommandSet)
{
case DiagnosticsIpc::DiagnosticServerCommandSet::EventPipe:
Expand Down
33 changes: 33 additions & 0 deletions src/coreclr/src/vm/ipcstreamfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ Volatile<bool> IpcStreamFactory::s_isShutdown = false;

bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback)
{
STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO1000, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - ENTER.\n");
if (_pStream == nullptr)
{
STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - cache was empty!\n");
// cache is empty, reconnect, e.g., there was a disconnect
IpcStream *pConnection = _pIpc->Connect(callback);
if (pConnection == nullptr)
Expand All @@ -22,6 +24,11 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno
callback("Failed to connect to client connection", -1);
return false;
}
#ifdef TARGET_UNIX
STRESS_LOG1(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _clientSocket = %d }\n", pConnection->_clientSocket);
#else
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - returned connection { _hPipe = %d, _oOverlap.hEvent = %d }\n", pConnection->_hPipe, pConnection->_oOverlap.hEvent);
#endif
if (!DiagnosticsIpc::SendIpcAdvertise_V1(pConnection))
{
if (callback != nullptr)
Expand All @@ -33,6 +40,7 @@ bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::Diagno
_pStream = pConnection;
}
*pIpcPollHandle = { nullptr, _pStream, 0, this };
STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::ClientConnectionState::GetIpcPollHandle - EXIT.\n");
return true;
}

Expand Down Expand Up @@ -139,6 +147,7 @@ int32_t IpcStreamFactory::GetNextTimeout(int32_t currentTimeoutMs)

IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
{
STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - ENTER");
IpcStream *pStream = nullptr;
CQuickArrayList<IpcStream::DiagnosticsIpc::IpcPollHandle> rgIpcPollHandles;

Expand Down Expand Up @@ -168,6 +177,25 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)

nPollAttempts++;
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, timeout: %dms.\n", nPollAttempts, pollTimeoutMs);
for (uint32_t i = 0; i < rgIpcPollHandles.Size(); i++)
{
if (rgIpcPollHandles[i].pIpc != nullptr)
{
#ifdef TARGET_UNIX
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _serverSocket = %d }\n", i, rgIpcPollHandles[i].pIpc->_serverSocket);
#else
STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tSERVER IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pIpc->_hPipe, rgIpcPollHandles[i].pIpc->_oOverlap.hEvent);
#endif
}
else
{
#ifdef TARGET_UNIX
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _clientSocket = %d }\n", i, rgIpcPollHandles[i].pStream->_clientSocket);
#else
STRESS_LOG3(LF_DIAGNOSTICS_PORT, LL_INFO10, "\tCLIENT IpcPollHandle[%d] = { _hPipe = %d, _oOverlap.hEvent = %d }\n", i, rgIpcPollHandles[i].pStream->_hPipe, rgIpcPollHandles[i].pStream->_oOverlap.hEvent);
#endif
}
}
int32_t retval = IpcStream::DiagnosticsIpc::Poll(rgIpcPollHandles.Ptr(), (uint32_t)rgIpcPollHandles.Size(), pollTimeoutMs, callback);
bool fSawError = false;

Expand Down Expand Up @@ -211,6 +239,11 @@ IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback)
rgIpcPollHandles.Pop();
}

#ifdef TARGET_UNIX
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_clientSocket);
#else
STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - EXIT :: Poll attempt: %d, stream using handle %d.\n", nPollAttempts, pStream->_hPipe);
#endif
return pStream;
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/tracing/eventpipe/common/IpcUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static async Task<bool> RunSubprocess(Assembly currentAssembly, Dictionar
foreach ((string key, string value) in environment)
process.StartInfo.Environment.Add(key, value);
process.StartInfo.FileName = Process.GetCurrentProcess().MainModule.FileName;
process.StartInfo.Arguments = new Uri(currentAssembly.CodeBase).LocalPath + " 0";
process.StartInfo.Arguments = new Uri(currentAssembly.Location).LocalPath + " 0";
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.RedirectStandardInput = true;
process.StartInfo.RedirectStandardError = true;
Expand Down
22 changes: 7 additions & 15 deletions src/tests/tracing/eventpipe/common/Reverse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public ReverseServer(string serverAddress, int bufferSize = 16 * 1024)
socket.ReceiveBufferSize = Math.Max(bufferSize, 128);
socket.Bind(remoteEP);
socket.Listen(255);
socket.LingerState.Enabled = false;
_server = socket;
}
}
Expand Down Expand Up @@ -166,20 +165,13 @@ public void Shutdown()
}
break;
case Socket socket:
try
{
socket.Shutdown(SocketShutdown.Both);
}
catch {}
finally
{
_clientSocket?.Close();
socket.Close();
socket.Dispose();
_clientSocket?.Dispose();
if (File.Exists(_serverAddress))
File.Delete(_serverAddress);
}
if (File.Exists(_serverAddress))
File.Delete(_serverAddress);
socket.Close();
socket.Dispose();
_clientSocket?.Shutdown(SocketShutdown.Both);
_clientSocket?.Close();
_clientSocket?.Dispose();
break;
default:
throw new ArgumentException("Invalid server type");
Expand Down
10 changes: 8 additions & 2 deletions src/tests/tracing/eventpipe/reverseouter/reverseouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,22 @@ public static async Task<bool> TEST_ReverseConnectionCanRecycleWhileTracing()
Logger.logger.Log("Starting EventPipeSession over standard connection");
using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId);
Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}");
using var source = new EventPipeEventSource(stream);
Task readerTask = Task.Run(() => source.Process());
// using var source = new EventPipeEventSource(stream);
using var memroyStream = new MemoryStream();
Task readerTask = stream.CopyToAsync(memroyStream);//Task.Run(() => source.Process());
await Task.Delay(500);
Logger.logger.Log("Stopping EventPipeSession over standard connection");
EventPipeClient.StopTracing(pid, sessionId);
await readerTask;
Logger.logger.Log("Stopped EventPipeSession over standard connection");
}
catch (Exception e)
{
Logger.logger.Log(e.ToString());
}
finally
{
Logger.logger.Log("setting the MRE");
mre.Set();
}
});
Expand Down

0 comments on commit 5c29e14

Please sign in to comment.