Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: send hotkey_detect_request in ddl_client (#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Sep 28, 2020
1 parent f90bbfc commit 69102a7
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 6 deletions.
15 changes: 10 additions & 5 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ class replication_ddl_client

error_with<query_bulk_load_response> query_bulk_load(const std::string &app_name);

error_code detect_hotkey(const dsn::rpc_address &target,
detect_hotkey_request &req,
detect_hotkey_response &resp);

private:
bool static valid_app_char(int c);

Expand Down Expand Up @@ -242,10 +246,10 @@ class replication_ddl_client

/// Send request to multi replica server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
void call_rpcs_async(std::map<dsn::rpc_address, TRpcHolder> &rpcs,
std::map<dsn::rpc_address, error_with<TResponse>> &resps,
int reply_thread_hash = 0,
bool enable_retry = true)
void call_rpcs_sync(std::map<dsn::rpc_address, TRpcHolder> &rpcs,
std::map<dsn::rpc_address, error_with<TResponse>> &resps,
int reply_thread_hash = 0,
bool enable_retry = true)
{
dsn::task_tracker tracker;
error_code err = ERR_UNKNOWN;
Expand All @@ -267,7 +271,7 @@ class replication_ddl_client

if (enable_retry && rpcs.size() > 0) {
std::map<dsn::rpc_address, dsn::error_with<TResponse>> retry_resps;
call_rpcs_async(rpcs, retry_resps, reply_thread_hash, false);
call_rpcs_sync(rpcs, retry_resps, reply_thread_hash, false);
for (auto &resp : retry_resps) {
resps.emplace(resp.first, std::move(resp.second));
}
Expand All @@ -278,6 +282,7 @@ class replication_ddl_client
dsn::rpc_address _meta_server;
dsn::task_tracker _tracker;

typedef rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
};
} // namespace replication
Expand Down
10 changes: 10 additions & 0 deletions include/dsn/dist/replication/replication_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,14 @@ ENUM_REG(replication::ingestion_status::IS_RUNNING)
ENUM_REG(replication::ingestion_status::IS_SUCCEED)
ENUM_REG(replication::ingestion_status::IS_FAILED)
ENUM_END2(replication::ingestion_status::type, ingestion_status)

ENUM_BEGIN2(replication::hotkey_type::type, hotkey_type, replication::hotkey_type::READ)
ENUM_REG(replication::hotkey_type::READ)
ENUM_REG(replication::hotkey_type::WRITE)
ENUM_END2(replication::hotkey_type::type, hotkey_type)

ENUM_BEGIN2(replication::detect_action::type, detect_action, replication::detect_action::START)
ENUM_REG(replication::detect_action::START)
ENUM_REG(replication::detect_action::STOP)
ENUM_END2(replication::detect_action::type, detect_action)
}
138 changes: 138 additions & 0 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,7 @@ void replication_ddl_client::query_disk_info(
query_disk_info_rpcs.emplace(target,
query_disk_info_rpc(std::move(request), RPC_QUERY_DISK_INFO));
}
call_rpcs_async(query_disk_info_rpcs, resps);
call_rpcs_sync(query_disk_info_rpcs, resps);
}

error_with<start_bulk_load_response>
Expand Down Expand Up @@ -1580,5 +1580,18 @@ replication_ddl_client::query_bulk_load(const std::string &app_name)
return call_rpc_sync(query_bulk_load_rpc(std::move(req), RPC_CM_QUERY_BULK_LOAD_STATUS));
}

error_code replication_ddl_client::detect_hotkey(const dsn::rpc_address &target,
detect_hotkey_request &req,
detect_hotkey_response &resp)
{
std::map<dsn::rpc_address, detect_hotkey_rpc> detect_hotkey_rpcs;
auto request = make_unique<detect_hotkey_request>(req);
detect_hotkey_rpcs.emplace(target, detect_hotkey_rpc(std::move(request), RPC_DETECT_HOTKEY));
std::map<dsn::rpc_address, error_with<detect_hotkey_response>> resps;
call_rpcs_sync(detect_hotkey_rpcs, resps);
resp = resps.begin()->second.get_value();
return resps.begin()->second.get_error().code();
}

} // namespace replication
} // namespace dsn
Loading

0 comments on commit 69102a7

Please sign in to comment.