Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebroadcast blocks via process RPC #852

Merged
merged 2 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion rai/core_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,13 @@ TEST (rpc, process_block)
system.poll ();
}
ASSERT_EQ (200, response.status);
ASSERT_EQ (send.hash (), system.nodes[0]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes[0]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
std::string send_hash (response.json.get<std::string> ("hash"));
ASSERT_EQ (send.hash ().to_string (), send_hash);
}
Expand Down Expand Up @@ -1049,6 +1055,35 @@ TEST (rpc, process_block_no_work)
ASSERT_FALSE (response.json.get<std::string> ("error", "").empty ());
}

TEST (rpc, process_republish)
{
rai::system system (24000, 2);
rai::keypair key;
auto latest (system.nodes[0]->latest (rai::test_genesis_key.pub));
auto & node1 (*system.nodes[0]);
rai::send_block send (latest, key.pub, 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, node1.generate_work (latest));
rai::rpc rpc (system.service, node1, rai::rpc_config (true));
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
std::string json;
send.serialize_json (json);
request.put ("block", json);
test_response response (request, rpc, system.service);
while (response.status == 0)
{
system.poll ();
}
ASSERT_EQ (200, response.status);
auto iterations (0);
while (system.nodes[1]->latest (rai::test_genesis_key.pub) != send.hash ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
}

TEST (rpc, keepalive)
{
rai::system system (24000, 1);
Expand Down
72 changes: 34 additions & 38 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,30 +1299,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
}
}
auto process_result (process_receive_one (transaction, block));
switch (process_result.code)
{
case rai::process_result::progress:
{
if (node.block_arrival.recent (hash))
{
node.active.start (block);
}
}
case rai::process_result::old:
{
auto cached (node.store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash);
break;
}
default:
break;
}
(void)process_result;
lock_a.lock ();
}
}
Expand All @@ -1332,6 +1309,7 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr<rai::block> block_a)
{
rai::process_return result;
auto hash (block_a->hash ());
result = node.ledger.process (transaction_a, *block_a);
switch (result.code)
{
Expand All @@ -1341,15 +1319,20 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
std::string block;
block_a->serialize_json (block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % block_a->hash ().to_string () % block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block);
}
if (node.block_arrival.recent (hash))
{
node.active.start (block_a);
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::gap_previous:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, block_a->previous (), block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1359,7 +1342,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.block_source (transaction_a, *block_a), block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1369,7 +1352,7 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("State blocks are disabled: %1%") % hash.to_string ());
}
node.store.unchecked_put (transaction_a, node.ledger.state_block_parse_canary, block_a);
node.gap_cache.add (transaction_a, block_a);
Expand All @@ -1381,86 +1364,99 @@ rai::process_return rai::block_processor::process_receive_one (MDB_txn * transac
{
BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ());
}
queue_unchecked (transaction_a, hash);
break;
}
case rai::process_result::bad_signature:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::negative_spend:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::unreceivable:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::not_receive_from_send:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::fork:
{
if (!node.block_arrival.recent (block_a->hash ()))
if (!node.block_arrival.recent (hash))
{
// Only let the bootstrap attempt know about forked blocks that did not arrive via UDP.
node.bootstrap_initiator.process_fork (transaction_a, block_a);
}
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % block_a->root ().to_string ());
}
break;
}
case rai::process_result::account_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::opened_burn_account:
{
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ());
break;
}
case rai::process_result::balance_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % block_a->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ());
}
break;
}
case rai::process_result::block_position:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % block_a->hash ().to_string () % block_a->previous ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % block_a->previous ().to_string ());
}
break;
}
}
return result;
}

void rai::block_processor::queue_unchecked (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
auto cached (node.store.unchecked_get (transaction_a, hash_a));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction_a, hash_a, **i);
add (*i);
}
std::lock_guard<std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get<1> ().erase (hash_a);
}

rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) :
node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a)
{
Expand Down
1 change: 1 addition & 0 deletions rai/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ class block_processor
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr<rai::block>);

private:
void queue_unchecked (MDB_txn *, rai::block_hash const &);
void process_receive_many (std::unique_lock<std::mutex> &);
bool stopped;
bool active;
Expand Down
13 changes: 1 addition & 12 deletions rai/node/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2596,25 +2596,14 @@ void rai::rpc_handler::process ()
auto hash (block->hash ());
node.block_arrival.add (hash);
rai::process_return result;
std::shared_ptr<rai::block> block_a (std::move (block));
{
rai::transaction transaction (node.store.environment, nullptr, true);
result = node.block_processor.process_receive_one (transaction, block_a);
result = node.block_processor.process_receive_one (transaction, std::move (block));
}
switch (result.code)
{
case rai::process_result::progress:
{
rai::transaction transaction (node.store.environment, nullptr, false);
auto account (node.ledger.account (transaction, hash));
auto amount (node.ledger.amount (transaction, hash));
bool is_state_send (false);
if (auto state = dynamic_cast<rai::state_block *> (block_a.get ()))
{
rai::transaction transaction (node.store.environment, nullptr, false);
is_state_send = node.ledger.is_send (transaction, *state);
}
node.observers.blocks (block_a, account, amount, is_state_send);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it can be just replaced here
node.active.start (block_a);

boost::property_tree::ptree response_l;
response_l.put ("hash", hash.to_string ());
response (response_l);
Expand Down