diff --git a/CMakeLists.txt b/CMakeLists.txt index cc5cf49cb4..7f63529a1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -182,9 +182,48 @@ if(ADIOS2_HAVE_MPI) add_definitions(-DOMPI_SKIP_MPICXX -DMPICH_SKIP_MPICXX) endif() + +#------------------------------------------------------------------------------# +# POSIX O_DIRECT is only working for Unix in adios for now +#------------------------------------------------------------------------------# +if(CYGWIN) + #-tb O_DIRECT messes up cygwin + set(ADIOS2_HAVE_O_DIRECT 0) +elseif(MSVC) + # Windows has other things but we are not using them + set(ADIOS2_HAVE_O_DIRECT 0) +elseif(APPLE) + # Mac has other things but we are not using them + set(ADIOS2_HAVE_O_DIRECT 0) +else() + + message(STATUS "Checking for O_DIRECT") + include(CheckCXXSourceCompiles) + check_cxx_source_compiles(" +#include +#include +int main(int argc, char * argv[]) { argc = O_DIRECT; } +" O_DIRECT_WORKS) + + if (O_DIRECT_WORKS) + set(ADIOS2_HAVE_O_DIRECT 1) + else() + set(ADIOS2_HAVE_O_DIRECT 0) + endif() + +endif() + +#if(NOT HAVE_O_DIRECT) +# message(WARNING " ----- The open() flag O_DIRECT is not available! ---- ") +#else() +# message(STATUS " ----- The open() flag O_DIRECT is available! 
---- ") +#endif() + + set(ADIOS2_CONFIG_OPTS - BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME SysVShMem ZeroMQ Profiling Endian_Reverse + BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME SysVShMem ZeroMQ Profiling Endian_Reverse O_DIRECT ) + GenerateADIOSHeaderConfig(${ADIOS2_CONFIG_OPTS}) configure_file( ${PROJECT_SOURCE_DIR}/CTestCustom.cmake.in @@ -229,6 +268,7 @@ if(BUILD_SHARED_LIBS AND ADIOS2_RUN_INSTALL_TEST) endif() endif() + #------------------------------------------------------------------------------# # Third party libraries #------------------------------------------------------------------------------# diff --git a/examples/basics/globalArray/globalArray_write.cpp b/examples/basics/globalArray/globalArray_write.cpp index 33573a079c..d13cedd62c 100644 --- a/examples/basics/globalArray/globalArray_write.cpp +++ b/examples/basics/globalArray/globalArray_write.cpp @@ -70,6 +70,8 @@ int main(int argc, char *argv[]) io.SetParameter("NumAggregators", "1"); io.SetParameter("NumSubFiles", "1"); io.SetParameter("AsyncWrite", "Guided"); + io.SetParameter("AsyncOpen", "true"); + io.SetParameter("O_DIRECT", "true"); /* * Define global array: type, name, global dimensions diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index a7aec0a5c8..9254eaf8ef 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -123,7 +123,10 @@ class BP5Engine MACRO(CollectiveMetadata, Bool, bool, true) \ MACRO(NumAggregators, UInt, unsigned int, 0) \ MACRO(NumSubFiles, UInt, unsigned int, 999999) \ - MACRO(FileSystemPageSize, UInt, unsigned int, 4096) \ + MACRO(StripeSize, UInt, unsigned int, 4096) \ + MACRO(DirectIO, Bool, bool, true) \ + MACRO(DirectIOAlignOffset, UInt, unsigned int, 512) \ + MACRO(DirectIOAlignBuffer, UInt, unsigned int, 0) \ MACRO(AggregationType, 
AggregationType, int, \ (int)AggregationType::TwoLevelShm) \ MACRO(AsyncOpen, Bool, bool, true) \ diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 7b997efa93..f9b4efdd7c 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -95,15 +95,16 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) if (m_Parameters.BufferVType == (int)BufferVType::MallocVType) { - m_BP5Serializer.InitStep(new MallocV("BP5Writer", false, - m_Parameters.InitialBufferSize, - m_Parameters.GrowthFactor)); + m_BP5Serializer.InitStep(new MallocV( + "BP5Writer", false, m_BP5Serializer.m_BufferAlign, + m_BP5Serializer.m_BufferBlockSize, m_Parameters.InitialBufferSize, + m_Parameters.GrowthFactor)); } else { - m_BP5Serializer.InitStep(new ChunkV("BP5Writer", - false /* always copy */, - m_Parameters.BufferChunkSize)); + m_BP5Serializer.InitStep(new ChunkV( + "BP5Writer", false, m_BP5Serializer.m_BufferAlign, + m_BP5Serializer.m_BufferBlockSize, m_Parameters.BufferChunkSize)); } m_ThisTimestepDataSize = 0; @@ -118,7 +119,8 @@ void BP5Writer::PerformPuts() { PERFSTUBS_SCOPED_TIMER("BP5Writer::PerformPuts"); m_Profiler.Start("PP"); - m_BP5Serializer.PerformPuts(); + m_BP5Serializer.PerformPuts(m_Parameters.AsyncWrite || + m_Parameters.DirectIO); m_Profiler.Stop("PP"); return; } @@ -269,8 +271,8 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, } // align to PAGE_SIZE - m_DataPos += helper::PaddingToAlignOffset(m_DataPos, - m_Parameters.FileSystemPageSize); + m_DataPos += + helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize); m_StartDataPos = m_DataPos; if (!SerializedWriters && a->m_Comm.Rank() < a->m_Comm.Size() - 1) @@ -467,8 +469,8 @@ void BP5Writer::EndStep() MarshalAttributes(); // true: advances step - auto TSInfo = - m_BP5Serializer.CloseTimestep(m_WriterStep, m_Parameters.AsyncWrite); + auto TSInfo = m_BP5Serializer.CloseTimestep( + m_WriterStep, 
m_Parameters.AsyncWrite || m_Parameters.DirectIO); /* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the * AttributeEncodeBuffer and the data encode Vector */ @@ -589,14 +591,38 @@ void BP5Writer::InitParameters() m_Parameters.NumSubFiles = m_Parameters.NumAggregators; } - if (m_Parameters.FileSystemPageSize == 0) + if (m_Parameters.StripeSize == 0) { - m_Parameters.FileSystemPageSize = 4096; + m_Parameters.StripeSize = 4096; } - if (m_Parameters.FileSystemPageSize > 67108864) + + if (m_Parameters.StripeSize > 67108864) { // Limiting to max 64MB page size - m_Parameters.FileSystemPageSize = 67108864; + m_Parameters.StripeSize = 67108864; + } + + if (m_Parameters.DirectIO) + { + if (m_Parameters.DirectIOAlignBuffer == 0) + { + m_Parameters.DirectIOAlignBuffer = m_Parameters.DirectIOAlignOffset; + } + m_BP5Serializer.m_BufferBlockSize = m_Parameters.DirectIOAlignOffset; + m_BP5Serializer.m_BufferAlign = m_Parameters.DirectIOAlignBuffer; + if (m_Parameters.StripeSize % m_Parameters.DirectIOAlignOffset) + { + size_t k = + m_Parameters.StripeSize / m_Parameters.DirectIOAlignOffset + 1; + m_Parameters.StripeSize = k * m_Parameters.DirectIOAlignOffset; + } + if (m_Parameters.BufferChunkSize % m_Parameters.DirectIOAlignOffset) + { + size_t k = m_Parameters.BufferChunkSize / + m_Parameters.DirectIOAlignOffset + + 1; + m_Parameters.BufferChunkSize = k * m_Parameters.DirectIOAlignOffset; + } } } @@ -931,6 +957,14 @@ void BP5Writer::InitTransports() } } + if (m_Parameters.DirectIO) + { + for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) + { + m_IO.m_TransportsParameters[i]["DirectIO"] = "true"; + } + } + bool useProfiler = true; if (m_IAmWritingData) @@ -953,6 +987,11 @@ void BP5Writer::InitTransports() if (m_Comm.Rank() == 0) { + // force turn off directio to metadata files + for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i) + { + m_IO.m_TransportsParameters[i]["DirectIO"] = "false"; + } 
m_FileMetaMetadataManager.OpenFiles(m_MetaMetadataFileNames, m_OpenMode, m_IO.m_TransportsParameters, useProfiler); @@ -1303,14 +1342,19 @@ void BP5Writer::FlushData(const bool isFinal) if (m_Parameters.BufferVType == (int)BufferVType::MallocVType) { DataBuf = m_BP5Serializer.ReinitStepData( - new MallocV("BP5Writer", false, m_Parameters.InitialBufferSize, - m_Parameters.GrowthFactor)); + new MallocV("BP5Writer", false, m_BP5Serializer.m_BufferAlign, + m_BP5Serializer.m_BufferBlockSize, + m_Parameters.InitialBufferSize, + m_Parameters.GrowthFactor), + m_Parameters.AsyncWrite || m_Parameters.DirectIO); } else { DataBuf = m_BP5Serializer.ReinitStepData( - new ChunkV("BP5Writer", false /* always copy */, - m_Parameters.BufferChunkSize)); + new ChunkV("BP5Writer", false, m_BP5Serializer.m_BufferAlign, + m_BP5Serializer.m_BufferBlockSize, + m_Parameters.BufferChunkSize), + m_Parameters.AsyncWrite || m_Parameters.DirectIO); } auto databufsize = DataBuf->Size(); diff --git a/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp b/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp index fdf9985315..c9f8f02079 100644 --- a/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp +++ b/source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp @@ -242,8 +242,8 @@ void BP5Writer::WriteData_EveryoneWrites_Async(format::BufferV *Data, } // align to PAGE_SIZE - m_DataPos += helper::PaddingToAlignOffset(m_DataPos, - m_Parameters.FileSystemPageSize); + m_DataPos += + helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize); m_StartDataPos = m_DataPos; if (a->m_Comm.Rank() < a->m_Comm.Size() - 1) diff --git a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp index 9bf3473899..6fc96f4b58 100644 --- a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp +++ b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp @@ -40,8 +40,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) // to arrive from 
the rank below // align to PAGE_SIZE (only valid on master aggregator at this point) - m_DataPos += helper::PaddingToAlignOffset(m_DataPos, - m_Parameters.FileSystemPageSize); + m_DataPos += + helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize); // Each aggregator needs to know the total size they write // This calculation is valid on aggregators only @@ -59,7 +59,13 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) if (a->m_Comm.Size() > 1) { - a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize); + size_t alignment_size = sizeof(max_align_t); + if (m_Parameters.DirectIO) + { + alignment_size = m_Parameters.DirectIOAlignOffset; + } + a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize, + alignment_size); } shm::TokenChain tokenChain(&a->m_Comm); @@ -74,8 +80,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) &m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0, "AggregatorChain token in BP5Writer::WriteData_TwoLevelShm"); // align to PAGE_SIZE - m_DataPos += helper::PaddingToAlignOffset( - m_DataPos, m_Parameters.FileSystemPageSize); + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.StripeSize); } m_StartDataPos = m_DataPos; // metadata needs this info if (a->m_AggregatorChainComm.Rank() < diff --git a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp index 25d5cf991d..58161da3c7 100644 --- a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp +++ b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp @@ -177,8 +177,8 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data) // to arrive from the rank below // align to PAGE_SIZE (only valid on master aggregator at this point) - m_DataPos += helper::PaddingToAlignOffset(m_DataPos, - m_Parameters.FileSystemPageSize); + m_DataPos += + helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize); // Each aggregator needs to know the total size they 
write // This calculation is valid on aggregators only @@ -196,7 +196,13 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data) if (a->m_Comm.Size() > 1) { - a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize); + size_t alignment_size = sizeof(max_align_t); + if (m_Parameters.DirectIO) + { + alignment_size = m_Parameters.DirectIOAlignOffset; + } + a->CreateShm(static_cast(maxSize), m_Parameters.MaxShmSize, + alignment_size); } if (a->m_IsAggregator) @@ -209,8 +215,8 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data) &m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0, "AggregatorChain token in BP5Writer::WriteData_TwoLevelShm"); // align to PAGE_SIZE - m_DataPos += helper::PaddingToAlignOffset( - m_DataPos, m_Parameters.FileSystemPageSize); + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.StripeSize); } m_StartDataPos = m_DataPos; // metadata needs this info if (a->m_AggregatorChainComm.Rank() < diff --git a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp index bd79ccf8f7..5da0cc6196 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp @@ -214,7 +214,8 @@ void MPIShmChain::HandshakeLinks_Complete(HandshakeStruct &hs) "aggregator, at Open"); } -void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize) +void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize, + const size_t alignment_size) { if (!m_Comm.IsMPI()) { @@ -224,21 +225,20 @@ void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize) } char *ptr; size_t structsize = sizeof(ShmSegment); - structsize += helper::PaddingToAlignOffset(structsize, sizeof(max_align_t)); + structsize += helper::PaddingToAlignOffset(structsize, alignment_size); if (!m_Rank) { - blocksize += - helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t)); + blocksize += 
helper::PaddingToAlignOffset(blocksize, alignment_size); size_t totalsize = structsize + 2 * blocksize; if (totalsize > maxsegmentsize) { // roll back and calculate sizes from maxsegmentsize - totalsize = maxsegmentsize - sizeof(max_align_t) + 1; + totalsize = maxsegmentsize - alignment_size + 1; totalsize += - helper::PaddingToAlignOffset(totalsize, sizeof(max_align_t)); - blocksize = (totalsize - structsize) / 2 - sizeof(max_align_t) + 1; + helper::PaddingToAlignOffset(totalsize, alignment_size); + blocksize = (totalsize - structsize) / 2 - alignment_size + 1; blocksize += - helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t)); + helper::PaddingToAlignOffset(blocksize, alignment_size); totalsize = structsize + 2 * blocksize; } m_Win = m_Comm.Win_allocate_shared(totalsize, 1, &ptr); diff --git a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h index afd7ee5eaf..fafd846133 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h +++ b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h @@ -110,7 +110,8 @@ class MPIShmChain : public MPIAggregator void ResetBuffers() noexcept; // 2*blocksize+some is allocated but only up to maxsegmentsize - void CreateShm(size_t blocksize, const size_t maxsegmentsize); + void CreateShm(size_t blocksize, const size_t maxsegmentsize, + const size_t alignment_size); void DestroyShm(); private: diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index 7f2ac70c29..dae36615d6 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -546,10 +546,10 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals) return Elems; } -void BP5Serializer::PerformPuts() +void BP5Serializer::PerformPuts(bool forceCopyDeferred) { // Dump data for externs into iovec - DumpDeferredBlocks(); + DumpDeferredBlocks(forceCopyDeferred); 
CurDataBuffer->CopyExternalToInternal(); } @@ -883,7 +883,8 @@ void BP5Serializer::InitStep(BufferV *DataBuffer) m_PriorDataBufferSizeTotal = 0; } -BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer) +BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer, + bool forceCopyDeferred) { if (CurDataBuffer == NULL) { @@ -891,10 +892,10 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer) "ReinitStepData", "without prior Init"); } // Dump data for externs into iovec - DumpDeferredBlocks(); + DumpDeferredBlocks(forceCopyDeferred); m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec( - 0, NULL, sizeof(max_align_t), true); // output block size aligned + 0, NULL, m_BufferBlockSize, true); // output block size aligned BufferV *tmp = CurDataBuffer; CurDataBuffer = DataBuffer; @@ -970,7 +971,7 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep, DumpDeferredBlocks(forceCopyDeferred); MBase->DataBlockSize = CurDataBuffer->AddToVec( - 0, NULL, sizeof(max_align_t), true); // output block size aligned + 0, NULL, m_BufferBlockSize, true); // output block size aligned MBase->DataBlockSize += m_PriorDataBufferSizeTotal; diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.h b/source/adios2/toolkit/format/bp5/BP5Serializer.h index 6d2372cf7a..5c91de3d4c 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.h @@ -81,10 +81,11 @@ class BP5Serializer : virtual public BP5Base * those offsets are relative to the entire sequence of data * produced by a writer rank. 
*/ - BufferV *ReinitStepData(BufferV *DataBuffer); + BufferV *ReinitStepData(BufferV *DataBuffer, + bool forceCopyDeferred = false); TimestepInfo CloseTimestep(int timestep, bool forceCopyDeferred = false); - void PerformPuts(); + void PerformPuts(bool forceCopyDeferred = false); core::Engine *m_Engine = NULL; @@ -111,6 +112,10 @@ class BP5Serializer : virtual public BP5Base /* Variables to help appending to existing file */ size_t m_PreMetaMetadataFileLength = 0; + size_t m_BufferAlign = 1; // align buffers in memory + // align buffers to integer multiples of block size + size_t m_BufferBlockSize = sizeof(max_align_t); + private: void Init(); typedef struct _BP5WriterRec @@ -157,6 +162,7 @@ class BP5Serializer : virtual public BP5Base size_t MetadataSize = 0; BufferV *CurDataBuffer = NULL; + std::vector PreviousMetaMetaInfoBlocks; size_t m_PriorDataBufferSizeTotal = 0; diff --git a/source/adios2/toolkit/format/buffer/BufferV.cpp b/source/adios2/toolkit/format/buffer/BufferV.cpp index 7e488acec7..abbd78f117 100644 --- a/source/adios2/toolkit/format/buffer/BufferV.cpp +++ b/source/adios2/toolkit/format/buffer/BufferV.cpp @@ -16,13 +16,15 @@ namespace adios2 namespace format { -char BufferV::zero[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +/*char BufferV::zero[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};*/ -BufferV::BufferV(const std::string type, const bool AlwaysCopy) -: m_Type(type), m_AlwaysCopy(AlwaysCopy) +BufferV::BufferV(const std::string type, const bool AlwaysCopy, + const size_t MemAlign, const size_t MemBlockSize) +: m_Type(type), m_MemAlign(MemAlign), m_MemBlockSize(MemBlockSize), + m_AlwaysCopy(AlwaysCopy) { } @@ -43,8 +45,11 @@ void BufferV::AlignBuffer(const size_t align) if (badAlign) { size_t addAlign = align - badAlign; - 
assert(addAlign < sizeof(max_align_t)); - AddToVec(addAlign, zero, 1, true); + if (addAlign > zero.size()) + { + zero.resize(addAlign); + } + AddToVec(addAlign, zero.data(), 1, true); } } diff --git a/source/adios2/toolkit/format/buffer/BufferV.h b/source/adios2/toolkit/format/buffer/BufferV.h index be1d28be80..df8a84359e 100644 --- a/source/adios2/toolkit/format/buffer/BufferV.h +++ b/source/adios2/toolkit/format/buffer/BufferV.h @@ -21,10 +21,14 @@ class BufferV { public: const std::string m_Type; + const size_t m_MemAlign; // allocate each pointer aligned + // keep each buffer integer multiple of this size + const size_t m_MemBlockSize; uint64_t Size() noexcept; - BufferV(const std::string type, const bool AlwaysCopy = false); + BufferV(const std::string type, const bool AlwaysCopy = false, + const size_t MemAlign = 1, const size_t MemBlockSize = 1); virtual ~BufferV(); virtual std::vector DataVec() noexcept = 0; @@ -68,7 +72,7 @@ class BufferV virtual void *GetPtr(int bufferIdx, size_t posInBuffer) = 0; protected: - static char zero[64]; + std::vector zero; const bool m_AlwaysCopy = false; struct VecEntry diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp index 8ace4e702e..f20b6caa5f 100644 --- a/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp +++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp @@ -22,8 +22,9 @@ namespace format { ChunkV::ChunkV(const std::string type, const bool AlwaysCopy, + const size_t MemAlign, const size_t MemBlockSize, const size_t ChunkSize) -: BufferV(type, AlwaysCopy), m_ChunkSize(ChunkSize) +: BufferV(type, AlwaysCopy, MemAlign, MemBlockSize), m_ChunkSize(ChunkSize) { } @@ -31,7 +32,41 @@ ChunkV::~ChunkV() { for (const auto &Chunk : m_Chunks) { - free((void *)Chunk); + free(Chunk.AllocatedPtr); + } +} + +size_t ChunkV::ChunkAlloc(Chunk &v, const size_t size) +{ + // try to alloc/realloc a buffer to requested size + // first, size must be aligned with 
block size + size_t actualsize = size; + size_t rem = size % m_MemBlockSize; + if (rem) + { + actualsize = actualsize + (m_MemBlockSize - rem); + } + + // align usable buffer to m_MemAlign bytes + void *b = realloc(v.AllocatedPtr, actualsize + m_MemAlign - 1); + if (b) + { + if (b != v.AllocatedPtr) /* NOTE(review): after a moving realloc the payload keeps its old offset from AllocatedPtr; recomputing Ptr can shift it when m_MemAlign > 1 */ + { + v.AllocatedPtr = b; + size_t p = (size_t)v.AllocatedPtr; + v.Ptr = (char *)((p + m_MemAlign - 1) & ~(m_MemAlign - 1)); /* NOTE(review): mask round-up assumes m_MemAlign is a power of two */ + } + v.Size = actualsize; + return actualsize; + } + else + { + std::cout << "ADIOS2 ERROR: Cannot (re)allocate " << actualsize + << " bytes for a chunk in ChunkV. " + "Continue buffering with chunk size " + << v.Size / 1048576 << " MB" << std::endl; + return 0; + } } @@ -50,18 +85,16 @@ void ChunkV::CopyExternalToInternal() { // No room in current chunk, close it out // realloc down to used size (helpful?) and set size in array - m_Chunks.back() = - (char *)realloc(m_Chunks.back(), m_TailChunkPos); - + ChunkAlloc(m_Chunks.back(), m_TailChunkPos); m_TailChunkPos = 0; - m_TailChunk = NULL; + m_TailChunk = nullptr; AppendPossible = false; } if (AppendPossible) { // We can use current chunk, just append the data and modify the // DataV entry - memcpy(m_TailChunk + m_TailChunkPos, DataV[i].Base, size); + memcpy(m_TailChunk->Ptr + m_TailChunkPos, DataV[i].Base, size); DataV[i].External = false; - DataV[i].Base = m_TailChunk + m_TailChunkPos; + DataV[i].Base = m_TailChunk->Ptr + m_TailChunkPos; /* m_TailChunk is a Chunk* now: offset its payload, not the struct pointer */ m_TailChunkPos += size; @@ -72,11 +105,13 @@ void ChunkV::CopyExternalToInternal() size_t NewSize = m_ChunkSize; if (size > m_ChunkSize) NewSize = size; - m_TailChunk = (char *)malloc(NewSize); - m_Chunks.push_back(m_TailChunk); - memcpy(m_TailChunk, DataV[i].Base, size); + Chunk c{nullptr, nullptr, 0}; + ChunkAlloc(c, NewSize); + m_Chunks.push_back(c); + m_TailChunk = &m_Chunks.back(); + memcpy(m_TailChunk->Ptr, DataV[i].Base, size); m_TailChunkPos = size; - DataV[i] = {false, m_TailChunk, 0, size}; + DataV[i] = {false, m_TailChunk->Ptr, 0, size}; } } } @@ -85,14 +120,14 @@ void ChunkV::CopyExternalToInternal()
size_t ChunkV::AddToVec(const size_t size, const void *buf, size_t align, bool CopyReqd, MemorySpace MemSpace) { + AlignBuffer(align); // may call back AddToVec recursively + size_t retOffset = CurOffset; + if (size == 0) { return CurOffset; } - AlignBuffer(align); - size_t retOffset = CurOffset; - if (!CopyReqd && !m_AlwaysCopy) { // just add buf to internal version of output vector @@ -105,17 +140,24 @@ size_t ChunkV::AddToVec(const size_t size, const void *buf, size_t align, // internal bool AppendPossible = DataV.size() && !DataV.back().External && - (m_TailChunk + m_TailChunkPos - DataV.back().Size == + (m_TailChunk->Ptr + m_TailChunkPos - DataV.back().Size == DataV.back().Base); if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize)) { // No room in current chunk, close it out // realloc down to used size (helpful?) and set size in array - m_Chunks.back() = (char *)realloc(m_Chunks.back(), m_TailChunkPos); - + size_t actualsize = ChunkAlloc(m_Chunks.back(), m_TailChunkPos); + size_t alignment = actualsize - m_TailChunkPos; + if (alignment) + { + auto p = m_Chunks.back().Ptr + m_TailChunkPos; + std::fill(p, p + alignment, 0); + } + retOffset += alignment; + DataV.back().Size = actualsize; m_TailChunkPos = 0; - m_TailChunk = NULL; + m_TailChunk = nullptr; AppendPossible = false; } if (AppendPossible) @@ -131,11 +173,13 @@ size_t ChunkV::AddToVec(const size_t size, const void *buf, size_t align, size_t NewSize = m_ChunkSize; if (size > m_ChunkSize) NewSize = size; - m_TailChunk = (char *)malloc(NewSize); - m_Chunks.push_back(m_TailChunk); + Chunk c{nullptr, nullptr, 0}; + ChunkAlloc(c, NewSize); + m_Chunks.push_back(c); + m_TailChunk = &m_Chunks.back(); CopyDataToBuffer(size, buf, 0, MemSpace); m_TailChunkPos = size; - VecEntry entry = {false, m_TailChunk, 0, size}; + VecEntry entry = {false, m_TailChunk->Ptr, 0, size}; DataV.push_back(entry); } } @@ -149,11 +193,11 @@ void ChunkV::CopyDataToBuffer(const size_t size, const void *buf, size_t pos, #ifdef 
ADIOS2_HAVE_CUDA if (MemSpace == MemorySpace::CUDA) { - helper::CudaMemCopyToBuffer(m_TailChunk, pos, buf, size); + helper::CudaMemCopyToBuffer(m_TailChunk->Ptr, pos, buf, size); return; } #endif - memcpy(m_TailChunk + pos, buf, size); + memcpy(m_TailChunk->Ptr + pos, buf, size); } BufferV::BufferPos ChunkV::Allocate(const size_t size, size_t align) @@ -169,16 +213,23 @@ BufferV::BufferPos ChunkV::Allocate(const size_t size, size_t align) // internal bool AppendPossible = DataV.size() && !DataV.back().External && - (m_TailChunk + m_TailChunkPos - DataV.back().Size == DataV.back().Base); + (m_TailChunk->Ptr + m_TailChunkPos - DataV.back().Size == + DataV.back().Base); if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize)) { // No room in current chunk, close it out // realloc down to used size (helpful?) and set size in array - m_Chunks.back() = (char *)realloc(m_Chunks.back(), m_TailChunkPos); - + size_t actualsize = ChunkAlloc(m_Chunks.back(), m_TailChunkPos); + size_t alignment = actualsize - m_TailChunkPos; + if (alignment) + { + auto p = m_Chunks.back().Ptr + m_TailChunkPos; + std::fill(p, p + alignment, 0); + } + DataV.back().Size = actualsize; m_TailChunkPos = 0; - m_TailChunk = NULL; + m_TailChunk = nullptr; AppendPossible = false; } @@ -196,11 +247,13 @@ BufferV::BufferPos ChunkV::Allocate(const size_t size, size_t align) size_t NewSize = m_ChunkSize; if (size > m_ChunkSize) NewSize = size; - m_TailChunk = (char *)malloc(NewSize); - m_Chunks.push_back(m_TailChunk); + Chunk c{nullptr, nullptr, 0}; + ChunkAlloc(c, NewSize); + m_Chunks.push_back(c); + m_TailChunk = &m_Chunks.back(); bufferPos = 0; m_TailChunkPos = size; - VecEntry entry = {false, m_TailChunk, 0, size}; + VecEntry entry = {false, m_TailChunk->Ptr, 0, size}; DataV.push_back(entry); } diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.h b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h index 4642108c5f..9187dffddc 100644 --- a/source/adios2/toolkit/format/buffer/chunk/ChunkV.h 
+++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h @@ -26,6 +26,7 @@ class ChunkV : public BufferV const size_t m_ChunkSize; ChunkV(const std::string type, const bool AlwaysCopy = false, + const size_t MemAlign = 1, const size_t MemBlockSize = 1, const size_t ChunkSize = DefaultBufferChunkSize); virtual ~ChunkV(); @@ -45,9 +46,21 @@ class ChunkV : public BufferV MemorySpace MemSpace); private: - std::vector m_Chunks; + struct Chunk + { + char *Ptr; // aligned, do not free + void *AllocatedPtr; // original ptr, free this + size_t Size; + }; + + std::vector m_Chunks; size_t m_TailChunkPos = 0; - char *m_TailChunk = NULL; + Chunk *m_TailChunk = nullptr; + + // allocator function, doing aligned allocation of memory + // return true if (re)allocation is successful + // on failure, VecEntry is unmodified + size_t ChunkAlloc(Chunk &v, const size_t size); }; } // end namespace format diff --git a/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp b/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp index c23cb1e415..9c6cb19c20 100644 --- a/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp +++ b/source/adios2/toolkit/format/buffer/malloc/MallocV.cpp @@ -20,9 +20,10 @@ namespace format { MallocV::MallocV(const std::string type, const bool AlwaysCopy, + const size_t MemAlign, const size_t MemBlockSize, size_t InitialBufferSize, double GrowthFactor) -: BufferV(type, AlwaysCopy), m_InitialBufferSize(InitialBufferSize), - m_GrowthFactor(GrowthFactor) +: BufferV(type, AlwaysCopy, MemAlign, MemBlockSize), + m_InitialBufferSize(InitialBufferSize), m_GrowthFactor(GrowthFactor) { } @@ -87,14 +88,14 @@ void MallocV::CopyExternalToInternal() size_t MallocV::AddToVec(const size_t size, const void *buf, size_t align, bool CopyReqd, MemorySpace MemSpace) { + AlignBuffer(align); // may call back AddToVec recursively + size_t retOffset = CurOffset; + if (size == 0) { return CurOffset; } - AlignBuffer(align); - size_t retOffset = CurOffset; - if (!CopyReqd && 
!m_AlwaysCopy) { // just add buf to internal version of output vector diff --git a/source/adios2/toolkit/format/buffer/malloc/MallocV.h b/source/adios2/toolkit/format/buffer/malloc/MallocV.h index e78b69b945..ef4e7c2338 100644 --- a/source/adios2/toolkit/format/buffer/malloc/MallocV.h +++ b/source/adios2/toolkit/format/buffer/malloc/MallocV.h @@ -24,6 +24,7 @@ class MallocV : public BufferV uint64_t Size() noexcept; MallocV(const std::string type, const bool AlwaysCopy = false, + const size_t MemAlign = 1, const size_t MemBlockSize = 1, size_t InitialBufferSize = DefaultInitialBufferSize, double GrowthFactor = DefaultBufferGrowthFactor); virtual ~MallocV(); diff --git a/source/adios2/toolkit/transport/Transport.cpp b/source/adios2/toolkit/transport/Transport.cpp index eb70cb8d95..2d48401349 100644 --- a/source/adios2/toolkit/transport/Transport.cpp +++ b/source/adios2/toolkit/transport/Transport.cpp @@ -81,7 +81,8 @@ void Transport::InitProfiler(const Mode openMode, const TimeUnit timeUnit) } void Transport::OpenChain(const std::string &name, const Mode openMode, - const helper::Comm &chainComm, const bool async) + const helper::Comm &chainComm, const bool async, + const bool directio) { std::invalid_argument("ERROR: " + m_Name + " transport type " + m_Type + " using library " + m_Library + diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h index e828fab822..4511155355 100644 --- a/source/adios2/toolkit/transport/Transport.h +++ b/source/adios2/toolkit/transport/Transport.h @@ -65,7 +65,8 @@ class Transport * @param async */ virtual void Open(const std::string &name, const Mode openMode, - const bool async = false) = 0; + const bool async = false, + const bool directio = false) = 0; /** * Opens transport, possibly asynchronously, in a chain to avoid @@ -78,7 +79,8 @@ class Transport */ virtual void OpenChain(const std::string &name, Mode openMode, const helper::Comm &chainComm, - const bool async = false); + const 
bool async = false, + const bool directio = false); /** * If OS buffered (FILE* or fstream), sets the buffer size diff --git a/source/adios2/toolkit/transport/file/FileDaos.cpp b/source/adios2/toolkit/transport/file/FileDaos.cpp index c9aa2f7007..2dd0cd81fd 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.cpp +++ b/source/adios2/toolkit/transport/file/FileDaos.cpp @@ -518,7 +518,7 @@ void FileDaos::WaitForOpen() } void FileDaos::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { int rc; diff --git a/source/adios2/toolkit/transport/file/FileDaos.h b/source/adios2/toolkit/transport/file/FileDaos.h index 810267a5cc..411fb55def 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.h +++ b/source/adios2/toolkit/transport/file/FileDaos.h @@ -35,8 +35,9 @@ class FileDaos : public Transport void SetParameters(const Params &parameters); + /** directio option is ignored in this transport */ void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp index 1fdd3bfe7e..2f7268fe1d 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.cpp +++ b/source/adios2/toolkit/transport/file/FileFStream.cpp @@ -46,7 +46,7 @@ void FileFStream::WaitForOpen() } void FileFStream::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { auto lf_AsyncOpenWrite = [&](const std::string &name) -> void { ProfilerStart("open"); @@ -107,7 +107,8 @@ void FileFStream::Open(const std::string &name, const Mode openMode, } void FileFStream::OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, const bool async) + const helper::Comm &chainComm, const bool
async, + const bool directio) { auto lf_AsyncOpenWrite = [&](const std::string &name) -> void { ProfilerStart("open"); diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h index f9441c52d3..3f2e022474 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.h +++ b/source/adios2/toolkit/transport/file/FileFStream.h @@ -32,12 +32,13 @@ class FileFStream : public Transport ~FileFStream() = default; + /** directio option is ignored in this transport */ void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, - const bool async = false) final; + const helper::Comm &chainComm, const bool async = false, + const bool directio = false) final; void SetBuffer(char *buffer, size_t size) final; diff --git a/source/adios2/toolkit/transport/file/FileIME.cpp b/source/adios2/toolkit/transport/file/FileIME.cpp index 96f442ab50..f0f72edd37 100644 --- a/source/adios2/toolkit/transport/file/FileIME.cpp +++ b/source/adios2/toolkit/transport/file/FileIME.cpp @@ -71,7 +71,7 @@ FileIME::~FileIME() /** Note that async mode is unsupported in FileIME. 
*/ void FileIME::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { /** DEFAULT_IME_FILE_PREFIX is "ime://" */ m_Name = DEFAULT_IME_FILE_PREFIX; diff --git a/source/adios2/toolkit/transport/file/FileIME.h b/source/adios2/toolkit/transport/file/FileIME.h index f45b63a9a4..92532714c7 100644 --- a/source/adios2/toolkit/transport/file/FileIME.h +++ b/source/adios2/toolkit/transport/file/FileIME.h @@ -36,7 +36,7 @@ class FileIME : public Transport /** Async option is ignored in FileIME transport */ void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void SetParameters(const Params &parameters) final; diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index 673081fd37..e0776e2ff1 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -10,6 +10,12 @@ #include "FilePOSIX.h" #include "adios2/helper/adiosLog.h" +#ifdef ADIOS2_HAVE_O_DIRECT +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#endif + #include // remove #include // strerror #include // errno @@ -58,13 +64,29 @@ void FilePOSIX::WaitForOpen() } } +static int __GetOpenFlag(const int flag, const bool directio) +{ +#ifdef ADIOS2_HAVE_O_DIRECT + if (directio) + { + return flag | O_DIRECT; + } + else +#endif + { + return flag; + } +} + void FilePOSIX::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { - auto lf_AsyncOpenWrite = [&](const std::string &name) -> int { + auto lf_AsyncOpenWrite = [&](const std::string &name, + const bool directio) -> int { ProfilerStart("open"); errno = 0; - int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + int flag = __GetOpenFlag(O_WRONLY | O_CREAT | O_TRUNC, directio); + int FD = open(m_Name.c_str(), flag, 0666);
m_Errno = errno; ProfilerStop("open"); return FD; @@ -72,6 +94,7 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode, m_Name = name; CheckName(); + m_DirectIO = directio; m_OpenMode = openMode; switch (m_OpenMode) { @@ -80,15 +103,16 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode, if (async) { m_IsOpening = true; - m_OpenFuture = - std::async(std::launch::async, lf_AsyncOpenWrite, name); + m_OpenFuture = std::async(std::launch::async, lf_AsyncOpenWrite, + name, directio); } else { ProfilerStart("open"); errno = 0; - m_FileDescriptor = - open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + m_FileDescriptor = open( + m_Name.c_str(), + __GetOpenFlag(O_WRONLY | O_CREAT | O_TRUNC, directio), 0666); m_Errno = errno; ProfilerStop("open"); } @@ -98,7 +122,8 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode, ProfilerStart("open"); errno = 0; // m_FileDescriptor = open(m_Name.c_str(), O_RDWR); - m_FileDescriptor = open(m_Name.c_str(), O_RDWR | O_CREAT, 0777); + m_FileDescriptor = open( + m_Name.c_str(), __GetOpenFlag(O_RDWR | O_CREAT, directio), 0777); lseek(m_FileDescriptor, 0, SEEK_END); m_Errno = errno; ProfilerStop("open"); @@ -125,12 +150,15 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode, } void FilePOSIX::OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, const bool async) + const helper::Comm &chainComm, const bool async, + const bool directio) { - auto lf_AsyncOpenWrite = [&](const std::string &name) -> int { + auto lf_AsyncOpenWrite = [&](const std::string &name, + const bool directio) -> int { ProfilerStart("open"); errno = 0; - int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + int flag = __GetOpenFlag(O_WRONLY | O_CREAT | O_TRUNC, directio); + int FD = open(m_Name.c_str(), flag, 0666); m_Errno = errno; ProfilerStop("open"); return FD; @@ -146,6 +174,7 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, "Chain 
token in FilePOSIX::OpenChain"); } + m_DirectIO = directio; m_OpenMode = openMode; switch (m_OpenMode) { @@ -156,8 +185,8 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, // only when process is a single writer, can create the file // asynchronously, otherwise other processes are waiting on it m_IsOpening = true; - m_OpenFuture = - std::async(std::launch::async, lf_AsyncOpenWrite, name); + m_OpenFuture = std::async(std::launch::async, lf_AsyncOpenWrite, + name, directio); } else { @@ -166,11 +195,14 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, if (chainComm.Rank() == 0) { m_FileDescriptor = - open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + open(m_Name.c_str(), + __GetOpenFlag(O_WRONLY | O_CREAT | O_TRUNC, directio), + 0666); } else { - m_FileDescriptor = open(m_Name.c_str(), O_WRONLY, 0666); + m_FileDescriptor = open( + m_Name.c_str(), __GetOpenFlag(O_WRONLY, directio), 0666); lseek(m_FileDescriptor, 0, SEEK_SET); } m_Errno = errno; @@ -183,11 +215,14 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, errno = 0; if (chainComm.Rank() == 0) { - m_FileDescriptor = open(m_Name.c_str(), O_RDWR | O_CREAT, 0666); + m_FileDescriptor = + open(m_Name.c_str(), __GetOpenFlag(O_RDWR | O_CREAT, directio), + 0666); } else { - m_FileDescriptor = open(m_Name.c_str(), O_RDWR); + m_FileDescriptor = + open(m_Name.c_str(), __GetOpenFlag(O_RDWR, directio)); } lseek(m_FileDescriptor, 0, SEEK_END); m_Errno = errno; @@ -248,6 +283,18 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start) } }; + /*auto lf_DirectIOCheck = [&](const char *buffer, size_t size, + size_t offset) { + if (m_DirectIO) + { + auto mempos = (uintptr_t)buffer; + std::cout << "FilePOSIX::Write directio" << m_Name + << " offset = " << offset << " size = " << size + << " mempos = " << mempos << " mem%512 = " << mempos % 512 + << std::endl; + } + };*/ + WaitForOpen(); if (start != MaxSizeT) { @@ -277,13 +324,17 @@ void 
FilePOSIX::Write(const char *buffer, size_t size, size_t start) size_t position = 0; for (size_t b = 0; b < batches; ++b) { + /* lf_DirectIOCheck(&buffer[position], DefaultMaxFileBatchSize, + start + position);*/ lf_Write(&buffer[position], DefaultMaxFileBatchSize); position += DefaultMaxFileBatchSize; } + // lf_DirectIOCheck(&buffer[position], remainder, start + position); lf_Write(&buffer[position], remainder); } else { + // lf_DirectIOCheck(buffer, size, start); lf_Write(buffer, size); } } diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h index 255dbba2b3..4f352b833c 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.h +++ b/source/adios2/toolkit/transport/file/FilePOSIX.h @@ -35,11 +35,11 @@ class FilePOSIX : public Transport ~FilePOSIX(); void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, - const bool async = false) final; + const helper::Comm &chainComm, const bool async = false, + const bool directio = false) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; @@ -76,6 +76,7 @@ class FilePOSIX : public Transport int m_Errno = 0; bool m_IsOpening = false; std::future m_OpenFuture; + bool m_DirectIO = false; /** * Check if m_FileDescriptor is -1 after an operation diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp index 2621d9db7e..2981388fc6 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.cpp +++ b/source/adios2/toolkit/transport/file/FileStdio.cpp @@ -61,7 +61,7 @@ void FileStdio::WaitForOpen() } void FileStdio::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { auto lf_AsyncOpenWrite = [&](const std::string &name) -> FILE * { 
errno = 0; @@ -112,7 +112,8 @@ void FileStdio::Open(const std::string &name, const Mode openMode, } void FileStdio::OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, const bool async) + const helper::Comm &chainComm, const bool async, + const bool directio) { auto lf_AsyncOpenWrite = [&](const std::string &name) -> FILE * { errno = 0; diff --git a/source/adios2/toolkit/transport/file/FileStdio.h b/source/adios2/toolkit/transport/file/FileStdio.h index f0ccfaf043..7882004626 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.h +++ b/source/adios2/toolkit/transport/file/FileStdio.h @@ -34,12 +34,13 @@ class FileStdio : public Transport ~FileStdio(); + /** directio option is ignored in this transport */ void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void OpenChain(const std::string &name, Mode openMode, - const helper::Comm &chainComm, - const bool async = false) final; + const helper::Comm &chainComm, const bool async = false, + const bool directio = false) final; void SetBuffer(char *buffer, size_t size) final; diff --git a/source/adios2/toolkit/transport/null/NullTransport.cpp b/source/adios2/toolkit/transport/null/NullTransport.cpp index a476ec9c86..ee2d9582f5 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.cpp +++ b/source/adios2/toolkit/transport/null/NullTransport.cpp @@ -33,7 +33,7 @@ NullTransport::NullTransport(helper::Comm const &comm) NullTransport::~NullTransport() = default; void NullTransport::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { if (Impl->IsOpen) { diff --git a/source/adios2/toolkit/transport/null/NullTransport.h b/source/adios2/toolkit/transport/null/NullTransport.h index 28c9f272b0..1595ce793a 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.h +++ 
b/source/adios2/toolkit/transport/null/NullTransport.h @@ -35,7 +35,7 @@ class NullTransport : public Transport virtual ~NullTransport(); void Open(const std::string &name, const Mode openMode, - const bool async = false) override; + const bool async = false, const bool directio = false) override; void SetBuffer(char *buffer, size_t size) override; diff --git a/source/adios2/toolkit/transport/shm/ShmSystemV.cpp b/source/adios2/toolkit/transport/shm/ShmSystemV.cpp index 774940732a..024d65e32c 100644 --- a/source/adios2/toolkit/transport/shm/ShmSystemV.cpp +++ b/source/adios2/toolkit/transport/shm/ShmSystemV.cpp @@ -49,7 +49,7 @@ ShmSystemV::~ShmSystemV() // this might not be correct } void ShmSystemV::Open(const std::string &name, const Mode openMode, - const bool async) + const bool async, const bool directio) { m_Name = name; CheckName(); diff --git a/source/adios2/toolkit/transport/shm/ShmSystemV.h b/source/adios2/toolkit/transport/shm/ShmSystemV.h index 36b539c9d7..fb8838448e 100644 --- a/source/adios2/toolkit/transport/shm/ShmSystemV.h +++ b/source/adios2/toolkit/transport/shm/ShmSystemV.h @@ -39,7 +39,7 @@ class ShmSystemV : public Transport ~ShmSystemV(); void Open(const std::string &name, const Mode openMode, - const bool async = false) final; + const bool async = false, const bool directio = false) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 0539598525..c28c530848 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -632,6 +632,13 @@ std::shared_ptr TransportMan::OpenFileTransport( return helper::StringTo(AsyncOpen, ""); }; + auto lf_GetDirectIO = [&](const std::string defaultValue, + const Params &parameters) -> bool { + std::string directio = defaultValue; + helper::SetParameterValue("DirectIO", parameters, directio); + return
helper::StringTo(directio, ""); + }; + // BODY OF FUNCTION starts here std::shared_ptr transport; lf_SetFileTransport(lf_GetLibrary(DefaultFileLibrary, parameters), @@ -650,12 +657,14 @@ std::shared_ptr TransportMan::OpenFileTransport( if (useComm) { transport->OpenChain(fileName, openMode, chainComm, - lf_GetAsyncOpen("true", parameters)); + lf_GetAsyncOpen("false", parameters), + lf_GetDirectIO("false", parameters)); } else { transport->Open(fileName, openMode, - lf_GetAsyncOpen("false", parameters)); + lf_GetAsyncOpen("false", parameters), + lf_GetDirectIO("false", parameters)); } return transport; } diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index 34e0dccaee..0a7f85f4a8 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -124,7 +124,10 @@ endif() if(ADIOS2_HAVE_BP5) gtest_add_tests_helper(ParameterSelectSteps MPI_ALLOW BP Engine.BP. .BP5 WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5" -) + ) + gtest_add_tests_helper(DirectIO MPI_ALLOW BP Engine.BP. .BP5 + WORKING_DIRECTORY ${BP5_DIR} EXTRA_ARGS "BP5" + ) endif(ADIOS2_HAVE_BP5) # BP3 only for now diff --git a/testing/adios2/engine/bp/TestBPDirectIO.cpp b/testing/adios2/engine/bp/TestBPDirectIO.cpp new file mode 100644 index 0000000000..3ce4eefa16 --- /dev/null +++ b/testing/adios2/engine/bp/TestBPDirectIO.cpp @@ -0,0 +1,140 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + */ +#include +#include + +#include +#include + +#include +#include + +#include + +std::string engineName; // comes from command line + +class ADIOSReadDirectIOTest : public ::testing::Test +{ +public: + ADIOSReadDirectIOTest() = default; +}; + +TEST_F(ADIOSReadDirectIOTest, BufferResize) +{ + /* Test proper buffer sizes when one chunk cannot hold all variables, + and the last chunk is resized back.
It should be properly aligned + to not cause any problems at writing that chunk. + */ + std::string filename = "ADIOSDirectIO.bp"; + + int mpiRank = 0, mpiSize = 1; + +#if ADIOS2_USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); +#endif + + // Write test data using BP + { +#if ADIOS2_USE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD); +#else + adios2::ADIOS adios; +#endif + adios2::IO ioWrite = adios.DeclareIO("TestIOWrite"); + ioWrite.SetEngine(engineName); + ioWrite.SetParameter("DirectIO", "true"); + ioWrite.SetParameter("DirectIOAlignOffset", "4096"); + ioWrite.SetParameter("DirectIOAlignBuffer", "4096"); + ioWrite.SetParameter("StripeSize", "9999"); + ioWrite.SetParameter("BufferChunkSize", "7111"); + // BufferChunkSize should be adjusted to 2*4096 by engine + // StripeSize should be adjusted to 3*4096 by engine + + adios2::Engine engine = ioWrite.Open(filename, adios2::Mode::Write); + // Number of elements per process + const std::size_t Nx = 2000; + // two variables fit in one Chunk but not three + adios2::Dims shape{static_cast(mpiSize * Nx)}; + adios2::Dims start{static_cast(mpiRank * Nx)}; + adios2::Dims count{static_cast(Nx)}; + + auto var0 = ioWrite.DefineVariable("var0", shape, start, count); + auto var1 = ioWrite.DefineVariable("var1", shape, start, count); + auto var2 = ioWrite.DefineVariable("var2", shape, start, count); + + std::vector a0(Nx, 'a'); + std::vector a1(Nx, 'b'); + std::vector a2(Nx, 'c'); + + engine.BeginStep(); + engine.Put(var0, a0.data()); + engine.Put(var1, a1.data()); + engine.Put(var2, a2.data()); + engine.EndStep(); + engine.Close(); + +#if ADIOS2_USE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif + adios2::IO ioRead = adios.DeclareIO("TestIORead"); + ioRead.SetEngine(engineName); + adios2::Engine engine_s = ioRead.Open(filename, adios2::Mode::Read); + EXPECT_TRUE(engine_s); + try + { + engine_s.BeginStep(); + adios2::Variable var0 = ioRead.InquireVariable("var0"); + adios2::Variable var1 = 
ioRead.InquireVariable("var1"); + adios2::Variable var2 = ioRead.InquireVariable("var2"); + + EXPECT_TRUE(var0); + EXPECT_TRUE(var1); + EXPECT_TRUE(var2); + + std::vector res0; + var0.SetSelection({{Nx * mpiRank}, {Nx}}); + engine_s.Get(var0, res0, adios2::Mode::Sync); + EXPECT_EQ(res0, a0); + + std::vector res1; + var1.SetSelection({{Nx * mpiRank}, {Nx}}); + engine_s.Get(var1, res1, adios2::Mode::Sync); + EXPECT_EQ(res1, a1); + + std::vector res2; + var2.SetSelection({{Nx * mpiRank}, {Nx}}); + engine_s.Get(var2, res2, adios2::Mode::Sync); + EXPECT_EQ(res2, a2); + + engine_s.EndStep(); + } + catch (std::exception &e) + { + std::cout << "Exception " << e.what() << std::endl; + } + engine_s.Close(); + } +} + +int main(int argc, char **argv) +{ +#if ADIOS2_USE_MPI + MPI_Init(nullptr, nullptr); +#endif + ::testing::InitGoogleTest(&argc, argv); + + if (argc > 1) + { + engineName = std::string(argv[1]); + } + + int result = RUN_ALL_TESTS(); +#if ADIOS2_USE_MPI + MPI_Finalize(); +#endif + + return result; +}