Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

explore real-time multi-thread recover_key capability #6460

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/appbase
17 changes: 15 additions & 2 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1727,8 +1727,8 @@ authorization_manager& controller::get_mutable_authorization_manager()
return my->authorization;
}

controller::controller( const controller::config& cfg )
:my( new controller_impl( cfg, *this ) )
controller::controller( const controller::config& cfg, std::shared_ptr<boost::asio::io_service> io_service )
:my( new controller_impl( cfg, *this ) ), io_serv(io_service)
{
}

Expand Down Expand Up @@ -1800,6 +1800,19 @@ transaction_trace_ptr controller::push_transaction( const transaction_metadata_p
return my->push_transaction(trx, deadline, billed_cpu_time_us, billed_cpu_time_us > 0 );
}

void controller::warmup_transaction(transaction_metadata_ptr trx, const std::function<void()> next) {
if (trx->signing_keys || !io_serv || !(my->thread_pool)) {
next();
} else {
boost::asio::post(*my->thread_pool, [this, trx, next, chain_id = my->chain_id]() {
try {
trx->signing_keys = std::make_pair(chain_id, trx->trx.get_signature_keys( chain_id ));
} catch (...) { }
io_serv->post(next);
});
}
}

transaction_trace_ptr controller::push_scheduled_transaction( const transaction_id_type& trxid, fc::time_point deadline, uint32_t billed_cpu_time_us )
{
validate_db_available_size();
Expand Down
8 changes: 7 additions & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#pragma once

#include <boost/asio.hpp>

#include <eosio/chain/block_state.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/genesis_state.hpp>
Expand Down Expand Up @@ -87,7 +90,7 @@ namespace eosio { namespace chain {
incomplete = 3, ///< this is an incomplete block (either being produced by a producer or speculatively produced by a node)
};

controller( const config& cfg );
controller( const config& cfg, std::shared_ptr<boost::asio::io_service> io_service = nullptr);
~controller();

void add_indices();
Expand Down Expand Up @@ -130,6 +133,8 @@ namespace eosio { namespace chain {
*/
transaction_trace_ptr push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us = 0 );

void warmup_transaction(transaction_metadata_ptr trx, const std::function<void()> next);

/**
* Attempt to execute a specific transaction in our deferred trx database
*
Expand Down Expand Up @@ -293,6 +298,7 @@ namespace eosio { namespace chain {
chainbase::database& mutable_db()const;

std::unique_ptr<controller_impl> my;
std::shared_ptr<boost::asio::io_service> io_serv;

};

Expand Down
2 changes: 1 addition & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->chain_config->block_validation_mode = options.at("validation-mode").as<validation_mode>();
}

my->chain.emplace( *my->chain_config );
my->chain.emplace( *my->chain_config, app().get_io_service_ptr());
my->chain_id.emplace( my->chain->get_chain_id());

// set up method providers
Expand Down
21 changes: 15 additions & 6 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,21 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
}

std::deque<std::tuple<packed_transaction_ptr, bool, next_function<transaction_trace_ptr>>> _pending_incoming_transactions;
std::deque<std::tuple<packed_transaction_ptr, transaction_metadata_ptr, bool, next_function<transaction_trace_ptr>>> _pending_incoming_transactions;

void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
packed_transaction_ptr trx_ptr = trx;
auto trx_meta = std::make_shared<transaction_metadata>(*trx);
chain.warmup_transaction(trx_meta, [this, trx_ptr, trx_meta, persist_until_expired, next]()->void {
on_incoming_transaction_async(trx_ptr, trx_meta, persist_until_expired, next);
});
}

void on_incoming_transaction_async(const packed_transaction_ptr& trx, const transaction_metadata_ptr &trx_meta, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
if (!chain.pending_block_state()) {
_pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
_pending_incoming_transactions.emplace_back(trx, trx_meta, persist_until_expired, next);
return;
}

Expand Down Expand Up @@ -401,10 +410,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}

try {
auto trace = chain.push_transaction(std::make_shared<transaction_metadata>(*trx), deadline);
auto trace = chain.push_transaction(trx_meta, deadline);
if (trace->except) {
if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
_pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
_pending_incoming_transactions.emplace_back(trx, trx_meta, persist_until_expired, next);
if (_pending_block_mode == pending_block_mode::producing) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
("block_num", chain.head_block_num() + 1)
Expand Down Expand Up @@ -1242,7 +1251,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
_pending_incoming_transactions.pop_front();
--orig_pending_txn_size;
_incoming_trx_weight -= 1.0;
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e));
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e), std::get<3>(e));
}

if (block_time <= fc::time_point::now()) {
Expand Down Expand Up @@ -1305,7 +1314,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
auto e = _pending_incoming_transactions.front();
_pending_incoming_transactions.pop_front();
--orig_pending_txn_size;
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e));
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e), std::get<3>(e));
if (block_time <= fc::time_point::now()) return start_block_result::exhausted;
}
}
Expand Down
50 changes: 36 additions & 14 deletions plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,38 @@ using namespace eosio::chain;
api_handle->call_name(vs.at(0).as<in_param0>(), vs.at(1).as<in_param1>(), result_handler);

struct txn_test_gen_plugin_impl {
static void push_next_transaction(const std::shared_ptr<std::vector<signed_transaction>>& trxs, size_t index, const std::function<void(const fc::exception_ptr&)>& next ) {

uint64_t _total_us = 0;
uint64_t _txcount = 0;

int _remain = 0;

void push_next_transaction(const std::shared_ptr<std::vector<signed_transaction>>& trxs, size_t index, const std::function<void(const fc::exception_ptr&)>& next ) {
chain_plugin& cp = app().get_plugin<chain_plugin>();
cp.accept_transaction( packed_transaction(trxs->at(index)), [=](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& result){
if (result.contains<fc::exception_ptr>()) {
next(result.get<fc::exception_ptr>());
} else {
if (index + 1 < trxs->size()) {
push_next_transaction(trxs, index + 1, next);

const int overlap = 20;
int end = std::min(index + overlap, trxs->size());
_remain = end - index;
for (int i = index; i < end; ++i) {
cp.accept_transaction( packed_transaction(trxs->at(i)), [=](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& result){
if (result.contains<fc::exception_ptr>()) {
next(result.get<fc::exception_ptr>());
} else {
next(nullptr);
if (result.contains<transaction_trace_ptr>() && result.get<transaction_trace_ptr>()->receipt) {
_total_us += result.get<transaction_trace_ptr>()->receipt->cpu_usage_us;
++_txcount;
}
--_remain;
if (_remain == 0 ) {
if (end < trxs->size()) {
push_next_transaction(trxs, index + overlap, next);
} else {
next(nullptr);
}
}
}
}
});
});
}
}

void push_transactions( std::vector<signed_transaction>&& trxs, const std::function<void(fc::exception_ptr)>& next ) {
Expand Down Expand Up @@ -295,13 +314,11 @@ struct txn_test_gen_plugin_impl {
try {
controller& cc = app().get_plugin<chain_plugin>().chain();
auto chainid = app().get_plugin<chain_plugin>().get_chain_id();
auto abi_serializer_max_time = app().get_plugin<chain_plugin>().get_abi_serializer_max_time();

fc::crypto::private_key a_priv_key = fc::crypto::private_key::regenerate(fc::sha256(std::string(64, 'a')));
fc::crypto::private_key b_priv_key = fc::crypto::private_key::regenerate(fc::sha256(std::string(64, 'b')));
static fc::crypto::private_key a_priv_key = fc::crypto::private_key::regenerate(fc::sha256(std::string(64, 'a')));
static fc::crypto::private_key b_priv_key = fc::crypto::private_key::regenerate(fc::sha256(std::string(64, 'b')));

static uint64_t nonce = static_cast<uint64_t>(fc::time_point::now().sec_since_epoch()) << 32;
abi_serializer eosio_serializer(cc.db().find<account_object, by_name>(config::system_account_name)->get_abi(), abi_serializer_max_time);

uint32_t reference_block_num = cc.last_irreversible_block_num();
if (txn_reference_block_lag >= 0) {
Expand Down Expand Up @@ -351,6 +368,11 @@ struct txn_test_gen_plugin_impl {
timer.cancel();
running = false;
ilog("Stopping transaction generation test");

if (_txcount) {
ilog("${d} transactions executed, ${t}us / transaction", ("d", _txcount)("t", _total_us / (double)_txcount));
_txcount = _total_us = 0;
}
}

boost::asio::high_resolution_timer timer{app().get_io_service()};
Expand Down