diff --git a/include/dsn/dist/replication/replication_app_base.h b/include/dsn/dist/replication/replication_app_base.h index e43bad20f7..a007ce6e7e 100644 --- a/include/dsn/dist/replication/replication_app_base.h +++ b/include/dsn/dist/replication/replication_app_base.h @@ -249,6 +249,9 @@ class replication_app_base : public replica_base resp.__set_err_hint("on_detect_hotkey implementation not found"); } + // query pegasus data version + virtual uint32_t query_data_version() const = 0; + public: // // utility functions to be used by app diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 208942eed0..1c6a060042 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -474,5 +474,11 @@ void replica::on_detect_hotkey(const detect_hotkey_request &req, detect_hotkey_r _app->on_detect_hotkey(req, resp); } +uint32_t replica::query_data_version() const +{ + dassert_replica(_app != nullptr, ""); + return _app->query_data_version(); +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica.h b/src/replica/replica.h index 84529ae086..c501acce87 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -398,6 +398,8 @@ class replica : public serverlet, public ref_counter, public replica_ba void on_detect_hotkey(const detect_hotkey_request &req, /*out*/ detect_hotkey_response &resp); + uint32_t query_data_version() const; + private: friend class ::dsn::replication::test::test_checker; friend class ::dsn::replication::mutation_queue; diff --git a/src/replica/replica_http_service.cpp b/src/replica/replica_http_service.cpp index 7396974e1f..8ebb885f27 100644 --- a/src/replica/replica_http_service.cpp +++ b/src/replica/replica_http_service.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "replica_http_service.h" #include "duplication/duplication_sync_timer.h" @@ -55,5 +56,34 @@ void replica_http_service::query_duplication_handler(const http_request &req, ht resp.body = json.dump(); } +void replica_http_service::query_app_data_version_handler(const http_request &req, + http_response &resp) +{ + auto it = req.query_args.find("app_id"); + if (it == req.query_args.end()) { + resp.body = "app_id should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + + int32_t app_id = -1; + if (!buf2int32(it->second, app_id) || app_id < 0) { + resp.body = fmt::format("invalid app_id={}", it->second); + resp.status_code = http_status_code::bad_request; + return; + } + + uint32_t data_version = 0; + error_code ec = _stub->query_app_data_version(app_id, data_version); + + dsn::utils::table_printer tp; + tp.add_row_name_and_data("error", ec.to_string()); + tp.add_row_name_and_data("data_version", data_version); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_http_service.h b/src/replica/replica_http_service.h index 8f191e6e0b..d58daced5d 100644 --- a/src/replica/replica_http_service.h +++ b/src/replica/replica_http_service.h @@ -20,11 +20,18 @@ class replica_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/replica/duplication?appid="); + register_handler("data_version", + std::bind(&replica_http_service::query_app_data_version_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/replica/data_version?app_id="); } std::string path() const override { return "replica"; } void query_duplication_handler(const http_request &req, http_response &resp); + void query_app_data_version_handler(const http_request &req, http_response &resp); private: replica_stub *_stub; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index a9e709d775..0f3ca0b217 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2808,5 +2808,26 @@ void replica_stub::on_detect_hotkey(detect_hotkey_rpc rpc) response.err_hint = fmt::format("not find the replica {} \n", request.pid); } } + +error_code replica_stub::query_app_data_version(int32_t app_id, /*out*/ uint32_t &data_version) +{ + replica_ptr rep = nullptr; + zauto_read_lock l(_replicas_lock); + for (const auto &kv : _replicas) { + if (kv.first.get_app_id() == app_id) { + rep = kv.second; + break; + } + } + if (rep == nullptr) { + dwarn_f("app({}) is not found", app_id); + return ERR_OBJECT_NOT_FOUND; + } + + data_version = rep->query_data_version(); + ddebug_f("app({}) data_version={}", app_id, data_version); + return ERR_OK; +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 966d08127a..bb04355863 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -284,6 +284,8 @@ class replica_stub : public serverlet, public ref_counter return 0; } + error_code query_app_data_version(int32_t app_id, /*out*/ uint32_t &data_version); + #ifdef DSN_ENABLE_GPERF // Try to release tcmalloc memory back to operating system void gc_tcmalloc_memory(); diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.h b/src/replica/storage/simple_kv/simple_kv.server.impl.h index 0623c8f148..dd491dd56a 100644 --- a/src/replica/storage/simple_kv/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/simple_kv.server.impl.h @@ -90,6 +90,8 @@ class simple_kv_service_impl : public simple_kv_service virtual void query_app_envs(/*out*/ std::map &envs) {} + virtual uint32_t query_data_version() const override { return 0; } + private: void recover(); void recover(const std::string &name, int64_t version); diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h index 5011687c3f..b6fc5dda7d 100644 --- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h @@ -90,6 +90,8 @@ class simple_kv_service_impl : public application::simple_kv_service virtual void query_app_envs(/*out*/ std::map &envs) {} + virtual uint32_t query_data_version() const override { return 0; } + private: void recover(); void recover(const std::string &name, int64_t version); diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index b21521b544..66498f6bdb 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -79,6 +79,8 @@ class mock_replication_app_base : public replication_app_base void set_ingestion_status(ingestion_status::type status) { _ingestion_status = status; } ingestion_status::type get_ingestion_status() override { return _ingestion_status; } + uint32_t query_data_version() const { return 1; } + private: std::map _envs; decree _decree = 5; diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index d4208c8dc1..c7f13ec9eb 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -7,6 +7,7 @@ #include #include "replica_test_base.h" #include +#include "replica/replica_http_service.h" namespace dsn { namespace replication { @@ -21,6 +22,7 @@ class replica_test : public replica_test_base public: void SetUp() override { + FLAGS_enable_http_server = false; stub->install_perf_counters(); mock_app_info(); _mock_replica = stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1); @@ -81,5 +83,37 @@ TEST_F(replica_test, backup_request_qps) ASSERT_GT(get_table_level_backup_request_qps(), 0); } +TEST_F(replica_test, query_data_version_test) +{ + replica_http_service http_svc(stub.get()); + struct query_data_version_test + { + std::string app_id; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = {{"", http_status_code::bad_request, "app_id should not be empty"}, + {"wrong", http_status_code::bad_request, "invalid app_id=wrong"}, + {"2", + http_status_code::ok, + R"({"error":"ERR_OK","data_version":"1"})"}, + {"4", + http_status_code::ok, + R"({"error":"ERR_OBJECT_NOT_FOUND","data_version":"0"})"}}; + for (const auto &test : tests) { + http_request req; + http_response resp; + if (!test.app_id.empty()) { + req.query_args["app_id"] = test.app_id; + } + http_svc.query_app_data_version_handler(req, resp); + ASSERT_EQ(resp.status_code, test.expected_code); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); + } +} + } // namespace replication } // namespace dsn