Skip to content

Commit

Permalink
Rebroadcast blocks via process RPC (#852)
Browse files Browse the repository at this point in the history
* The process RPC was reimplementing block processing but didn't properly rebroadcast blocks it processed.  This converts the RPC to just pass the block off to the block processor.  The processing result can be retrieved via logging.

* Fixing formatting.
  • Loading branch information
clemahieu authored May 7, 2018
1 parent ec7432b commit 4771642
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 51 deletions.
37 changes: 36 additions & 1 deletion rai/core_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,13 @@ TEST (rpc, process_block)
system.poll ();
}
ASSERT_EQ (200, response.status);
ASSERT_EQ (send.hash (), system.nodes[0]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes[0]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
std::string send_hash (response.json.get<std::string> ("hash"));
ASSERT_EQ (send.hash ().to_string (), send_hash);
}
Expand Down Expand Up @@ -1049,6 +1055,35 @@ TEST (rpc, process_block_no_work)
ASSERT_FALSE (response.json.get<std::string> ("error", "").empty ());
}

TEST (rpc, process_republish)
{
rai::system system (24000, 2);
rai::keypair key;
auto latest (system.nodes[0]->latest (rai::test_genesis_key.pub));
auto & node1 (*system.nodes[0]);
rai::send_block send (latest, key.pub, 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, node1.generate_work (latest));
rai::rpc rpc (system.service, node1, rai::rpc_config (true));
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
std::string json;
send.serialize_json (json);
request.put ("block", json);
test_response response (request, rpc, system.service);
while (response.status == 0)
{
system.poll ();
}
ASSERT_EQ (200, response.status);
auto iterations (0);
while (system.nodes[1]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
}

TEST (rpc, keepalive)
{
rai::system system (24000, 1);
Expand Down
72 changes: 34 additions & 38 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,30 +1299,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
}
}
auto process_result (process_receive_one (transaction, block));
switch (process_result.code)
{
case rai::process_result::progress:
{
if (node.block_arrival.recent (hash))
{
node.active.start (block);
}
}
case rai::process_result::old:
{
auto cached (node.store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash);
break;
}
default:
break;
}
(void)process_result;
lock_a.lock ();
}
}
Expand All @@ -1332,6 +1309,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr<rai::block> block_a)
{
rai::process_return result;
auto hash (block_a->hash ());
result = node.ledger.process (transaction_a, *block_a);
switch (result.code)
{
Expand All @@ -1341,15 +1319,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
std::string block;
block_a->serialize_json (block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % block_a->hash ().to_string () % block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block);
}
if (node.block_arrival.recent (hash))
{
node.active.start (block_a);
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::gap_previous:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, block_a->previous (), block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1359,7 +1342,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.block_source (transaction_a, *block_a), block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1369,7 +1352,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.state_block_parse_canary, block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1381,86 +1364,99 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ());
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::bad_signature:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::negative_spend:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::unreceivable:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::not_receive_from_send:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::fork:
{
if (!node.block_arrival.recent (block_a->hash ()))
if (!node.block_arrival.recent (hash))
{
// Only let the bootstrap attempt know about forked blocks that did not arrive via UDP.
node.bootstrap_initiator.process_fork (transaction_a, block_a);
}
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % block_a->root ().to_string ());
}
break;
}
case rai::process_result::account_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::opened_burn_account:
{
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ());
break;
}
case rai::process_result::balance_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::block_position:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % block_a->hash ().to_string () % block_a->previous ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % block_a->previous ().to_string ());
}
break;
}
}
return result;
}

void rai::block_processor::queue_unchecked (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
auto cached (node.store.unchecked_get (transaction_a, hash_a));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction_a, hash_a, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash_a);
}

rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) :
node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a)
{
Expand Down
1 change: 1 addition & 0 deletions rai/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ class block_processor
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr<rai::block>);

private:
void queue_unchecked (MDB_txn *, rai::block_hash const &);
void process_receive_many (std::unique_lock<std::mutex> &);
bool stopped;
bool active;
Expand Down
13 changes: 1 addition & 12 deletions rai/node/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2596,25 +2596,14 @@ void rai::rpc_handler::process ()
auto hash (block->hash ());
node.block_arrival.add (hash);
rai::process_return result;
std::shared_ptr<rai::block> block_a (std::move (block));
{
rai::transaction transaction (node.store.environment, nullptr, true);
result = node.block_processor.process_receive_one (transaction, block_a);
result = node.block_processor.process_receive_one (transaction, std::move (block));
}
switch (result.code)
{
case rai::process_result::progress:
{
rai::transaction transaction (node.store.environment, nullptr, false);
auto account (node.ledger.account (transaction, hash));
auto amount (node.ledger.amount (transaction, hash));
bool is_state_send (false);
if (auto state = dynamic_cast<rai::state_block *> (block_a.get ()))
{
rai::transaction transaction (node.store.environment, nullptr, false);
is_state_send = node.ledger.is_send (transaction, *state);
}
node.observers.blocks (block_a, account, amount, is_state_send);
boost::property_tree::ptree response_l;
response_l.put ("hash", hash.to_string ());
response (response_l);
Expand Down

0 comments on commit 4771642

Please sign in to comment.