Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Server connection threshold (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
mentoswang authored and Wu Tao committed Mar 27, 2019
1 parent 450ff18 commit ca636f2
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 21 deletions.
7 changes: 7 additions & 0 deletions include/dsn/tool-api/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class connection_oriented_network : public network
DSN_API void on_server_session_accepted(rpc_session_ptr &s);
DSN_API void on_server_session_disconnected(rpc_session_ptr &s);

// server connection count threshold
DSN_API bool is_conn_threshold_exceeded(::dsn::rpc_address ep);

// client session management
DSN_API rpc_session_ptr get_client_session(::dsn::rpc_address ep);
DSN_API void on_client_session_connected(rpc_session_ptr &s);
Expand All @@ -187,7 +190,11 @@ class connection_oriented_network : public network

typedef std::unordered_map<::dsn::rpc_address, rpc_session_ptr> server_sessions;
server_sessions _servers; // from_address => rpc_session
typedef std::unordered_map<uint32_t, uint32_t> ip_connection_count;
ip_connection_count _ip_conn_count; // from_ip => connection count
utils::rw_lock_nr _servers_lock;

uint32_t _cfg_conn_threshold_per_ip;
};

/*!
Expand Down
62 changes: 62 additions & 0 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ uint32_t network::get_local_ipv4()
connection_oriented_network::connection_oriented_network(rpc_engine *srv, network *inner_provider)
: network(srv, inner_provider)
{
_cfg_conn_threshold_per_ip = 0;
}

void connection_oriented_network::inject_drop_message(message_ex *msg, bool is_send)
Expand Down Expand Up @@ -576,8 +577,10 @@ rpc_session_ptr connection_oriented_network::get_server_session(::dsn::rpc_addre
void connection_oriented_network::on_server_session_accepted(rpc_session_ptr &s)
{
int scount = 0;
int ecount = 1;
{
utils::auto_write_lock l(_servers_lock);

auto pr = _servers.insert(server_sessions::value_type(s->remote_address(), s));
if (pr.second) {
// nothing to do
Expand All @@ -587,16 +590,28 @@ void connection_oriented_network::on_server_session_accepted(rpc_session_ptr &s)
s->remote_address().to_string());
}
scount = (int)_servers.size();

auto pr2 =
_ip_conn_count.insert(ip_connection_count::value_type(s->remote_address().ip(), 1));
if (!pr2.second) {
ecount = ++pr2.first->second;
}
}

ddebug("server session accepted, remote_client = %s, current_count = %d",
s->remote_address().to_string(),
scount);

ddebug("ip session %s, remote_client = %s, current_count = %d",
ecount == 1 ? "inserted" : "increased",
s->remote_address().to_string(),
ecount);
}

void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr &s)
{
int scount = 0;
int ecount = 0;
bool r = false;
{
utils::auto_write_lock l(_servers_lock);
Expand All @@ -606,13 +621,60 @@ void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr
r = true;
}
scount = (int)_servers.size();

auto it2 = _ip_conn_count.find(s->remote_address().ip());
if (it2 != _ip_conn_count.end()) {
if (it2->second > 1) {
ecount = --it2->second;
} else {
_ip_conn_count.erase(it2);
}
}
}

if (r) {
ddebug("server session disconnected, remote_client = %s, current_count = %d",
s->remote_address().to_string(),
scount);
}

if (ecount == 0)
ddebug("ip session erased, remote_client = %s", s->remote_address().to_string());
else
ddebug("ip session decreased, remote_client = %s, current_count = %d",
s->remote_address().to_string(),
ecount);
}

bool connection_oriented_network::is_conn_threshold_exceeded(::dsn::rpc_address ep)
{
if (_cfg_conn_threshold_per_ip <= 0) {
dinfo("new client from %s is connecting to server %s, no connection threshold",
ep.ipv4_str(),
address().to_string());
return false;
}

bool exceeded = false;
int scount = 0;
{
utils::auto_read_lock l(_servers_lock);
auto it = _ip_conn_count.find(ep.ip());
if (it != _ip_conn_count.end()) {
scount = it->second;
}
}
if (scount >= _cfg_conn_threshold_per_ip)
exceeded = true;

dinfo("new client from %s is connecting to server %s, existing connection count "
"= %d, threshold = %u",
ep.ipv4_str(),
address().to_string(),
scount,
_cfg_conn_threshold_per_ip);

return exceeded;
}

rpc_session_ptr connection_oriented_network::get_client_session(::dsn::rpc_address ep)
Expand Down
96 changes: 85 additions & 11 deletions src/core/tests/netprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@
using namespace dsn;
using namespace dsn::tools;

class asio_network_provider_test : public asio_network_provider
{
public:
asio_network_provider_test(rpc_engine *srv, network *inner_provider)
: asio_network_provider(srv, inner_provider)
{
}

public:
void change_test_cfg_conn_threshold_per_ip(uint32_t n)
{
ddebug(
"change _cfg_conn_threshold_per_ip %u -> %u for test", _cfg_conn_threshold_per_ip, n);
_cfg_conn_threshold_per_ip = n;
}
};

static int TEST_PORT = 20401;
DEFINE_TASK_CODE_RPC(RPC_TEST_NETPROVIDER, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER)

Expand All @@ -73,6 +90,12 @@ void response_handler(dsn::error_code ec,
wait_flag = 1;
}

void reject_response_handler(dsn::error_code ec)
{
wait_flag = 1;
ASSERT_TRUE(ERR_TIMEOUT == ec);
}

void rpc_server_response(dsn::message_ex *request)
{
std::string str_command;
Expand All @@ -88,7 +111,7 @@ void wait_response()
std::this_thread::sleep_for(std::chrono::seconds(1));
}

void rpc_client_session_send(rpc_session_ptr client_session)
void rpc_client_session_send(rpc_session_ptr client_session, bool reject = false)
{
message_ex *msg = message_ex::create_request(RPC_TEST_NETPROVIDER, 0, 0);
std::unique_ptr<char[]> buf(new char[128]);
Expand All @@ -97,17 +120,21 @@ void rpc_client_session_send(rpc_session_ptr client_session)
::dsn::marshall(msg, std::string(buf.get()));

wait_flag = 0;
rpc_response_task *t = new rpc_response_task(msg,
std::bind(&response_handler,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
buf.get()),
0);

client_session->net().engine()->matcher()->on_call(msg, t);
if (!reject) {
rpc_response_task *t = new rpc_response_task(msg,
std::bind(&response_handler,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
buf.get()),
0);
client_session->net().engine()->matcher()->on_call(msg, t);
} else {
rpc_response_task *t = new rpc_response_task(
msg, std::bind(&reject_response_handler, std::placeholders::_1), 0);
client_session->net().engine()->matcher()->on_call(msg, t);
}
client_session->send_message(msg);

wait_response();
}

Expand Down Expand Up @@ -228,3 +255,50 @@ TEST(tools_common, sim_net_provider)

TEST_PORT++;
}

TEST(tools_common, asio_network_provider_connection_threshold)
{
if (dsn::service_engine::instance().spec().semaphore_factory_name ==
"dsn::tools::sim_semaphore_provider")
return;

ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));

asio_network_provider_test *asio_network =
new asio_network_provider_test(task::get_current_rpc(), nullptr);

error_code start_result;
start_result = asio_network->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(start_result == ERR_OK);
asio_network->change_test_cfg_conn_threshold_per_ip(10);

for (int count = 0; count < 20; count++) {
ddebug("client # %d", count);
rpc_session_ptr client_session =
asio_network->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();

rpc_client_session_send(client_session);

client_session->close();
std::this_thread::sleep_for(std::chrono::seconds(1));
}

bool reject = false;
for (int count = 0; count < 20; count++) {

ddebug("client # %d", count);
rpc_session_ptr client_session =
asio_network->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();

if (count >= 10)
reject = true;
rpc_client_session_send(client_session, reject);
}

ASSERT_TRUE(dsn_rpc_unregiser_handler(RPC_TEST_NETPROVIDER));

TEST_PORT++;
}
22 changes: 19 additions & 3 deletions src/core/tools/common/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
"io_service_worker_count",
1,
"thread number for io service (timer and boost network)");

// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");
ddebug("_cfg_conn_threshold_per_ip = %u", _cfg_conn_threshold_per_ip);

for (int i = 0; i < io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
Expand Down Expand Up @@ -148,10 +154,20 @@ void asio_network_provider::do_accept()
(std::shared_ptr<boost::asio::ip::tcp::socket> &)socket,
null_parser,
false);
on_server_session_accepted(s);

// we should start read immediately after the rpc session is completely created.
s->start_read_next();
// when server connection threshold is hit, close the session, otherwise accept it
if (is_conn_threshold_exceeded(s->remote_address())) {
dwarn("close rpc connection from %s to %s due to hitting server "
"connection threshold per ip",
s->remote_address().to_string(),
address().to_string());
s->close();
} else {
on_server_session_accepted(s);

// we should start read immediately after the rpc session is completely created.
s->start_read_next();
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/core/tools/common/asio_net_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class asio_network_provider : public connection_oriented_network

private:
friend class asio_rpc_session;
friend class asio_network_provider_test;

std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor;
boost::asio::io_service _io_service;
Expand Down
18 changes: 12 additions & 6 deletions src/dist/replication/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ void partition_resolver_simple::call(request_context_ptr &&request, bool from_me

// init configuration query task if necessary
if (nullptr == it->second->query_config_task) {
it->second->query_config_task = query_config(pindex);
it->second->query_config_task = query_config(pindex, timeout_ms);
}
} else {
_pending_requests_before_partition_count_unknown.push_back(std::move(request));
if (_pending_requests_before_partition_count_unknown.size() == 1) {
_query_config_task = query_config(pindex);
_query_config_task = query_config(pindex, timeout_ms);
}
}
}
Expand All @@ -224,11 +224,17 @@ DEFINE_TASK_CODE_RPC(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
TASK_PRIORITY_COMMON,
THREAD_POOL_DEFAULT)

task_ptr partition_resolver_simple::query_config(int partition_index)
task_ptr partition_resolver_simple::query_config(int partition_index, int timeout_ms)
{
dinfo(
"%s.client: start query config, gpid = %d.%d", _app_name.c_str(), _app_id, partition_index);
auto msg = dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
dinfo("%s.client: start query config, gpid = %d.%d, timeout_ms = %d",
_app_name.c_str(),
_app_id,
partition_index,
timeout_ms);
task_spec *sp = task_spec::get(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
if (timeout_ms >= sp->rpc_timeout_milliseconds)
timeout_ms = 0;
auto msg = dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, timeout_ms);

configuration_query_by_index_request req;
req.app_name = _app_name;
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/client/partition_resolver_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class partition_resolver_simple : public partition_resolver
void on_timeout(request_context_ptr &&rc) const;

// with meta server
task_ptr query_config(int partition_index);
task_ptr query_config(int partition_index, int timeout_ms);
void query_config_reply(error_code err,
dsn::message_ex *request,
dsn::message_ex *response,
Expand Down

0 comments on commit ca636f2

Please sign in to comment.