diff --git a/src/dist/replication/meta_server/duplication/duplication_info.cpp b/src/dist/replication/meta_server/duplication/duplication_info.cpp
index 93c6b0414c..877ec3c1a3 100644
--- a/src/dist/replication/meta_server/duplication/duplication_info.cpp
+++ b/src/dist/replication/meta_server/duplication/duplication_info.cpp
@@ -135,6 +135,13 @@ void duplication_info::persist_status()
     zauto_write_lock l(_lock);
 
     dassert_dup(_is_altering, this, "");
+    // Log while _status still holds the old value; it is overwritten just below.
+    ddebug_dup(this,
+               "change duplication status from {} to {} successfully [app_id: {}]",
+               duplication_status_to_string(_status),
+               duplication_status_to_string(_next_status),
+               app_id);
+
     _is_altering = false;
     _status = _next_status;
     _next_status = duplication_status::DS_INIT;
@@ -173,8 +180,12 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
     if (!json::json_forwarder::decode(json, info)) {
         return nullptr;
     }
-    auto dup = std::make_shared<duplication_info>(
-        dup_id, app_id, partition_count, std::move(info.remote), std::move(store_path));
+    auto dup = std::make_shared<duplication_info>(dup_id,
+                                                  app_id,
+                                                  partition_count,
+                                                  info.create_timestamp_ms,
+                                                  std::move(info.remote),
+                                                  std::move(store_path));
     dup->_status = info.status;
     return dup;
 }
diff --git a/src/dist/replication/meta_server/duplication/duplication_info.h b/src/dist/replication/meta_server/duplication/duplication_info.h
index 9d72302927..601756acef 100644
--- a/src/dist/replication/meta_server/duplication/duplication_info.h
+++ b/src/dist/replication/meta_server/duplication/duplication_info.h
@@ -48,16 +48,18 @@ class duplication_info
 {
 public:
     /// \see meta_duplication_service::new_dup_from_init
+    /// \see duplication_info::decode_from_blob
     duplication_info(dupid_t dupid,
                      int32_t appid,
                      int32_t partition_count,
+                     uint64_t create_now_ms,
                      std::string remote_cluster_name,
                      std::string meta_store_path)
         : id(dupid),
           app_id(appid),
           remote(std::move(remote_cluster_name)),
           store_path(std::move(meta_store_path)),
-          create_timestamp_ms(dsn_now_ms())
+          create_timestamp_ms(create_now_ms)
     {
         for (int i = 0; i < partition_count; i++) {
             _progress[i] = {};
@@ -153,6 +155,7 @@ class duplication_info
 
 private:
     friend class duplication_info_test;
+    friend class meta_duplication_service_test;
 
     error_code do_alter_status(duplication_status::type to_status);
 
diff --git a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp
index 8b91655334..93b594f658 100644
--- a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp
+++ b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp
@@ -59,6 +59,73 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re
     }
 }
 
+// Validates a status-change request and, when it is acceptable, hands it to
+// do_change_duplication_status() to be persisted. On rejection only
+// `response.err` is set: ERR_APP_NOT_EXIST if the app is missing or not
+// AS_AVAILABLE, ERR_OBJECT_NOT_FOUND if the app has no such dupid, or whatever
+// error duplication_info::alter_status() reports.
+// ThreadPool(WRITE): THREAD_POOL_META_STATE
+void meta_duplication_service::change_duplication_status(duplication_status_change_rpc rpc)
+{
+    const auto &request = rpc.request();
+    auto &response = rpc.response();
+
+    ddebug_f("change status of duplication({}) to {} for app({})",
+             request.dupid,
+             duplication_status_to_string(request.status),
+             request.app_name);
+
+    dupid_t dupid = request.dupid;
+
+    std::shared_ptr<app_state> app = _state->get_app(request.app_name);
+    if (!app || app->status != app_status::AS_AVAILABLE) {
+        response.err = ERR_APP_NOT_EXIST;
+        return;
+    }
+
+    // Look the duplication up with find(): operator[] would default-insert a
+    // null entry into app->duplications for an unknown dupid.
+    auto dup_it = app->duplications.find(dupid);
+    if (dup_it == app->duplications.end() || dup_it->second == nullptr) {
+        response.err = ERR_OBJECT_NOT_FOUND;
+        return;
+    }
+    duplication_info_s_ptr dup = dup_it->second;
+
+    response.err = dup->alter_status(request.status);
+    if (response.err != ERR_OK) {
+        return;
+    }
+
+    // validation passed
+    do_change_duplication_status(app, dup, rpc);
+}
+
+// Writes the duplication's JSON blob to meta storage at dup->store_path; once
+// the write completes, commits the pending status and fills in the response.
+// A duplication changed to DS_REMOVED is also erased from the app.
+// ThreadPool(WRITE): THREAD_POOL_META_STATE
+void meta_duplication_service::do_change_duplication_status(std::shared_ptr<app_state> &app,
+                                                            duplication_info_s_ptr &dup,
+                                                            duplication_status_change_rpc &rpc)
+{
+    // store the duplication in requested status.
+    blob value = dup->to_json_blob();
+
+    _meta_svc->get_meta_storage()->set_data(
+        std::string(dup->store_path), std::move(value), [rpc, this, app, dup]() {
+            dup->persist_status();
+            rpc.response().err = ERR_OK;
+            rpc.response().appid = app->app_id;
+
+            if (rpc.request().status == duplication_status::DS_REMOVED) {
+                zauto_write_lock l(app_lock());
+                app->duplications.erase(dup->id);
+                refresh_duplicating_no_lock(app);
+            }
+        });
+}
+
 // This call will not recreate if the duplication
 // with the same app name and remote end point already exists.
 // ThreadPool(WRITE): THREAD_POOL_META_STATE
@@ -156,8 +223,12 @@ meta_duplication_service::new_dup_from_init(const std::string &remote_cluster_na
             dupid++;
 
         std::string dup_path = get_duplication_path(*app, std::to_string(dupid));
-        dup = std::make_shared<duplication_info>(
-            dupid, app->app_id, app->partition_count, remote_cluster_name, std::move(dup_path));
+        dup = std::make_shared<duplication_info>(dupid,
+                                                 app->app_id,
+                                                 app->partition_count,
+                                                 dsn_now_ms(),
+                                                 remote_cluster_name,
+                                                 std::move(dup_path));
         for (int32_t i = 0; i < app->partition_count; i++) {
             dup->init_progress(i, invalid_decree);
         }
diff --git a/src/dist/replication/meta_server/duplication/meta_duplication_service.h b/src/dist/replication/meta_server/duplication/meta_duplication_service.h
index 49b5719ead..339e07f574 100644
--- a/src/dist/replication/meta_server/duplication/meta_duplication_service.h
+++ b/src/dist/replication/meta_server/duplication/meta_duplication_service.h
@@ -47,11 +47,17 @@ class meta_duplication_service
 
     void add_duplication(duplication_add_rpc rpc);
 
+    void change_duplication_status(duplication_status_change_rpc rpc);
+
 private:
     void do_add_duplication(std::shared_ptr<app_state> &app,
                             duplication_info_s_ptr &dup,
                             duplication_add_rpc &rpc);
 
+    void do_change_duplication_status(std::shared_ptr<app_state> &app,
+                                      duplication_info_s_ptr &dup,
+                                      duplication_status_change_rpc &rpc);
+
     // Get zk path for duplication.
std::string get_duplication_path(const app_state &app) const { diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index d68c801b87..0a8d61f942 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -799,6 +799,20 @@ void meta_service::on_add_duplication(duplication_add_rpc rpc) server_state::sStateHash); } +void meta_service::on_change_duplication_status(duplication_status_change_rpc rpc) +{ + RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response()); + + if (!_dup_svc) { + rpc.response().err = ERR_SERVICE_NOT_ACTIVE; + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + tracker(), + [this, rpc]() { _dup_svc->change_duplication_status(std::move(rpc)); }, + server_state::sStateHash); +} + void meta_service::on_query_duplication_info(duplication_query_rpc rpc) { RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response()); @@ -814,6 +828,9 @@ void meta_service::register_duplication_rpc_handlers() { register_rpc_handler_with_rpc_holder( RPC_CM_ADD_DUPLICATION, "add_duplication", &meta_service::on_add_duplication); + register_rpc_handler_with_rpc_holder(RPC_CM_CHANGE_DUPLICATION_STATUS, + "change duplication status", + &meta_service::on_change_duplication_status); register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_DUPLICATION, "query duplication info", &meta_service::on_query_duplication_info); diff --git a/src/dist/replication/test/meta_test/unit_test/duplication_info_test.cpp b/src/dist/replication/test/meta_test/unit_test/duplication_info_test.cpp index ea421da155..20f6558e59 100644 --- a/src/dist/replication/test/meta_test/unit_test/duplication_info_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/duplication_info_test.cpp @@ -37,7 +37,8 @@ class duplication_info_test : public testing::Test public: static void test_alter_progress() { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + 
duplication_info dup( + 1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); ASSERT_FALSE(dup.alter_progress(1, 5)); dup.init_progress(1, invalid_decree); @@ -66,7 +67,8 @@ class duplication_info_test : public testing::Test static void test_init_and_start() { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + duplication_info dup( + 1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); ASSERT_FALSE(dup.is_altering()); ASSERT_EQ(dup._status, duplication_status::DS_INIT); ASSERT_EQ(dup._next_status, duplication_status::DS_INIT); @@ -85,7 +87,8 @@ class duplication_info_test : public testing::Test static void test_persist_status() { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + duplication_info dup( + 1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); dup.start(); dup.persist_status(); @@ -96,7 +99,8 @@ class duplication_info_test : public testing::Test static void test_encode_and_decode() { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + duplication_info dup( + 1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); dup.start(); dup.persist_status(); @@ -123,7 +127,7 @@ class duplication_info_test : public testing::Test TEST_F(duplication_info_test, alter_status_when_busy) { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + duplication_info dup(1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); dup.start(); ASSERT_EQ(dup.alter_status(duplication_status::DS_PAUSE), ERR_BUSY); @@ -154,7 +158,8 @@ TEST_F(duplication_info_test, alter_status) }; for (auto tt : tests) { - duplication_info dup(1, 1, 4, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); + duplication_info dup( + 1, 1, 4, 0, "dsn://slave-cluster/temp", "/meta_test/101/duplication/1"); dup.start(); dup.persist_status(); 
diff --git a/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp
new file mode 100644
index 0000000000..fa21a6b3b9
--- /dev/null
+++ b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp
@@ -0,0 +1,349 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2015 Microsoft Corporation
+ *
+ * -=- Robust Distributed System Nucleus (rDSN) -=-
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include // NOTE(review): header name is missing in this copy of the patch; confirm against the commit
+#include // NOTE(review): header name is missing in this copy of the patch; confirm against the commit
+
+#include "dist/replication/meta_server/server_load_balancer.h"
+#include "dist/replication/meta_server/meta_server_failure_detector.h"
+#include "dist/replication/meta_server/duplication/meta_duplication_service.h"
+#include "dist/replication/test/meta_test/misc/misc.h"
+
+#include "meta_service_test_app.h"
+
+namespace dsn {
+namespace replication {
+
+class meta_duplication_service_test : public ::testing::Test // fixture: one fake meta service per test
+{
+public:
+    meta_duplication_service_test() {}
+
+    void SetUp() override // creates the fake meta service, its duplication service and server state
+    {
+        _ms.reset(new fake_receiver_meta_service);
+        ASSERT_EQ(_ms->remote_storage_initialize(), ERR_OK);
+        _ms->initialize_duplication_service();
+        ASSERT_TRUE(_ms->_dup_svc);
+
+        _ss = _ms->_state;
+        _ss->initialize(_ms.get(), _ms->_cluster_root + "/apps");
+
+        _ms->_started = true;
+
+        // recover apps from meta storage
+        ASSERT_EQ(_ss->initialize_data_structure(), ERR_OK);
+    }
+
+    void TearDown() override // clears meta storage, then releases the state and the service
+    {
+        if (_ss && _ms) {
+            delete_all_on_meta_storage();
+        }
+
+        _ss.reset();
+        _ms.reset(nullptr);
+    }
+
+    meta_duplication_service &dup_svc() { return *(_ms->_dup_svc); }
+
+    // Create an app for the test with the specified name.
+    void create_app(const std::string &name)
+    {
+        configuration_create_app_request req;
+        configuration_create_app_response resp;
+        req.app_name = name;
+        req.options.app_type = "simple_kv";
+        req.options.partition_count = 8;
+        req.options.replica_count = 3;
+        req.options.success_if_exist = false;
+        req.options.is_stateful = true;
+        req.options.envs["value_version"] = "1";
+
+        auto result = fake_create_app(_ss.get(), req);
+        fake_wait_rpc(result, resp);
+        ASSERT_EQ(resp.err, ERR_OK) << resp.err.to_string() << " " << name;
+
+        // wait for the table to be created
+        ASSERT_TRUE(_ss->spin_wait_staging(30));
+    }
+
+    std::shared_ptr find_app(const std::string &name) { return _ss->get_app(name); }
+
+    void delete_all_on_meta_storage() // recursively deletes every child of "/" and waits for completion
+    {
+        _ms->get_meta_storage()->get_children(
+            {"/"}, [this](bool, const std::vector &children) {
+                for (const std::string &child : children) {
+                    _ms->get_meta_storage()->delete_node_recursively("/" + child, []() {});
+                }
+            });
+        wait_all();
+    }
+
+    duplication_add_response create_dup(const std::string &app_name,
+                                        const std::string &remote_cluster = "slave-cluster",
+                                        bool freezed = false)
+    {
+        auto req = make_unique();
+        req->app_name = app_name;
+        req->remote_cluster_name = remote_cluster;
+        req->freezed = freezed;
+
+        duplication_add_rpc rpc(std::move(req), RPC_CM_ADD_DUPLICATION);
+        dup_svc().add_duplication(rpc);
+        wait_all(); // the response is only read after the outstanding tasks have finished
+        return rpc.response();
+    }
+
+    duplication_query_response query_dup_info(const std::string &app_name)
+    {
+        auto req = make_unique();
+        req->app_name = app_name;
+
+        duplication_query_rpc rpc(std::move(req), RPC_CM_QUERY_DUPLICATION);
+        dup_svc().query_duplication_info(rpc.request(), rpc.response());
+
+        return rpc.response();
+    }
+
+    duplication_status_change_response
+    change_dup_status(const std::string &app_name, dupid_t dupid, duplication_status::type status)
+    {
+        auto req = make_unique();
+        req->dupid = dupid;
+        req->app_name = app_name;
+        req->status = status;
+
+        duplication_status_change_rpc rpc(std::move(req), RPC_CM_CHANGE_DUPLICATION_STATUS);
+        dup_svc().change_duplication_status(rpc);
+        wait_all(); // the response is only read after the outstanding tasks have finished
+
+        return rpc.response();
+    }
+
+    void initialize_node_state() { _ss->initialize_node_state(); }
+
+    void wait_all() { _ms->tracker()->wait_outstanding_tasks(); }
+
+    /// === Tests ===
+
+    void test_new_dup_from_init() // checks 1000 freshly created dups: positive, increasing ids, DS_INIT state
+    {
+        std::string test_app = "test-app";
+        create_app(test_app);
+        auto app = find_app(test_app);
+        std::string remote_cluster_address = "dsn://slave-cluster/temp";
+
+        int last_dup = 0;
+        for (int i = 0; i < 1000; i++) {
+            auto dup = dup_svc().new_dup_from_init(remote_cluster_address, app);
+
+            ASSERT_GT(dup->id, 0);
+            ASSERT_FALSE(dup->is_altering());
+            ASSERT_EQ(dup->_status, duplication_status::DS_INIT);
+            ASSERT_EQ(dup->_next_status, duplication_status::DS_INIT);
+
+            auto ent = dup->to_duplication_entry();
+            for (int j = 0; j < app->partition_count; j++) {
+                ASSERT_EQ(ent.progress[j], invalid_decree);
+            }
+
+            if (last_dup != 0) {
+                ASSERT_GT(dup->id, last_dup);
+            }
+            last_dup = dup->id;
+        }
+    }
+
+    void test_add_duplication() // table-driven: each row is (app, remote cluster, expected error)
+    {
+        std::string test_app = "test-app";
+        std::string test_app_invalid_ver = "test-app-invalid-ver";
+
+        std::string invalid_remote = "test-invalid-remote";
+        std::string ok_remote = "slave-cluster";
+
+        create_app(test_app);
+
+        create_app(test_app_invalid_ver);
+        find_app(test_app_invalid_ver)->envs["value_version"] = "0";
+
+        struct TestData
+        {
+            std::string app;
+            std::string remote;
+
+            error_code wec;
+        } tests[] = {
+            //        {test_app_invalid_ver, ok_remote, ERR_INVALID_VERSION},
+
+            {test_app, ok_remote, ERR_OK},
+
+            {test_app, invalid_remote, ERR_INVALID_PARAMETERS},
+
+            {test_app, get_current_cluster_name(), ERR_INVALID_PARAMETERS},
+        };
+
+        for (auto tt : tests) {
+            auto resp = create_dup(tt.app, tt.remote);
+            ASSERT_EQ(tt.wec, resp.err);
+
+            if (tt.wec == ERR_OK) {
+                auto app = find_app(test_app); // NOTE(review): looks up test_app, not tt.app; same value in every current row
+                auto dup = app->duplications[resp.dupid];
+                ASSERT_TRUE(dup != nullptr);
+                ASSERT_EQ(dup->app_id, app->app_id);
+                ASSERT_EQ(dup->_status, duplication_status::DS_START);
+                ASSERT_EQ(dup->remote, ok_remote);
+                ASSERT_EQ(resp.dupid, dup->id);
+                ASSERT_EQ(app->duplicating, true);
+            }
+        }
+    }
+
+    std::shared_ptr _ss;
+    std::unique_ptr _ms;
+};
+
+// This test ensures that duplication operations on a missing or unavailable app
+// are rejected with ERR_APP_NOT_EXIST.
+TEST_F(meta_duplication_service_test, dup_op_upon_unavail_app)
+{
+    std::string test_app = "test-app";
+    std::string test_app_not_exist = "test-app-not-exists";
+    std::string test_app_unavail = "test-app-unavail";
+
+    create_app(test_app);
+    auto app = find_app(test_app);
+
+    create_app(test_app_unavail);
+    find_app(test_app_unavail)->status = app_status::AS_DROPPED;
+
+    dupid_t test_dup = create_dup(test_app).dupid;
+
+    struct TestData
+    {
+        std::string app;
+
+        error_code wec;
+    } tests[] = {
+        {test_app_not_exist, ERR_APP_NOT_EXIST},
+        {test_app_unavail, ERR_APP_NOT_EXIST},
+
+        {test_app, ERR_OK},
+    };
+
+    for (auto tt : tests) {
+        ASSERT_EQ(query_dup_info(tt.app).err, tt.wec);
+        ASSERT_EQ(create_dup(tt.app).err, tt.wec);
+        ASSERT_EQ(change_dup_status(tt.app, test_dup, duplication_status::DS_PAUSE).err, tt.wec);
+    }
+}
+
+TEST_F(meta_duplication_service_test, add_duplication) { test_add_duplication(); }
+
+// Ensure the meta server never creates another dup for the same remote cluster and app
+// if one already exists.
+TEST_F(meta_duplication_service_test, dont_create_if_existed)
+{
+    std::string test_app = "test-app";
+
+    create_app(test_app);
+    auto app = find_app(test_app);
+
+    create_dup(test_app); // three identical requests in a row; only one dup may result
+    create_dup(test_app);
+    dupid_t dupid = create_dup(test_app).dupid;
+
+    {
+        auto resp = query_dup_info(test_app);
+        ASSERT_EQ(resp.err, ERR_OK);
+        ASSERT_EQ(resp.entry_list.size(), 1);
+
+        const auto &duplication_entry = resp.entry_list.back();
+        ASSERT_EQ(duplication_entry.status, duplication_status::DS_START);
+        ASSERT_EQ(duplication_entry.dupid, dupid); // the last call returned the id of the single listed dup
+    }
+}
+
+TEST_F(meta_duplication_service_test, change_duplication_status)
+{
+    std::string test_app = "test-app";
+
+    create_app(test_app);
+    auto app = find_app(test_app);
+
+    dupid_t test_dup = create_dup(test_app).dupid;
+
+    struct TestData
+    {
+        std::string app;
+        dupid_t dupid;
+        duplication_status::type status;
+
+        error_code wec; // expected error code for this row
+    } tests[] = {
+        {test_app, test_dup + 1, duplication_status::DS_INIT, ERR_OBJECT_NOT_FOUND}, // unknown dupid
+
+        // ok test
+        {test_app, test_dup, duplication_status::DS_PAUSE, ERR_OK},
+    };
+
+    for (auto tt : tests) {
+        auto resp = change_dup_status(tt.app, tt.dupid, tt.status);
+        ASSERT_EQ(resp.err, tt.wec);
+    }
+}
+
+// This test ensures that dupid is always increasing and greater than zero.
+TEST_F(meta_duplication_service_test, new_dup_from_init) { test_new_dup_from_init(); }
+
+TEST_F(meta_duplication_service_test, query_duplication_info) // query reflects PAUSE, then REMOVED
+{
+    std::string test_app = "test-app";
+
+    create_app(test_app);
+    auto app = find_app(test_app);
+
+    dupid_t test_dup = create_dup(test_app).dupid;
+    ASSERT_EQ(change_dup_status(test_app, test_dup, duplication_status::DS_PAUSE).err, ERR_OK);
+
+    auto resp = query_dup_info(test_app);
+    ASSERT_EQ(resp.err, ERR_OK);
+    ASSERT_EQ(resp.entry_list.size(), 1);
+    ASSERT_EQ(resp.entry_list.back().status, duplication_status::DS_PAUSE);
+    ASSERT_EQ(resp.entry_list.back().dupid, test_dup);
+    ASSERT_EQ(resp.appid, app->app_id);
+
+    ASSERT_EQ(change_dup_status(test_app, test_dup, duplication_status::DS_REMOVED).err, ERR_OK);
+    resp = query_dup_info(test_app); // a removed duplication must no longer be listed
+    ASSERT_EQ(resp.err, ERR_OK);
+    ASSERT_EQ(resp.entry_list.size(), 0);
+}
+
+} // namespace replication
+} // namespace dsn