Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
cutting the verbosity of non-debug runs, make sure the sync process c…
Browse files Browse the repository at this point in the history
…an tolerate restart of peer
  • Loading branch information
pmesnier committed Dec 11, 2017
1 parent 6ebe934 commit 387ef2e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 26 deletions.
17 changes: 3 additions & 14 deletions plugins/net_plugin/include/eos/net_plugin/message_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ namespace eosio {
* Does not affect the read or write pointer.
*/
void add_buffer_to_chain() {
elog ("adding a buffer, = ${s}. buff.size = ${b}",("s",sanity_check)("b",buffers.size()));
sanity_check++;
buffers.push_back(pool().malloc());
}
Expand All @@ -86,10 +85,6 @@ namespace eosio {
*/
void add_space(uint32_t bytes) {
int buffers_to_add = bytes / buffer_len + 1;
if (write_ind.first >= buffers.size()) {
elog ("growing buffer, from ${bs} (sanity ${s}) adding ${bta} ",("bs",buffers.size())("bta",buffers_to_add)("s",sanity_check));
}

for (int i = 0; i < buffers_to_add; i++) {
sanity_check++;
buffers.push_back(pool().malloc());
Expand All @@ -101,8 +96,8 @@ namespace eosio {
* discarded.
*/
void reset() {
elog ("read_ind = ${r1}, ${r2} write_ind = ${w1}, ${w2}, buff.size = ${bs}, sanity = ${s}",
("r1",read_ind.first)("r2",read_ind.second)("w1",write_ind.first)("w2",write_ind.second)("bs",buffers.size())("s",sanity_check));
//dlog ("read_ind = ${r1}, ${r2} write_ind = ${w1}, ${w2}, buff.size = ${bs}, sanity = ${s}",
// ("r1",read_ind.first)("r2",read_ind.second)("w1",write_ind.first)("w2",write_ind.second)("bs",buffers.size())("s",sanity_check));
if( buffers.size() != sanity_check) {
exit(0);
}
Expand Down Expand Up @@ -152,10 +147,8 @@ namespace eosio {
void advance_read_ptr(uint32_t bytes) {
advance_index(read_ind, bytes);
if (read_ind == write_ind) {
ilog("calling reset");
reset();
} else if (read_ind.first > 0) {
elog ("shrinking buffer, from ${bs} (sanity = ${s}) by ${r1}",("bs",buffers.size())("r1",read_ind.first)("s",sanity_check));
while (read_ind.first > 0) {
pool().destroy(buffers.front());
buffers.pop_front();
Expand All @@ -172,9 +165,6 @@ namespace eosio {
*/
void advance_write_ptr(uint32_t bytes) {
advance_index(write_ind, bytes);
if (write_ind.first >= buffers.size()) {
elog ("growing buffer, from ${bs} (sanity = ${s}) to ${w1}",("bs",buffers.size())("w1",write_ind.first+1)("s",sanity_check));
}
while (write_ind.first >= buffers.size()) {
sanity_check++;
buffers.push_back(pool().malloc());
Expand Down Expand Up @@ -270,9 +260,8 @@ namespace eosio {
index_t read_ind;
index_t write_ind;
size_t sanity_check;
};


};

/*
* @brief datastream adapter that adapts message_buffer for use with fc unpack
Expand Down
23 changes: 11 additions & 12 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ namespace eosio {
sync_state_ptr sync_requested; // this peer is requesting info from us
socket_ptr socket;

message_buffer<65546> pending_message_buffer;
message_buffer<1024*1024> pending_message_buffer;
vector<char> send_buffer;
vector<char> blk_buffer;

Expand Down Expand Up @@ -884,6 +884,7 @@ namespace eosio {
switch (reason) {
case validation :
case fatal_other : {
no_retry = reason;
enqueue( go_away_message( reason ));
break;
}
Expand Down Expand Up @@ -1107,8 +1108,8 @@ namespace eosio {

void sync_manager::reset_lib_num() {
sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
sync_last_requested_num = 0;
for (auto& c : my_impl->connections) {
sync_last_requested_num = chain_plug->chain().head_block_num();
for (auto& c : my_impl->connections) {
if( c->last_handshake.last_irreversible_block_num > sync_known_lib_num) {
sync_known_lib_num =c->last_handshake.last_irreversible_block_num;
}
Expand All @@ -1128,7 +1129,7 @@ namespace eosio {
uint32_t head_block = chain_plug->chain().head_block_num();

if (head_block < sync_last_requested_num) {
ilog ("ignoring request, head is ${h} last req = ${r}",("h",head_block)("r",sync_last_requested_num));
fc_dlog (logger, "ignoring request, head is ${h} last req = ${r}",("h",head_block)("r",sync_last_requested_num));
return;
}

Expand Down Expand Up @@ -1196,7 +1197,7 @@ namespace eosio {
}

if(source->sync_receiving && source->sync_receiving->start_block == head_block + 1) {
source->enqueue( (sync_request_message){source->sync_receiving->start_block,
source->enqueue( (sync_request_message){source->sync_receiving->start_block,
source->sync_receiving->end_block});
sync_last_requested_num = source->sync_receiving->end_block;
}
Expand Down Expand Up @@ -1246,6 +1247,7 @@ namespace eosio {
sync_known_lib_num = target;
}
if( c->sync_receiving && c->sync_receiving->end_block > 0) {
ilog("connection already has end block ${eb}",("eb",c->sync_receiving->end_block));
return;
}
request_next_chunk(c);
Expand Down Expand Up @@ -1355,8 +1357,10 @@ namespace eosio {
[this,conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
try {
if( !ec ) {
ilog("bytes_xferd = ${bt} btw = ${btw}",("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
// FC_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write());
if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
}
FC_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write());
conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
while (conn->pending_message_buffer.bytes_to_read() > 0) {
uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
Expand Down Expand Up @@ -1518,7 +1522,6 @@ namespace eosio {
block_id_type head_id = cc.head_block_id();

if( peer_lib > head || sync_master->syncing() ) {
ilog("calling start_sync, peer_lib = ${pl} head = ${h}",("pl",peer_lib)("h",head));
sync_master->start_sync( c, peer_lib );
}
else if( msg.head_id != head_id ) {
Expand Down Expand Up @@ -1649,7 +1652,6 @@ namespace eosio {
break;
}
case id_list_modes::last_irr_catch_up : {
ilog("calling start_sync, pending = ${pl}",("pl",msg.known_blocks.pending));
if (!c->sync_receiving) {
sync_master->start_sync(c, msg.known_blocks.pending);
}
Expand Down Expand Up @@ -1765,10 +1767,8 @@ namespace eosio {
}

void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
ilog( "got a sync_request_message { ${s} - ${e} } from ${p}",("s",msg.start_block)("e",msg.end_block) ("p",c->peer_name()));
if( msg.end_block == 0) {
c->sync_requested.reset();
ilog ("out_queue size is ${os} and write_queue size is ${ws}",("os",c->out_queue.size())("ws", c->write_queue.size()));
c->flush_queues();
} else {
c->sync_requested.reset(new sync_state( msg.start_block,msg.end_block,msg.start_block-1));
Expand Down Expand Up @@ -2016,7 +2016,6 @@ namespace eosio {
if( num == c->sync_receiving->end_block) {
sync_master->take_chunk( c);
} else {
// ilog ("calling sync wait again");
c->sync_wait( );
}
}
Expand Down

0 comments on commit 387ef2e

Please sign in to comment.