Skip to content

Commit

Permalink
Add the block_publisher class (#4178)
Browse files Browse the repository at this point in the history
* Add the block_publisher class

* Move the block publishing to the block_publisher class

* Remove unnecessary header
  • Loading branch information
thsfs authored Mar 9, 2023
1 parent d492408 commit 1d8fc9e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 3 deletions.
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ add_library(
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
block_publisher.cpp
block_publisher.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
Expand Down
27 changes: 27 additions & 0 deletions nano/node/block_publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include <nano/node/active_transactions.hpp>
#include <nano/node/block_publisher.hpp>
#include <nano/node/blockprocessor.hpp>

nano::block_publisher::block_publisher (nano::active_transactions & active) :
active{ active }
{
}

void nano::block_publisher::connect (nano::block_processor & block_processor)
{
block_processor.processed.add ([this] (auto const & result, auto const & block) {
switch (result.code)
{
case nano::process_result::fork:
observe (block);
break;
default:
break;
}
});
}

void nano::block_publisher::observe (std::shared_ptr<nano::block> block)
{
active.publish (block);
}
24 changes: 24 additions & 0 deletions nano/node/block_publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <memory>

namespace nano
{
class active_transactions;
class block_processor;
class block;

// This class tracks processed blocks to be published.
class block_publisher
{
public:
block_publisher (nano::active_transactions & active);
void connect (nano::block_processor & block_processor);

private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);

nano::active_transactions & active;
};
}
2 changes: 0 additions & 2 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/store.hpp>
Expand Down Expand Up @@ -435,7 +434,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
case nano::process_result::fork:
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
events_a.events.emplace_back ([this, block] (nano::transaction const &) { this->node.active.publish (block); });
if (node.config.logging.ledger_logging ())
{
node.logger.try_log (boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % block->root ().to_string ()));
Expand Down
4 changes: 3 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,11 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq),
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing }
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing },
block_publisher{ active }
{
block_broadcast.connect (block_processor);
block_publisher.connect (block_processor);
unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); };
unchecked.satisfied = [this] (nano::unchecked_info const & info) {
this->block_processor.add (info.block);
Expand Down
2 changes: 2 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/block_publisher.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
Expand Down Expand Up @@ -192,6 +193,7 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::websocket_server websocket;
nano::epoch_upgrader epoch_upgrader;
nano::block_broadcast block_broadcast;
nano::block_publisher block_publisher;

std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
Expand Down

0 comments on commit 1d8fc9e

Please sign in to comment.