Skip to content

Commit

Permalink
skip on compression
Browse files Browse the repository at this point in the history
Summary:
Updates the how hashing is skipped when compression is enabled. Looks at the rpc metadata struct to determine if the data is compressed or not be hashing. Hashing can be skipped when compressing because compression provides its own hashing. The checksumming will only run on uncompressed data.

Also improves the error message to match what the existing crc32 exception returns so this has the correct error codes.

Reviewed By: cevans87, avalonalex

Differential Revision: D66292663

fbshipit-source-id: e3d646b21f64e0df031a481dae94b0b065aee925
  • Loading branch information
Robert Roeser authored and facebook-github-bot committed Nov 27, 2024
1 parent 1b06194 commit e724f66
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 23 deletions.
17 changes: 0 additions & 17 deletions thrift/conformance/stresstest/StressTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,6 @@ int main(int argc, char* argv[]) {

// create a test runner instance
auto clientCfg = ClientConfig::createFromFlags();
if (clientCfg.enableChecksum) {
LOG(INFO) << "Initializing checksum payload serializer" << std::endl;
PayloadSerializer::initialize(
ChecksumPayloadSerializerStrategy<LegacyPayloadSerializerStrategy>(
ChecksumPayloadSerializerStrategyOptions{
.recordChecksumFailure =
[] { LOG(FATAL) << "Checksum failure detected"; },
.recordChecksumSuccess =
[] {
LOG_EVERY_N(INFO, 1'000'000)
<< "Checksum success detected";
},
.recordChecksumCalculated =
[] {
LOG_EVERY_N(INFO, 1'000'000) << "Checksum calculated";
}}));
}

TestRunner testRunner(std::move(clientCfg));
testRunner.runTests();
Expand Down
50 changes: 49 additions & 1 deletion thrift/conformance/stresstest/client/StressTestClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

#include <folly/synchronization/CallOnce.h>
#include <thrift/conformance/stresstest/client/StressTestClient.h>
#include <thrift/lib/cpp2/transport/rocket/payload/PayloadSerializer.h>

#include <folly/coro/Sleep.h>

Expand Down Expand Up @@ -50,6 +52,27 @@ folly::coro::Task<std::string> ThriftStressTestClient::co_echo(
co_await timedExecute([&]() -> folly::coro::Task<void> {
RpcOptions rpcOptions;
if (enableChecksum_) {
static folly::once_flag flag;

folly::call_once(flag, [] {
LOG(INFO) << "Initializing checksum payload serializer" << std::endl;
rocket::PayloadSerializer::initialize(
rocket::ChecksumPayloadSerializerStrategy<
rocket::LegacyPayloadSerializerStrategy>(
rocket::ChecksumPayloadSerializerStrategyOptions{
.recordChecksumFailure =
[] { LOG(FATAL) << "Checksum failure detected"; },
.recordChecksumSuccess =
[] {
LOG_EVERY_N(INFO, 1'000)
<< "Checksum success detected";
},
.recordChecksumCalculated =
[] {
LOG_EVERY_N(INFO, 1'000) << "Checksum calculated";
}}));
});

rpcOptions.setChecksum(RpcOptions::Checksum::XXH3_64);
}
ret = co_await client_->co_echo(rpcOptions, x);
Expand All @@ -61,7 +84,32 @@ folly::coro::Task<std::string> ThriftStressTestClient::co_echoEb(
const std::string& x) {
std::string ret;
co_await timedExecute([&]() -> folly::coro::Task<void> {
ret = co_await client_->co_echoEb(x);
RpcOptions rpcOptions;
if (enableChecksum_) {
static folly::once_flag flag;

folly::call_once(flag, [] {
LOG(INFO) << "Initializing checksum payload serializer" << std::endl;
rocket::PayloadSerializer::initialize(
rocket::ChecksumPayloadSerializerStrategy<
rocket::LegacyPayloadSerializerStrategy>(
rocket::ChecksumPayloadSerializerStrategyOptions{
.recordChecksumFailure =
[] { LOG(FATAL) << "Checksum failure detected"; },
.recordChecksumSuccess =
[] {
LOG_EVERY_N(INFO, 1'000)
<< "Checksum success detected";
},
.recordChecksumCalculated =
[] {
LOG_EVERY_N(INFO, 1'000) << "Checksum calculated";
}}));
});

rpcOptions.setChecksum(RpcOptions::Checksum::XXH3_64);
}
ret = co_await client_->co_echoEb(rpcOptions, x);
});
co_return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#pragma once

#include <folly/Function.h>
#include <folly/logging/xlog.h>
#include <thrift/lib/cpp/TApplicationException.h>
#include <thrift/lib/cpp2/transport/rocket/ChecksumGenerator.h>
#include <thrift/lib/cpp2/transport/rocket/payload/PayloadSerializerStrategy.h>
#include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h>
Expand Down Expand Up @@ -57,7 +59,7 @@ class ChecksumPayloadSerializerStrategy final
template <class T>
FOLLY_ERASE folly::Try<T> unpackAsCompressed(
Payload&& payload, bool decodeMetadataUsingBinary) {
return unpackImpl<T, /*VerifyChecksum=*/false>(
return unpackImpl<T>(
std::move(payload),
[this, decodeMetadataUsingBinary](Payload&& payload) -> folly::Try<T> {
return delegate_.template unpackAsCompressed<T>(
Expand All @@ -68,7 +70,7 @@ class ChecksumPayloadSerializerStrategy final
template <typename T>
FOLLY_ERASE folly::Try<T> unpack(
Payload&& payload, bool decodeMetadataUsingBinary) {
return unpackImpl<T, /*VerifyChecksum=*/true>(
return unpackImpl<T>(
std::move(payload),
[this, decodeMetadataUsingBinary](Payload&& payload) -> folly::Try<T> {
return delegate_.template unpack<T>(
Expand All @@ -81,6 +83,13 @@ class ChecksumPayloadSerializerStrategy final
return delegate_.packCompact(std::forward<T>(data));
}

template <typename Metadata>
bool isDataCompressed(Metadata* metadata) {
return metadata->compression().has_value() &&
metadata->compression().value() !=
apache::thrift::CompressionAlgorithm::NONE;
}

template <typename Metadata>
FOLLY_ERASE rocket::Payload packWithFds(
Metadata* metadata,
Expand Down Expand Up @@ -215,17 +224,38 @@ class ChecksumPayloadSerializerStrategy final
folly::Function<void()> recordChecksumSuccess_;
folly::Function<void()> recordChecksumCalculated_;

template <typename T, bool VerifyChecksum, typename DelegateFunc>
/**
* Helper function that makes checks to make sure that the checksum wasn't
* invalid because of incorrect setup vs an actual checksum failure.
*/
void validateInvalidChecksum(const Checksum& c) {
auto value = c.checksum().value();
auto salt = c.salt().value();

if (salt == 0 && value == 0) {
XLOG_EVERY_MS(ERR, 1'000)
<< "Received a request to checksum the payload but received a checksum and salt that are zero. "
<< "Please make sure that the ChecksumPayloadSerializerStrategy is enabled on both the client and server.";
}
}

template <typename T, typename DelegateFunc>
FOLLY_ERASE folly::Try<T> unpackImpl(Payload&& payload, DelegateFunc func) {
if (payload.hasNonemptyMetadata()) {
folly::Try<T> t = func(std::move(payload));
bool compressed = isDataCompressed(&t.value().metadata);
folly::IOBuf& buf = *t->payload.get();
if (t.hasException() || !VerifyChecksum) {
if (t.hasException() || compressed) {
return t;
} else if (validateChecksum(buf, t->metadata.checksum())) {
return t;
} else {
return folly::Try<T>(std::runtime_error("Checksum mismatch"));
if (FOLLY_LIKELY(t->metadata.checksum().has_value())) {
validateInvalidChecksum(t->metadata.checksum().value());
}
return folly::Try<T>(
folly::make_exception_wrapper<TApplicationException>(
TApplicationException::CHECKSUM_MISMATCH, "Checksum mismatch"));
}
} else {
return func(std::move(payload));
Expand Down

0 comments on commit e724f66

Please sign in to comment.