Skip to content

Commit

Permalink
Optionally disable local work when work peers are down
Browse files Browse the repository at this point in the history
  • Loading branch information
PlasmaPower committed Apr 11, 2018
1 parent 603599f commit 7a31b6b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
2 changes: 1 addition & 1 deletion rai/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ done (false),
opencl (opencl_a)
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::max (1u, std::min (max_threads_a, std::thread::hardware_concurrency ())));
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::min (max_threads_a, std::max (1u, std::thread::hardware_concurrency ())));
for (auto i (0); i < count; ++i)
{
auto thread (std::thread ([this, i]() {
Expand Down
37 changes: 30 additions & 7 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,6 @@ bool rai::node_config::deserialize_json (bool & upgraded_a, boost::property_tree
result |= password_fanout < 16;
result |= password_fanout > 1024 * 1024;
result |= io_threads == 0;
result |= work_threads == 0;
result |= state_block_parse_canary.decode_hex (state_block_parse_canary_l);
result |= state_block_generate_canary.decode_hex (state_block_generate_canary_l);
}
Expand Down Expand Up @@ -2124,10 +2123,11 @@ class work_request
class distributed_work : public std::enable_shared_from_this<distributed_work>
{
public:
distributed_work (std::shared_ptr<rai::node> const & node_a, rai::block_hash const & root_a, std::function<void(uint64_t)> callback_a) :
distributed_work (std::shared_ptr<rai::node> const & node_a, rai::block_hash const & root_a, std::function<void(uint64_t)> callback_a, uint backoff_a = 1) :
callback (callback_a),
node (node_a),
root (root_a)
root (root_a),
backoff (backoff_a)
{
completed.clear ();
for (auto & i : node_a->config.work_peers)
Expand Down Expand Up @@ -2293,10 +2293,32 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
{
if (!completed.test_and_set ())
{
auto callback_l (callback);
node->work.generate (root, [callback_l](boost::optional<uint64_t> const & work_a) {
callback_l (work_a.value ());
});
if (node->config.work_threads != 0 || node->work.opencl)
{
auto callback_l (callback);
node->work.generate (root, [callback_l](boost::optional<uint64_t> const & work_a) {
callback_l (work_a.value ());
});
}
else
{
if (backoff == 1 && node->config.logging.work_generation_time ())
{
BOOST_LOG (node->log) << "Work peer(s) failed to generate work for root " << root.to_string () << ", retrying...";
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
auto callback_l (callback);
std::weak_ptr<rai::node> node_w (node);
auto next_backoff (std::min (backoff * 2, (uint)60 * 5));
node->alarm.add (now + std::chrono::seconds (backoff), [node_w, root_l, callback_l, next_backoff] {
if (auto node_l = node_w.lock ())
{
auto work_generation (std::make_shared<distributed_work> (node_l, root_l, callback_l, next_backoff));
work_generation->start ();
}
});
}
}
}
}
Expand All @@ -2307,6 +2329,7 @@ class distributed_work : public std::enable_shared_from_this<distributed_work>
return outstanding.empty ();
}
std::function<void(uint64_t)> callback;
uint backoff; // in seconds
std::shared_ptr<rai::node> node;
rai::block_hash root;
std::mutex mutex;
Expand Down

0 comments on commit 7a31b6b

Please sign in to comment.