diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 48eb156ad1c..bc2d70f329d 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -142,8 +142,6 @@ namespace eosio { node_transaction_index local_txns; - shared_ptr resolver; - bool use_socket_read_watermark = false; channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription; @@ -151,8 +149,8 @@ namespace eosio { uint16_t thread_pool_size = 1; optional thread_pool; - void connect(const connection_ptr& c); - void connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr); + void connect( const connection_ptr& c ); + void connect( const connection_ptr& c, const std::shared_ptr& resolver, tcp::resolver::results_type endpoints ); bool start_session(const connection_ptr& c); void start_listen_loop(); void start_read_message(const connection_ptr& c); @@ -796,6 +794,7 @@ namespace eosio { void connection::close() { if(socket) { socket->close(); + socket.reset( new tcp::socket( my_impl->thread_pool->get_executor() ) ); } else { fc_wlog( logger, "no socket to close!" ); @@ -974,7 +973,7 @@ namespace eosio { buffer_queue.fill_out_buffer( bufs ); boost::asio::async_write(*socket, bufs, - boost::asio::bind_executor(strand, [c, priority]( boost::system::error_code ec, std::size_t w ) { + boost::asio::bind_executor(strand, [c, socket=socket, priority]( boost::system::error_code ec, std::size_t w ) { app().post(priority, [c, priority, ec, w]() { try { auto conn = c.lock(); @@ -1870,13 +1869,14 @@ namespace eosio { connection_wptr weak_conn = c; // Note: need to add support for IPv6 too + auto resolver = std::make_shared( my_impl->thread_pool->get_executor() ); resolver->async_resolve( query, boost::asio::bind_executor( c->strand, - [weak_conn, this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ) { - app().post( priority::low, [err, endpoint_itr, weak_conn, this]() { + [weak_conn, resolver, this]( const boost::system::error_code& err, tcp::resolver::results_type endpoints ) { + app().post( priority::low, [err, resolver, endpoints, weak_conn, this]() { auto c = weak_conn.lock(); if( !c ) return; if( !err ) { - connect( c, endpoint_itr ); + connect( c, resolver, endpoints ); } else { fc_elog( logger, "Unable to resolve ${peer_addr}: ${error}", ("peer_addr", c->peer_name())( "error", err.message()) ); @@ -1885,19 +1885,18 @@ namespace eosio { } ) ); } - void net_plugin_impl::connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr) { + void net_plugin_impl::connect( const connection_ptr& c, const std::shared_ptr& resolver, tcp::resolver::results_type endpoints ) { if( c->no_retry != go_away_reason::no_reason) { string rsn = reason_str(c->no_retry); return; } - auto current_endpoint = *endpoint_itr; - ++endpoint_itr; c->connecting = true; c->pending_message_buffer.reset(); connection_wptr weak_conn = c; - c->socket->async_connect( current_endpoint, boost::asio::bind_executor( c->strand, - [weak_conn, endpoint_itr, this]( const boost::system::error_code& err ) { - app().post( priority::low, [weak_conn, endpoint_itr, this, err]() { + boost::asio::async_connect( *c->socket, endpoints, + boost::asio::bind_executor( c->strand, + [weak_conn, resolver, socket=c->socket, this]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { + app().post( priority::low, [weak_conn, this, err]() { auto c = weak_conn.lock(); if( !c ) return; if( !err && c->socket->is_open()) { @@ -1905,14 +1904,9 @@ namespace eosio { c->send_handshake(); } } else { - if( endpoint_itr != tcp::resolver::iterator()) { - close( c ); - connect( c, endpoint_itr ); - } else { - fc_elog( logger, "connection failed to ${peer}: ${error}", ("peer", c->peer_name())( "error", err.message())); - c->connecting = false; - my_impl->close( c ); - } + elog( "connection failed to ${peer}: ${error}", ("peer", c->peer_name())( "error", err.message()) ); + c->connecting = false; + my_impl->close( c ); } } ); } ) ); @@ -3023,14 +3017,14 @@ namespace eosio { // currently thread_pool only used for server_ioc my->thread_pool.emplace( "net", my->thread_pool_size ); - my->resolver = std::make_shared( my->thread_pool->get_executor() ); + auto resolver = std::make_shared( my_impl->thread_pool->get_executor() ); if( my->p2p_address.size() > 0 ) { auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' )); auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size()); tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str()); // Note: need to add support for IPv6 too? - my->listen_endpoint = *my->resolver->resolve( query ); + my->listen_endpoint = *resolver->resolve( query ); my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) );