Restructure SO information collection and writing
Spherical overdensity information (particle IDs and types) is collected
by two different routines: GetInclusiveMasses and GetSOMasses. Both
routines use the following technique: for "ngroup" groups, a C
array-of-vectors (AOV) of "ngroup + 1" elements is allocated via "new"
(and eventually delete[]'ed). The vector for each group is filled
independently as groups are processed; the loop over groups is
1-indexed, and indexing into these AOVs uses the same iteration
variable, meaning that element 0 is never used. Finally, the AOVs are
passed to WriteSOCatalog for writing. WriteSOCatalog is aware of the
1-based indexing, and additionally it flattens each AOV into a single
vector (thus duplicating its memory requirement) before finally writing
the data into the output HDF5 file.
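
For reference, a minimal sketch of the old allocation and indexing
pattern described above (identifiers and values here are illustrative,
not the exact ones from the code):

#include <vector>

int main()
{
    const int ngroup = 3;
    // "ngroup + 1" vectors so that groups can be addressed as 1..ngroup;
    // element 0 is allocated but never used.
    std::vector<long long> *SOpids = new std::vector<long long>[ngroup + 1];
    for (int i = 1; i <= ngroup; i++) {
        SOpids[i].push_back(100 + i);   // fill this group's particle IDs
    }
    // ...the AOV is then handed over to the writer, which must also use
    // 1-based indexing and flattens everything into one big vector...
    delete[] SOpids;
    return 0;
}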

This commit originally aimed to reduce the memory overhead of the final
writing of data into the HDF5 file (see pelahi#71). The main change
required to achieve this is to perform the flattening of the data at
separate times, so that particle IDs and types are not flattened
simultaneously but one after the other, with the memory from the first
flattening operation being released before the second one happens, thus
reducing the peak memory requirement of the application. This goal was
achieved. However, while performing these changes two things became
clear: firstly, that a vector-of-vectors (VOV) is a better interface
than an AOV (due to automatic memory management), and secondly that the
1-based indexing of the original AOVs introduced much complexity into
the code. The scope of the changes was therefore broadened to cover
these two points, and the commit consequently grew considerably in
size. In particular, the 0-based indexing of the VOVs allowed us to use
standard algorithms that clarify the intent in several places of the
code.
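
A minimal, self-contained sketch of the staggered flattening idea;
flatten_sketch mirrors, in simplified form, the flatten() helper
introduced by this commit, and the std::cout lines are only
placeholders for the actual HDF5 writing calls:

#include <iostream>
#include <vector>

// Consumes the vector-of-vectors so its memory is released as soon as
// the flattened copy has been written.
template <typename T>
std::vector<T> flatten_sketch(std::vector<std::vector<T>> &vov)
{
    std::vector<T> all;
    for (const auto &v : vov) all.insert(all.end(), v.begin(), v.end());
    vov.clear();
    vov.shrink_to_fit();
    return all;
}

int main()
{
    std::vector<std::vector<long long>> SOpids{{1, 2}, {3}};
    std::vector<std::vector<int>> SOtypes{{0, 1}, {1}};

    {
        auto ids = flatten_sketch(SOpids);            // only the IDs are duplicated here
        std::cout << ids.size() << " ids written\n";  // placeholder for the HDF5 write
    }                                                 // ids released before types are flattened
    {
        auto types = flatten_sketch(SOtypes);
        std::cout << types.size() << " types written\n";
    }
    return 0;
}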

There are other minor changes included in this commit, mostly to reduce
variable scopes and code duplication. Assertions have also been added
in several places to give further assurance that the code behaves as
expected.

As an unintended side-effect, this commit also fixed the
wrongly-calculated Offset dataset, whose original values were off by
one index. This problem was reported in pelahi#108 and seems to have
always been present.
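
To illustrate, a small sketch of the exclusive prefix sum that the
Offset dataset should contain, assuming hypothetical group sizes of 3,
5 and 2 particles:

#include <iostream>
#include <iterator>
#include <numeric>
#include <vector>

int main()
{
    // Hypothetical per-group particle counts.
    std::vector<unsigned long> sizes{3, 5, 2};

    // offsets[i] = number of particles in all groups before group i,
    // computed with std::partial_sum as the rewritten code now does.
    std::vector<unsigned long> offsets(sizes.size(), 0);
    std::partial_sum(sizes.begin(), std::prev(sizes.end()),
                     std::next(offsets.begin()));

    for (auto o : offsets) std::cout << o << ' ';   // prints: 0 3 8
    std::cout << '\n';
    // The old loop accumulated each group's own size instead of the
    // previous group's, which would yield 0 5 7 for the same input,
    // i.e. values off by one index.
    return 0;
}
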
rtobar committed Jul 12, 2021
1 parent 50807df commit 3ca9afe
Showing 3 changed files with 137 additions and 129 deletions.
218 changes: 113 additions & 105 deletions src/io.cxx
@@ -4,6 +4,12 @@

//-- IO

#include <algorithm>
#include <cassert>
#include <iterator>
#include <numeric>
#include <vector>

#include "stf.h"

#include "gadgetitems.h"
@@ -1208,19 +1214,57 @@ void WriteGroupPartType(Options &opt, const Int_t ngroups, Int_t *numingroup, In
LOG(info) << "Wrote particle type info in " << write_timer;
}

template <typename From, typename To>
std::vector<To> get_sizes(const std::vector<std::vector<From>> &vov)
{
std::vector<To> sizes(vov.size());
std::transform(vov.begin(), vov.end(), sizes.begin(),
[](const std::vector<From> &v) { return v.size(); });
return sizes;
}

template <typename T>
std::size_t get_total_size(const std::vector<std::vector<T>> &vov)
{
return std::accumulate(vov.begin(), vov.end(), std::size_t{0},
[](std::size_t s, const std::vector<T> &v) { return s + v.size(); }
);
}

template <typename From, typename To=From>
std::vector<To> flatten(std::vector<std::vector<From>> &vov, std::size_t expected_total)
{
#ifndef NDEBUG
assert(expected_total == get_total_size(vov));
#endif
std::vector<To> all_values;
all_values.reserve(expected_total);
for (const auto &v: vov) {
all_values.insert(all_values.end(), v.begin(), v.end());
}
vov.clear();
vov.shrink_to_fit();
return all_values;
};

///Write the particles in each SO region
///Note that this particle list will not be exclusive
///\todo optimisation memory wise can be implemented by not creating an array
///to store all ids and then copying info from the array of vectors into it.
void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, vector<int> *SOtypes){
void WriteSOCatalog(Options &opt, const Int_t ngroups,
std::vector<std::vector<Int_t>> &SOpids,
std::vector<std::vector<int>> &SOtypes)
{
assert(SOpids.size() == ngroups);
#if defined(GASON) || defined(STARON) || defined(BHON)
assert(SOtypes.size() == ngroups);
#else
assert(SOtypes.size() == 0);
#endif

fstream Fout;
string fname;
ostringstream os;
unsigned long ng,noffset=0,ngtot=0,nSOids=0,nSOidstot=0, nwritecommtot=0, nSOwritecommtot=0;
vector<unsigned long> offset;
vector<long long> idval;
vector<int> typeval;
vector<Int_t> numingroup;

#ifdef USEMPI
MPIBuildWriteComm(opt);
@@ -1229,10 +1273,6 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve
H5OutputFile Fhdf;
int itemp=0;
int ival;
#if defined(USEMPI) && defined(USEPARALLELHDF)
vector<Int_t> mpi_offset(NProcs);
Int_t nSOidoffset;
#endif
#endif
#ifdef USEADIOS
int adios_err;
@@ -1251,28 +1291,17 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve
int ThisTask=0,NProcs=1;
#endif

ng=ngroups;
auto mpi_total = [&](unsigned long val) {
#ifdef USEMPI
if (NProcs >1) {
MPI_Allreduce(&ng, &ngtot, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
}
else {
ngtot = ng;
}
#else
ngtot=ng;
#endif
for (Int_t i=1;i<=ngroups;i++) nSOids+=SOpids[i].size();
#ifdef USEMPI
if (NProcs > 1) {
MPI_Allreduce(&nSOids, &nSOidstot, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
}
else {
nSOidstot = nSOids;
}
#else
nSOidstot = nSOids;
#endif
MPI_Allreduce(MPI_IN_PLACE, &val, 1, MPI_UNSIGNED_LONG, MPI_SUM, MPI_COMM_WORLD);
#endif // USEMPI
return val;
};

unsigned long ng = ngroups;
unsigned long ngtot = mpi_total(ng);
unsigned long nSOids = get_total_size(SOpids);
unsigned long nSOidstot = mpi_total(nSOids);

os << opt.outname <<".catalog_SOlist";
#ifdef USEMPI
@@ -1328,6 +1357,7 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve
else if (opt.ibinaryout==OUTHDF) {
#ifdef USEPARALLELHDF
if(opt.mpinprocswritesize>1){
unsigned long nwritecommtot = 0, nSOwritecommtot = 0;
MPI_Allreduce(&ng, &nwritecommtot, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, mpi_comm_write);
MPI_Allreduce(&nSOids, &nSOwritecommtot, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, mpi_comm_write);
if (ThisWriteTask == 0) {
@@ -1392,21 +1422,15 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve

//write group size
if (opt.ibinaryout==OUTBINARY) {
numingroup.resize(ngroups+1,0);
for (auto i=1;i<=ngroups;i++) numingroup[i] = SOpids[i].size();
if (ngroups > 0 ) Fout.write((char*)&(numingroup.data())[1],sizeof(Int_t)*ngroups);
auto numingroup = get_sizes<Int_t, Int_t>(SOpids);
if (ngroups > 0 ) Fout.write((char*)numingroup.data(),sizeof(Int_t)*ngroups);
numingroup.clear();
}
#ifdef USEHDF
else if (opt.ibinaryout==OUTHDF) {
unsigned int *data=NULL;
if (ng > 0) {
data=new unsigned int[ng];
for (Int_t i=1;i<=ng;i++) data[i-1]=SOpids[i].size();
}
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], ng, data);
auto data = get_sizes<Int_t, unsigned int>(SOpids);
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], ng, data.data());
itemp++;
delete[] data;
}
#endif
#ifdef USEADIOS
@@ -1432,103 +1456,81 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve
Int_t mpioffset=0;
for (Int_t itask=0;itask<ThisTask;itask++)mpioffset+=mpi_ngroups[itask];
adios_err=adios_write(adios_file_handle,"ngmpioffset",&mpioffset);
unsigned int *data = NULL;
if (ng > 0){
data = new unsigned int[ng];
for (Int_t i=1;i<=ng;i++) data[i-1]=SOpids[i].size();
}
auto data = get_sizes<Int_t, unsigned int>(SOpids);
adios_err=adios_write(adios_file_handle,datagroupnames.SO[itemp].c_str(),data);
delete[] data;
itemp++;
}
#endif
else {
for (Int_t i=1;i<=ngroups;i++) Fout<<SOpids[i].size()<<endl;
for (const auto &pids : SOpids)
Fout << pids.size() << endl;
}


//Write offsets
offset.resize(ngroups+1,0);
vector<unsigned long> offsets(ngroups);
if (ngroups > 0) {
offset[0] = offset[1] = 0;
for (Int_t i=2;i<=ngroups;i++) offset[i]=offset[i-1]+SOpids[i].size();
auto sizes = get_sizes<Int_t, unsigned long>(SOpids);
std::partial_sum(sizes.begin(), std::next(sizes.end(), -1), std::next(offsets.begin()));
}

if (opt.ibinaryout==OUTBINARY) Fout.write((char*)&(offset.data())[1],sizeof(Int_t)*ngroups);
if (opt.ibinaryout==OUTBINARY) Fout.write((char*)offsets.data(),sizeof(Int_t)*ngroups);
#ifdef USEHDF
else if (opt.ibinaryout==OUTHDF) {
unsigned long *data = NULL;
if (ng > 0) {
data = new unsigned long[ng];
for (Int_t i=1;i<=ng;i++) data[i-1]=offset[i];
}
#ifdef USEPARALLELHDF
if(opt.mpinprocswritesize>1){
if(opt.mpinprocswritesize>1) {
vector<Int_t> mpi_offset(NProcs);
MPI_Allgather(&nSOids, 1, MPI_Int_t, mpi_offset.data(), 1, MPI_Int_t, mpi_comm_write);
if (ThisWriteTask > 0)
{
nSOidoffset = 0; for (auto itask = 0; itask < ThisWriteTask; itask++) nSOidoffset += mpi_offset[itask];
for (Int_t i=1; i<=ng; i++) data[i-1] += nSOidoffset;
auto task_offset = std::accumulate(mpi_offset.begin(), std::next(mpi_offset.begin(), ThisWriteTask), Int_t{0});
for (auto &offset: offsets) {
offset += task_offset;
}
}
}
#endif
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], ng, data);
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], ng, offsets.data());
itemp++;
delete[] data;
}
#endif
#ifdef USEADIOS
else if (opt.ibinaryout==OUTADIOS) {
//don't delcare new group, just add data
adios_err=adios_define_var(adios_grp_handle,datagroupnames.SO[itemp].c_str(),"",datagroupnames.adiosSOdatatype[itemp],"ng","ngtot","ngmpioffset");
unsigned long *data = NULL;
if (ng > 0) {
data = new unsigned long[ng];
for (Int_t i=1;i<=ng;i++) data[i-1]=offset[i];
}
adios_err=adios_write(adios_file_handle,datagroupnames.SO[itemp].c_str(),data);
adios_err=adios_write(adios_file_handle,datagroupnames.SO[itemp].c_str(), offsets.data());
delete[] data;
itemp++;
}
#endif
else {
for (Int_t i=1;i<=ngroups;i++) Fout<<offset[i]<<endl;
for (const auto offset: offsets)
Fout << offset << endl;
}
offset.clear();
offsets.clear();

if (nSOids>0) {
idval.resize(nSOids,0);
nSOids=0;
for (Int_t i=1;i<=ngroups;i++) {
for (Int_t j=0;j<SOpids[i].size();j++) {
idval[nSOids++]=SOpids[i][j];
}
SOpids[i].clear();
if (opt.ibinaryout==OUTBINARY) {
if (nSOids>0) {
auto ids = flatten<Int_t, long long>(SOpids, nSOids);
Fout.write((char*)ids.data(), sizeof(long long) * nSOids);
}
#if defined(GASON) || defined(STARON) || defined(BHON)
typeval.resize(nSOids,0);
nSOids=0;
for (Int_t i=1;i<=ngroups;i++) {
for (Int_t j=0;j<SOtypes[i].size();j++) {
typeval[nSOids++]=SOtypes[i][j];
}
SOtypes[i].clear();
if (nSOids>0) {
auto types = flatten(SOtypes, nSOids);
Fout.write((char*)types.data(), sizeof(int) * nSOids);
}
#endif
}
if (opt.ibinaryout==OUTBINARY) {
if (nSOids>0) Fout.write((char*)idval.data(),sizeof(Int_t)*nSOids);
#if defined(GASON) || defined(STARON) || defined(BHON)
if (nSOids>0) Fout.write((char*)typeval.data(),sizeof(int)*nSOids);
#endif
}
#ifdef USEHDF
else if (opt.ibinaryout==OUTHDF) {
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], nSOids, idval.data());
#if defined(GASON) || defined(STARON) || defined(BHON)
itemp++;
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], nSOids, typeval.data());
#endif
{
auto ids = flatten<Int_t, long long>(SOpids, nSOids);
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], nSOids, ids.data());
}
#if defined(GASON) || defined(STARON) || defined(BHON)
{
itemp++;
auto types = flatten(SOtypes, nSOids);
Fhdf.write_dataset(opt, datagroupnames.SO[itemp], nSOids, types.data());
}
#endif
}
#endif
#ifdef USEADIOS
@@ -1554,15 +1556,21 @@ void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, ve
}
#endif
else {
for (Int_t i=0;i<nSOids;i++) Fout<<idval[i]<<endl;
{
auto ids = flatten<Int_t, long long>(SOpids, nSOids);
for (const auto id: ids) {
Fout << id << endl;
}
}
#if defined(GASON) || defined(STARON) || defined(BHON)
for (Int_t i=0;i<nSOids;i++) Fout<<typeval[i]<<endl;
{
auto types = flatten(SOtypes, nSOids);
for (const auto type: types) {
Fout << type << endl;
}
}
#endif
}
if (nSOids>0) idval.clear();
#if defined(GASON) || defined(STARON) || defined(BHON)
if (nSOids>0) typeval.clear();
#endif

if (opt.ibinaryout==OUTASCII || opt.ibinaryout==OUTBINARY) Fout.close();
#ifdef USEHDF
2 changes: 1 addition & 1 deletion src/proto.h
@@ -96,7 +96,7 @@ void WriteSimulationInfo(Options &opt);
void WriteUnitInfo(Options &opt);

///Write particle ids of those within spherical overdensity of a field halo
void WriteSOCatalog(Options &opt, const Int_t ngroups, vector<Int_t> *SOpids, vector<int> *SOtypes=NULL);
void WriteSOCatalog(Options &opt, const Int_t ngroups, std::vector<std::vector<Int_t>> &SOpids, std::vector<std::vector<int>> &SOtypes);
///Write profiles
void WriteProfiles(Options &opt, const Int_t ngroups, PropData *pdata);
///Writes ROCKSTAR like output