diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 22eadd24356..c540600ec5e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -468,6 +468,8 @@ namespace eosio { void stop_send(); void enqueue( const net_message &msg ); + void cancel_sync(const string &reason); + void cancel_fetch(); bool enqueue_sync_block(); void send_next_message(); void send_next_txn(); @@ -515,18 +517,15 @@ namespace eosio { uint32_t sync_known_lib_num; uint32_t sync_last_requested_num; uint32_t sync_req_span; + connection_ptr source; - // deque full_chunks; - // deque partial_chunks; deque _blocks; chain_plugin * chain_plug; public: sync_manager(uint32_t span); bool syncing(); - void request_next_chunk(); - void assign_chunk(connection_ptr c); - void apply_chunk(sync_state_ptr ss); + void request_next_chunk(connection_ptr conn); void take_chunk(connection_ptr c); void start_sync(connection_ptr c, uint32_t target); @@ -837,7 +836,17 @@ namespace eosio { }); } - bool connection::enqueue_sync_block( ) { + void connection::cancel_sync(const string &reason) { + elog("cancel sync reason = ${m} ", ("m",reason)); + enqueue( ( sync_request_message ) {0,0} ); + my_impl->sync_master->reassign_fetch(); + } + + void connection::cancel_fetch() { + enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} ); + } + + bool connection::enqueue_sync_block() { chain_controller& cc = app().find_plugin()->chain(); uint32_t num = ++sync_requested->last; @@ -960,14 +969,14 @@ namespace eosio { void connection::sync_timeout( boost::system::error_code ec ) { if( !ec ) { if( sync_receiving && sync_receiving->last < sync_receiving->end_block) { - enqueue( (sync_request_message) {0,0}); - my_impl->sync_master->take_chunk(shared_from_this()); + cancel_sync("sync timeout"); + // my_impl->sync_master->take_chunk(shared_from_this()); } } else if( ec == boost::asio::error::operation_aborted) { - if( !connected()) { - my_impl->sync_master->take_chunk(shared_from_this()); - } + // if( !connected()) { + // my_impl->sync_master->take_chunk(shared_from_this()); + // } } else { elog ("setting timer for sync request got error ${ec}",("ec", ec.message())); @@ -987,14 +996,13 @@ namespace eosio { void connection::fetch_timeout( boost::system::error_code ec ) { if( !ec ) { if( !( pending_fetch->req_trx.empty( ) || pending_fetch->req_blocks.empty( ) ) ) { - enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} ); - my_impl->sync_master->reassign_fetch( ); + cancel_fetch (); } } else if( ec == boost::asio::error::operation_aborted ) { if( !connected( ) ) { fc_dlog(logger, "fetch timeout was cancelled due to dead connection"); - my_impl->sync_master->reassign_fetch( ); + // my_impl->sync_master->reassign_fetch( ); } } else { @@ -1040,6 +1048,7 @@ namespace eosio { :sync_known_lib_num( 0 ) ,sync_last_requested_num( 0 ) ,sync_req_span( span ) + ,source() { chain_plug = app( ).find_plugin( ); } @@ -1050,18 +1059,53 @@ namespace eosio { chain_plug->chain( ).head_block_num( ) < sync_last_requested_num ); } - void sync_manager::request_next_chunk() { + void sync_manager::request_next_chunk( connection_ptr conn = connection_ptr() ) { uint32_t head_block = chain_plug->chain().head_block_num(); - for(auto &c : my_impl->connections ) { - if(c->sync_receiving && c->sync_receiving->start_block == head_block + 1) { - c->enqueue( (sync_request_message){c->sync_receiving->start_block, - c->sync_receiving->end_block}); - sync_last_requested_num = c->sync_receiving->end_block; + if (conn) { + source = conn; + } + else if (my_impl->connections.size() == 1) { + if (!source) { + source = *my_impl->connections.begin(); + } + if (!source->current()) { + source.reset(); } } - } + else { + auto cptr = my_impl->connections.find(source); + if (cptr == my_impl->connections.end()) { + elog ("unable to find previous source connection in connections list"); + cptr = my_impl->connections.begin(); + } else { + ++cptr; + } + while (true) { + if (cptr == my_impl->connections.end()) { + cptr = my_impl->connections.begin(); + if (!source) { + break; + } + } + if (*cptr == source) { + break; + } + if ((*cptr)->current()) { + source = *cptr; + break; + } + else { + ++cptr; + } + } + } + if (!source) { + elog("Unable to continue syncing at this time"); + sync_last_requested_num = chain_plug->chain().head_block_num(); + sync_known_lib_num = chain_plug->chain().last_irreversible_block_num(); + return; + } - void sync_manager::assign_chunk( connection_ptr c ) { uint32_t start = 0; uint32_t end = 0; @@ -1071,58 +1115,70 @@ namespace eosio { if( end > sync_known_lib_num ) end = sync_known_lib_num; if( end > 0 && end >= start ) { - fc_dlog(logger, "conn ${n} recv blks ${s} to ${e}",("n",c->peer_name() )("s",start)("e",end)); - c->sync_receiving.reset(new sync_state( start, end, sync_last_requested_num ) ); + fc_dlog(logger, "conn ${n} recv blks ${s} to ${e}",("n",source->peer_name() )("s",start)("e",end)); + source->sync_receiving.reset(new sync_state( start, end, sync_last_requested_num ) ); } } else { - fc_dlog(logger, "conn ${n} resetting sync recv",("n",c->peer_name() )); - c->sync_receiving.reset( ); + fc_dlog(logger, "conn ${n} resetting sync recv",("n",source->peer_name() )); + source->sync_receiving.reset( ); + } + + if(source->sync_receiving && source->sync_receiving->start_block == head_block + 1) { + source->enqueue( (sync_request_message){source->sync_receiving->start_block, + source->sync_receiving->end_block}); + sync_last_requested_num = source->sync_receiving->end_block; } } void sync_manager::take_chunk( connection_ptr c) { - if( !c->sync_receiving) { - elog( "take_chunk called, but sync_receiving is empty"); - return; - } - sync_state_ptr ss; - c->sync_receiving.swap(ss); - fc_dlog(logger, "conn ${n} losing recv blks ${s} to ${e}",("n",c->peer_name() )("s",ss->start_block)("e",ss->end_block)); - - if( chain_plug->chain().head_block_num() == sync_known_lib_num ) { - handshake_message hello; - handshake_initializer::populate(hello); - fc_dlog(logger, "All caught up with last known last irreversible block resending handshake"); - for( auto &ci : my_impl->connections) { - if( ci->current()) { - hello.generation = ++ci->sent_handshake_count; - fc_dlog(logger, "send to ${p}", ("p",ci->peer_name())); - ci->enqueue( hello ); - } + if( !c->sync_receiving) { + elog( "take_chunk called, but sync_receiving is empty"); + return; + } + sync_state_ptr ss; + c->sync_receiving.swap(ss); + fc_dlog(logger, "conn ${n} covered blks ${s} to ${e}",("n",c->peer_name() )("s",ss->start_block)("e",ss->end_block)); + + if( chain_plug->chain().head_block_num() == sync_known_lib_num ) { + handshake_message hello; + handshake_initializer::populate(hello); + fc_dlog(logger, "All caught up with last known last irreversible block resending handshake"); + for( auto &ci : my_impl->connections) { + if( ci->current()) { + hello.generation = ++ci->sent_handshake_count; + fc_dlog(logger, "send to ${p}", ("p",ci->peer_name())); + ci->enqueue( hello ); } } - if( c->connected()) { - assign_chunk( c); - } - request_next_chunk(); } + request_next_chunk(); + } - void sync_manager::start_sync( connection_ptr c, uint32_t target) { - if( !syncing()) { - sync_last_requested_num = chain_plug->chain().head_block_num(); - } - ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t}", - ( "cc",sync_last_requested_num)("t",target)); - if( target > sync_known_lib_num) { - sync_known_lib_num = target; - } - if( c->sync_receiving && c->sync_receiving->end_block > 0) { - return; + void sync_manager::start_sync( connection_ptr c, uint32_t target) { + if( !syncing()) { + sync_last_requested_num = chain_plug->chain().head_block_num(); + fc_dlog(logger, "Inform other open connections that we are syncing"); + + handshake_message hello; + handshake_initializer::populate(hello); + for( auto &ci : my_impl->connections) { + if( ci->current()) { + hello.generation = ++ci->sent_handshake_count; + fc_dlog(logger, "send to ${p}", ("p",ci->peer_name())); + ci->enqueue( hello ); + } } - assign_chunk( c); - request_next_chunk(); } + ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t}", ( "cc",sync_last_requested_num)("t",target)); + if( target > sync_known_lib_num) { + sync_known_lib_num = target; + } + if( c->sync_receiving && c->sync_receiving->end_block > 0) { + return; + } + request_next_chunk(c); + } void sync_manager::reassign_fetch() { request_next_chunk(); @@ -1255,11 +1311,13 @@ namespace eosio { close( conn ); } } catch (...) { + elog( "Undefined exception hanlding the read data from connection: ${m}",( "m", ec.message() ) ); close( conn ); } } ); } catch (...) { + elog( "Undefined exception handling reading" ); close( conn ); } } @@ -1614,7 +1672,7 @@ namespace eosio { } void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) { - fc_dlog(logger, "got a sync_request_message from ${p}", ("p",c->peer_name())); + fc_dlog(logger, "got a sync_request_message { ${s} - ${e} } from ${p}",("s",msg.start_block)("e",msg.end_block) ("p",c->peer_name())); if( msg.end_block == 0) { c->sync_requested.reset(); } else { @@ -1800,6 +1858,9 @@ namespace eosio { fc_dlog(logger, "got signed_block #${n} from ${p}", ("n",msg.block_num())("p",c->peer_name())); chain_controller &cc = chain_plug->chain(); block_id_type blk_id = msg.id(); + bool has_chunk = false; + uint32_t num = msg.block_num(); + bool syncing = sync_master->syncing(); try { if( cc.is_known_block(blk_id)) { return; @@ -1807,28 +1868,25 @@ namespace eosio { } catch( ...) { } if( cc.head_block_num() >= msg.block_num()) { - elog( "received forking block #${n}",( "n",msg.block_num())); + elog( "received forking block #${n} from ${p}",( "n",num)("p",c->peer_name())); } fc::microseconds age( fc::time_point::now() - msg.timestamp); fc_dlog(logger, "got signed_block #${n} from ${p} block age in secs = ${age}",("n",msg.block_num())("p",c->peer_name())("age",age.to_seconds())); - bool has_chunk = false; - uint32_t num = msg.block_num(); - bool syncing = sync_master->syncing(); if( syncing ) { has_chunk =( c->sync_receiving && c->sync_receiving->end_block > 0); if( !has_chunk) { if(c->sync_receiving) - elog("got a block while syncing but sync_receiving end block == 0 #${n}", - ( "n",num)); + elog("got a block while syncing but sync_receiving end block == 0 #${n} from ${p}", ( "n",num)("p",c->peer_name())); else - elog("got a block while syncing but no sync_receiving set #${n}", - ( "n",num)); + elog("got a block while syncing but no sync_receiving set #${n} from ${p}", ( "n",num)("p",c->peer_name())); } else { if( c->sync_receiving->last + 1 != num) { - wlog( "expected block ${next} but got ${num}",("next",c->sync_receiving->last+1)("num",num)); + wlog( "expected block ${next} but got ${num} from ${p}",("next",c->sync_receiving->last+1)("num",num)("p",c->peer_name())); + c->cancel_sync("blk out of order"); + sync_master->reassign_fetch(); return; } c->sync_receiving->last = num; @@ -1841,22 +1899,22 @@ namespace eosio { chain_plug->accept_block(msg, syncing); accepted = true; } catch( const unlinkable_block_exception &ex) { - elog( "handle signed block: unlinkable_block_exception accept block #${n} syncing",("n",num)); + elog( "handle signed block: unlinkable_block_exception accept block #${n} syncing from ${p}",("n",num)("p",c->peer_name())); c->enqueue( go_away_message( go_away_reason::unlinkable )); } catch( const assert_exception &ex) { - elog( "unable to accept block on assert exception ${n}",("n",ex.what())); + elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name())); } catch( const fc::exception &ex) { - elog( "accept_block threw a non-assert exception ${x}",( "x",ex.what())); + elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name())); } catch( ...) { - elog( "handle sync block caught something else"); + elog( "handle sync block caught something else from ${p}",("num",num)("p",c->peer_name())); } } if( has_chunk) { if( !accepted) { - c->enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} ); + wlog("block ${num}, ${bid} not accepted from ${p}",("num",num)("p",c->peer_name())); + c->cancel_sync("blk not acceptd"); sync_master->reassign_fetch(); - // c->sync_receiving->block_cache.emplace_back(std::move(c->blk_buffer)); } if( num == c->sync_receiving->end_block) { @@ -1979,9 +2037,9 @@ namespace eosio { if( c->peer_addr.empty( ) ) { --num_clients; } + c->close(); if( c->sync_receiving) sync_master->take_chunk( c); - c->close(); }