Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
rpc: support sending rpc_holder through partition_resolver (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and qinzuoyan committed Mar 8, 2019
1 parent ddaa1ad commit 36607b7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 25 deletions.
1 change: 1 addition & 0 deletions include/dsn/cpp/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ struct base : environment

// Await for all running tasks to complete.
void wait_all() { __conf.tracker->wait_outstanding_tasks(); }
void cancel_all() { __conf.tracker->cancel_outstanding_tasks(); }

/// === Pipeline Declaration === ///
/// Declaration of pipeline is not thread-safe.
Expand Down
83 changes: 58 additions & 25 deletions include/dsn/cpp/rpc_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
#include <dsn/tool-api/task_tracker.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/partition_resolver.h>

namespace dsn {

using literals::chrono_literals::operator"" _ms;

//
// rpc_holder is mainly designed for RAII of dsn::message_ex*.
// rpc_holder is mainly designed for RAII of message_ex*.
// Since the request message will be automatically released after the rpc ends,
// it will become inaccessible when you use it in an async call (probably via tasking::enqueue).
// So in rpc_holder we hold another reference of the message, preventing it to be deleted.
Expand Down Expand Up @@ -78,15 +79,15 @@ class rpc_holder
using response_type = TResponse;

public:
explicit rpc_holder(dsn::message_ex *req = nullptr)
explicit rpc_holder(message_ex *req = nullptr)
{
if (req != nullptr) {
_i = std::make_shared<internal>(req);
}
}

rpc_holder(std::unique_ptr<TRequest> req,
dsn::task_code code,
task_code code,
std::chrono::milliseconds timeout = 0_ms,
uint64_t partition_hash = 0)
: _i(new internal(req, code, timeout, partition_hash))
Expand Down Expand Up @@ -121,7 +122,7 @@ class rpc_holder
return _i->thrift_response;
}

dsn::message_ex *dsn_request() const
message_ex *dsn_request() const
{
dassert(_i, "rpc_holder is uninitialized");
return _i->dsn_request;
Expand All @@ -130,21 +131,21 @@ class rpc_holder
// the remote address where reveice request from and send response to.
rpc_address remote_address() const { return dsn_request()->header->from_address; }

// TCallback = void(dsn::error_code)
// TCallback = void(error_code)
// NOTE that the `error_code` is not the error carried by response. Users should
// check the responded error themselves.
template <typename TCallback>
task_ptr call(::dsn::rpc_address server,
dsn::task_tracker *tracker,
task_ptr call(const rpc_address &server,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
// ensures that TCallback receives exactly one argument, which must be a dsn::error_code.
// ensures that TCallback receives exactly one argument, which must be a error_code.
static_assert(function_traits<TCallback>::arity == 1,
"TCallback must receive exactly one argument");
static_assert(std::is_same<typename function_traits<TCallback>::template arg_t<0>,
dsn::error_code>::value,
"the first argument of TCallback must be dsn::error_code");
static_assert(
std::is_same<typename function_traits<TCallback>::template arg_t<0>, error_code>::value,
"the first argument of TCallback must be error_code");

if (dsn_unlikely(_mail_box != nullptr)) {
_mail_box->emplace_back(*this);
Expand All @@ -155,9 +156,9 @@ class rpc_holder
dsn_request(),
tracker,
[ cb_fwd = std::forward<TCallback>(callback),
rpc = *this ](error_code err, dsn::message_ex * req, dsn::message_ex * resp) mutable {
rpc = *this ](error_code err, message_ex * req, message_ex * resp) mutable {
if (err == ERR_OK) {
::dsn::unmarshall(resp, rpc.response());
unmarshall(resp, rpc.response());
}
cb_fwd(err);
},
Expand All @@ -166,10 +167,42 @@ class rpc_holder
return t;
}

template <typename TCallback>
task_ptr call(replication::partition_resolver_ptr &resolver,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
static_assert(function_traits<TCallback>::arity == 1,
"TCallback must receive exactly one argument");
static_assert(
std::is_same<typename function_traits<TCallback>::template arg_t<0>, error_code>::value,
"the first argument of TCallback must be error_code");

if (dsn_unlikely(_mail_box != nullptr)) {
_mail_box->emplace_back(*this);
return nullptr;
}

rpc_response_task_ptr t = rpc::create_rpc_response_task(
dsn_request(),
tracker,
[ cb_fwd = std::forward<TCallback>(callback),
rpc = *this ](error_code err, message_ex * req, message_ex * resp) mutable {
if (err == ERR_OK) {
unmarshall(resp, rpc.response());
}
cb_fwd(err);
},
reply_thread_hash);
resolver->call_task(t);
return t;
}

// Returns an rpc_holder that will reply the request after its lifetime ends.
// By default rpc_holder never replies.
// SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
static inline rpc_holder auto_reply(dsn::message_ex *req)
static inline rpc_holder auto_reply(message_ex *req)
{
rpc_holder rpc(req);
rpc._i->auto_reply = true;
Expand Down Expand Up @@ -206,28 +239,28 @@ class rpc_holder

struct internal
{
explicit internal(dsn::message_ex *req)
explicit internal(message_ex *req)
: dsn_request(req), thrift_request(make_unique<TRequest>()), auto_reply(false)
{
// we must hold one reference for the request, or rdsn will delete it after
// the rpc call ends.
dsn_request->add_ref();
dsn::unmarshall(req, *thrift_request);
unmarshall(req, *thrift_request);
}

internal(std::unique_ptr<TRequest> &req,
dsn::task_code code,
task_code code,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
: thrift_request(std::move(req)), auto_reply(false)
{
dassert(thrift_request != nullptr, "req should not be null");

// leave thread_hash to 0
dsn_request = dsn::message_ex::create_request(
dsn_request = message_ex::create_request(
code, static_cast<int>(timeout.count()), 0, partition_hash);
dsn_request->add_ref();
dsn::marshall(dsn_request, *thrift_request);
marshall(dsn_request, *thrift_request);
}

void reply()
Expand All @@ -240,8 +273,8 @@ class rpc_holder
return;
}

dsn::message_ex *dsn_response = dsn_request->create_response();
::dsn::marshall(dsn_response, thrift_response);
message_ex *dsn_response = dsn_request->create_response();
marshall(dsn_response, thrift_response);
dsn_rpc_reply(dsn_response);
}

Expand All @@ -253,7 +286,7 @@ class rpc_holder
dsn_request->release_ref();
}

dsn::message_ex *dsn_request;
message_ex *dsn_request;
std::unique_ptr<TRequest> thrift_request;
TResponse thrift_response;

Expand Down Expand Up @@ -285,12 +318,12 @@ struct is_rpc_holder<rpc_holder<TRequest, TResponse>> : public std::true_type
namespace rpc {

// call an RPC specified by rpc_holder.
// TCallback = void(dsn::error_code)
// TCallback = void(error_code)

template <typename TCallback, typename TRpcHolder>
task_ptr call(::dsn::rpc_address server,
task_ptr call(rpc_address server,
TRpcHolder rpc,
dsn::task_tracker *tracker,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
Expand Down

0 comments on commit 36607b7

Please sign in to comment.