Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

ddl_client: add duplication related commands #235

Merged
merged 5 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 29 additions & 34 deletions include/dsn/dist/replication/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@

#pragma once

#include <dsn/dist/fmt_logging.h>
#include <dsn/cpp/rpc_holder.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/utility/string_conv.h>
#include <dsn/utility/errors.h>
#include <dsn/dist/replication/replication_types.h>

namespace dsn {
namespace replication {
Expand All @@ -42,41 +41,37 @@ typedef rpc_holder<duplication_sync_request, duplication_sync_response> duplicat

typedef int32_t dupid_t;

inline bool convert_str_to_dupid(string_view str, dupid_t *dupid) { return buf2int32(str, *dupid); }
extern const char *duplication_status_to_string(duplication_status::type status);

inline const char *duplication_status_to_string(const duplication_status::type &status)
{
auto it = _duplication_status_VALUES_TO_NAMES.find(status);
dassert(it != _duplication_status_VALUES_TO_NAMES.end(),
"unexpected type of duplication_status: %d",
status);
return it->second;
}
/// Returns the cluster name (i.e, "onebox") if it's configured under
/// "replication" section:
/// [replication]
/// cluster_name = "onebox"
extern const char *get_current_cluster_name();

inline void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s)
{
json_encode(out, duplication_status_to_string(s));
}
/// Returns the cluster id of url specified in the duplication-group section
/// of your configuration, for example:
///
/// ```
/// [duplication-group]
/// wuhan-mi-srv-ad = 3
/// tianjin-mi-srv-ad = 4
/// ```
///
/// The returned cluster id of get_duplication_cluster_id("wuhan-mi-srv-ad") is 3.
extern error_with<uint8_t> get_duplication_cluster_id(string_view cluster_name);

inline bool json_decode(const dsn::json::JsonObject &in, duplication_status::type &s)
{
static const std::map<std::string, duplication_status::type>
_duplication_status_NAMES_TO_VALUES = {
{"DS_INIT", duplication_status::DS_INIT},
{"DS_PAUSE", duplication_status::DS_PAUSE},
{"DS_START", duplication_status::DS_START},
{"DS_REMOVED", duplication_status::DS_REMOVED},
};
/// Returns a displayable string for this duplication_entry.
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
extern std::string duplication_entry_to_string(const duplication_entry &dup);

/// Returns a mapping from cluster_name to cluster_id.
extern const std::map<std::string, uint8_t> &get_duplication_group();

std::string name;
json_decode(in, name);
auto it = _duplication_status_NAMES_TO_VALUES.find(name);
if (it != _duplication_status_NAMES_TO_VALUES.end()) {
s = it->second;
return true;
}
dfatal_f("unexpected duplication_status name: {}", name);
__builtin_unreachable();
extern const std::set<uint8_t> &get_distinct_cluster_id_set();

inline bool is_cluster_id_configured(uint8_t cid)
{
return get_distinct_cluster_id_set().find(cid) != get_distinct_cluster_id_set().end();
}

} // namespace replication
Expand Down
34 changes: 32 additions & 2 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include <dsn/dist/replication.h>
#include <dsn/tool-api/task_tracker.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/utility/errors.h>
#include <vector>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -108,6 +110,12 @@ class replication_ddl_client
bool skip_lost_partitions,
const std::string &outfile);

error_with<duplication_add_response>
add_dup(std::string app_name, std::string remote_address, bool freezed);
error_with<duplication_status_change_response>
change_dup_status(std::string app_name, int dupid, duplication_status::type status);
error_with<duplication_query_response> query_dup(std::string app_name);

// get host name from ip series
// if can't get a hostname from ip(maybe no hostname or other errors), return UNRESOLVABLE
// if multiple hostname got, return <host1,host2> ...
Expand Down Expand Up @@ -205,9 +213,31 @@ class replication_ddl_client
return task;
}

/// Send request to meta server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
error_with<TResponse> call_rpc_sync(TRpcHolder rpc, int reply_thread_hash = 0)
{
// Retry at maximum 2 times when error occurred.
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
error_code err = ERR_UNKNOWN;
for (int retry = 0; retry < 2; retry++) {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
task_ptr task = rpc.call(_meta_server,
&_tracker,
[&err](error_code code) { err = code; },
reply_thread_hash);
task->wait();
if (err == ERR_OK) {
break;
}
}
if (err != ERR_OK) {
return error_s::make(err, "unable to send rpc to server");
}
return error_with<TResponse>(std::move(rpc.response()));
}

private:
dsn::rpc_address _meta_server;
dsn::task_tracker _tracker;
};
}
} // namespace
} // namespace replication
} // namespace dsn
156 changes: 156 additions & 0 deletions src/dist/replication/common/duplication_common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/singleton.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/prettywriter.h>

namespace dsn {
namespace replication {

/*extern*/ const char *duplication_status_to_string(duplication_status::type status)
{
auto it = _duplication_status_VALUES_TO_NAMES.find(status);
dassert(it != _duplication_status_VALUES_TO_NAMES.end(),
"unexpected type of duplication_status: %d",
status);
return it->second;
}

/*extern*/ const char *get_current_cluster_name()
{
static const char *cluster_name =
dsn_config_get_value_string("replication", "cluster_name", "", "name of this cluster");
dassert(!string_view(cluster_name).empty(), "cluster_name is not set");
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
return cluster_name;
}

namespace internal {

class duplication_group_registry : public utils::singleton<duplication_group_registry>
{
private:
std::map<std::string, uint8_t> _group;
std::set<uint8_t> _distinct_cids;

public:
duplication_group_registry()
{
std::vector<std::string> clusters;
dsn_config_get_all_keys("duplication-group", clusters);
for (std::string &cluster : clusters) {
int64_t cluster_id =
dsn_config_get_value_int64("duplication-group", cluster.data(), 0, "");
dassert(cluster_id < 128 && cluster_id > 0,
"cluster_id(%zd) for %s should be in [1, 127]",
cluster_id,
cluster.data());
_group.emplace(cluster, static_cast<uint8_t>(cluster_id));
}
dassert_f(clusters.size() == _group.size(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

独立集群(一个group里面只有一个cluster)的情况, 这里会fail掉吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里check的是配置错误

[duplication-group]
c3srv-ai=1
c3srv-ai=3

这种就 fail 掉,测试了,只有一个 cluster 不会fail

"there might be duplicate cluster_name in configuration");

for (const auto &kv : _group) {
_distinct_cids.insert(kv.second);
}
dassert_f(_distinct_cids.size() == _group.size(),
"there might be duplicate cluster_id in configuration");
}

error_with<uint8_t> get_cluster_id(string_view cluster_name) const
{
if (cluster_name.empty()) {
return error_s::make(ERR_INVALID_PARAMETERS, "cluster_name is empty");
}
if (_group.empty()) {
return error_s::make(ERR_OBJECT_NOT_FOUND, "`duplication-group` is not configured");
}

auto it = _group.find(std::string(cluster_name));
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
if (it == _group.end()) {
return error_s::make(ERR_OBJECT_NOT_FOUND, "failed to get cluster id for ")
<< cluster_name.data();
}
return it->second;
}

const std::map<std::string, uint8_t> &get_duplication_group() { return _group; }
const std::set<uint8_t> &get_distinct_cluster_id_set() { return _distinct_cids; }
};

} // namespace internal

/*extern*/ error_with<uint8_t> get_duplication_cluster_id(string_view cluster_name)
{
return internal::duplication_group_registry::instance().get_cluster_id(cluster_name);
}

/*extern*/ std::string duplication_entry_to_string(const duplication_entry &dup)
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
{
rapidjson::Document doc;
doc.SetObject();
auto &alloc = doc.GetAllocator();

doc.AddMember("dupid", dup.dupid, alloc);
doc.AddMember("status", rapidjson::StringRef(duplication_status_to_string(dup.status)), alloc);
doc.AddMember("remote",
rapidjson::StringRef(dup.remote_address.data(), dup.remote_address.length()),
alloc);
doc.AddMember("create_ts", dup.create_ts, alloc);

doc.AddMember("progress", rapidjson::Value(), alloc);
auto &p = doc["progress"];
p.SetArray();
for (const auto &kv : dup.progress) {
rapidjson::Value part;
part.SetObject();
part.AddMember("pid", kv.first, alloc);
part.AddMember("confirmed", kv.second, alloc);
p.PushBack(std::move(part), alloc);
}

rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
doc.Accept(writer);
return sb.GetString();
}

/*extern*/ const std::map<std::string, uint8_t> &get_duplication_group()
{
return internal::duplication_group_registry::instance().get_duplication_group();
}

/*extern*/ const std::set<uint8_t> &get_distinct_cluster_id_set()
{
return internal::duplication_group_registry::instance().get_distinct_cluster_id_set();
}

} // namespace replication
} // namespace dsn
53 changes: 53 additions & 0 deletions src/dist/replication/common/test/duplication_common_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#include <dsn/dist/replication/duplication_common.h>
#include <gtest/gtest.h>

namespace dsn {
namespace replication {

TEST(duplication_common, get_duplication_cluster_id)
{
ASSERT_EQ(get_duplication_cluster_id("master-cluster").get_value(), 1);
ASSERT_EQ(get_duplication_cluster_id("slave-cluster").get_value(), 2);

ASSERT_EQ(get_duplication_cluster_id("").get_error().code(), ERR_INVALID_PARAMETERS);
ASSERT_EQ(get_duplication_cluster_id("unknown").get_error().code(), ERR_OBJECT_NOT_FOUND);
}

TEST(duplication_common, get_current_cluster_name)
{
ASSERT_STREQ(get_current_cluster_name(), "master-cluster");
}

TEST(duplication_common, get_distinct_cluster_id_set)
{
ASSERT_EQ(get_distinct_cluster_id_set(), std::set<uint8_t>({1, 2}));
}

} // namespace replication
} // namespace dsn
Loading