Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move write database queue #4535

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
}

TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
Expand All @@ -35,8 +33,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand All @@ -52,8 +49,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand Down Expand Up @@ -118,7 +114,7 @@ TEST (confirmation_callback, confirmed_history)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
node_flags.disable_ascending_bootstrap = true;
nano::node_config node_config = system.default_config ();
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
Expand Down Expand Up @@ -155,7 +151,7 @@ TEST (confirmation_callback, confirmed_history)
ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ()));
{
// The write guard prevents the confirmation height processor doing any writes
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
auto write_guard = node->store.write_queue.wait (nano::store::writer::testing);

// Confirm send1
election->force_confirm ();
Expand All @@ -166,13 +162,13 @@ TEST (confirmation_callback, confirmed_history)
auto transaction = node->store.tx_begin_read ();
ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ()));

ASSERT_TIMELY (10s, node->write_database_queue.contains (nano::writer::confirmation_height));
ASSERT_TIMELY (10s, node->store.write_queue.contains (nano::store::writer::confirmation_height));

// Confirm that no inactive callbacks have been called when the confirmation height processor has already iterated over it, waiting to write
ASSERT_EQ (0, node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::inactive_conf_height, nano::stat::dir::out));
}

ASSERT_TIMELY (10s, !node->write_database_queue.contains (nano::writer::confirmation_height));
ASSERT_TIMELY (10s, !node->store.write_queue.contains (nano::store::writer::confirmation_height));

auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ()));
Expand All @@ -196,7 +192,7 @@ TEST (confirmation_callback, dependent_election)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
nano::node_config node_config = system.default_config ();
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config, node_flags);
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/ledger_confirm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ TEST (ledger_confirm, pruned_source)
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
ledger.pruning = true;
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1, key2;
nano::block_builder builder;
Expand Down Expand Up @@ -868,7 +868,7 @@ TEST (ledger_confirmDeathTest, rollback_added_block)
ASSERT_TRUE (!store->init_error ());
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1;
nano::block_builder builder;
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ TEST (node, block_processor_full)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
node_flags.block_processor_full_size = 3;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
Expand Down Expand Up @@ -2709,7 +2709,7 @@ TEST (node, block_processor_half_full)
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 6;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
auto send1 = builder.make_block ()
Expand Down Expand Up @@ -2740,7 +2740,7 @@ TEST (node, block_processor_half_full)
.work (*node.work_generate_blocking (send2->hash ()))
.build ();
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::testing);
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
Expand Down Expand Up @@ -3097,7 +3097,7 @@ TEST (node, rollback_vote_self)

{
// The write guard prevents the block processor from performing the rollback
auto write_guard = node.write_database_queue.wait (nano::writer::testing);
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);

ASSERT_EQ (0, election->votes_with_weight ().size ());
// Vote with key to switch the winner
Expand Down
2 changes: 0 additions & 2 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ add_library(
websocketconfig.cpp
websocket_stream.hpp
websocket_stream.cpp
write_database_queue.hpp
write_database_queue.cpp
messages.hpp
messages.cpp
xorshift.hpp)
Expand Down
7 changes: 3 additions & 4 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ void nano::block_processor::context::set_result (result_t const & result)
* block_processor
*/

nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
write_database_queue (write_database_queue_a),
next_log (std::chrono::steady_clock::now ())
{
batch_processed.add ([this] (auto const & items) {
Expand Down Expand Up @@ -300,7 +299,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
processed_batch_t processed;

auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }));
nano::timer<std::chrono::milliseconds> timer_l;

Expand Down Expand Up @@ -509,4 +508,4 @@ nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("priority_local", priority_local);

return toml.get_error ();
}
}
4 changes: 1 addition & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace nano
{
class block;
class node;
class write_database_queue;
}

namespace nano::store
Expand Down Expand Up @@ -86,7 +85,7 @@ class block_processor final
};

public:
block_processor (nano::node &, nano::write_database_queue &);
block_processor (nano::node &);
~block_processor ();

void start ();
Expand Down Expand Up @@ -127,7 +126,6 @@ class block_processor final
private: // Dependencies
block_processor_config const & config;
nano::node & node;
nano::write_database_queue & write_database_queue;

private:
nano::fair_queue<context, block_source> queue;
Expand Down
7 changes: 3 additions & 4 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>

nano::confirming_set::confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time) :
nano::confirming_set::confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time) :
ledger{ ledger },
write_queue{ write_queue },
batch_time{ batch_time }
{
}
Expand Down Expand Up @@ -72,7 +71,7 @@ void nano::confirming_set::run ()
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{
lock.unlock (); // Waiting for db write is potentially slow
auto guard = write_queue.wait (nano::writer::confirmation_height);
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.store.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time
Expand Down
4 changes: 1 addition & 3 deletions nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace nano
{
class block;
class ledger;
class write_database_queue;
}

namespace nano
Expand All @@ -27,7 +26,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;

public:
confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
~confirming_set ();
// Adds a block to the set of blocks to be confirmed
void add (nano::block_hash const & hash);
Expand All @@ -45,7 +44,6 @@ class confirming_set final
private:
void run ();
nano::ledger & ledger;
nano::write_database_queue & write_queue;
std::chrono::milliseconds batch_time;
std::unordered_set<nano::block_hash> set;
std::unordered_set<nano::block_hash> processing;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/make_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#include <nano/store/lmdb/lmdb.hpp>
#include <nano/store/rocksdb/rocksdb.hpp>

std::unique_ptr<nano::store::component> nano::make_store (nano::logger & logger, std::filesystem::path const & path, nano::ledger_constants & constants, bool read_only, bool add_db_postfix, nano::rocksdb_config const & rocksdb_config, nano::txn_tracking_config const & txn_tracking_config_a, std::chrono::milliseconds block_processor_batch_max_time_a, nano::lmdb_config const & lmdb_config_a, bool backup_before_upgrade)
std::unique_ptr<nano::store::component> nano::make_store (nano::logger & logger, std::filesystem::path const & path, nano::ledger_constants & constants, bool read_only, bool add_db_postfix, nano::rocksdb_config const & rocksdb_config, nano::txn_tracking_config const & txn_tracking_config_a, std::chrono::milliseconds block_processor_batch_max_time_a, nano::lmdb_config const & lmdb_config_a, bool backup_before_upgrade, bool force_use_write_queue)
{
if (rocksdb_config.enable)
{
return std::make_unique<nano::store::rocksdb::component> (logger, add_db_postfix ? path / "rocksdb" : path, constants, rocksdb_config, read_only);
return std::make_unique<nano::store::rocksdb::component> (logger, add_db_postfix ? path / "rocksdb" : path, constants, rocksdb_config, read_only, force_use_write_queue);
}

return std::make_unique<nano::store::lmdb::component> (logger, add_db_postfix ? path / "data.ldb" : path, constants, txn_tracking_config_a, block_processor_batch_max_time_a, lmdb_config_a, backup_before_upgrade);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/make_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ class component;

namespace nano
{
std::unique_ptr<nano::store::component> make_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false);
std::unique_ptr<nano::store::component> make_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false, bool force_use_write_queue = false);
}
9 changes: 4 additions & 5 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
io_ctx_shared{ io_ctx_a },
io_ctx{ *io_ctx_shared },
node_id{ load_or_create_node_id (application_path_a) },
write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable)),
node_initialized_latch (1),
config (config_a),
network_params{ config.network_params },
Expand All @@ -146,7 +145,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
flags (flags_a),
work (work_a),
distributed_work (*this),
store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade)),
store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade, flags.force_use_write_queue)),
store (*store_impl),
unchecked{ config.max_unchecked_blocks, stats, flags.disable_block_processor_unchecked_deletion },
wallets_store_impl (std::make_unique<nano::mdb_wallets_store> (application_path_a / "wallets.ldb", config_a.lmdb_config)),
Expand All @@ -171,8 +170,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
tcp_listener{ std::make_shared<nano::transport::tcp_listener> (network.port, *this, config.tcp_incoming_connections_max) },
application_path (application_path_a),
port_mapping (*this),
block_processor (*this, write_database_queue),
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, write_database_queue, config.confirming_set_batch_time) },
block_processor (*this),
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, config.confirming_set_batch_time) },
confirming_set{ *confirming_set_impl },
active_impl{ std::make_unique<nano::active_transactions> (*this, confirming_set, block_processor) },
active{ *active_impl },
Expand Down Expand Up @@ -1005,7 +1004,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::pruning);
auto scoped_write_guard = store.write_queue.wait (nano::store::writer::pruning);
auto write_transaction (store.tx_begin_write ({ tables::blocks, tables::pruned }));
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
Expand Down
2 changes: 0 additions & 2 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <nano/node/vote_processor.hpp>
#include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/utility.hpp>

#include <boost/program_options.hpp>
Expand Down Expand Up @@ -138,7 +137,6 @@ class node final : public std::enable_shared_from_this<node>

public:
const nano::keypair node_id;
nano::write_database_queue write_database_queue;
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class node_flags final
bool allow_bootstrap_peers_duplicates{ false };
bool disable_max_peers_per_ip{ false }; // For testing only
bool disable_max_peers_per_subnetwork{ false }; // For testing only
bool force_use_write_database_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used.
bool force_use_write_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used.
bool disable_search_pending{ false }; // For testing only
bool enable_pruning{ false };
bool fast_bootstrap{ false };
Expand Down
6 changes: 3 additions & 3 deletions nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,14 +1136,14 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
ASSERT_TRUE (!store->init_error ());
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
std::atomic<bool> stopped{ false };
boost::latch initialized_latch{ 0 };

nano::block_hash block_hash_being_processed{ 0 };
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set{ ledger, write_queue };
nano::store::write_queue write_queue{ false };
nano::confirming_set confirming_set{ ledger };

auto const num_accounts = 100000;

Expand Down
4 changes: 3 additions & 1 deletion nano/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ add_library(
rocksdb/version.cpp
transaction.cpp
version.cpp
versioning.cpp)
versioning.cpp
write_queue.hpp
write_queue.cpp)

target_link_libraries(
nano_store
Expand Down
3 changes: 2 additions & 1 deletion nano/store/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <nano/store/confirmation_height.hpp>
#include <nano/store/rep_weight.hpp>

nano::store::component::component (nano::store::block & block_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a, nano::store::rep_weight & rep_weight_a) :
nano::store::component::component (nano::store::block & block_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a, nano::store::rep_weight & rep_weight_a, bool use_noops_a) :
block (block_store_a),
account (account_store_a),
pending (pending_store_a),
Expand All @@ -17,6 +17,7 @@ nano::store::component::component (nano::store::block & block_store_a, nano::sto
confirmation_height (confirmation_height_store_a),
final_vote (final_vote_store_a),
version (version_store_a),
write_queue (use_noops_a),
rep_weight (rep_weight_a)
{
}
Expand Down
Loading
Loading