Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove message buffer class and its manager class #4141

Merged
merged 7 commits into from
Feb 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 0 additions & 148 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,154 +700,6 @@ TEST (node, port_mapping)
}
}

TEST (message_buffer_manager, one_buffer)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.dequeue ());
ASSERT_EQ (buffer1, buffer2);
buffer.release (buffer2);
auto buffer3 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer3);
}

TEST (message_buffer_manager, two_buffers)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 2);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_NE (nullptr, buffer2);
ASSERT_NE (buffer1, buffer2);
buffer.enqueue (buffer2);
buffer.enqueue (buffer1);
auto buffer3 (buffer.dequeue ());
ASSERT_EQ (buffer2, buffer3);
auto buffer4 (buffer.dequeue ());
ASSERT_EQ (buffer1, buffer4);
buffer.release (buffer3);
buffer.release (buffer4);
auto buffer5 (buffer.allocate ());
ASSERT_EQ (buffer2, buffer5);
auto buffer6 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer6);
}

TEST (message_buffer_manager, one_overflow)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer2);
}

TEST (message_buffer_manager, two_overflow)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 2);
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_NE (nullptr, buffer2);
ASSERT_NE (buffer1, buffer2);
buffer.enqueue (buffer2);
auto buffer3 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer3);
auto buffer4 (buffer.allocate ());
ASSERT_EQ (buffer2, buffer4);
}

TEST (message_buffer_manager, one_buffer_multithreaded)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 1);
boost::thread thread ([&buffer] () {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
});
auto buffer1 (buffer.allocate ());
ASSERT_NE (nullptr, buffer1);
buffer.enqueue (buffer1);
auto buffer2 (buffer.allocate ());
ASSERT_EQ (buffer1, buffer2);
buffer.stop ();
thread.join ();
}

TEST (message_buffer_manager, many_buffers_multithreaded)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 16);
std::vector<boost::thread> threads;
for (auto i (0); i < 4; ++i)
{
threads.push_back (boost::thread ([&buffer] () {
auto done (false);
while (!done)
{
auto item (buffer.dequeue ());
done = item == nullptr;
if (item != nullptr)
{
buffer.release (item);
}
}
}));
}
std::atomic_int count (0);
for (auto i (0); i < 4; ++i)
{
threads.push_back (boost::thread ([&buffer, &count] () {
auto done (false);
for (auto i (0); !done && i < 1000; ++i)
{
auto item (buffer.allocate ());
done = item == nullptr;
if (item != nullptr)
{
buffer.enqueue (item);
++count;
if (count > 3000)
{
buffer.stop ();
}
}
}
}));
}
buffer.stop ();
for (auto & i : threads)
{
i.join ();
}
}

TEST (message_buffer_manager, stats)
{
nano::stats stats;
nano::message_buffer_manager buffer (stats, 512, 1);
auto buffer1 (buffer.allocate ());
buffer.enqueue (buffer1);
buffer.allocate ();
ASSERT_EQ (1, stats.count (nano::stat::type::udp, nano::stat::detail::overflow));
}

TEST (tcp_listener, tcp_node_id_handshake)
{
nano::test::system system (1);
Expand Down
95 changes: 0 additions & 95 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/secure/buffer.hpp>

#include <boost/format.hpp>

Expand All @@ -19,7 +18,6 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
process_message (message, channel);
} },
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
node (node_a),
Expand Down Expand Up @@ -92,7 +90,6 @@ void nano::network::stop ()
{
tcp_channels.stop ();
resolver.cancel ();
buffer_container.stop ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
Expand Down Expand Up @@ -751,98 +748,6 @@ void nano::network::exclude (std::shared_ptr<nano::transport::channel> const & c
erase (*channel);
}

/*
* message_buffer_manager
*/

nano::message_buffer_manager::message_buffer_manager (nano::stats & stats_a, std::size_t size, std::size_t count) :
stats (stats_a),
free (count),
full (count),
slab (size * count),
entries (count),
stopped (false)
{
debug_assert (count > 0);
debug_assert (size > 0);
auto slab_data (slab.data ());
auto entry_data (entries.data ());
for (auto i (0); i < count; ++i, ++entry_data)
{
*entry_data = { slab_data + i * size, 0, nano::endpoint () };
free.push_back (entry_data);
}
}

nano::message_buffer * nano::message_buffer_manager::allocate ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (!stopped && free.empty () && full.empty ())
{
stats.inc (nano::stat::type::udp, nano::stat::detail::blocking, nano::stat::dir::in);
condition.wait (lock, [&stopped = stopped, &free = free, &full = full] { return stopped || !free.empty () || !full.empty (); });
}
nano::message_buffer * result (nullptr);
if (!free.empty ())
{
result = free.front ();
free.pop_front ();
}
if (result == nullptr && !full.empty ())
{
result = full.front ();
full.pop_front ();
stats.inc (nano::stat::type::udp, nano::stat::detail::overflow, nano::stat::dir::in);
}
release_assert (result || stopped);
return result;
}

void nano::message_buffer_manager::enqueue (nano::message_buffer * data_a)
{
debug_assert (data_a != nullptr);
{
nano::lock_guard<nano::mutex> lock{ mutex };
full.push_back (data_a);
}
condition.notify_all ();
}

nano::message_buffer * nano::message_buffer_manager::dequeue ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && full.empty ())
{
condition.wait (lock);
}
nano::message_buffer * result (nullptr);
if (!full.empty ())
{
result = full.front ();
full.pop_front ();
}
return result;
}

void nano::message_buffer_manager::release (nano::message_buffer * data_a)
{
debug_assert (data_a != nullptr);
{
nano::lock_guard<nano::mutex> lock{ mutex };
free.push_back (data_a);
}
condition.notify_all ();
}

void nano::message_buffer_manager::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
}

nano::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) :
max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1)
{
Expand Down
55 changes: 1 addition & 54 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,13 @@

#include <boost/thread/thread.hpp>

#include <deque>
#include <memory>
#include <queue>
#include <unordered_set>

namespace nano
{
class channel;
class node;
class stats;
class transaction;
class message_buffer final
{
public:
uint8_t * buffer{ nullptr };
std::size_t size{ 0 };
nano::endpoint endpoint;
};
/**
* A circular buffer for servicing nano realtime messages.
* This container follows a producer/consumer model where the operating system is producing data in to
* buffers which are serviced by internal threads.
* If buffers are not serviced fast enough they're internally dropped.
* This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order.
* All public methods are thread-safe
*/
class message_buffer_manager final
{
public:
// Stats - Statistics
// Size - Size of each individual buffer
// Count - Number of buffers to allocate
message_buffer_manager (nano::stats & stats, std::size_t, std::size_t);
// Return a buffer where message data can be put
// Method will attempt to return the first free buffer
// If there are no free buffers, an unserviced buffer will be dequeued and returned
// Function will block if there are no free or unserviced buffers
// Return nullptr if the container has stopped
nano::message_buffer * allocate ();
// Queue a buffer that has been filled with message data and notify servicing threads
void enqueue (nano::message_buffer *);
// Return a buffer that has been filled with message data
// Function will block until a buffer has been added
// Return nullptr if the container has stopped
nano::message_buffer * dequeue ();
// Return a buffer to the freelist after is has been serviced
void release (nano::message_buffer *);
// Stop container and notify waiting threads
void stop ();

private:
nano::stats & stats;
nano::mutex mutex;
nano::condition_variable condition;
boost::circular_buffer<nano::message_buffer *> free;
boost::circular_buffer<nano::message_buffer *> full;
std::vector<uint8_t> slab;
std::vector<nano::message_buffer> entries;
bool stopped;
};
class tcp_message_manager final
{
public:
Expand Down Expand Up @@ -181,7 +129,6 @@ class network final

public:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::peer_exclusion excluded_peers;
Expand Down