Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): add partition_version #394

Merged
merged 9 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ class replication_app_base : public replica_base
// query app envs.
virtual void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) = 0;

// `partition_version` is used to guarantee data consistency during partition split.
// In normal cases, partition_version = partition_count-1, when this replica rejects read
// and write request, partition_version = -1.
//
// Thread-safe.
virtual void set_partition_version(int32_t partition_version){};

public:
//
// utility functions to be used by app
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ replica::replica(
_options = &stub->options();
init_state();
_config.pid = gpid;
_partition_version = app.partition_count - 1;

std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid);
_counter_private_log_size.init_app_counter(
Expand Down
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// ballot when starting partition split and split will stop if ballot changed
// _child_init_ballot = 0 if partition not in partition split
ballot _child_init_ballot{0};
// in normal cases, _partition_version = partition_count-1
// when replica reject client read write request, partition_version = -1
std::atomic<int32_t> _partition_version;

// perf counters
perf_counter_wrapper _counter_private_log_size;
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ error_code replica::init_app_and_prepare_list(bool create_new)
_app = nullptr;
} else {
_is_initializing = true;
_app->set_partition_version(_app_info.partition_count - 1);

if (nullptr == _private_log) {
ddebug("%s: clear private log, dir = %s", name(), log_dir.c_str());
Expand Down
3 changes: 3 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class mock_replication_app_base : public replication_app_base
void query_app_envs(std::map<std::string, std::string> &out) override { out = _envs; }
decree last_durable_decree() const override { return 0; }

// TODO(heyuchen): implement this function in further pull request
void set_partition_version(int32_t partition_version) override {}

private:
std::map<std::string, std::string> _envs;
decree _decree = 5;
Expand Down