Skip to content

Commit

Permalink
Broadcast when a transaction is accepted to the mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandeberg committed Mar 4, 2022
1 parent a9fb5c6 commit d29cc94
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
2 changes: 1 addition & 1 deletion libraries/mempool/include/koinos/mempool/mempool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class mempool final
uint64_t max_payer_rc,
uint64_t rc_limit )const;

void add_pending_transaction(
uint64_t add_pending_transaction(
const protocol::transaction& transaction,
block_height_type height,
uint64_t max_payer_rc,
Expand Down
12 changes: 8 additions & 4 deletions libraries/mempool/mempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class mempool_impl final
const account_type& payer,
uint64_t max_payer_resources,
uint64_t trx_resource_limit ) const;
void add_pending_transaction(
uint64_t add_pending_transaction(
const protocol::transaction& transaction,
block_height_type height,
uint64_t max_payer_rc,
Expand Down Expand Up @@ -217,7 +217,7 @@ bool mempool_impl::check_pending_account_resources(
return check_pending_account_resources_lockfree( payer, max_payer_rc, rc_limit );
}

void mempool_impl::add_pending_transaction(
uint64_t mempool_impl::add_pending_transaction(
const protocol::transaction& transaction,
block_height_type height,
uint64_t max_payer_rc,
Expand All @@ -227,6 +227,7 @@ void mempool_impl::add_pending_transaction(
{
const auto& payer = transaction.header().payer();
uint64_t rc_limit = transaction.header().rc_limit();
uint64_t rc_used = rc_limit;

{
std::lock_guard< std::mutex > guard( _account_resources_mutex );
Expand Down Expand Up @@ -333,10 +334,13 @@ void mempool_impl::add_pending_transaction(
aro.resources = new_resources.convert_to< uint64_t >();
aro.last_update = height;
} );

rc_used = max_payer_rc - it->resources;
}
}

LOG(info) << "Transaction added to mempool: " << util::to_hex( transaction.id() );
return rc_used;
}

void mempool_impl::remove_pending_transactions( const std::vector< transaction_id_type >& ids )
Expand Down Expand Up @@ -430,15 +434,15 @@ bool mempool::check_pending_account_resources(
return _my->check_pending_account_resources( payer, max_payer_resources, trx_resource_limit );
}

void mempool::add_pending_transaction(
uint64_t mempool::add_pending_transaction(
const protocol::transaction& transaction,
block_height_type height,
uint64_t max_payer_rc,
uint64_t disk_storaged_used,
uint64_t network_bandwidth_used,
uint64_t compute_bandwidth_used )
{
_my->add_pending_transaction( transaction, height, max_payer_rc, disk_storaged_used, network_bandwidth_used, compute_bandwidth_used );
return _my->add_pending_transaction( transaction, height, max_payer_rc, disk_storaged_used, network_bandwidth_used, compute_bandwidth_used );
}

void mempool::remove_pending_transactions( const std::vector< transaction_id_type >& ids )
Expand Down
24 changes: 21 additions & 3 deletions programs/koinos_mempool/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <koinos/broadcast/broadcast.pb.h>
#include <koinos/exception.hpp>
#include <koinos/mempool/mempool.hpp>
#include <koinos/mq/client.hpp>
#include <koinos/mq/request_handler.hpp>
#include <koinos/rpc/mempool/mempool_rpc.pb.h>
#include <koinos/util/conversion.hpp>
Expand Down Expand Up @@ -47,8 +48,9 @@ int main( int argc, char** argv )
int retcode = EXIT_SUCCESS;
std::vector< std::thread > threads;

boost::asio::io_context main_ioc, server_ioc;
boost::asio::io_context main_ioc, server_ioc, client_ioc;
auto request_handler = koinos::mq::request_handler( server_ioc );
auto client = koinos::mq::client( client_ioc );

try
{
Expand Down Expand Up @@ -122,6 +124,9 @@ int main( int argc, char** argv )
main_ioc.stop();
} );

threads.emplace_back( [&]() { client_ioc.run(); } );
threads.emplace_back( [&]() { client_ioc.run(); } );

for ( std::size_t i = 0; i < jobs; i++ )
threads.emplace_back( [&]() { server_ioc.run(); } );

Expand Down Expand Up @@ -213,14 +218,22 @@ int main( int argc, char** argv )

try
{
mempool.add_pending_transaction(
auto rc_used = mempool.add_pending_transaction(
trx_accept.transaction(),
trx_accept.height(),
trx_accept.receipt().max_payer_rc(),
trx_accept.receipt().disk_storage_used(),
trx_accept.receipt().network_bandwidth_used(),
trx_accept.receipt().compute_bandwidth_used()
);

broadcast::mempool_accepted accepted_broadcast;
accepted_broadcast.mutable_transaction()->CopyFrom( trx_accept.transaction() );
accepted_broadcast.mutable_receipt()->CopyFrom( trx_accept.receipt() );
accepted_broadcast.set_height( trx_accept.height() );
accepted_broadcast.set_pending_rc_used( rc_used );

client.broadcast( "koinos.mempool.accept", util::converter::as< std::string >( accepted_broadcast ) );
}
catch ( const std::exception& e )
{
Expand Down Expand Up @@ -283,10 +296,15 @@ int main( int argc, char** argv )
}
);

LOG(info) << "Connecting AMQP client...";
client.connect( amqp_url );
LOG(info) << "Established AMQP client connection to the server";

LOG(info) << "Connecting AMQP request handler...";
request_handler.connect( amqp_url );
LOG(info) << "Established connection to AMQP";
LOG(info) << "Established request handler connection to the AMQP server";

LOG(info) << "Listening for requests over AMQP";
auto work = asio::make_work_guard( main_ioc );
main_ioc.run();
}
Expand Down
15 changes: 9 additions & 6 deletions tests/tests/mempool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ BOOST_AUTO_TEST_CASE( mempool_basic_test )

BOOST_TEST_MESSAGE( "adding pending transaction" );
auto trx_resource_limit = t1.header().rc_limit();
mempool.add_pending_transaction( t1, 1, max_payer_resources, 1, 2, 3 );
auto rc_used = mempool.add_pending_transaction( t1, 1, max_payer_resources, 1, 2, 3 );
BOOST_CHECK_EQUAL( t1.header().rc_limit(), rc_used );

BOOST_TEST_MESSAGE( "adding duplicate pending transaction" );
BOOST_REQUIRE_THROW( mempool.add_pending_transaction( t1, 2, max_payer_resources, 1, 2, 3 ), mempool::pending_transaction_insertion_failure );
Expand Down Expand Up @@ -116,23 +117,25 @@ BOOST_AUTO_TEST_CASE( pending_transaction_pagination )
{
mempool::mempool mempool;
protocol::transaction trx;
mempool::account_type payer;
uint64_t max_payer_resources;
mempool::account_type payer = _key1.get_public_key().to_address_bytes();;
uint64_t max_payer_resources = 1000000000000;
uint64_t trx_resource_limit;
chain::value_type nonce_value;
uint64_t rc_used = 0;

for( uint64_t i = 0; i < MAX_PENDING_TRANSACTION_REQUEST + 1; i++ )
{
nonce_value.set_uint64_value( i + 1 );
payer = _key1.get_public_key().to_address_bytes();
max_payer_resources = 1000000000000;

trx.mutable_header()->set_rc_limit( 10 * i );
trx.mutable_header()->set_payer( payer );
trx.mutable_header()->set_nonce( util::converter::as< std::string >( nonce_value ) );
trx.set_id( sign( _key1, trx ) );

mempool.add_pending_transaction( trx, i, max_payer_resources, 1, 1, 1 );
rc_used += trx.header().rc_limit();

auto rc = mempool.add_pending_transaction( trx, i, max_payer_resources, 1, 1, 1 );
BOOST_CHECK_EQUAL( rc, rc_used );
}

BOOST_REQUIRE_THROW( mempool.get_pending_transactions( MAX_PENDING_TRANSACTION_REQUEST + 1 ), mempool::pending_transaction_request_overflow );
Expand Down

0 comments on commit d29cc94

Please sign in to comment.