Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing channel::callback #2914

Merged
merged 1 commit into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ TEST (node, connect_after_junk)
std::vector<uint8_t> junk_buffer;
junk_buffer.push_back (0);
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, node0->network.endpoint (), node1->network_params.protocol.protocol_version));
channel1->send_buffer (nano::shared_const_buffer (std::move (junk_buffer)), nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
channel1->send_buffer (nano::shared_const_buffer (std::move (junk_buffer)), [](boost::system::error_code const &, size_t) {});
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::error) != 0);
node1->start ();
system.nodes.push_back (node1);
Expand Down Expand Up @@ -1566,7 +1566,7 @@ TEST (node, fork_no_vote_quorum)
confirm.serialize (stream, false);
}
nano::transport::channel_udp channel (node2.network.udp_channels, node3.network.endpoint (), node1.network_params.protocol.protocol_version);
channel.send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::confirm_ack);
channel.send_buffer (nano::shared_const_buffer (std::move (buffer)));
ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3);
ASSERT_TRUE (node1.latest (nano::dev_genesis_key.pub) == send1.hash ());
ASSERT_TRUE (node2.latest (nano::dev_genesis_key.pub) == send1.hash ());
Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_bulk_push.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void nano::bulk_push_client::send_finished ()
{
nano::shared_const_buffer buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->channel->send_buffer (buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
try
{
this_l->promise.set_value (false);
Expand All @@ -96,7 +96,7 @@ void nano::bulk_push_client::push_block (nano::block const & block_a)
nano::serialize_block (stream, block_a);
}
auto this_l (shared_from_this ());
connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->push ();
Expand Down
52 changes: 22 additions & 30 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,29 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
return result;
}

void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
{
if (auto socket_l = socket.lock ())
{
socket_l->async_write (buffer_a, tcp_callback (detail_a, socket_l->remote_endpoint (), callback_a), drop_policy_a);
socket_l->async_write (
buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node> (node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node.lock ())
{
if (!ec)
{
node_l->network.tcp_channels.update (endpoint_a);
}
if (ec == boost::system::errc::host_unreachable)
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (ec, size_a);
}
}
},
drop_policy_a);
}
else if (callback_a)
{
Expand All @@ -64,32 +82,6 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const
}
}

std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
return callback_a;
}

std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::tcp_callback (nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
return [endpoint_a, node = std::weak_ptr<nano::node> (node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node.lock ())
{
if (!ec)
{
node_l->network.tcp_channels.update (endpoint_a);
}
if (ec == boost::system::errc::host_unreachable)
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (ec, size_a);
}
}
};
}

std::string nano::transport::channel_tcp::to_string () const
{
if (auto socket_l = socket.lock ())
Expand Down Expand Up @@ -560,7 +552,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
std::shared_ptr<std::vector<uint8_t>> receive_buffer (std::make_shared<std::vector<uint8_t>> ());
receive_buffer->resize (256);
node_l->network.tcp_channels.push_node_id_handshake_socket (socket);
channel->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel, endpoint_a, receive_buffer, callback_a](boost::system::error_code const & ec, size_t size_a) {
channel->send_buffer (bytes, [node_w, channel, endpoint_a, receive_buffer, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel)
Expand Down Expand Up @@ -655,7 +647,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
{
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
}
channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) {
channel_a->send_buffer (bytes, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel_a)
Expand Down
4 changes: 1 addition & 3 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ namespace transport
~channel_tcp ();
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::function<void(boost::system::error_code const &, size_t)> tcp_callback (nano::stat::detail, nano::tcp_endpoint const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
void send_buffer (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::string to_string () const override;
bool operator== (nano::transport::channel_tcp const & other_a) const
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
auto should_drop (node.network.limiter.should_drop (buffer.size ()));
if (!is_droppable_by_limiter || !should_drop)
{
send_buffer (buffer, detail, callback_a, drop_policy_a);
send_buffer (buffer, callback_a, drop_policy_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}
else
Expand Down
3 changes: 1 addition & 2 deletions nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ namespace transport
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter);
virtual void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual void send_buffer (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) = 0;
virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
Expand Down
11 changes: 3 additions & 8 deletions nano/node/transport/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const &
return result;
}

void nano::transport::channel_udp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::transport::channel_udp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
{
set_last_packet_sent (std::chrono::steady_clock::now ());
channels.send (buffer_a, endpoint, callback (detail_a, callback_a));
}

std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_udp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
return [node = std::weak_ptr<nano::node> (channels.node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) {
channels.send (buffer_a, endpoint, [node = std::weak_ptr<nano::node> (channels.node.shared ()), callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node.lock ())
{
if (ec == boost::system::errc::host_unreachable)
Expand All @@ -58,7 +53,7 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
callback_a (ec, size_a);
}
}
};
});
}

std::string nano::transport::channel_udp::to_string () const
Expand Down
3 changes: 1 addition & 2 deletions nano/node/transport/udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace transport
channel_udp (nano::transport::udp_channels &, nano::endpoint const &, uint8_t protocol_version);
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer (nano::shared_const_buffer const &, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::function<void(boost::system::error_code const &, size_t)> callback (nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
void send_buffer (nano::shared_const_buffer const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::string to_string () const override;
bool operator== (nano::transport::channel_udp const & other_a) const
{
Expand Down