Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State Response compressor/decompressor is implemented with zstd, but not enabled yet #2195

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ add_library(network
impl/peer_manager_impl.cpp
impl/reputation_repository_impl.cpp
helpers/scale_message_read_writer.cpp
helpers/compressor/zstd_stream_compressor.cpp
adapters/adapter_errors.cpp
impl/protocols/protocol_req_pov.cpp
warp/cache.cpp
Expand Down
2 changes: 1 addition & 1 deletion core/network/adapters/protobuf_state_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ namespace kagome::network {
}
};

} // namespace kagome::network
} // namespace kagome::network
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert, missing empty line at file end

22 changes: 22 additions & 0 deletions core/network/helpers/compressor/compressor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <cstdint>
#include <span>
#include <vector>

#include "outcome/outcome.hpp"

namespace kagome::network {
struct ICompressor {
virtual ~ICompressor() = default;
virtual outcome::result<std::vector<uint8_t>> compress(std::span<uint8_t> data) = 0;
virtual outcome::result<std::vector<uint8_t>> decompress(std::span<uint8_t> compressedData) = 0;
Comment on lines +18 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual outcome::result<std::vector<uint8_t>> compress(std::span<uint8_t> data) = 0;
virtual outcome::result<std::vector<uint8_t>> decompress(std::span<uint8_t> compressedData) = 0;
virtual outcome::result<Buffer> compress(BufferView data) const = 0;
virtual outcome::result<Buffer> decompress(BufferView compressedData) const = 0;

};

} // namespace kagome::network
149 changes: 149 additions & 0 deletions core/network/helpers/compressor/zstd_error.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <zstd_errors.h>

#include <qtils/enum_error_code.hpp>

namespace kagome::network {
enum class ZstdStreamCompressorError {
Comment on lines +13 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
namespace kagome::network {
enum class ZstdStreamCompressorError {
Q_ENUM_ERROR_CODE(ZSTD_ErrorCode) {
return ZSTD_getErrorString(static_cast<ZSTD_ErrorCode>(e));
}

UNKNOWN,
EXCEPTION,
UNKNOWN_EXCEPTION,
CONTEXT_ERROR,
ERROR_GENERIC,
PREFIX_UNKNOWN,
VERSION_UNSUPPORTED,
PARAMETER_UNKNOWN,
FRAME_PARAMETER_UNSUPPORTED,
FRAME_PARAMETER_WINDOW_TOO_LARGE,
COMPRESSION_PARAMETER_UNSUPPORTED,
INIT_MISSING,
MEMORY_ALLOCATION,
STAGE_WRONG,
DST_SIZE_TOO_SMALL,
SRC_SIZE_WRONG,
CORRUPTION_DETECTED,
CHECKSUM_WRONG,
TABLE_LOG_TOO_LARGE,
MAX_SYMBOL_VALUE_TOO_LARGE,
MAX_SYMBOL_VALUE_TOO_SMALL,
DICTIONARY_CORRUPTED,
DICTIONARY_WRONG,
DICTIONARY_CREATION_FAILED,
MAX_CODE,
};

/**
 * Translates a raw zstd error code into the project's
 * ZstdStreamCompressorError enumeration.
 *
 * Declared `inline` because this definition lives in a header: without it,
 * every translation unit including the header would emit its own external
 * definition and the program would fail to link (ODR violation).
 *
 * @param errorCode code obtained from ZSTD_getErrorCode()
 * @return the matching ZstdStreamCompressorError; ZSTD_error_no_error and any
 * code without a dedicated enumerator are reported as UNKNOWN
 */
inline ZstdStreamCompressorError convertErrorCode(ZSTD_ErrorCode errorCode) {
  switch (errorCode) {
    case ZSTD_error_no_error:
      // Callers only convert after ZSTD_isError(), so "no error" reaching
      // this point has no meaningful counterpart and is reported as UNKNOWN.
      return ZstdStreamCompressorError::UNKNOWN;
    case ZSTD_error_GENERIC:
      return ZstdStreamCompressorError::ERROR_GENERIC;
    case ZSTD_error_prefix_unknown:
      return ZstdStreamCompressorError::PREFIX_UNKNOWN;
    case ZSTD_error_version_unsupported:
      return ZstdStreamCompressorError::VERSION_UNSUPPORTED;
    case ZSTD_error_parameter_unsupported:
      return ZstdStreamCompressorError::COMPRESSION_PARAMETER_UNSUPPORTED;
    case ZSTD_error_frameParameter_unsupported:
      return ZstdStreamCompressorError::FRAME_PARAMETER_UNSUPPORTED;
    case ZSTD_error_frameParameter_windowTooLarge:
      return ZstdStreamCompressorError::FRAME_PARAMETER_WINDOW_TOO_LARGE;
    case ZSTD_error_init_missing:
      return ZstdStreamCompressorError::INIT_MISSING;
    case ZSTD_error_memory_allocation:
      return ZstdStreamCompressorError::MEMORY_ALLOCATION;
    case ZSTD_error_stage_wrong:
      return ZstdStreamCompressorError::STAGE_WRONG;
    case ZSTD_error_dstSize_tooSmall:
      return ZstdStreamCompressorError::DST_SIZE_TOO_SMALL;
    case ZSTD_error_srcSize_wrong:
      return ZstdStreamCompressorError::SRC_SIZE_WRONG;
    case ZSTD_error_corruption_detected:
      return ZstdStreamCompressorError::CORRUPTION_DETECTED;
    case ZSTD_error_checksum_wrong:
      return ZstdStreamCompressorError::CHECKSUM_WRONG;
    case ZSTD_error_tableLog_tooLarge:
      return ZstdStreamCompressorError::TABLE_LOG_TOO_LARGE;
    case ZSTD_error_maxSymbolValue_tooLarge:
      return ZstdStreamCompressorError::MAX_SYMBOL_VALUE_TOO_LARGE;
    case ZSTD_error_maxSymbolValue_tooSmall:
      return ZstdStreamCompressorError::MAX_SYMBOL_VALUE_TOO_SMALL;
    case ZSTD_error_dictionary_corrupted:
      return ZstdStreamCompressorError::DICTIONARY_CORRUPTED;
    case ZSTD_error_dictionary_wrong:
      return ZstdStreamCompressorError::DICTIONARY_WRONG;
    case ZSTD_error_dictionaryCreation_failed:
      return ZstdStreamCompressorError::DICTIONARY_CREATION_FAILED;
    case ZSTD_error_maxCode:
      return ZstdStreamCompressorError::MAX_CODE;
    default:
      // Codes without a dedicated enumerator (including ones added by newer
      // zstd releases) collapse into UNKNOWN.
      return ZstdStreamCompressorError::UNKNOWN;
  }
}

/**
 * Human-readable message for each ZstdStreamCompressorError value.
 * Values without a dedicated message are described as "Unknown error".
 */
Q_ENUM_ERROR_CODE(ZstdStreamCompressorError) {
  using Code = decltype(e);
  switch (e) {
    // Failures raised by the wrapper itself.
    case Code::CONTEXT_ERROR:
      return "Failed to create ZSTD compression context";
    case Code::EXCEPTION:
      return "Exception";
    case Code::UNKNOWN_EXCEPTION:
      return "Unknown exception";

    // Failures mirrored from zstd error codes.
    case Code::ERROR_GENERIC:
      return "Generic error";
    case Code::PREFIX_UNKNOWN:
      return "Unknown prefix";
    case Code::VERSION_UNSUPPORTED:
      return "Unsupported version";
    case Code::PARAMETER_UNKNOWN:
      return "Unknown parameter";
    case Code::COMPRESSION_PARAMETER_UNSUPPORTED:
      return "Unsupported compression parameter";
    case Code::FRAME_PARAMETER_UNSUPPORTED:
      return "Unsupported frame parameter";
    case Code::FRAME_PARAMETER_WINDOW_TOO_LARGE:
      return "Frame parameter window too large";
    case Code::INIT_MISSING:
      return "Init missing";
    case Code::MEMORY_ALLOCATION:
      return "Memory allocation error";
    case Code::STAGE_WRONG:
      return "Wrong stage";
    case Code::DST_SIZE_TOO_SMALL:
      return "Destination size too small";
    case Code::SRC_SIZE_WRONG:
      return "Wrong source size";
    case Code::CORRUPTION_DETECTED:
      return "Corruption detected";
    case Code::CHECKSUM_WRONG:
      return "Wrong checksum";
    case Code::TABLE_LOG_TOO_LARGE:
      return "Table log too large";
    case Code::MAX_SYMBOL_VALUE_TOO_LARGE:
      return "Max symbol value too large";
    case Code::MAX_SYMBOL_VALUE_TOO_SMALL:
      return "Max symbol value too small";
    case Code::DICTIONARY_CORRUPTED:
      return "Dictionary corrupted";
    case Code::DICTIONARY_WRONG:
      return "Wrong dictionary";
    case Code::DICTIONARY_CREATION_FAILED:
      return "Dictionary creation failed";
    case Code::MAX_CODE:
      return "Max code";

    // Explicit UNKNOWN and any out-of-range value share one message.
    case Code::UNKNOWN:
    default:
      return "Unknown error";
  }
}

} // namespace kagome::network
95 changes: 95 additions & 0 deletions core/network/helpers/compressor/zstd_stream_compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include <memory>
#include <exception>

#include <zstd.h>

#include "zstd_stream_compressor.h"

#include "zstd_error.h"

namespace kagome::network {
outcome::result<std::vector<uint8_t>> ZstdStreamCompressor::compress(std::span<uint8_t> data) try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ZSTD_compress/ZSTD_decompress like uncompressCodeIfNeeded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it won't work; I tried it. Stream compression is used here; you can find the reference to the Rust code in the RFC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are compatible, I tested.

But stream decompress would be useful 👍
because ZSTD_getFrameContentSize doesn't work (decompressed size field is empty).
(may move to utils/zstd_decompress.hpp and reuse inside uncompressCodeIfNeeded)

Copy link
Contributor Author

@ErakhtinB ErakhtinB Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested it directly against a Polkadot instance running on the commit mentioned in the RFC: liuchengxu/polkadot-sdk@2556fef#diff-d9656480fbba5813d9d8ff38b5f0661a6019d0b793024a751a1140034fc32747R268
Streaming is mentioned there as well. You can try using ZSTD_compress/ZSTD_decompress in this situation and then tell me whether it is compatible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::unique_ptr<ZSTD_CCtx, void(*)(ZSTD_CCtx*)> cctx(
ZSTD_createCCtx(),
[](ZSTD_CCtx* c) { ZSTD_freeCCtx(c); }
);

if (!cctx) {
return ZstdStreamCompressorError::CONTEXT_ERROR;
}

const auto setParameterResult = ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, m_compressionLevel);
if (ZSTD_isError(setParameterResult)) {
return convertErrorCode(ZSTD_getErrorCode(setParameterResult));
}

const auto maxCompressedSize = ZSTD_compressBound(data.size());
std::vector<uint8_t> compressedData(maxCompressedSize);

ZSTD_inBuffer input = { data.data(), data.size(), 0 };
ZSTD_outBuffer output = { compressedData.data(), compressedData.size(), 0 };

while (input.pos < input.size) {
const auto compressResult = ZSTD_compressStream(cctx.get(), &output, &input);
if (ZSTD_isError(compressResult)) {
return convertErrorCode(ZSTD_getErrorCode(compressResult));
}
}

size_t remaining;
do {
remaining = ZSTD_endStream(cctx.get(), &output);
if (ZSTD_isError(remaining)) {
return convertErrorCode(ZSTD_getErrorCode(remaining));
}
} while (remaining > 0);

compressedData.resize(output.pos);

return compressedData;
} catch (const std::exception& e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove try-catch, zstd functions return error code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions are still possible from working with spans/vectors.

return ZstdStreamCompressorError::EXCEPTION;
}
catch (...) {
return ZstdStreamCompressorError::UNKNOWN_EXCEPTION;
}

/**
 * Decompresses a complete zstd stream held in memory.
 *
 * The input is fed to ZSTD_decompressStream() and the output is collected
 * through a scratch buffer of ZSTD_DStreamOutSize() bytes.
 *
 * @param compressedData bytes of one or more complete zstd frames
 * @return the decompressed bytes (empty for empty input), or
 *  - CONTEXT_ERROR if the decompression context cannot be created,
 *  - the converted zstd error if the decoder rejects the data,
 *  - SRC_SIZE_WRONG if the input ends in the middle of a frame,
 *  - EXCEPTION / UNKNOWN_EXCEPTION if a C++ exception escapes (e.g. from
 *    vector allocation).
 *
 * NOTE(review): the output size is not bounded here; if the data comes from
 * an untrusted peer, the caller may want to enforce a limit.
 */
outcome::result<std::vector<uint8_t>> ZstdStreamCompressor::decompress(std::span<uint8_t> compressedData) try {
  std::unique_ptr<ZSTD_DCtx, void (*)(ZSTD_DCtx *)> dctx(
      ZSTD_createDCtx(), [](ZSTD_DCtx *d) { ZSTD_freeDCtx(d); });
  if (dctx == nullptr) {
    return ZstdStreamCompressorError::CONTEXT_ERROR;
  }

  std::vector<uint8_t> decompressedData;
  std::vector<uint8_t> outBuffer(ZSTD_DStreamOutSize());

  ZSTD_inBuffer input = {compressedData.data(), compressedData.size(), 0};

  // ZSTD_decompressStream() returns 0 once a frame is completely decoded and
  // flushed, and a positive hint while the current frame is still unfinished.
  size_t lastResult = 0;
  bool outputWasFull = false;

  // Keep going while there is unread input, or while the decoder still has
  // pending data that did not fit into the scratch buffer on the last call.
  while (input.pos < input.size || (lastResult != 0 && outputWasFull)) {
    ZSTD_outBuffer output = {outBuffer.data(), outBuffer.size(), 0};

    lastResult = ZSTD_decompressStream(dctx.get(), &output, &input);
    if (ZSTD_isError(lastResult)) {
      return convertErrorCode(ZSTD_getErrorCode(lastResult));
    }

    decompressedData.insert(decompressedData.end(),
                            outBuffer.data(),
                            outBuffer.data() + output.pos);
    outputWasFull = (output.pos == output.size);
  }

  // A non-zero result after all input is consumed means the stream stopped
  // mid-frame: report it instead of returning a silently truncated payload.
  if (lastResult != 0) {
    return ZstdStreamCompressorError::SRC_SIZE_WRONG;
  }

  return decompressedData;
} catch (const std::exception &) {
  return ZstdStreamCompressorError::EXCEPTION;
} catch (...) {
  return ZstdStreamCompressorError::UNKNOWN_EXCEPTION;
}

} // namespace kagome::network
19 changes: 19 additions & 0 deletions core/network/helpers/compressor/zstd_stream_compressor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "compressor.h"
namespace kagome::network {
struct ZstdStreamCompressor : public ICompressor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ICompressor doesn't work with streams

Suggested change
struct ZstdStreamCompressor : public ICompressor {
struct ZstdCompressor : public ICompressor {

ZstdStreamCompressor(int compressionLevel = 3) : m_compressionLevel(compressionLevel) {}
outcome::result<std::vector<uint8_t>> compress(std::span<uint8_t> data) override;
outcome::result<std::vector<uint8_t>> decompress(std::span<uint8_t> compressedData) override;
private:
int m_compressionLevel;
};

} // namespace kagome::network
52 changes: 40 additions & 12 deletions core/network/helpers/protobuf_message_read_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "network/adapters/protobuf.hpp"
#include "network/adapters/uvar.hpp"
#include "network/helpers/message_read_writer.hpp"
#include "network/helpers/compressor/compressor.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of mixing protobuf and compression,
you can compose them

struct ZstdMessageReadWriter : MessageReadWriter {
  ZstdMessageReadWriter(MessageReadWriter);
};
struct ProtobufMessageReadWriter {
  ProtobufMessageReadWriter(MessageReadWriter);
};
auto state_sync_read_writer =
  ProtobufMessageReadWriter(
    ZstdMessageReadWriter(
      MessageReadWriterUvarint(
        stream)));

#include "scale/scale.hpp"

namespace kagome::network {
Expand All @@ -42,13 +43,22 @@ namespace kagome::network {
* @param cb to be called, when the message is read, or error happens
*/
template <typename MsgType>
void read(ReadCallback<MsgType> &&cb) const {
void read(ReadCallback<MsgType> &&cb, std::shared_ptr<ICompressor> decompressor = nullptr
) const {
read_writer_->read(
[self{shared_from_this()}, cb = std::move(cb)](auto &&read_res) {
[self{shared_from_this()}, cb = std::move(cb), decompressor](auto &&read_res) {
if (!read_res) {
return cb(read_res.error());
}

if (decompressor) {
std::span<uint8_t> compressed{read_res.value()->begin(), read_res.value()->end()};
auto compressionRes = decompressor->decompress(compressed);
if (!compressionRes) {
return cb(outcome::failure(compressionRes.error()));
}
*read_res.value() = std::move(compressionRes.value());
}
using ProtobufRW =
MessageReadWriter<ProtobufMessageAdapter<MsgType>, NoSink>;

Expand All @@ -72,7 +82,7 @@ namespace kagome::network {
*/
template <typename MsgType>
void write(const MsgType &msg,
libp2p::basic::Writer::WriteCallbackFunc &&cb) const {
libp2p::basic::Writer::WriteCallbackFunc &&cb, std::shared_ptr<ICompressor> compressor = nullptr) const {
using ProtobufRW =
MessageReadWriter<ProtobufMessageAdapter<MsgType>, NoSink>;

Expand All @@ -83,15 +93,33 @@ namespace kagome::network {
std::span<uint8_t> data(it.base(),
out.size() - std::distance(out.begin(), it));

read_writer_->write(data,
[self{shared_from_this()},
out{std::move(out)},
cb = std::move(cb)](auto &&write_res) {
if (!write_res) {
return cb(write_res.error());
}
cb(outcome::success());
});
if (compressor == nullptr) {
read_writer_->write(data,
[self{shared_from_this()},
out{std::move(out)},
cb = std::move(cb)](auto &&write_res) {
if (!write_res) {
return cb(write_res.error());
}
cb(outcome::success());
});
} else {
auto compressionRes = compressor->compress(data);
if (!compressionRes) {
return cb(outcome::failure(compressionRes.error()));
}
auto compressedData = std::move(compressionRes.value());
std::span<uint8_t> compressedDataSpan(compressedData.data(), compressedData.size());
read_writer_->write(compressedDataSpan,
[self{shared_from_this()},
compressedData{std::move(compressedData)},
cb = std::move(cb)](auto &&write_res) {
if (!write_res) {
return cb(write_res.error());
}
cb(outcome::success());
});
}
}
};

Expand Down
Loading
Loading