Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #6149 from EOSIO/key-perf
Browse files Browse the repository at this point in the history
Multi-thread transaction key recovery
  • Loading branch information
b1bart authored Oct 26, 2018
2 parents 36046fa + a9cdbe7 commit 073344a
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 124 deletions.
55 changes: 35 additions & 20 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <eosio/chain/account_object.hpp>
#include <eosio/chain/block_summary_object.hpp>
#include <eosio/chain/eosio_contract.hpp>
#include <eosio/chain/global_property_object.hpp>
#include <eosio/chain/contract_table_objects.hpp>
#include <eosio/chain/generated_transaction_object.hpp>
Expand All @@ -20,10 +21,11 @@
#include <chainbase/chainbase.hpp>
#include <fc/io/json.hpp>
#include <fc/scoped_exit.hpp>

#include <fc/variant_object.hpp>

#include <eosio/chain/eosio_contract.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>


namespace eosio { namespace chain {

Expand Down Expand Up @@ -133,6 +135,7 @@ struct controller_impl {
optional<fc::microseconds> subjective_cpu_leeway;
bool trusted_producer_light_validation = false;
uint32_t snapshot_head_block = 0;
optional<boost::asio::thread_pool> thread_pool;

typedef pair<scope_name,action_name> handler_key;
map< account_name, map<handler_key, apply_handler> > apply_handlers;
Expand All @@ -144,6 +147,14 @@ struct controller_impl {
*/
map<digest_type, transaction_metadata_ptr> unapplied_transactions;

// async on thread_pool and return future
template<typename F>
auto async_thread_pool( F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( *thread_pool, [task]() { (*task)(); } );
return task->get_future();
}

void pop_block() {
auto prev = fork_db.get_block( head->header.previous );
EOS_ASSERT( prev, block_validate_exception, "attempt to pop beyond last irreversible block" );
Expand Down Expand Up @@ -335,6 +346,8 @@ struct controller_impl {

void init(const snapshot_reader_ptr& snapshot) {

thread_pool.emplace( conf.thread_pool_size );

if (snapshot) {
EOS_ASSERT(!head, fork_database_exception, "");
snapshot->validate();
Expand Down Expand Up @@ -393,6 +406,9 @@ struct controller_impl {
~controller_impl() {
pending.reset();

thread_pool->join();
thread_pool->stop();

db.flush();
reversible_blocks.flush();
}
Expand Down Expand Up @@ -1161,14 +1177,27 @@ struct controller_impl {
auto producer_block_id = b->id();
start_block( b->timestamp, b->confirmed, s , producer_block_id);

std::vector<transaction_metadata_ptr> packed_transactions;
for( const auto& receipt : b->transactions ) {
if( receipt.trx.contains<packed_transaction>()) {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>( pt );
if( !self.skip_auth_check() ) {
mtrx->signing_keys_future = async_thread_pool( [this, mtrx]() {
return std::make_pair( this->chain_id, mtrx->trx.get_signature_keys( this->chain_id ) );
} );
}
packed_transactions.emplace_back( std::move( mtrx ) );
}
}

transaction_trace_ptr trace;

size_t packed_idx = 0;
for( const auto& receipt : b->transactions ) {
auto num_pending_receipts = pending->_pending_block_state->block->transactions.size();
if( receipt.trx.contains<packed_transaction>() ) {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>(pt);
trace = push_transaction( mtrx, fc::time_point::maximum(), receipt.cpu_usage_us, true );
trace = push_transaction( packed_transactions.at(packed_idx++), fc::time_point::maximum(), receipt.cpu_usage_us, true );
} else if( receipt.trx.contains<transaction_id_type>() ) {
trace = push_scheduled_transaction( receipt.trx.get<transaction_id_type>(), fc::time_point::maximum(), receipt.cpu_usage_us, true );
} else {
Expand Down Expand Up @@ -1251,16 +1280,7 @@ struct controller_impl {
} FC_LOG_AND_RETHROW( )
}

void push_confirmation( const header_confirmation& c ) {
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a confirmation when there is a pending block");
fork_db.add( c );
emit( self.accepted_confirmation, c );
if ( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks();
}
}

void maybe_switch_forks( controller::block_status s = controller::block_status::complete ) {
void maybe_switch_forks( controller::block_status s ) {
auto new_head = fork_db.head();

if( new_head->header.previous == head->id ) {
Expand Down Expand Up @@ -1632,11 +1652,6 @@ void controller::push_block( const signed_block_ptr& b, block_status s ) {
my->push_block( b, s );
}

void controller::push_confirmation( const header_confirmation& c ) {
validate_db_available_size();
my->push_confirmation( c );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) {
validate_db_available_size();
EOS_ASSERT( get_read_mode() != chain::db_read_mode::READ_ONLY, transaction_type_exception, "push transaction not allowed in read-only mode" );
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <eosio/chain/block_header.hpp>
#include <eosio/chain/incremental_merkle.hpp>
#include <future>

namespace eosio { namespace chain {

Expand All @@ -25,6 +26,7 @@ struct block_header_state {
public_key_type block_signing_key;
vector<uint8_t> confirm_count;
vector<header_confirmation> confirmations;
std::shared_future<public_key_type> block_signing_key_future;

block_header_state next( const signed_block_header& h, bool trust = false )const;
block_header_state generate_next( block_timestamp_type when )const;
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const static uint32_t default_max_trx_delay = 45*24*3600; //
const static uint32_t default_max_inline_action_size = 4 * 1024; // 4 KB
const static uint16_t default_max_inline_action_depth = 4;
const static uint16_t default_max_auth_depth = 6;
const static uint16_t default_controller_thread_pool_size = 2;

const static uint32_t min_net_usage_delta_between_base_and_max_for_trx = 10*1024;
// Should be large enough to allow recovery from badly set blockchain parameters without a hard fork
Expand Down
7 changes: 1 addition & 6 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace eosio { namespace chain {
uint64_t state_guard_size = chain::config::default_state_guard_size;
uint64_t reversible_cache_size = chain::config::default_reversible_cache_size;
uint64_t reversible_guard_size = chain::config::default_reversible_guard_size;
uint16_t thread_pool_size = chain::config::default_controller_thread_pool_size;
bool read_only = false;
bool force_all_checks = false;
bool disable_replay_opts = false;
Expand Down Expand Up @@ -141,12 +142,6 @@ namespace eosio { namespace chain {

void push_block( const signed_block_ptr& b, block_status s = block_status::complete );

/**
* Call this method when a producer confirmation is received, this might update
* the last bft irreversible block and/or cause a switch of forks
*/
void push_confirmation( const header_confirmation& c );

const chainbase::database& db()const;

const fork_database& fork_db()const;
Expand Down
17 changes: 13 additions & 4 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
*/
#pragma once
#include <eosio/chain/transaction.hpp>
#include <eosio/chain/block.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/types.hpp>
#include <future>

namespace eosio { namespace chain {

Expand All @@ -20,6 +20,7 @@ class transaction_metadata {
signed_transaction trx;
packed_transaction packed_trx;
optional<pair<chain_id_type, flat_set<public_key_type>>> signing_keys;
std::future<pair<chain_id_type,flat_set<public_key_type>>> signing_keys_future;
bool accepted = false;
bool implicit = false;
bool scheduled = false;
Expand All @@ -39,8 +40,16 @@ class transaction_metadata {
}

const flat_set<public_key_type>& recover_keys( const chain_id_type& chain_id ) {
if( !signing_keys || signing_keys->first != chain_id ) // Unlikely for more than one chain_id to be used in one nodeos instance
signing_keys = std::make_pair( chain_id, trx.get_signature_keys( chain_id ) );
// Unlikely for more than one chain_id to be used in one nodeos instance
if( !signing_keys || signing_keys->first != chain_id ) {
if( signing_keys_future.valid() ) {
signing_keys = signing_keys_future.get();
if( signing_keys->first == chain_id ) {
return signing_keys->second;
}
}
signing_keys = std::make_pair( chain_id, trx.get_signature_keys( chain_id ));
}
return signing_keys->second;
}

Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ flat_set<public_key_type> transaction::get_signature_keys( const vector<signatur
using boost::adaptors::transformed;

constexpr size_t recovery_cache_size = 1000;
static recovery_cache_type recovery_cache;
static thread_local recovery_cache_type recovery_cache;
const digest_type digest = sig_digest(chain_id, cfd);

flat_set<public_key_type> recovered_pub_keys;
Expand Down
8 changes: 8 additions & 0 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip
("chain-state-db-guard-size-mb", bpo::value<uint64_t>()->default_value(config::default_state_guard_size / (1024 * 1024)), "Safely shut down node when free space remaining in the chain state database drops below this size (in MiB).")
("reversible-blocks-db-size-mb", bpo::value<uint64_t>()->default_value(config::default_reversible_cache_size / (1024 * 1024)), "Maximum size (in MiB) of the reversible blocks database")
("reversible-blocks-db-guard-size-mb", bpo::value<uint64_t>()->default_value(config::default_reversible_guard_size / (1024 * 1024)), "Safely shut down node when free space remaining in the reverseible blocks database drops below this size (in MiB).")
("chain-threads", bpo::value<uint16_t>()->default_value(config::default_controller_thread_pool_size),
"Number of worker threads in controller thread pool")
("contracts-console", bpo::bool_switch()->default_value(false),
"print contract's output to console")
("actor-whitelist", boost::program_options::value<vector<string>>()->composing()->multitoken(),
Expand Down Expand Up @@ -417,6 +419,12 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
if( options.count( "reversible-blocks-db-guard-size-mb" ))
my->chain_config->reversible_guard_size = options.at( "reversible-blocks-db-guard-size-mb" ).as<uint64_t>() * 1024 * 1024;

if( options.count( "chain-threads" )) {
my->chain_config->thread_pool_size = options.at( "chain-threads" ).as<uint16_t>();
EOS_ASSERT( my->chain_config->thread_pool_size > 0, plugin_config_exception,
"chain-threads ${num} must be greater than 0", ("num", my->chain_config->thread_pool_size) );
}

if( my->wasm_runtime )
my->chain_config->wasm_runtime = *my->wasm_runtime;

Expand Down
92 changes: 0 additions & 92 deletions unittests/forked_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,98 +345,6 @@ BOOST_AUTO_TEST_CASE( prune_remove_branch ) try {

} FC_LOG_AND_RETHROW()

BOOST_AUTO_TEST_CASE(confirmation) try {

tester c;
c.produce_blocks(10);
auto r = c.create_accounts( {N(dan),N(sam),N(pam),N(scott)} );
auto res = c.set_producers( {N(dan),N(sam),N(pam),N(scott)} );

private_key_type priv_sam = c.get_private_key( N(sam), "active" );
private_key_type priv_dan = c.get_private_key( N(dan), "active" );
private_key_type priv_pam = c.get_private_key( N(pam), "active" );
private_key_type priv_scott = c.get_private_key( N(scott), "active" );
private_key_type priv_invalid = c.get_private_key( N(invalid), "active" );

wlog("set producer schedule to [dan,sam,pam,scott]");
c.produce_blocks(50);
c.control->abort_block(); // discard pending block

BOOST_REQUIRE_EQUAL(61, c.control->head_block_num());

// 55 is by dan
block_state_ptr blk = c.control->fork_db().get_block_in_current_chain_by_num(55);
block_state_ptr blk61 = c.control->fork_db().get_block_in_current_chain_by_num(61);
block_state_ptr blk50 = c.control->fork_db().get_block_in_current_chain_by_num(50);

BOOST_REQUIRE_EQUAL(0, blk->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(0, blk->confirmations.size());

// invalid signature
BOOST_REQUIRE_EXCEPTION(c.control->push_confirmation(header_confirmation{blk->id, N(sam), priv_invalid.sign(blk->sig_digest())}),
fc::exception,
[] (const fc::exception &ex)->bool {
return ex.to_detail_string().find("confirmation not signed by expected key") != std::string::npos;
});

// invalid schedule
BOOST_REQUIRE_EXCEPTION(c.control->push_confirmation(header_confirmation{blk->id, N(invalid), priv_invalid.sign(blk->sig_digest())}),
fc::exception,
[] (const fc::exception &ex)->bool {
return ex.to_detail_string().find("producer not in current schedule") != std::string::npos;
});

// signed by sam
c.control->push_confirmation(header_confirmation{blk->id, N(sam), priv_sam.sign(blk->sig_digest())});

BOOST_REQUIRE_EQUAL(0, blk->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(1, blk->confirmations.size());

// double confirm not allowed
BOOST_REQUIRE_EXCEPTION(c.control->push_confirmation(header_confirmation{blk->id, N(sam), priv_sam.sign(blk->sig_digest())}),
fc::exception,
[] (const fc::exception &ex)->bool {
return ex.to_detail_string().find("block already confirmed by this producer") != std::string::npos;
});

// signed by dan
c.control->push_confirmation(header_confirmation{blk->id, N(dan), priv_dan.sign(blk->sig_digest())});

BOOST_REQUIRE_EQUAL(0, blk->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(2, blk->confirmations.size());

// signed by pam
c.control->push_confirmation(header_confirmation{blk->id, N(pam), priv_pam.sign(blk->sig_digest())});

// we have more than 2/3 of confirmations, bft irreversible number should be set
BOOST_REQUIRE_EQUAL(55, blk->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(55, blk61->bft_irreversible_blocknum); // bft irreversible number will propagate to higher block
BOOST_REQUIRE_EQUAL(0, blk50->bft_irreversible_blocknum); // bft irreversible number will not propagate to lower block
BOOST_REQUIRE_EQUAL(3, blk->confirmations.size());

// signed by scott
c.control->push_confirmation(header_confirmation{blk->id, N(scott), priv_scott.sign(blk->sig_digest())});

BOOST_REQUIRE_EQUAL(55, blk->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(4, blk->confirmations.size());

// let's confirm block 50 as well
c.control->push_confirmation(header_confirmation{blk50->id, N(sam), priv_sam.sign(blk50->sig_digest())});
c.control->push_confirmation(header_confirmation{blk50->id, N(dan), priv_dan.sign(blk50->sig_digest())});
c.control->push_confirmation(header_confirmation{blk50->id, N(pam), priv_pam.sign(blk50->sig_digest())});
BOOST_REQUIRE_EQUAL(50, blk50->bft_irreversible_blocknum); // bft irreversible number will not propagate to lower block

block_state_ptr blk54 = c.control->fork_db().get_block_in_current_chain_by_num(54);
BOOST_REQUIRE_EQUAL(50, blk54->bft_irreversible_blocknum);
BOOST_REQUIRE_EQUAL(55, blk->bft_irreversible_blocknum); // bft irreversible number will not be updated to lower value
BOOST_REQUIRE_EQUAL(55, blk61->bft_irreversible_blocknum);

c.produce_blocks(20);

block_state_ptr blk81 = c.control->fork_db().get_block_in_current_chain_by_num(81);
BOOST_REQUIRE_EQUAL(55, blk81->bft_irreversible_blocknum); // bft irreversible number will propagate into new blocks

} FC_LOG_AND_RETHROW()

BOOST_AUTO_TEST_CASE( read_modes ) try {
tester c;
Expand Down

0 comments on commit 073344a

Please sign in to comment.