Skip to content

Commit

Permalink
valgrind: fix leaks of rpc_engine (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and qinzuoyan committed Oct 23, 2018
1 parent e6167ce commit 3a2fc89
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 65 deletions.
34 changes: 10 additions & 24 deletions src/core/core/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,11 @@
* THE SOFTWARE.
*/

/*
* Description:
* rpc engine implementation
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#ifdef _WIN32
#include <WinSock2.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif

#include "rpc_engine.h"
#include "service_engine.h"
Expand Down Expand Up @@ -464,7 +451,7 @@ error_code rpc_engine::start(const service_app_spec &aspec)

// for each format
for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) {
std::vector<network *> &pnet = _client_nets[i];
std::vector<std::unique_ptr<network>> &pnet = _client_nets[i];
pnet.resize(rpc_channel::max_value() + 1);
auto client_hdr_format = network_header_format(network_header_format::to_string(i));

Expand All @@ -491,7 +478,7 @@ error_code rpc_engine::start(const service_app_spec &aspec)
auto net = create_network(cs, true, client_hdr_format);
if (!net)
return ERR_NETWORK_INIT_FAILED;
pnet[j] = net;
pnet[j].reset(net);

ddebug("[%s] network client started at port %u, channel = %s, fmt = %s ...",
node()->full_name(),
Expand All @@ -505,13 +492,11 @@ error_code rpc_engine::start(const service_app_spec &aspec)
for (auto &sp : aspec.network_server_confs) {
int port = sp.second.port;

std::vector<network *> *pnets;
std::vector<std::unique_ptr<network>> *pnets;
auto it = _server_nets.find(port);

if (it == _server_nets.end()) {
std::vector<network *> nets;
auto pr =
_server_nets.insert(std::map<int, std::vector<network *>>::value_type(port, nets));
auto pr = _server_nets.emplace(port, std::vector<std::unique_ptr<network>>{});
pnets = &pr.first->second;
pnets->resize(rpc_channel::max_value() + 1);
} else {
Expand All @@ -523,7 +508,7 @@ error_code rpc_engine::start(const service_app_spec &aspec)
return ERR_NETWORK_INIT_FAILED;
}

(*pnets)[sp.second.channel] = net;
(*pnets)[sp.second.channel].reset(net);

dwarn("[%s] network server started at port %u, channel = %s, ...",
node()->full_name(),
Expand Down Expand Up @@ -798,7 +783,7 @@ void rpc_engine::call_ip(rpc_address addr,
auto sp = task_spec::get(request->local_rpc_code);
auto &hdr = *request->header;

network *net = _client_nets[request->hdr_format][sp->rpc_call_channel];
network *net = _client_nets[request->hdr_format][sp->rpc_call_channel].get();
dassert(nullptr != net,
"network not present for rpc channel '%s' with format '%s' used by rpc %s",
sp->rpc_call_channel.to_string(),
Expand Down Expand Up @@ -903,7 +888,7 @@ void rpc_engine::reply(message_ex *response, error_code err)

// use the header format recorded in the message
auto rpc_channel = sp ? sp->rpc_call_channel : RPC_CHANNEL_TCP;
network *net = _client_nets[response->hdr_format][rpc_channel];
network *net = _client_nets[response->hdr_format][rpc_channel].get();
dassert(
nullptr != net,
"client network not present for rpc channel '%s' with format '%s' used by rpc %s",
Expand All @@ -925,7 +910,7 @@ void rpc_engine::reply(message_ex *response, error_code err)
"target address must have named port in this case");

auto rpc_channel = sp ? sp->rpc_call_channel : RPC_CHANNEL_TCP;
network *net = _server_nets[response->header->from_address.port()][rpc_channel];
network *net = _server_nets[response->header->from_address.port()][rpc_channel].get();

dassert(nullptr != net,
"server network not present for rpc channel '%s' on port %u used by rpc %s",
Expand Down Expand Up @@ -978,4 +963,5 @@ void rpc_engine::forward(message_ex *request, rpc_address address)
call_ip(address, copied_request, nullptr, false, true);
}
}
}

} // namespace dsn
17 changes: 5 additions & 12 deletions src/core/core/rpc_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* rpc service
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/utility/synchronize.h>
Expand Down Expand Up @@ -194,8 +185,10 @@ class rpc_engine

private:
service_node *_node;
std::vector<std::vector<network *>> _client_nets; // <format, <CHANNEL, network*>>
std::unordered_map<int, std::vector<network *>> _server_nets; // <port, <CHANNEL, network*>>
std::vector<std::vector<std::unique_ptr<network>>>
_client_nets; // <format, <CHANNEL, network*>>
std::unordered_map<int, std::vector<std::unique_ptr<network>>>
_server_nets; // <port, <CHANNEL, network*>>
::dsn::rpc_address _local_primary_address;
rpc_client_matcher _rpc_matcher;
rpc_server_dispatcher _rpc_dispatcher;
Expand Down Expand Up @@ -227,4 +220,4 @@ rpc_engine::call_address(rpc_address addr, message_ex *request, const rpc_respon
}
}

} // end namespace
} // namespace dsn
45 changes: 28 additions & 17 deletions src/core/tools/common/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <dsn/utility/rand.h>

#include "asio_net_provider.h"
Expand All @@ -47,6 +38,17 @@ asio_network_provider::asio_network_provider(rpc_engine *srv, network *inner_pro
_acceptor = nullptr;
}

asio_network_provider::~asio_network_provider()
{
if (_acceptor) {
_acceptor->close();
}
_io_service.stop();
for (auto &w : _workers) {
w->join();
}
}

error_code asio_network_provider::start(rpc_channel channel, int port, bool client_only)
{
if (_acceptor != nullptr)
Expand All @@ -58,7 +60,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
1,
"thread number for io service (timer and boost network)");
for (int i = 0; i < io_service_worker_count; i++) {
_workers.push_back(std::shared_ptr<std::thread>(new std::thread([this, i]() {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);

const char *name = ::dsn::tools::get_service_node_name(node());
Expand All @@ -69,8 +71,10 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
boost::asio::io_service::work work(_io_service);
boost::system::error_code ec;
_io_service.run(ec);
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
})));
if (ec) {
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
}
}));
}

_acceptor = nullptr;
Expand Down Expand Up @@ -209,6 +213,11 @@ asio_udp_provider::~asio_udp_provider()
}
delete[] _parsers;
_parsers = nullptr;

_io_service.stop();
for (auto &w : _workers) {
w->join();
}
}

message_parser *asio_udp_provider::get_message_parser(network_header_format hdr_format)
Expand Down Expand Up @@ -348,7 +357,7 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o
}

for (int i = 0; i < io_service_worker_count; i++) {
_workers.push_back(std::shared_ptr<std::thread>(new std::thread([this, i]() {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);

const char *name = ::dsn::tools::get_service_node_name(node());
Expand All @@ -359,13 +368,15 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o
boost::asio::io_service::work work(_io_service);
boost::system::error_code ec;
_io_service.run(ec);
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
})));
if (ec) {
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
}
}));
}

do_receive();

return ERR_OK;
}
}
}
} // namespace tools
} // namespace dsn
18 changes: 6 additions & 12 deletions src/core/tools/common/asio_net_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool_api.h>
Expand All @@ -46,6 +37,8 @@ class asio_network_provider : public connection_oriented_network
public:
asio_network_provider(rpc_engine *srv, network *inner_provider);

~asio_network_provider() override;

virtual error_code start(rpc_channel channel, int port, bool client_only) override;
virtual ::dsn::rpc_address address() override { return _address; }
virtual rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) override;
Expand All @@ -67,7 +60,7 @@ class asio_udp_provider : public network
public:
asio_udp_provider(rpc_engine *srv, network *inner_provider);

virtual ~asio_udp_provider();
~asio_udp_provider() override;

void send_message(message_ex *request) override;

Expand Down Expand Up @@ -99,5 +92,6 @@ class asio_udp_provider : public network

static const size_t max_udp_packet_size = 1000;
};
}
}

} // namespace tools
} // namespace dsn

0 comments on commit 3a2fc89

Please sign in to comment.