Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: add query app data_version interface
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed Dec 24, 2020
1 parent ba19d04 commit 22e2f80
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 0 deletions.
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ class replication_app_base : public replica_base
resp.__set_err_hint("on_detect_hotkey implementation not found");
}

// query pegasus data version
virtual uint32_t query_data_version() const = 0;

public:
//
// utility functions to be used by app
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,5 +474,11 @@ void replica::on_detect_hotkey(const detect_hotkey_request &req, detect_hotkey_r
_app->on_detect_hotkey(req, resp);
}

uint32_t replica::query_data_version() const
{
dassert_replica(_app != nullptr, "");
return _app->query_data_version();
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void on_detect_hotkey(const detect_hotkey_request &req, /*out*/ detect_hotkey_response &resp);

uint32_t query_data_version() const;

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down
30 changes: 30 additions & 0 deletions src/replica/replica_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <nlohmann/json.hpp>
#include <fmt/format.h>
#include <dsn/utility/output_utils.h>
#include "replica_http_service.h"
#include "duplication/duplication_sync_timer.h"

Expand Down Expand Up @@ -55,5 +56,34 @@ void replica_http_service::query_duplication_handler(const http_request &req, ht
resp.body = json.dump();
}

void replica_http_service::query_app_data_version_handler(const http_request &req,
http_response &resp)
{
auto it = req.query_args.find("app_id");
if (it == req.query_args.end()) {
resp.body = "app_id should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}

int32_t app_id = -1;
if (!buf2int32(it->second, app_id) || app_id < 0) {
resp.body = fmt::format("invalid app_id={}", it->second);
resp.status_code = http_status_code::bad_request;
return;
}

uint32_t data_version = 0;
error_code ec = _stub->query_app_data_version(app_id, data_version);

dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", ec.to_string());
tp.add_row_name_and_data("data_version", data_version);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

} // namespace replication
} // namespace dsn
7 changes: 7 additions & 0 deletions src/replica/replica_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ class replica_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/replica/duplication?appid=<appid>");
register_handler("data_version",
std::bind(&replica_http_service::query_app_data_version_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/replica/data_version?app_id=<app_id>");
}

std::string path() const override { return "replica"; }

void query_duplication_handler(const http_request &req, http_response &resp);
void query_app_data_version_handler(const http_request &req, http_response &resp);

private:
replica_stub *_stub;
Expand Down
21 changes: 21 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2808,5 +2808,26 @@ void replica_stub::on_detect_hotkey(detect_hotkey_rpc rpc)
response.err_hint = fmt::format("not find the replica {} \n", request.pid);
}
}

error_code replica_stub::query_app_data_version(int32_t app_id, /*out*/ uint32_t &data_version)
{
replica_ptr rep = nullptr;
zauto_read_lock l(_replicas_lock);
for (const auto &kv : _replicas) {
if (kv.first.get_app_id() == app_id) {
rep = kv.second;
break;
}
}
if (rep == nullptr) {
dwarn_f("app({}) is not found", app_id);
return ERR_OBJECT_NOT_FOUND;
}

data_version = rep->query_data_version();
ddebug_f("app({}) data_version={}", app_id, data_version);
return ERR_OK;
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
return 0;
}

error_code query_app_data_version(int32_t app_id, /*out*/ uint32_t &data_version);

#ifdef DSN_ENABLE_GPERF
// Try to release tcmalloc memory back to operating system
void gc_tcmalloc_memory();
Expand Down
2 changes: 2 additions & 0 deletions src/replica/storage/simple_kv/simple_kv.server.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class simple_kv_service_impl : public simple_kv_service

virtual void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) {}

virtual uint32_t query_data_version() const override { return 0; }

private:
void recover();
void recover(const std::string &name, int64_t version);
Expand Down
2 changes: 2 additions & 0 deletions src/replica/storage/simple_kv/test/simple_kv.server.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class simple_kv_service_impl : public application::simple_kv_service

virtual void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) {}

virtual uint32_t query_data_version() const override { return 0; }

private:
void recover();
void recover(const std::string &name, int64_t version);
Expand Down
2 changes: 2 additions & 0 deletions src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class mock_replication_app_base : public replication_app_base
void set_ingestion_status(ingestion_status::type status) { _ingestion_status = status; }
ingestion_status::type get_ingestion_status() override { return _ingestion_status; }

uint32_t query_data_version() const { return 1; }

private:
std::map<std::string, std::string> _envs;
decree _decree = 5;
Expand Down
34 changes: 34 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>
#include "replica/replica_http_service.h"

namespace dsn {
namespace replication {
Expand All @@ -21,6 +22,7 @@ class replica_test : public replica_test_base
public:
void SetUp() override
{
FLAGS_enable_http_server = false;
stub->install_perf_counters();
mock_app_info();
_mock_replica = stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1);
Expand Down Expand Up @@ -81,5 +83,37 @@ TEST_F(replica_test, backup_request_qps)
ASSERT_GT(get_table_level_backup_request_qps(), 0);
}

TEST_F(replica_test, query_data_version_test)
{
replica_http_service http_svc(stub.get());
struct query_data_version_test
{
std::string app_id;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {{"", http_status_code::bad_request, "app_id should not be empty"},
{"wrong", http_status_code::bad_request, "invalid app_id=wrong"},
{"2",
http_status_code::ok,
R"({"error":"ERR_OK","data_version":"1"})"},
{"4",
http_status_code::ok,
R"({"error":"ERR_OBJECT_NOT_FOUND","data_version":"0"})"}};
for (const auto &test : tests) {
http_request req;
http_response resp;
if (!test.app_id.empty()) {
req.query_args["app_id"] = test.app_id;
}
http_svc.query_app_data_version_handler(req, resp);
ASSERT_EQ(resp.status_code, test.expected_code);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
}

} // namespace replication
} // namespace dsn

0 comments on commit 22e2f80

Please sign in to comment.