diff --git a/source/adios2/core/Operator.cpp b/source/adios2/core/Operator.cpp index 3e0d764449..c6b8d80fc8 100644 --- a/source/adios2/core/Operator.cpp +++ b/source/adios2/core/Operator.cpp @@ -9,8 +9,6 @@ */ #include "Operator.h" - -#include "adios2/common/ADIOSMacros.h" #include "adios2/helper/adiosFunctions.h" namespace adios2 diff --git a/source/adios2/core/Operator.h b/source/adios2/core/Operator.h index 8a79e2c12e..e6e4dfd239 100644 --- a/source/adios2/core/Operator.h +++ b/source/adios2/core/Operator.h @@ -13,15 +13,10 @@ #ifndef ADIOS2_CORE_OPERATOR_H_ #define ADIOS2_CORE_OPERATOR_H_ -/// \cond EXCLUDE_FROM_DOXYGEN -#include -#include -#include -#include -/// \endcond - #include "adios2/common/ADIOSMacros.h" #include "adios2/common/ADIOSTypes.h" +#include +#include namespace adios2 { @@ -125,6 +120,47 @@ class Operator return ret; } + template + void PutParameters(char *buffer, U &pos, const Params ¶meters) + { + uint8_t size = static_cast(parameters.size()); + PutParameter(buffer, pos, size); + for (const auto &p : parameters) + { + size = static_cast(p.first.size()); + PutParameter(buffer, pos, size); + + std::memcpy(buffer + pos, p.first.data(), size); + pos += size; + + size = static_cast(p.second.size()); + PutParameter(buffer, pos, size); + + std::memcpy(buffer + pos, p.second.data(), size); + pos += size; + } + } + + template + Params GetParameters(const char *buffer, U &pos) + { + Params ret; + uint8_t params = GetParameter(buffer, pos); + for (uint8_t i = 0; i < params; ++i) + { + uint8_t size = GetParameter(buffer, pos); + std::string key = + std::string(reinterpret_cast(buffer + pos), size); + pos += size; + size = GetParameter(buffer, pos); + std::string value = + std::string(reinterpret_cast(buffer + pos), size); + pos += size; + ret[key] = value; + } + return ret; + } + private: void CheckCallbackType(const std::string type) const; }; diff --git a/source/adios2/operator/compress/CompressBZIP2.cpp b/source/adios2/operator/compress/CompressBZIP2.cpp index e861b71fa0..8ea686a761 100644 --- a/source/adios2/operator/compress/CompressBZIP2.cpp +++ b/source/adios2/operator/compress/CompressBZIP2.cpp @@ -126,8 +126,8 @@ size_t CompressBZIP2::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 4; // skip the first four bytes - size_t sizeOut = GetParameter(bufferIn, bufferInOffset); - size_t batches = GetParameter(bufferIn, bufferInOffset); + size_t sizeOut = GetParameter(bufferIn, bufferInOffset); + size_t batches = GetParameter(bufferIn, bufferInOffset); int small = 0; int verbosity = 0; diff --git a/source/adios2/operator/compress/CompressBlosc.cpp b/source/adios2/operator/compress/CompressBlosc.cpp index d91ede36c3..2d0b91b570 100644 --- a/source/adios2/operator/compress/CompressBlosc.cpp +++ b/source/adios2/operator/compress/CompressBlosc.cpp @@ -234,7 +234,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn, // DecompressV2 and keep this function for decompressing lagacy data. size_t bufferInOffset = 0; - size_t sizeOut = GetParameter(bufferIn, bufferInOffset); + size_t sizeOut = GetParameter(bufferIn, bufferInOffset); if (sizeIn - bufferInOffset < sizeof(DataHeader)) { throw("corrupted blosc buffer header"); diff --git a/source/adios2/operator/compress/CompressMGARD.cpp b/source/adios2/operator/compress/CompressMGARD.cpp index cc07122018..1abc929139 100644 --- a/source/adios2/operator/compress/CompressMGARD.cpp +++ b/source/adios2/operator/compress/CompressMGARD.cpp @@ -133,11 +133,11 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 0; - const size_t ndims = GetParameter(bufferIn, bufferInOffset); + const size_t ndims = GetParameter(bufferIn, bufferInOffset); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter(bufferIn, bufferInOffset); + blockCount[i] = GetParameter(bufferIn, bufferInOffset); } const DataType type = GetParameter(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressSZ.cpp b/source/adios2/operator/compress/CompressSZ.cpp index cd4a449b07..d0ecd02811 100644 --- a/source/adios2/operator/compress/CompressSZ.cpp +++ b/source/adios2/operator/compress/CompressSZ.cpp @@ -319,11 +319,11 @@ size_t CompressSZ::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 0; - const size_t ndims = GetParameter(bufferIn, bufferInOffset); + const size_t ndims = GetParameter(bufferIn, bufferInOffset); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter(bufferIn, bufferInOffset); + blockCount[i] = GetParameter(bufferIn, bufferInOffset); } const DataType type = GetParameter(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressSirius.cpp b/source/adios2/operator/compress/CompressSirius.cpp index 87ff912a8d..33247d6ec5 100644 --- a/source/adios2/operator/compress/CompressSirius.cpp +++ b/source/adios2/operator/compress/CompressSirius.cpp @@ -101,16 +101,16 @@ size_t CompressSirius::DecompressV1(const char *bufferIn, const size_t sizeIn, // DecompressV2 and keep this function for decompressing lagacy data. size_t bufferInOffset = 0; - const size_t ndims = GetParameter(bufferIn, bufferInOffset); + const size_t ndims = GetParameter(bufferIn, bufferInOffset); Dims blockStart(ndims); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockStart[i] = GetParameter(bufferIn, bufferInOffset); + blockStart[i] = GetParameter(bufferIn, bufferInOffset); } for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter(bufferIn, bufferInOffset); + blockCount[i] = GetParameter(bufferIn, bufferInOffset); } const DataType type = GetParameter(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressZFP.cpp b/source/adios2/operator/compress/CompressZFP.cpp index b1d1affadf..6a7a0a6565 100644 --- a/source/adios2/operator/compress/CompressZFP.cpp +++ b/source/adios2/operator/compress/CompressZFP.cpp @@ -28,13 +28,33 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart, Params &info) { - Dims convertedDims = ConvertDims(blockCount, type, 3); + const uint8_t bufferVersion = 1; + size_t bufferOutOffset = 0; + + // Universal operator metadata + PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz); + PutParameter(bufferOut, bufferOutOffset, bufferVersion); + bufferOutOffset += 2; + // Universal operator metadata end + const size_t ndims = blockCount.size(); + + // zfp V1 metadata + PutParameter(bufferOut, bufferOutOffset, ndims); + for (const auto &d : blockCount) + { + PutParameter(bufferOut, bufferOutOffset, d); + } + PutParameter(bufferOut, bufferOutOffset, type); + PutParameters(bufferOut, bufferOutOffset, parameters); + // zfp V1 metadata end + + Dims convertedDims = ConvertDims(blockCount, type, 3); zfp_field *field = GetZFPField(dataIn, convertedDims, type); zfp_stream *stream = GetZFPStream(convertedDims, type, parameters); size_t maxSize = zfp_stream_maximum_size(stream, field); // associate bitstream - bitstream *bitstream = stream_open(bufferOut, maxSize); + bitstream *bitstream = stream_open(bufferOut + bufferOutOffset, maxSize); zfp_stream_set_bit_stream(stream, bitstream); zfp_stream_rewind(stream); @@ -46,24 +66,41 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart, "size is 0, in call to Compress"); } + bufferOutOffset += sizeOut; + zfp_field_free(field); zfp_stream_close(stream); stream_close(bitstream); - return sizeOut; + return bufferOutOffset; } -size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, - char *dataOut, const DataType type, - const Dims &blockStart, const Dims &blockCount, - const Params ¶meters, Params &info) +size_t CompressZFP::DecompressV1(const char *bufferIn, const size_t sizeIn, + char *dataOut) { + // Do NOT remove even if the buffer version is updated. Data might be still + // in lagacy formats. This function must be kept for backward compatibility. + // If a newer buffer format is implemented, create another function, e.g. + // DecompressV2 and keep this function for decompressing lagacy data. + + size_t bufferInOffset = 0; + + const size_t ndims = GetParameter(bufferIn, bufferInOffset); + Dims blockCount(ndims); + for (size_t i = 0; i < ndims; ++i) + { + blockCount[i] = GetParameter(bufferIn, bufferInOffset); + } + const DataType type = GetParameter(bufferIn, bufferInOffset); + const Params parameters = GetParameters(bufferIn, bufferInOffset); + Dims convertedDims = ConvertDims(blockCount, type, 3); zfp_field *field = GetZFPField(dataOut, convertedDims, type); zfp_stream *stream = GetZFPStream(convertedDims, type, parameters); // associate bitstream - bitstream *bitstream = stream_open(const_cast(bufferIn), sizeIn); + bitstream *bitstream = stream_open( + const_cast(bufferIn + bufferInOffset), sizeIn - bufferInOffset); zfp_stream_set_bit_stream(stream, bitstream); zfp_stream_rewind(stream); @@ -80,13 +117,41 @@ size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, zfp_stream_close(stream); stream_close(bitstream); - const size_t typeSizeBytes = helper::GetDataTypeSize(type); const size_t dataSizeBytes = - helper::GetTotalSize(convertedDims) * typeSizeBytes; + helper::GetTotalSize(convertedDims, helper::GetDataTypeSize(type)); return dataSizeBytes; } +size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, + char *dataOut, const DataType /*type*/, + const Dims & /*blockStart*/, + const Dims & /*blockCount*/, + const Params & /*parameters*/, Params &info) +{ + size_t bufferInOffset = 1; // skip operator type + const uint8_t bufferVersion = + GetParameter(bufferIn, bufferInOffset); + bufferInOffset += 2; // skip two reserved bytes + + if (bufferVersion == 1) + { + return DecompressV1(bufferIn + bufferInOffset, sizeIn - bufferInOffset, + dataOut); + } + else if (bufferVersion == 2) + { + // TODO: if a Version 2 zfp buffer is being implemented, put it here + // and keep the DecompressV1 routine for backward compatibility + } + else + { + throw("unknown zfp buffer version"); + } + + return 0; +} + bool CompressZFP::IsDataTypeValid(const DataType type) const { #define declare_type(T) \ diff --git a/source/adios2/operator/compress/CompressZFP.h b/source/adios2/operator/compress/CompressZFP.h index 4887455ac0..e5dfd02731 100644 --- a/source/adios2/operator/compress/CompressZFP.h +++ b/source/adios2/operator/compress/CompressZFP.h @@ -94,6 +94,18 @@ class CompressZFP : public Operator * @param hint extra exception information */ void CheckStatus(const int status, const std::string hint) const; + + /** + * Decompress function for V1 buffer. Do NOT remove even if the buffer + * version is updated. Data might be still in lagacy formats. This function + * must be kept for backward compatibility + * @param bufferIn : compressed data buffer (V1 only) + * @param sizeIn : number of bytes in bufferIn + * @param dataOut : decompressed data buffer + * @return : number of bytes in dataOut + */ + size_t DecompressV1(const char *bufferIn, const size_t sizeIn, + char *dataOut); }; } // end namespace compress