diff --git a/Docker/config.ini b/Docker/config.ini index 3dd9181f359..a85918d236b 100644 --- a/Docker/config.ini +++ b/Docker/config.ini @@ -1,15 +1,3 @@ -# the endpoint upon which to listen for incoming connections (eosio::bnet_plugin) -bnet-endpoint = 0.0.0.0:4321 - -# the number of threads to use to process network messages (eosio::bnet_plugin) -# bnet-threads = - -# remote endpoint of other node to connect to; Use multiple bnet-connect options as needed to compose a network (eosio::bnet_plugin) -# bnet-connect = - -# this peer will request no pending transactions from other nodes (eosio::bnet_plugin) -bnet-no-trx = false - # the location of the blocks directory (absolute path or relative to application data dir) (eosio::chain_plugin) blocks-dir = "blocks" diff --git a/libraries/chain/include/eosio/chain/chain_id_type.hpp b/libraries/chain/include/eosio/chain/chain_id_type.hpp index a16fc143ae6..59ab8f248b0 100644 --- a/libraries/chain/include/eosio/chain/chain_id_type.hpp +++ b/libraries/chain/include/eosio/chain/chain_id_type.hpp @@ -47,8 +47,6 @@ namespace chain { friend class eosio::net_plugin_impl; friend struct eosio::handshake_message; - - friend struct ::hello; // TODO: Rushed hack to support bnet_plugin. Need a better solution. }; } } // namespace eosio::chain diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 8c93df9c48e..e07a10c5b8d 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -1,4 +1,3 @@ -add_subdirectory(bnet_plugin) add_subdirectory(net_plugin) add_subdirectory(net_api_plugin) add_subdirectory(http_plugin) diff --git a/plugins/bnet_plugin/CMakeLists.txt b/plugins/bnet_plugin/CMakeLists.txt deleted file mode 100644 index d49438298cf..00000000000 --- a/plugins/bnet_plugin/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -file(GLOB HEADERS "include/eosio/bnet_plugin/*.hpp") -add_library( bnet_plugin - bnet_plugin.cpp - ${HEADERS} ) - -target_link_libraries( bnet_plugin chain_plugin eosio_chain appbase ) -target_include_directories( bnet_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) diff --git a/plugins/bnet_plugin/bnet_plugin.cpp b/plugins/bnet_plugin/bnet_plugin.cpp deleted file mode 100644 index 08d2091040f..00000000000 --- a/plugins/bnet_plugin/bnet_plugin.cpp +++ /dev/null @@ -1,1569 +0,0 @@ -/** - * The purpose of this protocol is to synchronize (and keep synchronized) two - * blockchains using a very simple algorithm: - * - * 1. find the last block id on our local chain that the remote peer knows about - * 2. if we have the next block send it to them - * 3. if we don't have the next block send them a the oldest unexpired transaction - * - * There are several input events: - * - * 1. new block accepted by local chain - * 2. block deemed irreversible by local chain - * 3. new block header accepted by local chain - * 4. transaction accepted by local chain - * 5. block received from remote peer - * 6. transaction received from remote peer - * 7. socket ready for next write - * - * Each session is responsible for maintaining the following - * - * 1. the most recent block on our current best chain which we know - * with certainty that the remote peer has. - * - this could be the peers last irreversible block - * - a block ID after the LIB that the peer has notified us of - * - a block which we have sent to the remote peer - * - a block which the peer has sent us - * 2. the block IDs we have received from the remote peer so that - * we can disconnect peer if one of those blocks is deemed invalid - * - we can clear these IDs once the block becomes reversible - * 3. the transactions we have received from the remote peer so that - * we do not send them something that they already know. - * - this includes transactions sent as part of blocks - * - we clear this cache after we have applied a block that - * includes the transactions because we know the controller - * should not notify us again (they would be dupe) - * - * Assumptions: - * 1. all blocks we send the peer are valid and will be held in the - * peers fork database until they become irreversible or are replaced - * by an irreversible alternative. - * 2. we don't care what fork the peer is on, so long as we know they have - * the block prior to the one we want to send. The peer will sort it out - * with its fork database and hopfully come to our conclusion. - * 3. the peer will send us blocks on the same basis - * - */ - -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -using tcp = boost::asio::ip::tcp; -namespace ws = boost::beast::websocket; - -namespace eosio { - using namespace chain; - - static appbase::abstract_plugin& _bnet_plugin = app().register_plugin(); - -} /// namespace eosio - -namespace fc { - extern std::unordered_map& get_logger_map(); -} - -const fc::string logger_name("bnet_plugin"); -fc::logger plugin_logger; -std::string peer_log_format; - -#define peer_dlog( PEER, FORMAT, ... ) \ - FC_MULTILINE_MACRO_BEGIN \ - if( plugin_logger.is_enabled( fc::log_level::debug ) ) \ - plugin_logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \ - FC_MULTILINE_MACRO_END - -#define peer_ilog( PEER, FORMAT, ... ) \ - FC_MULTILINE_MACRO_BEGIN \ - if( plugin_logger.is_enabled( fc::log_level::info ) ) \ - plugin_logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \ - FC_MULTILINE_MACRO_END - -#define peer_wlog( PEER, FORMAT, ... ) \ - FC_MULTILINE_MACRO_BEGIN \ - if( plugin_logger.is_enabled( fc::log_level::warn ) ) \ - plugin_logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \ - FC_MULTILINE_MACRO_END - -#define peer_elog( PEER, FORMAT, ... ) \ - FC_MULTILINE_MACRO_BEGIN \ - if( plugin_logger.is_enabled( fc::log_level::error ) ) \ - plugin_logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant())) ); \ - FC_MULTILINE_MACRO_END - - -using eosio::public_key_type; -using eosio::chain_id_type; -using eosio::block_id_type; -using eosio::block_timestamp_type; -using std::string; -using eosio::sha256; -using eosio::signed_block_ptr; -using eosio::packed_transaction_ptr; -using std::vector; - -struct hello { - public_key_type peer_id; - string network_version; - string agent; - string protocol_version = "1.0.1"; - string user; - string password; - chain_id_type chain_id; - bool request_transactions = false; - uint32_t last_irr_block_num = 0; - vector pending_block_ids; -}; -// @swap user, password -FC_REFLECT( hello, (peer_id)(network_version)(user)(password)(agent)(protocol_version)(chain_id)(request_transactions)(last_irr_block_num)(pending_block_ids) ) - -struct hello_extension_irreversible_only {}; - -FC_REFLECT( hello_extension_irreversible_only, BOOST_PP_SEQ_NIL ) - -using hello_extension = fc::static_variant; - -/** - * This message is sent upon successful speculative application of a transaction - * and informs a peer not to send this message. - */ -struct trx_notice { - vector signed_trx_id; ///< hash of trx + sigs -}; - -FC_REFLECT( trx_notice, (signed_trx_id) ) - -/** - * This message is sent upon successfully adding a transaction to the fork database - * and informs the remote peer that there is no need to send this block. - */ -struct block_notice { - vector block_ids; -}; - -FC_REFLECT( block_notice, (block_ids) ); - -struct ping { - fc::time_point sent; - fc::sha256 code; - uint32_t lib; ///< the last irreversible block -}; -FC_REFLECT( ping, (sent)(code)(lib) ) - -struct pong { - fc::time_point sent; - fc::sha256 code; -}; -FC_REFLECT( pong, (sent)(code) ) - -using bnet_message = fc::static_variant; - - -struct by_id; -struct by_num; -struct by_received; -struct by_expired; - -namespace eosio { - using namespace chain::plugin_interface; - - class bnet_plugin_impl; - - template - void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) { - if( !strand.running_in_this_thread() ) { - elog( "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) ); - app().quit(); - } - } - - /** - * Each session is presumed to operate in its own strand so that - * operations can execute in parallel. - */ - class session : public std::enable_shared_from_this - { - public: - enum session_state { - hello_state, - sending_state, - idle_state - }; - - struct block_status { - block_status( block_id_type i, bool kby_peer, bool rfrom_peer) - { - known_by_peer = kby_peer; - received_from_peer = rfrom_peer; - id = i; - } - - bool known_by_peer = false; ///< we sent block to peer or peer sent us notice - bool received_from_peer = false; ///< peer sent us this block and considers full block valid - block_id_type id; ///< the block id; - // block_id_type prev; ///< the prev block id - - // shared_ptr< vector > block_msg; ///< packed bnet_message for this block - - uint32_t block_num()const { return block_header::num_from_id(id); } - }; - - typedef boost::multi_index_container, member >, - ordered_non_unique< tag, const_mem_fun > - > - > block_status_index; - - - struct transaction_status { - time_point received; - time_point expired; /// 5 seconds from last accepted - transaction_id_type id; - transaction_metadata_ptr trx; - - void mark_known_by_peer() { received = fc::time_point::maximum(); trx.reset(); } - bool known_by_peer()const { return received == fc::time_point::maximum(); } - }; - - typedef boost::multi_index_container, member >, - ordered_non_unique< tag, member >, - ordered_non_unique< tag, member > - > - > transaction_status_index; - - block_status_index _block_status; - transaction_status_index _transaction_status; - const uint32_t _max_block_status_range = 2048; // limit tracked block_status known_by_peer - - public_key_type _local_peer_id; - uint32_t _local_lib = 0; - block_id_type _local_lib_id; - uint32_t _local_head_block_num = 0; - block_id_type _local_head_block_id; /// the last block id received on local channel - - - public_key_type _remote_peer_id; - uint32_t _remote_lib = 0; - block_id_type _remote_lib_id; - bool _remote_request_trx = false; - bool _remote_request_irreversible_only = false; - - uint32_t _last_sent_block_num = 0; - block_id_type _last_sent_block_id; /// the id of the last block sent - bool _recv_remote_hello = false; - bool _sent_remote_hello = false; - - - fc::sha256 _current_code; - fc::time_point _last_recv_ping_time = fc::time_point::now(); - ping _last_recv_ping; - ping _last_sent_ping; - - - int _session_num = 0; - session_state _state = hello_state; - tcp::resolver _resolver; - bnet_ptr _net_plugin; - boost::asio::io_service& _ios; - unique_ptr> _ws; - boost::asio::strand< boost::asio::io_context::executor_type> _strand; - - methods::get_block_by_number::method_type& _get_block_by_number; - - - string _peer; - string _remote_host; - string _remote_port; - - vector _out_buffer; - //boost::beast::multi_buffer _in_buffer; - boost::beast::flat_buffer _in_buffer; - flat_set _block_header_notices; - fc::optional _logger_variant; - - - int next_session_id()const { - static std::atomic session_count(0); - return ++session_count; - } - - /** - * Creating session from server socket acceptance - */ - explicit session( tcp::socket socket, bnet_ptr net_plug ) - :_resolver(socket.get_io_service()), - _net_plugin( std::move(net_plug) ), - _ios(socket.get_io_service()), - _ws( new ws::stream(move(socket)) ), - _strand(_ws->get_executor() ), - _get_block_by_number( app().get_method() ) - { - _session_num = next_session_id(); - set_socket_options(); - _ws->binary(true); - wlog( "open session ${n}",("n",_session_num) ); - } - - - /** - * Creating outgoing session - */ - explicit session( boost::asio::io_context& ioc, bnet_ptr net_plug ) - :_resolver(ioc), - _net_plugin( std::move(net_plug) ), - _ios(ioc), - _ws( new ws::stream(ioc) ), - _strand( _ws->get_executor() ), - _get_block_by_number( app().get_method() ) - { - _session_num = next_session_id(); - _ws->binary(true); - wlog( "open session ${n}",("n",_session_num) ); - } - - ~session(); - - - void set_socket_options() { - try { - /** to minimize latency when sending short messages */ - _ws->next_layer().set_option( boost::asio::ip::tcp::no_delay(true) ); - - /** to minimize latency when sending large 1MB blocks, the send buffer should not have to - * wait for an "ack", making this larger could result in higher latency for smaller urgent - * messages. - */ - _ws->next_layer().set_option( boost::asio::socket_base::send_buffer_size( 1024*1024 ) ); - _ws->next_layer().set_option( boost::asio::socket_base::receive_buffer_size( 1024*1024 ) ); - } catch ( ... ) { - elog( "uncaught exception on set socket options" ); - } - } - - void run() { - _ws->async_accept( boost::asio::bind_executor( - _strand, - std::bind( &session::on_accept, - shared_from_this(), - std::placeholders::_1) ) ); - } - - void run( const string& peer ) { - auto c = peer.find(':'); - auto host = peer.substr( 0, c ); - auto port = peer.substr( c+1, peer.size() ); - - _peer = peer; - _remote_host = host; - _remote_port = port; - - _resolver.async_resolve( _remote_host, _remote_port, - boost::asio::bind_executor( _strand, - std::bind( &session::on_resolve, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2 ) ) ); - } - - void on_resolve( boost::system::error_code ec, - tcp::resolver::results_type results ) { - if( ec ) return on_fail( ec, "resolve" ); - - boost::asio::async_connect( _ws->next_layer(), - results.begin(), results.end(), - boost::asio::bind_executor( _strand, - std::bind( &session::on_connect, - shared_from_this(), - std::placeholders::_1 ) ) ); - } - - void on_connect( boost::system::error_code ec ) { - if( ec ) return on_fail( ec, "connect" ); - - set_socket_options(); - - _ws->async_handshake( _remote_host, "/", - boost::asio::bind_executor( _strand, - std::bind( &session::on_handshake, - shared_from_this(), - std::placeholders::_1 ) ) ); - } - - void on_handshake( boost::system::error_code ec ) { - if( ec ) return on_fail( ec, "handshake" ); - - do_hello(); - do_read(); - } - - /** - * This will be called "every time" a the transaction is accepted which happens - * on the speculative block (potentially several such blocks) and when a block - * that contains the transaction is applied and/or when switching forks. - * - * We will add it to the transaction status table as "received now" to be the - * basis of sending it to the peer. When we send it to the peer "received now" - * will be set to the infinite future to mark it as sent so we don't resend it - * when it is accepted again. - * - * Each time the transaction is "accepted" we extend the time we cache it by - * 5 seconds from now. Every time a block is applied we purge all accepted - * transactions that have reached 5 seconds without a new "acceptance". - */ - void on_accepted_transaction( transaction_metadata_ptr t ) { - //ilog( "accepted ${t}", ("t",t->id) ); - auto itr = _transaction_status.find( t->id ); - if( itr != _transaction_status.end() ) { - if( !itr->known_by_peer() ) { - _transaction_status.modify( itr, [&]( auto& stat ) { - stat.expired = std::min( fc::time_point::now() + fc::seconds(5), t->packed_trx->expiration() ); - }); - } - return; - } - - transaction_status stat; - stat.received = fc::time_point::now(); - stat.expired = stat.received + fc::seconds(5); - stat.id = t->id; - stat.trx = t; - _transaction_status.insert( stat ); - - maybe_send_next_message(); - } - - /** - * Remove all transactions that expired from cache prior to now - */ - void purge_transaction_cache() { - auto& idx = _transaction_status.get(); - auto itr = idx.begin(); - auto now = fc::time_point::now(); - while( itr != idx.end() && itr->expired < now ) { - idx.erase(itr); - itr = idx.begin(); - } - } - - /** - * When our local LIB advances we can purge our known history up to - * the LIB or up to the last block known by the remote peer. - */ - void on_new_lib( block_state_ptr s ) { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - _local_lib = s->block_num; - _local_lib_id = s->id; - - auto purge_to = std::min( _local_lib, _last_sent_block_num ); - - auto& idx = _block_status.get(); - auto itr = idx.begin(); - while( itr != idx.end() && itr->block_num() < purge_to ) { - idx.erase(itr); - itr = idx.begin(); - } - - if( _remote_request_irreversible_only ) { - auto bitr = _block_status.find(s->id); - if ( bitr == _block_status.end() || !bitr->received_from_peer ) { - _block_header_notices.insert(s->id); - } - } - - maybe_send_next_message(); - } - - - void on_bad_block( signed_block_ptr b ) { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - try { - auto id = b->id(); - auto itr = _block_status.find( id ); - if( itr == _block_status.end() ) return; - if( itr->received_from_peer ) { - peer_elog(this, "bad signed_block_ptr : unknown" ); - elog( "peer sent bad block #${b} ${i}, disconnect", ("b", b->block_num())("i",b->id()) ); - _ws->next_layer().close(); - } - } catch ( ... ) { - elog( "uncaught exception" ); - } - } - - void on_accepted_block_header( const block_state_ptr& s ) { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - // ilog( "accepted block header ${n}", ("n",s->block_num) ); - const auto& id = s->id; - - if( fc::time_point::now() - s->block->timestamp < fc::seconds(6) ) { - // ilog( "queue notice to peer that we have this block so hopefully they don't send it to us" ); - auto itr = _block_status.find( id ); - if( !_remote_request_irreversible_only && ( itr == _block_status.end() || !itr->received_from_peer ) ) { - _block_header_notices.insert( id ); - } - if( itr == _block_status.end() ) { - _block_status.insert( block_status(id, false, false) ); - } - } - } - - void on_accepted_block( const block_state_ptr& s ) { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - //idump((_block_status.size())(_transaction_status.size())); - //ilog( "accepted block ${n}", ("n",s->block_num) ); - - const auto& id = s->id; - - _local_head_block_id = id; - _local_head_block_num = block_header::num_from_id(id); - - if( _local_head_block_num < _last_sent_block_num ) { - _last_sent_block_num = _local_lib; - _last_sent_block_id = _local_lib_id; - } - - purge_transaction_cache(); - - /** purge all transactions from cache, I will send them as part of a block - * in the future unless peer tells me they already have block. - */ - for( const auto& receipt : s->block->transactions ) { - if( receipt.trx.which() == 1 ) { - const auto& pt = receipt.trx.get(); - const auto& tid = pt.id(); - auto itr = _transaction_status.find( tid ); - if( itr != _transaction_status.end() ) - _transaction_status.erase(itr); - } - } - - maybe_send_next_message(); /// attempt to send if we are idle - } - - - template - void async_get_pending_block_ids( L&& callback ) { - /// send peer my head block status which is read from chain plugin - app().post(priority::low, [self = shared_from_this(),callback]{ - auto& control = app().get_plugin().chain(); - auto lib = control.last_irreversible_block_num(); - auto head = control.fork_db_head_block_id(); - auto head_num = block_header::num_from_id(head); - - - std::vector ids; - if( lib > 0 ) { - ids.reserve((head_num-lib)+1); - for( auto i = lib; i <= head_num; ++i ) { - ids.emplace_back(control.get_block_id_for_num(i)); - } - } - self->_ios.post( boost::asio::bind_executor( - self->_strand, - [callback,ids,lib](){ - callback(ids,lib); - } - )); - }); - } - - template - void async_get_block_num( uint32_t blocknum, L&& callback ) { - app().post(priority::low, [self = shared_from_this(), blocknum, callback]{ - auto& control = app().get_plugin().chain(); - signed_block_ptr sblockptr; - try { - //ilog( "fetch block ${n}", ("n",blocknum) ); - sblockptr = control.fetch_block_by_number( blocknum ); - } catch ( const fc::exception& e ) { - edump((e.to_detail_string())); - } - - self->_ios.post( boost::asio::bind_executor( - self->_strand, - [callback,sblockptr](){ - callback(sblockptr); - } - )); - }); - } - - void do_hello(); - - - void send( const bnet_message& msg ) { try { - auto ps = fc::raw::pack_size(msg); - _out_buffer.resize(ps); - fc::datastream ds(_out_buffer.data(), ps); - fc::raw::pack(ds, msg); - send(); - } FC_LOG_AND_RETHROW() } - - template - void send( const bnet_message& msg, const T& ex ) { try { - auto ex_size = fc::raw::pack_size(ex); - auto ps = fc::raw::pack_size(msg) + fc::raw::pack_size(unsigned_int(ex_size)) + ex_size; - _out_buffer.resize(ps); - fc::datastream ds(_out_buffer.data(), ps); - fc::raw::pack( ds, msg ); - fc::raw::pack( ds, unsigned_int(ex_size) ); - fc::raw::pack( ds, ex ); - send(); - } FC_LOG_AND_RETHROW() } - - void send() { try { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - - _state = sending_state; - _ws->async_write( boost::asio::buffer(_out_buffer), - boost::asio::bind_executor( - _strand, - std::bind( &session::on_write, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2 ) ) ); - } FC_LOG_AND_RETHROW() } - - void mark_block_status( const block_id_type& id, bool known_by_peer, bool recv_from_peer ) { - auto itr = _block_status.find(id); - if( itr == _block_status.end() ) { - // optimization to avoid sending blocks to nodes that already know about them - // to avoid unbounded memory growth limit number tracked - const auto min_block_num = std::min( _local_lib, _last_sent_block_num ); - const auto max_block_num = min_block_num + _max_block_status_range; - const auto block_num = block_header::num_from_id( id ); - if( block_num > min_block_num && block_num < max_block_num && _block_status.size() < _max_block_status_range ) - _block_status.insert( block_status( id, known_by_peer, recv_from_peer ) ); - } else { - _block_status.modify( itr, [&]( auto& item ) { - item.known_by_peer = known_by_peer; - if (recv_from_peer) item.received_from_peer = true; - }); - } - } - - /** - * This method will determine whether there is a message in the - * out queue, if so it returns. Otherwise it determines the best - * message to send. - */ - void maybe_send_next_message() { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - if( _state == sending_state ) return; /// in process of sending - if( _out_buffer.size() ) return; /// in process of sending - if( !_recv_remote_hello || !_sent_remote_hello ) return; - - clear_expired_trx(); - - if( send_block_notice() ) return; - if( send_pong() ) return; - if( send_ping() ) return; - - /// we don't know where we are (waiting on accept block localhost) - if( _local_head_block_id == block_id_type() ) return ; - if( send_next_block() ) return; - if( send_next_trx() ) return; - } - - bool send_block_notice() { - if( _block_header_notices.size() == 0 ) - return false; - - block_notice notice; - notice.block_ids.reserve( _block_header_notices.size() ); - for( auto& id : _block_header_notices ) - notice.block_ids.emplace_back(id); - send(notice); - _block_header_notices.clear(); - return true; - } - - bool send_pong() { - if( _last_recv_ping.code == fc::sha256() ) - return false; - - send( pong{ fc::time_point::now(), _last_recv_ping.code } ); - _last_recv_ping.code = fc::sha256(); - return true; - } - - bool send_ping() { - auto delta_t = fc::time_point::now() - _last_sent_ping.sent; - if( delta_t < fc::seconds(3) ) return false; - - if( _last_sent_ping.code == fc::sha256() ) { - _last_sent_ping.sent = fc::time_point::now(); - _last_sent_ping.code = fc::sha256::hash(_last_sent_ping.sent); /// TODO: make this more random - _last_sent_ping.lib = _local_lib; - send( _last_sent_ping ); - } - - /// we expect the peer to send us a ping every 3 seconds, so if we haven't gotten one - /// in the past 6 seconds then the connection is likely hung. Unfortunately, we cannot - /// use the round-trip time of ping/pong to measure latency because during syncing the - /// remote peer can be stuck doing CPU intensive tasks that block its reading of the - /// buffer. This buffer gets filled with perhaps 100 blocks taking .1 seconds each for - /// a total processing time of 10+ seconds. That said, the peer should come up for air - /// every .1 seconds so should still be able to send out a ping every 3 seconds. - // - // We don't want to wait a RTT for each block because that could also slow syncing for - // empty blocks... - // - //if( fc::time_point::now() - _last_recv_ping_time > fc::seconds(6) ) { - // do_goodbye( "no ping from peer in last 6 seconds...." ); - //} - return true; - } - - bool is_known_by_peer( block_id_type id ) { - auto itr = _block_status.find(id); - if( itr == _block_status.end() ) return false; - return itr->known_by_peer; - } - - void clear_expired_trx() { - auto& idx = _transaction_status.get(); - auto itr = idx.begin(); - while( itr != idx.end() && itr->expired < fc::time_point::now() ) { - idx.erase(itr); - itr = idx.begin(); - } - } - - bool send_next_trx() { try { - if( !_remote_request_trx ) return false; - - auto& idx = _transaction_status.get(); - auto start = idx.begin(); - if( start == idx.end() || start->known_by_peer() ) - return false; - - - auto ptrx_ptr = start->trx->packed_trx; - - idx.modify( start, [&]( auto& stat ) { - stat.mark_known_by_peer(); - }); - - // wlog("sending trx ${id}", ("id",start->id) ); - send(ptrx_ptr); - - return true; - - } FC_LOG_AND_RETHROW() } - - void on_async_get_block( const signed_block_ptr& nextblock ) { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - if( !nextblock) { - _state = idle_state; - maybe_send_next_message(); - return; - } - - /// if something changed, the next block doesn't link to the last - /// block we sent, local chain must have switched forks - if( nextblock->previous != _last_sent_block_id && _last_sent_block_id != block_id_type() ) { - if( !is_known_by_peer( nextblock->previous ) ) { - _last_sent_block_id = _local_lib_id; - _last_sent_block_num = _local_lib; - _state = idle_state; - maybe_send_next_message(); - return; - } - } - - /// at this point we know the peer can link this block - - auto next_id = nextblock->id(); - - /// if the peer already knows about this block, great no need to - /// send it, mark it as 'sent' and move on. - if( is_known_by_peer( next_id ) ) { - _last_sent_block_id = next_id; - _last_sent_block_num = nextblock->block_num(); - - _state = idle_state; - maybe_send_next_message(); - return; - } - - mark_block_status( next_id, true, false ); - - _last_sent_block_id = next_id; - _last_sent_block_num = nextblock->block_num(); - - send( nextblock ); - status( "sending block " + std::to_string( block_header::num_from_id(next_id) ) ); - - if( nextblock->timestamp > (fc::time_point::now() - fc::seconds(5)) ) { - mark_block_transactions_known_by_peer( nextblock ); - } - } - - /** - * Send the next block after the last block in our current fork that - * we know the remote peer knows. - */ - bool send_next_block() { - - if ( _remote_request_irreversible_only && _last_sent_block_id == _local_lib_id ) { - return false; - } - - if( _last_sent_block_id == _local_head_block_id ) /// we are caught up - return false; - - ///< set sending state because this callback may result in sending a message - _state = sending_state; - async_get_block_num( _last_sent_block_num + 1, - [self=shared_from_this()]( auto sblockptr ) { - self->on_async_get_block( sblockptr ); - }); - - return true; - } - - void on_fail( boost::system::error_code ec, const char* what ) { - try { - verify_strand_in_this_thread(_strand, __func__, __LINE__); - elog( "${w}: ${m}", ("w", what)("m", ec.message() ) ); - _ws->next_layer().close(); - } catch ( ... ) { - elog( "uncaught exception on close" ); - } - } - - void on_accept( boost::system::error_code ec ) { - if( ec ) { - return on_fail( ec, "accept" ); - } - - do_hello(); - do_read(); - } - - void do_read() { - _ws->async_read( _in_buffer, - boost::asio::bind_executor( - _strand, - std::bind( &session::on_read, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); - } - - void on_read( boost::system::error_code ec, std::size_t bytes_transferred ) { - boost::ignore_unused(bytes_transferred); - - if( ec == ws::error::closed ) - return on_fail( ec, "close on read" ); - - if( ec ) { - return on_fail( ec, "read" );; - } - - try { - auto d = boost::asio::buffer_cast(boost::beast::buffers_front(_in_buffer.data())); - auto s = boost::asio::buffer_size(_in_buffer.data()); - fc::datastream ds(d,s); - - bnet_message msg; - fc::raw::unpack( ds, msg ); - on_message( msg, ds ); - _in_buffer.consume( ds.tellp() ); - - wait_on_app(); - return; - - } catch ( ... ) { - wlog( "close bad payload" ); - } - try { - _ws->close( boost::beast::websocket::close_code::bad_payload ); - } catch ( ... ) { - elog( "uncaught exception on close" ); - } - } - - /** if we just call do_read here then this thread might run ahead of - * the main thread, instead we post an event to main which will then - * post a new read event when ready. - * - * This also keeps the "shared pointer" alive in the callback preventing - * the connection from being closed. - */ - void wait_on_app() { - app().post( priority::medium, [self = shared_from_this()]() { - app().get_io_service().post( boost::asio::bind_executor( self->_strand, [self] { self->do_read(); } ) ); - } ); - } - - void on_message( const bnet_message& msg, fc::datastream& ds ) { - try { - switch( msg.which() ) { - case bnet_message::tag::value: - on( msg.get(), ds ); - break; - case bnet_message::tag::value: - on( msg.get() ); - break; - case bnet_message::tag::value: - on( msg.get() ); - break; - case bnet_message::tag::value: - on( msg.get() ); - break; - case bnet_message::tag::value: - on( msg.get() ); - break; - case bnet_message::tag::value: - on( msg.get() ); - break; - default: - wlog( "bad message received" ); - _ws->close( boost::beast::websocket::close_code::bad_payload ); - return; - } - maybe_send_next_message(); - } catch( const fc::exception& e ) { - elog( "${e}", ("e",e.to_detail_string())); - _ws->close( boost::beast::websocket::close_code::bad_payload ); - } - } - - void on( const block_notice& notice ) { - peer_ilog(this, "received block_notice"); - for( const auto& id : notice.block_ids ) { - status( "received notice " + std::to_string( block_header::num_from_id(id) ) ); - mark_block_status( id, true, false ); - } - } - - void on( const hello& hi, fc::datastream& ds ); - - void on( const ping& p ) { - peer_ilog(this, "received ping"); - _last_recv_ping = p; - _remote_lib = p.lib; - _last_recv_ping_time = fc::time_point::now(); - } - - void on( const pong& p ) { - peer_ilog(this, "received pong"); - if( p.code != _last_sent_ping.code ) { - peer_elog(this, "bad ping : invalid pong code"); - return do_goodbye( "invalid pong code" ); - } - _last_sent_ping.code = fc::sha256(); - } - - void do_goodbye( const string& reason ) { - try { - status( "goodbye - " + reason ); - _ws->next_layer().close(); - } catch ( ... ) { - elog( "uncaught exception on close" ); - } - } - - void check_for_redundant_connection(); - - void on( const signed_block_ptr& b ) { - peer_ilog(this, "received signed_block_ptr"); - if (!b) { - peer_elog(this, "bad signed_block_ptr : null pointer"); - EOS_THROW(block_validate_exception, "bad block" ); - } - status( "received block " + std::to_string(b->block_num()) ); - //ilog( "recv block ${n}", ("n", b->block_num()) ); - auto id = b->id(); - mark_block_status( id, true, true ); - - app().get_channel().publish(priority::high, b); - - mark_block_transactions_known_by_peer( b ); - } - - void mark_block_transactions_known_by_peer( const signed_block_ptr& b ) { - for( const auto& receipt : b->transactions ) { - if( receipt.trx.which() == 1 ) { - const auto& pt = receipt.trx.get(); - const auto& id = pt.id(); - mark_transaction_known_by_peer(id); - } - } - } - - /** - * @return true if trx is known by local host, false if new to this host - */ - bool mark_transaction_known_by_peer( const transaction_id_type& id ) { - auto itr = _transaction_status.find( id ); - if( itr != _transaction_status.end() ) { - _transaction_status.modify( itr, [&]( auto& stat ) { - stat.mark_known_by_peer(); - }); - return true; - } else { - transaction_status stat; - stat.id = id; - stat.mark_known_by_peer(); - stat.expired = fc::time_point::now()+fc::seconds(5); - _transaction_status.insert(stat); - } - return false; - } - - void on( const packed_transaction_ptr& p ); - - void on_write( boost::system::error_code ec, std::size_t bytes_transferred ) { - boost::ignore_unused(bytes_transferred); - verify_strand_in_this_thread(_strand, __func__, __LINE__); - if( ec ) { - _ws->next_layer().close(); - return on_fail( ec, "write" ); - } - _state = idle_state; - _out_buffer.resize(0); - maybe_send_next_message(); - } - - void status( const string& msg ) { - // ilog( "${remote_peer}: ${msg}", ("remote_peer",fc::variant(_remote_peer_id).as_string().substr(3,5) )("msg",msg) ); - } - - const fc::variant_object& get_logger_variant() { - if (!_logger_variant) { - boost::system::error_code ec; - auto rep = _ws->lowest_layer().remote_endpoint(ec); - string ip = ec ? "" : rep.address().to_string(); - string port = ec ? "" : std::to_string(rep.port()); - - auto lep = _ws->lowest_layer().local_endpoint(ec); - string lip = ec ? "" : lep.address().to_string(); - string lport = ec ? "" : std::to_string(lep.port()); - - _logger_variant.emplace(fc::mutable_variant_object() - ("_name", _peer) - ("_id", _remote_peer_id) - ("_ip", ip) - ("_port", port) - ("_lip", lip) - ("_lport", lport) - ); - } - return *_logger_variant; - } - }; - - - /** - * Accepts incoming connections and launches the sessions - */ - class listener : public std::enable_shared_from_this { - private: - tcp::acceptor _acceptor; - tcp::socket _socket; - bnet_ptr _net_plugin; - - public: - listener( boost::asio::io_context& ioc, tcp::endpoint endpoint, bnet_ptr np ) - :_acceptor(ioc), _socket(ioc), _net_plugin(std::move(np)) - { - boost::system::error_code ec; - - _acceptor.open( endpoint.protocol(), ec ); - if( ec ) { on_fail( ec, "open" ); return; } - - _acceptor.set_option( boost::asio::socket_base::reuse_address(true) ); - - _acceptor.bind( endpoint, ec ); - if( ec ) { on_fail( ec, "bind" ); return; } - - _acceptor.listen( boost::asio::socket_base::max_listen_connections, ec ); - if( ec ) on_fail( ec, "listen" ); - } - - void run() { - EOS_ASSERT( _acceptor.is_open(), plugin_exception, "unable top open listen socket" ); - do_accept(); - } - - void do_accept() { - _acceptor.async_accept( _socket, [self=shared_from_this()]( auto ec ){ self->on_accept(ec); } ); - } - - void on_fail( boost::system::error_code ec, const char* what ) { - elog( "${w}: ${m}", ("w", what)("m", ec.message() ) ); - } - - void on_accept( boost::system::error_code ec ); - }; - - - class bnet_plugin_impl : public std::enable_shared_from_this { - public: - bnet_plugin_impl() = default; - - const private_key_type _peer_pk = fc::crypto::private_key::generate(); /// one time random key to identify this process - public_key_type _peer_id = _peer_pk.get_public_key(); - string _bnet_endpoint_address = "0.0.0.0"; - uint16_t _bnet_endpoint_port = 4321; - bool _request_trx = true; - bool _follow_irreversible = false; - - std::vector _connect_to_peers; /// list of peers to connect to - std::vector _socket_threads; - int32_t _num_threads = 1; - - std::unique_ptr _ioc; // lifetime guarded by shared_ptr of bnet_plugin_impl - std::shared_ptr _listener; - std::shared_ptr _timer; // only access on app io_service - std::map > _sessions; // only access on app io_service - - channels::irreversible_block::channel_type::handle _on_irb_handle; - channels::accepted_block::channel_type::handle _on_accepted_block_handle; - channels::accepted_block_header::channel_type::handle _on_accepted_block_header_handle; - channels::rejected_block::channel_type::handle _on_bad_block_handle; - channels::accepted_transaction::channel_type::handle _on_appled_trx_handle; - - void async_add_session( std::weak_ptr wp ) { - app().post(priority::low, [wp,this]{ - if( auto l = wp.lock() ) { - _sessions[l.get()] = wp; - } - }); - } - - void on_session_close( const session* s ) { - auto itr = _sessions.find(s); - if( _sessions.end() != itr ) - _sessions.erase(itr); - } - - template - void for_each_session( Call callback ) { - app().post(priority::low, [this, callback = callback] { - for (const auto& item : _sessions) { - if (auto ses = item.second.lock()) { - ses->_ios.post(boost::asio::bind_executor( - ses->_strand, - [ses, cb = callback]() { cb(ses); } - )); - } - } - }); - } - - void on_accepted_transaction( transaction_metadata_ptr trx ) { - if( trx->implicit || trx->scheduled ) return; - for_each_session( [trx]( auto ses ){ ses->on_accepted_transaction( trx ); } ); - } - - /** - * Notify all active connection of the new irreversible block so they - * can purge their block cache - */ - void on_irreversible_block( block_state_ptr s ) { - for_each_session( [s]( auto ses ){ ses->on_new_lib( s ); } ); - } - - /** - * Notify all active connections of the new accepted block so - * they can relay it. This method also pre-packages the block - * as a packed bnet_message so the connections can simply relay - * it on. - */ - void on_accepted_block( block_state_ptr s ) { - _ioc->post( [s,this] { /// post this to the thread pool because packing can be intensive - for_each_session( [s]( auto ses ){ ses->on_accepted_block( s ); } ); - }); - } - - void on_accepted_block_header( block_state_ptr s ) { - _ioc->post( [s,this] { /// post this to the thread pool because packing can be intensive - for_each_session( [s]( auto ses ){ ses->on_accepted_block_header( s ); } ); - }); - } - - /** - * We received a bad block which either - * 1. didn't link to known chain - * 2. violated the consensus rules - * - * Any peer which sent us this block (not noticed) - * should be disconnected as they are objectively bad - */ - void on_bad_block( signed_block_ptr s ) { - for_each_session( [s]( auto ses ) { ses->on_bad_block(s); } ); - }; - - void on_reconnect_peers() { - for( const auto& peer : _connect_to_peers ) { - bool found = false; - for( const auto& con : _sessions ) { - auto ses = con.second.lock(); - if( ses && (ses->_peer == peer) ) { - found = true; - break; - } - } - - if( !found ) { - wlog( "attempt to connect to ${p}", ("p",peer) ); - auto s = std::make_shared( *_ioc, shared_from_this() ); - s->_local_peer_id = _peer_id; - _sessions[s.get()] = s; - s->run( peer ); - } - } - - start_reconnect_timer(); - } - - - void start_reconnect_timer() { - /// add some random delay so that all my peers don't attempt to reconnect to me - /// at the same time after shutting down.. - _timer->expires_from_now( boost::posix_time::microseconds( 1000000*(10+rand()%5) ) ); - _timer->async_wait(app().get_priority_queue().wrap(priority::low, [=](const boost::system::error_code& ec) { - if( ec ) { return; } - on_reconnect_peers(); - })); - } - }; - - - void listener::on_accept( boost::system::error_code ec ) { - if( ec ) { - if( ec == boost::system::errc::too_many_files_open ) - do_accept(); - return; - } - std::shared_ptr newsession; - try { - newsession = std::make_shared( move( _socket ), _net_plugin ); - } - catch( std::exception& e ) { - //making a session creates an instance of std::random_device which may open /dev/urandom - // for example. Unfortuately the only defined error is a std::exception derivative - _socket.close(); - } - if( newsession ) { - _net_plugin->async_add_session( newsession ); - newsession->_local_peer_id = _net_plugin->_peer_id; - newsession->run(); - } - do_accept(); - } - - - bnet_plugin::bnet_plugin() - :my(std::make_shared()) { - } - - bnet_plugin::~bnet_plugin() { - } - - void bnet_plugin::set_program_options(options_description& cli, options_description& cfg) { - cfg.add_options() - ("bnet-endpoint", bpo::value()->default_value("0.0.0.0:4321"), "the endpoint upon which to listen for incoming connections" ) - ("bnet-follow-irreversible", bpo::value()->default_value(false), "this peer will request only irreversible blocks from other nodes" ) - ("bnet-threads", bpo::value(), "the number of threads to use to process network messages" ) - ("bnet-connect", bpo::value>()->composing(), "remote endpoint of other node to connect to; Use multiple bnet-connect options as needed to compose a network" ) - ("bnet-no-trx", bpo::bool_switch()->default_value(false), "this peer will request no pending transactions from other nodes" ) - ("bnet-peer-log-format", bpo::value()->default_value( "[\"${_name}\" ${_ip}:${_port}]" ), - "The string used to format peers when logging messages about them. Variables are escaped with ${}.\n" - "Available Variables:\n" - " _name \tself-reported name\n\n" - " _id \tself-reported ID (Public Key)\n\n" - " _ip \tremote IP address of peer\n\n" - " _port \tremote port number of peer\n\n" - " _lip \tlocal IP address connected to peer\n\n" - " _lport \tlocal port number connected to peer\n\n") - ; - } - - void bnet_plugin::plugin_initialize(const variables_map& options) { - ilog( "Initialize bnet plugin" ); - - try { - peer_log_format = options.at( "bnet-peer-log-format" ).as(); - - if( options.count( "bnet-endpoint" )) { - auto ip_port = options.at( "bnet-endpoint" ).as(); - - //auto host = boost::asio::ip::host_name(ip_port); - auto port = ip_port.substr( ip_port.find( ':' ) + 1, ip_port.size()); - auto host = ip_port.substr( 0, ip_port.find( ':' )); - my->_bnet_endpoint_address = host; - my->_bnet_endpoint_port = std::stoi( port ); - idump((ip_port)( host )( port )( my->_follow_irreversible )); - } - if( options.count( "bnet-follow-irreversible" )) { - my->_follow_irreversible = options.at( "bnet-follow-irreversible" ).as(); - } - - - if( options.count( "bnet-connect" )) { - my->_connect_to_peers = options.at( "bnet-connect" ).as>(); - } - if( options.count( "bnet-threads" )) { - my->_num_threads = options.at( "bnet-threads" ).as(); - if( my->_num_threads > 8 ) - my->_num_threads = 8; - } - my->_request_trx = !options.at( "bnet-no-trx" ).as(); - - } FC_LOG_AND_RETHROW() - } - - void bnet_plugin::plugin_startup() { - handle_sighup(); // Sets logger - - wlog( "bnet startup " ); - - auto& chain = app().get_plugin().chain(); - FC_ASSERT ( chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, "bnet is not compatible with \"irreversible\" read_mode"); - - my->_on_appled_trx_handle = app().get_channel() - .subscribe( [this]( transaction_metadata_ptr t ){ - my->on_accepted_transaction(t); - }); - - my->_on_irb_handle = app().get_channel() - .subscribe( [this]( block_state_ptr s ){ - my->on_irreversible_block(s); - }); - - my->_on_accepted_block_handle = app().get_channel() - .subscribe( [this]( block_state_ptr s ){ - my->on_accepted_block(s); - }); - - my->_on_accepted_block_header_handle = app().get_channel() - .subscribe( [this]( block_state_ptr s ){ - my->on_accepted_block_header(s); - }); - - my->_on_bad_block_handle = app().get_channel() - .subscribe( [this]( signed_block_ptr b ){ - my->on_bad_block(b); - }); - - - if( app().get_plugin().chain().get_read_mode() == chain::db_read_mode::READ_ONLY ) { - if (my->_request_trx) { - my->_request_trx = false; - ilog( "forced bnet-no-trx to true since in read-only mode" ); - } - } - - const auto address = boost::asio::ip::make_address( my->_bnet_endpoint_address ); - my->_ioc.reset( new boost::asio::io_context{my->_num_threads} ); - - - auto& ioc = *my->_ioc; - my->_timer = std::make_shared( app().get_io_service() ); - - my->start_reconnect_timer(); - - my->_listener = std::make_shared( ioc, - tcp::endpoint{ address, my->_bnet_endpoint_port }, - my ); - my->_listener->run(); - - my->_socket_threads.reserve( my->_num_threads ); - for( auto i = 0; i < my->_num_threads; ++i ) { - my->_socket_threads.emplace_back( [&ioc, i]{ - std::string tn = "bnet-" + std::to_string( i ); - fc::set_os_thread_name( tn ); - wlog( "start thread" ); - ioc.run(); - wlog( "end thread" ); - } ); - } - - for( const auto& peer : my->_connect_to_peers ) { - auto s = std::make_shared( ioc, my ); - s->_local_peer_id = my->_peer_id; - my->_sessions[s.get()] = s; - s->run( peer ); - } - } - - void bnet_plugin::plugin_shutdown() { - try { - my->_timer->cancel(); - my->_timer.reset(); - } catch ( ... ) { - elog( "exception thrown on timer shutdown" ); - } - - /// shut down all threads and close all connections - - my->for_each_session([](auto ses){ - ses->do_goodbye( "shutting down" ); - }); - - my->_listener.reset(); - my->_ioc->stop(); - - wlog( "joining bnet threads" ); - for( auto& t : my->_socket_threads ) { - t.join(); - } - wlog( "done joining threads" ); - - my->for_each_session([](auto ses){ - EOS_ASSERT( false, plugin_exception, "session ${ses} still active", ("ses", ses->_session_num) ); - }); - - // lifetime of _ioc is guarded by shared_ptr of bnet_plugin_impl - } - - void bnet_plugin::handle_sighup() { - if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end()) - plugin_logger = fc::get_logger_map()[logger_name]; - } - - - session::~session() { - wlog( "close session ${n}",("n",_session_num) ); - std::weak_ptr netp = _net_plugin; - app().post(priority::low, [netp,ses=this]{ - if( auto net = netp.lock() ) - net->on_session_close(ses); - }); - } - - void session::do_hello() { - /// TODO: find more effecient way to move large array of ids in event of fork - async_get_pending_block_ids( [self = shared_from_this() ]( const vector& ids, uint32_t lib ){ - hello hello_msg; - hello_msg.peer_id = self->_local_peer_id; - hello_msg.last_irr_block_num = lib; - hello_msg.pending_block_ids = ids; - hello_msg.request_transactions = self->_net_plugin->_request_trx; - hello_msg.chain_id = app().get_plugin().get_chain_id(); // TODO: Quick fix in a rush. Maybe a better solution is needed. - - self->_local_lib = lib; - if ( self->_net_plugin->_follow_irreversible ) { - self->send( hello_msg, hello_extension(hello_extension_irreversible_only()) ); - } else { - self->send( hello_msg ); - } - self->_sent_remote_hello = true; - }); - } - - void session::check_for_redundant_connection() { - app().post(priority::low, [self=shared_from_this()]{ - self->_net_plugin->for_each_session( [self]( auto ses ){ - if( ses != self && ses->_remote_peer_id == self->_remote_peer_id ) { - self->do_goodbye( "redundant connection" ); - } - }); - }); - } - - void session::on( const hello& hi, fc::datastream& ds ) { - peer_ilog(this, "received hello"); - _recv_remote_hello = true; - - if( hi.chain_id != app().get_plugin().get_chain_id() ) { // TODO: Quick fix in a rush. Maybe a better solution is needed. - peer_elog(this, "bad hello : wrong chain id"); - return do_goodbye( "disconnecting due to wrong chain id" ); - } - - if( hi.peer_id == _local_peer_id ) { - return do_goodbye( "connected to self" ); - } - - if ( _net_plugin->_follow_irreversible && hi.protocol_version <= "1.0.0") { - return do_goodbye( "need newer protocol version that supports sending only irreversible blocks" ); - } - - if ( hi.protocol_version >= "1.0.1" ) { - //optional extensions - while ( 0 < ds.remaining() ) { - unsigned_int size; - fc::raw::unpack( ds, size ); // next extension size - auto ex_start = ds.pos(); - fc::datastream dsw( ex_start, size ); - unsigned_int wich; - fc::raw::unpack( dsw, wich ); - hello_extension ex; - if ( wich < ex.count() ) { //know extension - fc::datastream dsx( ex_start, size ); //unpack needs to read static_variant _tag again - fc::raw::unpack( dsx, ex ); - if ( ex.which() == hello_extension::tag::value ) { - _remote_request_irreversible_only = true; - } - } else { - //unsupported extension, we just ignore it - //another side does know our protocol version, i.e. it know which extensions we support - //so, it some extensions were crucial, another side will close the connection - } - ds.skip(size); //move to next extension - } - } - - _last_sent_block_num = hi.last_irr_block_num; - _remote_request_trx = hi.request_transactions; - _remote_peer_id = hi.peer_id; - _remote_lib = hi.last_irr_block_num; - - for( const auto& id : hi.pending_block_ids ) - mark_block_status( id, true, false ); - - check_for_redundant_connection(); - - } - - void session::on( const packed_transaction_ptr& p ) { - peer_ilog(this, "received packed_transaction_ptr"); - if (!p) { - peer_elog(this, "bad packed_transaction_ptr : null pointer"); - EOS_THROW(transaction_exception, "bad transaction"); - } - if( !_net_plugin->_request_trx ) - return; - - // ilog( "recv trx ${n}", ("n", id) ); - if( p->expiration() < fc::time_point::now() ) return; - - const auto& id = p->id(); - - if( mark_transaction_known_by_peer( id ) ) - return; - - auto ptr = std::make_shared(p); - - app().get_channel().publish(priority::low, ptr); - } -} /// namespace eosio diff --git a/plugins/bnet_plugin/include/eosio/bnet_plugin/bnet_plugin.hpp b/plugins/bnet_plugin/include/eosio/bnet_plugin/bnet_plugin.hpp deleted file mode 100644 index 5874f2a28ba..00000000000 --- a/plugins/bnet_plugin/include/eosio/bnet_plugin/bnet_plugin.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/** - * @file - * @copyright defined in eos/LICENSE - */ -#pragma once -#include - -#include - -namespace fc { class variant; } - -namespace eosio { - using chain::transaction_id_type; - using std::shared_ptr; - using namespace appbase; - using chain::name; - using fc::optional; - using chain::uint128_t; - - typedef shared_ptr bnet_ptr; - typedef shared_ptr bnet_const_ptr; - - - -/** - * This plugin tracks all actions and keys associated with a set of configured accounts. It enables - * wallets to paginate queries for bnet. - * - * An action will be included in the account's bnet if any of the following: - * - receiver - * - any account named in auth list - * - * A key will be linked to an account if the key is referneced in authorities of updateauth or newaccount - */ -class bnet_plugin : public plugin { - public: - APPBASE_PLUGIN_REQUIRES((chain_plugin)) - - bnet_plugin(); - virtual ~bnet_plugin(); - - virtual void set_program_options(options_description& cli, options_description& cfg) override; - - void plugin_initialize(const variables_map& options); - void plugin_startup(); - void plugin_shutdown(); - void handle_sighup() override; - - private: - bnet_ptr my; -}; - -} /// namespace eosio - - diff --git a/programs/eosio-launcher/main.cpp b/programs/eosio-launcher/main.cpp index 7e6bfbaf7b3..75307e24657 100644 --- a/programs/eosio-launcher/main.cpp +++ b/programs/eosio-launcher/main.cpp @@ -327,12 +327,6 @@ struct last_run_def { vector running_nodes; }; - -enum class p2p_plugin { - NET, - BNET -}; - enum launch_modes { LM_NONE, LM_LOCAL, @@ -396,7 +390,6 @@ struct launcher_def { size_t producers; size_t next_node; string shape; - p2p_plugin p2p; allowed_connection allowed_connections = PC_NONE; bfs::path genesis; bfs::path output; @@ -488,7 +481,6 @@ launcher_def::set_options (bpo::options_description &cfg) { ("producers",bpo::value(&producers)->default_value(21),"total number of non-bios producer instances in this network") ("mode,m",bpo::value>()->multitoken()->default_value({"any"}, "any"),"connection mode, combination of \"any\", \"producers\", \"specified\", \"none\"") ("shape,s",bpo::value(&shape)->default_value("star"),"network topology, use \"star\" \"mesh\" or give a filename for custom") - ("p2p-plugin", bpo::value()->default_value("net"),"select a p2p plugin to use (either net or bnet). Defaults to net.") ("genesis,g",bpo::value()->default_value("./genesis.json"),"set the path to genesis.json") ("skip-signature", bpo::bool_switch(&skip_transaction_signatures)->default_value(false), "nodeos does not require transaction signatures.") ("nodeos", bpo::value(&eosd_extra_args), "forward nodeos command line argument(s) to each instance of nodeos, enclose arg(s) in quotes") @@ -597,20 +589,6 @@ launcher_def::initialize (const variables_map &vmap) { host_map_file = src.stem().string() + "_hosts.json"; } - string nc = vmap["p2p-plugin"].as(); - if ( !nc.empty() ) { - if (boost::iequals(nc,"net")) - p2p = p2p_plugin::NET; - else if (boost::iequals(nc,"bnet")) - p2p = p2p_plugin::BNET; - else { - p2p = p2p_plugin::NET; - } - } - else { - p2p = p2p_plugin::NET; - } - if( !host_map_file.empty() ) { try { fc::json::from_file(host_map_file).as>(bindings); @@ -1107,14 +1085,9 @@ launcher_def::write_config_file (tn_node_def &node) { cfg << "blocks-dir = " << block_dir << "\n"; cfg << "http-server-address = " << host->host_name << ":" << instance.http_port << "\n"; cfg << "http-validate-host = false\n"; - if (p2p == p2p_plugin::NET) { - cfg << "p2p-listen-endpoint = " << host->listen_addr << ":" << instance.p2p_port << "\n"; - cfg << "p2p-server-address = " << host->public_name << ":" << instance.p2p_port << "\n"; - } else { - cfg << "bnet-endpoint = " << host->listen_addr << ":" << instance.p2p_port << "\n"; - // Include the net_plugin endpoint, because the plugin is always loaded (even if not used). - cfg << "p2p-listen-endpoint = " << host->listen_addr << ":" << instance.p2p_port + 1000 << "\n"; - } + cfg << "p2p-listen-endpoint = " << host->listen_addr << ":" << instance.p2p_port << "\n"; + cfg << "p2p-server-address = " << host->public_name << ":" << instance.p2p_port << "\n"; + if (is_bios) { cfg << "enable-stale-production = true\n"; @@ -1140,18 +1113,10 @@ launcher_def::write_config_file (tn_node_def &node) { if(!is_bios) { auto &bios_node = network.nodes["bios"]; - if (p2p == p2p_plugin::NET) { - cfg << "p2p-peer-address = " << bios_node.instance->p2p_endpoint<< "\n"; - } else { - cfg << "bnet-connect = " << bios_node.instance->p2p_endpoint<< "\n"; - } + cfg << "p2p-peer-address = " << bios_node.instance->p2p_endpoint<< "\n"; } for (const auto &p : node.peers) { - if (p2p == p2p_plugin::NET) { - cfg << "p2p-peer-address = " << network.nodes.find(p)->second.instance->p2p_endpoint << "\n"; - } else { - cfg << "bnet-connect = " << network.nodes.find(p)->second.instance->p2p_endpoint << "\n"; - } + cfg << "p2p-peer-address = " << network.nodes.find(p)->second.instance->p2p_endpoint << "\n"; } if (instance.has_db || node.producers.size()) { for (const auto &kp : node.keys ) { @@ -1166,11 +1131,7 @@ launcher_def::write_config_file (tn_node_def &node) { if( instance.has_db ) { cfg << "plugin = eosio::mongo_db_plugin\n"; } - if ( p2p == p2p_plugin::NET ) { - cfg << "plugin = eosio::net_plugin\n"; - } else { - cfg << "plugin = eosio::bnet_plugin\n"; - } + cfg << "plugin = eosio::net_plugin\n"; cfg << "plugin = eosio::chain_api_plugin\n" << "plugin = eosio::history_api_plugin\n"; cfg.close(); diff --git a/programs/nodeos/CMakeLists.txt b/programs/nodeos/CMakeLists.txt index d9fb90ee45d..d5fe8273eb5 100644 --- a/programs/nodeos/CMakeLists.txt +++ b/programs/nodeos/CMakeLists.txt @@ -52,7 +52,6 @@ target_link_libraries( ${NODE_EXECUTABLE_NAME} PRIVATE -Wl,${whole_archive_flag} login_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} history_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} state_history_plugin -Wl,${no_whole_archive_flag} - PRIVATE -Wl,${whole_archive_flag} bnet_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} history_api_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} chain_api_plugin -Wl,${no_whole_archive_flag} PRIVATE -Wl,${whole_archive_flag} net_plugin -Wl,${no_whole_archive_flag} diff --git a/programs/nodeos/logging.json b/programs/nodeos/logging.json index 0b02060b82e..07771457d72 100644 --- a/programs/nodeos/logging.json +++ b/programs/nodeos/logging.json @@ -64,15 +64,6 @@ "stderr", "net" ] - },{ - "name": "bnet_plugin", - "level": "debug", - "enabled": true, - "additivity": false, - "appenders": [ - "stderr", - "net" - ] },{ "name": "producer_plugin", "level": "debug",