Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #6471 from EOSIO/producer-txns
Browse files Browse the repository at this point in the history
Optimize transaction signature recovery
  • Loading branch information
heifner authored Dec 18, 2018
2 parents e2b23eb + 1c8640b commit c9a21a4
Show file tree
Hide file tree
Showing 23 changed files with 375 additions and 187 deletions.
2 changes: 1 addition & 1 deletion libraries/chain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ add_library( eosio_chain
# contracts/chain_initializer.cpp


# transaction_metadata.cpp
transaction_metadata.cpp
${HEADERS}
)

Expand Down
52 changes: 22 additions & 30 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
#include <eosio/chain/authorization_manager.hpp>
#include <eosio/chain/resource_limits.hpp>
#include <eosio/chain/chain_snapshot.hpp>
#include <eosio/chain/thread_utils.hpp>

#include <chainbase/chainbase.hpp>
#include <fc/io/json.hpp>
#include <fc/scoped_exit.hpp>
#include <fc/variant_object.hpp>

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>


namespace eosio { namespace chain {

Expand Down Expand Up @@ -135,7 +133,7 @@ struct controller_impl {
optional<fc::microseconds> subjective_cpu_leeway;
bool trusted_producer_light_validation = false;
uint32_t snapshot_head_block = 0;
optional<boost::asio::thread_pool> thread_pool;
boost::asio::thread_pool thread_pool;

typedef pair<scope_name,action_name> handler_key;
map< account_name, map<handler_key, apply_handler> > apply_handlers;
Expand All @@ -147,14 +145,6 @@ struct controller_impl {
*/
map<digest_type, transaction_metadata_ptr> unapplied_transactions;

// async on thread_pool and return future
template<typename F>
auto async_thread_pool( F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( *thread_pool, [task]() { (*task)(); } );
return task->get_future();
}

void pop_block() {
auto prev = fork_db.get_block( head->header.previous );
EOS_ASSERT( prev, block_validate_exception, "attempt to pop beyond last irreversible block" );
Expand Down Expand Up @@ -194,7 +184,8 @@ struct controller_impl {
authorization( s, db ),
conf( cfg ),
chain_id( cfg.genesis.compute_chain_id() ),
read_mode( cfg.read_mode )
read_mode( cfg.read_mode ),
thread_pool( cfg.thread_pool_size )
{

#define SET_APP_HANDLER( receiver, contract, action) \
Expand Down Expand Up @@ -349,8 +340,6 @@ struct controller_impl {

void init(std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot) {

thread_pool.emplace( conf.thread_pool_size );

bool report_integrity_hash = !!snapshot;
if (snapshot) {
EOS_ASSERT( !head, fork_database_exception, "" );
Expand Down Expand Up @@ -418,11 +407,6 @@ struct controller_impl {
~controller_impl() {
pending.reset();

if( thread_pool ) {
thread_pool->join();
thread_pool->stop();
}

db.flush();
reversible_blocks.flush();
}
Expand Down Expand Up @@ -1014,7 +998,18 @@ struct controller_impl {

transaction_trace_ptr trace;
try {
transaction_context trx_context(self, trx->trx, trx->id);
auto start = fc::time_point::now();
if( !explicit_billed_cpu_time ) {
fc::microseconds already_consumed_time( EOS_PERCENT(trx->sig_cpu_usage.count(), conf.sig_cpu_bill_pct) );

if( start.time_since_epoch() < already_consumed_time ) {
start = fc::time_point();
} else {
start -= already_consumed_time;
}
}

transaction_context trx_context(self, trx->trx, trx->id, start);
if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) {
trx_context.leeway = *subjective_cpu_leeway;
}
Expand All @@ -1030,7 +1025,6 @@ struct controller_impl {
bool skip_recording = replay_head_time && (time_point(trx->trx.expiration) <= *replay_head_time);
trx_context.init_for_input_trx( trx->packed_trx->get_unprunable_size(),
trx->packed_trx->get_prunable_size(),
trx->trx.signatures.size(),
skip_recording);
}

Expand Down Expand Up @@ -1205,13 +1199,7 @@ struct controller_impl {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>( std::make_shared<packed_transaction>( pt ) );
if( !self.skip_auth_check() ) {
std::weak_ptr<transaction_metadata> mtrx_wp = mtrx;
mtrx->signing_keys_future = async_thread_pool( [chain_id = this->chain_id, mtrx_wp]() {
auto mtrx = mtrx_wp.lock();
return mtrx ?
std::make_pair( chain_id, mtrx->trx.get_signature_keys( chain_id ) ) :
std::make_pair( chain_id, decltype( mtrx->trx.get_signature_keys( chain_id ) ){} );
} );
transaction_metadata::create_signing_keys_future( mtrx, thread_pool, chain_id, microseconds::maximum() );
}
packed_transactions.emplace_back( std::move( mtrx ) );
}
Expand Down Expand Up @@ -1289,7 +1277,7 @@ struct controller_impl {
auto prev = fork_db.get_block( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

return async_thread_pool( [b, prev]() {
return async_thread_pool( thread_pool, [b, prev]() {
const bool skip_validate_signee = false;
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee );
} );
Expand Down Expand Up @@ -1783,6 +1771,10 @@ void controller::abort_block() {
my->abort_block();
}

boost::asio::thread_pool& controller::get_thread_pool() {
return my->thread_pool;
}

std::future<block_state_ptr> controller::create_block_state_future( const signed_block_ptr& b ) {
return my->create_block_state_future( b );
}
Expand Down
39 changes: 20 additions & 19 deletions libraries/chain/include/eosio/chain/abi_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,11 @@ namespace impl {
mutable_variant_object mvo;
auto trx = ptrx.get_transaction();
mvo("id", trx.id());
mvo("signatures", ptrx.signatures);
mvo("compression", ptrx.compression);
mvo("packed_context_free_data", ptrx.packed_context_free_data);
mvo("signatures", ptrx.get_signatures());
mvo("compression", ptrx.get_compression());
mvo("packed_context_free_data", ptrx.get_packed_context_free_data());
mvo("context_free_data", ptrx.get_context_free_data());
mvo("packed_trx", ptrx.packed_trx);
mvo("packed_trx", ptrx.get_packed_transaction());
add(mvo, "transaction", trx, resolver, ctx);

out(name, std::move(mvo));
Expand Down Expand Up @@ -577,32 +577,33 @@ namespace impl {
const variant_object& vo = v.get_object();
EOS_ASSERT(vo.contains("signatures"), packed_transaction_type_exception, "Missing signatures");
EOS_ASSERT(vo.contains("compression"), packed_transaction_type_exception, "Missing compression");
from_variant(vo["signatures"], ptrx.signatures);
from_variant(vo["compression"], ptrx.compression);
std::vector<signature_type> signatures;
packed_transaction::compression_type compression;
from_variant(vo["signatures"], signatures);
from_variant(vo["compression"], compression);

// TODO: Make this nicer eventually. But for now, if it works... good enough.
bytes packed_cfd;
if( vo.contains("packed_trx") && vo["packed_trx"].is_string() && !vo["packed_trx"].as_string().empty() ) {
from_variant(vo["packed_trx"], ptrx.packed_trx);
auto trx = ptrx.get_transaction(); // Validates transaction data provided.
bytes packed_trx;
std::vector<bytes> cfd;
from_variant(vo["packed_trx"], packed_trx);
if( vo.contains("packed_context_free_data") && vo["packed_context_free_data"].is_string() && !vo["packed_context_free_data"].as_string().empty() ) {
from_variant(vo["packed_context_free_data"], ptrx.packed_context_free_data );
from_variant(vo["packed_context_free_data"], packed_cfd );
} else if( vo.contains("context_free_data") ) {
vector<bytes> context_free_data;
from_variant(vo["context_free_data"], context_free_data);
ptrx.set_transaction(trx, context_free_data, ptrx.compression);
from_variant(vo["context_free_data"], cfd);
}
ptrx = packed_transaction( std::move(packed_trx), std::move(signatures), std::move(packed_cfd), std::move(cfd), compression );
} else {
EOS_ASSERT(vo.contains("transaction"), packed_transaction_type_exception, "Missing transaction");
transaction trx;
vector<bytes> context_free_data;
signed_transaction trx;
trx.signatures = std::move(signatures);
extract(vo["transaction"], trx, resolver, ctx);
if( vo.contains("packed_context_free_data") && vo["packed_context_free_data"].is_string() && !vo["packed_context_free_data"].as_string().empty() ) {
from_variant(vo["packed_context_free_data"], ptrx.packed_context_free_data );
context_free_data = ptrx.get_context_free_data();
from_variant(vo["packed_context_free_data"], packed_cfd );
} else if( vo.contains("context_free_data") ) {
from_variant(vo["context_free_data"], context_free_data);
from_variant(vo["context_free_data"], trx.context_free_data );
}
ptrx.set_transaction(trx, context_free_data, ptrx.compression);
ptrx = packed_transaction( std::move(trx), std::move(packed_cfd), compression );
}
}
};
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const static uint32_t default_max_trx_delay = 45*24*3600; //
const static uint32_t default_max_inline_action_size = 4 * 1024; // 4 KB
const static uint16_t default_max_inline_action_depth = 4;
const static uint16_t default_max_auth_depth = 6;
const static uint32_t default_sig_cpu_bill_pct = 50 * percent_1; // billable percentage of signature recovery
const static uint16_t default_controller_thread_pool_size = 2;

const static uint32_t min_net_usage_delta_between_base_and_max_for_trx = 10*1024;
Expand Down
9 changes: 7 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
namespace chainbase {
class database;
}

namespace boost { namespace asio {
class thread_pool;
}}

namespace eosio { namespace chain {

Expand Down Expand Up @@ -63,6 +65,7 @@ namespace eosio { namespace chain {
uint64_t state_guard_size = chain::config::default_state_guard_size;
uint64_t reversible_cache_size = chain::config::default_reversible_cache_size;
uint64_t reversible_guard_size = chain::config::default_reversible_guard_size;
uint32_t sig_cpu_bill_pct = chain::config::default_sig_cpu_bill_pct;
uint16_t thread_pool_size = chain::config::default_controller_thread_pool_size;
bool read_only = false;
bool force_all_checks = false;
Expand All @@ -87,7 +90,7 @@ namespace eosio { namespace chain {
incomplete = 3, ///< this is an incomplete block (either being produced by a producer or speculatively produced by a node)
};

controller( const config& cfg );
explicit controller( const config& cfg );
~controller();

void add_indices();
Expand Down Expand Up @@ -144,6 +147,8 @@ namespace eosio { namespace chain {
std::future<block_state_ptr> create_block_state_future( const signed_block_ptr& b );
void push_block( std::future<block_state_ptr>& block_state_future );

boost::asio::thread_pool& get_thread_pool();

const chainbase::database& db()const;

const fork_database& fork_db()const;
Expand Down
24 changes: 24 additions & 0 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <future>
#include <memory>

namespace eosio { namespace chain {

// async on thread_pool and return future
template<typename F>
auto async_thread_pool( boost::asio::thread_pool& thread_pool, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( thread_pool, [task]() { (*task)(); } );
return task->get_future();
}

} } // eosio::chain


53 changes: 36 additions & 17 deletions libraries/chain/include/eosio/chain/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ namespace eosio { namespace chain {

transaction_id_type id()const;
digest_type sig_digest( const chain_id_type& chain_id, const vector<bytes>& cfd = vector<bytes>() )const;
flat_set<public_key_type> get_signature_keys( const vector<signature_type>& signatures,
fc::microseconds get_signature_keys( const vector<signature_type>& signatures,
const chain_id_type& chain_id,
const vector<bytes>& cfd = vector<bytes>(),
bool allow_duplicate_keys = false,
bool use_cache = true )const;
fc::time_point deadline,
const vector<bytes>& cfd,
flat_set<public_key_type>& recovered_pub_keys,
bool allow_duplicate_keys = false) const;

uint32_t total_actions()const { return context_free_actions.size() + actions.size(); }
account_name first_authorizor()const {
Expand Down Expand Up @@ -92,7 +93,9 @@ namespace eosio { namespace chain {

const signature_type& sign(const private_key_type& key, const chain_id_type& chain_id);
signature_type sign(const private_key_type& key, const chain_id_type& chain_id)const;
flat_set<public_key_type> get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys = false, bool use_cache = true )const;
fc::microseconds get_signature_keys( const chain_id_type& chain_id, fc::time_point deadline,
flat_set<public_key_type>& recovered_pub_keys,
bool allow_duplicate_keys = false )const;
};

struct packed_transaction {
Expand All @@ -108,40 +111,56 @@ namespace eosio { namespace chain {
packed_transaction& operator=(packed_transaction&&) = default;

explicit packed_transaction(const signed_transaction& t, compression_type _compression = none)
:signatures(t.signatures)
:signatures(t.signatures), compression(_compression)
{
set_transaction(t, t.context_free_data, _compression);
set_transaction(t);
set_context_free_data(t.context_free_data);
}

explicit packed_transaction(signed_transaction&& t, compression_type _compression = none)
:signatures(std::move(t.signatures))
:signatures(std::move(t.signatures)), compression(_compression)
{
set_transaction(t, t.context_free_data, _compression);
set_transaction(t);
set_context_free_data(t.context_free_data);
}

// used by abi_serializer
explicit packed_transaction( bytes&& packed_txn, vector<signature_type>&& sigs,
bytes&& packed_cfd, vector<bytes>&& cfd, compression_type _compression );
explicit packed_transaction( signed_transaction&& t, bytes&& packed_cfd, compression_type _compression );

uint32_t get_unprunable_size()const;
uint32_t get_prunable_size()const;

digest_type packed_digest()const;

vector<signature_type> signatures;
fc::enum_type<uint8_t,compression_type> compression;
bytes packed_context_free_data;
bytes packed_trx;

time_point_sec expiration()const;
transaction_id_type id()const;
transaction_id_type get_uncached_id()const; // thread safe
bytes get_raw_transaction()const; // thread safe
vector<bytes> get_context_free_data()const;
transaction get_transaction()const;
signed_transaction get_signed_transaction()const;
void set_transaction(const transaction& t, compression_type _compression = none);
void set_transaction(const transaction& t, const vector<bytes>& cfd, compression_type _compression = none);

const vector<signature_type>& get_signatures()const { return signatures; }
const fc::enum_type<uint8_t,compression_type>& get_compression()const { return compression; }
const bytes& get_packed_context_free_data()const { return packed_context_free_data; }
const bytes& get_packed_transaction()const { return packed_trx; }

private:
mutable optional<transaction> unpacked_trx; // <-- intermediate buffer used to retrieve values
void local_unpack()const;
void set_transaction(const transaction& t);
void set_context_free_data(const vector<bytes>& cfd);

friend struct fc::reflector<packed_transaction>;
private:
vector<signature_type> signatures;
fc::enum_type<uint8_t,compression_type> compression;
bytes packed_context_free_data;
bytes packed_trx;

private:
mutable optional<transaction> unpacked_trx; // <-- intermediate buffer used to retrieve values
};

using packed_transaction_ptr = std::shared_ptr<packed_transaction>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ namespace eosio { namespace chain {

void init_for_input_trx( uint64_t packed_trx_unprunable_size,
uint64_t packed_trx_prunable_size,
uint32_t num_signatures,
bool skip_recording);

void init_for_deferred_trx( fc::time_point published );
Expand Down
Loading

0 comments on commit c9a21a4

Please sign in to comment.