diff --git a/rdsn b/rdsn index fafbd599d1..69102a786f 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit fafbd599d1df48b36bade4d08940ab79837aaf3b +Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98 diff --git a/src/shell/command_utils.h b/src/shell/command_utils.h index ea534a6f85..2aa480b9f3 100644 --- a/src/shell/command_utils.h +++ b/src/shell/command_utils.h @@ -5,6 +5,42 @@ #pragma once #include +#include +#include + +#include "shell/argh.h" +#include + +inline bool validate_cmd(const argh::parser &cmd, + const std::set ¶ms, + const std::set &flags) +{ + if (cmd.size() > 1) { + fmt::print(stderr, "too many params!\n"); + return false; + } + + for (const auto ¶m : cmd.params()) { + if (params.find(param.first) == params.end()) { + fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second); + return false; + } + } + + for (const auto &flag : cmd.flags()) { + if (params.find(flag) != params.end()) { + fmt::print(stderr, "missing value of {}\n", flag); + return false; + } + + if (flags.find(flag) == flags.end()) { + fmt::print(stderr, "unknown flag {}\n", flag); + return false; + } + } + + return true; +} #define verify_logged(exp, ...) \ do { \ diff --git a/src/shell/commands.h b/src/shell/commands.h index 27618f026b..af29c2cc34 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -25,7 +25,6 @@ #include #include "command_executor.h" -#include "command_utils.h" #include "command_helper.h" #include "args.h" @@ -262,3 +261,7 @@ bool pause_bulk_load(command_executor *e, shell_context *sc, arguments args); bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args); bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args); + +// == detect hotkey (see 'commands/detect_hotkey.cpp') == // + +bool detect_hotkey(command_executor *e, shell_context *sc, arguments args); diff --git a/src/shell/commands/detect_hotkey.cpp b/src/shell/commands/detect_hotkey.cpp new file mode 100644 index 0000000000..900e70f7cc --- /dev/null +++ b/src/shell/commands/detect_hotkey.cpp @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "shell/commands.h" +#include "shell/argh.h" +#include + +bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req, + const std::string &hotkey_action, + const std::string &hotkey_type, + int app_id, + int partition_index, + std::string &err_info) +{ + if (!strcasecmp(hotkey_type.c_str(), "read")) { + req.type = dsn::replication::hotkey_type::type::READ; + } else if (!strcasecmp(hotkey_type.c_str(), "write")) { + req.type = dsn::replication::hotkey_type::type::WRITE; + } else { + err_info = fmt::format("\"{}\" is an invalid hotkey type (should be 'read' or 'write')\n", + hotkey_type); + return false; + } + + if (!strcasecmp(hotkey_action.c_str(), "start")) { + req.action = dsn::replication::detect_action::START; + } else if (!strcasecmp(hotkey_action.c_str(), "stop")) { + req.action = dsn::replication::detect_action::STOP; + } else { + err_info = + fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n", + hotkey_action); + return false; + } + req.pid = dsn::gpid(app_id, partition_index); + return true; +} + +// TODO: (Tangyanzhao) merge hotspot_partition_calculator::send_detect_hotkey_request +bool detect_hotkey(command_executor *e, shell_context *sc, arguments args) +{ + // detect_hotkey + // <-a|--app_id str><-p|--partition_index num><-t|--hotkey_type read|write> + // <-c|--detect_action start|stop><-d|--address str> + const std::set params = {"a", + "app_id", + "p", + "partition_index", + "c", + "hotkey_action", + "t", + "hotkey_type", + "d", + "address"}; + const std::set flags = {}; + argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION); + if (!validate_cmd(cmd, params, flags)) { + return false; + } + + int app_id; + if (!dsn::buf2int32(cmd({"-a", "--app_id"}).str(), app_id)) { + fmt::print(stderr, "\"{}\" is an invalid num\n", cmd({"-a", "--app_id"}).str()); + return false; + } + + int partition_index; + if (!dsn::buf2int32(cmd({"-p", "--partition_index"}).str(), partition_index)) { + fmt::print(stderr, "\"{}\" is an invalid num\n", cmd({"-p", "--partition_index"}).str()); + return false; + } + + dsn::rpc_address target_address; + std::string ip_str = cmd({"-d", "--address"}).str(); + if (!target_address.from_string_ipv4(ip_str.c_str())) { + fmt::print("invalid ip, error={}\n", ip_str); + return false; + } + + std::string err_info; + std::string hotkey_action = cmd({"-c", "--hotkey_action"}).str(); + std::string hotkey_type = cmd({"-t", "--hotkey_type"}).str(); + dsn::replication::detect_hotkey_request req; + if (!generate_hotkey_request( + req, hotkey_action, hotkey_type, app_id, partition_index, err_info)) { + fmt::print(stderr, err_info); + return false; + } + + detect_hotkey_response resp; + auto err = sc->ddl_client->detect_hotkey(dsn::rpc_address(target_address), req, resp); + if (err != dsn::ERR_OK) { + fmt::print(stderr, + "Hotkey detect rpc sending failed, in {}.{}, error_hint:{}\n", + app_id, + partition_index, + err.to_string()); + return false; + } + + if (resp.err != dsn::ERR_OK) { + fmt::print(stderr, + "Hotkey detect rpc performed failed, in {}.{}, error_hint:{} {}\n", + app_id, + partition_index, + resp.err, + resp.err_hint); + return false; + } + + return true; +} diff --git a/src/shell/commands/disk_rebalance.cpp b/src/shell/commands/disk_rebalance.cpp index 97462c9f8e..6ced3ddbb0 100644 --- a/src/shell/commands/disk_rebalance.cpp +++ b/src/shell/commands/disk_rebalance.cpp @@ -10,40 +10,8 @@ #include #include #include -#include #include -bool validate_cmd(const argh::parser &cmd, - const std::set ¶ms, - const std::set &flags) -{ - if (cmd.size() > 1) { - fmt::print(stderr, "too many params!\n"); - return false; - } - - for (const auto ¶m : cmd.params()) { - if (params.find(param.first) == params.end()) { - fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second); - return false; - } - } - - for (const auto &flag : cmd.flags()) { - if (params.find(flag) != params.end()) { - fmt::print(stderr, "missing value of {}\n", flag); - return false; - } - - if (flags.find(flag) == flags.end()) { - fmt::print(stderr, "unknown flag {}\n", flag); - return false; - } - } - - return true; -} - bool query_disk_info( shell_context *sc, const argh::parser &cmd, diff --git a/src/shell/main.cpp b/src/shell/main.cpp index ea2688b2ba..c747556615 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -474,6 +474,16 @@ static command_executor commands[] = { "<-a --app_name str> [-f --forced]", cancel_bulk_load, }, + { + "detect_hotkey", + "start or stop hotkey detection on a replica of a replica server", + "<-a|--app_id num> " + "<-p|--partition_index num> " + "<-t|--hotkey_type read|write> " + "<-c|--detect_action start|stop> " + "<-d|--address str>", + detect_hotkey, + }, { "exit", "exit shell", "", exit_shell, },