Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #904 from EOSIO/STAT-274-gh-903
Browse files Browse the repository at this point in the history
#903 STAT 274
  • Loading branch information
heifner authored Dec 9, 2017
2 parents b775b04 + aad63f3 commit a6ff477
Showing 1 changed file with 136 additions and 78 deletions.
214 changes: 136 additions & 78 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ namespace eosio {
void stop_send();

void enqueue( const net_message &msg );
void cancel_sync(const string &reason);
void cancel_fetch();
bool enqueue_sync_block();
void send_next_message();
void send_next_txn();
Expand Down Expand Up @@ -515,18 +517,15 @@ namespace eosio {
uint32_t sync_known_lib_num;
uint32_t sync_last_requested_num;
uint32_t sync_req_span;
connection_ptr source;

// deque<sync_state_ptr> full_chunks;
// deque<sync_state_ptr> partial_chunks;
deque<block_id_type> _blocks;
chain_plugin * chain_plug;

public:
sync_manager(uint32_t span);
bool syncing();
void request_next_chunk();
void assign_chunk(connection_ptr c);
void apply_chunk(sync_state_ptr ss);
void request_next_chunk(connection_ptr conn);
void take_chunk(connection_ptr c);
void start_sync(connection_ptr c, uint32_t target);

Expand Down Expand Up @@ -837,7 +836,17 @@ namespace eosio {
});
}

bool connection::enqueue_sync_block( ) {
void connection::cancel_sync(const string &reason) {
elog("cancel sync reason = ${m} ", ("m",reason));
enqueue( ( sync_request_message ) {0,0} );
my_impl->sync_master->reassign_fetch();
}

void connection::cancel_fetch() {
enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
}

bool connection::enqueue_sync_block() {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
uint32_t num = ++sync_requested->last;

Expand Down Expand Up @@ -960,14 +969,14 @@ namespace eosio {
void connection::sync_timeout( boost::system::error_code ec ) {
if( !ec ) {
if( sync_receiving && sync_receiving->last < sync_receiving->end_block) {
enqueue( (sync_request_message) {0,0});
my_impl->sync_master->take_chunk(shared_from_this());
cancel_sync("sync timeout");
// my_impl->sync_master->take_chunk(shared_from_this());
}
}
else if( ec == boost::asio::error::operation_aborted) {
if( !connected()) {
my_impl->sync_master->take_chunk(shared_from_this());
}
// if( !connected()) {
// my_impl->sync_master->take_chunk(shared_from_this());
// }
}
else {
elog ("setting timer for sync request got error ${ec}",("ec", ec.message()));
Expand All @@ -987,14 +996,13 @@ namespace eosio {
void connection::fetch_timeout( boost::system::error_code ec ) {
if( !ec ) {
if( !( pending_fetch->req_trx.empty( ) || pending_fetch->req_blocks.empty( ) ) ) {
enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
my_impl->sync_master->reassign_fetch( );
cancel_fetch ();
}
}
else if( ec == boost::asio::error::operation_aborted ) {
if( !connected( ) ) {
fc_dlog(logger, "fetch timeout was cancelled due to dead connection");
my_impl->sync_master->reassign_fetch( );
// my_impl->sync_master->reassign_fetch( );
}
}
else {
Expand Down Expand Up @@ -1040,6 +1048,7 @@ namespace eosio {
:sync_known_lib_num( 0 )
,sync_last_requested_num( 0 )
,sync_req_span( span )
,source()
{
chain_plug = app( ).find_plugin<chain_plugin>( );
}
Expand All @@ -1050,18 +1059,53 @@ namespace eosio {
chain_plug->chain( ).head_block_num( ) < sync_last_requested_num );
}

void sync_manager::request_next_chunk() {
void sync_manager::request_next_chunk( connection_ptr conn = connection_ptr() ) {
uint32_t head_block = chain_plug->chain().head_block_num();
for(auto &c : my_impl->connections ) {
if(c->sync_receiving && c->sync_receiving->start_block == head_block + 1) {
c->enqueue( (sync_request_message){c->sync_receiving->start_block,
c->sync_receiving->end_block});
sync_last_requested_num = c->sync_receiving->end_block;
if (conn) {
source = conn;
}
else if (my_impl->connections.size() == 1) {
if (!source) {
source = *my_impl->connections.begin();
}
if (!source->current()) {
source.reset();
}
}
}
else {
auto cptr = my_impl->connections.find(source);
if (cptr == my_impl->connections.end()) {
elog ("unable to find previous source connection in connections list");
cptr = my_impl->connections.begin();
} else {
++cptr;
}
while (true) {
if (cptr == my_impl->connections.end()) {
cptr = my_impl->connections.begin();
if (!source) {
break;
}
}
if (*cptr == source) {
break;
}
if ((*cptr)->current()) {
source = *cptr;
break;
}
else {
++cptr;
}
}
}
if (!source) {
elog("Unable to continue syncing at this time");
sync_last_requested_num = chain_plug->chain().head_block_num();
sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
return;
}

void sync_manager::assign_chunk( connection_ptr c ) {
uint32_t start = 0;
uint32_t end = 0;

Expand All @@ -1071,58 +1115,70 @@ namespace eosio {
if( end > sync_known_lib_num )
end = sync_known_lib_num;
if( end > 0 && end >= start ) {
fc_dlog(logger, "conn ${n} recv blks ${s} to ${e}",("n",c->peer_name() )("s",start)("e",end));
c->sync_receiving.reset(new sync_state( start, end, sync_last_requested_num ) );
fc_dlog(logger, "conn ${n} recv blks ${s} to ${e}",("n",source->peer_name() )("s",start)("e",end));
source->sync_receiving.reset(new sync_state( start, end, sync_last_requested_num ) );
}
}
else {
fc_dlog(logger, "conn ${n} resetting sync recv",("n",c->peer_name() ));
c->sync_receiving.reset( );
fc_dlog(logger, "conn ${n} resetting sync recv",("n",source->peer_name() ));
source->sync_receiving.reset( );
}

if(source->sync_receiving && source->sync_receiving->start_block == head_block + 1) {
source->enqueue( (sync_request_message){source->sync_receiving->start_block,
source->sync_receiving->end_block});
sync_last_requested_num = source->sync_receiving->end_block;
}
}

void sync_manager::take_chunk( connection_ptr c) {
if( !c->sync_receiving) {
elog( "take_chunk called, but sync_receiving is empty");
return;
}
sync_state_ptr ss;
c->sync_receiving.swap(ss);
fc_dlog(logger, "conn ${n} losing recv blks ${s} to ${e}",("n",c->peer_name() )("s",ss->start_block)("e",ss->end_block));

if( chain_plug->chain().head_block_num() == sync_known_lib_num ) {
handshake_message hello;
handshake_initializer::populate(hello);
fc_dlog(logger, "All caught up with last known last irreversible block resending handshake");
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
if( !c->sync_receiving) {
elog( "take_chunk called, but sync_receiving is empty");
return;
}
sync_state_ptr ss;
c->sync_receiving.swap(ss);
fc_dlog(logger, "conn ${n} covered blks ${s} to ${e}",("n",c->peer_name() )("s",ss->start_block)("e",ss->end_block));

if( chain_plug->chain().head_block_num() == sync_known_lib_num ) {
handshake_message hello;
handshake_initializer::populate(hello);
fc_dlog(logger, "All caught up with last known last irreversible block resending handshake");
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
}
if( c->connected()) {
assign_chunk( c);
}
request_next_chunk();
}
request_next_chunk();
}

void sync_manager::start_sync( connection_ptr c, uint32_t target) {
if( !syncing()) {
sync_last_requested_num = chain_plug->chain().head_block_num();
}
ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t}",
( "cc",sync_last_requested_num)("t",target));
if( target > sync_known_lib_num) {
sync_known_lib_num = target;
}
if( c->sync_receiving && c->sync_receiving->end_block > 0) {
return;
void sync_manager::start_sync( connection_ptr c, uint32_t target) {
if( !syncing()) {
sync_last_requested_num = chain_plug->chain().head_block_num();
fc_dlog(logger, "Inform other open connections that we are syncing");

handshake_message hello;
handshake_initializer::populate(hello);
for( auto &ci : my_impl->connections) {
if( ci->current()) {
hello.generation = ++ci->sent_handshake_count;
fc_dlog(logger, "send to ${p}", ("p",ci->peer_name()));
ci->enqueue( hello );
}
}
assign_chunk( c);
request_next_chunk();
}
ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t}", ( "cc",sync_last_requested_num)("t",target));
if( target > sync_known_lib_num) {
sync_known_lib_num = target;
}
if( c->sync_receiving && c->sync_receiving->end_block > 0) {
return;
}
request_next_chunk(c);
}

void sync_manager::reassign_fetch() {
request_next_chunk();
Expand Down Expand Up @@ -1255,11 +1311,13 @@ namespace eosio {
close( conn );
}
} catch (...) {
elog( "Undefined exception hanlding the read data from connection: ${m}",( "m", ec.message() ) );
close( conn );
}
}
);
} catch (...) {
elog( "Undefined exception handling reading" );
close( conn );
}
}
Expand Down Expand Up @@ -1614,7 +1672,7 @@ namespace eosio {
}

void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
fc_dlog(logger, "got a sync_request_message from ${p}", ("p",c->peer_name()));
fc_dlog(logger, "got a sync_request_message { ${s} - ${e} } from ${p}",("s",msg.start_block)("e",msg.end_block) ("p",c->peer_name()));
if( msg.end_block == 0) {
c->sync_requested.reset();
} else {
Expand Down Expand Up @@ -1800,35 +1858,35 @@ namespace eosio {
fc_dlog(logger, "got signed_block #${n} from ${p}", ("n",msg.block_num())("p",c->peer_name()));
chain_controller &cc = chain_plug->chain();
block_id_type blk_id = msg.id();
bool has_chunk = false;
uint32_t num = msg.block_num();
bool syncing = sync_master->syncing();
try {
if( cc.is_known_block(blk_id)) {
return;
}
} catch( ...) {
}
if( cc.head_block_num() >= msg.block_num()) {
elog( "received forking block #${n}",( "n",msg.block_num()));
elog( "received forking block #${n} from ${p}",( "n",num)("p",c->peer_name()));
}
fc::microseconds age( fc::time_point::now() - msg.timestamp);
fc_dlog(logger, "got signed_block #${n} from ${p} block age in secs = ${age}",("n",msg.block_num())("p",c->peer_name())("age",age.to_seconds()));

bool has_chunk = false;
uint32_t num = msg.block_num();
bool syncing = sync_master->syncing();
if( syncing ) {
has_chunk =( c->sync_receiving && c->sync_receiving->end_block > 0);

if( !has_chunk) {
if(c->sync_receiving)
elog("got a block while syncing but sync_receiving end block == 0 #${n}",
( "n",num));
elog("got a block while syncing but sync_receiving end block == 0 #${n} from ${p}", ( "n",num)("p",c->peer_name()));
else
elog("got a block while syncing but no sync_receiving set #${n}",
( "n",num));
elog("got a block while syncing but no sync_receiving set #${n} from ${p}", ( "n",num)("p",c->peer_name()));
}
else {
if( c->sync_receiving->last + 1 != num) {
wlog( "expected block ${next} but got ${num}",("next",c->sync_receiving->last+1)("num",num));
wlog( "expected block ${next} but got ${num} from ${p}",("next",c->sync_receiving->last+1)("num",num)("p",c->peer_name()));
c->cancel_sync("blk out of order");
sync_master->reassign_fetch();
return;
}
c->sync_receiving->last = num;
Expand All @@ -1841,22 +1899,22 @@ namespace eosio {
chain_plug->accept_block(msg, syncing);
accepted = true;
} catch( const unlinkable_block_exception &ex) {
elog( "handle signed block: unlinkable_block_exception accept block #${n} syncing",("n",num));
elog( "handle signed block: unlinkable_block_exception accept block #${n} syncing from ${p}",("n",num)("p",c->peer_name()));
c->enqueue( go_away_message( go_away_reason::unlinkable ));
} catch( const assert_exception &ex) {
elog( "unable to accept block on assert exception ${n}",("n",ex.what()));
elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));
} catch( const fc::exception &ex) {
elog( "accept_block threw a non-assert exception ${x}",( "x",ex.what()));
elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name()));
} catch( ...) {
elog( "handle sync block caught something else");
elog( "handle sync block caught something else from ${p}",("num",num)("p",c->peer_name()));
}
}

if( has_chunk) {
if( !accepted) {
c->enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
wlog("block ${num}, ${bid} not accepted from ${p}",("num",num)("p",c->peer_name()));
c->cancel_sync("blk not acceptd");
sync_master->reassign_fetch();
// c->sync_receiving->block_cache.emplace_back(std::move(c->blk_buffer));
}

if( num == c->sync_receiving->end_block) {
Expand Down Expand Up @@ -1979,9 +2037,9 @@ namespace eosio {
if( c->peer_addr.empty( ) ) {
--num_clients;
}
c->close();
if( c->sync_receiving)
sync_master->take_chunk( c);
c->close();
}


Expand Down

0 comments on commit a6ff477

Please sign in to comment.