Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reimplement pegasus_server_write::on_batched_write_requests #680

Merged
merged 10 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
: replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log)
{
init_non_batch_write_handlers();
}

int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
Expand All @@ -50,43 +51,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}

dsn::task_code rpc_code(requests[0]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
dassert(count == 1, "count = %d", count);
auto rpc = incr_rpc::auto_reply(requests[0]);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dassert(count == 1, "count = %d", count);
auto rpc = duplicate_rpc::auto_reply(requests[0]);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_set_rpc::auto_reply(requests[0]);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_mutate_rpc::auto_reply(requests[0]);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
dassert(count == 1, "count = %d", count);
auto rpc = ingestion_rpc::auto_reply(requests[0]);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}

return on_batched_writes(requests, count);
}

Expand Down Expand Up @@ -116,13 +85,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dfatal("rpc code not allow batch: %s", rpc_code.to_string());
if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
} else {
dfatal("rpc code not handled: %s", rpc_code.to_string());
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
}
}

Expand Down Expand Up @@ -170,5 +136,45 @@ void pegasus_server_write::request_key_check(int64_t decree,
}
}

void pegasus_server_write::init_non_batch_write_handlers()
{
_non_batch_write_handlers = {
{dsn::apps::RPC_RRDB_RRDB_MULTI_PUT,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_put_rpc::auto_reply(request);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_remove_rpc::auto_reply(request);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_INCR,
[this](dsn::message_ex *request) -> int {
auto rpc = incr_rpc::auto_reply(request);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
[this](dsn::message_ex *request) -> int {
auto rpc = duplicate_rpc::auto_reply(request);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_set_rpc::auto_reply(request);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_mutate_rpc::auto_reply(request);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}},
};
}
} // namespace server
} // namespace pegasus
5 changes: 5 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class pegasus_server_write : public dsn::replication::replica_base
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key);

private:
void init_non_batch_write_handlers();

friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
Expand All @@ -84,6 +86,9 @@ class pegasus_server_write : public dsn::replication::replica_base
int64_t _decree;

const bool _verbose_log;

typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;
};

} // namespace server
Expand Down