Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

sync from block vault during block production #9752

Merged
merged 6 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,28 @@ enum class pending_block_mode {
speculating
};

class producer_plugin_impl;
class block_only_sync : public blockvault::sync_callback {
producer_plugin_impl* _impl;
boost::asio::deadline_timer _start_sync_timer;
bool _pending = false;

public:
block_only_sync(producer_plugin_impl* impl, boost::asio::io_service& io)
: _impl(impl), _start_sync_timer(io) {}

bool is_pending() const { return _pending; }
void cancel() { _start_sync_timer.cancel(); }
void schedule();
void on_snapshot(const char* snapshot_filename) override;
void on_block(eosio::chain::signed_block_ptr block) override;
};

class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl> {
public:
producer_plugin_impl(boost::asio::io_service& io)
:_timer(io)
,_block_vault_resync(this, io)
,_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
{
}
Expand All @@ -148,6 +166,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
std::map<chain::public_key_type, signature_provider_type> _signature_providers;
std::set<chain::account_name> _producers;
boost::asio::deadline_timer _timer;
block_only_sync _block_vault_resync;
using producer_watermark = std::pair<uint32_t, block_timestamp_type>;
std::map<chain::account_name, producer_watermark> _producer_watermarks;
pending_block_mode _pending_block_mode = pending_block_mode::speculating;
Expand Down Expand Up @@ -304,7 +323,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if (check_connectivity) {
auto previous = chain.fetch_block_by_id(block->previous);
if (!previous) {
dlog("Don't have previous block for block number ${bn}, looking for block id ${pbi}",
fc_dlog(_log, "Don't have previous block for block number ${bn}, looking for block id ${pbi}",
("bn", block->block_num())("pbi", block->previous));
return true;
}
Expand All @@ -319,7 +338,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// push the new block
auto handle_error = [&](const auto& e)
{
elog((e.to_detail_string()));
fc_elog(_log, (e.to_detail_string()));
throw;
};

Expand Down Expand Up @@ -390,7 +409,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}, [this]( const transaction_id_type& id ) {
return _unapplied_transactions.get_trx( id );
} );

if ( blockvault != nullptr ) {
if (_block_vault_resync.is_pending() && _producers.count( block->producer ) > 0 ) {
// Cancel any pending resync from blockvault if we received any blocks from the same logical producer
_block_vault_resync.cancel();
}
blockvault->async_append_external_block(blk_state->dpos_irreversible_blocknum, blk_state->block, [](bool){});
}
} catch ( const guard_exception& e ) {
Expand Down Expand Up @@ -944,6 +968,7 @@ void producer_plugin::plugin_startup()
void producer_plugin::plugin_shutdown() {
try {
my->_timer.cancel();
my->_block_vault_resync.cancel();
} catch ( const std::bad_alloc& ) {
chain_plugin::handle_bad_alloc();
} catch ( const boost::interprocess::bad_alloc& ) {
Expand Down Expand Up @@ -2064,6 +2089,46 @@ static auto maybe_make_debug_time_logger() -> std::optional<decltype(make_debug_
}
}

/// Schedules a resync from the block vault, to start one second from now.
///
/// Does nothing if a resync is already pending. When the timer fires without
/// error, the handler asks the block vault to sync starting from the chain's
/// last irreversible block id, with this object as the callback. If the wait
/// was cancelled (non-zero error code), the sync is skipped but the pending
/// flag is still cleared so a later schedule() can re-arm the timer.
///
/// If the sync throws an fc::exception, a warning is logged and the
/// application is asked to quit.
void block_only_sync::schedule() {
   if (_pending) {
      return; // a resync is already armed
   }

   // wait one second to see if we can actually get the block from net plugin before we try to resync from block vault
   _start_sync_timer.expires_from_now(boost::posix_time::seconds(1));
   _pending = true;
   _start_sync_timer.async_wait(app().get_priority_queue().wrap(
      priority::high, [this, weak_impl = _impl->weak_from_this()](const boost::system::error_code& ec) {
         auto shared_impl = weak_impl.lock();
         if (!shared_impl) {
            // The producer_plugin_impl is gone. This block_only_sync is held by value
            // inside it, so `this` is no longer valid and must not be touched
            // (the original code wrote to this->_pending here).
            return;
         }

         // shared_impl keeps the owner, and therefore `this`, alive for the rest of the handler.
         if (!ec) {
            auto id = shared_impl->chain_plug->chain().last_irreversible_block_id();
            fc_dlog(_log, "Attempt to resync from block vault");
            try {
               shared_impl->blockvault->sync(&id, *this);
            } catch( fc::exception& er ) {
               fc_wlog(_log, "Attempting to resync from blockvault encountered ${details}; the node must restart to "
                             "continue!",
                       ("details", er.to_detail_string()));
               app().quit();
            }
         }

         // Cleared only after the sync attempt, so is_pending() stays true while
         // on_block() callbacks are being delivered.
         this->_pending = false;
      }));
}

/// Snapshot callback of blockvault::sync_callback.
/// This callback only handles blocks: any snapshot is rejected by throwing
/// a producer_exception. The filename argument is ignored.
void block_only_sync::on_snapshot(const char* /*snapshot_filename*/) {
   EOS_THROW(producer_exception, "a snapshot");
}

/// Block callback of blockvault::sync_callback.
/// Forwards the block to producer_plugin_impl::on_sync_block with the
/// connectivity check disabled. An unlinkable block is logged at debug level
/// and otherwise ignored; any other exception propagates to the caller.
void block_only_sync::on_block(eosio::chain::signed_block_ptr block) {
   try {
      // use false right now, should investigate further after 3.0 rc
      _impl->on_sync_block(block, /*check_connectivity=*/false);
   } catch (const unlinkable_block_exception&) {
      fc_dlog(_log, "got unlinkable block ${num} from block vault", ("num", block->block_num()));
   }
}

void producer_plugin_impl::produce_block() {
//ilog("produce_block ${t}", ("t", fc::time_point::now())); // for testing _produce_time_offset_us
EOS_ASSERT(_pending_block_mode == pending_block_mode::producing, producer_exception, "called produce_block while not actually producing");
Expand Down Expand Up @@ -2109,6 +2174,7 @@ void producer_plugin_impl::produce_block() {
pending_blk_state->block, [&p](bool b) { p.set_value(b); });
if (!f.get()) {
_latest_rejected_block_num = pending_blk_state->block->block_num();
_block_vault_resync.schedule();
EOS_ASSERT(false, block_validation_error, "Block rejected by block vault");
}

Expand Down
14 changes: 14 additions & 0 deletions tests/blockvault_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ def get_successful_constructed_block_numbers_for_node(nodeId):

assert node2.waitForLibToAdvance(timeout=60)

Print("#################################################################################")
Print("# Scenario 4: Test one of the two identical producer node fails and the other #")
Print("# can take over. #")
Print("#################################################################################")
Print("Kill bios, node0, node1")
cluster.biosNode.kill(signal.SIGTERM)
node0.kill(signal.SIGTERM)
node1.kill(signal.SIGTERM)
time.sleep(10)
assert node2.waitForHeadToAdvance(timeout=60)

Print("#################################################################################")
Print("# Verify if there's any double production #")
Print("#################################################################################")
double_produced_block_numbers = get_successful_constructed_block_numbers_for_node(1).intersection(get_successful_constructed_block_numbers_for_node(2))

if len(double_produced_block_numbers) != 0:
Expand Down