Skip to content

Commit

Permalink
rpc: remove uri resolver; move partition resolver to replication/clie…
Browse files Browse the repository at this point in the history
…nt (#205)
  • Loading branch information
shengofsun authored and Wu Tao committed Dec 24, 2018
1 parent 1e5d529 commit 38d7c02
Show file tree
Hide file tree
Showing 27 changed files with 508 additions and 931 deletions.
139 changes: 139 additions & 0 deletions include/dsn/dist/replication/partition_resolver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <dsn/utility/autoref_ptr.h>
#include <dsn/utility/error_code.h>
#include <dsn/tool-api/gpid.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/rpc_message.h>
#include <dsn/tool-api/async_calls.h>

namespace dsn {
namespace replication {

class partition_resolver : public ref_counter
{
public:
static dsn::ref_ptr<partition_resolver>
get_resolver(const char *cluster_name,
const std::vector<dsn::rpc_address> &meta_list,
const char *app_name);

template <typename TReq, typename TCallback>
dsn::rpc_response_task_ptr call_op(dsn::task_code code,
TReq &&request,
dsn::task_tracker *tracker,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t partition_hash,
int reply_hash = 0)
{
dsn::message_ex *msg = dsn::message_ex::create_request(
code, static_cast<int>(timeout.count()), 0, partition_hash);
marshall(msg, std::forward<TReq>(request));
dsn::rpc_response_task_ptr response_task = rpc::create_rpc_response_task(
msg, tracker, std::forward<TCallback>(callback), reply_hash);
call_task(response_task);
return response_task;
}

// choosing a proper replica server from meta server or local route cache
// and send the read/write request.
// if got reply or error, call the callback.
// parameters like request data, timeout, callback handler are all wrapped
// into "task", you may want to refer to dsn::rpc_response_task for details.
void call_task(const dsn::rpc_response_task_ptr &task);

std::string get_app_name() const { return _app_name; }

dsn::rpc_address get_meta_server() const { return _meta_server; }

protected:
partition_resolver(rpc_address meta_server, const char *app_name)
: _app_name(app_name), _meta_server(meta_server)
{
}

virtual ~partition_resolver() {}

struct resolve_result
{
///< ERR_OK
///< ERR_SERVICE_NOT_FOUND if resolver or app is missing
///< ERR_IO_PENDING if resolve in is progress, callers
///< should call resolve_async in this case
error_code err;
///< IPv4 of the target to send request to
rpc_address address;
///< global partition indentity
dsn::gpid pid;
};

/**
* resolve partition_hash into IP or group addresses to know what to connect next
*
* \param partition_hash the partition hash
* \param callback callback invoked on completion or timeout
* \param timeout_ms timeout to execute the callback
*
* \return see \ref resolve_result for details
*/
virtual void resolve(uint64_t partition_hash,
std::function<void(resolve_result &&)> &&callback,
int timeout_ms) = 0;

/*!
failure handler when access failed for certain partition
\param partition_index zero-based index of the partition.
\param err error code
this is usually to trigger new round of address resolve
*/
virtual void on_access_failure(int partition_index, error_code err) = 0;

/**
* get zero-based partition index
*
* \param partition_count number of partitions.
* \param partition_hash the partition hash.
*
* \return zero-based partition index.
*/

virtual int get_partition_index(int partition_count, uint64_t partition_hash) = 0;

std::string _cluster_name;
std::string _app_name;
rpc_address _meta_server;
};

typedef ref_ptr<partition_resolver> partition_resolver_ptr;

} // namespace replication
} // namespace dsn
8 changes: 7 additions & 1 deletion include/dsn/tool-api/group_address.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class rpc_group_address : public dsn::ref_counter
rpc_group_address(const rpc_group_address &other);
rpc_group_address &operator=(const rpc_group_address &other);
bool add(rpc_address addr);
void add_list(const std::vector<rpc_address> &list)
{
for (const rpc_address &r : list) {
add(r);
}
}
void set_leader(rpc_address addr);
bool remove(rpc_address addr);
bool contains(rpc_address addr);
Expand Down Expand Up @@ -206,4 +212,4 @@ inline rpc_address rpc_group_address::next(rpc_address current) const
}
}
}
}
} // namespace dsn
124 changes: 0 additions & 124 deletions include/dsn/tool-api/partition_resolver.h

This file was deleted.

17 changes: 0 additions & 17 deletions include/dsn/tool-api/rpc_address.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ typedef enum dsn_host_type_t {
HOST_TYPE_INVALID = 0,
HOST_TYPE_IPV4 = 1,
HOST_TYPE_GROUP = 2,
HOST_TYPE_URI = 3,
} dsn_host_type_t;

namespace dsn {

class rpc_group_address;
class rpc_uri_address;

class rpc_address
{
Expand Down Expand Up @@ -98,8 +96,6 @@ class rpc_address
_addr.v4.port = port;
}

void assign_uri(const char *host_uri);

void assign_group(const char *name);

const char *to_string() const;
Expand Down Expand Up @@ -145,8 +141,6 @@ class rpc_address
return (rpc_group_address *)(uintptr_t)_addr.group.group;
}

rpc_uri_address *uri_address() const { return (rpc_uri_address *)(uintptr_t)_addr.uri.uri; }

bool is_invalid() const { return _addr.v4.type == HOST_TYPE_INVALID; }

// before you assign new value, must call set_invalid() to release original value
Expand All @@ -161,8 +155,6 @@ class rpc_address
switch (type()) {
case HOST_TYPE_IPV4:
return ip() == r.ip() && _addr.v4.port == r.port();
case HOST_TYPE_URI:
return strcmp(to_string(), r.to_string()) == 0;
case HOST_TYPE_GROUP:
return _addr.group.group == r._addr.group.group;
default:
Expand All @@ -180,8 +172,6 @@ class rpc_address
switch (type()) {
case HOST_TYPE_IPV4:
return ip() < r.ip() || (ip() == r.ip() && port() < r.port());
case HOST_TYPE_URI:
return strcmp(to_string(), r.to_string()) < 0;
case HOST_TYPE_GROUP:
return _addr.group.group < r._addr.group.group;
default:
Expand All @@ -204,11 +194,6 @@ class rpc_address
unsigned long long ip : 32;
} v4; ///< \ref HOST_TYPE_IPV4
struct
{
unsigned long long type : 2;
unsigned long long uri : 62;
} uri; ///< \ref HOST_TYPE_URI
struct
{
unsigned long long type : 2;
unsigned long long group : 62; ///< dsn_group_t
Expand All @@ -229,8 +214,6 @@ struct hash<::dsn::rpc_address>
switch (ep.type()) {
case HOST_TYPE_IPV4:
return std::hash<uint32_t>()(ep.ip()) ^ std::hash<uint16_t>()(ep.port());
case HOST_TYPE_URI:
return std::hash<std::string>()(std::string(ep.to_string()));
case HOST_TYPE_GROUP:
return std::hash<void *>()(ep.group_address());
default:
Expand Down
4 changes: 2 additions & 2 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ class rpc_response_task : public task
//
// TODO(sunweijie): totally elimite this feature
//
const rpc_response_handler &current_handler() const { return _cb; }
void fetch_current_handler(rpc_response_handler &cb) { cb = std::move(_cb); }
void replace_callback(rpc_response_handler &&cb)
{
task_state cur_state = state();
Expand Down Expand Up @@ -684,4 +684,4 @@ __inline /*static*/ env_provider *task::get_current_env()
return tls_dsn.env;
}

} // end namespace
} // namespace dsn
Loading

0 comments on commit 38d7c02

Please sign in to comment.