Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
#916 one more round of performance tuning of the synchronization feat…
Browse files Browse the repository at this point in the history
…ure, and also dead code removal.
  • Loading branch information
pmesnier committed Dec 21, 2017
1 parent cdb55c5 commit f44f19a
Showing 1 changed file with 59 additions and 104 deletions.
163 changes: 59 additions & 104 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,15 @@ namespace eosio {
uint32_t sync_last_requested_num;
uint32_t sync_req_span;
connection_ptr source;
bool active;

deque<block_id_type> _blocks;
chain_plugin * chain_plug;

public:
sync_manager(uint32_t span);
void reset_lib_num();
bool syncing();
bool sync_required();
void request_next_chunk(connection_ptr conn);
void take_chunk(connection_ptr c);
void start_sync(connection_ptr c, uint32_t target);
Expand Down Expand Up @@ -580,7 +581,7 @@ namespace eosio {
last_handshake(),
sent_handshake_count(0),
out_queue(),
connecting(false),
connecting(true),
syncing(false),
write_depth(0),
peer_addr(),
Expand Down Expand Up @@ -610,13 +611,6 @@ namespace eosio {
}

bool connection::current() {
if( syncing ) {
fc_dlog(logger, "skipping connection ${n} due to syncing", ("n",peer_name()));
} else if(!connected()) {
fc_dlog(logger, "skipping connection ${n} due to not connected", ("n",peer_name()));
} else {
fc_dlog(logger, "connection ${n} is current", ("n",peer_name()));
}
return (connected() && !syncing);
}

Expand Down Expand Up @@ -1025,9 +1019,6 @@ namespace eosio {
}
}
else if( ec == boost::asio::error::operation_aborted) {
// if( !connected()) {
// my_impl->sync_master->take_chunk(shared_from_this());
// }
}
else {
elog ("setting timer for sync request got error ${ec}",("ec", ec.message()));
Expand All @@ -1053,7 +1044,6 @@ namespace eosio {
else if( ec == boost::asio::error::operation_aborted ) {
if( !connected( ) ) {
fc_dlog(logger, "fetch timeout was cancelled due to dead connection");
// my_impl->sync_master->reassign_fetch( );
}
}
else {
Expand Down Expand Up @@ -1120,7 +1110,7 @@ namespace eosio {
}
}

bool sync_manager::syncing( ) {
bool sync_manager::sync_required( ) {
fc_dlog(logger, "ours = ${ours} known = ${known} head = ${head}",("ours",sync_last_requested_num)("known",sync_known_lib_num)("head",chain_plug->chain( ).head_block_num( )));

return( sync_last_requested_num < sync_known_lib_num ||
Expand Down Expand Up @@ -1220,10 +1210,10 @@ namespace eosio {
handshake_message hello;
handshake_initializer::populate(hello);
fc_dlog(logger, "All caught up with last known last irreversible block resending handshake");
active = false;
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
}
Expand All @@ -1232,27 +1222,18 @@ namespace eosio {
}

void sync_manager::start_sync( connection_ptr c, uint32_t target) {
#if 0
if( !syncing()) {
sync_last_requested_num = chain_plug->chain().head_block_num();
fc_dlog(logger, "Inform other open connections that we are syncing");

handshake_message hello;
handshake_initializer::populate(hello);
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
}
}
#endif

if (!syncing()) {
fc_dlog( logger, "We are already caught up.");
if (!sync_required()) {
uint32_t bnum = chain_plug->chain().last_irreversible_block_num();
uint32_t hnum = chain_plug->chain().head_block_num();
fc_dlog( logger, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
("b",bnum)("h",hnum)("t",target));
active = false;
return;
}

active = true;

ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}", ( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));
if( target > sync_known_lib_num) {
sync_known_lib_num = target;
Expand Down Expand Up @@ -1554,7 +1535,6 @@ namespace eosio {
//--------------------------------
// sync need checkz;
//
// -1. another connection has already decided we are synching so continue with this connection
// 0. my head block id == peer head id means we are all caugnt up block wise
// 1. my head block num < peer lib - start sync locally
// 2. my lib > peer lib - send an last_irr_catch_up notice if not the first generation
Expand All @@ -1566,12 +1546,6 @@ namespace eosio {

uint32_t head = cc.head_block_num( );
block_id_type head_id = cc.head_block_id();

if (sync_master->syncing() ) {
fc_dlog(logger, "sync check state -1");
sync_master->start_sync( c, peer_lib);
return;
}
if (head_id == msg.head_id) {
fc_dlog(logger, "sync check state 0");
notice_message note;
Expand All @@ -1581,12 +1555,12 @@ namespace eosio {
c->enqueue( note );
return;
}
if (lib_num < peer_lib) {
if (head < peer_lib) {
fc_dlog(logger, "sync check state 1");
sync_master->start_sync( c, peer_lib);
return;
}
if (lib_num > peer_lib ) {
if (lib_num > msg.head_num ) {
fc_dlog(logger, "sync check state 2");
if ( msg.generation > 1 ) {
notice_message note;
Expand All @@ -1598,13 +1572,16 @@ namespace eosio {
c->syncing = true;
return;
}

if (head <= msg.head_num ) {
fc_dlog(logger, "sync check state 3");
request_message req;
req.req_trx.mode = none;
req.req_blocks.mode = catch_up;
req.req_blocks.pending = lib_num;
c->enqueue( req );
fc_dlog(logger, "sync check state 3 (skipped = ${s}",("s", sync_master->active));
if (!sync_master->active ) {
request_message req;
req.req_trx.mode = none;
req.req_blocks.mode = catch_up;
req.req_blocks.pending = lib_num;
c->enqueue( req );
}
return;
}
else {
Expand All @@ -1621,34 +1598,6 @@ namespace eosio {
}
elog ("sync check failed to resolve status");
return;
#if 0
if( peer_lib > head || sync_master->syncing() ) {
sync_master->start_sync( c, peer_lib );
}
else if( msg.head_id != head_id ) {
fc_dlog(logger, "msg.head_id = ${m} our head = ${h}",("m",msg.head_id)("h",head_id));

notice_message note;
note.known_blocks.mode = none;
fc_dlog(logger, "msg head = ${mh} msg lib = ${ml} my head = ${h} my lib = ${l}",("mh",msg.head_num)("ml",msg.last_irreversible_block_num)("h",head)("l",lib_num));
if( msg.head_num >= lib_num ) {
note.known_blocks.mode = catch_up;
note.known_blocks.pending = head - lib_num;
} else {
note.known_blocks.mode = last_irr_catch_up;
note.known_blocks.pending = lib_num;
}
note.known_trx.mode = catch_up;
note.known_trx.pending = local_txns.size();
if( note.known_trx.pending > 0 || note.known_blocks.pending > 0) {
if (msg.generation > 1 && note.known_blocks.mode == catch_up) {
fc_dlog(logger, "sending ${m} notice to ${n} about ${t} txns and ${b} blocks",("m",modes_str(note.known_blocks.mode))("n",c->peer_name())("t",note.known_trx.pending)("b",note.known_blocks.pending));
c->enqueue( note );
}
c->syncing = true;
}
}
#endif
}

void net_plugin_impl::handle_message( connection_ptr c, const go_away_message &msg ) {
Expand Down Expand Up @@ -1736,7 +1685,7 @@ namespace eosio {
c->trx_state.insert( ( transaction_state ){ t,true,true,( uint32_t ) - 1,
fc::time_point( ),fc::time_point( ) } );

if( !sync_master->syncing( ) ) {
if( !sync_master->active ) {
fwd.known_trx.ids.push_back( t );
}
req.req_trx.ids.push_back( t );
Expand Down Expand Up @@ -1895,20 +1844,11 @@ namespace eosio {
return;
}
}
if( sync_master->syncing() ) {
if( sync_master->active ) {
fc_dlog(logger, "got a txn during sync - dropping");
return;
}

try {
// chain_plug->chain().validate_transaction(msg);
}
catch ( const transaction_exception &ex ) {
elog("got a bad txn ${ex}", ("ex", ex.get_log()));
c->enqueue( go_away_message( go_away_reason::bad_transaction) );
return;
}

if(entry != local_txns.end( ) ) {
local_txns.modify( entry, update_entry( msg ) );
}
Expand Down Expand Up @@ -1951,7 +1891,7 @@ namespace eosio {
block_id_type blk_id = msg.id();
bool has_chunk = false;
uint32_t num = msg.block_num();
bool syncing = sync_master->syncing();
bool syncing = sync_master->active;
try {
if( cc.is_known_block(blk_id)) {
return;
Expand Down Expand Up @@ -2017,23 +1957,38 @@ namespace eosio {
}
}
else {
if(age < fc::seconds(3) && fc::raw::pack_size(msg) < just_send_it_max && !c->syncing ) {
fc_dlog(logger, "forwarding the signed block");
send_all( msg, [c, blk_id, num](connection_ptr conn) -> bool {
bool sendit = false;
if( c != conn && !conn->syncing ) {
auto b = conn->block_state.get<by_id>().find(blk_id);
if(b == conn->block_state.end()) {
conn->block_state.insert( (block_state){blk_id,true,true,fc::time_point()});
sendit = true;
} else if (!b->is_known) {
conn->block_state.modify(b,make_known());
sendit = true;
if( reason == unlinkable ) {
ilog("See if we can re-request the missing block");

handshake_message hello;
handshake_initializer::populate(hello);
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
}
}
else {
if(age < fc::seconds(3) && fc::raw::pack_size(msg) < just_send_it_max && !c->syncing ) {
fc_dlog(logger, "forwarding the signed block");
send_all( msg, [c, blk_id, num](connection_ptr conn) -> bool {
bool sendit = false;
if( c != conn && !conn->syncing ) {
auto b = conn->block_state.get<by_id>().find(blk_id);
if(b == conn->block_state.end()) {
conn->block_state.insert( (block_state){blk_id,true,true,fc::time_point()});
sendit = true;
} else if (!b->is_known) {
conn->block_state.modify(b,make_known());
sendit = true;
}
}
}
fc_dlog(logger, "${action} block ${num} to ${c}",("action", sendit ? "sending " : "skipping ")("num",num)("c", conn->peer_name() ));
return sendit;
});
fc_dlog(logger, "${action} block ${num} to ${c}",("action", sendit ? "sending " : "skipping ")("num",num)("c", conn->peer_name() ));
return sendit;
});
}
}
}
}
Expand Down

0 comments on commit f44f19a

Please sign in to comment.