Skip to content

Commit

Permalink
Merge pull request #3014 from JasonRuonanWang/operator
Browse files Browse the repository at this point in the history
Allow nthreads parameter for blosc decompression
  • Loading branch information
JasonRuonanWang authored Jan 25, 2022
2 parents a92620a + 93ba711 commit 3fd1ea7
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 20 deletions.
6 changes: 3 additions & 3 deletions source/adios2/core/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace adios2
namespace core
{

Operator::Operator(const std::string typeString, const OperatorType typeEnum,
const Params &parameters)
: m_TypeString(typeString), m_TypeEnum(typeEnum),
Operator::Operator(const std::string &typeString, const OperatorType typeEnum,
const std::string &category, const Params &parameters)
: m_TypeString(typeString), m_TypeEnum(typeEnum), m_Category(category),
m_Parameters(helper::LowerCaseParams(parameters))
{
}
Expand Down
5 changes: 3 additions & 2 deletions source/adios2/core/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ class Operator

const std::string m_TypeString;
const OperatorType m_TypeEnum;
const std::string m_Category;

Operator(const std::string typeString, const OperatorType typeEnum,
const Params &parameters);
Operator(const std::string &typeString, const OperatorType typeEnum,
const std::string &category, const Params &parameters);

virtual ~Operator() = default;

Expand Down
8 changes: 6 additions & 2 deletions source/adios2/operator/OperatorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,15 @@ std::shared_ptr<Operator> MakeOperator(const std::string &type,
return ret;
}

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut)
size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op)
{
Operator::OperatorType compressorType;
std::memcpy(&compressorType, bufferIn, 1);
auto op = MakeOperator(OperatorTypeToString(compressorType), Params());
if (op == nullptr || op->m_TypeEnum != compressorType)
{
op = MakeOperator(OperatorTypeToString(compressorType), {});
}
return op->InverseOperate(bufferIn, sizeIn, dataOut);
}

Expand Down
3 changes: 2 additions & 1 deletion source/adios2/operator/OperatorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace core
std::shared_ptr<Operator> MakeOperator(const std::string &type,
const Params &parameters);

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut);
size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op = nullptr);

} // end namespace core
} // end namespace adios2
3 changes: 2 additions & 1 deletion source/adios2/operator/callback/Signature1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace callback
const size_t, const Dims &, const Dims &, \
const Dims &)> &function, \
const Params &parameters) \
: Operator("Signature1", Operator::CALLBACK_SIGNATURE1, parameters), \
: Operator("Signature1", Operator::CALLBACK_SIGNATURE1, "callback", \
parameters), \
m_Function##L(function) \
{ \
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/callback/Signature2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Signature2::Signature2(
const std::string &, const size_t, const Dims &,
const Dims &, const Dims &)> &function,
const Params &parameters)
: Operator("Signature2", Operator::CALLBACK_SIGNATURE2, parameters),
: Operator("Signature2", Operator::CALLBACK_SIGNATURE2, "callback", parameters),
m_Function(function)
{
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace compress
{

CompressBZIP2::CompressBZIP2(const Params &parameters)
: Operator("bzip2", COMPRESS_BZIP2, parameters)
: Operator("bzip2", COMPRESS_BZIP2, "compress", parameters)
{
}

Expand Down
27 changes: 26 additions & 1 deletion source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const std::set<std::string> CompressBlosc::m_Compressors = {
"blosclz", "lz4", "lz4hc", "snappy", "zlib", "zstd"};

CompressBlosc::CompressBlosc(const Params &parameters)
: Operator("blosc", COMPRESS_BLOSC, parameters)
: Operator("blosc", COMPRESS_BLOSC, "compress", parameters)
{
}

Expand Down Expand Up @@ -326,6 +326,19 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
{
blosc_init();

size_t threads = 1; // defaults
for (const auto &itParameter : m_Parameters)
{
const std::string key = itParameter.first;
const std::string value = itParameter.second;
if (key == "nthreads")
{
threads = static_cast<int>(helper::StringTo<int32_t>(
value, "when setting Blosc nthreads parameter\n"));
}
}
blosc_set_nthreads(threads);

while (inputOffset < inputDataSize)
{
/* move over the size of the compressed data */
Expand Down Expand Up @@ -390,6 +403,18 @@ size_t CompressBlosc::DecompressOldFormat(const char *bufferIn,
const size_t sizeOut) const
{
blosc_init();
size_t threads = 1; // defaults
for (const auto &itParameter : m_Parameters)
{
const std::string key = itParameter.first;
const std::string value = itParameter.second;
if (key == "nthreads")
{
threads = static_cast<int>(helper::StringTo<int32_t>(
value, "when setting Blosc nthreads parameter\n"));
}
}
blosc_set_nthreads(threads);
const int decompressedSize = blosc_decompress(bufferIn, dataOut, sizeOut);
blosc_destroy();
return static_cast<size_t>(decompressedSize);
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressLibPressio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ static pressio_compressor *adios_to_libpressio_compressor(Params const &params)
}

CompressLibPressio::CompressLibPressio(const Params &parameters)
: Operator("libpressio", COMPRESS_LIBPRESSIO, parameters)
: Operator("libpressio", COMPRESS_LIBPRESSIO, "compress", parameters)
{
}
size_t CompressLibPressio::Operate(const char *dataIn, const Dims &blockStart,
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace compress
{

CompressMGARD::CompressMGARD(const Params &parameters)
: Operator("mgard", COMPRESS_MGARD, parameters)
: Operator("mgard", COMPRESS_MGARD, "compress", parameters)
{
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressNull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace compress
{

CompressNull::CompressNull(const Params &parameters)
: Operator("null", COMPRESS_NULL, parameters)
: Operator("null", COMPRESS_NULL, "compress", parameters)
{
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressPNG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const std::map<std::string, std::set<uint32_t>> CompressPNG::m_BitDepths = {

// PUBLIC
CompressPNG::CompressPNG(const Params &parameters)
: Operator("png", COMPRESS_PNG, parameters)
: Operator("png", COMPRESS_PNG, "compress", parameters)
{
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressSZ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace compress
{

CompressSZ::CompressSZ(const Params &parameters)
: Operator("sz", COMPRESS_SZ, parameters)
: Operator("sz", COMPRESS_SZ, "compress", parameters)
{
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressSirius.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int CompressSirius::m_Tiers = 0;
bool CompressSirius::m_CurrentReadFinished = false;

CompressSirius::CompressSirius(const Params &parameters)
: Operator("sirius", COMPRESS_SIRIUS, parameters)
: Operator("sirius", COMPRESS_SIRIUS, "compress", parameters)
{
helper::GetParameter(parameters, "Tiers", m_Tiers);
m_TierBuffersMap.resize(m_Tiers);
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressZFP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ zfp_stream *GetZFPStream(const Dims &dimensions, DataType type,
const Params &parameters);

CompressZFP::CompressZFP(const Params &parameters)
: Operator("zfp", COMPRESS_ZFP, parameters)
: Operator("zfp", COMPRESS_ZFP, "compress", parameters)
{
}

Expand Down
12 changes: 11 additions & 1 deletion source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,17 @@ void BP4Deserializer::PostDataRead(
char *preOpData = m_ThreadBuffers[threadID][0].data();
const char *postOpData = m_ThreadBuffers[threadID][1].data();

core::Decompress(postOpData, blockOperationInfo.PayloadSize, preOpData);
std::shared_ptr<core::Operator> op = nullptr;
for (auto &o : blockInfo.Operations)
{
if (o->m_Category == "compress")
{
op = o;
break;
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize, preOpData,
op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
Expand Down

0 comments on commit 3fd1ea7

Please sign in to comment.