Skip to content

Commit

Permalink
Re-create socket on re-connect. Use non-deprecated async_connect.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Aug 12, 2019
1 parent 3f16468 commit 4c9dbdc
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,15 @@ namespace eosio {

node_transaction_index local_txns;

shared_ptr<tcp::resolver> resolver;

bool use_socket_read_watermark = false;

channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

uint16_t thread_pool_size = 1;
optional<eosio::chain::named_thread_pool> thread_pool;

void connect(const connection_ptr& c);
void connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr);
void connect( const connection_ptr& c );
void connect( const connection_ptr& c, const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints );
bool start_session(const connection_ptr& c);
void start_listen_loop();
void start_read_message(const connection_ptr& c);
Expand Down Expand Up @@ -796,6 +794,7 @@ namespace eosio {
void connection::close() {
if(socket) {
socket->close();
socket.reset( new tcp::socket( my_impl->thread_pool->get_executor() ) );
}
else {
fc_wlog( logger, "no socket to close!" );
Expand Down Expand Up @@ -974,7 +973,7 @@ namespace eosio {
buffer_queue.fill_out_buffer( bufs );

boost::asio::async_write(*socket, bufs,
boost::asio::bind_executor(strand, [c, priority]( boost::system::error_code ec, std::size_t w ) {
boost::asio::bind_executor(strand, [c, socket=socket, priority]( boost::system::error_code ec, std::size_t w ) {
app().post(priority, [c, priority, ec, w]() {
try {
auto conn = c.lock();
Expand Down Expand Up @@ -1870,13 +1869,14 @@ namespace eosio {
connection_wptr weak_conn = c;
// Note: need to add support for IPv6 too

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool->get_executor() );
resolver->async_resolve( query, boost::asio::bind_executor( c->strand,
[weak_conn, this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ) {
app().post( priority::low, [err, endpoint_itr, weak_conn, this]() {
[weak_conn, resolver, this]( const boost::system::error_code& err, tcp::resolver::results_type endpoints ) {
app().post( priority::low, [err, resolver, endpoints, weak_conn, this]() {
auto c = weak_conn.lock();
if( !c ) return;
if( !err ) {
connect( c, endpoint_itr );
connect( c, resolver, endpoints );
} else {
fc_elog( logger, "Unable to resolve ${peer_addr}: ${error}",
("peer_addr", c->peer_name())( "error", err.message()) );
Expand All @@ -1885,34 +1885,28 @@ namespace eosio {
} ) );
}

void net_plugin_impl::connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr) {
void net_plugin_impl::connect( const connection_ptr& c, const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints ) {
if( c->no_retry != go_away_reason::no_reason) {
string rsn = reason_str(c->no_retry);
return;
}
auto current_endpoint = *endpoint_itr;
++endpoint_itr;
c->connecting = true;
c->pending_message_buffer.reset();
connection_wptr weak_conn = c;
c->socket->async_connect( current_endpoint, boost::asio::bind_executor( c->strand,
[weak_conn, endpoint_itr, this]( const boost::system::error_code& err ) {
app().post( priority::low, [weak_conn, endpoint_itr, this, err]() {
boost::asio::async_connect( *c->socket, endpoints,
boost::asio::bind_executor( c->strand,
[weak_conn, resolver, socket=c->socket, this]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
app().post( priority::low, [weak_conn, this, err]() {
auto c = weak_conn.lock();
if( !c ) return;
if( !err && c->socket->is_open()) {
if( start_session( c )) {
c->send_handshake();
}
} else {
if( endpoint_itr != tcp::resolver::iterator()) {
close( c );
connect( c, endpoint_itr );
} else {
fc_elog( logger, "connection failed to ${peer}: ${error}", ("peer", c->peer_name())( "error", err.message()));
c->connecting = false;
my_impl->close( c );
}
elog( "connection failed to ${peer}: ${error}", ("peer", c->peer_name())( "error", err.message()) );
c->connecting = false;
my_impl->close( c );
}
} );
} ) );
Expand Down Expand Up @@ -3023,14 +3017,14 @@ namespace eosio {
// currently thread_pool only used for server_ioc
my->thread_pool.emplace( "net", my->thread_pool_size );

my->resolver = std::make_shared<tcp::resolver>( my->thread_pool->get_executor() );
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool->get_executor() );
if( my->p2p_address.size() > 0 ) {
auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' ));
auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size());
tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str());
// Note: need to add support for IPv6 too?

my->listen_endpoint = *my->resolver->resolve( query );
my->listen_endpoint = *resolver->resolve( query );

my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) );

Expand Down

0 comments on commit 4c9dbdc

Please sign in to comment.