Skip to content

Commit

Permalink
Merge pull request #2164 from bitshares/pr-2153-recheck-seeds
Browse files Browse the repository at this point in the history
P2P improvement: periodically re-check addresses of seed nodes
  • Loading branch information
abitmore authored May 3, 2020
2 parents 68eebf1 + 30648f2 commit ecc5cbb
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 76 deletions.
77 changes: 3 additions & 74 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include <fc/io/fstream.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/rpc/websocket_api.hpp>
#include <fc/network/resolve.hpp>
#include <fc/crypto/base64.hpp>

#include <boost/filesystem/path.hpp>
Expand Down Expand Up @@ -125,62 +124,22 @@ void application_impl::reset_p2p_node(const fc::path& data_dir)
if( _options->count("seed-node") )
{
auto seeds = _options->at("seed-node").as<vector<string>>();
for( const string& endpoint_string : seeds )
{
try {
std::vector<fc::ip::endpoint> endpoints = resolve_string_to_ip_endpoints(endpoint_string);
for (const fc::ip::endpoint& endpoint : endpoints)
{
ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
_p2p_network->add_node(endpoint);
_p2p_network->connect_to_endpoint(endpoint);
}
} catch( const fc::exception& e ) {
wlog( "caught exception ${e} while adding seed node ${endpoint}",
("e", e.to_detail_string())("endpoint", endpoint_string) );
}
}
_p2p_network->add_seed_nodes(seeds);
}

if( _options->count("seed-nodes") )
{
auto seeds_str = _options->at("seed-nodes").as<string>();
auto seeds = fc::json::from_string(seeds_str).as<vector<string>>(2);
for( const string& endpoint_string : seeds )
{
try {
std::vector<fc::ip::endpoint> endpoints = resolve_string_to_ip_endpoints(endpoint_string);
for (const fc::ip::endpoint& endpoint : endpoints)
{
ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
_p2p_network->add_node(endpoint);
}
} catch( const fc::exception& e ) {
wlog( "caught exception ${e} while adding seed node ${endpoint}",
("e", e.to_detail_string())("endpoint", endpoint_string) );
}
}
_p2p_network->add_seed_nodes(seeds);
}
else
{
// https://bitsharestalk.org/index.php/topic,23715.0.html
vector<string> seeds = {
#include "../egenesis/seed-nodes.txt"
};
for( const string& endpoint_string : seeds )
{
try {
std::vector<fc::ip::endpoint> endpoints = resolve_string_to_ip_endpoints(endpoint_string);
for (const fc::ip::endpoint& endpoint : endpoints)
{
ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
_p2p_network->add_node(endpoint);
}
} catch( const fc::exception& e ) {
wlog( "caught exception ${e} while adding seed node ${endpoint}",
("e", e.to_detail_string())("endpoint", endpoint_string) );
}
}
_p2p_network->add_seed_nodes(seeds);
}

if( _options->count("p2p-endpoint") )
Expand All @@ -196,36 +155,6 @@ void application_impl::reset_p2p_node(const fc::path& data_dir)
std::vector<uint32_t>());
} FC_CAPTURE_AND_RETHROW() }

std::vector<fc::ip::endpoint> application_impl::resolve_string_to_ip_endpoints(const std::string& endpoint_string)
{
try
{
string::size_type colon_pos = endpoint_string.find(':');
if (colon_pos == std::string::npos)
FC_THROW("Missing required port number in endpoint string \"${endpoint_string}\"",
("endpoint_string", endpoint_string));
std::string port_string = endpoint_string.substr(colon_pos + 1);
try
{
uint16_t port = boost::lexical_cast<uint16_t>(port_string);

std::string hostname = endpoint_string.substr(0, colon_pos);
std::vector<fc::ip::endpoint> endpoints = fc::resolve(hostname, port);
if (endpoints.empty())
FC_THROW_EXCEPTION( fc::unknown_host_exception,
"The host name can not be resolved: ${hostname}",
("hostname", hostname) );
return endpoints;
}
catch (const boost::bad_lexical_cast&)
{
FC_THROW("Bad port: ${port}", ("port", port_string));
}
}
FC_CAPTURE_AND_RETHROW((endpoint_string))
}


void application_impl::new_connection( const fc::http::websocket_connection_ptr& c )
{
auto wsc = std::make_shared<fc::rpc::websocket_api_connection>(c, GRAPHENE_NET_MAX_NESTED_OBJECTS);
Expand Down
2 changes: 0 additions & 2 deletions libraries/app/application_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ class application_impl : public net::node_delegate

void reset_p2p_node(const fc::path& data_dir);

std::vector<fc::ip::endpoint> resolve_string_to_ip_endpoints(const std::string& endpoint_string);

void new_connection( const fc::http::websocket_connection_ptr& c );

void reset_websocket_server();
Expand Down
23 changes: 23 additions & 0 deletions libraries/net/include/graphene/net/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,34 @@ namespace graphene { namespace net {
*/
void add_node( const fc::ip::endpoint& ep );

/*****
* @brief add a list of nodes to seed the p2p network
* @param seeds a vector of url strings
*/
void add_seed_nodes( std::vector<std::string> seeds );

/****
* @brief add a node to seed the p2p network
* @param in the url as a string
*/
void add_seed_node( const std::string& in);

/**
* Attempt to connect to the specified endpoint immediately.
*/
virtual void connect_to_endpoint( const fc::ip::endpoint& ep );

/**
* @brief Helper to convert a string to a collection of endpoints
*
* This converts a string (i.e. "bitshares.eu:665535" to a collection of endpoints.
* NOTE: Throws an exception if not in correct format or was unable to resolve URL.
*
* @param in the incoming string
* @returns a vector of endpoints
*/
static std::vector<fc::ip::endpoint> resolve_string_to_ip_endpoints( const std::string& in );

/**
* Specifies the network interface and port upon which incoming
* connections should be accepted.
Expand Down
130 changes: 130 additions & 0 deletions libraries/net/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <iostream>
#include <algorithm>
#include <tuple>
#include <string>
#include <boost/tuple/tuple.hpp>
#include <boost/circular_buffer.hpp>

Expand Down Expand Up @@ -70,6 +71,7 @@
#include <fc/crypto/rand.hpp>
#include <fc/network/rate_limiting.hpp>
#include <fc/network/ip.hpp>
#include <fc/network/resolve.hpp>

#include <graphene/net/node.hpp>
#include <graphene/net/peer_database.hpp>
Expand Down Expand Up @@ -481,6 +483,43 @@ namespace graphene { namespace net { namespace detail {
// _retrigger_connect_loop_promise->set_value();
}

void node_impl::update_seed_nodes_task()
{
VERIFY_CORRECT_THREAD();

try
{
dlog("Starting an iteration of update_seed_nodes loop.");
for( const std::string& endpoint_string : _seed_nodes )
{
resolve_seed_node_and_add( endpoint_string );
}
dlog("Done an iteration of update_seed_nodes loop.");
}
catch (const fc::canceled_exception&)
{
throw;
}
FC_CAPTURE_AND_LOG( (_seed_nodes) )

schedule_next_update_seed_nodes_task();
}

void node_impl::schedule_next_update_seed_nodes_task()
{
VERIFY_CORRECT_THREAD();

if( _node_is_shutting_down )
return;

if( _update_seed_nodes_loop_done.valid() && _update_seed_nodes_loop_done.canceled() )
return;

_update_seed_nodes_loop_done = fc::schedule( [this]() { update_seed_nodes_task(); },
fc::time_point::now() + fc::hours(3),
"update_seed_nodes_loop" );
}

bool node_impl::have_already_received_sync_item( const item_hash_t& item_hash )
{
VERIFY_CORRECT_THREAD();
Expand Down Expand Up @@ -3760,6 +3799,20 @@ namespace graphene { namespace net { namespace detail {
wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring" );
}

try
{
_update_seed_nodes_loop_done.cancel_and_wait("node_impl::close()");
dlog("Update seed nodes loop terminated");
}
catch ( const fc::exception& e )
{
wlog( "Exception thrown while terminating Update seed nodes loop, ignoring: ${e}", ("e", e) );
}
catch (...)
{
wlog( "Exception thrown while terminating Update seed nodes loop, ignoring" );
}

try
{
_bandwidth_monitor_loop_done.cancel_and_wait("node_impl::close()");
Expand Down Expand Up @@ -4164,6 +4217,7 @@ namespace graphene { namespace net { namespace detail {

assert(!_accept_loop_complete.valid() &&
!_p2p_network_connect_loop_done.valid() &&
!_update_seed_nodes_loop_done.valid() &&
!_fetch_sync_items_loop_done.valid() &&
!_fetch_item_loop_done.valid() &&
!_advertise_inventory_loop_done.valid() &&
Expand All @@ -4181,6 +4235,7 @@ namespace graphene { namespace net { namespace detail {
_fetch_updated_peer_lists_loop_done = fc::async([=](){ fetch_updated_peer_lists_loop(); }, "fetch_updated_peer_lists_loop");
_bandwidth_monitor_loop_done = fc::async([=](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop");
_dump_node_status_task_done = fc::async([=](){ dump_node_status_task(); }, "dump_node_status_task");
schedule_next_update_seed_nodes_task();
}

void node_impl::add_node(const fc::ip::endpoint& ep)
Expand All @@ -4198,6 +4253,33 @@ namespace graphene { namespace net { namespace detail {
trigger_p2p_network_connect_loop();
}

void node_impl::add_seed_node(const std::string& endpoint_string)
{
VERIFY_CORRECT_THREAD();
_seed_nodes.insert( endpoint_string );
resolve_seed_node_and_add( endpoint_string );
}

void node_impl::resolve_seed_node_and_add(const std::string& endpoint_string)
{
VERIFY_CORRECT_THREAD();
std::vector<fc::ip::endpoint> endpoints;
ilog("Resolving seed node ${endpoint}", ("endpoint", endpoint_string));
try
{
endpoints = graphene::net::node::resolve_string_to_ip_endpoints(endpoint_string);
}
catch(...)
{
wlog( "Unable to resolve endpoint during attempt to add seed node ${ep}", ("ep", endpoint_string) );
}
for (const fc::ip::endpoint& endpoint : endpoints)
{
ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
add_node(endpoint);
}
}

void node_impl::initiate_connect_to(const peer_connection_ptr& new_peer)
{
new_peer->get_socket().open();
Expand Down Expand Up @@ -5104,4 +5186,52 @@ namespace graphene { namespace net { namespace detail {

} // end namespace detail

// TODO move this function to impl class
std::vector<fc::ip::endpoint> node::resolve_string_to_ip_endpoints(const std::string& in)
{
try
{
std::string::size_type colon_pos = in.find(':');
if (colon_pos == std::string::npos)
FC_THROW("Missing required port number in endpoint string \"${endpoint_string}\"",
("endpoint_string", in));
std::string port_string = in.substr(colon_pos + 1);
try
{
uint16_t port = boost::lexical_cast<uint16_t>(port_string);

std::string hostname = in.substr(0, colon_pos);
std::vector<fc::ip::endpoint> endpoints = fc::resolve(hostname, port);
if (endpoints.empty())
FC_THROW_EXCEPTION( fc::unknown_host_exception,
"The host name can not be resolved: ${hostname}",
("hostname", hostname) );
return endpoints;
}
catch (const boost::bad_lexical_cast&)
{
FC_THROW("Bad port: ${port}", ("port", port_string));
}
}
FC_CAPTURE_AND_RETHROW((in))
}

void node::add_seed_nodes(std::vector<std::string> seeds)
{
for(const std::string& endpoint_string : seeds )
{
try {
add_seed_node(endpoint_string);
} catch( const fc::exception& e ) {
wlog( "caught exception ${e} while adding seed node ${endpoint}",
("e", e.to_detail_string())("endpoint", endpoint_string) );
}
}
}

void node::add_seed_node(const std::string& in)
{
INVOKE_IN_IMPL(add_seed_node, in);
}

} } // end namespace graphene::net
10 changes: 10 additions & 0 deletions libraries/net/node_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ class node_impl : public peer_connection_delegate

std::list<fc::future<void> > _handle_message_calls_in_progress;

/// used by the task that checks whether addresses of seed nodes have been updated
// @{
boost::container::flat_set<std::string> _seed_nodes;
fc::future<void> _update_seed_nodes_loop_done;
void update_seed_nodes_task();
void schedule_next_update_seed_nodes_task();
// @}

node_impl(const std::string& user_agent);
virtual ~node_impl();

Expand Down Expand Up @@ -482,6 +490,8 @@ class node_impl : public peer_connection_delegate
void listen_to_p2p_network();
void connect_to_p2p_network();
void add_node( const fc::ip::endpoint& ep );
void add_seed_node( const std::string& seed_string );
void resolve_seed_node_and_add( const std::string& seed_string );
void initiate_connect_to(const peer_connection_ptr& peer);
void connect_to_endpoint(const fc::ip::endpoint& ep);
void listen_on_endpoint(const fc::ip::endpoint& ep , bool wait_if_not_available);
Expand Down

0 comments on commit ecc5cbb

Please sign in to comment.