Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Optimize transaction signature recovery #6471

Merged
merged 23 commits into from
Dec 18, 2018
Merged
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3095d85
Switch interface from packed_transaction_ptr to transaction_metadata_ptr
heifner Dec 12, 2018
49a9c55
Thread pool does not need to be optional
heifner Dec 12, 2018
4a4a80a
Add transaction_metadata create_signing_keys_future method
heifner Dec 12, 2018
f71d490
Start transaction signature earily in thread pool
heifner Dec 12, 2018
96fb1e5
Refactor packed_transaction for better encapsulation
heifner Dec 13, 2018
20ffa70
Add transaction_metadata create_signing_keys_future method
heifner Dec 12, 2018
f64da40
Start transaction signature earily in thread pool
heifner Dec 12, 2018
22ab63c
Update txn_test_gen_plugin to overlap transaction submit @taokayan
heifner Dec 14, 2018
64537c5
Remove redundant signing_keys check
heifner Dec 17, 2018
ca8e5bc
Add deadline to key recovery
heifner Dec 17, 2018
b8a5659
Modify producer_plugin to have its own thead_pool instead of using ch…
heifner Dec 17, 2018
957db7f
Move thread_pool join/stop to plugin shutdown so that they are joined…
heifner Dec 17, 2018
9270c9c
Fix signature future deadline from starting too early
heifner Dec 17, 2018
3e733f5
Fix overflow of deadline and deadline check
heifner Dec 17, 2018
6e9b441
initial setup of billing CPU for signatures recovered earlier
arhag Dec 18, 2018
2b6a88a
Make recovery cache non-thread local and guard by mutex
heifner Dec 18, 2018
94ad2d1
Calculate cpu usage of signature recovery
heifner Dec 18, 2018
48bf2d4
Add signature-cpu-billable-pct option to chain_plugin
heifner Dec 18, 2018
1ecb7cc
Add missing include of mutex
heifner Dec 18, 2018
75587d0
Assert signature-cpu-billable-pct is 0-100
heifner Dec 18, 2018
21d20a8
Fix capture of cpu_usage. move flat_set<public_key_type> into attribute
heifner Dec 18, 2018
058d4ac
clear recovered_pub_keys to preserve previous behaviour
heifner Dec 18, 2018
1c8640b
Add move into tuple creation
heifner Dec 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
@@ -407,9 +407,6 @@ struct controller_impl {
~controller_impl() {
pending.reset();

thread_pool.join();
thread_pool.stop();

db.flush();
reversible_blocks.flush();
}
@@ -1192,7 +1189,7 @@ struct controller_impl {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>( std::make_shared<packed_transaction>( pt ) );
if( !self.skip_auth_check() ) {
transaction_metadata::create_signing_keys_future( mtrx, thread_pool, chain_id );
transaction_metadata::create_signing_keys_future( mtrx, thread_pool, chain_id, microseconds::maximum() );
}
packed_transactions.emplace_back( std::move( mtrx ) );
}
7 changes: 4 additions & 3 deletions libraries/chain/include/eosio/chain/transaction.hpp
Original file line number Diff line number Diff line change
@@ -60,9 +60,9 @@ namespace eosio { namespace chain {
digest_type sig_digest( const chain_id_type& chain_id, const vector<bytes>& cfd = vector<bytes>() )const;
flat_set<public_key_type> get_signature_keys( const vector<signature_type>& signatures,
const chain_id_type& chain_id,
fc::time_point deadline,
const vector<bytes>& cfd = vector<bytes>(),
bool allow_duplicate_keys = false,
bool use_cache = true )const;
bool allow_duplicate_keys = false)const;

uint32_t total_actions()const { return context_free_actions.size() + actions.size(); }
account_name first_authorizor()const {
@@ -92,7 +92,8 @@ namespace eosio { namespace chain {

const signature_type& sign(const private_key_type& key, const chain_id_type& chain_id);
signature_type sign(const private_key_type& key, const chain_id_type& chain_id)const;
flat_set<public_key_type> get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys = false, bool use_cache = true )const;
flat_set<public_key_type> get_signature_keys( const chain_id_type& chain_id, fc::time_point deadline,
bool allow_duplicate_keys = false )const;
};

struct packed_transaction {
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
@@ -51,8 +51,8 @@ class transaction_metadata {

const flat_set<public_key_type>& recover_keys( const chain_id_type& chain_id );

static void create_signing_keys_future( const transaction_metadata_ptr& mtrx,
boost::asio::thread_pool& thread_pool, const chain_id_type& chain_id );
static void create_signing_keys_future( const transaction_metadata_ptr& mtrx, boost::asio::thread_pool& thread_pool,
const chain_id_type& chain_id, fc::microseconds timelimit );

uint32_t total_actions()const { return trx.context_free_actions.size() + trx.actions.size(); }
};
33 changes: 16 additions & 17 deletions libraries/chain/transaction.cpp
Original file line number Diff line number Diff line change
@@ -82,28 +82,28 @@ digest_type transaction::sig_digest( const chain_id_type& chain_id, const vector
}

flat_set<public_key_type> transaction::get_signature_keys( const vector<signature_type>& signatures,
const chain_id_type& chain_id, const vector<bytes>& cfd, bool allow_duplicate_keys, bool use_cache )const
const chain_id_type& chain_id, fc::time_point deadline, const vector<bytes>& cfd, bool allow_duplicate_keys)const
{ try {
using boost::adaptors::transformed;

constexpr size_t recovery_cache_size = 1000;
static thread_local recovery_cache_type recovery_cache;
fc::time_point start = fc::time_point::now();
const digest_type digest = sig_digest(chain_id, cfd);

flat_set<public_key_type> recovered_pub_keys;
for(const signature_type& sig : signatures) {
auto now = fc::time_point::now();
EOS_ASSERT( start + now <= deadline, tx_cpu_usage_exceeded, "transaction signature verification executed for too long",
arhag marked this conversation as resolved.
Show resolved Hide resolved
("now", now)("deadline", deadline)("start", start) );
public_key_type recov;
if( use_cache ) {
recovery_cache_type::index<by_sig>::type::iterator it = recovery_cache.get<by_sig>().find( sig );
const auto& tid = id();
if( it == recovery_cache.get<by_sig>().end() || it->trx_id != tid) {
recov = public_key_type( sig, digest );
recovery_cache.emplace_back(cached_pub_key{tid, recov, sig} ); //could fail on dup signatures; not a problem
} else {
recov = it->pub_key;
}
} else {
recovery_cache_type::index<by_sig>::type::iterator it = recovery_cache.get<by_sig>().find( sig );
const auto& tid = id();
if( it == recovery_cache.get<by_sig>().end() || it->trx_id != tid ) {
recov = public_key_type( sig, digest );
recovery_cache.emplace_back( cached_pub_key{tid, recov, sig} ); //could fail on dup signatures; not a problem
} else {
recov = it->pub_key;
}
bool successful_insertion = false;
std::tie(std::ignore, successful_insertion) = recovered_pub_keys.insert(recov);
@@ -113,10 +113,8 @@ flat_set<public_key_type> transaction::get_signature_keys( const vector<signatur
);
}

if( use_cache ) {
while ( recovery_cache.size() > recovery_cache_size )
recovery_cache.erase( recovery_cache.begin() );
}
while ( recovery_cache.size() > recovery_cache_size )
recovery_cache.erase( recovery_cache.begin());

return recovered_pub_keys;
} FC_CAPTURE_AND_RETHROW() }
@@ -131,9 +129,10 @@ signature_type signed_transaction::sign(const private_key_type& key, const chain
return key.sign(sig_digest(chain_id, context_free_data));
}

flat_set<public_key_type> signed_transaction::get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys, bool use_cache )const
flat_set<public_key_type> signed_transaction::get_signature_keys( const chain_id_type& chain_id, fc::time_point deadline,
bool allow_duplicate_keys)const
{
return transaction::get_signature_keys(signatures, chain_id, context_free_data, allow_duplicate_keys, use_cache);
return transaction::get_signature_keys(signatures, chain_id, deadline, context_free_data, allow_duplicate_keys);
}

uint32_t packed_transaction::get_unprunable_size()const {
14 changes: 6 additions & 8 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
@@ -14,25 +14,23 @@ const flat_set<public_key_type>& transaction_metadata::recover_keys( const chain
return signing_keys->second;
}
}
signing_keys = std::make_pair( chain_id, trx.get_signature_keys( chain_id ));
signing_keys = std::make_pair( chain_id, trx.get_signature_keys( chain_id, fc::time_point::maximum() ));
}
return signing_keys->second;
}

void transaction_metadata::create_signing_keys_future( const transaction_metadata_ptr& mtrx,
boost::asio::thread_pool& thread_pool, const chain_id_type& chain_id ) {
if( mtrx->signing_keys && mtrx->signing_keys->first == chain_id )
return;

boost::asio::thread_pool& thread_pool, const chain_id_type& chain_id, fc::microseconds timelimit ) {
if( mtrx->signing_keys.valid() ) // already created
return;

std::weak_ptr<transaction_metadata> mtrx_wp = mtrx;
mtrx->signing_keys_future = async_thread_pool( thread_pool, [chain_id, mtrx_wp]() {
mtrx->signing_keys_future = async_thread_pool( thread_pool, [timelimit, chain_id, mtrx_wp]() {
fc::time_point deadline = fc::time_point::now() + timelimit;
auto mtrx = mtrx_wp.lock();
return mtrx ?
std::make_pair( chain_id, mtrx->trx.get_signature_keys( chain_id ) ) :
std::make_pair( chain_id, decltype( mtrx->trx.get_signature_keys( chain_id ) ){} );
std::make_pair( chain_id, mtrx->trx.get_signature_keys( chain_id, deadline ) ) :
std::make_pair( chain_id, decltype( mtrx->trx.get_signature_keys( chain_id, deadline ) ){} );
} );
}

2 changes: 2 additions & 0 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
@@ -738,6 +738,8 @@ void chain_plugin::plugin_shutdown() {
my->accepted_transaction_connection.reset();
my->applied_transaction_connection.reset();
my->accepted_confirmation_connection.reset();
my->chain->get_thread_pool().stop();
my->chain->get_thread_pool().join();
my->chain.reset();
}

2 changes: 1 addition & 1 deletion plugins/mongo_db_plugin/mongo_db_plugin.cpp
Original file line number Diff line number Diff line change
@@ -761,7 +761,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
if( t->signing_keys.valid() ) {
signing_keys_json = fc::json::to_string( t->signing_keys->second );
} else {
auto signing_keys = trx.get_signature_keys( *chain_id, false, false );
auto signing_keys = trx.get_signature_keys( *chain_id, fc::time_point::maximum(), false );
if( !signing_keys.empty() ) {
signing_keys_json = fc::json::to_string( signing_keys );
}
17 changes: 15 additions & 2 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
@@ -131,6 +131,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
std::map<chain::account_name, uint32_t> _producer_watermarks;
pending_block_mode _pending_block_mode;
transaction_id_with_expiry_index _persistent_transactions;
fc::optional<boost::asio::thread_pool> _thread_pool;

int32_t _max_transaction_time_ms;
fc::microseconds _max_irreversible_block_age_us;
@@ -347,8 +348,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

void on_incoming_transaction_async(const transaction_metadata_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
chain::controller& chain = chain_plug->chain();
transaction_metadata::create_signing_keys_future( trx, chain.get_thread_pool(), chain.get_chain_id() );
boost::asio::post( chain.get_thread_pool(), [self = this, trx, persist_until_expired, next]() {
const auto& cfg = chain.get_global_properties().configuration;
transaction_metadata::create_signing_keys_future( trx, *_thread_pool, chain.get_chain_id(), fc::microseconds( cfg.max_transaction_cpu_usage ) );
boost::asio::post( *_thread_pool, [self = this, trx, persist_until_expired, next]() {
if( trx->signing_keys_future.valid() )
trx->signing_keys_future.wait();
app().get_io_service().post( [self, trx, persist_until_expired, next]() {
@@ -541,6 +543,8 @@ void producer_plugin::set_program_options(
"offset of last block producing time in microseconds. Negative number results in blocks to go out sooner, and positive number results in blocks to go out later")
("incoming-defer-ratio", bpo::value<double>()->default_value(1.0),
"ratio between incoming transations and deferred transactions when both are exhausted")
("producer-threads", bpo::value<uint16_t>()->default_value(config::default_controller_thread_pool_size),
"Number of worker threads in producer thread pool")
("snapshots-dir", bpo::value<bfs::path>()->default_value("snapshots"),
"the location of the snapshots directory (absolute path or relative to application data dir)")
;
@@ -673,6 +677,11 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_

my->_incoming_defer_ratio = options.at("incoming-defer-ratio").as<double>();

auto thread_pool_size = options.at( "producer-threads" ).as<uint16_t>();
EOS_ASSERT( thread_pool_size > 0, plugin_config_exception,
"producer-threads ${num} must be greater than 0", ("num", thread_pool_size));
my->_thread_pool.emplace( thread_pool_size );

if( options.count( "snapshots-dir" )) {
auto sd = options.at( "snapshots-dir" ).as<bfs::path>();
if( sd.is_relative()) {
@@ -774,6 +783,10 @@ void producer_plugin::plugin_shutdown() {
edump((e.to_detail_string()));
}

if( my->_thread_pool ) {
my->_thread_pool->join();
my->_thread_pool->stop();
}
my->_accepted_block_connection.reset();
my->_irreversible_block_connection.reset();
}
2 changes: 1 addition & 1 deletion tests/wallet_tests.cpp
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ BOOST_AUTO_TEST_CASE(wallet_manager_test)
pubkeys.emplace(pkey1.get_public_key());
pubkeys.emplace(pkey2.get_public_key());
trx = wm.sign_transaction(trx, pubkeys, chain_id );
const auto& pks = trx.get_signature_keys(chain_id);
const auto& pks = trx.get_signature_keys(chain_id, fc::time_point::maximum());
BOOST_CHECK_EQUAL(2, pks.size());
BOOST_CHECK(find(pks.cbegin(), pks.cend(), pkey1.get_public_key()) != pks.cend());
BOOST_CHECK(find(pks.cbegin(), pks.cend(), pkey2.get_public_key()) != pks.cend());
8 changes: 4 additions & 4 deletions unittests/api_tests.cpp
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ transaction_trace_ptr CallAction(TESTER& test, T ac, const vector<account_name>&

test.set_transaction_headers(trx);
auto sigs = trx.sign(test.get_private_key(scope[0], "active"), test.control->get_chain_id());
trx.get_signature_keys(test.control->get_chain_id());
trx.get_signature_keys(test.control->get_chain_id(), fc::time_point::maximum());
auto res = test.push_transaction(trx);
BOOST_CHECK_EQUAL(res->receipt->status, transaction_receipt::executed);
test.produce_block();
@@ -173,7 +173,7 @@ transaction_trace_ptr CallFunction(TESTER& test, T ac, const vector<char>& data,

test.set_transaction_headers(trx, test.DEFAULT_EXPIRATION_DELTA);
auto sigs = trx.sign(test.get_private_key(scope[0], "active"), test.control->get_chain_id());
trx.get_signature_keys(test.control->get_chain_id() );
trx.get_signature_keys(test.control->get_chain_id(), fc::time_point::maximum());
auto res = test.push_transaction(trx);
BOOST_CHECK_EQUAL(res->receipt->status, transaction_receipt::executed);
test.produce_block();
@@ -266,7 +266,7 @@ BOOST_FIXTURE_TEST_CASE(action_receipt_tests, TESTER) { try {
trx.actions.push_back(act);
this->set_transaction_headers(trx, this->DEFAULT_EXPIRATION_DELTA);
trx.sign(this->get_private_key(config::system_account_name, "active"), control->get_chain_id());
trx.get_signature_keys(control->get_chain_id() );
trx.get_signature_keys(control->get_chain_id(), fc::time_point::maximum());
auto res = this->push_transaction(trx);
BOOST_CHECK_EQUAL(res->receipt->status, transaction_receipt::executed);
this->produce_block();
@@ -745,7 +745,7 @@ void call_test(TESTER& test, T ac, uint32_t billed_cpu_time_us , uint32_t max_cp
test.set_transaction_headers(trx);
//trx.max_cpu_usage_ms = max_cpu_usage_ms;
auto sigs = trx.sign(test.get_private_key(N(testapi), "active"), test.control->get_chain_id());
trx.get_signature_keys(test.control->get_chain_id() );
trx.get_signature_keys(test.control->get_chain_id(), fc::time_point::maximum());
auto res = test.push_transaction( trx, fc::time_point::now() + fc::milliseconds(max_cpu_usage_ms), billed_cpu_time_us );
BOOST_CHECK_EQUAL(res->receipt->status, transaction_receipt::executed);
test.produce_block();