Issue 87 #92

Merged · 3 commits · Jun 21, 2021
138 changes: 67 additions & 71 deletions src/mpiroutines.cxx
@@ -8,6 +8,7 @@
#ifdef USEMPI

#include <cassert>
#include <tuple>

//-- For MPI

@@ -2674,35 +2675,55 @@ void MPISendReceiveBuffWithExtraDMInfoBetweenThreads(Options &opt, Particle *Par
#endif
}

static std::tuple<std::vector<Int_t>, std::vector<float>>
exchange_indices_and_props(
const std::vector<Int_t> &indices, const std::vector<float> &props,
std::size_t props_per_index, int rank, int tag, MPI_Comm mpi_comm)
{
assert(indices.size() <= std::numeric_limits<std::int32_t>::max());
assert(props.size() == indices.size() * props_per_index);

// Send/recv number of indices to allocate reception buffers
int num_indices = indices.size();
int num_indices_recv;
MPI_Status status;
MPI_Sendrecv(
&num_indices, 1, MPI_Int_t, rank, tag * 2,
&num_indices_recv, 1, MPI_Int_t, rank, tag * 2,
mpi_comm, &status);

// Send/recv actual indices and properties
std::vector<Int_t> indices_recv(num_indices_recv);
std::vector<float> props_recv(num_indices_recv * props_per_index);
MPI_Sendrecv(
indices.data(), indices.size(), MPI_Int_t, rank, tag * 3,
indices_recv.data(), indices_recv.size(), MPI_Int_t, rank, tag * 3,
mpi_comm, &status);
MPI_Sendrecv(
props.data(), props.size(), MPI_FLOAT, rank, tag * 4,
props_recv.data(), props_recv.size(), MPI_FLOAT, rank, tag * 4,
mpi_comm, &status);

return std::make_tuple(std::move(indices_recv), std::move(props_recv));
}
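For readers skimming the diff, the helper above follows a count-then-payload handshake: the two ranks first swap how many indices each will send, size their receive buffers from that, and then swap the index and property arrays themselves. Below is a minimal standalone sketch of that pattern, assuming plain long long/MPI_LONG_LONG in place of the codebase's Int_t/MPI_Int_t, a fixed properties-per-index of 2, and hard-coded tags; it is illustrative only, not the code added by this PR.

// exchange_sketch.cxx -- standalone illustration of the count-then-payload
// exchange pattern used by exchange_indices_and_props. Build with mpicxx and
// run on exactly two ranks.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size != 2) { MPI_Finalize(); return 0; }   // pairwise example only

    const int other = 1 - rank, tag = 10, props_per_index = 2;
    std::vector<long long> indices(rank == 0 ? 3 : 5, rank);
    std::vector<float> props(indices.size() * props_per_index, 1.0f * rank);

    // Step 1: exchange counts so each side can size its receive buffers.
    int nsend = static_cast<int>(indices.size()), nrecv = 0;
    MPI_Sendrecv(&nsend, 1, MPI_INT, other, tag * 2,
                 &nrecv, 1, MPI_INT, other, tag * 2,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    // Step 2: exchange the actual indices and properties.
    std::vector<long long> indices_recv(nrecv);
    std::vector<float> props_recv(nrecv * props_per_index);
    MPI_Sendrecv(indices.data(), nsend, MPI_LONG_LONG, other, tag * 3,
                 indices_recv.data(), nrecv, MPI_LONG_LONG, other, tag * 3,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    MPI_Sendrecv(props.data(), nsend * props_per_index, MPI_FLOAT, other, tag * 4,
                 props_recv.data(), nrecv * props_per_index, MPI_FLOAT, other, tag * 4,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    std::printf("rank %d received %d indices\n", rank, nrecv);
    MPI_Finalize();
    return 0;
}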

void MPISendReceiveFOFHydroInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataLocal, vector<Int_t> &indicessend, vector<float> &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm)
{
#ifdef GASON
MPI_Status status;
vector<Int_t> indicesrecv;
Int_t numsend, numrecv, numextrafields = 0, index, offset = 0;
vector<float> proprecvbuff(0);
Int_t numextrafields = 0, index, offset = 0;
string field;
HydroProperties x;

numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size();
if (numextrafields == 0) return;

numsend = indicessend.size();
assert(propsendbuff.size() == numsend * numextrafields);
vector<Int_t> indicesrecv;
vector<float> proprecvbuff;
std::tie(indicesrecv, proprecvbuff) =
exchange_indices_and_props(indicessend, propsendbuff, numextrafields,
recvTask, tag, mpi_comm);

MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask,
tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status);
//send the information. If vectors are of zero size, must increase size so .data() points to a valid address
if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);}
if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);}
else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv*numextrafields);}
MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask,
tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status);
MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask,
tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status);
if (numrecv == 0) return;
for (auto i=0;i<numrecv;i++)
for (auto i = 0; i < indicesrecv.size(); i++)
{
index=indicesrecv[i];
FoFGroupDataLocal[index].p.SetHydroProperties(x);
@@ -2733,28 +2754,20 @@ void MPISendReceiveFOFHydroInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDa
void MPISendReceiveFOFStarInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataLocal, vector<Int_t> &indicessend, vector<float> &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm)
{
#ifdef STARON
MPI_Status status;
vector<Int_t> indicesrecv;
Int_t numsend, numrecv, numextrafields = 0, index, offset = 0;
vector<float> proprecvbuff(0);
Int_t numextrafields = 0, index, offset = 0;
string field;
StarProperties x;

numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size();
if (numextrafields == 0) return;
numsend = indicessend.size();
MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask,
tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status);
//send the information. If vectors are of zero size, must increase size so .data() points to a valid address
if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);}
if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);}
else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);}
MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask,
tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status);
MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask,
tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status);
if (numrecv == 0) return;
for (auto i=0;i<numrecv;i++)

vector<Int_t> indicesrecv;
vector<float> proprecvbuff;
std::tie(indicesrecv, proprecvbuff) =
exchange_indices_and_props(indicessend, propsendbuff, numextrafields,
recvTask, tag, mpi_comm);

for (auto i = 0; i < indicesrecv.size(); i++)
{
index=indicesrecv[i];
FoFGroupDataLocal[index].p.SetStarProperties(x);
@@ -2785,28 +2798,20 @@ void MPISendReceiveFOFStarInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDat
void MPISendReceiveFOFBHInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataLocal, vector<Int_t> &indicessend, vector<float> &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm)
{
#ifdef BHON
MPI_Status status;
vector<Int_t> indicesrecv;
Int_t numsend, numrecv, numextrafields = 0, index, offset = 0;
vector<float> proprecvbuff(0);
Int_t numextrafields = 0, index, offset = 0;
string field;
BHProperties x;

numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size();
if (numextrafields == 0) return;
numsend = indicessend.size();
MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask,
tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status);
//send the information. If vectors are of zero size, must increase size so .data() points to a valid address
if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);}
if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);}
else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);}
MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask,
tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status);
MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask,
tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status);
if (numrecv == 0) return;
for (auto i=0;i<numrecv;i++)

vector<Int_t> indicesrecv;
vector<float> proprecvbuff;
std::tie(indicesrecv, proprecvbuff) =
exchange_indices_and_props(indicessend, propsendbuff, numextrafields,
recvTask, tag, mpi_comm);

for (auto i = 0; i < indicesrecv.size(); i++)
{
index=indicesrecv[i];
FoFGroupDataLocal[index].p.SetBHProperties(x);
@@ -2837,28 +2842,20 @@ void MPISendReceiveFOFBHInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataL
void MPISendReceiveFOFExtraDMInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataLocal, vector<Int_t> &indicessend, vector<float> &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm)
{
#ifdef EXTRADMON
MPI_Status status;
vector<Int_t> indicesrecv;
Int_t numsend, numrecv, numextrafields = 0, index, offset = 0;
vector<float> proprecvbuff(0);
Int_t numextrafields = 0, index, offset = 0;
string field;
ExtraDMProperties x;

numextrafields = opt.extra_dm_internalprop_unique_input_names.size();
if (numextrafields == 0) return;
numsend = indicessend.size();
MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask,
tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status);
//send the information. If vectors are of zero size, must increase size so .data() points to a valid address
if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);}
if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);}
else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);}
MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask,
tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status);
MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask,
tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status);
if (numrecv == 0) return;
for (auto i=0;i<numrecv;i++)

vector<Int_t> indicesrecv;
vector<float> proprecvbuff;
std::tie(indicesrecv, proprecvbuff) =
exchange_indices_and_props(indicessend, propsendbuff, numextrafields,
recvTask, tag, mpi_comm);

for (auto i = 0; i < indicesrecv.size(); i++)
{
index=indicesrecv[i];
FoFGroupDataLocal[index].p.SetExtraDMProperties(x);
@@ -4798,7 +4795,7 @@ Int_t MPILinkAcross(const Int_t nbodies, KDTree *&tree, Particle *Part, Int_t *&
Int_t MPIGroupExchange(Options &opt, const Int_t nbodies, Particle *Part, Int_t *&pfof){
Int_t i, j,nthreads,nexport,nimport,nlocal,n;
Int_t nsend_local[NProcs],noffset_import[NProcs],noffset_export[NProcs],nbuffer[NProcs];
int sendTask,recvTask;
int recvTask;
int maxchunksize=2147483648/NProcs/sizeof(fofid_in);
int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv;
int sendoffset,recvoffset;
@@ -4889,7 +4886,6 @@ Int_t MPIGroupExchange(Options &opt, const Int_t nbodies, Particle *Part, Int_t
{
if (j!=ThisTask)
{
sendTask = ThisTask;
recvTask = j;

if(mpi_nsend[ThisTask * NProcs + recvTask] > 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0)
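For context on the hunk above, MPIGroupExchange loops over every other rank and only performs an exchange when either direction of the precomputed mpi_nsend matrix is non-zero (the removed sendTask was always just ThisTask). A schematic sketch of that pattern follows, with illustrative names; the payload exchange and the maxchunksize chunking are omitted, so this is not the routine's actual loop.

// Schematic sketch of a pairwise-exchange loop in the spirit of
// MPIGroupExchange: gather the send-count matrix, then visit every other
// rank and skip partners with nothing to exchange in either direction.
#include <mpi.h>
#include <vector>

void pairwise_exchange_sketch(MPI_Comm comm, const std::vector<int> &nsend_local)
{
    int this_task, nprocs;
    MPI_Comm_rank(comm, &this_task);
    MPI_Comm_size(comm, &nprocs);

    // Build the nprocs x nprocs send-count matrix (row = sender, column = receiver).
    std::vector<int> mpi_nsend(nprocs * nprocs);
    MPI_Allgather(nsend_local.data(), nprocs, MPI_INT,
                  mpi_nsend.data(), nprocs, MPI_INT, comm);

    for (int j = 0; j < nprocs; j++)
    {
        if (j == this_task) continue;
        int recv_task = j;
        if (mpi_nsend[this_task * nprocs + recv_task] == 0 &&
            mpi_nsend[recv_task * nprocs + this_task] == 0) continue;

        // ... MPI_Sendrecv the actual payload with recv_task here,
        //     split into chunks if the message would be too large ...
    }
}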
8 changes: 2 additions & 6 deletions src/ui.cxx
@@ -163,9 +163,6 @@ static void log_options_summary(const Options &opt)
/// \todo alter interface as now need to be able to specify only smdata file (no grid, res, normalized res) and functionality to specify eps, background fof search, etc
void GetArgs(int argc, char *argv[], Options &opt)
{
#ifndef USEMPI
int ThisTask =0, NProcs =1;
#endif
#if defined(USEMPI) && defined(USEPARALLELHDF)
opt.mpinprocswritesize=NProcs;
#endif
@@ -222,12 +219,11 @@ void GetArgs(int argc, char *argv[], Options &opt)
}
}
if(configflag){
if (ThisTask==0)
LOG(info) << "Reading config file " << opt.pname;
LOG_RANK0(info) << "Reading config file " << opt.pname;
GetParamFile(opt);
}
else {
LOG(warning) << "No configuration file passed, using default values";
LOG_RANK0(warning) << "No configuration file passed, using default values";
}
#ifdef USEMPI
MPI_Barrier(MPI_COMM_WORLD);
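The ui.cxx change replaces explicit if (ThisTask==0) guards with a LOG_RANK0 macro. The macro's real definition lives elsewhere in the codebase and may differ; a hypothetical sketch of the idea, assuming a stream-style LOG macro and a global ThisTask rank, looks like this.

// Hypothetical sketch only: the real LOG / LOG_RANK0 macros are defined
// elsewhere in the codebase; this just illustrates "log on rank 0 only".
#include <iostream>

extern int ThisTask;   // MPI rank of this process (0 on the master rank)

#define LOG(level) std::cerr << "[" #level "] "
// Expands to a statement whose stream expression is evaluated only on rank 0.
#define LOG_RANK0(level) if (ThisTask != 0) {} else LOG(level)

int ThisTask = 0;

int main()
{
    LOG_RANK0(info) << "Reading config file sample.cfg" << std::endl;                      // rank 0 only
    LOG_RANK0(warning) << "No configuration file passed, using default values" << std::endl;
    return 0;
}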