Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ledger cache initialization with multiple threads #2876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions nano/core_test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3283,3 +3283,104 @@ TEST (ledger, block_confirmed)
ledger.store.confirmation_height_put (transaction, nano::genesis_account, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, send1->hash ()));
}

TEST (ledger, cache)
{
nano::logger_mt logger;
auto store = nano::make_store (logger, nano::unique_path ());
ASSERT_TRUE (!store->init_error ());
nano::stat stats;
nano::ledger ledger (*store, stats);
nano::genesis genesis;
store->initialize (store->tx_begin_write (), genesis, ledger.cache);
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::block_builder builder;

size_t const total = 100;

// Check existing ledger (incremental cache update) and reload on a new ledger
for (size_t i (0); i < total; ++i)
{
auto account_count = 1 + i;
auto block_count = 1 + 2 * (i + 1) - 2;
auto cemented_count = 1 + 2 * (i + 1) - 2;
auto genesis_weight = nano::genesis_amount - i;

auto cache_check = [&, i](nano::ledger_cache const & cache_a) {
ASSERT_EQ (account_count, cache_a.account_count);
ASSERT_EQ (block_count, cache_a.block_count);
ASSERT_EQ (cemented_count, cache_a.cemented_count);
ASSERT_EQ (genesis_weight, cache_a.rep_weights.representation_get (nano::genesis_account));
};

nano::keypair key;
auto const latest = ledger.latest (store->tx_begin_read (), nano::genesis_account);
auto send = builder.state ()
.account (nano::genesis_account)
.previous (latest)
.representative (nano::genesis_account)
.balance (nano::genesis_amount - (i + 1))
.link (key.pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*pool.generate (latest))
.build ();
auto open = builder.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*pool.generate (key.pub))
.build ();
{
auto transaction (store->tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
}

++block_count;
--genesis_weight;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);

{
auto transaction (store->tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open).code);
}

++block_count;
++account_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);

{
auto transaction (store->tx_begin_write ());
nano::confirmation_height_info height;
ASSERT_FALSE (ledger.store.confirmation_height_get (transaction, nano::genesis_account, height));
++height.height;
height.frontier = send->hash ();
ledger.store.confirmation_height_put (transaction, nano::genesis_account, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, send->hash ()));
++ledger.cache.cemented_count;
}

++cemented_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);

{
auto transaction (store->tx_begin_write ());
nano::confirmation_height_info height;
ledger.store.confirmation_height_get (transaction, key.pub, height);
height.height += 1;
height.frontier = open->hash ();
ledger.store.confirmation_height_put (transaction, key.pub, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, open->hash ()));
++ledger.cache.cemented_count;
}

++cemented_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);
}
}
19 changes: 15 additions & 4 deletions nano/lib/rep_weights.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,30 @@ void nano::rep_weights::representation_put (nano::account const & account_a, nan
put (account_a, representation_a);
}

nano::uint128_t nano::rep_weights::representation_get (nano::account const & account_a)
nano::uint128_t nano::rep_weights::representation_get (nano::account const & account_a) const
{
nano::lock_guard<std::mutex> lk (mutex);
return get (account_a);
}

/** Makes a copy */
std::unordered_map<nano::account, nano::uint128_t> nano::rep_weights::get_rep_amounts ()
std::unordered_map<nano::account, nano::uint128_t> nano::rep_weights::get_rep_amounts () const
{
nano::lock_guard<std::mutex> guard (mutex);
return rep_amounts;
}

void nano::rep_weights::copy_from (nano::rep_weights & other_a)
{
nano::lock_guard<std::mutex> guard_this (mutex);
nano::lock_guard<std::mutex> guard_other (other_a.mutex);
for (auto const & entry : other_a.rep_amounts)
{
auto prev_amount (get (entry.first));
put (entry.first, prev_amount + entry.second);
}
}

void nano::rep_weights::put (nano::account const & account_a, nano::uint128_union const & representation_a)
{
auto it = rep_amounts.find (account_a);
Expand All @@ -57,7 +68,7 @@ void nano::rep_weights::put (nano::account const & account_a, nano::uint128_unio
}
}

nano::uint128_t nano::rep_weights::get (nano::account const & account_a)
nano::uint128_t nano::rep_weights::get (nano::account const & account_a) const
{
auto it = rep_amounts.find (account_a);
if (it != rep_amounts.end ())
Expand All @@ -70,7 +81,7 @@ nano::uint128_t nano::rep_weights::get (nano::account const & account_a)
}
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::rep_weights & rep_weights, const std::string & name)
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::rep_weights const & rep_weights, const std::string & name)
{
size_t rep_amounts_count;

Expand Down
13 changes: 7 additions & 6 deletions nano/lib/rep_weights.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ class rep_weights
public:
void representation_add (nano::account const & source_rep_a, nano::uint128_t const & amount_a);
void representation_add_dual (nano::account const & source_rep_1, nano::uint128_t const & amount_1, nano::account const & source_rep_2, nano::uint128_t const & amount_2);
nano::uint128_t representation_get (nano::account const & account_a);
nano::uint128_t representation_get (nano::account const & account_a) const;
void representation_put (nano::account const & account_a, nano::uint128_union const & representation_a);
std::unordered_map<nano::account, nano::uint128_t> get_rep_amounts ();
std::unordered_map<nano::account, nano::uint128_t> get_rep_amounts () const;
void copy_from (rep_weights & other_a);

private:
std::mutex mutex;
mutable std::mutex mutex;
std::unordered_map<nano::account, nano::uint128_t> rep_amounts;
void put (nano::account const & account_a, nano::uint128_union const & representation_a);
nano::uint128_t get (nano::account const & account_a);
nano::uint128_t get (nano::account const & account_a) const;

friend std::unique_ptr<container_info_component> collect_container_info (rep_weights &, const std::string &);
friend std::unique_ptr<container_info_component> collect_container_info (rep_weights const &, const std::string &);
};

std::unique_ptr<container_info_component> collect_container_info (rep_weights &, const std::string &);
std::unique_ptr<container_info_component> collect_container_info (rep_weights const &, const std::string &);
}
3 changes: 3 additions & 0 deletions nano/secure/blockstore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ class block_store
virtual nano::store_iterator<nano::block_hash, std::shared_ptr<nano::block>> blocks_begin (nano::transaction const & transaction_a) const = 0;
virtual nano::store_iterator<nano::block_hash, std::shared_ptr<nano::block>> blocks_end () const = 0;

virtual void latest_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::account_info>, nano::store_iterator<nano::account, nano::account_info>)> const &) = 0;
virtual void confirmation_height_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::confirmation_height_info>, nano::store_iterator<nano::account, nano::confirmation_height_info>)> const &) = 0;

virtual uint64_t block_account_height (nano::transaction const & transaction_a, nano::block_hash const & hash_a) const = 0;
virtual std::mutex & get_cache_mutex () = 0;

Expand Down
54 changes: 54 additions & 0 deletions nano/secure/blockstore_partial.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

#include <crypto/cryptopp/words.h>

#include <thread>

namespace
{
template <typename T>
void parallel_traversal (std::function<void(T const &, T const &, bool const)> const & action);
}

namespace nano
{
template <typename Val, typename Derived_Store>
Expand Down Expand Up @@ -701,6 +709,24 @@ class block_store_partial : public block_store
return count (transaction_a, tables::unchecked);
}

void latest_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::account_info>, nano::store_iterator<nano::account, nano::account_info>)> const & action_a) override
{
parallel_traversal<nano::uint256_t> (
[&action_a, this](nano::uint256_t const & start, nano::uint256_t const & end, bool const is_last) {
auto transaction (this->tx_begin_read ());
action_a (this->latest_begin (transaction, start), !is_last ? this->latest_begin (transaction, end) : this->latest_end ());
});
}

void confirmation_height_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::confirmation_height_info>, nano::store_iterator<nano::account, nano::confirmation_height_info>)> const & action_a) override
{
parallel_traversal<nano::uint256_t> (
[&action_a, this](nano::uint256_t const & start, nano::uint256_t const & end, bool const is_last) {
auto transaction (this->tx_begin_read ());
action_a (this->confirmation_height_begin (transaction, start), !is_last ? this->confirmation_height_begin (transaction, end) : this->confirmation_height_end ());
});
}

int const minimum_version{ 14 };

protected:
Expand Down Expand Up @@ -822,3 +848,31 @@ class block_predecessor_set : public nano::block_visitor
nano::block_store_partial<Val, Derived_Store> & store;
};
}

namespace
{
template <typename T>
void parallel_traversal (std::function<void(T const &, T const &, bool const)> const & action)
{
// Between 10 and 40 threads, scales well even in low power systems as long as actions are I/O bound
unsigned const thread_count = std::max (10u, std::min (40u, 10 * std::thread::hardware_concurrency ()));
T const value_max{ std::numeric_limits<T>::max () };
T const split = value_max / thread_count;
std::vector<std::thread> threads;
threads.reserve (thread_count);
for (unsigned thread (0); thread < thread_count; ++thread)
{
T const start = thread * split;
T const end = (thread + 1) * split;
bool const is_last = thread == thread_count - 1;

threads.emplace_back ([&action, start, end, is_last] {
action (start, end, is_last);
});
}
for (auto & thread : threads)
{
thread.join ();
}
}
}
51 changes: 37 additions & 14 deletions nano/secure/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,28 +741,51 @@ epoch_2_started_cb (epoch_2_started_cb_a)
{
if (!store.init_error ())
{
auto transaction = store.tx_begin_read ();
if (generate_cache_a.reps || generate_cache_a.account_count || generate_cache_a.epoch_2 || generate_cache_a.block_count)
{
initialize (generate_cache_a);
}
}

void nano::ledger::initialize (nano::generate_cache const & generate_cache_a)
{
auto transaction = store.tx_begin_read ();

if (generate_cache_a.reps || generate_cache_a.account_count || generate_cache_a.epoch_2 || generate_cache_a.block_count)
{
store.latest_for_each_par (
[this](nano::store_iterator<nano::account, nano::account_info> i, nano::store_iterator<nano::account, nano::account_info> n) {
uint64_t block_count_l{ 0 };
uint64_t account_count_l{ 0 };
decltype (this->cache.rep_weights) rep_weights_l;
bool epoch_2_started_l{ false };
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n; ++i)
for (; i != n; ++i)
{
nano::account_info const & info (i->second);
cache.rep_weights.representation_add (info.representative, info.balance.number ());
++cache.account_count;
cache.block_count += info.block_count;
block_count_l += info.block_count;
++account_count_l;
rep_weights_l.representation_add (info.representative, info.balance.number ());
epoch_2_started_l = epoch_2_started_l || info.epoch () == nano::epoch::epoch_2;
}
cache.epoch_2_started.store (epoch_2_started_l);
}
if (epoch_2_started_l)
{
this->cache.epoch_2_started.store (true);
}
this->cache.block_count += block_count_l;
this->cache.account_count += account_count_l;
this->cache.rep_weights.copy_from (rep_weights_l);
});
}

if (generate_cache_a.cemented_count)
{
for (auto i (store.confirmation_height_begin (transaction)), n (store.confirmation_height_end ()); i != n; ++i)
if (generate_cache_a.cemented_count)
{
store.confirmation_height_for_each_par (
[this](nano::store_iterator<nano::account, nano::confirmation_height_info> i, nano::store_iterator<nano::account, nano::confirmation_height_info> n) {
uint64_t cemented_count_l (0);
for (; i != n; ++i)
{
cache.cemented_count += i->second.height;
cemented_count_l += i->second.height;
}
}
this->cache.cemented_count += cemented_count_l;
});
}
}

Expand Down
3 changes: 3 additions & 0 deletions nano/secure/ledger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class ledger final
uint64_t bootstrap_weight_max_blocks{ 1 };
std::atomic<bool> check_bootstrap_weights;
std::function<void()> epoch_2_started_cb;

private:
void initialize (nano::generate_cache const &);
};

std::unique_ptr<container_info_component> collect_container_info (ledger & ledger, const std::string & name);
Expand Down