Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Optimize transaction signature recovery #6471

Merged
merged 23 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3095d85
Switch interface from packed_transaction_ptr to transaction_metadata_ptr
heifner Dec 12, 2018
49a9c55
Thread pool does not need to be optional
heifner Dec 12, 2018
4a4a80a
Add transaction_metadata create_signing_keys_future method
heifner Dec 12, 2018
f71d490
Start transaction signature earily in thread pool
heifner Dec 12, 2018
96fb1e5
Refactor packed_transaction for better encapsulation
heifner Dec 13, 2018
20ffa70
Add transaction_metadata create_signing_keys_future method
heifner Dec 12, 2018
f64da40
Start transaction signature earily in thread pool
heifner Dec 12, 2018
22ab63c
Update txn_test_gen_plugin to overlap transaction submit @taokayan
heifner Dec 14, 2018
64537c5
Remove redundant signing_keys check
heifner Dec 17, 2018
ca8e5bc
Add deadline to key recovery
heifner Dec 17, 2018
b8a5659
Modify producer_plugin to have its own thead_pool instead of using ch…
heifner Dec 17, 2018
957db7f
Move thread_pool join/stop to plugin shutdown so that they are joined…
heifner Dec 17, 2018
9270c9c
Fix signature future deadline from starting too early
heifner Dec 17, 2018
3e733f5
Fix overflow of deadline and deadline check
heifner Dec 17, 2018
6e9b441
initial setup of billing CPU for signatures recovered earlier
arhag Dec 18, 2018
2b6a88a
Make recovery cache non-thread local and guard by mutex
heifner Dec 18, 2018
94ad2d1
Calculate cpu usage of signature recovery
heifner Dec 18, 2018
48bf2d4
Add signature-cpu-billable-pct option to chain_plugin
heifner Dec 18, 2018
1ecb7cc
Add missing include of mutex
heifner Dec 18, 2018
75587d0
Assert signature-cpu-billable-pct is 0-100
heifner Dec 18, 2018
21d20a8
Fix capture of cpu_usage. move flat_set<public_key_type> into attribute
heifner Dec 18, 2018
058d4ac
clear recovered_pub_keys to preserve previous behaviour
heifner Dec 18, 2018
1c8640b
Add move into tuple creation
heifner Dec 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/chain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ add_library( eosio_chain
# contracts/chain_initializer.cpp


# transaction_metadata.cpp
transaction_metadata.cpp
${HEADERS}
)

Expand Down
39 changes: 12 additions & 27 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
#include <eosio/chain/authorization_manager.hpp>
#include <eosio/chain/resource_limits.hpp>
#include <eosio/chain/chain_snapshot.hpp>
#include <eosio/chain/thread_utils.hpp>

#include <chainbase/chainbase.hpp>
#include <fc/io/json.hpp>
#include <fc/scoped_exit.hpp>
#include <fc/variant_object.hpp>

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>


namespace eosio { namespace chain {

Expand Down Expand Up @@ -135,7 +133,7 @@ struct controller_impl {
optional<fc::microseconds> subjective_cpu_leeway;
bool trusted_producer_light_validation = false;
uint32_t snapshot_head_block = 0;
optional<boost::asio::thread_pool> thread_pool;
boost::asio::thread_pool thread_pool;

typedef pair<scope_name,action_name> handler_key;
map< account_name, map<handler_key, apply_handler> > apply_handlers;
Expand All @@ -147,14 +145,6 @@ struct controller_impl {
*/
map<digest_type, transaction_metadata_ptr> unapplied_transactions;

// async on thread_pool and return future
template<typename F>
auto async_thread_pool( F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( *thread_pool, [task]() { (*task)(); } );
return task->get_future();
}

void pop_block() {
auto prev = fork_db.get_block( head->header.previous );
EOS_ASSERT( prev, block_validate_exception, "attempt to pop beyond last irreversible block" );
Expand Down Expand Up @@ -194,7 +184,8 @@ struct controller_impl {
authorization( s, db ),
conf( cfg ),
chain_id( cfg.genesis.compute_chain_id() ),
read_mode( cfg.read_mode )
read_mode( cfg.read_mode ),
thread_pool( cfg.thread_pool_size )
{

#define SET_APP_HANDLER( receiver, contract, action) \
Expand Down Expand Up @@ -349,8 +340,6 @@ struct controller_impl {

void init(std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot) {

thread_pool.emplace( conf.thread_pool_size );

bool report_integrity_hash = !!snapshot;
if (snapshot) {
EOS_ASSERT( !head, fork_database_exception, "" );
Expand Down Expand Up @@ -418,10 +407,8 @@ struct controller_impl {
~controller_impl() {
pending.reset();

if( thread_pool ) {
thread_pool->join();
thread_pool->stop();
}
thread_pool.join();
thread_pool.stop();

db.flush();
reversible_blocks.flush();
Expand Down Expand Up @@ -1205,13 +1192,7 @@ struct controller_impl {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>( std::make_shared<packed_transaction>( pt ) );
if( !self.skip_auth_check() ) {
std::weak_ptr<transaction_metadata> mtrx_wp = mtrx;
mtrx->signing_keys_future = async_thread_pool( [chain_id = this->chain_id, mtrx_wp]() {
auto mtrx = mtrx_wp.lock();
return mtrx ?
std::make_pair( chain_id, mtrx->trx.get_signature_keys( chain_id ) ) :
std::make_pair( chain_id, decltype( mtrx->trx.get_signature_keys( chain_id ) ){} );
} );
transaction_metadata::create_signing_keys_future( mtrx, thread_pool, chain_id );
}
packed_transactions.emplace_back( std::move( mtrx ) );
}
Expand Down Expand Up @@ -1289,7 +1270,7 @@ struct controller_impl {
auto prev = fork_db.get_block( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

return async_thread_pool( [b, prev]() {
return async_thread_pool( thread_pool, [b, prev]() {
const bool skip_validate_signee = false;
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee );
} );
Expand Down Expand Up @@ -1783,6 +1764,10 @@ void controller::abort_block() {
my->abort_block();
}

boost::asio::thread_pool& controller::get_thread_pool() {
return my->thread_pool;
}

std::future<block_state_ptr> controller::create_block_state_future( const signed_block_ptr& b ) {
return my->create_block_state_future( b );
}
Expand Down
39 changes: 20 additions & 19 deletions libraries/chain/include/eosio/chain/abi_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,11 @@ namespace impl {
mutable_variant_object mvo;
auto trx = ptrx.get_transaction();
mvo("id", trx.id());
mvo("signatures", ptrx.signatures);
mvo("compression", ptrx.compression);
mvo("packed_context_free_data", ptrx.packed_context_free_data);
mvo("signatures", ptrx.get_signatures());
mvo("compression", ptrx.get_compression());
mvo("packed_context_free_data", ptrx.get_packed_context_free_data());
mvo("context_free_data", ptrx.get_context_free_data());
mvo("packed_trx", ptrx.packed_trx);
mvo("packed_trx", ptrx.get_packed_transaction());
add(mvo, "transaction", trx, resolver, ctx);

out(name, std::move(mvo));
Expand Down Expand Up @@ -577,32 +577,33 @@ namespace impl {
const variant_object& vo = v.get_object();
EOS_ASSERT(vo.contains("signatures"), packed_transaction_type_exception, "Missing signatures");
EOS_ASSERT(vo.contains("compression"), packed_transaction_type_exception, "Missing compression");
from_variant(vo["signatures"], ptrx.signatures);
from_variant(vo["compression"], ptrx.compression);
std::vector<signature_type> signatures;
packed_transaction::compression_type compression;
from_variant(vo["signatures"], signatures);
from_variant(vo["compression"], compression);

// TODO: Make this nicer eventually. But for now, if it works... good enough.
bytes packed_cfd;
arhag marked this conversation as resolved.
Show resolved Hide resolved
if( vo.contains("packed_trx") && vo["packed_trx"].is_string() && !vo["packed_trx"].as_string().empty() ) {
from_variant(vo["packed_trx"], ptrx.packed_trx);
auto trx = ptrx.get_transaction(); // Validates transaction data provided.
bytes packed_trx;
std::vector<bytes> cfd;
from_variant(vo["packed_trx"], packed_trx);
if( vo.contains("packed_context_free_data") && vo["packed_context_free_data"].is_string() && !vo["packed_context_free_data"].as_string().empty() ) {
from_variant(vo["packed_context_free_data"], ptrx.packed_context_free_data );
from_variant(vo["packed_context_free_data"], packed_cfd );
} else if( vo.contains("context_free_data") ) {
vector<bytes> context_free_data;
from_variant(vo["context_free_data"], context_free_data);
ptrx.set_transaction(trx, context_free_data, ptrx.compression);
from_variant(vo["context_free_data"], cfd);
}
ptrx = packed_transaction( std::move(packed_trx), std::move(signatures), std::move(packed_cfd), std::move(cfd), compression );
arhag marked this conversation as resolved.
Show resolved Hide resolved
} else {
EOS_ASSERT(vo.contains("transaction"), packed_transaction_type_exception, "Missing transaction");
transaction trx;
vector<bytes> context_free_data;
signed_transaction trx;
arhag marked this conversation as resolved.
Show resolved Hide resolved
trx.signatures = std::move(signatures);
extract(vo["transaction"], trx, resolver, ctx);
if( vo.contains("packed_context_free_data") && vo["packed_context_free_data"].is_string() && !vo["packed_context_free_data"].as_string().empty() ) {
from_variant(vo["packed_context_free_data"], ptrx.packed_context_free_data );
context_free_data = ptrx.get_context_free_data();
from_variant(vo["packed_context_free_data"], packed_cfd );
} else if( vo.contains("context_free_data") ) {
from_variant(vo["context_free_data"], context_free_data);
from_variant(vo["context_free_data"], trx.context_free_data );
}
ptrx.set_transaction(trx, context_free_data, ptrx.compression);
ptrx = packed_transaction( std::move(trx), std::move(packed_cfd), compression );
}
}
};
Expand Down
8 changes: 6 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
namespace chainbase {
class database;
}

namespace boost { namespace asio {
class thread_pool;
}}

namespace eosio { namespace chain {

Expand Down Expand Up @@ -87,7 +89,7 @@ namespace eosio { namespace chain {
incomplete = 3, ///< this is an incomplete block (either being produced by a producer or speculatively produced by a node)
};

controller( const config& cfg );
explicit controller( const config& cfg );
~controller();

void add_indices();
Expand Down Expand Up @@ -144,6 +146,8 @@ namespace eosio { namespace chain {
std::future<block_state_ptr> create_block_state_future( const signed_block_ptr& b );
void push_block( std::future<block_state_ptr>& block_state_future );

boost::asio::thread_pool& get_thread_pool();

const chainbase::database& db()const;

const fork_database& fork_db()const;
Expand Down
24 changes: 24 additions & 0 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <future>
#include <memory>

namespace eosio { namespace chain {

// async on thread_pool and return future
template<typename F>
auto async_thread_pool( boost::asio::thread_pool& thread_pool, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( thread_pool, [task]() { (*task)(); } );
return task->get_future();
}

} } // eosio::chain


40 changes: 28 additions & 12 deletions libraries/chain/include/eosio/chain/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,40 +108,56 @@ namespace eosio { namespace chain {
packed_transaction& operator=(packed_transaction&&) = default;

explicit packed_transaction(const signed_transaction& t, compression_type _compression = none)
:signatures(t.signatures)
:signatures(t.signatures), compression(_compression)
{
set_transaction(t, t.context_free_data, _compression);
set_transaction(t);
set_context_free_data(t.context_free_data);
}

explicit packed_transaction(signed_transaction&& t, compression_type _compression = none)
:signatures(std::move(t.signatures))
:signatures(std::move(t.signatures)), compression(_compression)
{
set_transaction(t, t.context_free_data, _compression);
set_transaction(t);
set_context_free_data(t.context_free_data);
}

// used by abi_serializer
explicit packed_transaction( bytes&& packed_txn, vector<signature_type>&& sigs,
bytes&& packed_cfd, vector<bytes>&& cfd, compression_type _compression );
explicit packed_transaction( signed_transaction&& t, bytes&& packed_cfd, compression_type _compression );

uint32_t get_unprunable_size()const;
uint32_t get_prunable_size()const;

digest_type packed_digest()const;

vector<signature_type> signatures;
fc::enum_type<uint8_t,compression_type> compression;
bytes packed_context_free_data;
bytes packed_trx;

time_point_sec expiration()const;
transaction_id_type id()const;
transaction_id_type get_uncached_id()const; // thread safe
bytes get_raw_transaction()const; // thread safe
vector<bytes> get_context_free_data()const;
transaction get_transaction()const;
signed_transaction get_signed_transaction()const;
void set_transaction(const transaction& t, compression_type _compression = none);
void set_transaction(const transaction& t, const vector<bytes>& cfd, compression_type _compression = none);

const vector<signature_type>& get_signatures()const { return signatures; }
const fc::enum_type<uint8_t,compression_type>& get_compression()const { return compression; }
const bytes& get_packed_context_free_data()const { return packed_context_free_data; }
const bytes& get_packed_transaction()const { return packed_trx; }

private:
mutable optional<transaction> unpacked_trx; // <-- intermediate buffer used to retrieve values
void local_unpack()const;
void set_transaction(const transaction& t);
void set_context_free_data(const vector<bytes>& cfd);

friend struct fc::reflector<packed_transaction>;
private:
vector<signature_type> signatures;
fc::enum_type<uint8_t,compression_type> compression;
bytes packed_context_free_data;
bytes packed_trx;

private:
mutable optional<transaction> unpacked_trx; // <-- intermediate buffer used to retrieve values
};

using packed_transaction_ptr = std::shared_ptr<packed_transaction>;
Expand Down
25 changes: 10 additions & 15 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
#include <eosio/chain/types.hpp>
#include <future>

namespace boost { namespace asio {
class thread_pool;
}}

namespace eosio { namespace chain {

class transaction_metadata;
using transaction_metadata_ptr = std::shared_ptr<transaction_metadata>;
/**
* This data structure should store context-free cached data about a transaction such as
* packed/unpacked/compressed and recovered keys
Expand Down Expand Up @@ -43,23 +49,12 @@ class transaction_metadata {
signed_id = digest_type::hash(*packed_trx);
}

const flat_set<public_key_type>& recover_keys( const chain_id_type& chain_id ) {
// Unlikely for more than one chain_id to be used in one nodeos instance
if( !signing_keys || signing_keys->first != chain_id ) {
if( signing_keys_future.valid() ) {
signing_keys = signing_keys_future.get();
if( signing_keys->first == chain_id ) {
return signing_keys->second;
}
}
signing_keys = std::make_pair( chain_id, trx.get_signature_keys( chain_id ));
}
return signing_keys->second;
}
const flat_set<public_key_type>& recover_keys( const chain_id_type& chain_id );

static void create_signing_keys_future( const transaction_metadata_ptr& mtrx,
boost::asio::thread_pool& thread_pool, const chain_id_type& chain_id );

uint32_t total_actions()const { return trx.context_free_actions.size() + trx.actions.size(); }
};

using transaction_metadata_ptr = std::shared_ptr<transaction_metadata>;

} } // eosio::chain
Loading