Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #7304 from EOSIO/trx-meta-thread
Browse files Browse the repository at this point in the history
transaction_metadata thread safety
  • Loading branch information
heifner authored May 9, 2019
2 parents a303c72 + 2a26f5e commit 3bdec33
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 116 deletions.
32 changes: 15 additions & 17 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ struct controller_impl {
if ( read_mode == db_read_mode::SPECULATIVE ) {
EOS_ASSERT( head->block, block_validate_exception, "attempting to pop a block that was sparsely loaded from a snapshot");
for( const auto& t : head->trxs )
unapplied_transactions[t->signed_id] = t;
unapplied_transactions[t->signed_id()] = t;
}

head = prev;
Expand Down Expand Up @@ -1258,8 +1258,10 @@ struct controller_impl {
auto start = fc::time_point::now();
const bool check_auth = !self.skip_auth_check() && !trx->implicit;
// call recover keys so that trx->sig_cpu_usage is set correctly
const fc::microseconds sig_cpu_usage = check_auth ? std::get<0>( trx->recover_keys( chain_id ) ) : fc::microseconds();
const flat_set<public_key_type>& recovered_keys = check_auth ? std::get<1>( trx->recover_keys( chain_id ) ) : flat_set<public_key_type>();
const fc::microseconds sig_cpu_usage =
check_auth ? std::get<0>( trx->recover_keys( chain_id ) ) : fc::microseconds();
const std::shared_ptr<flat_set<public_key_type>> recovered_keys =
check_auth ? std::get<1>( trx->recover_keys( chain_id ) ) : nullptr;
if( !explicit_billed_cpu_time ) {
fc::microseconds already_consumed_time( EOS_PERCENT(sig_cpu_usage.count(), conf.sig_cpu_bill_pct) );

Expand All @@ -1270,8 +1272,8 @@ struct controller_impl {
}
}

const signed_transaction& trn = trx->packed_trx->get_signed_transaction();
transaction_context trx_context(self, trn, trx->id, start);
const signed_transaction& trn = trx->packed_trx()->get_signed_transaction();
transaction_context trx_context(self, trn, trx->id(), start);
if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) {
trx_context.leeway = *subjective_cpu_leeway;
}
Expand All @@ -1285,17 +1287,18 @@ struct controller_impl {
trx_context.enforce_whiteblacklist = false;
} else {
bool skip_recording = replay_head_time && (time_point(trn.expiration) <= *replay_head_time);
trx_context.init_for_input_trx( trx->packed_trx->get_unprunable_size(),
trx->packed_trx->get_prunable_size(),
trx_context.init_for_input_trx( trx->packed_trx()->get_unprunable_size(),
trx->packed_trx()->get_prunable_size(),
skip_recording);
}

trx_context.delay = fc::seconds(trn.delay_sec);

if( check_auth ) {
EOS_ASSERT( recovered_keys, missing_auth_exception, "recovered_keys should never be null" );
authorization.check_authorization(
trn.actions,
recovered_keys,
*recovered_keys,
{},
trx_context.delay,
[&trx_context](){ trx_context.checktime(); },
Expand All @@ -1311,7 +1314,7 @@ struct controller_impl {
transaction_receipt::status_enum s = (trx_context.delay == fc::seconds(0))
? transaction_receipt::executed
: transaction_receipt::delayed;
trace->receipt = push_receipt(*trx->packed_trx, s, trx_context.billed_cpu_time_us, trace->net_usage);
trace->receipt = push_receipt(*trx->packed_trx(), s, trx_context.billed_cpu_time_us, trace->net_usage);
pending->_block_stage.get<building_block>()._pending_trx_metas.emplace_back(trx);
} else {
transaction_receipt_header r;
Expand Down Expand Up @@ -1341,7 +1344,7 @@ struct controller_impl {
}

if (!trx->implicit) {
unapplied_transactions.erase( trx->signed_id );
unapplied_transactions.erase( trx->signed_id() );
}
return trace;
} catch( const disallowed_transaction_extensions_bad_block_exception& ) {
Expand All @@ -1355,7 +1358,7 @@ struct controller_impl {
}

if (!failure_is_subjective(*trace->except)) {
unapplied_transactions.erase( trx->signed_id );
unapplied_transactions.erase( trx->signed_id() );
}

emit( self.accepted_transaction, trx );
Expand Down Expand Up @@ -1944,18 +1947,13 @@ struct controller_impl {
if( pending ) {
if ( read_mode == db_read_mode::SPECULATIVE ) {
for( const auto& t : pending->get_trx_metas() )
unapplied_transactions[t->signed_id] = t;
unapplied_transactions[t->signed_id()] = t;
}
pending.reset();
protocol_features.popped_blocks_to( head->block_num );
}
}


bool should_enforce_runtime_limits()const {
return false;
}

checksum256_type calculate_action_merkle() {
vector<digest_type> action_digests;
const auto& actions = pending->_block_stage.get<building_block>()._actions;
Expand Down
51 changes: 28 additions & 23 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@ namespace eosio { namespace chain {

class transaction_metadata;
using transaction_metadata_ptr = std::shared_ptr<transaction_metadata>;
using signing_keys_future_value_type = std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>;
using signing_keys_future_value_type = std::tuple<chain_id_type, fc::microseconds, std::shared_ptr<flat_set<public_key_type>>>;
using signing_keys_future_type = std::shared_future<signing_keys_future_value_type>;
using recovery_keys_type = std::pair<fc::microseconds, const flat_set<public_key_type>&>;
using recovery_keys_type = std::pair<fc::microseconds, std::shared_ptr<flat_set<public_key_type>>>;

/**
* This data structure should store context-free cached data about a transaction such as
* packed/unpacked/compressed and recovered keys
*/
class transaction_metadata {
private:
const packed_transaction_ptr _packed_trx;
const transaction_id_type _id;
const transaction_id_type _signed_id;
mutable std::mutex _signing_keys_future_mtx;
mutable signing_keys_future_type _signing_keys_future;

public:
transaction_id_type id;
transaction_id_type signed_id;
packed_transaction_ptr packed_trx;
signing_keys_future_type signing_keys_future;
bool accepted = false;
bool implicit = false;
bool scheduled = false;
bool accepted = false; // not thread safe
bool implicit = false; // not thread safe
bool scheduled = false; // not thread safe

transaction_metadata() = delete;
transaction_metadata(const transaction_metadata&) = delete;
Expand All @@ -40,28 +43,30 @@ class transaction_metadata {
transaction_metadata operator=(transaction_metadata&&) = delete;

explicit transaction_metadata( const signed_transaction& t, packed_transaction::compression_type c = packed_transaction::none )
:id(t.id()), packed_trx(std::make_shared<packed_transaction>(t, c)) {
//raw_packed = fc::raw::pack( static_cast<const transaction&>(trx) );
signed_id = digest_type::hash(*packed_trx);
: _packed_trx( std::make_shared<packed_transaction>( t, c ) )
, _id( t.id() )
, _signed_id( digest_type::hash( *_packed_trx ) ) {
}

explicit transaction_metadata( const packed_transaction_ptr& ptrx )
:id(ptrx->id()), packed_trx(ptrx) {
//raw_packed = fc::raw::pack( static_cast<const transaction&>(trx) );
signed_id = digest_type::hash(*packed_trx);
: _packed_trx( ptrx )
, _id( ptrx->id() )
, _signed_id( digest_type::hash( *_packed_trx ) ) {
}

// must be called from main application thread. signing_keys_future must be accessed only from main application thread.
// next() should only be called on main application thread after future is valid, to avoid dependency on appbase,
// it is up to the caller to have next() post to the application thread which makes sure future is only accessed from
// application thread and that assignment to future in this method has completed.
static signing_keys_future_type
start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::io_context& thread_pool,
const packed_transaction_ptr& packed_trx()const { return _packed_trx; }
const transaction_id_type& id()const { return _id; }
const transaction_id_type& signed_id()const { return _signed_id; }

// can be called from any thread. It is recommended that next() immediately post to application thread for
// future processing since next() will be called from the thread_pool.
static void start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
std::function<void()> next = std::function<void()>() );

// start_recover_keys must be called first
recovery_keys_type recover_keys( const chain_id_type& chain_id );
// start_recover_keys can be called first to begin key recovery
// if time_limit of start_recover_keys exceeded (or any other exception) then this can throw
recovery_keys_type recover_keys( const chain_id_type& chain_id ) const;
};

} } // eosio::chain
49 changes: 27 additions & 22 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,54 @@

namespace eosio { namespace chain {

recovery_keys_type transaction_metadata::recover_keys( const chain_id_type& chain_id ) {
recovery_keys_type transaction_metadata::recover_keys( const chain_id_type& chain_id ) const {
// Unlikely for more than one chain_id to be used in one nodeos instance
if( signing_keys_future.valid() ) {
const std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>& sig_keys = signing_keys_future.get();
std::unique_lock<std::mutex> g( _signing_keys_future_mtx );
if( _signing_keys_future.valid() ) {
const signing_keys_future_value_type& sig_keys = _signing_keys_future.get();
if( std::get<0>( sig_keys ) == chain_id ) {
return std::make_pair( std::get<1>( sig_keys ), std::cref( std::get<2>( sig_keys ) ) );
return std::make_pair( std::get<1>( sig_keys ), std::get<2>( sig_keys ) );
}
}

g.unlock();
// shared_keys_future not created or different chain_id
auto recovered_pub_keys = std::make_shared<flat_set<public_key_type>>();
const signed_transaction& trn = _packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, fc::time_point::maximum(), *recovered_pub_keys );

g.lock();
std::promise<signing_keys_future_value_type> p;
flat_set<public_key_type> recovered_pub_keys;
const signed_transaction& trn = packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, fc::time_point::maximum(), recovered_pub_keys );
p.set_value( std::make_tuple( chain_id, cpu_usage, std::move( recovered_pub_keys ) ) );
signing_keys_future = p.get_future().share();
_signing_keys_future = p.get_future().share();

const std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>& sig_keys = signing_keys_future.get();
return std::make_pair( std::get<1>( sig_keys ), std::cref( std::get<2>( sig_keys ) ) );
const signing_keys_future_value_type& sig_keys = _signing_keys_future.get();
return std::make_pair( std::get<1>( sig_keys ), std::get<2>( sig_keys ) );
}

signing_keys_future_type transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit,
std::function<void()> next )
void transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit,
std::function<void()> next )
{
if( mtrx->signing_keys_future.valid() && std::get<0>( mtrx->signing_keys_future.get() ) == chain_id ) { // already created
std::unique_lock<std::mutex> g( mtrx->_signing_keys_future_mtx );
if( mtrx->_signing_keys_future.valid() && std::get<0>( mtrx->_signing_keys_future.get() ) == chain_id ) { // already created
g.unlock();
if( next ) next();
return mtrx->signing_keys_future;
}

mtrx->signing_keys_future = async_thread_pool( thread_pool, [time_limit, chain_id, mtrx, next{std::move(next)}]() {
mtrx->_signing_keys_future = async_thread_pool( thread_pool, [time_limit, chain_id, mtrx, next{std::move(next)}]() {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
flat_set<public_key_type> recovered_pub_keys;
const signed_transaction& trn = mtrx->packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );

auto recovered_pub_keys = std::make_shared<flat_set<public_key_type>>();
const signed_transaction& trn = mtrx->_packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, *recovered_pub_keys );
if( next ) next();
return std::make_tuple( chain_id, cpu_usage, std::move( recovered_pub_keys ));
} );

return mtrx->signing_keys_future;
}


Expand Down
16 changes: 6 additions & 10 deletions plugins/mongo_db_plugin/mongo_db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
using bsoncxx::builder::basic::make_array;
namespace bbb = bsoncxx::builder::basic;

const signed_transaction& trx = t->packed_trx->get_signed_transaction();
const signed_transaction& trx = t->packed_trx()->get_signed_transaction();

if( !filter_include( trx ) ) return;

Expand All @@ -755,7 +755,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} );

const auto& trx_id = t->id;
const auto& trx_id = t->id();
const auto trx_id_str = trx_id.str();

trans_doc.append( kvp( "trx_id", trx_id_str ) );
Expand All @@ -771,14 +771,10 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
}

fc::variant signing_keys;
if( t->signing_keys_future.valid() ) {
signing_keys = std::get<2>( t->signing_keys_future.get() );
} else {
flat_set<public_key_type> keys;
trx.get_signature_keys( *chain_id, fc::time_point::maximum(), keys, false );
if( !keys.empty() ) {
signing_keys = keys;
}
std::shared_ptr<flat_set<public_key_type>> keys;
std::tie( std::ignore, keys ) = t->recover_keys( *chain_id );
if( !keys->empty() ) {
signing_keys = *keys;
}

if( signing_keys.get_type() == fc::variant::array_type && signing_keys.get_array().size() > 0) {
Expand Down
16 changes: 8 additions & 8 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1888,8 +1888,8 @@ namespace eosio {
}

void dispatch_manager::bcast_transaction(const transaction_metadata_ptr& ptrx) {
const auto& id = ptrx->id;
const packed_transaction& trx = *ptrx->packed_trx;
const auto& id = ptrx->id();
const packed_transaction& trx = *ptrx->packed_trx();
time_point_sec trx_expiration = trx.expiration();
node_transaction_state nts = {id, trx_expiration, 0, 0};

Expand All @@ -1915,7 +1915,7 @@ namespace eosio {
}

void dispatch_manager::recv_transaction(const connection_ptr& c, const transaction_metadata_ptr& txn) {
node_transaction_state nts = {txn->id, txn->packed_trx->expiration(), 0, c->connection_id};
node_transaction_state nts = {txn->id(), txn->packed_trx()->expiration(), 0, c->connection_id};
add_peer_txn( nts );
}

Expand Down Expand Up @@ -2699,7 +2699,7 @@ namespace eosio {
}

auto ptrx = std::make_shared<transaction_metadata>( trx );
const auto& tid = ptrx->id;
const auto& tid = ptrx->id();
peer_dlog( this, "received packed_transaction ${id}", ("id", tid) );

bool have_trx = my_impl->dispatcher->have_txn( tid );
Expand All @@ -2709,14 +2709,14 @@ namespace eosio {
return;
}

trx_in_progress_size += calc_trx_size( ptrx->packed_trx );
trx_in_progress_size += calc_trx_size( ptrx->packed_trx() );
connection_wptr weak = shared_from_this();
my_impl->chain_plug->accept_transaction( ptrx,
[weak{std::move(weak)}, ptrx](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) {
// next (this lambda) called from application thread
connection_ptr conn = weak.lock();
if( conn ) {
conn->trx_in_progress_size -= calc_trx_size( ptrx->packed_trx );
conn->trx_in_progress_size -= calc_trx_size( ptrx->packed_trx() );
}
bool accepted = false;
if (result.contains<fc::exception_ptr>()) {
Expand All @@ -2740,7 +2740,7 @@ namespace eosio {
if( accepted ) {
my_impl->dispatcher->bcast_transaction( ptrx );
} else {
my_impl->dispatcher->rejected_transaction( ptrx->id, head_blk_num );
my_impl->dispatcher->rejected_transaction( ptrx->id(), head_blk_num );
}
});
});
Expand Down Expand Up @@ -2956,7 +2956,7 @@ namespace eosio {

// called from application thread
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, transaction_metadata_ptr>& results) {
const auto& id = results.second->id;
const auto& id = results.second->id();
if (results.first) {
fc_dlog( logger, "signaled NACK, trx-id = ${id} : ${why}", ("id", id)( "why", results.first->to_detail_string() ) );

Expand Down
Loading

0 comments on commit 3bdec33

Please sign in to comment.