From 42bda0fff6567fc0ae4d1b32da32a8752316528e Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 24 Jan 2022 01:26:01 -0500 Subject: [PATCH 1/2] add operator object for decompression --- source/adios2/core/Operator.cpp | 6 +++--- source/adios2/core/Operator.h | 5 +++-- source/adios2/operator/OperatorFactory.cpp | 8 ++++++-- source/adios2/operator/OperatorFactory.h | 3 ++- source/adios2/operator/callback/Signature1.cpp | 3 ++- source/adios2/operator/callback/Signature2.cpp | 2 +- source/adios2/operator/compress/CompressBZIP2.cpp | 2 +- source/adios2/operator/compress/CompressBlosc.cpp | 2 +- .../adios2/operator/compress/CompressLibPressio.cpp | 2 +- source/adios2/operator/compress/CompressMGARD.cpp | 2 +- source/adios2/operator/compress/CompressNull.cpp | 2 +- source/adios2/operator/compress/CompressPNG.cpp | 2 +- source/adios2/operator/compress/CompressSZ.cpp | 2 +- source/adios2/operator/compress/CompressSirius.cpp | 2 +- source/adios2/operator/compress/CompressZFP.cpp | 2 +- .../adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc | 12 +++++++++++- 16 files changed, 37 insertions(+), 20 deletions(-) diff --git a/source/adios2/core/Operator.cpp b/source/adios2/core/Operator.cpp index e9cf60e5c6..760462e675 100644 --- a/source/adios2/core/Operator.cpp +++ b/source/adios2/core/Operator.cpp @@ -16,9 +16,9 @@ namespace adios2 namespace core { -Operator::Operator(const std::string typeString, const OperatorType typeEnum, - const Params ¶meters) -: m_TypeString(typeString), m_TypeEnum(typeEnum), +Operator::Operator(const std::string &typeString, const OperatorType typeEnum, + const std::string &category, const Params ¶meters) +: m_TypeString(typeString), m_TypeEnum(typeEnum), m_Category(category), m_Parameters(helper::LowerCaseParams(parameters)) { } diff --git a/source/adios2/core/Operator.h b/source/adios2/core/Operator.h index b445852269..f82ae0b788 100644 --- a/source/adios2/core/Operator.h +++ b/source/adios2/core/Operator.h @@ -43,9 +43,10 @@ class Operator const std::string m_TypeString; const OperatorType m_TypeEnum; + const std::string m_Category; - Operator(const std::string typeString, const OperatorType typeEnum, - const Params ¶meters); + Operator(const std::string &typeString, const OperatorType typeEnum, + const std::string &category, const Params ¶meters); virtual ~Operator() = default; diff --git a/source/adios2/operator/OperatorFactory.cpp b/source/adios2/operator/OperatorFactory.cpp index da07688319..956a092c74 100644 --- a/source/adios2/operator/OperatorFactory.cpp +++ b/source/adios2/operator/OperatorFactory.cpp @@ -152,11 +152,15 @@ std::shared_ptr MakeOperator(const std::string &type, return ret; } -size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut) +size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut, + std::shared_ptr op) { Operator::OperatorType compressorType; std::memcpy(&compressorType, bufferIn, 1); - auto op = MakeOperator(OperatorTypeToString(compressorType), Params()); + if (op == nullptr || op->m_TypeEnum != compressorType) + { + op = MakeOperator(OperatorTypeToString(compressorType), {}); + } return op->InverseOperate(bufferIn, sizeIn, dataOut); } diff --git a/source/adios2/operator/OperatorFactory.h b/source/adios2/operator/OperatorFactory.h index bc3c5dcbb5..ccac962c38 100644 --- a/source/adios2/operator/OperatorFactory.h +++ b/source/adios2/operator/OperatorFactory.h @@ -20,7 +20,8 @@ namespace core std::shared_ptr MakeOperator(const std::string &type, const Params ¶meters); -size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut); +size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut, + std::shared_ptr op = nullptr); } // end namespace core } // end namespace adios2 diff --git a/source/adios2/operator/callback/Signature1.cpp b/source/adios2/operator/callback/Signature1.cpp index 8067e29f23..2644f245b0 100644 --- a/source/adios2/operator/callback/Signature1.cpp +++ b/source/adios2/operator/callback/Signature1.cpp @@ -23,7 +23,8 @@ namespace callback const size_t, const Dims &, const Dims &, \ const Dims &)> &function, \ const Params ¶meters) \ - : Operator("Signature1", Operator::CALLBACK_SIGNATURE1, parameters), \ + : Operator("Signature1", Operator::CALLBACK_SIGNATURE1, "callback", \ + parameters), \ m_Function##L(function) \ { \ } diff --git a/source/adios2/operator/callback/Signature2.cpp b/source/adios2/operator/callback/Signature2.cpp index 6683f21bde..3d76a75b63 100644 --- a/source/adios2/operator/callback/Signature2.cpp +++ b/source/adios2/operator/callback/Signature2.cpp @@ -22,7 +22,7 @@ Signature2::Signature2( const std::string &, const size_t, const Dims &, const Dims &, const Dims &)> &function, const Params ¶meters) -: Operator("Signature2", Operator::CALLBACK_SIGNATURE2, parameters), +: Operator("Signature2", Operator::CALLBACK_SIGNATURE2, "callback", parameters), m_Function(function) { } diff --git a/source/adios2/operator/compress/CompressBZIP2.cpp b/source/adios2/operator/compress/CompressBZIP2.cpp index 0e83bb29fc..fd2235fb72 100644 --- a/source/adios2/operator/compress/CompressBZIP2.cpp +++ b/source/adios2/operator/compress/CompressBZIP2.cpp @@ -28,7 +28,7 @@ namespace compress { CompressBZIP2::CompressBZIP2(const Params ¶meters) -: Operator("bzip2", COMPRESS_BZIP2, parameters) +: Operator("bzip2", COMPRESS_BZIP2, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressBlosc.cpp b/source/adios2/operator/compress/CompressBlosc.cpp index fc98dbe3d7..9b1e25dc29 100644 --- a/source/adios2/operator/compress/CompressBlosc.cpp +++ b/source/adios2/operator/compress/CompressBlosc.cpp @@ -38,7 +38,7 @@ const std::set CompressBlosc::m_Compressors = { "blosclz", "lz4", "lz4hc", "snappy", "zlib", "zstd"}; CompressBlosc::CompressBlosc(const Params ¶meters) -: Operator("blosc", COMPRESS_BLOSC, parameters) +: Operator("blosc", COMPRESS_BLOSC, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressLibPressio.cpp b/source/adios2/operator/compress/CompressLibPressio.cpp index b56c790408..0a3c913a6f 100644 --- a/source/adios2/operator/compress/CompressLibPressio.cpp +++ b/source/adios2/operator/compress/CompressLibPressio.cpp @@ -281,7 +281,7 @@ static pressio_compressor *adios_to_libpressio_compressor(Params const ¶ms) } CompressLibPressio::CompressLibPressio(const Params ¶meters) -: Operator("libpressio", COMPRESS_LIBPRESSIO, parameters) +: Operator("libpressio", COMPRESS_LIBPRESSIO, "compress", parameters) { } size_t CompressLibPressio::Operate(const char *dataIn, const Dims &blockStart, diff --git a/source/adios2/operator/compress/CompressMGARD.cpp b/source/adios2/operator/compress/CompressMGARD.cpp index 8a05ecc758..67dd158383 100644 --- a/source/adios2/operator/compress/CompressMGARD.cpp +++ b/source/adios2/operator/compress/CompressMGARD.cpp @@ -22,7 +22,7 @@ namespace compress { CompressMGARD::CompressMGARD(const Params ¶meters) -: Operator("mgard", COMPRESS_MGARD, parameters) +: Operator("mgard", COMPRESS_MGARD, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressNull.cpp b/source/adios2/operator/compress/CompressNull.cpp index 9b3b557441..4fb7cb08b0 100644 --- a/source/adios2/operator/compress/CompressNull.cpp +++ b/source/adios2/operator/compress/CompressNull.cpp @@ -19,7 +19,7 @@ namespace compress { CompressNull::CompressNull(const Params ¶meters) -: Operator("null", COMPRESS_NULL, parameters) +: Operator("null", COMPRESS_NULL, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressPNG.cpp b/source/adios2/operator/compress/CompressPNG.cpp index 01fa1de1a7..a6f2703099 100644 --- a/source/adios2/operator/compress/CompressPNG.cpp +++ b/source/adios2/operator/compress/CompressPNG.cpp @@ -44,7 +44,7 @@ const std::map> CompressPNG::m_BitDepths = { // PUBLIC CompressPNG::CompressPNG(const Params ¶meters) -: Operator("png", COMPRESS_PNG, parameters) +: Operator("png", COMPRESS_PNG, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressSZ.cpp b/source/adios2/operator/compress/CompressSZ.cpp index 31a46bf253..7a6a4087ab 100644 --- a/source/adios2/operator/compress/CompressSZ.cpp +++ b/source/adios2/operator/compress/CompressSZ.cpp @@ -26,7 +26,7 @@ namespace compress { CompressSZ::CompressSZ(const Params ¶meters) -: Operator("sz", COMPRESS_SZ, parameters) +: Operator("sz", COMPRESS_SZ, "compress", parameters) { } diff --git a/source/adios2/operator/compress/CompressSirius.cpp b/source/adios2/operator/compress/CompressSirius.cpp index 42db452621..0969c525fb 100644 --- a/source/adios2/operator/compress/CompressSirius.cpp +++ b/source/adios2/operator/compress/CompressSirius.cpp @@ -27,7 +27,7 @@ int CompressSirius::m_Tiers = 0; bool CompressSirius::m_CurrentReadFinished = false; CompressSirius::CompressSirius(const Params ¶meters) -: Operator("sirius", COMPRESS_SIRIUS, parameters) +: Operator("sirius", COMPRESS_SIRIUS, "compress", parameters) { helper::GetParameter(parameters, "Tiers", m_Tiers); m_TierBuffersMap.resize(m_Tiers); diff --git a/source/adios2/operator/compress/CompressZFP.cpp b/source/adios2/operator/compress/CompressZFP.cpp index 719b775c6e..bff5cb5068 100644 --- a/source/adios2/operator/compress/CompressZFP.cpp +++ b/source/adios2/operator/compress/CompressZFP.cpp @@ -53,7 +53,7 @@ zfp_stream *GetZFPStream(const Dims &dimensions, DataType type, const Params ¶meters); CompressZFP::CompressZFP(const Params ¶meters) -: Operator("zfp", COMPRESS_ZFP, parameters) +: Operator("zfp", COMPRESS_ZFP, "compress", parameters) { } diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc index a9c45b18fe..458855d2e4 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc @@ -524,7 +524,17 @@ void BP4Deserializer::PostDataRead( char *preOpData = m_ThreadBuffers[threadID][0].data(); const char *postOpData = m_ThreadBuffers[threadID][1].data(); - core::Decompress(postOpData, blockOperationInfo.PayloadSize, preOpData); + std::shared_ptr op = nullptr; + for (auto &o : blockInfo.Operations) + { + if (o->m_Category == "compress") + { + op = o; + break; + } + } + core::Decompress(postOpData, blockOperationInfo.PayloadSize, preOpData, + op); // clip block to match selection helper::ClipVector(m_ThreadBuffers[threadID][0], From 93ba7113073d94fb79948541c9c8d01c9ddf8cc0 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 24 Jan 2022 12:41:11 -0500 Subject: [PATCH 2/2] added nthread parameter into blosc decompression --- .../operator/compress/CompressBlosc.cpp | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/source/adios2/operator/compress/CompressBlosc.cpp b/source/adios2/operator/compress/CompressBlosc.cpp index 9b1e25dc29..2dd0beace8 100644 --- a/source/adios2/operator/compress/CompressBlosc.cpp +++ b/source/adios2/operator/compress/CompressBlosc.cpp @@ -326,6 +326,19 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn, { blosc_init(); + size_t threads = 1; // defaults + for (const auto &itParameter : m_Parameters) + { + const std::string key = itParameter.first; + const std::string value = itParameter.second; + if (key == "nthreads") + { + threads = static_cast(helper::StringTo( + value, "when setting Blosc nthreads parameter\n")); + } + } + blosc_set_nthreads(threads); + while (inputOffset < inputDataSize) { /* move over the size of the compressed data */ @@ -390,6 +403,18 @@ size_t CompressBlosc::DecompressOldFormat(const char *bufferIn, const size_t sizeOut) const { blosc_init(); + size_t threads = 1; // defaults + for (const auto &itParameter : m_Parameters) + { + const std::string key = itParameter.first; + const std::string value = itParameter.second; + if (key == "nthreads") + { + threads = static_cast(helper::StringTo( + value, "when setting Blosc nthreads parameter\n")); + } + } + blosc_set_nthreads(threads); const int decompressedSize = blosc_decompress(bufferIn, dataOut, sizeOut); blosc_destroy(); return static_cast(decompressedSize);