Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

dup: add change_duplication_status on meta_server #248

Merged
merged 5 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,12 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
if (!json::json_forwarder<json_helper>::decode(json, info)) {
return nullptr;
}
auto dup = std::make_shared<duplication_info>(
dup_id, app_id, partition_count, std::move(info.remote), std::move(store_path));
auto dup = std::make_shared<duplication_info>(dup_id,
app_id,
partition_count,
info.create_timestamp_ms,
std::move(info.remote),
std::move(store_path));
dup->_status = info.status;
return dup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ class duplication_info
{
public:
/// \see meta_duplication_service::new_dup_from_init
/// \see duplication_info::decode_from_blob
duplication_info(dupid_t dupid,
int32_t appid,
int32_t partition_count,
uint64_t create_now_ms,
std::string remote_cluster_name,
std::string meta_store_path)
: id(dupid),
app_id(appid),
remote(std::move(remote_cluster_name)),
store_path(std::move(meta_store_path)),
create_timestamp_ms(dsn_now_ms())
create_timestamp_ms(create_now_ms)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方对时间精确度要求高吗?create_now_ms的值应该是初始化赋值时候的时间值,并不是此时这条代码执行的时候值吧?

for (int i = 0; i < partition_count; i++) {
_progress[i] = {};
Expand Down Expand Up @@ -153,6 +155,7 @@ class duplication_info

private:
friend class duplication_info_test;
friend class meta_duplication_service_test;

error_code do_alter_status(duplication_status::type to_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,66 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re
}
}

// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::change_duplication_status(duplication_status_change_rpc rpc)
{
const auto &request = rpc.request();
auto &response = rpc.response();

ddebug_f("change status of duplication({}) to {} for app({})",
request.dupid,
duplication_status_to_string(request.status),
request.app_name);
hycdong marked this conversation as resolved.
Show resolved Hide resolved

dupid_t dupid = request.dupid;

std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
return;
}

duplication_info_s_ptr dup = app->duplications[dupid];
if (dup == nullptr) {
response.err = ERR_OBJECT_NOT_FOUND;
return;
}

response.err = dup->alter_status(request.status);
if (response.err != ERR_OK) {
return;
}

// validation passed
do_change_duplication_status(app, dup, rpc);
}

// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_change_duplication_status(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_status_change_rpc &rpc)
{
// store the duplication in requested status.
blob value = dup->to_json_blob();

_meta_svc->get_meta_storage()->set_data(
std::string(dup->store_path), std::move(value), [rpc, this, app, dup]() {
ddebug_dup(dup,
"change duplication status on metastore successfully [app_name:{}]",
app->app_name);

dup->persist_status();
rpc.response().err = ERR_OK;
rpc.response().appid = app->app_id;

if (rpc.request().status == duplication_status::DS_REMOVED) {
zauto_write_lock l(app_lock());
app->duplications.erase(dup->id);
refresh_duplicating_no_lock(app);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

// This call will not recreate if the duplication
// with the same app name and remote end point already exists.
// ThreadPool(WRITE): THREAD_POOL_META_STATE
Expand Down Expand Up @@ -156,8 +216,12 @@ meta_duplication_service::new_dup_from_init(const std::string &remote_cluster_na
dupid++;

std::string dup_path = get_duplication_path(*app, std::to_string(dupid));
dup = std::make_shared<duplication_info>(
dupid, app->app_id, app->partition_count, remote_cluster_name, std::move(dup_path));
dup = std::make_shared<duplication_info>(dupid,
app->app_id,
app->partition_count,
dsn_now_ms(),
remote_cluster_name,
std::move(dup_path));
for (int32_t i = 0; i < app->partition_count; i++) {
dup->init_progress(i, invalid_decree);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ class meta_duplication_service

void add_duplication(duplication_add_rpc rpc);

void change_duplication_status(duplication_status_change_rpc rpc);

private:
void do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc);

void do_change_duplication_status(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_status_change_rpc &rpc);

// Get zk path for duplication.
std::string get_duplication_path(const app_state &app) const
{
Expand Down
17 changes: 17 additions & 0 deletions src/dist/replication/meta_server/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,20 @@ void meta_service::on_add_duplication(duplication_add_rpc rpc)
server_state::sStateHash);
}

void meta_service::on_change_duplication_status(duplication_status_change_rpc rpc)
{
RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response());

if (!_dup_svc) {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
return;
}
tasking::enqueue(LPC_META_STATE_NORMAL,
tracker(),
[this, rpc]() { _dup_svc->change_duplication_status(std::move(rpc)); },
server_state::sStateHash);
}

void meta_service::on_query_duplication_info(duplication_query_rpc rpc)
{
RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response());
Expand All @@ -814,6 +828,9 @@ void meta_service::register_duplication_rpc_handlers()
{
register_rpc_handler_with_rpc_holder(
RPC_CM_ADD_DUPLICATION, "add_duplication", &meta_service::on_add_duplication);
register_rpc_handler_with_rpc_holder(RPC_CM_CHANGE_DUPLICATION_STATUS,
"change duplication status",
&meta_service::on_change_duplication_status);
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_DUPLICATION,
"query duplication info",
&meta_service::on_query_duplication_info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class duplication_info_test : public testing::Test
public:
static void test_alter_progress()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
ASSERT_FALSE(dup.alter_progress(1, 5));

dup.init_progress(1, invalid_decree);
Expand Down Expand Up @@ -66,7 +67,8 @@ class duplication_info_test : public testing::Test

static void test_init_and_start()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
ASSERT_FALSE(dup.is_altering());
ASSERT_EQ(dup._status, duplication_status::DS_INIT);
ASSERT_EQ(dup._next_status, duplication_status::DS_INIT);
Expand All @@ -85,7 +87,8 @@ class duplication_info_test : public testing::Test

static void test_persist_status()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();

dup.persist_status();
Expand All @@ -96,7 +99,8 @@ class duplication_info_test : public testing::Test

static void test_encode_and_decode()
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();
dup.persist_status();

Expand All @@ -123,7 +127,7 @@ class duplication_info_test : public testing::Test

TEST_F(duplication_info_test, alter_status_when_busy)
{
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();

ASSERT_EQ(dup.alter_status(duplication_status::DS_PAUSE), ERR_BUSY);
Expand Down Expand Up @@ -154,7 +158,8 @@ TEST_F(duplication_info_test, alter_status)
};

for (auto tt : tests) {
duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
duplication_info dup(
1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1");
dup.start();
dup.persist_status();

Expand Down
Loading