Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #885 from EOSIO/p2p-stat-266-gh879
Browse files Browse the repository at this point in the history
#879 implement sync_manager::reasign_chunk
  • Loading branch information
heifner authored Dec 7, 2017
2 parents 59cc5e0 + 7bcf0b1 commit b2eb166
Showing 1 changed file with 83 additions and 111 deletions.
194 changes: 83 additions & 111 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ namespace eosio {
/// Tracks one contiguous span of blocks being synced from a peer.
/// NOTE(review): this span comes from a scraped diff view — the two
/// start_time(...) initializer lines and the two block_cache member lines
/// are the pre-change and post-change variants of the same line; only the
/// commented-out block_cache form survives in the committed code.
struct sync_state {
sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
:start_block( start ), end_block( end ), last( last_acted ),
start_time(time_point::now()), block_cache()
start_time(time_point::now())//, block_cache()
{}
uint32_t start_block;  ///< first block number of the requested span
uint32_t end_block;    ///< last block number of the requested span
uint32_t last; ///< last sent or received
time_point start_time; ///< time request made or received
deque<vector<char> > block_cache;
// deque<vector<char> > block_cache;
};

using sync_state_ptr = shared_ptr< sync_state >;
Expand Down Expand Up @@ -516,8 +516,8 @@ namespace eosio {
uint32_t sync_last_requested_num;
uint32_t sync_req_span;

deque<sync_state_ptr> full_chunks;
deque<sync_state_ptr> partial_chunks;
// deque<sync_state_ptr> full_chunks;
// deque<sync_state_ptr> partial_chunks;
deque<block_id_type> _blocks;
chain_plugin * chain_plug;

Expand All @@ -532,7 +532,7 @@ namespace eosio {

void set_blocks_to_fetch(vector<block_id_type>);
void assign_fectch(connection_ptr c);
void reassign_fetch(connection_ptr c);
void reassign_fetch();

static const fc::string logger_name;
static fc::logger logger;
Expand Down Expand Up @@ -987,14 +987,14 @@ namespace eosio {
void connection::fetch_timeout( boost::system::error_code ec ) {
if( !ec ) {
if( !( pending_fetch->req_trx.empty( ) || pending_fetch->req_blocks.empty( ) ) ) {
enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
my_impl->sync_master->reassign_fetch( shared_from_this( ) );
enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
my_impl->sync_master->reassign_fetch( );
}
}
else if( ec == boost::asio::error::operation_aborted ) {
if( !connected( ) ) {
fc_dlog(logger, "fetch timeout was cancelled due to dead connection");
my_impl->sync_master->reassign_fetch( shared_from_this( ) );
my_impl->sync_master->reassign_fetch( );
}
}
else {
Expand Down Expand Up @@ -1065,13 +1065,7 @@ namespace eosio {
uint32_t start = 0;
uint32_t end = 0;

if( !partial_chunks.empty( ) ) {
c->sync_receiving = partial_chunks.front( );
partial_chunks.pop_front( );
start = c->sync_receiving->last + 1;
end = c->sync_receiving->end_block;
}
else if( sync_last_requested_num != sync_known_lib_num ) {
if( sync_last_requested_num != sync_known_lib_num ) {
start = sync_last_requested_num + 1;
end = ( start + sync_req_span - 1 );
if( end > sync_known_lib_num )
Expand All @@ -1085,45 +1079,9 @@ namespace eosio {
fc_dlog(logger, "conn ${n} resetting sync recv",("n",c->peer_name() ));
c->sync_receiving.reset( );
}
#if 0
if( end > 0 && end >= start ) {
c->enqueue( (sync_request_message){start, end} );
sync_last_requested_num = end;
}
#endif
}

/// fc::visitor that forwards a cached signed_block into the chain via
/// chain_plugin::accept_block; all other net_message alternatives are no-ops.
/// Used by sync_manager::apply_chunk to replay blocks buffered during sync.
struct postcache : public fc::visitor<void> {
chain_plugin * chain_plug;  // borrowed pointer, not owned
postcache(chain_plugin *cp) : chain_plug (cp) {}

// Accept one cached block. Every failure is logged and swallowed so a
// single bad block does not abort replay of the remaining cache entries.
void operator( )(const signed_block &block) const
{
try {
chain_plug->accept_block( block,true );
} catch( const unlinkable_block_exception &ex ) {
elog( "post cache: unlinkable_block_exception accept block #${n}",("n",block.block_num()));
} catch (const assert_exception &ex) {
elog("post cache: unable to accept block on assert exception ${n}",("n",ex.to_string()));
} catch (const fc::exception &ex) {
elog("post cache: accept_block threw a non-assert exception ${x}", ("x",ex.what()));
} catch (...) {
elog("post cache: unknown error accepting cached block");
}
}

// Catch-all for every other message type carried by net_message: ignore.
template <typename T> void operator()(const T &msg) const { /* no-op */ }
};

/// Replay every raw block buffered in the chunk's block_cache through the
/// chain, using the postcache visitor to dispatch signed_block messages.
void sync_manager::apply_chunk( sync_state_ptr ss) {
   postcache visitor( chain_plug );
   for( auto& packed : ss->block_cache ) {
      auto msg = fc::raw::unpack<net_message>( packed );
      msg.visit( visitor );
   }
}

void sync_manager::take_chunk( connection_ptr c) {
void sync_manager::take_chunk( connection_ptr c) {
if( !c->sync_receiving) {
elog( "take_chunk called, but sync_receiving is empty");
return;
Expand All @@ -1132,44 +1090,6 @@ namespace eosio {
c->sync_receiving.swap(ss);
fc_dlog(logger, "conn ${n} losing recv blks ${s} to ${e}",("n",c->peer_name() )("s",ss->start_block)("e",ss->end_block));

if( !ss->block_cache.empty()) {
if( ss->last < ss->end_block) {
partial_chunks.push_back( ss );
return;
}

if( ss->start_block != chain_plug->chain().last_irreversible_block_num() + 1) {
bool found = false;
for( auto pos = full_chunks.begin(); !found && pos != full_chunks.end(); ++pos) {
if( ss->end_block <( *pos)->start_block) {
full_chunks.insert( pos,ss);
found = true;
}
}
if( !found) {
full_chunks.push_back( ss); //either full chunks is empty or pos ran off the end
}
}
else {
apply_chunk( ss);
}
}
while( !full_chunks.empty()) {
auto chunk = full_chunks.front();
if( chunk->start_block == chain_plug->chain().head_block_num() + 1) {
apply_chunk( chunk);
if( chunk->last == chunk->end_block ) {
full_chunks.pop_front();
}
else {
chunk->start_block = chunk->last+1;
break;
}
}
else
break;
}

if( chain_plug->chain().head_block_num() == sync_known_lib_num ) {
handshake_message hello;
handshake_initializer::populate(hello);
Expand Down Expand Up @@ -1204,8 +1124,8 @@ namespace eosio {
request_next_chunk();
}

void sync_manager::reassign_fetch( connection_ptr c) {
#warning( "TODO: migrate remaining fetch requests to other peers");
/// Re-issue sync requests after a fetch attempt was abandoned (called from
/// connection::fetch_timeout on timeout or dead connection); delegates to
/// request_next_chunk() to pick the next span to request.
void sync_manager::reassign_fetch() {
request_next_chunk();
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1523,47 +1443,62 @@ namespace eosio {
// notices of previously unknown blocks or txns,
//
fc_dlog(logger, "got a notice_message from ${p}", ("p",c->peer_name()));
bool can_fwd = false;
notice_message fwd;
request_message req;
bool send_req = false;
chain_controller &cc = chain_plug->chain();
if( msg.known_trx.pending > 0) {
// plan to get all except what we already know about.
req.req_trx.mode = id_list_modes::catch_up;
send_req = true;
size_t known_sum = local_txns.size();
if( known_sum ) {
for( const auto& t : local_txns.get<by_id>( ) ) {
req.req_trx.ids.push_back( t.id );
switch (msg.known_trx.mode) {
case id_list_modes::none:
case id_list_modes::last_irr_catch_up: {
req.req_trx.mode = id_list_modes::none;
fwd.known_trx.mode = id_list_modes::none;
break;
}
case id_list_modes::catch_up : {
if( msg.known_trx.pending > 0) {
// plan to get all except what we already know about.
req.req_trx.mode = id_list_modes::catch_up;
send_req = true;
size_t known_sum = local_txns.size();
if( known_sum ) {
for( const auto& t : local_txns.get<by_id>( ) ) {
req.req_trx.ids.push_back( t.id );
}
}
}
break;
}
else {
case id_list_modes::normal: {
fwd.known_trx.mode = id_list_modes::normal;
fwd.known_trx.pending = 0;
req.req_trx.mode = id_list_modes::normal;
req.req_trx.pending = 0;
for( const auto& t : msg.known_trx.ids ) {
const auto &tx = my_impl->local_txns.get<by_id>( ).find( t );

if( tx == my_impl->local_txns.end( ) ) {
c->trx_state.insert( ( transaction_state ){ t,true,true,( uint32_t ) - 1,
c->trx_state.insert( ( transaction_state ){ t,true,true,( uint32_t ) - 1,
fc::time_point( ),fc::time_point( ) } );
if( !sync_master->syncing( ) ) {
// my_impl->local_txns.insert();
if( !sync_master->syncing( ) ) {
fwd.known_trx.ids.push_back( t );
}
req.req_trx.ids.push_back( t );
}
}
send_req = !req.req_trx.ids.empty();
}
}

fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_blocks.mode))("n",msg.known_blocks.pending));


switch (msg.known_blocks.mode) {
case id_list_modes::none : {

return;
if (msg.known_trx.mode != id_list_modes::normal) {
return;
}
break;
}
case id_list_modes::last_irr_catch_up : {
sync_master->start_sync(c, msg.known_blocks.pending);
Expand Down Expand Up @@ -1602,10 +1537,45 @@ namespace eosio {
}
}

if( can_fwd ) {
send_all( fwd, [c,fwd](connection_ptr cptr) -> bool {
return cptr != c;
});
if (fwd.known_trx.mode == id_list_modes::normal ||
fwd.known_blocks.mode == id_list_modes::normal) {
for (auto &conn : my_impl->connections) {
if (conn->syncing || conn == c) {
continue;
}
notice_message to_peer;
to_peer.known_trx.mode = fwd.known_trx.mode;
if (fwd.known_trx.mode == id_list_modes::normal) {
for (const auto &t : fwd.known_trx.ids) {
const auto &tx = conn->trx_state.get<by_id>( ).find( t );
if( tx == conn->trx_state.end( ) ) {
conn->trx_state.insert((transaction_state){t,false,true,(uint32_t)-1,
fc::time_point(),fc::time_point()});
to_peer.known_trx.ids.push_back( t );
}
}
if (to_peer.known_trx.ids.empty()) {
to_peer.known_trx.mode = id_list_modes::none;
}
}
to_peer.known_blocks.mode = fwd.known_blocks.mode;
if (fwd.known_blocks.mode == id_list_modes::normal) {
for (const auto &bid : fwd.known_blocks.ids) {
const auto &blk = conn->block_state.get<by_id>( ).find( bid );
if( blk == conn->block_state.end( ) ) {
conn->block_state.insert((block_state){bid,false,true,fc::time_point()} );
to_peer.known_blocks.ids.push_back( bid );
}
}
if (to_peer.known_blocks.ids.empty()) {
to_peer.known_blocks.mode = id_list_modes::none;
}
}
if (to_peer.known_trx.mode == id_list_modes::normal ||
to_peer.known_blocks.mode == id_list_modes::normal) {
conn->enqueue (to_peer);
}
}
}
fc_dlog(logger, "send req = ${sr}", ("sr",send_req));
if( send_req) {
Expand Down Expand Up @@ -1884,7 +1854,9 @@ namespace eosio {

if( has_chunk) {
if( !accepted) {
c->sync_receiving->block_cache.emplace_back(std::move(c->blk_buffer));
c->enqueue( ( request_message ) {ordered_txn_ids( ), ordered_blk_ids( )} );
sync_master->reassign_fetch();
// c->sync_receiving->block_cache.emplace_back(std::move(c->blk_buffer));
}

if( num == c->sync_receiving->end_block) {
Expand Down

0 comments on commit b2eb166

Please sign in to comment.