Skip to content

Commit

Permalink
Add random port/report port in background, switch to malloc for data …
Browse files Browse the repository at this point in the history
…buffers (#4183)

Add port randomization in background, switch to malloc for data buffers
  • Loading branch information
eisenhauer authored Jun 6, 2024
1 parent e85fbbd commit f6df978
Showing 1 changed file with 73 additions and 10 deletions.
83 changes: 73 additions & 10 deletions source/adios2/toolkit/remote/remote_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cstring> // strerror
#include <errno.h> // errno
#include <fcntl.h> // open
#include <inttypes.h>
#include <regex>
#include <sys/stat.h> // open, fstat
#include <sys/types.h> // open
Expand Down Expand Up @@ -53,6 +54,9 @@ size_t TotalSimpleReads = 0;
size_t TotalGets = 0;
size_t SimpleFilesOpened = 0;
size_t ADIOSFilesOpened = 0;
static int report_port_selection = 0;
int parent_pid;
uint64_t random_cookie = 0;

std::string readable_size(uint64_t size)
{
Expand Down Expand Up @@ -276,17 +280,17 @@ static void GetRequestHandler(CManager cm, CMConnection conn, void *vevent, void
{ \
_ReadResponseMsg Response; \
memset(&Response, 0, sizeof(Response)); \
std::vector<T> RetData; \
auto var = f->m_io->InquireVariable<T>(VarName); \
if (f->m_mode == RemoteOpenRandomAccess) \
var->SetStepSelection({GetMsg->Step, 1}); \
if (GetMsg->BlockID != -1) \
var->SetBlockSelection(GetMsg->BlockID); \
if (GetMsg->Start) \
var->SetSelection(b); \
Response.Size = var->SelectionSize() * sizeof(T); \
T *RetData = (T *)malloc(Response.Size); \
f->m_engine->Get(*var, RetData, Mode::Sync); \
Response.Size = RetData.size() * sizeof(T); \
Response.ReadData = (char *)RetData.data(); \
Response.ReadData = (char *)RetData; \
Response.ReadResponseCondition = GetMsg->GetResponseCondition; \
Response.Dest = GetMsg->Dest; /* final data destination in client memory space */ \
if (verbose >= 2) \
Expand All @@ -297,6 +301,7 @@ static void GetRequestHandler(CManager cm, CMConnection conn, void *vevent, void
TotalGetBytesSent += Response.Size; \
TotalGets++; \
CMwrite(conn, ev_state->ReadResponseFormat, &Response); \
free(RetData); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(GET)
#undef GET
Expand Down Expand Up @@ -506,6 +511,20 @@ static bool server_timeout(void *CMvoid, int time_since_service)
return false;
}

#define IMAX_BITS(m) ((m) / ((m) % 255 + 1) / 255 % 255 * 8 + 7 - 86 / ((m) % 255 + 12))
#define RAND_MAX_WIDTH IMAX_BITS(RAND_MAX)

uint64_t rand64(void)
{
uint64_t r = 0;
for (int i = 0; i < 64; i += RAND_MAX_WIDTH)
{
r <<= RAND_MAX_WIDTH;
r ^= (unsigned)rand();
}
return r;
}

static void timer_start(void *param, unsigned int interval)
{

Expand Down Expand Up @@ -542,6 +561,10 @@ int main(int argc, char **argv)
{
kill_server++;
}
else if (strcmp(argv[i], "-report_port_selection") == 0)
{
report_port_selection++;
}
else if (strcmp(argv[i], "-status") == 0)
{
status_server++;
Expand Down Expand Up @@ -580,7 +603,7 @@ int main(int argc, char **argv)
}
if (background)
{
if (verbose)
if (verbose && !report_port_selection)
{
printf("Forking server to background\n");
}
Expand Down Expand Up @@ -614,10 +637,29 @@ int main(int argc, char **argv)
}
exit(0);
#else
parent_pid = getpid();
if (fork() != 0)
{
/* I'm the parent, wait a sec to let the child start, then exit */
sleep(1);
// sleep(1);
if (report_port_selection)
{
char final_filename[256];
snprintf(final_filename, sizeof(final_filename), "/tmp/port_file_%x", parent_pid);
FILE *f = NULL;
f = fopen(final_filename, "r");
while (f == NULL)
{
sleep(1); // wait until available
f = fopen(final_filename, "r");
}
char buffer[256];
fread(buffer, 1, 256, f);
printf("%s", buffer);
}
close(0);
close(1);
close(2);
exit(0);
}
/* I'm the child, close IO FDs so that ctest continues. No verbosity here */
Expand Down Expand Up @@ -652,15 +694,36 @@ int main(int argc, char **argv)

if (listen_list == NULL)
listen_list = create_attr_list();
add_attr(listen_list, CM_IP_PORT, Attr_Int4, (attr_value)ServerPort);
CMlisten_specific(cm, listen_list);
if (!report_port_selection)
{
// listen on well-known port
add_attr(listen_list, CM_IP_PORT, Attr_Int4, (attr_value)ServerPort);
CMlisten_specific(cm, listen_list);
}
else
{
// randomize port
CMlisten(cm);
listen_list = CMget_contact_list(cm);
int Port = -1;
get_int_attr(listen_list, CM_IP_PORT, &Port);
char filename[256];
char final_filename[256];
snprintf(filename, sizeof(filename), "/tmp/port_file_%x", getpid());
snprintf(final_filename, sizeof(final_filename), "/tmp/port_file_%x", parent_pid);
FILE *f = fopen(filename, "w");
random_cookie = rand64();
fprintf(f, "port:%d;msg:%s;cookie:%#018" PRIx64 "\n", Port, "no_error", random_cookie);
fclose(f);
rename(filename, final_filename);
}

attr_list contact_list = CMget_contact_list(cm);
if (contact_list)
{
char *string_list = attr_list_to_string(contact_list);
std::cout << "Listening at port " << ServerPort << std::endl;
free(string_list);
int Port = -1;
get_int_attr(listen_list, CM_IP_PORT, &Port);
printf("Listening on Port %d\n", Port);
}
ev_state.cm = cm;

Expand Down

0 comments on commit f6df978

Please sign in to comment.