Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
dup: add change_duplication_status on meta_server (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored May 6, 2019
1 parent 8379453 commit 80cb2c3
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 11 deletions.
14 changes: 12 additions & 2 deletions src/dist/replication/meta_server/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ void duplication_info::persist_status()
zauto_write_lock l(_lock);

dassert_dup(_is_altering, this, "");
ddebug_dup(this,
"change duplication status from {} to {} successfully [app_id: {}]",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id);

_is_altering = false;
_status = _next_status;
_next_status = duplication_status::DS_INIT;
Expand Down Expand Up @@ -173,8 +179,12 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
if (!json::json_forwarder<json_helper>::decode(json, info)) {
return nullptr;
}
auto dup = std::make_shared<duplication_info>(
dup_id, app_id, partition_count, std::move(info.remote), std::move(store_path));
auto dup = std::make_shared<duplication_info>(dup_id,
app_id,
partition_count,
info.create_timestamp_ms,
std::move(info.remote),
std::move(store_path));
dup->_status = info.status;
return dup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ class duplication_info
{
public:
/// \see meta_duplication_service::new_dup_from_init
/// \see duplication_info::decode_from_blob
duplication_info(dupid_t dupid,
int32_t appid,
int32_t partition_count,
uint64_t create_now_ms,
std::string remote_cluster_name,
std::string meta_store_path)
: id(dupid),
app_id(appid),
remote(std::move(remote_cluster_name)),
store_path(std::move(meta_store_path)),
create_timestamp_ms(dsn_now_ms())
create_timestamp_ms(create_now_ms)
{
for (int i = 0; i < partition_count; i++) {
_progress[i] = {};
Expand Down Expand Up @@ -153,6 +155,7 @@ class duplication_info

private:
friend class duplication_info_test;
friend class meta_duplication_service_test;

error_code do_alter_status(duplication_status::type to_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,62 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re
}
}

// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::change_duplication_status(duplication_status_change_rpc rpc)
{
const auto &request = rpc.request();
auto &response = rpc.response();

ddebug_f("change status of duplication({}) to {} for app({})",
request.dupid,
duplication_status_to_string(request.status),
request.app_name);

dupid_t dupid = request.dupid;

std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
return;
}

duplication_info_s_ptr dup = app->duplications[dupid];
if (dup == nullptr) {
response.err = ERR_OBJECT_NOT_FOUND;
return;
}

response.err = dup->alter_status(request.status);
if (response.err != ERR_OK) {
return;
}

// validation passed
do_change_duplication_status(app, dup, rpc);
}

// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_change_duplication_status(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_status_change_rpc &rpc)
{
// store the duplication in requested status.
blob value = dup->to_json_blob();

_meta_svc->get_meta_storage()->set_data(
std::string(dup->store_path), std::move(value), [rpc, this, app, dup]() {
dup->persist_status();
rpc.response().err = ERR_OK;
rpc.response().appid = app->app_id;

if (rpc.request().status == duplication_status::DS_REMOVED) {
zauto_write_lock l(app_lock());
app->duplications.erase(dup->id);
refresh_duplicating_no_lock(app);
}
});
}

// This call will not recreate if the duplication
// with the same app name and remote end point already exists.
// ThreadPool(WRITE): THREAD_POOL_META_STATE
Expand Down Expand Up @@ -156,8 +212,12 @@ meta_duplication_service::new_dup_from_init(const std::string &remote_cluster_na
dupid++;

std::string dup_path = get_duplication_path(*app, std::to_string(dupid));
dup = std::make_shared<duplication_info>(
dupid, app->app_id, app->partition_count, remote_cluster_name, std::move(dup_path));
dup = std::make_shared<duplication_info>(dupid,
app->app_id,
app->partition_count,
dsn_now_ms(),
remote_cluster_name,
std::move(dup_path));
for (int32_t i = 0; i < app->partition_count; i++) {
dup->init_progress(i, invalid_decree);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ class meta_duplication_service

void add_duplication(duplication_add_rpc rpc);

void change_duplication_status(duplication_status_change_rpc rpc);

private:
void do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc);

void do_change_duplication_status(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_status_change_rpc &rpc);

// Get zk path for duplication.
std::string get_duplication_path(const app_state &app) const
{
Expand Down
17 changes: 17 additions & 0 deletions src/dist/replication/meta_server/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,20 @@ void meta_service::on_add_duplication(duplication_add_rpc rpc)
server_state::sStateHash);
}

void meta_service::on_change_duplication_status(duplication_status_change_rpc rpc)
{
RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response());

if (!_dup_svc) {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
return;
}
tasking::enqueue(LPC_META_STATE_NORMAL,
tracker(),
[this, rpc]() { _dup_svc->change_duplication_status(std::move(rpc)); },
server_state::sStateHash);
}

void meta_service::on_query_duplication_info(duplication_query_rpc rpc)
{
RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response());
Expand All @@ -814,6 +828,9 @@ void meta_service::register_duplication_rpc_handlers()
{
register_rpc_handler_with_rpc_holder(
RPC_CM_ADD_DUPLICATION, "add_duplication", &meta_service::on_add_duplication);
register_rpc_handler_with_rpc_holder(RPC_CM_CHANGE_DUPLICATION_STATUS,
"change duplication status",
&meta_service::on_change_duplication_status);
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_DUPLICATION,
"query duplication info",
&meta_service::on_query_duplication_info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class duplication_info_test : public testing::Test
public:
static void test_alter_progress()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
ASSERT_FALSE(dup.alter_progress(1, 5));

dup.init_progress(1, invalid_decree);
Expand Down Expand Up @@ -66,7 +67,8 @@ class duplication_info_test : public testing::Test

static void test_init_and_start()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
ASSERT_FALSE(dup.is_altering());
ASSERT_EQ(dup._status, duplication_status::DS_INIT);
ASSERT_EQ(dup._next_status, duplication_status::DS_INIT);
Expand All @@ -85,7 +87,8 @@ class duplication_info_test : public testing::Test

static void test_persist_status()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();

dup.persist_status();
Expand All @@ -96,7 +99,8 @@ class duplication_info_test : public testing::Test

static void test_encode_and_decode()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();
dup.persist_status();

Expand All @@ -123,7 +127,7 @@ class duplication_info_test : public testing::Test

TEST_F(duplication_info_test, alter_status_when_busy)
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();

ASSERT_EQ(dup.alter_status(duplication_status::DS_PAUSE), ERR_BUSY);
Expand Down Expand Up @@ -154,7 +158,8 @@ TEST_F(duplication_info_test, alter_status)
};

for (auto tt : tests) {
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();
dup.persist_status();

Expand Down
Loading

0 comments on commit 80cb2c3

Please sign in to comment.