Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): replica add validate_partition_hash #747

Merged
merged 4 commits into from
Feb 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@ DEFINE_ERR_CODE(ERR_KRB5_INTERNAL)
DEFINE_ERR_CODE(ERR_SASL_INTERNAL)
DEFINE_ERR_CODE(ERR_SASL_INCOMPLETE)
DEFINE_ERR_CODE(ERR_ACL_DENY)
DEFINE_ERR_CODE(ERR_SPLITTING)
DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
response_client_read(request, ERR_ACL_DENY);
}

CHECK_REQUEST_IF_SPLITTING(read)

if (status() == partition_status::PS_INACTIVE ||
status() == partition_status::PS_POTENTIAL_SECONDARY) {
response_client_read(request, ERR_INVALID_STATE);
Expand Down
19 changes: 19 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ enum manual_compaction_status
};
const char *manual_compaction_status_to_string(manual_compaction_status status);

#define CHECK_REQUEST_IF_SPLITTING(op_type) \
if (_validate_partition_hash) { \
if (_split_mgr->should_reject_request()) { \
response_client_##op_type(request, ERR_SPLITTING); \
return; \
} \
if (!_split_mgr->check_partition_hash( \
((dsn::message_ex *)request)->header->client.partition_hash, #op_type)) { \
response_client_##op_type(request, ERR_PARENT_PARTITION_MISUSED); \
return; \
} \
}

class replica : public serverlet<replica>, public ref_counter, public replica_base
{
public:
Expand Down Expand Up @@ -414,6 +427,11 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// update allowed users for access controller
void update_ac_allowed_users(const std::map<std::string, std::string> &envs);

// update bool app envs
void update_bool_envs(const std::map<std::string, std::string> &envs,
const std::string &name,
/*out*/ bool &value);

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down Expand Up @@ -519,6 +537,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

// partition split
std::unique_ptr<replica_split_manager> _split_mgr;
bool _validate_partition_hash{false};

// disk migrator
std::unique_ptr<replica_disk_migrator> _disk_migrator;
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

CHECK_REQUEST_IF_SPLITTING(write)

if (partition_status::PS_PRIMARY != status()) {
response_client_write(request, ERR_INVALID_STATE);
return;
Expand Down
34 changes: 20 additions & 14 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,26 +554,32 @@ void replica::update_app_envs(const std::map<std::string, std::string> &envs)

void replica::update_app_envs_internal(const std::map<std::string, std::string> &envs)
{
// DENY_CLIENT_WRITE
bool deny_client_write = false;
auto find = envs.find(replica_envs::DENY_CLIENT_WRITE);
if (find != envs.end()) {
if (!buf2bool(find->second, deny_client_write)) {
dwarn_replica(
"invalid value of env {}: \"{}\"", replica_envs::DENY_CLIENT_WRITE, find->second);
}
}
if (deny_client_write != _deny_client_write) {
ddebug_replica(
"switch _deny_client_write from {} to {}", _deny_client_write, deny_client_write);
_deny_client_write = deny_client_write;
}
update_bool_envs(envs, replica_envs::DENY_CLIENT_WRITE, _deny_client_write);

update_bool_envs(envs, replica_envs::SPLIT_VALIDATE_PARTITION_HASH, _validate_partition_hash);

update_throttle_envs(envs);

update_ac_allowed_users(envs);
}

void replica::update_bool_envs(const std::map<std::string, std::string> &envs,
const std::string &name,
bool &value)
{
bool new_value = value;
auto iter = envs.find(name);
if (iter != envs.end()) {
if (!buf2bool(iter->second, new_value)) {
dwarn_replica("invalid value of env {}: \"{}\"", name, iter->second);
}
}
if (new_value != value) {
ddebug_replica("switch env[{}] from {} to {}", name, value, new_value);
value = new_value;
}
}

void replica::update_ac_allowed_users(const std::map<std::string, std::string> &envs)
{
std::string allowed_users;
Expand Down
16 changes: 16 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ class replica_split_manager : replica_base
ballot get_ballot() const { return _replica->get_ballot(); }
decree last_committed_decree() const { return _replica->last_committed_decree(); }
task_tracker *tracker() { return _replica->tracker(); }
bool should_reject_request() const { return get_partition_version() == -1; }
bool check_partition_hash(const uint64_t &partition_hash, const std::string &op) const
{
auto target_pidx = get_partition_version() & partition_hash;
if (dsn_unlikely(target_pidx != get_gpid().get_partition_index())) {
derror_replica(
"receive {} request with wrong partition_hash({}), partition_version = {}, "
"target_pidx = {}",
op,
partition_hash,
get_partition_version(),
target_pidx);
return false;
}
return true;
}

private:
replica *_replica;
Expand Down
27 changes: 27 additions & 0 deletions src/replica/split/test/replica_split_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ class replica_split_test : public replica_test_base
_parent_replica->tracker()->wait_outstanding_tasks();
}

bool test_check_partition_hash(const int32_t &partition_version, const uint64_t &partition_hash)
{
_parent_split_mgr->_partition_version.store(partition_version);
return _parent_split_mgr->check_partition_hash(partition_hash, "write");
}

/// helper functions
void cleanup_prepare_list(mock_replica_ptr rep) { rep->_prepare_list->reset(0); }
void cleanup_child_split_context()
Expand Down Expand Up @@ -918,5 +924,26 @@ TEST_F(replica_split_test, primary_parent_handle_stop_test)
}
}

TEST_F(replica_split_test, check_partition_hash_test)
{
uint64_t send_to_parent_after_split = 1;
uint64_t send_to_child_after_split = 9;

struct check_partition_hash_test
{
int32_t partition_version;
uint64_t partition_hash;
bool expected_result;
} tests[]{{OLD_PARTITION_COUNT - 1, send_to_parent_after_split, true},
{OLD_PARTITION_COUNT - 1, send_to_child_after_split, true},
{NEW_PARTITION_COUNT - 1, send_to_parent_after_split, true},
{NEW_PARTITION_COUNT - 1, send_to_child_after_split, false}};

for (const auto &test : tests) {
ASSERT_EQ(test_check_partition_hash(test.partition_version, test.partition_hash),
test.expected_result);
}
}

} // namespace replication
} // namespace dsn
42 changes: 42 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>
#include <dsn/dist/replication/replica_envs.h>
#include "replica/replica_http_service.h"

namespace dsn {
Expand Down Expand Up @@ -38,6 +39,22 @@ class replica_test : public replica_test_base
return _mock_replica->_counter_backup_request_qps->get_integer_value();
}

bool get_validate_partition_hash() const { return _mock_replica->_validate_partition_hash; }

void reset_validate_partition_hash() { _mock_replica->_validate_partition_hash = false; }

void update_validate_partition_hash(bool old_value, bool set_in_map, std::string new_value)
{
_mock_replica->_validate_partition_hash = old_value;
std::map<std::string, std::string> envs;
if (set_in_map) {
envs[replica_envs::SPLIT_VALIDATE_PARTITION_HASH] = new_value;
}
_mock_replica->update_bool_envs(envs,
replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
_mock_replica->_validate_partition_hash);
}

void mock_app_info()
{
_app_info.app_id = 2;
Expand Down Expand Up @@ -146,5 +163,30 @@ TEST_F(replica_test, query_compaction_test)
}
}

TEST_F(replica_test, update_validate_partition_hash_test)
{
struct update_validate_partition_hash_test
{
bool old_value;
bool set_in_map;
std::string new_value;
bool expected_value;
} tests[]{
{false, false, "false", false},
{false, true, "false", false},
{false, false, "true", false},
{false, true, "true", true},
{false, true, "ture", false},
{true, true, "false", false},
{true, true, "true", true},
{true, true, "flase", true},
};
for (const auto &test : tests) {
update_validate_partition_hash(test.old_value, test.set_in_map, test.new_value);
ASSERT_EQ(get_validate_partition_hash(), test.expected_value);
reset_validate_partition_hash();
}
}

} // namespace replication
} // namespace dsn