This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Merge pull request #7598 from EOSIO/net-plugin-block-notify
Net plugin block id notification
heifner authored Jul 10, 2019
2 parents 80b4147 + e8a7884 commit c3e18ce
Showing 2 changed files with 129 additions and 78 deletions.
2 changes: 1 addition & 1 deletion plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
@@ -26,7 +26,7 @@ namespace eosio {
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
chain::public_key_type key; ///< authentication key; may be a producer or peer key, or empty
- tstamp time;
tstamp time{0};
fc::sha256 token; ///< digest of time to prove we own the private key of the key above
chain::signature_type sig; ///< signature for the digest
string p2p_address;
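
The only change to protocol.hpp value-initializes the handshake timestamp. Below is a minimal, compilable sketch of the effect, using a plain int64_t as a stand-in for tstamp (an assumption, not the plugin's actual typedef): a default-constructed message now starts from a deterministic zero rather than an indeterminate value.

    #include <cstdint>
    #include <iostream>

    using tstamp = int64_t;   // stand-in; the real tstamp is the plugin's own typedef

    struct handshake_like {
       tstamp time{0};        // default member initializer, as added to handshake_message
    };

    int main() {
       handshake_like hs;                // default construction
       std::cout << hs.time << "\n";     // always prints 0, never an uninitialized value
    }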
205 changes: 128 additions & 77 deletions plugins/net_plugin/net_plugin.cpp
@@ -98,17 +98,27 @@ namespace eosio {
block_id_type id;
uint32_t block_num = 0;
uint32_t connection_id = 0;
bool have_block = false; // true if we have received the block, false if only received id notification
};

struct by_block_id;

typedef multi_index_container<
eosio::peer_block_state,
indexed_by<
ordered_unique< tag<by_id>,
composite_key< peer_block_state,
member<peer_block_state, uint32_t, &eosio::peer_block_state::connection_id>,
member<peer_block_state, block_id_type, &eosio::peer_block_state::id>
>,
composite_key_compare< std::less<uint32_t>, sha256_less >
>,
ordered_non_unique< tag<by_block_id>,
composite_key< peer_block_state,
member<peer_block_state, block_id_type, &eosio::peer_block_state::id>,
- member<peer_block_state, uint32_t, &eosio::peer_block_state::connection_id>
member<peer_block_state, bool, &eosio::peer_block_state::have_block>
>,
- composite_key_compare< sha256_less, std::less<uint32_t> >
composite_key_compare< sha256_less, std::greater<bool> >
>,
ordered_non_unique< tag<by_block_num>, member<eosio::peer_block_state, uint32_t, &eosio::peer_block_state::block_num > >
>
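
The reworked index is the core data-structure change: the unique by_id index is now keyed by (connection_id, id), and the new by_block_id index orders entries by id and then by have_block using std::greater<bool>, so for a given id any entry holding the full block sorts ahead of id-only notices. The sketch below (compilable, but using std::string in place of block_id_type and std::less in place of sha256_less, which are stand-ins rather than the plugin's types) mirrors the have_block() and num_entries() lookups that appear later in this diff.

    #include <boost/multi_index_container.hpp>
    #include <boost/multi_index/ordered_index.hpp>
    #include <boost/multi_index/composite_key.hpp>
    #include <boost/multi_index/member.hpp>
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>

    namespace bmi = boost::multi_index;

    struct entry {
       std::string id;            // stand-in for block_id_type
       uint32_t    connection_id;
       bool        have_block;    // false: only the id notice has been received
    };

    struct by_id {};
    struct by_block_id {};

    using entry_index = bmi::multi_index_container<
       entry,
       bmi::indexed_by<
          // (connection_id, id): one entry per block per connection, countable by connection
          bmi::ordered_unique< bmi::tag<by_id>,
             bmi::composite_key< entry,
                bmi::member<entry, uint32_t,    &entry::connection_id>,
                bmi::member<entry, std::string, &entry::id>
             >,
             bmi::composite_key_compare< std::less<uint32_t>, std::less<std::string> >
          >,
          // (id, have_block descending): the first hit for an id says whether the
          // full block has arrived from anyone, not merely an id notice
          bmi::ordered_non_unique< bmi::tag<by_block_id>,
             bmi::composite_key< entry,
                bmi::member<entry, std::string, &entry::id>,
                bmi::member<entry, bool,        &entry::have_block>
             >,
             bmi::composite_key_compare< std::less<std::string>, std::greater<bool> >
          >
       >
    >;

    int main() {
       entry_index idx;
       idx.insert( {"blk-1", 7, false} );   // like add_peer_block_id: notice from connection 7
       idx.insert( {"blk-1", 9, true}  );   // like add_peer_block: full block from connection 9

       // have_block(): partial lookup on the id; entries with have_block == true sort first
       auto itr = idx.get<by_block_id>().find( std::string("blk-1") );
       std::cout << std::boolalpha
                 << ( itr != idx.get<by_block_id>().end() && itr->have_block ) << "\n"; // true

       // num_entries(): partial count on the connection_id prefix of the by_id key
       std::cout << idx.get<by_id>().count( uint32_t(7) ) << "\n";                      // 1
    }

Because composite-key find() and count() accept just the leading key components, neither lookup has to walk all connections.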
@@ -163,15 +173,16 @@ namespace eosio {
};

class dispatch_manager {
- std::mutex blk_state_mtx;
mutable std::mutex blk_state_mtx;
peer_block_state_index blk_state;
- std::mutex local_txns_mtx;
mutable std::mutex local_txns_mtx;
node_transaction_index local_txns;

public:
void bcast_transaction(const transaction_metadata_ptr& trx);
void rejected_transaction(const transaction_id_type& msg, uint32_t head_blk_num);
void bcast_block(const block_state_ptr& bs);
void bcast_notice( const block_id_type& id );
void rejected_block(const block_id_type& id);

void recv_block(const connection_ptr& conn, const block_id_type& msg, uint32_t bnum);
@@ -181,15 +192,17 @@ namespace eosio {

void retry_fetch(const connection_ptr& conn);

- bool add_peer_block(const peer_block_state& pbs);
- bool peer_has_block(const block_id_type& blkid, uint32_t connection_id);
- bool have_block(const block_id_type& blkid);
bool add_peer_block( const block_id_type& blkid, uint32_t connection_id );
bool add_peer_block_id( const block_id_type& blkid, uint32_t connection_id );
bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const;
bool have_block(const block_id_type& blkid) const;
size_t num_entries( uint32_t connection_id ) const;

bool add_peer_txn( const node_transaction_state& nts );
void update_txns_block_num( const signed_block_ptr& sb );
void update_txns_block_num( const transaction_id_type& id, uint32_t blk_num );
- bool peer_has_txn( const transaction_id_type& tid, uint32_t connection_id );
- bool have_txn( const transaction_id_type& tid );
bool peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const;
bool have_txn( const transaction_id_type& tid ) const;
void expire_txns( uint32_t lib_num );
};

@@ -376,6 +389,7 @@ namespace eosio {
constexpr auto def_max_read_delays = 100; // number of times read_delay_timer started without any reads
constexpr auto def_max_consecutive_rejected_blocks = 3; // num of rejected blocks before disconnect
constexpr auto def_max_consecutive_immediate_connection_close = 9; // back off if client keeps closing
constexpr auto def_max_peer_block_ids_per_connection = 100*1024; // if we reach this many then the connection is spamming us, disconnect
constexpr auto def_max_clients = 25; // 0 for unlimited clients
constexpr auto def_max_nodes_per_host = 1;
constexpr auto def_conn_retry_wait = 30;
@@ -407,8 +421,9 @@
*/
constexpr uint16_t proto_base = 0;
constexpr uint16_t proto_explicit_sync = 1;
constexpr uint16_t block_id_notify = 2;

- constexpr uint16_t net_version = proto_explicit_sync;
constexpr uint16_t net_version = block_id_notify;
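
Bumping net_version to block_id_notify lets the sender gate the new id-only notices on peer capability; bcast_notice below skips peers whose negotiated protocol_version is older. A hedged sketch of that gate follows. The negotiation rule shown (take the lower of the two advertised versions) is an assumption about the handshake, not something visible in this diff.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    constexpr uint16_t proto_base          = 0;
    constexpr uint16_t proto_explicit_sync = 1;
    constexpr uint16_t block_id_notify     = 2;

    // Assumed negotiation: each side advertises its net_version in the handshake and
    // the connection runs at the minimum, so an older peer never receives a message
    // shape it does not understand.
    uint16_t negotiate( uint16_t ours, uint16_t theirs ) {
       return std::min( ours, theirs );
    }

    int main() {
       uint16_t negotiated = negotiate( block_id_notify, proto_explicit_sync );
       bool send_id_notices = negotiated >= block_id_notify;
       std::cout << std::boolalpha << send_id_notices << "\n";   // false: the peer is too old
    }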

/**
* Index by start_block_num
@@ -712,6 +727,8 @@ namespace eosio {
void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead
void handle_message( const packed_transaction_ptr& msg );

void process_signed_block( const signed_block_ptr& msg );

fc::optional<fc::variant_object> _logger_variant;
const fc::variant_object& get_logger_variant() {
if (!_logger_variant) {
@@ -750,12 +767,9 @@ namespace eosio {
}

void operator()( signed_block&& msg ) const {
// continue call to handle_message on connection strand
shared_ptr<signed_block> ptr = std::make_shared<signed_block>( std::move( msg ) );
- connection_wptr weak = c;
- app().post(priority::high, [ptr{std::move(ptr)}, weak{std::move(weak)}] {
- connection_ptr c = weak.lock();
- if( c ) c->handle_message( ptr );
- });
c->handle_message( ptr );
}

void operator()( packed_transaction&& msg ) const {
@@ -981,11 +995,11 @@ namespace eosio {
std::tie( std::ignore, std::ignore, head_num,
std::ignore, std::ignore, head_id ) = my_impl->get_chain_info();

- notice_message note;
- note.known_blocks.mode = normal;
- note.known_blocks.pending = 0;
fc_dlog(logger, "head_num = ${h}",("h",head_num));
if(head_num == 0) {
notice_message note;
note.known_blocks.mode = normal;
note.known_blocks.pending = 0;
enqueue(note);
return;
}
@@ -1024,7 +1038,7 @@ namespace eosio {
signed_block_ptr b = cc.fetch_block_by_id( blkid );
if( b ) {
fc_dlog( logger, "found block for id at num ${n}", ("n", b->block_num()) );
- my_impl->dispatcher->add_peer_block( {blkid, block_header::num_from_id( blkid ), c->connection_id} );
my_impl->dispatcher->add_peer_block( blkid, c->connection_id );
c->strand.post( [c, b{std::move(b)}]() {
c->enqueue_block( b );
} );
@@ -1657,12 +1671,6 @@ namespace eosio {
fc_ilog( logger, "sync_manager got ${m} block notice", ("m", modes_str( msg.known_blocks.mode )) );
EOS_ASSERT( msg.known_blocks.mode == catch_up || msg.known_blocks.mode == last_irr_catch_up, plugin_exception,
"sync_recv_notice only called on catch_up" );
- if( msg.known_blocks.ids.size() > 1 ) {
- fc_elog( logger, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection: ${p}",
- ("s", msg.known_blocks.ids.size())("p", c->peer_address()) );
- c->close();
- return;
- }
if (msg.known_blocks.mode == catch_up) {
if (msg.known_blocks.ids.size() == 0) {
fc_elog( logger,"got a catch up with ids size = 0" );
@@ -1769,26 +1777,50 @@ namespace eosio {
//------------------------------------------------------------------------

// thread safe
- bool dispatch_manager::add_peer_block(const peer_block_state& entry) {
bool dispatch_manager::add_peer_block( const block_id_type& blkid, uint32_t connection_id) {
std::lock_guard<std::mutex> g( blk_state_mtx );
- auto bptr = blk_state.get<by_id>().find(std::make_tuple(std::ref(entry.id), entry.connection_id));
auto bptr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
bool added = (bptr == blk_state.end());
if( added ) {
- blk_state.insert(entry);
blk_state.insert( {blkid, block_header::num_from_id( blkid ), connection_id, true} );
} else if( !bptr->have_block ) {
blk_state.modify( bptr, []( auto& pb ) {
pb.have_block = true;
});
}
return added;
}

- bool dispatch_manager::peer_has_block( const block_id_type& blkid, uint32_t connection_id ) {
bool dispatch_manager::add_peer_block_id( const block_id_type& blkid, uint32_t connection_id) {
std::lock_guard<std::mutex> g( blk_state_mtx );
auto bptr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
bool added = (bptr == blk_state.end());
if( added ) {
blk_state.insert( {blkid, block_header::num_from_id( blkid ), connection_id, false} );
}
return added;
}

bool dispatch_manager::peer_has_block( const block_id_type& blkid, uint32_t connection_id ) const {
std::lock_guard<std::mutex> g(blk_state_mtx);
- auto blk_itr = blk_state.get<by_id>().find(std::make_tuple(std::ref(blkid), connection_id));
const auto blk_itr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
return blk_itr != blk_state.end();
}

- bool dispatch_manager::have_block( const block_id_type& blkid ) {
bool dispatch_manager::have_block( const block_id_type& blkid ) const {
std::lock_guard<std::mutex> g(blk_state_mtx);
- auto blk_itr = blk_state.get<by_id>().find( blkid );
- return blk_itr != blk_state.end();
// by_block_id sorts have_block by greater so have_block == true will be the first one found
const auto& index = blk_state.get<by_block_id>();
auto blk_itr = index.find( blkid );
if( blk_itr != index.end() ) {
return blk_itr->have_block;
}
return false;
}

size_t dispatch_manager::num_entries( uint32_t connection_id ) const {
std::lock_guard<std::mutex> g(blk_state_mtx);
return blk_state.get<by_id>().count( connection_id );
}

bool dispatch_manager::add_peer_txn( const node_transaction_state& nts ) {
@@ -1825,15 +1857,15 @@ namespace eosio {
}
}

- bool dispatch_manager::peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) {
bool dispatch_manager::peer_has_txn( const transaction_id_type& tid, uint32_t connection_id ) const {
std::lock_guard<std::mutex> g( local_txns_mtx );
- auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( tid ), connection_id ) );
const auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( tid ), connection_id ) );
return tptr != local_txns.end();
}

- bool dispatch_manager::have_txn( const transaction_id_type& tid ) {
bool dispatch_manager::have_txn( const transaction_id_type& tid ) const {
std::lock_guard<std::mutex> g( local_txns_mtx );
- auto tptr = local_txns.get<by_id>().find( tid );
const auto tptr = local_txns.get<by_id>().find( tid );
return tptr != local_txns.end();
}

@@ -1893,8 +1925,7 @@ namespace eosio {
bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
g_conn.unlock();
if( !has_block ) {
- peer_block_state pbstate{bs->id, bnum, cp->connection_id};
- if( !add_peer_block( pbstate ) ) {
if( !add_peer_block( bs->id, cp->connection_id ) ) {
return;
}
fc_dlog( logger, "bcast block ${b} to ${p}", ("b", bnum)( "p", cp->peer_name() ) );
@@ -1905,10 +1936,36 @@ namespace eosio {
} );
}

void dispatch_manager::bcast_notice( const block_id_type& id ) {
if( my_impl->sync_master->syncing_with_peer() ) return;

fc_dlog( logger, "bcast notice ${b}", ("b", block_header::num_from_id( id )) );
notice_message note;
note.known_blocks.mode = normal;
note.known_blocks.pending = 1; // 1 indicates this is a block id notice
note.known_blocks.ids.emplace_back( id );

for_each_block_connection( [this, note]( auto& cp ) {
if( !cp->current() ) {
return true;
}
cp->strand.post( [this, cp, note]() {
// check protocol_version here since only accessed from strand
if( cp->protocol_version < block_id_notify ) return;
const block_id_type& id = note.known_blocks.ids.back();
if( peer_has_block( id, cp->connection_id ) ) {
return;
}
fc_dlog( logger, "bcast block id ${b} to ${p}", ("b", block_header::num_from_id( id ))("p", cp->peer_name()) );
cp->enqueue( note );
} );
return true;
} );
}
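
The id-only announcement reuses the existing notice_message rather than introducing a new message type: known_blocks.mode stays normal and known_blocks.pending is set to 1 as the marker for "this is a block id notification", which recv_notice checks further down. Here is a small sketch of both sides of that convention, with simplified stand-ins for select_ids and notice_message (the real structures carry more fields).

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified stand-ins for the plugin's select_ids<block_id_type> / notice_message
    enum id_list_mode : uint32_t { none, catch_up, last_irr_catch_up, normal };

    struct ordered_blk_ids {
       id_list_mode             mode    = none;
       uint32_t                 pending = 0;
       std::vector<std::string> ids;          // stand-in for block_id_type
    };

    struct notice_message {
       ordered_blk_ids known_blocks;
    };

    // Sender side, mirroring dispatch_manager::bcast_notice above
    notice_message make_block_id_notice( const std::string& id ) {
       notice_message note;
       note.known_blocks.mode    = normal;
       note.known_blocks.pending = 1;         // 1 marks an id-only announcement
       note.known_blocks.ids.push_back( id );
       return note;
    }

    // Receiver side, mirroring the early return added to dispatch_manager::recv_notice
    bool is_block_id_notice( const notice_message& msg ) {
       return msg.known_blocks.mode == normal && msg.known_blocks.pending == 1;
    }

    int main() {
       notice_message n = make_block_id_notice( "blk-42" );
       std::cout << std::boolalpha << is_block_id_notice( n ) << "\n";   // true
    }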

// called from connection strand
void dispatch_manager::recv_block(const connection_ptr& c, const block_id_type& id, uint32_t bnum) {
- peer_block_state pbstate{id, bnum, c->connection_id};
- add_peer_block( pbstate );
add_peer_block( id, c->connection_id );
std::unique_lock<std::mutex> g( c->conn_mtx );
if (c &&
c->last_req &&
Expand Down Expand Up @@ -1969,49 +2026,28 @@ namespace eosio {

// called from connection strand
void dispatch_manager::recv_notice(const connection_ptr& c, const notice_message& msg, bool generated) {
- request_message req;
- req.req_trx.mode = none;
- req.req_blocks.mode = none;
if (msg.known_trx.mode == normal) {
- req.req_trx.mode = normal;
- req.req_trx.pending = 0;
} else if (msg.known_trx.mode != none) {
fc_elog( logger, "passed a notice_message with something other than a normal on none known_trx" );
return;
}
if (msg.known_blocks.mode == normal) {
- req.req_blocks.mode = normal;
// known_blocks.ids is never > 1
if( !msg.known_blocks.ids.empty() ) {
- connection_wptr weak = c;
- app().post( priority::low, [this, msg{std::move(msg)}, req{std::move(req)}, weak{std::move(weak)}]() mutable {
- connection_ptr c = weak.lock();
- if( !c ) return;
- const block_id_type& blkid = msg.known_blocks.ids.back();
- signed_block_ptr b;
- try {
- controller& cc = my_impl->chain_plug->chain();
- b = cc.fetch_block_by_id( blkid ); // if exists
- if( b ) {
- add_peer_block( {blkid, block_header::num_from_id( blkid ), c->connection_id} );
- }
- } catch( const assert_exception& ex ) {
- fc_ilog( logger, "caught assert on fetch_block_by_id, ${ex}", ("ex", ex.what()) );
- // keep going, client can ask another peer
- } catch( ... ) {
- fc_elog( logger, "failed to retrieve block for id" );
- }
- if( !b ) {
- req.req_blocks.ids.push_back( blkid );
- c->strand.post( [req{std::move(req)}, c{std::move(c)}]() mutable {
- fc_dlog( logger, "send req" );
- c->enqueue( req );
- c->fetch_wait();
- std::lock_guard<std::mutex> g( c->conn_mtx );
- c->last_req = std::move( req );
- });
- }
- });
if( num_entries( c->connection_id ) > def_max_peer_block_ids_per_connection ) {
fc_elog( logger, "received too many notice_messages, disconnecting" );
c->close( false );
}
const block_id_type& blkid = msg.known_blocks.ids.back();
if( have_block( blkid )) {
add_peer_block( blkid, c->connection_id );
return;
} else {
add_peer_block_id( blkid, c->connection_id );
}
if( msg.known_blocks.pending == 1 ) { // block id notify
return;
}
}
} else if (msg.known_blocks.mode != none) {
fc_elog( logger, "passed a notice_message with something other than a normal on none known_blocks" );
@@ -2631,6 +2667,12 @@ namespace eosio {
//
peer_ilog( this, "received notice_message" );
connecting = false;
if( msg.known_blocks.ids.size() > 1 ) {
fc_elog( logger, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection: ${p}",
("s", msg.known_blocks.ids.size())("p", peer_address()) );
close( false );
return;
}
if( msg.known_trx.mode != none ) {
fc_dlog( logger, "this is a ${m} notice with ${n} transactions",
("m", modes_str( msg.known_trx.mode ))( "n", msg.known_trx.pending ) );
@@ -2790,8 +2832,17 @@ namespace eosio {
});
}

// called from connection strand
void connection::handle_message( const signed_block_ptr& ptr ) {
app().post(priority::high, [ptr, weak = weak_from_this()] {
connection_ptr c = weak.lock();
if( c ) c->process_signed_block( ptr );
});
my_impl->dispatcher->bcast_notice( ptr->id() );
}
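
The signed_block handler is now split in two: the expensive work moves to process_signed_block, posted to the main application thread at high priority, while bcast_notice runs immediately on the network side, so peers can learn the block id without waiting for validation. Below is a minimal sketch of that hand-off pattern in plain Boost.Asio rather than the plugin's app()/strand wrappers; the helper names are illustrative, not the plugin's API.

    #include <boost/asio/io_context.hpp>
    #include <boost/asio/post.hpp>
    #include <iostream>
    #include <memory>
    #include <string>

    struct block { std::string id; };

    void broadcast_notice( const std::string& id ) {
       std::cout << "notice sent for " << id << "\n";      // cheap, done inline
    }

    void process_block( const std::shared_ptr<block>& b ) {
       std::cout << "block " << b->id << " validated\n";   // expensive, deferred
    }

    int main() {
       boost::asio::io_context net_ctx;   // network threads (connection strands)
       boost::asio::io_context app_ctx;   // main application thread

       auto b = std::make_shared<block>( block{"blk-7"} );

       boost::asio::post( net_ctx, [&, b] {
          // analogue of connection::handle_message( const signed_block_ptr& )
          boost::asio::post( app_ctx, [b] { process_block( b ); } );   // defer heavy work
          broadcast_notice( b->id );                                   // announce right away
       } );

       net_ctx.run();
       app_ctx.run();
    }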

// called from application thread
- void connection::handle_message( const signed_block_ptr& msg ) {
void connection::process_signed_block( const signed_block_ptr& msg ) {
controller& cc = my_impl->chain_plug->chain();
block_id_type blk_id = msg->id();
uint32_t blk_num = msg->block_num();
