From 39d6b1f986551682c3996e0b560e47e8f28039e5 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Thu, 20 Dec 2018 13:54:52 +0800 Subject: [PATCH 1/4] replication: remove uri resolver, move feature to replication client --- .../replication}/partition_resolver.h | 104 ++++---- include/dsn/tool-api/group_address.h | 8 +- include/dsn/tool-api/rpc_address.h | 20 +- include/dsn/tool-api/task.h | 4 +- include/dsn/tool-api/uri_address.h | 139 ----------- src/core/core/common_providers.cpp | 53 ---- src/core/core/core_main.cpp | 9 - src/core/core/rpc_address.cpp | 22 +- src/core/core/rpc_engine.cpp | 112 +-------- src/core/core/rpc_engine.h | 10 - src/core/core/service_engine.cpp | 3 +- src/core/core/uri_address.cpp | 226 ------------------ src/core/tests/address.cpp | 18 -- src/dist/replication/CMakeLists.txt | 1 + src/dist/replication/client/CMakeLists.txt | 21 ++ .../replication/client/partition_resolver.cpp | 121 ++++++++++ .../client/partition_resolver_manager.cpp | 77 ++++++ .../client/partition_resolver_manager.h | 37 ++- .../client}/partition_resolver_simple.cpp | 97 +++----- .../client}/partition_resolver_simple.h | 20 +- .../storage_engine/simple_kv/CMakeLists.txt | 2 +- .../storage_engine/simple_kv/config.ini | 9 +- .../simple_kv/simple_kv.app.example.h | 13 +- .../simple_kv/simple_kv.client.h | 123 +++++----- .../replication/test/simple_kv/CMakeLists.txt | 1 + .../replication/test/simple_kv/client.cpp | 17 +- 26 files changed, 419 insertions(+), 848 deletions(-) rename include/dsn/{tool-api => dist/replication}/partition_resolver.h (52%) delete mode 100644 include/dsn/tool-api/uri_address.h delete mode 100644 src/core/core/common_providers.cpp delete mode 100644 src/core/core/uri_address.cpp create mode 100644 src/dist/replication/client/CMakeLists.txt create mode 100644 src/dist/replication/client/partition_resolver.cpp create mode 100644 src/dist/replication/client/partition_resolver_manager.cpp rename include/dsn/dist/dist.providers.common.h => src/dist/replication/client/partition_resolver_manager.h (62%) rename src/{core/core => dist/replication/client}/partition_resolver_simple.cpp (84%) rename src/{core/core => dist/replication/client}/partition_resolver_simple.h (93%) diff --git a/include/dsn/tool-api/partition_resolver.h b/include/dsn/dist/replication/partition_resolver.h similarity index 52% rename from include/dsn/tool-api/partition_resolver.h rename to include/dsn/dist/replication/partition_resolver.h index 245ac1bd2a..102dc90a3c 100644 --- a/include/dsn/tool-api/partition_resolver.h +++ b/include/dsn/dist/replication/partition_resolver.h @@ -24,50 +24,51 @@ * THE SOFTWARE. */ -/* - * Description: - * resolve URI as rpc addresses (IP or group) - * - * Revision history: - * Feb., 2016, @imzhenyu (Zhenyu Guo), first draft - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include #include #include #include +#include +#include namespace dsn { -namespace dist { -void register_partition_resolver_providers(); +namespace replication { class partition_resolver : public ref_counter { public: - /*! app resolve result. */ - struct resolve_result + static dsn::ref_ptr + get_resolver(const char *cluster_name, + const std::vector &meta_list, + const char *app_path); + + template + dsn::rpc_response_task_ptr call_op(dsn::task_code code, + TReq &&request, + dsn::task_tracker *tracker, + TCallback &&callback, + std::chrono::milliseconds timeout, + uint64_t partition_hash, + int reply_hash = 0) { - error_code err; ///< ERR_OK - ///< ERR_SERVICE_NOT_FOUND if resolver or app is missing - ///< ERR_IO_PENDING if resolve in is progress, callers - ///< should call resolve_async in this case - rpc_address address; ///< IPv4 of the target to send request to - dsn::gpid pid; ///< global partition indentity - }; - -public: - template - static partition_resolver *create(rpc_address &meta_server, const char *app_path) - { - return new T(meta_server, app_path); + dsn::message_ex *msg = dsn::message_ex::create_request( + code, static_cast(timeout.count()), 0, partition_hash); + marshall(msg, std::forward(request)); + dsn::rpc_response_task_ptr response_task = rpc::create_rpc_response_task( + msg, tracker, std::forward(callback), reply_hash); + call_task(response_task); + return response_task; } - typedef partition_resolver *(*factory)(rpc_address &, const char *); + void call_task(const dsn::rpc_response_task_ptr &task); -public: + std::string get_app_path() const { return _app_path; } + + dsn::rpc_address get_meta_server() const { return _meta_server; } + +protected: partition_resolver(rpc_address meta_server, const char *app_path) : _app_path(app_path), _meta_server(meta_server) { @@ -75,19 +76,31 @@ class partition_resolver : public ref_counter virtual ~partition_resolver() {} + struct resolve_result + { + ///< ERR_OK + ///< ERR_SERVICE_NOT_FOUND if resolver or app is missing + ///< ERR_IO_PENDING if resolve in is progress, callers + ///< should call resolve_async in this case + error_code err; + ///< IPv4 of the target to send request to + rpc_address address; + ///< global partition indentity + dsn::gpid pid; + }; + /** - * resolve partition_hash into IP or group addresses to know what to connect next - * - * \param partition_hash the partition hash - * \param callback callback invoked on completion or timeout - * \param timeout_ms timeout to execute the callback - * - * \return see \ref resolve_result for details - */ - virtual void - resolve(uint64_t partition_hash, - std::function &&callback, - int timeout_ms) = 0; + * resolve partition_hash into IP or group addresses to know what to connect next + * + * \param partition_hash the partition hash + * \param callback callback invoked on completion or timeout + * \param timeout_ms timeout to execute the callback + * + * \return see \ref resolve_result for details + */ + virtual void resolve(uint64_t partition_hash, + std::function &&callback, + int timeout_ms) = 0; /*! failure handler when access failed for certain partition @@ -110,15 +123,12 @@ class partition_resolver : public ref_counter virtual int get_partition_index(int partition_count, uint64_t partition_hash) = 0; - std::string get_app_path() const { return _app_path; } - - ::dsn::rpc_address get_meta_server() const { return _meta_server; } - -protected: + std::string _cluster_name; std::string _app_path; rpc_address _meta_server; }; typedef ref_ptr partition_resolver_ptr; -} -} + +} // namespace replication +} // namespace dsn diff --git a/include/dsn/tool-api/group_address.h b/include/dsn/tool-api/group_address.h index 6a7f8be820..b164882833 100644 --- a/include/dsn/tool-api/group_address.h +++ b/include/dsn/tool-api/group_address.h @@ -51,6 +51,12 @@ class rpc_group_address : public dsn::ref_counter rpc_group_address(const rpc_group_address &other); rpc_group_address &operator=(const rpc_group_address &other); bool add(rpc_address addr); + void add_list(const std::vector &list) + { + for (const rpc_address &r : list) { + add(r); + } + } void set_leader(rpc_address addr); bool remove(rpc_address addr); bool contains(rpc_address addr); @@ -206,4 +212,4 @@ inline rpc_address rpc_group_address::next(rpc_address current) const } } } -} +} // namespace dsn diff --git a/include/dsn/tool-api/rpc_address.h b/include/dsn/tool-api/rpc_address.h index 44c12a8c6a..3318cb524f 100644 --- a/include/dsn/tool-api/rpc_address.h +++ b/include/dsn/tool-api/rpc_address.h @@ -34,18 +34,15 @@ #include #include -typedef enum dsn_host_type_t -{ +typedef enum dsn_host_type_t { HOST_TYPE_INVALID = 0, HOST_TYPE_IPV4 = 1, HOST_TYPE_GROUP = 2, - HOST_TYPE_URI = 3, } dsn_host_type_t; namespace dsn { class rpc_group_address; -class rpc_uri_address; class rpc_address { @@ -99,8 +96,6 @@ class rpc_address _addr.v4.port = port; } - void assign_uri(const char *host_uri); - void assign_group(const char *name); const char *to_string() const; @@ -146,8 +141,6 @@ class rpc_address return (rpc_group_address *)(uintptr_t)_addr.group.group; } - rpc_uri_address *uri_address() const { return (rpc_uri_address *)(uintptr_t)_addr.uri.uri; } - bool is_invalid() const { return _addr.v4.type == HOST_TYPE_INVALID; } // before you assign new value, must call set_invalid() to release original value @@ -162,8 +155,6 @@ class rpc_address switch (type()) { case HOST_TYPE_IPV4: return ip() == r.ip() && _addr.v4.port == r.port(); - case HOST_TYPE_URI: - return strcmp(to_string(), r.to_string()) == 0; case HOST_TYPE_GROUP: return _addr.group.group == r._addr.group.group; default: @@ -181,8 +172,6 @@ class rpc_address switch (type()) { case HOST_TYPE_IPV4: return ip() < r.ip() || (ip() == r.ip() && port() < r.port()); - case HOST_TYPE_URI: - return strcmp(to_string(), r.to_string()) < 0; case HOST_TYPE_GROUP: return _addr.group.group < r._addr.group.group; default: @@ -205,11 +194,6 @@ class rpc_address unsigned long long ip : 32; } v4; ///< \ref HOST_TYPE_IPV4 struct - { - unsigned long long type : 2; - unsigned long long uri : 62; - } uri; ///< \ref HOST_TYPE_URI - struct { unsigned long long type : 2; unsigned long long group : 62; ///< dsn_group_t @@ -230,8 +214,6 @@ struct hash<::dsn::rpc_address> switch (ep.type()) { case HOST_TYPE_IPV4: return std::hash()(ep.ip()) ^ std::hash()(ep.port()); - case HOST_TYPE_URI: - return std::hash()(std::string(ep.to_string())); case HOST_TYPE_GROUP: return std::hash()(ep.group_address()); default: diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 5ae2e5188e..29a0420096 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -501,7 +501,7 @@ class rpc_response_task : public task // // TODO(sunweijie): totally elimite this feature // - const rpc_response_handler ¤t_handler() const { return _cb; } + void fetch_current_handler(rpc_response_handler &cb) { cb = std::move(_cb); } void replace_callback(rpc_response_handler &&cb) { task_state cur_state = state(); @@ -684,4 +684,4 @@ __inline /*static*/ env_provider *task::get_current_env() return tls_dsn.env; } -} // end namespace +} // namespace dsn diff --git a/include/dsn/tool-api/uri_address.h b/include/dsn/tool-api/uri_address.h deleted file mode 100644 index 9c584101a7..0000000000 --- a/include/dsn/tool-api/uri_address.h +++ /dev/null @@ -1,139 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * rpc-address that wrap and solve URI - * - * Revision history: - * Feb., 2016, @imzhenyu, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace dsn { -/** A RPC URI address. */ -class rpc_uri_address : public dsn::ref_counter -{ -public: - /** - * Constructor. - * - * \param uri URI of the document, it is composed with three parts: - * dsn://meta-server:23356/app1 - * protocol : // resolver-address / app-path - */ - rpc_uri_address(const char *uri); - - /** - * Copy constructor. - */ - rpc_uri_address(const rpc_uri_address &other); - - rpc_uri_address &operator=(const rpc_uri_address &other); - - ~rpc_uri_address(); - - /** - * Gets URI address components , e.g., - * given dsn://meta-address:8080/app-path - * return - * - * \return The resolver address - */ - std::pair get_uri_components(); - - const char *uri() const { return _uri.c_str(); } - - dist::partition_resolver_ptr get_resolver() { return _resolver; } - -private: - ::dsn::dist::partition_resolver_ptr _resolver; - std::string _uri; -}; - -class uri_resolver -{ -public: - /** - * Constructor. - * - * \param name resolver name, e.g., dsn://meta-server:port - * \param factory factory for creating partition_resolver - * \param arguments end-point list which composes the meta-server group, - * e.g., host1:port1,host2:port2,host3:port3 - */ - uri_resolver(const char *name, const char *factory, const char *arguments); - - ~uri_resolver(); - - dist::partition_resolver_ptr get_app_resolver(const char *app); - - std::map get_all_app_resolvers(); - - const char *get_factory() const { return _factory.c_str(); } - - const char *get_arguments() const { return _arguments.c_str(); } - -private: - std::unordered_map - _apps; ///< app-path to app-resolver map - zrwlock_nr _apps_lock; - - rpc_address _meta_server; - std::string _name; - std::string _factory; - std::string _arguments; -}; - -class uri_resolver_manager -{ -public: - uri_resolver_manager(); - - std::shared_ptr get(rpc_uri_address *uri) const; - - std::map> get_all() const; - -private: - void setup_resolvers(); - - typedef std::unordered_map> resolvers; - resolvers _resolvers; - mutable utils::rw_lock_nr _lock; -}; - -// ------------------ inline implementation -------------------- -inline uri_resolver_manager::uri_resolver_manager() { setup_resolvers(); } -} diff --git a/src/core/core/common_providers.cpp b/src/core/core/common_providers.cpp deleted file mode 100644 index 23e38f4cb7..0000000000 --- a/src/core/core/common_providers.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * partition resolver providers - * - * Revision history: - * Feb., 2016, @imzhenyu (Zhenyu Guo), first draft - * xxxx-xx-xx, author, fix bug about xxx - */ -#include "partition_resolver_simple.h" -#include - -namespace dsn { -namespace dist { -static bool register_component_provider(const char *name, - ::dsn::dist::partition_resolver::factory f) -{ - return dsn::utils::factory_store<::dsn::dist::partition_resolver>::register_factory( - name, f, PROVIDER_TYPE_MAIN); -} - -void register_common_providers() -{ - register_component_provider("partition_resolver_simple", - partition_resolver::create); -} -} -} diff --git a/src/core/core/core_main.cpp b/src/core/core/core_main.cpp index e46c8b0089..2c0173fdd7 100644 --- a/src/core/core/core_main.cpp +++ b/src/core/core/core_main.cpp @@ -47,18 +47,12 @@ #include #include -#include - -//# include - void dsn_core_init() { // register all providers dsn::tools::register_common_providers(); dsn::tools::register_hpc_providers(); - // dsn::tools::register_component_provider("thrift"); - // register all possible tools and toollets dsn::tools::register_tool("nativerun"); dsn::tools::register_tool("simulator"); @@ -66,9 +60,6 @@ void dsn_core_init() dsn::tools::register_toollet("profiler"); dsn::tools::register_toollet("fault_injector"); dsn::tools::register_toollet("explorer"); - - // register useful distributed framework providers - dsn::dist::register_common_providers(); } #if defined(__linux__) diff --git a/src/core/core/rpc_address.cpp b/src/core/core/rpc_address.cpp index 4511086890..8022ecf8e7 100644 --- a/src/core/core/rpc_address.cpp +++ b/src/core/core/rpc_address.cpp @@ -37,7 +37,6 @@ #include #include -#include #include #include @@ -146,25 +145,12 @@ rpc_address &rpc_address::operator=(const rpc_address &another) case HOST_TYPE_GROUP: group_address()->add_ref(); break; - case HOST_TYPE_URI: - uri_address()->add_ref(); - break; default: break; } return *this; } -void rpc_address::assign_uri(const char *host_uri) -{ - set_invalid(); - _addr.uri.type = HOST_TYPE_URI; - dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(host_uri); - // take the lifetime of rpc_uri_address, release_ref when change value or call destructor - addr->add_ref(); - _addr.uri.uri = (uint64_t)addr; -} - void rpc_address::assign_group(const char *name) { set_invalid(); @@ -181,9 +167,6 @@ void rpc_address::set_invalid() case HOST_TYPE_GROUP: group_address()->release_ref(); break; - case HOST_TYPE_URI: - uri_address()->release_ref(); - break; default: break; } @@ -221,9 +204,6 @@ const char *rpc_address::to_string() const ip_len = strlen(p); snprintf_p(p + ip_len, sz - ip_len, ":%hu", port()); break; - case HOST_TYPE_URI: - p = (char *)uri_address()->uri(); - break; case HOST_TYPE_GROUP: p = (char *)group_address()->name(); break; @@ -234,4 +214,4 @@ const char *rpc_address::to_string() const return (const char *)p; } -} +} // namespace dsn diff --git a/src/core/core/rpc_engine.cpp b/src/core/core/rpc_engine.cpp index 0bebe34a29..31cac8d5c5 100644 --- a/src/core/core/rpc_engine.cpp +++ b/src/core/core/rpc_engine.cpp @@ -35,7 +35,6 @@ #include #include #include -#include #include #include #include @@ -57,6 +56,7 @@ class rpc_timeout_task : public task } virtual void exec() { _matcher->on_rpc_timeout(_id); } + private: // use the following if the matcher is per rpc session // rpc_client_matcher_ptr _matcher; @@ -516,8 +516,6 @@ error_code rpc_engine::start(const service_app_spec &aspec) sp.second.channel.to_string()); } - _uri_resolver_mgr.reset(new uri_resolver_manager()); - _local_primary_address = _client_nets[NET_HDR_DSN][0]->address(); _local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id); @@ -626,114 +624,6 @@ void rpc_engine::call(message_ex *request, const rpc_response_task_ptr &call) call_address(request->server_address, request, call); } -DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) - -void rpc_engine::call_uri(rpc_address addr, message_ex *request, const rpc_response_task_ptr &call) -{ - dbg_dassert(addr.type() == HOST_TYPE_URI, "only URI is now supported"); - auto &hdr = *request->header; - - auto resolver = request->server_address.uri_address()->get_resolver(); - if (nullptr == resolver) { - derror("call uri failed as no partition resolver found, uri = %s", - request->server_address.uri_address()->uri()); - - if (call != nullptr) { - call->enqueue(ERR_SERVICE_NOT_FOUND, nullptr); - } else { - // as ref_count for request may be zero - request->add_ref(); - request->release_ref(); - } - } else { // resolver != nullptr - if (call) { - uint64_t deadline_ms = dsn_now_ms() + hdr.client.timeout_ms; - auto old_callback = call->current_handler(); - - auto new_callback = [deadline_ms, old_callback]( - dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { - message_ex *req2 = (message_ex *)req; - if (req2->header->gpid.value() != 0 && err != ERR_OK && - err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST && - err != ERR_OPERATION_DISABLED) { - auto resolver = req2->server_address.uri_address()->get_resolver(); - if (nullptr != resolver) { - resolver->on_access_failure(req2->header->gpid.get_partition_index(), err); - - // still got time, retry - uint64_t nms = dsn_now_ms(); - uint64_t gap = 8 << req2->send_retry_count; - if (gap > 1000) - gap = 1000; - if (nms + gap < deadline_ms) { - req2->send_retry_count++; - req2->header->client.timeout_ms = - static_cast(deadline_ms - nms - gap); - - rpc_response_task_ptr ctask = - dynamic_cast(task::get_current_task()); - dassert(ctask != nullptr, "current task must be rpc_response_task"); - ctask->replace_callback(std::move(old_callback)); - dassert(ctask->set_retry(false), - "rpc_response_task set retry failed, state = %s", - enum_to_string(ctask->state())); - - // sleep gap milliseconds before retry - tasking::enqueue(LPC_RPC_DELAY_CALL, - nullptr, - [ server = req2->server_address, ctask ]() { - dsn_rpc_call(server, ctask.get()); - }, - 0, - std::chrono::milliseconds(gap)); - return; - } else { - derror("service access failed (%s), no more time for further " - "tries, set error = ERR_TIMEOUT, trace_id = %016" PRIx64, - error_code(err).to_string(), - req2->header->trace_id); - err = ERR_TIMEOUT; - } - } - } - - if (old_callback) - old_callback(err, req, resp); - }; - - call->replace_callback(std::move(new_callback)); - } - - resolver->resolve(hdr.client.partition_hash, - [=](dist::partition_resolver::resolve_result &&result) mutable { - if (result.err == ERR_OK) { - // update gpid when necessary - auto &hdr2 = request->header; - if (hdr2->gpid.value() != result.pid.value()) { - dassert(hdr2->gpid.value() == 0, "inconsistent gpid"); - hdr2->gpid = result.pid; - - // update thread hash if not assigned by applications - if (hdr2->client.thread_hash == 0) { - hdr2->client.thread_hash = result.pid.thread_hash(); - } - } - - call_address(result.address, request, call); - } else { - if (call != nullptr) { - call->enqueue(result.err, nullptr); - } else { - // as ref_count for request may be zero - request->add_ref(); - request->release_ref(); - } - } - }, - hdr.client.timeout_ms); - } -} - void rpc_engine::call_group(rpc_address addr, message_ex *request, const rpc_response_task_ptr &call) diff --git a/src/core/core/rpc_engine.h b/src/core/core/rpc_engine.h index dd1097b797..3e33fe23f6 100644 --- a/src/core/core/rpc_engine.h +++ b/src/core/core/rpc_engine.h @@ -35,7 +35,6 @@ namespace dsn { class service_node; class rpc_engine; -class uri_resolver_manager; #define MAX_CLIENT_PORT 1023 @@ -160,10 +159,6 @@ class rpc_engine service_node *node() const { return _node; } ::dsn::rpc_address primary_address() const { return _local_primary_address; } rpc_client_matcher *matcher() { return &_rpc_matcher; } - uri_resolver_manager *uri_resolver_mgr() { return _uri_resolver_mgr.get(); } - - // call with URI address only - void call_uri(rpc_address addr, message_ex *request, const rpc_response_task_ptr &call); // call with group address only void call_group(rpc_address addr, message_ex *request, const rpc_response_task_ptr &call); @@ -193,8 +188,6 @@ class rpc_engine rpc_client_matcher _rpc_matcher; rpc_server_dispatcher _rpc_dispatcher; - std::unique_ptr _uri_resolver_mgr; - volatile bool _is_running; volatile bool _is_serving; }; @@ -208,9 +201,6 @@ rpc_engine::call_address(rpc_address addr, message_ex *request, const rpc_respon case HOST_TYPE_IPV4: call_ip(addr, request, call); break; - case HOST_TYPE_URI: - call_uri(addr, request, call); - break; case HOST_TYPE_GROUP: call_group(addr, request, call); break; diff --git a/src/core/core/service_engine.cpp b/src/core/core/service_engine.cpp index 5daefdebd9..e540d99d2f 100644 --- a/src/core/core/service_engine.cpp +++ b/src/core/core/service_engine.cpp @@ -31,7 +31,6 @@ #include #include -#include #include #include #include @@ -314,4 +313,4 @@ std::string service_engine::get_queue_info(const std::vector &args) return ss.str(); } -} // end namespace +} // namespace dsn diff --git a/src/core/core/uri_address.cpp b/src/core/core/uri_address.cpp deleted file mode 100644 index 9fa85b95a6..0000000000 --- a/src/core/core/uri_address.cpp +++ /dev/null @@ -1,226 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rpc_engine.h" - -namespace dsn { -void uri_resolver_manager::setup_resolvers() -{ - // [uri-resolver.%resolver-address%] - // factory = %uri-resolver-factory% - // arguments = %uri-resolver-arguments% - - std::vector sections; - dsn_config_get_all_sections(sections); - - const int prefix_len = (const int)strlen("uri-resolver."); - for (auto &s : sections) { - if (s.substr(0, prefix_len) != std::string("uri-resolver.")) - continue; - - auto resolver_addr = s.substr(prefix_len); - auto it = _resolvers.find(resolver_addr); - if (it != _resolvers.end()) { - dwarn("duplicated uri-resolver definition in config file, for '%s'", - resolver_addr.c_str()); - continue; - } - - auto factory = dsn_config_get_value_string( - s.c_str(), - "factory", - "", - "partition-resolver factory name which creates the concrete partition-resolver object"); - auto arguments = - dsn_config_get_value_string(s.c_str(), "arguments", "", "uri-resolver ctor arguments"); - - auto resolver = new uri_resolver(resolver_addr.c_str(), factory, arguments); - _resolvers.emplace(resolver_addr, std::shared_ptr(resolver)); - - dinfo("initialize uri-resolver %s", resolver_addr.c_str()); - } -} - -std::shared_ptr uri_resolver_manager::get(rpc_uri_address *uri) const -{ - std::shared_ptr ret = nullptr; - auto pr = uri->get_uri_components(); - if (pr.first.length() == 0) - return ret; - - { - utils::auto_read_lock l(_lock); - auto it = _resolvers.find(pr.first); - if (it != _resolvers.end()) - ret = it->second; - } - - dassert(ret != nullptr, - "cannot find uri resolver for uri '%s' with resolver address as '%s', " - "please fix it by setting up a uri resolver section in config file, as follows:\n" - "[uri-resolver.%s]\n" - "factory = partition-resolver-factory (e.g., partition_resolver_simple)\n" - "arguments = uri-resolver-arguments (e.g., localhost:34601,localhost:34602)\n", - uri->uri(), - pr.first.c_str(), - pr.first.c_str()); - - return ret; -} - -std::map> uri_resolver_manager::get_all() const -{ - std::map> result; - - { - utils::auto_read_lock l(_lock); - result.insert(_resolvers.begin(), _resolvers.end()); - } - - return result; -} - -//--------------------------------------------------------------- - -rpc_uri_address::rpc_uri_address(const char *uri) : _uri(uri) -{ - auto r1 = task::get_current_rpc()->uri_resolver_mgr()->get(this); - if (r1.get()) { - _resolver = r1->get_app_resolver(get_uri_components().second.c_str()); - } -} - -rpc_uri_address::rpc_uri_address(const rpc_uri_address &other) -{ - _resolver = other._resolver; - _uri = other._uri; -} - -rpc_uri_address &rpc_uri_address::operator=(const rpc_uri_address &other) -{ - _resolver = other._resolver; - _uri = other._uri; - return *this; -} - -rpc_uri_address::~rpc_uri_address() { _resolver = nullptr; } - -std::pair rpc_uri_address::get_uri_components() -{ - auto it = _uri.find("://"); - if (it == std::string::npos) - return std::make_pair("", ""); - - auto it2 = _uri.find('/', it + 3); - if (it2 == std::string::npos) - return std::make_pair(_uri, ""); - - else - return std::make_pair(_uri.substr(0, it2), _uri.substr(it2 + 1)); -} - -//--------------------------------------------------------------- - -uri_resolver::uri_resolver(const char *name, const char *factory, const char *arguments) - : _name(name), _factory(factory), _arguments(arguments) -{ - _meta_server.assign_group(name); - - std::vector args; - utils::split_args(arguments, args, ','); - for (auto &arg : args) { - // name:port - auto pos1 = arg.find_first_of(':'); - if (pos1 != std::string::npos) { - ::dsn::rpc_address ep(arg.substr(0, pos1).c_str(), atoi(arg.substr(pos1 + 1).c_str())); - _meta_server.group_address()->add(ep); - } - } -} - -uri_resolver::~uri_resolver() {} - -dist::partition_resolver_ptr uri_resolver::get_app_resolver(const char *app) -{ - dist::partition_resolver_ptr rv = nullptr; - - { - zauto_read_lock l(_apps_lock); - auto it = _apps.find(app); - if (it != _apps.end()) { - rv = it->second; - } - } - - // create on demand - if (rv == nullptr) { - // here we can pass _meta_server to partition_resolver without clone, - // because the lift time of uri_resolver covers that of partition_resolver. - rv = utils::factory_store::create( - _factory.c_str(), ::dsn::PROVIDER_TYPE_MAIN, _meta_server, app); - - zauto_write_lock l(_apps_lock); - auto it = _apps.find(app); - if (it == _apps.end()) { - _apps.emplace(std::string(app), rv); - } else { - rv = it->second; - } - } - - return rv; -} - -std::map uri_resolver::get_all_app_resolvers() -{ - std::map result; - - { - zauto_read_lock l(_apps_lock); - result.insert(_apps.begin(), _apps.end()); - } - - return result; -} -} diff --git a/src/core/tests/address.cpp b/src/core/tests/address.cpp index 31e43e7bf4..02076dd372 100644 --- a/src/core/tests/address.cpp +++ b/src/core/tests/address.cpp @@ -35,7 +35,6 @@ #include #include -#include #include using namespace ::dsn; @@ -94,13 +93,6 @@ TEST(core, rpc_address_to_string) ASSERT_EQ(std::string("127.0.0.1:8080"), addr.to_std_string()); } - { - const char *uri = "http://localhost:8080/"; - dsn::rpc_address addr; - addr.assign_uri(uri); - ASSERT_EQ(std::string(uri), addr.to_std_string()); - } - { const char *name = "test_group"; dsn::rpc_address addr; @@ -128,16 +120,6 @@ TEST(core, dsn_address_build) ASSERT_EQ(addr, dsn::rpc_address(host_ipv4(127, 0, 0, 1), 8080)); } - { - const char *uri = "http://localhost:8080/"; - dsn::rpc_address addr; - addr.assign_uri(uri); - - ASSERT_EQ(addr.type(), HOST_TYPE_URI); - ASSERT_STREQ(uri, addr.uri_address()->uri()); - ASSERT_EQ(1, addr.uri_address()->get_count()); - } - { const char *name = "test_group"; dsn::rpc_address addr; diff --git a/src/dist/replication/CMakeLists.txt b/src/dist/replication/CMakeLists.txt index 10ee12af9c..f65af9880f 100644 --- a/src/dist/replication/CMakeLists.txt +++ b/src/dist/replication/CMakeLists.txt @@ -1,3 +1,4 @@ +add_subdirectory(client) add_subdirectory(common) add_subdirectory(ddl_lib) add_subdirectory(tool_lib) diff --git a/src/dist/replication/client/CMakeLists.txt b/src/dist/replication/client/CMakeLists.txt new file mode 100644 index 0000000000..5073c8de06 --- /dev/null +++ b/src/dist/replication/client/CMakeLists.txt @@ -0,0 +1,21 @@ +set(MY_PROJ_NAME dsn_replication_client) + +# Source files under CURRENT project directory will be automatically included. +# You can manually set MY_PROJ_SRC to include source files under other directories. +set(MY_PROJ_SRC "") + +# Search mode for source files under CURRENT project directory? +# "GLOB_RECURSE" for recursive search +# "GLOB" for non-recursive search +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_INC_PATH "") + +set(MY_PROJ_LIBS "") + +set(MY_PROJ_LIB_PATH "") + +# Extra files that will be installed +set(MY_BINPLACES "") + +dsn_add_static_library() diff --git a/src/dist/replication/client/partition_resolver.cpp b/src/dist/replication/client/partition_resolver.cpp new file mode 100644 index 0000000000..34d34dd01b --- /dev/null +++ b/src/dist/replication/client/partition_resolver.cpp @@ -0,0 +1,121 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distrib#ute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include +#include "dist/replication/client/partition_resolver_simple.h" +#include "dist/replication/client/partition_resolver_manager.h" + +namespace dsn { +namespace replication { + +/*static*/ +partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name, + const std::vector &meta_list, + const char *app_path) +{ + return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_path); +} + +DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) +void partition_resolver::call_task(const rpc_response_task_ptr &t) +{ + auto &hdr = *(t->get_request()->header); + uint64_t deadline_ms = dsn_now_ms() + hdr.client.timeout_ms; + + rpc_response_handler old_callback; + t->fetch_current_handler(old_callback); + auto new_callback = [this, deadline_ms, oc = std::move(old_callback)]( + dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + if (req->header->gpid.value() != 0 && err != ERR_OK && err != ERR_HANDLER_NOT_FOUND && + err != ERR_APP_NOT_EXIST && err != ERR_OPERATION_DISABLED) { + + on_access_failure(req->header->gpid.get_partition_index(), err); + // still got time, retry + uint64_t nms = dsn_now_ms(); + uint64_t gap = 8 << req->send_retry_count; + if (gap > 1000) + gap = 1000; + if (nms + gap < deadline_ms) { + req->send_retry_count++; + req->header->client.timeout_ms = static_cast(deadline_ms - nms - gap); + + rpc_response_task_ptr ctask = + dynamic_cast(task::get_current_task()); + partition_resolver *r = this; + + dassert(ctask != nullptr, "current task must be rpc_response_task"); + ctask->replace_callback(std::move(oc)); + dassert(ctask->set_retry(false), + "rpc_response_task set retry failed, state = %s", + enum_to_string(ctask->state())); + + // sleep gap milliseconds before retry + tasking::enqueue(LPC_RPC_DELAY_CALL, + nullptr, + [r, ctask]() { r->call_task(ctask); }, + 0, + std::chrono::milliseconds(gap)); + return; + } else { + derror("service access failed (%s), no more time for further " + "tries, set error = ERR_TIMEOUT, trace_id = %016" PRIx64, + err.to_string(), + req->header->trace_id); + err = ERR_TIMEOUT; + } + } + + if (oc) + oc(err, req, resp); + }; + t->replace_callback(std::move(new_callback)); + + resolve(hdr.client.partition_hash, + [t](resolve_result &&result) mutable { + if (result.err != ERR_OK) { + t->enqueue(result.err, nullptr); + return; + } + + // update gpid when necessary + auto &hdr = *(t->get_request()->header); + if (hdr.gpid.value() != result.pid.value()) { + dassert(hdr.gpid.value() == 0, "inconsistent gpid"); + hdr.gpid = result.pid; + + // update thread hash if not assigned by applications + if (hdr.client.thread_hash == 0) { + hdr.client.thread_hash = result.pid.thread_hash(); + } + } + dsn_rpc_call(result.address, t.get()); + }, + hdr.client.timeout_ms); +} +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/client/partition_resolver_manager.cpp b/src/dist/replication/client/partition_resolver_manager.cpp new file mode 100644 index 0000000000..5f6331bb5e --- /dev/null +++ b/src/dist/replication/client/partition_resolver_manager.cpp @@ -0,0 +1,77 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include +#include "dist/replication/client/partition_resolver_manager.h" +#include "dist/replication/client/partition_resolver_simple.h" + +namespace dsn { +namespace replication { + +template +bool vector_equal(const std::vector &a, const std::vector &b) +{ + if (a.size() != b.size()) + return false; + for (const T &item : a) { + if (std::find(b.begin(), b.end(), item) == b.end()) + return false; + } + for (const T &item : b) { + if (std::find(a.begin(), a.end(), item) == a.end()) + return false; + } + return true; +} + +partition_resolver_ptr partition_resolver_manager::find_or_create( + const char *cluster_name, const std::vector &meta_list, const char *app_path) +{ + dsn::zauto_lock l(_lock); + std::map &app_map = _resolvers[cluster_name]; + partition_resolver_ptr &ptr = app_map[app_path]; + + if (ptr == nullptr) { + dsn::rpc_address meta_group; + meta_group.assign_group(cluster_name); + meta_group.group_address()->add_list(meta_list); + ptr = new partition_resolver_simple(meta_group, app_path); + return ptr; + } else { + dsn::rpc_address meta_group = ptr->get_meta_server(); + const std::vector &existing_list = meta_group.group_address()->members(); + if (!vector_equal(meta_list, existing_list)) { + derror("meta list not match for cluster(%s)", cluster_name); + return nullptr; + } + return ptr; + } +} + +} // namespace replication +} // namespace dsn diff --git a/include/dsn/dist/dist.providers.common.h b/src/dist/replication/client/partition_resolver_manager.h similarity index 62% rename from include/dsn/dist/dist.providers.common.h rename to src/dist/replication/client/partition_resolver_manager.h index f362e87f0c..2106fbd8cb 100644 --- a/include/dsn/dist/dist.providers.common.h +++ b/src/dist/replication/client/partition_resolver_manager.h @@ -24,19 +24,30 @@ * THE SOFTWARE. */ -/* - * Description: - * common dist providers built into rDSN core - * - * Revision history: - * Feb., 2016, @imzhenyu (Zhenyu Guo), first draft - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once +#include +#include +#include +#include +#include +#include + namespace dsn { -namespace dist { -void register_common_providers(); -} -} +namespace replication { + +class partition_resolver_manager : public dsn::utils::singleton +{ +public: + partition_resolver_ptr find_or_create(const char *cluster_name, + const std::vector &meta_list, + const char *app_path); + +private: + dsn::zlock _lock; + // cluster_name -> + std::map> _resolvers; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/core/core/partition_resolver_simple.cpp b/src/dist/replication/client/partition_resolver_simple.cpp similarity index 84% rename from src/core/core/partition_resolver_simple.cpp rename to src/dist/replication/client/partition_resolver_simple.cpp index 80195abff9..ebae61239f 100644 --- a/src/core/core/partition_resolver_simple.cpp +++ b/src/dist/replication/client/partition_resolver_simple.cpp @@ -1,46 +1,36 @@ /* -* The MIT License (MIT) -* -* Copyright (c) 2015 Microsoft Corporation -* -* -=- Robust Distributed System Nucleus (rDSN) -=- -* -* Permission is hereby granted, free of charge, to any person obtaining a copy -* of this software and associated documentation files (the "Software"), to deal -* in the Software without restriction, including without limitation the rights -* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -* copies of the Software, and to permit persons to whom the Software is -* furnished to do so, subject to the following conditions: -* -* The above copyright notice and this permission notice shall be included in -* all copies or substantial portions of the Software. -* -* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -* THE SOFTWARE. -*/ + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ -/* -* Description: -* partition resolver simple provider implementation -* -* Revision history: -* Feb., 2016, @imzhenyu (Zhenyu Guo), first draft -* xxxx-xx-xx, author, fix bug about xxx -*/ - -#include "partition_resolver_simple.h" #include #include #include +#include "dist/replication/client/partition_resolver_simple.h" namespace dsn { -namespace dist { -//------------------------------------------------------------------------------------ +namespace replication { partition_resolver_simple::partition_resolver_simple(rpc_address meta_server, const char *app_path) : partition_resolver(meta_server, app_path), @@ -48,7 +38,6 @@ partition_resolver_simple::partition_resolver_simple(rpc_address meta_server, co _app_partition_count(-1), _app_is_stateful(true) { - dassert(meta_server.type() != HOST_TYPE_URI, "can not use uri address here"); } void partition_resolver_simple::resolve(uint64_t partition_hash, @@ -180,7 +169,7 @@ void partition_resolver_simple::call(request_context_ptr &&request, bool from_me if (from_meta_ack) { tasking::enqueue(LPC_REPLICATION_DELAY_QUERY_CONFIG, &_tracker, - [ =, req2 = request ]() mutable { call(std::move(req2), false); }, + [=, req2 = request]() mutable { call(std::move(req2), false); }, 0, std::chrono::seconds(1)); return; @@ -200,7 +189,7 @@ void partition_resolver_simple::call(request_context_ptr &&request, bool from_me request->timeout_timer = tasking::enqueue(LPC_REPLICATION_CLIENT_REQUEST_TIMEOUT, &_tracker, - [ =, req2 = request ]() mutable { on_timeout(std::move(req2)); }, + [=, req2 = request]() mutable { on_timeout(std::move(req2)); }, 0, std::chrono::milliseconds(timeout_ms)); } @@ -424,34 +413,8 @@ rpc_address partition_resolver_simple::get_address(const partition_configuration return config.last_drops[rand::next_u32(0, config.last_drops.size() - 1)]; } } - - // if (is_write || semantic == read_semantic::ReadLastUpdate) - // return config.primary; - - //// readsnapshot or readoutdated, using random - // else - //{ - // bool has_primary = false; - // int N = static_cast(config.secondaries.size()); - // if (!config.primary.is_invalid()) - // { - // N++; - // has_primary = true; - // } - - // if (0 == N) return config.primary; - - // int r = random32(0, 1000) % N; - // if (has_primary && r == N - 1) - // return config.primary; - // else - // return config.secondaries[r]; - //} } -// ERR_OBJECT_NOT_FOUND not in cache. -// ERR_IO_PENDING in cache but invalid, remove from cache. -// ERR_OK in cache and valid error_code partition_resolver_simple::get_address(int partition_index, /*out*/ rpc_address &addr) { // partition_configuration config; @@ -476,5 +439,5 @@ int partition_resolver_simple::get_partition_index(int partition_count, uint64_t { return partition_hash % static_cast(partition_count); } -} -} +} // namespace replication +} // namespace dsn diff --git a/src/core/core/partition_resolver_simple.h b/src/dist/replication/client/partition_resolver_simple.h similarity index 93% rename from src/core/core/partition_resolver_simple.h rename to src/dist/replication/client/partition_resolver_simple.h index b921f656da..25c9b703e7 100644 --- a/src/core/core/partition_resolver_simple.h +++ b/src/dist/replication/client/partition_resolver_simple.h @@ -24,26 +24,17 @@ * THE SOFTWARE. */ -/* - * Description: - * a simple uri resolver that queries meta server - * - * Revision history: - * Feb., 2016, @imzhenyu (Zhenyu Guo), first draft - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once -#include #include #include #include #include +#include namespace dsn { -namespace dist { -#pragma pack(push, 4) +namespace replication { + class partition_resolver_simple : public partition_resolver { public: @@ -129,6 +120,5 @@ class partition_resolver_simple : public partition_resolver dsn::message_ex *response, int partition_index); }; -#pragma pack(pop) -} -} +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt b/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt index d25d454f9c..428fa1eb4f 100644 --- a/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt +++ b/src/dist/replication/storage_engine/simple_kv/CMakeLists.txt @@ -13,7 +13,7 @@ set(MY_PROJ_INC_PATH "") set(MY_BOOST_PACKAGES system filesystem) -set(MY_PROJ_LIBS dsn_replica_server dsn_meta_server fmt) +set(MY_PROJ_LIBS dsn_replica_server dsn_meta_server dsn_replication_client fmt) set(MY_PROJ_LIB_PATH "") diff --git a/src/dist/replication/storage_engine/simple_kv/config.ini b/src/dist/replication/storage_engine/simple_kv/config.ini index 2add4a9bc3..c537457638 100644 --- a/src/dist/replication/storage_engine/simple_kv/config.ini +++ b/src/dist/replication/storage_engine/simple_kv/config.ini @@ -24,14 +24,12 @@ pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION [apps.client] type = client -arguments = dsn://mycluster/simple_kv.instance0 +arguments = mycluster localhost:34601 simple_kv.instance0 run = true count = 1 pools = THREAD_POOL_DEFAULT [core] - - tool = simulator ;tool = nativerun ;toollets = tracer @@ -123,10 +121,6 @@ is_trace = false server_list = localhost:34601 min_live_node_count_for_unfreeze = 1 -[uri-resolver.dsn://mycluster] -factory = partition_resolver_simple -arguments = localhost:34601 - [replication.app] app_name = simple_kv.instance0 app_type = simple_kv @@ -136,7 +130,6 @@ stateful = true package_id = [replication] - prepare_timeout_ms_for_secondaries = 10000 prepare_timeout_ms_for_potential_secondaries = 20000 diff --git a/src/dist/replication/storage_engine/simple_kv/simple_kv.app.example.h b/src/dist/replication/storage_engine/simple_kv/simple_kv.app.example.h index d84346bed8..d2a8e79560 100644 --- a/src/dist/replication/storage_engine/simple_kv/simple_kv.app.example.h +++ b/src/dist/replication/storage_engine/simple_kv/simple_kv.app.example.h @@ -53,9 +53,10 @@ class simple_kv_client_app : public ::dsn::service_app if (args.size() < 2) return ::dsn::ERR_INVALID_PARAMETERS; - // argv[1]: e.g., dsn://mycluster/simple-kv.instance0 - _server.assign_uri(args[1].c_str()); - _simple_kv_client.reset(new simple_kv_client(_server)); + printf("%s %s %s\n", args[1].c_str(), args[2].c_str(), args[3].c_str()); + dsn::rpc_address meta; + meta.from_string_ipv4(args[2].c_str()); + _simple_kv_client.reset(new simple_kv_client(args[1].c_str(), {meta}, args[3].c_str())); _timer = ::dsn::tasking::enqueue_timer(LPC_SIMPLE_KV_TEST_TIMER, &_tracker, @@ -124,6 +125,6 @@ class simple_kv_client_app : public ::dsn::service_app std::unique_ptr _simple_kv_client; dsn::task_tracker _tracker; }; -} -} -} +} // namespace application +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/storage_engine/simple_kv/simple_kv.client.h b/src/dist/replication/storage_engine/simple_kv/simple_kv.client.h index 154e6a04d0..a1808c0198 100644 --- a/src/dist/replication/storage_engine/simple_kv/simple_kv.client.h +++ b/src/dist/replication/storage_engine/simple_kv/simple_kv.client.h @@ -2,6 +2,7 @@ #include #include #include +#include #include "dist/replication/storage_engine/simple_kv/simple_kv.code.definition.h" #include "dist/replication/storage_engine/simple_kv/simple_kv_types.h" @@ -11,7 +12,12 @@ namespace application { class simple_kv_client { public: - simple_kv_client(::dsn::rpc_address server) { _server = server; } + simple_kv_client(const char *cluster_name, + const std::vector &meta_list, + const char *app_name) + { + _resolver = partition_resolver::get_resolver(cluster_name, meta_list, app_name); + } simple_kv_client() {} @@ -22,19 +28,16 @@ class simple_kv_client std::pair<::dsn::error_code, std::string> read_sync(const std::string &key, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, - uint64_t partition_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + uint64_t partition_hash = 0) { return ::dsn::rpc::wait_and_unwrap( - ::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_READ, - key, - nullptr, - empty_rpc_handler, - timeout, - thread_hash, - partition_hash)); + _resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_READ, + key, + nullptr, + empty_rpc_handler, + timeout, + partition_hash, + 0)); } // - asynchronous with on-stack std::string and std::string @@ -42,20 +45,16 @@ class simple_kv_client ::dsn::task_ptr read(const std::string &key, TCallback &&callback, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, uint64_t partition_hash = 0, - int reply_thread_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + int reply_thread_hash = 0) { - return ::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_READ, - key, - nullptr, - std::forward(callback), - timeout, - thread_hash, - partition_hash, - reply_thread_hash); + return _resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_READ, + key, + nullptr, + std::forward(callback), + timeout, + partition_hash, + reply_thread_hash); } // ---------- call RPC_SIMPLE_KV_SIMPLE_KV_WRITE ------------ @@ -63,17 +62,13 @@ class simple_kv_client std::pair<::dsn::error_code, int32_t> write_sync(const kv_pair &pr, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, - uint64_t partition_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + uint64_t partition_hash = 0) { - return ::dsn::rpc::wait_and_unwrap(::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_WRITE, + return dsn::rpc::wait_and_unwrap(_resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, pr, nullptr, empty_rpc_handler, timeout, - thread_hash, partition_hash, 0)); } @@ -83,20 +78,16 @@ class simple_kv_client ::dsn::task_ptr write(const kv_pair &pr, TCallback &&callback, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, uint64_t partition_hash = 0, - int reply_thread_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + int reply_thread_hash = 0) { - return ::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_WRITE, - pr, - nullptr, - std::forward(callback), - timeout, - thread_hash, - partition_hash, - reply_thread_hash); + return _resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, + pr, + nullptr, + std::forward(callback), + timeout, + partition_hash, + reply_thread_hash); } // ---------- call RPC_SIMPLE_KV_SIMPLE_KV_APPEND ------------ @@ -104,18 +95,16 @@ class simple_kv_client std::pair<::dsn::error_code, int32_t> append_sync(const kv_pair &pr, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, - uint64_t partition_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + uint64_t partition_hash = 0) { - return ::dsn::rpc::wait_and_unwrap(::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_APPEND, - pr, - nullptr, - empty_rpc_handler, - timeout, - thread_hash, - partition_hash)); + return ::dsn::rpc::wait_and_unwrap( + _resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, + pr, + nullptr, + empty_rpc_handler, + timeout, + partition_hash, + 0)); } // - asynchronous with on-stack kv_pair and int32_t @@ -123,25 +112,21 @@ class simple_kv_client ::dsn::task_ptr append(const kv_pair &pr, TCallback &&callback, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int thread_hash = 0, uint64_t partition_hash = 0, - int reply_thread_hash = 0, - dsn::optional<::dsn::rpc_address> server_addr = dsn::none) + int reply_thread_hash = 0) { - return ::dsn::rpc::call(server_addr.unwrap_or(_server), - RPC_SIMPLE_KV_SIMPLE_KV_APPEND, - pr, - nullptr, - std::forward(callback), - timeout, - thread_hash, - partition_hash, - reply_thread_hash); + return _resolver->call_op(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, + pr, + nullptr, + std::forward(callback), + timeout, + partition_hash, + reply_thread_hash); } private: - ::dsn::rpc_address _server; + dsn::replication::partition_resolver_ptr _resolver; }; -} -} -} +} // namespace application +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/test/simple_kv/CMakeLists.txt b/src/dist/replication/test/simple_kv/CMakeLists.txt index 1c28cd3e3b..b4cbe4f8ff 100644 --- a/src/dist/replication/test/simple_kv/CMakeLists.txt +++ b/src/dist/replication/test/simple_kv/CMakeLists.txt @@ -17,6 +17,7 @@ set(MY_PROJ_LIBS gtest dsn_replica_server dsn_meta_server dsn_replication_common + dsn_replication_client dsn.block_service.local dsn.block_service.fds dsn.failure_detector.multimaster diff --git a/src/dist/replication/test/simple_kv/client.cpp b/src/dist/replication/test/simple_kv/client.cpp index 479efd158e..98bb495cc6 100644 --- a/src/dist/replication/test/simple_kv/client.cpp +++ b/src/dist/replication/test/simple_kv/client.cpp @@ -61,16 +61,11 @@ ::dsn::error_code simple_kv_client_app::start(const std::vector &ar std::vector meta_servers; replica_helper::load_meta_servers(meta_servers); - _meta_server_group.assign_group("meta_servers"); - rpc_group_address *g = _meta_server_group.group_address(); - for (auto &ms : meta_servers) { - g->add(ms); - } + _meta_server_group.group_address()->add_list(meta_servers); - // argv[1]: e.g., dsn://mycluster/simple-kv.instance0 - _service_addr.assign_uri(args[1].c_str()); - _simple_kv_client.reset(new application::simple_kv_client(_service_addr)); + _simple_kv_client.reset( + new application::simple_kv_client("mycluster", meta_servers, "simple_kv.instance0")); dsn::tasking::enqueue( LPC_SIMPLE_KV_TEST, &_tracker, std::bind(&simple_kv_client_app::run, this)); @@ -179,6 +174,6 @@ void simple_kv_client_app::begin_read(int id, const std::string &key, int timeou }, std::chrono::milliseconds(timeout_ms)); } -} -} -} +} // namespace test +} // namespace replication +} // namespace dsn From 09d04525c3cfbe6098593b9aa0fd1797c53c82ef Mon Sep 17 00:00:00 2001 From: neverchanje Date: Sat, 22 Dec 2018 19:50:15 +0800 Subject: [PATCH 2/4] fix code format --- src/dist/replication/client/partition_resolver.cpp | 5 +++-- src/dist/replication/client/partition_resolver_simple.cpp | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dist/replication/client/partition_resolver.cpp b/src/dist/replication/client/partition_resolver.cpp index 34d34dd01b..2b35770187 100644 --- a/src/dist/replication/client/partition_resolver.cpp +++ b/src/dist/replication/client/partition_resolver.cpp @@ -49,8 +49,9 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t) rpc_response_handler old_callback; t->fetch_current_handler(old_callback); - auto new_callback = [this, deadline_ms, oc = std::move(old_callback)]( - dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + auto new_callback = [ this, deadline_ms, oc = std::move(old_callback) ]( + dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp) + { if (req->header->gpid.value() != 0 && err != ERR_OK && err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST && err != ERR_OPERATION_DISABLED) { diff --git a/src/dist/replication/client/partition_resolver_simple.cpp b/src/dist/replication/client/partition_resolver_simple.cpp index ebae61239f..582d033b0d 100644 --- a/src/dist/replication/client/partition_resolver_simple.cpp +++ b/src/dist/replication/client/partition_resolver_simple.cpp @@ -169,7 +169,7 @@ void partition_resolver_simple::call(request_context_ptr &&request, bool from_me if (from_meta_ack) { tasking::enqueue(LPC_REPLICATION_DELAY_QUERY_CONFIG, &_tracker, - [=, req2 = request]() mutable { call(std::move(req2), false); }, + [ =, req2 = request ]() mutable { call(std::move(req2), false); }, 0, std::chrono::seconds(1)); return; @@ -189,7 +189,7 @@ void partition_resolver_simple::call(request_context_ptr &&request, bool from_me request->timeout_timer = tasking::enqueue(LPC_REPLICATION_CLIENT_REQUEST_TIMEOUT, &_tracker, - [=, req2 = request]() mutable { on_timeout(std::move(req2)); }, + [ =, req2 = request ]() mutable { on_timeout(std::move(req2)); }, 0, std::chrono::milliseconds(timeout_ms)); } From 0900b9e63d154cd6258ad0b462df9643126e8d24 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Mon, 24 Dec 2018 13:06:25 +0800 Subject: [PATCH 3/4] fix based on comments --- .../dsn/dist/replication/partition_resolver.h | 10 +++++----- .../replication/client/partition_resolver.cpp | 7 +++---- .../client/partition_resolver_manager.cpp | 6 +++--- .../client/partition_resolver_manager.h | 4 ++-- .../client/partition_resolver_simple.cpp | 18 +++++++++--------- .../client/partition_resolver_simple.h | 2 +- 6 files changed, 23 insertions(+), 24 deletions(-) diff --git a/include/dsn/dist/replication/partition_resolver.h b/include/dsn/dist/replication/partition_resolver.h index 102dc90a3c..e51c774812 100644 --- a/include/dsn/dist/replication/partition_resolver.h +++ b/include/dsn/dist/replication/partition_resolver.h @@ -42,7 +42,7 @@ class partition_resolver : public ref_counter static dsn::ref_ptr get_resolver(const char *cluster_name, const std::vector &meta_list, - const char *app_path); + const char *app_name); template dsn::rpc_response_task_ptr call_op(dsn::task_code code, @@ -64,13 +64,13 @@ class partition_resolver : public ref_counter void call_task(const dsn::rpc_response_task_ptr &task); - std::string get_app_path() const { return _app_path; } + std::string get_app_name() const { return _app_name; } dsn::rpc_address get_meta_server() const { return _meta_server; } protected: - partition_resolver(rpc_address meta_server, const char *app_path) - : _app_path(app_path), _meta_server(meta_server) + partition_resolver(rpc_address meta_server, const char *app_name) + : _app_name(app_name), _meta_server(meta_server) { } @@ -124,7 +124,7 @@ class partition_resolver : public ref_counter virtual int get_partition_index(int partition_count, uint64_t partition_hash) = 0; std::string _cluster_name; - std::string _app_path; + std::string _app_name; rpc_address _meta_server; }; diff --git a/src/dist/replication/client/partition_resolver.cpp b/src/dist/replication/client/partition_resolver.cpp index 2b35770187..7baae60638 100644 --- a/src/dist/replication/client/partition_resolver.cpp +++ b/src/dist/replication/client/partition_resolver.cpp @@ -36,9 +36,9 @@ namespace replication { /*static*/ partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name, const std::vector &meta_list, - const char *app_path) + const char *app_name) { - return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_path); + return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_name); } DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) @@ -54,7 +54,6 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t) { if (req->header->gpid.value() != 0 && err != ERR_OK && err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST && err != ERR_OPERATION_DISABLED) { - on_access_failure(req->header->gpid.get_partition_index(), err); // still got time, retry uint64_t nms = dsn_now_ms(); @@ -67,7 +66,7 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t) rpc_response_task_ptr ctask = dynamic_cast(task::get_current_task()); - partition_resolver *r = this; + partition_resolver_ptr r(this); dassert(ctask != nullptr, "current task must be rpc_response_task"); ctask->replace_callback(std::move(oc)); diff --git a/src/dist/replication/client/partition_resolver_manager.cpp b/src/dist/replication/client/partition_resolver_manager.cpp index 5f6331bb5e..b88b6c6c03 100644 --- a/src/dist/replication/client/partition_resolver_manager.cpp +++ b/src/dist/replication/client/partition_resolver_manager.cpp @@ -50,17 +50,17 @@ bool vector_equal(const std::vector &a, const std::vector &b) } partition_resolver_ptr partition_resolver_manager::find_or_create( - const char *cluster_name, const std::vector &meta_list, const char *app_path) + const char *cluster_name, const std::vector &meta_list, const char *app_name) { dsn::zauto_lock l(_lock); std::map &app_map = _resolvers[cluster_name]; - partition_resolver_ptr &ptr = app_map[app_path]; + partition_resolver_ptr &ptr = app_map[app_name]; if (ptr == nullptr) { dsn::rpc_address meta_group; meta_group.assign_group(cluster_name); meta_group.group_address()->add_list(meta_list); - ptr = new partition_resolver_simple(meta_group, app_path); + ptr = new partition_resolver_simple(meta_group, app_name); return ptr; } else { dsn::rpc_address meta_group = ptr->get_meta_server(); diff --git a/src/dist/replication/client/partition_resolver_manager.h b/src/dist/replication/client/partition_resolver_manager.h index 2106fbd8cb..0addbd1482 100644 --- a/src/dist/replication/client/partition_resolver_manager.h +++ b/src/dist/replication/client/partition_resolver_manager.h @@ -41,11 +41,11 @@ class partition_resolver_manager : public dsn::utils::singleton &meta_list, - const char *app_path); + const char *app_name); private: dsn::zlock _lock; - // cluster_name -> + // cluster_name -> std::map> _resolvers; }; diff --git a/src/dist/replication/client/partition_resolver_simple.cpp b/src/dist/replication/client/partition_resolver_simple.cpp index 582d033b0d..22349e8cd1 100644 --- a/src/dist/replication/client/partition_resolver_simple.cpp +++ b/src/dist/replication/client/partition_resolver_simple.cpp @@ -32,8 +32,8 @@ namespace dsn { namespace replication { -partition_resolver_simple::partition_resolver_simple(rpc_address meta_server, const char *app_path) - : partition_resolver(meta_server, app_path), +partition_resolver_simple::partition_resolver_simple(rpc_address meta_server, const char *app_name) + : partition_resolver(meta_server, app_name), _app_id(-1), _app_partition_count(-1), _app_is_stateful(true) @@ -101,7 +101,7 @@ partition_resolver_simple::~partition_resolver_simple() void partition_resolver_simple::clear_all_pending_requests() { - dinfo("%s.client: clear all pending tasks", _app_path.c_str()); + dinfo("%s.client: clear all pending tasks", _app_name.c_str()); zauto_lock l(_requests_lock); // clear _pending_requests for (auto &pc : _pending_requests) { @@ -227,11 +227,11 @@ DEFINE_TASK_CODE_RPC(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, task_ptr partition_resolver_simple::query_config(int partition_index) { dinfo( - "%s.client: start query config, gpid = %d.%d", _app_path.c_str(), _app_id, partition_index); + "%s.client: start query config, gpid = %d.%d", _app_name.c_str(), _app_id, partition_index); auto msg = dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX); configuration_query_by_index_request req; - req.app_name = _app_path; + req.app_name = _app_name; if (partition_index != -1) { req.partition_indices.push_back(partition_index); } @@ -282,7 +282,7 @@ void partition_resolver_simple::query_config_reply(error_code err, dinfo("%s.client: query config reply, gpid = %d.%d, ballot = %" PRId64 ", primary = %s", - _app_path.c_str(), + _app_name.c_str(), new_config.pid.get_app_id(), new_config.pid.get_partition_index(), new_config.ballot, @@ -306,7 +306,7 @@ void partition_resolver_simple::query_config_reply(error_code err, } } else if (resp.err == ERR_OBJECT_NOT_FOUND) { derror("%s.client: query config reply, gpid = %d.%d, err = %s", - _app_path.c_str(), + _app_name.c_str(), _app_id, partition_index, resp.err.to_string()); @@ -314,7 +314,7 @@ void partition_resolver_simple::query_config_reply(error_code err, client_err = ERR_APP_NOT_EXIST; } else { derror("%s.client: query config reply, gpid = %d.%d, err = %s", - _app_path.c_str(), + _app_name.c_str(), _app_id, partition_index, resp.err.to_string()); @@ -323,7 +323,7 @@ void partition_resolver_simple::query_config_reply(error_code err, } } else { derror("%s.client: query config reply, gpid = %d.%d, err = %s", - _app_path.c_str(), + _app_name.c_str(), _app_id, partition_index, err.to_string()); diff --git a/src/dist/replication/client/partition_resolver_simple.h b/src/dist/replication/client/partition_resolver_simple.h index 25c9b703e7..1d32924ced 100644 --- a/src/dist/replication/client/partition_resolver_simple.h +++ b/src/dist/replication/client/partition_resolver_simple.h @@ -38,7 +38,7 @@ namespace replication { class partition_resolver_simple : public partition_resolver { public: - partition_resolver_simple(rpc_address meta_server, const char *app_path); + partition_resolver_simple(rpc_address meta_server, const char *app_name); virtual ~partition_resolver_simple(); From b95ba2ac15343ace222959054712a2f2de2b82a8 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Mon, 24 Dec 2018 13:22:53 +0800 Subject: [PATCH 4/4] fix based on comments --- include/dsn/dist/replication/partition_resolver.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/include/dsn/dist/replication/partition_resolver.h b/include/dsn/dist/replication/partition_resolver.h index e51c774812..ff8b8a22aa 100644 --- a/include/dsn/dist/replication/partition_resolver.h +++ b/include/dsn/dist/replication/partition_resolver.h @@ -62,6 +62,11 @@ class partition_resolver : public ref_counter return response_task; } + // choosing a proper replica server from meta server or local route cache + // and send the read/write request. + // if got reply or error, call the callback. + // parameters like request data, timeout, callback handler are all wrapped + // into "task", you may want to refer to dsn::rpc_response_task for details. void call_task(const dsn::rpc_response_task_ptr &task); std::string get_app_name() const { return _app_name; }