Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

transaction_metadata thread safety #7304

Merged
merged 6 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ struct controller_impl {
if ( read_mode == db_read_mode::SPECULATIVE ) {
EOS_ASSERT( head->block, block_validate_exception, "attempting to pop a block that was sparsely loaded from a snapshot");
for( const auto& t : head->trxs )
unapplied_transactions[t->signed_id] = t;
unapplied_transactions[t->signed_id()] = t;
}

head = prev;
Expand Down Expand Up @@ -1258,8 +1258,10 @@ struct controller_impl {
auto start = fc::time_point::now();
const bool check_auth = !self.skip_auth_check() && !trx->implicit;
// call recover keys so that trx->sig_cpu_usage is set correctly
const fc::microseconds sig_cpu_usage = check_auth ? std::get<0>( trx->recover_keys( chain_id ) ) : fc::microseconds();
const flat_set<public_key_type>& recovered_keys = check_auth ? std::get<1>( trx->recover_keys( chain_id ) ) : flat_set<public_key_type>();
const fc::microseconds sig_cpu_usage =
check_auth ? std::get<0>( trx->recover_keys( chain_id ) ) : fc::microseconds();
const std::shared_ptr<flat_set<public_key_type>> recovered_keys =
check_auth ? std::get<1>( trx->recover_keys( chain_id ) ) : nullptr;
if( !explicit_billed_cpu_time ) {
fc::microseconds already_consumed_time( EOS_PERCENT(sig_cpu_usage.count(), conf.sig_cpu_bill_pct) );

Expand All @@ -1270,8 +1272,8 @@ struct controller_impl {
}
}

const signed_transaction& trn = trx->packed_trx->get_signed_transaction();
transaction_context trx_context(self, trn, trx->id, start);
const signed_transaction& trn = trx->packed_trx()->get_signed_transaction();
transaction_context trx_context(self, trn, trx->id(), start);
if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) {
trx_context.leeway = *subjective_cpu_leeway;
}
Expand All @@ -1285,17 +1287,18 @@ struct controller_impl {
trx_context.enforce_whiteblacklist = false;
} else {
bool skip_recording = replay_head_time && (time_point(trn.expiration) <= *replay_head_time);
trx_context.init_for_input_trx( trx->packed_trx->get_unprunable_size(),
trx->packed_trx->get_prunable_size(),
trx_context.init_for_input_trx( trx->packed_trx()->get_unprunable_size(),
trx->packed_trx()->get_prunable_size(),
skip_recording);
}

trx_context.delay = fc::seconds(trn.delay_sec);

if( check_auth ) {
EOS_ASSERT( recovered_keys, missing_auth_exception, "recovered_keys should never be null" );
authorization.check_authorization(
trn.actions,
recovered_keys,
*recovered_keys,
{},
trx_context.delay,
[&trx_context](){ trx_context.checktime(); },
Expand All @@ -1311,7 +1314,7 @@ struct controller_impl {
transaction_receipt::status_enum s = (trx_context.delay == fc::seconds(0))
? transaction_receipt::executed
: transaction_receipt::delayed;
trace->receipt = push_receipt(*trx->packed_trx, s, trx_context.billed_cpu_time_us, trace->net_usage);
trace->receipt = push_receipt(*trx->packed_trx(), s, trx_context.billed_cpu_time_us, trace->net_usage);
pending->_block_stage.get<building_block>()._pending_trx_metas.emplace_back(trx);
} else {
transaction_receipt_header r;
Expand Down Expand Up @@ -1341,7 +1344,7 @@ struct controller_impl {
}

if (!trx->implicit) {
unapplied_transactions.erase( trx->signed_id );
unapplied_transactions.erase( trx->signed_id() );
}
return trace;
} catch( const disallowed_transaction_extensions_bad_block_exception& ) {
Expand All @@ -1355,7 +1358,7 @@ struct controller_impl {
}

if (!failure_is_subjective(*trace->except)) {
unapplied_transactions.erase( trx->signed_id );
unapplied_transactions.erase( trx->signed_id() );
}

emit( self.accepted_transaction, trx );
Expand Down Expand Up @@ -1944,18 +1947,13 @@ struct controller_impl {
if( pending ) {
if ( read_mode == db_read_mode::SPECULATIVE ) {
for( const auto& t : pending->get_trx_metas() )
unapplied_transactions[t->signed_id] = t;
unapplied_transactions[t->signed_id()] = t;
}
pending.reset();
protocol_features.popped_blocks_to( head->block_num );
}
}


bool should_enforce_runtime_limits()const {
return false;
}

checksum256_type calculate_action_merkle() {
vector<digest_type> action_digests;
const auto& actions = pending->_block_stage.get<building_block>()._actions;
Expand Down
51 changes: 28 additions & 23 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@ namespace eosio { namespace chain {

class transaction_metadata;
using transaction_metadata_ptr = std::shared_ptr<transaction_metadata>;
using signing_keys_future_value_type = std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>;
using signing_keys_future_value_type = std::tuple<chain_id_type, fc::microseconds, std::shared_ptr<flat_set<public_key_type>>>;
using signing_keys_future_type = std::shared_future<signing_keys_future_value_type>;
using recovery_keys_type = std::pair<fc::microseconds, const flat_set<public_key_type>&>;
using recovery_keys_type = std::pair<fc::microseconds, std::shared_ptr<flat_set<public_key_type>>>;

/**
* This data structure should store context-free cached data about a transaction such as
* packed/unpacked/compressed and recovered keys
*/
class transaction_metadata {
private:
const packed_transaction_ptr _packed_trx;
const transaction_id_type _id;
const transaction_id_type _signed_id;
mutable std::mutex _signing_keys_future_mtx;
mutable signing_keys_future_type _signing_keys_future;

public:
transaction_id_type id;
transaction_id_type signed_id;
packed_transaction_ptr packed_trx;
signing_keys_future_type signing_keys_future;
bool accepted = false;
bool implicit = false;
bool scheduled = false;
bool accepted = false; // not thread safe
bool implicit = false; // not thread safe
bool scheduled = false; // not thread safe

transaction_metadata() = delete;
transaction_metadata(const transaction_metadata&) = delete;
Expand All @@ -40,28 +43,30 @@ class transaction_metadata {
transaction_metadata operator=(transaction_metadata&&) = delete;

explicit transaction_metadata( const signed_transaction& t, packed_transaction::compression_type c = packed_transaction::none )
:id(t.id()), packed_trx(std::make_shared<packed_transaction>(t, c)) {
//raw_packed = fc::raw::pack( static_cast<const transaction&>(trx) );
signed_id = digest_type::hash(*packed_trx);
: _packed_trx( std::make_shared<packed_transaction>( t, c ) )
, _id( t.id() )
, _signed_id( digest_type::hash( *_packed_trx ) ) {
}

explicit transaction_metadata( const packed_transaction_ptr& ptrx )
:id(ptrx->id()), packed_trx(ptrx) {
//raw_packed = fc::raw::pack( static_cast<const transaction&>(trx) );
signed_id = digest_type::hash(*packed_trx);
: _packed_trx( ptrx )
, _id( ptrx->id() )
, _signed_id( digest_type::hash( *_packed_trx ) ) {
}

// must be called from main application thread. signing_keys_future must be accessed only from main application thread.
// next() should only be called on main application thread after future is valid, to avoid dependency on appbase,
// it is up to the caller to have next() post to the application thread which makes sure future is only accessed from
// application thread and that assignment to future in this method has completed.
static signing_keys_future_type
start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::io_context& thread_pool,
const packed_transaction_ptr& packed_trx()const { return _packed_trx; }
const transaction_id_type& id()const { return _id; }
const transaction_id_type& signed_id()const { return _signed_id; }

// can be called from any thread. It is recommended that next() immediately post to application thread for
// future processing since next() will be called from the thread_pool.
static void start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
std::function<void()> next = std::function<void()>() );

// start_recover_keys must be called first
recovery_keys_type recover_keys( const chain_id_type& chain_id );
// start_recover_keys can be called first to begin key recovery
// if time_limit of start_recover_keys exceeded (or any other exception) then this can throw
recovery_keys_type recover_keys( const chain_id_type& chain_id ) const;
};

} } // eosio::chain
49 changes: 27 additions & 22 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,54 @@

namespace eosio { namespace chain {

recovery_keys_type transaction_metadata::recover_keys( const chain_id_type& chain_id ) {
recovery_keys_type transaction_metadata::recover_keys( const chain_id_type& chain_id ) const {
// Unlikely for more than one chain_id to be used in one nodeos instance
if( signing_keys_future.valid() ) {
const std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>& sig_keys = signing_keys_future.get();
std::unique_lock<std::mutex> g( _signing_keys_future_mtx );
if( _signing_keys_future.valid() ) {
const signing_keys_future_value_type& sig_keys = _signing_keys_future.get();
if( std::get<0>( sig_keys ) == chain_id ) {
return std::make_pair( std::get<1>( sig_keys ), std::cref( std::get<2>( sig_keys ) ) );
return std::make_pair( std::get<1>( sig_keys ), std::get<2>( sig_keys ) );
}
}

g.unlock();
// shared_keys_future not created or different chain_id
auto recovered_pub_keys = std::make_shared<flat_set<public_key_type>>();
const signed_transaction& trn = _packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, fc::time_point::maximum(), *recovered_pub_keys );

g.lock();
std::promise<signing_keys_future_value_type> p;
flat_set<public_key_type> recovered_pub_keys;
const signed_transaction& trn = packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, fc::time_point::maximum(), recovered_pub_keys );
p.set_value( std::make_tuple( chain_id, cpu_usage, std::move( recovered_pub_keys ) ) );
signing_keys_future = p.get_future().share();
_signing_keys_future = p.get_future().share();

const std::tuple<chain_id_type, fc::microseconds, flat_set<public_key_type>>& sig_keys = signing_keys_future.get();
return std::make_pair( std::get<1>( sig_keys ), std::cref( std::get<2>( sig_keys ) ) );
const signing_keys_future_value_type& sig_keys = _signing_keys_future.get();
return std::make_pair( std::get<1>( sig_keys ), std::get<2>( sig_keys ) );
}

signing_keys_future_type transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit,
std::function<void()> next )
void transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit,
std::function<void()> next )
{
if( mtrx->signing_keys_future.valid() && std::get<0>( mtrx->signing_keys_future.get() ) == chain_id ) { // already created
std::unique_lock<std::mutex> g( mtrx->_signing_keys_future_mtx );
if( mtrx->_signing_keys_future.valid() && std::get<0>( mtrx->_signing_keys_future.get() ) == chain_id ) { // already created
g.unlock();
if( next ) next();
return mtrx->signing_keys_future;
}

mtrx->signing_keys_future = async_thread_pool( thread_pool, [time_limit, chain_id, mtrx, next{std::move(next)}]() {
mtrx->_signing_keys_future = async_thread_pool( thread_pool, [time_limit, chain_id, mtrx, next{std::move(next)}]() {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
flat_set<public_key_type> recovered_pub_keys;
const signed_transaction& trn = mtrx->packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );

auto recovered_pub_keys = std::make_shared<flat_set<public_key_type>>();
const signed_transaction& trn = mtrx->_packed_trx->get_signed_transaction();
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, *recovered_pub_keys );
if( next ) next();
return std::make_tuple( chain_id, cpu_usage, std::move( recovered_pub_keys ));
} );

return mtrx->signing_keys_future;
}


Expand Down
16 changes: 6 additions & 10 deletions plugins/mongo_db_plugin/mongo_db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
using bsoncxx::builder::basic::make_array;
namespace bbb = bsoncxx::builder::basic;

const signed_transaction& trx = t->packed_trx->get_signed_transaction();
const signed_transaction& trx = t->packed_trx()->get_signed_transaction();

if( !filter_include( trx ) ) return;

Expand All @@ -755,7 +755,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} );

const auto& trx_id = t->id;
const auto& trx_id = t->id();
const auto trx_id_str = trx_id.str();

trans_doc.append( kvp( "trx_id", trx_id_str ) );
Expand All @@ -771,14 +771,10 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti
}

fc::variant signing_keys;
if( t->signing_keys_future.valid() ) {
signing_keys = std::get<2>( t->signing_keys_future.get() );
} else {
flat_set<public_key_type> keys;
trx.get_signature_keys( *chain_id, fc::time_point::maximum(), keys, false );
if( !keys.empty() ) {
signing_keys = keys;
}
std::shared_ptr<flat_set<public_key_type>> keys;
std::tie( std::ignore, keys ) = t->recover_keys( *chain_id );
if( !keys->empty() ) {
signing_keys = *keys;
}

if( signing_keys.get_type() == fc::variant::array_type && signing_keys.get_array().size() > 0) {
Expand Down
16 changes: 8 additions & 8 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1888,8 +1888,8 @@ namespace eosio {
}

void dispatch_manager::bcast_transaction(const transaction_metadata_ptr& ptrx) {
const auto& id = ptrx->id;
const packed_transaction& trx = *ptrx->packed_trx;
const auto& id = ptrx->id();
const packed_transaction& trx = *ptrx->packed_trx();
time_point_sec trx_expiration = trx.expiration();
node_transaction_state nts = {id, trx_expiration, 0, 0};

Expand All @@ -1915,7 +1915,7 @@ namespace eosio {
}

void dispatch_manager::recv_transaction(const connection_ptr& c, const transaction_metadata_ptr& txn) {
node_transaction_state nts = {txn->id, txn->packed_trx->expiration(), 0, c->connection_id};
node_transaction_state nts = {txn->id(), txn->packed_trx()->expiration(), 0, c->connection_id};
add_peer_txn( nts );
}

Expand Down Expand Up @@ -2699,7 +2699,7 @@ namespace eosio {
}

auto ptrx = std::make_shared<transaction_metadata>( trx );
const auto& tid = ptrx->id;
const auto& tid = ptrx->id();
peer_dlog( this, "received packed_transaction ${id}", ("id", tid) );

bool have_trx = my_impl->dispatcher->have_txn( tid );
Expand All @@ -2709,14 +2709,14 @@ namespace eosio {
return;
}

trx_in_progress_size += calc_trx_size( ptrx->packed_trx );
trx_in_progress_size += calc_trx_size( ptrx->packed_trx() );
connection_wptr weak = shared_from_this();
my_impl->chain_plug->accept_transaction( ptrx,
[weak{std::move(weak)}, ptrx](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) {
// next (this lambda) called from application thread
connection_ptr conn = weak.lock();
if( conn ) {
conn->trx_in_progress_size -= calc_trx_size( ptrx->packed_trx );
conn->trx_in_progress_size -= calc_trx_size( ptrx->packed_trx() );
}
bool accepted = false;
if (result.contains<fc::exception_ptr>()) {
Expand All @@ -2740,7 +2740,7 @@ namespace eosio {
if( accepted ) {
my_impl->dispatcher->bcast_transaction( ptrx );
} else {
my_impl->dispatcher->rejected_transaction( ptrx->id, head_blk_num );
my_impl->dispatcher->rejected_transaction( ptrx->id(), head_blk_num );
}
});
});
Expand Down Expand Up @@ -2956,7 +2956,7 @@ namespace eosio {

// called from application thread
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, transaction_metadata_ptr>& results) {
const auto& id = results.second->id;
const auto& id = results.second->id();
if (results.first) {
fc_dlog( logger, "signaled NACK, trx-id = ${id} : ${why}", ("id", id)( "why", results.first->to_detail_string() ) );

Expand Down
Loading