Skip to content

Commit

Permalink
Websocket subscription for work notifications (#2289)
Browse files Browse the repository at this point in the history
* Add websocket subscription for work notifications

* Move hash to request

* Checking if websocket_server is being used

* Fix an issue where distributed_work would think it had done work if local work generation was stopped

* Review comments

* Fix intermittent test failures with TSAN
  • Loading branch information
guilhermelawless authored Sep 8, 2019
1 parent c20c265 commit 90ec334
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 56 deletions.
53 changes: 34 additions & 19 deletions nano/core_test/distributed_work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,52 @@ TEST (distributed_work, no_peers)
ASSERT_FALSE (nano::work_validate (hash, *work));
// should only be removed after cleanup
ASSERT_EQ (1, node->distributed_work.work.size ());
node->distributed_work.cleanup_finished ();
ASSERT_EQ (0, node->distributed_work.work.size ());
while (!node->distributed_work.work.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (distributed_work, no_peers_cancel)
{
nano::system system (24000, 1);
auto node (system.nodes[0]);
nano::system system (24000, 0);
nano::node_config node_config (24000, system.logging);
node_config.max_work_generate_multiplier = 1e6;
node_config.max_work_generate_difficulty = nano::difficulty::from_multiplier (node_config.max_work_generate_multiplier, nano::network_constants::publish_test_threshold);
auto & node = *system.add_node (node_config);
nano::block_hash hash;
bool done{ false };
auto callback_to_cancel = [&done](boost::optional<uint64_t> work_a) {
ASSERT_FALSE (work_a.is_initialized ());
done = true;
};
node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold));
ASSERT_EQ (1, node->distributed_work.work.size ());
node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.work.size ());
// cleanup should not cancel or remove an ongoing work
node->distributed_work.cleanup_finished ();
ASSERT_EQ (1, node->distributed_work.work.size ());
node.distributed_work.cleanup_finished ();
ASSERT_EQ (1, node.distributed_work.work.size ());

// manually cancel
node->distributed_work.cancel (hash, true); // forces local stop
system.deadline_set (5s);
node.distributed_work.cancel (hash, true); // forces local stop
system.deadline_set (20s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node->distributed_work.work.empty ());
ASSERT_TRUE (node.distributed_work.work.empty ());

// now using observer
done = false;
node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold));
ASSERT_EQ (1, node->distributed_work.work.size ());
node->observers.work_cancel.notify (hash);
node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.work.size ());
node.observers.work_cancel.notify (hash);
system.deadline_set (20s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node->distributed_work.work.empty ());
ASSERT_TRUE (node.distributed_work.work.empty ());
}

TEST (distributed_work, no_peers_multi)
Expand Down Expand Up @@ -95,8 +102,12 @@ TEST (distributed_work, no_peers_multi)
{
ASSERT_NO_ERROR (system.poll ());
}
node->distributed_work.cleanup_finished ();
ASSERT_EQ (0, node->distributed_work.work.size ());
system.deadline_set (5s);
while (node->distributed_work.work.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
}
count = 0;
// Test many works for different roots
for (unsigned i{ 0 }; i < total; ++i)
Expand All @@ -115,7 +126,11 @@ TEST (distributed_work, no_peers_multi)
{
ASSERT_NO_ERROR (system.poll ());
}
node->distributed_work.cleanup_finished ();
ASSERT_EQ (0, node->distributed_work.work.size ());
system.deadline_set (5s);
while (node->distributed_work.work.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
}
count = 0;
}
77 changes: 77 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,3 +708,80 @@ TEST (websocket, vote_options)
client_thread_2.join ();
node1->stop ();
}

// Test client subscribing to notifications for work generation
TEST (websocket, work)
{
nano::system system (24000, 1);
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work));

// Subscribe to work and wait for response asynchronously
ack_ready = false;
auto client_task = ([]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "work", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::work));

// Generate work
nano::block_hash hash;
auto work (node1->work_generate_blocking (hash));
ASSERT_TRUE (work.is_initialized ());

// Wait for the work notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check the work notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "work");

auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("success"), "true");
ASSERT_LT (contents.get<unsigned> ("duration"), 10000);

ASSERT_EQ (1, contents.count ("request"));
auto & request = contents.get_child ("request");
ASSERT_EQ (request.get<std::string> ("hash"), hash.to_string ());
ASSERT_EQ (request.get<std::string> ("difficulty"), nano::to_string_hex (node1->network_params.network.publish_threshold));
ASSERT_EQ (request.get<double> ("multiplier"), 1.0);

ASSERT_EQ (1, contents.count ("result"));
auto & result = contents.get_child ("result");
uint64_t result_difficulty;
nano::from_string_hex (result.get<std::string> ("difficulty"), result_difficulty);
ASSERT_GE (result_difficulty, node1->network_params.network.publish_threshold);
ASSERT_NEAR (result.get<double> ("multiplier"), nano::difficulty::to_multiplier (result_difficulty, node1->network_params.network.publish_threshold), 1e-6);
ASSERT_EQ (result.get<std::string> ("work"), nano::to_string_hex (work.get ()));

ASSERT_EQ (1, contents.count ("bad_peers"));
auto & bad_peers = contents.get_child ("bad_peers");
ASSERT_TRUE (bad_peers.empty ());

ASSERT_EQ (contents.get<std::string> ("reason"), "");
}
Loading

0 comments on commit 90ec334

Please sign in to comment.