From d899bc0c46bfc421095360428a6c038dc370c57f Mon Sep 17 00:00:00 2001 From: cryptocode Date: Sat, 8 Dec 2018 04:27:03 +0100 Subject: [PATCH 01/15] Support external RPC servers via IPC --- nano/nano_node/daemon.cpp | 15 +- nano/nano_wallet/entry.cpp | 7 +- nano/node/CMakeLists.txt | 2 + nano/node/ipc.cpp | 425 +++++++++++++++++++++++++++++++++++++ nano/node/ipc.hpp | 102 +++++++++ nano/node/nodeconfig.cpp | 6 + nano/node/nodeconfig.hpp | 2 + nano/node/rpc.cpp | 40 ++-- nano/node/rpc.hpp | 8 +- nano/node/stats.cpp | 6 + nano/node/stats.hpp | 4 + 11 files changed, 594 insertions(+), 23 deletions(-) create mode 100644 nano/node/ipc.cpp create mode 100644 nano/node/ipc.hpp diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index a9166646b5..3ae2a68d9d 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -1,11 +1,12 @@ -#include -#include -#include - #include #include #include +#include +#include +#include b +#include #include +#include nano_daemon::daemon_config::daemon_config () : rpc_enable (false), @@ -139,10 +140,12 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano:: node->flags = flags; node->start (); std::unique_ptr rpc = get_rpc (io_ctx, *node, config.rpc); - if (rpc && config.rpc_enable) + if (rpc) { - rpc->start (); + rpc->start (config.rpc_enable); } + nano::ipc::ipc_server ipc (*node, *rpc); + config.node.disable_lazy_bootstrap = disable_lazy_bootstrap; runner = std::make_unique (io_ctx, node->config.io_threads); runner->join (); } diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index f0e8780eb5..6056c1e546 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -277,12 +278,14 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost update_config (config, config_path); node->start (); std::unique_ptr rpc = get_rpc (io_ctx, *node, config.rpc); - if (rpc && config.rpc_enable) + if (rpc) { - rpc->start (); + rpc->start (config.rpc_enable); } + nano::ipc::ipc_server ipc (*node, *rpc); nano::thread_runner runner (io_ctx, node->config.io_threads); QObject::connect (&application, &QApplication::aboutToQuit, [&]() { + ipc.stop (); rpc->stop (); node->stop (); }); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 07397c091e..d634c5f572 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -23,6 +23,8 @@ add_library (node cli.cpp common.cpp common.hpp + ipc.hpp + ipc.cpp lmdb.cpp lmdb.hpp logging.cpp diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp new file mode 100644 index 0000000000..97d264500e --- /dev/null +++ b/nano/node/ipc.cpp @@ -0,0 +1,425 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::log; + +std::string nano::error_ipc_messages::message (int ev) const +{ + switch (static_cast (ev)) + { + case nano::error_ipc::generic: + return "Unknown error"; + case nano::error_ipc::invalid_preamble: + return "Invalid preamble"; + } +} +void nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) +{ + +} + +bool nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & tree_a) +{ + bool error = false; + + auto tcp_l (tree_a.get_optional_child ("tcp")); + if (tcp_l) + { + tcp_l->get ("io_threads", transport_tcp.io_threads); + tcp_l->get ("enable", transport_tcp.enabled); + tcp_l->get ("address", transport_tcp.address); + tcp_l->get ("port", transport_tcp.port); + tcp_l->get ("io_timeout", transport_tcp.io_timeout); + } + + auto domain_l (tree_a.get_optional_child ("local")); + if (domain_l) + { + domain_l->get ("io_threads", transport_domain.io_threads); + domain_l->get ("enable", transport_domain.enabled); + domain_l->get ("path", transport_domain.path); + domain_l->get ("io_timeout", transport_domain.io_timeout); + } + + return error; +} + +/** A client session that manages its own lifetime */ +template +class session : public std::enable_shared_from_this> +{ +public: + session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_context_a, nano::ipc::ipc_config_transport & config_transport_a) : + server (server_a), node (server_a.node), io_context (io_context_a), socket (io_context_a), writer_strand (io_context_a), io_timer (io_context_a), config_transport (config_transport_a) + { + } + + ~session () + { + BOOST_LOG (node.log) << "IPC: session ended"; + } + + SOCKET_TYPE & get_socket () + { + return socket; + } + + /** + * Async read of exactly \p size bytes. The callback is called only when all the data is available and + * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients + * are expected to implement reconnect logic. + */ + void async_read_exactly (void * buff_a, size_t size_a, std::function fn) + { + async_read_exactly (buff_a, size_a, config_transport.io_timeout, fn); + } + + /** + * Async read of exactly \p size bytes and a specific timeout. + * @see async_read_exactly (void *, size_t, std::function) + */ + void async_read_exactly (void * buff_a, size_t size, size_t timeout_seconds, std::function fn) + { + timer_start (timeout_seconds); + boost::asio::async_read (socket, + boost::asio::buffer (buff_a, size), + boost::asio::transfer_exactly (size), + boost::bind (&session::handle_read_or_error, + this->shared_from_this (), + fn, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void handle_read_or_error (std::function fn, const boost::system::error_code & error, size_t bytes_transferred) + { + timer_cancel (); + if ((boost::asio::error::connection_aborted == error) || (boost::asio::error::connection_reset == error)) + { + BOOST_LOG (node.log) << boost::str (boost::format ("IPC: error reading %1% ") % error.message ()); + } + else if (bytes_transferred > 0) + { + fn (); + } + } + + /** + * Write callback. If no error occurs, the session starts waiting for another request. + * Clients are expected to implement reconnect logic. + */ + void handle_write (const boost::system::error_code & error, size_t bytes_transferred) + { + timer_cancel (); + if (!error) + { + read_next_request (); + } + else + { + BOOST_LOG (node.log) << "IPC: Write failed: " << error.message (); + } + } + + /** Handler for payloads of type nano::ipc_encoding::json_legacy */ + void rpc_handle_query () + { + auto start (std::chrono::steady_clock::now ()); + request_id_str = (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, request_id.fetch_add (1)))); + + // This is called when the nano::rpc_handler#process_request is done. We convert to + // json and writes the response to the ipc socket with a length prefix. + auto response_handler ([this, start](boost::property_tree::ptree const & tree_a) { + std::stringstream ostream; + boost::property_tree::write_json (ostream, tree_a); + ostream.flush (); + request_body = ostream.str (); + + uint32_t size_response = boost::endian::native_to_big ((uint32_t)request_body.size ()); + std::vector bufs = { + boost::asio::buffer (&size_response, 4), + boost::asio::buffer (request_body) + }; + + this->timer_start (this->config_transport.io_timeout); + boost::asio::async_write (this->socket, bufs, + writer_strand.wrap ( + boost::bind ( + &session::handle_write, + this->shared_from_this (), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred))); + + if (this->node.config.logging.log_rpc ()) + { + BOOST_LOG (this->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% microseconds") % request_id_str % std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count ()); + } + }); + + node.stats.inc (nano::stat::type::ipc, nano::stat::detail::invocations); + auto body (std::string ((char *)buffer.data (), buffer.size ())); + + // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler + auto handler (std::make_shared (node, server.rpc, body, request_id_str, response_handler)); + handler->process_request (); + } + + /** Async request reader */ + void read_next_request () + { + auto this_l = this->shared_from_this (); + + // Await preamble + buffer.resize (4); + async_read_exactly (buffer.data (), buffer.size (), std::numeric_limits::max (), [this_l]() { + if (this_l->buffer[0] != 'N') + { + BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; + } + else if (this_l->buffer[1] == static_cast (nano::ipc_encoding::json_legacy)) + { + // Length of query + this_l->async_read_exactly (&this_l->buffer_size, 4, [this_l]() { + boost::endian::big_to_native_inplace (this_l->buffer_size); + this_l->buffer.resize (this_l->buffer_size); + // Query + this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l]() { + this_l->rpc_handle_query (); + }); + }); + } + else + { + BOOST_LOG (this_l->node.log) << "IPC: Unsupported payload encoding"; + } + }); + } + + void close () + { + socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); + socket.close (); + } + +protected: + /** + * Start IO timer. + * @param sec Seconds to wait. To wait indefinitely, use std::numeric_limits::max(). + */ + void timer_start (size_t sec) + { + if (sec < std::numeric_limits::max ()) + { + io_timer.expires_from_now (boost::posix_time::seconds (sec)); + io_timer.async_wait ([this](const boost::system::error_code & ec) { + if (!ec) + { + this->timer_expired (); + } + }); + } + } + + void timer_expired (void) + { + close (); + BOOST_LOG (node.log) << "IPC: IO timeout"; + } + + void timer_cancel (void) + { + boost::system::error_code ec; + this->io_timer.cancel (ec); + } + +private: + nano::ipc::ipc_server & server; + nano::node & node; + + // Request data for ipc_encoding::json_legacy payloads + std::atomic request_id{ 0 }; + std::string request_body; + std::string request_id_str; + + /** + * IO context from node, or per-transport, depending on configuration. + * Certain transport configurations (tcp+ssl) may scale better if they use a + * separate context. + */ + boost::asio::io_context & io_context; + + /** A socket of the given asio type */ + SOCKET_TYPE socket; + + /** + * Allow multiple threads to write simultaniously. This allows for future extensions like + * async callback feeds without locking. + */ + boost::asio::io_context::strand writer_strand; + + /** Receives the size of header/query payloads */ + uint32_t buffer_size = 0; + + /** Buffer used to store data received from the client */ + std::vector buffer; + + /** IO operation timer */ + boost::asio::deadline_timer io_timer; + + /** Transport configuration */ + nano::ipc::ipc_config_transport & config_transport; +}; + +/** Domain and TCP socket transport */ +template +class socket_transport : public nano::ipc::transport +{ +public: + socket_transport (nano::ipc::ipc_server & server, ENDPOINT_TYPE ep, nano::ipc::ipc_config_transport & config_transport_a, int concurrency) : + server (server), config_transport (config_transport_a) + { + // Using a per-transport event dispatcher? + if (concurrency > 0) + { + io_context = std::make_unique (); + } + + boost::asio::socket_base::reuse_address option (true); + boost::asio::socket_base::keep_alive option_keepalive (true); + acceptor = std::make_unique (context (), ep); + acceptor->set_option (option); + acceptor->set_option (option_keepalive); + accept (); + + // Start serving IO requests. If concurrency is 0, the node's thread pool is used instead. + if (concurrency > 0) + { + runner = std::make_unique (*io_context, concurrency); + } + } + + boost::asio::io_context & context () const + { + return io_context ? *io_context : server.node.io_ctx; + } + + void accept () + { + std::shared_ptr> new_session (new session (server, io_context ? *io_context : server.node.io_ctx, config_transport)); + + acceptor->async_accept (new_session->get_socket (), + boost::bind (&socket_transport::handle_accept, this, new_session, + boost::asio::placeholders::error)); + } + + void handle_accept (std::shared_ptr> new_session, const boost::system::error_code & error) + { + if (!error) + { + new_session->read_next_request (); + } + else + { + BOOST_LOG (server.node.log) << "IPC acceptor error: " << error.message (); + } + + if (acceptor->is_open ()) + { + accept (); + } + } + + void stop () + { + acceptor->close (); + io_context->stop (); + + if (runner) + { + runner->join (); + } + } + +private: + nano::ipc::ipc_server & server; + nano::ipc::ipc_config_transport & config_transport; + std::unique_ptr runner; + std::unique_ptr io_context; + std::unique_ptr acceptor; +}; + +/** Domain socket file remover */ +class nano::ipc::dsock_file_remover +{ +public: + dsock_file_remover (std::string file) : + filename (file) + { + std::remove (filename.c_str ()); + } + ~dsock_file_remover () + { + std::remove (filename.c_str ()); + } + std::string filename; +}; + +nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::rpc & rpc_a) : +node (node_a), rpc (rpc_a), +stopped (false) +{ + try + { + if (node_a.config.ipc_config.transport_domain.enabled) + { +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + size_t threads = node_a.config.ipc_config.transport_domain.io_threads; + file_remover = std::make_unique (node_a.config.ipc_config.transport_domain.path); + boost::asio::local::stream_protocol::endpoint ep{ node_a.config.ipc_config.transport_domain.path }; + transports.push_back (std::make_shared> (*this, ep, node_a.config.ipc_config.transport_domain, threads)); +#else + BOOST_LOG (node.log) << "IPC: Domain sockets are not supported on this platform"; +#endif + } + + if (node_a.config.ipc_config.transport_tcp.enabled) + { + size_t threads = node_a.config.ipc_config.transport_tcp.io_threads; + transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_domain, threads)); + } + + BOOST_LOG (node.log) << "IPC: server started"; + } + catch (std::runtime_error const & ex) + { + BOOST_LOG (node.log) << "IPC: " << ex.what (); + } +} + +nano::ipc::ipc_server::~ipc_server () +{ + BOOST_LOG (node.log) << "IPC: server stopped"; +} + +void nano::ipc::ipc_server::stop () +{ + for (auto & transport : transports) + { + transport->stop (); + } + stopped = true; +} diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp new file mode 100644 index 0000000000..9a1467ef23 --- /dev/null +++ b/nano/node/ipc.hpp @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class node; +class rpc; +} + +namespace nano +{ +/** IPC errors. Do not change or reuse enum values as these propagate to clients. */ +enum class error_ipc +{ + generic = 1, + invalid_preamble = 2, +}; + +/** Payload encodings, add protobuf, etc as needed. */ +enum class ipc_encoding : uint8_t +{ + json_legacy = 1 +}; + +namespace ipc +{ + /** Removes domain socket files on startup and shutdown */ + class dsock_file_remover; + + /** IPC transport interface */ + class transport + { + public: + virtual void stop () = 0; + virtual ~transport () = default; + }; + + /** Base class for transport configurations */ + class ipc_config_transport + { + public: + bool enabled{ false }; + size_t io_timeout{ 15 }; + size_t io_threads{std::max (4u, std::thread::hardware_concurrency ())}; + }; + + /** Domain socket specific transport config */ + class ipc_config_domain_socket : public ipc_config_transport + { + public: + /** + * Default domain socket path for Unix systems. Once we support Windows 10 usocks, this value + * will be conditional on OS. + */ + std::string path{ "/tmp/nano" }; + }; + + /** TCP specific transport config */ + class ipc_config_tcp_socket : public ipc_config_transport + { + public: + std::string address{ "::1" }; + uint16_t port{ 7077 }; + }; + + /** IPC configuration */ + class ipc_config + { + public: + /** Reads the JSON "ipc" node from the config, if present */ + bool deserialize_json (nano::jsonconfig & json_a); + void serialize_json (nano::jsonconfig & json); + ipc_config_domain_socket transport_domain; + ipc_config_tcp_socket transport_tcp; + }; + + /** IPC server */ + class ipc_server + { + public: + ipc_server (nano::node & node, nano::rpc & rpc); + ~ipc_server (); + void stop (); + + nano::node & node; + nano::rpc & rpc; + + private: + std::atomic stopped; + std::unique_ptr file_remover; + std::vector> transports; + }; +} +} + +REGISTER_ERROR_CODES (nano, error_ipc) diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 2ea7d073b0..71c051299a 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -315,6 +315,12 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco auto block_processor_batch_max_time_l (json.get ("block_processor_batch_max_time")); block_processor_batch_max_time = std::chrono::milliseconds (block_processor_batch_max_time_l); + auto ipc_config_l (json.get_optional_child ("ipc")); + if (ipc_config_l) + { + ipc_config.deserialize_json (ipc_config_l.get ()); + } + json.get ("peering_port", peering_port); json.get ("bootstrap_fraction_numerator", bootstrap_fraction_numerator); json.get ("online_weight_quorum", online_weight_quorum); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 7982a739cd..aabae5b3bf 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ class node_config int lmdb_max_dbs; bool allow_local_peers; nano::stat_config stat_config; + nano::ipc::ipc_config ipc_config; nano::uint256_union epoch_block_link; nano::account epoch_block_signer; std::chrono::milliseconds block_processor_batch_max_time; diff --git a/nano/node/rpc.cpp b/nano/node/rpc.cpp index 3ebbfb1168..685ab7949d 100644 --- a/nano/node/rpc.cpp +++ b/nano/node/rpc.cpp @@ -94,26 +94,38 @@ node (node_a) { } -void nano::rpc::start () +void nano::rpc::add_block_observer () { - auto endpoint (nano::tcp_endpoint (config.address, config.port)); - acceptor.open (endpoint.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + node.observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::uint128_t const &, bool) { + observer_action (account_a); + }); +} - boost::system::error_code ec; - acceptor.bind (endpoint, ec); - if (ec) +void nano::rpc::start (bool rpc_enabled_a) +{ + if (rpc_enabled_a) { - BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ()); - throw std::runtime_error (ec.message ()); + auto endpoint (nano::tcp_endpoint (config.address, config.port)); + acceptor.open (endpoint.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + + boost::system::error_code ec; + acceptor.bind (endpoint, ec); + if (ec) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ()); + throw std::runtime_error (ec.message ()); + } + + acceptor.listen (); } - acceptor.listen (); - node.observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::uint128_t const &, bool) { - observer_action (account_a); - }); + add_block_observer (); - accept (); + if (rpc_enabled_a) + { + accept (); + } } void nano::rpc::accept () diff --git a/nano/node/rpc.hpp b/nano/node/rpc.hpp index 772e65d32f..5dcc1f4a34 100644 --- a/nano/node/rpc.hpp +++ b/nano/node/rpc.hpp @@ -69,7 +69,13 @@ class rpc public: rpc (boost::asio::io_context &, nano::node &, nano::rpc_config const &); virtual ~rpc () = default; - void start (); + + /** + * Start serving RPC requests if \p rpc_enabled_a, otherwise this will only + * add a block observer since requests may still arrive via IPC. + */ + void start (bool rpc_enabled_a = true); + void add_block_observer (); virtual void accept (); void stop (); void observer_action (nano::account const &); diff --git a/nano/node/stats.cpp b/nano/node/stats.cpp index c043ea4255..8c33cf4615 100644 --- a/nano/node/stats.cpp +++ b/nano/node/stats.cpp @@ -316,6 +316,9 @@ std::string nano::stat::type_to_string (uint32_t key) std::string res; switch (type) { + case nano::stat::type::ipc: + res = "ipc"; + break; case nano::stat::type::block: res = "block"; break; @@ -410,6 +413,9 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::insufficient_work: res = "insufficient_work"; break; + case nano::stat::detail::invocations: + res = "invocations"; + break; case nano::stat::detail::keepalive: res = "keepalive"; break; diff --git a/nano/node/stats.hpp b/nano/node/stats.hpp index 202e40c815..e3b1d5e3ee 100644 --- a/nano/node/stats.hpp +++ b/nano/node/stats.hpp @@ -188,6 +188,7 @@ class stat vote, http_callback, peering, + ipc, udp }; @@ -249,6 +250,9 @@ class stat invalid_node_id_handshake_message, outdated_version, + // ipc + invocations, + // peering handshake, }; From 122134edb3c9cc0868ecd11c8b60140a56fd463c Mon Sep 17 00:00:00 2001 From: cryptocode Date: Wed, 12 Dec 2018 16:11:52 +0100 Subject: [PATCH 02/15] Warning fix and rebase of moved config option --- nano/node/ipc.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 97d264500e..e7257e7a55 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -26,6 +26,7 @@ std::string nano::error_ipc_messages::message (int ev) const case nano::error_ipc::invalid_preamble: return "Invalid preamble"; } + return "Invalid error"; } void nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) { From 88b09f838629572aa4d6daab974ecfb16efeae25 Mon Sep 17 00:00:00 2001 From: Roy Keene Date: Fri, 28 Dec 2018 17:26:45 -0600 Subject: [PATCH 03/15] Fixed merge error --- nano/nano_node/daemon.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index 3ae2a68d9d..3a5a2784e8 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -145,7 +145,6 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano:: rpc->start (config.rpc_enable); } nano::ipc::ipc_server ipc (*node, *rpc); - config.node.disable_lazy_bootstrap = disable_lazy_bootstrap; runner = std::make_unique (io_ctx, node->config.io_threads); runner->join (); } From b0335df67321e85f4691fb479d2d5d54c8eac187 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Sat, 29 Dec 2018 18:03:14 +0100 Subject: [PATCH 04/15] Lifetime fix for action handlers with async response --- nano/node/ipc.cpp | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index e7257e7a55..e41e60861c 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -144,30 +144,31 @@ class session : public std::enable_shared_from_this> // This is called when the nano::rpc_handler#process_request is done. We convert to // json and writes the response to the ipc socket with a length prefix. - auto response_handler ([this, start](boost::property_tree::ptree const & tree_a) { + auto self_l (this->shared_from_this ()); + auto response_handler ([self_l, start](boost::property_tree::ptree const & tree_a) { std::stringstream ostream; boost::property_tree::write_json (ostream, tree_a); ostream.flush (); - request_body = ostream.str (); + self_l->request_body = ostream.str (); - uint32_t size_response = boost::endian::native_to_big ((uint32_t)request_body.size ()); + uint32_t size_response = boost::endian::native_to_big ((uint32_t)self_l->request_body.size ()); std::vector bufs = { boost::asio::buffer (&size_response, 4), - boost::asio::buffer (request_body) + boost::asio::buffer (self_l->request_body) }; - this->timer_start (this->config_transport.io_timeout); - boost::asio::async_write (this->socket, bufs, - writer_strand.wrap ( + self_l->timer_start (self_l->config_transport.io_timeout); + boost::asio::async_write (self_l->socket, bufs, + self_l->writer_strand.wrap ( boost::bind ( &session::handle_write, - this->shared_from_this (), + self_l, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - if (this->node.config.logging.log_rpc ()) + if (self_l->node.config.logging.log_rpc ()) { - BOOST_LOG (this->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% microseconds") % request_id_str % std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count ()); + BOOST_LOG (self_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% microseconds") % self_l->request_id_str % std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count ()); } }); From 8c93af6e7538b4927493dff65d18eb87aaaf0732 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 10 Jan 2019 03:59:45 +0100 Subject: [PATCH 05/15] Chrono for timeouts, more consistent arg names, docs --- nano/nano_node/daemon.cpp | 3 +- nano/node/ipc.cpp | 88 ++++++++++++++++++++------------------- 2 files changed, 47 insertions(+), 44 deletions(-) diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index 3a5a2784e8..6a7e0eded1 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -3,10 +3,9 @@ #include #include #include -#include b +#include #include #include -#include nano_daemon::daemon_config::daemon_config () : rpc_enable (false), diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index e41e60861c..ee35bb2416 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -17,9 +18,9 @@ using namespace boost::log; -std::string nano::error_ipc_messages::message (int ev) const +std::string nano::error_ipc_messages::message (int error_code_a) const { - switch (static_cast (ev)) + switch (static_cast (error_code_a)) { case nano::error_ipc::generic: return "Unknown error"; @@ -84,38 +85,38 @@ class session : public std::enable_shared_from_this> * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients * are expected to implement reconnect logic. */ - void async_read_exactly (void * buff_a, size_t size_a, std::function fn) + void async_read_exactly (void * buff_a, size_t size_a, std::function callback_a) { - async_read_exactly (buff_a, size_a, config_transport.io_timeout, fn); + async_read_exactly (buff_a, size_a, std::chrono::seconds (config_transport.io_timeout), callback_a); } /** * Async read of exactly \p size bytes and a specific timeout. * @see async_read_exactly (void *, size_t, std::function) */ - void async_read_exactly (void * buff_a, size_t size, size_t timeout_seconds, std::function fn) + void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function callback_a) { - timer_start (timeout_seconds); + timer_start (timeout_a); boost::asio::async_read (socket, - boost::asio::buffer (buff_a, size), - boost::asio::transfer_exactly (size), + boost::asio::buffer (buff_a, size_a), + boost::asio::transfer_exactly (size_a), boost::bind (&session::handle_read_or_error, this->shared_from_this (), - fn, + callback_a, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } - void handle_read_or_error (std::function fn, const boost::system::error_code & error, size_t bytes_transferred) + void handle_read_or_error (std::function callback_a, const boost::system::error_code & error_a, size_t bytes_transferred_a) { timer_cancel (); - if ((boost::asio::error::connection_aborted == error) || (boost::asio::error::connection_reset == error)) + if ((boost::asio::error::connection_aborted == error_a) || (boost::asio::error::connection_reset == error_a)) { - BOOST_LOG (node.log) << boost::str (boost::format ("IPC: error reading %1% ") % error.message ()); + BOOST_LOG (node.log) << boost::str (boost::format ("IPC: error reading %1% ") % error_a.message ()); } - else if (bytes_transferred > 0) + else if (bytes_transferred_a > 0) { - fn (); + callback_a (); } } @@ -123,16 +124,16 @@ class session : public std::enable_shared_from_this> * Write callback. If no error occurs, the session starts waiting for another request. * Clients are expected to implement reconnect logic. */ - void handle_write (const boost::system::error_code & error, size_t bytes_transferred) + void handle_write (const boost::system::error_code & error_a, size_t bytes_transferred_a) { timer_cancel (); - if (!error) + if (!error_a) { read_next_request (); } else { - BOOST_LOG (node.log) << "IPC: Write failed: " << error.message (); + BOOST_LOG (node.log) << "IPC: Write failed: " << error_a.message (); } } @@ -143,7 +144,7 @@ class session : public std::enable_shared_from_this> request_id_str = (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, request_id.fetch_add (1)))); // This is called when the nano::rpc_handler#process_request is done. We convert to - // json and writes the response to the ipc socket with a length prefix. + // json and write the response to the ipc socket with a length prefix. auto self_l (this->shared_from_this ()); auto response_handler ([self_l, start](boost::property_tree::ptree const & tree_a) { std::stringstream ostream; @@ -153,11 +154,11 @@ class session : public std::enable_shared_from_this> uint32_t size_response = boost::endian::native_to_big ((uint32_t)self_l->request_body.size ()); std::vector bufs = { - boost::asio::buffer (&size_response, 4), + boost::asio::buffer (&size_response, sizeof (size_response)), boost::asio::buffer (self_l->request_body) }; - self_l->timer_start (self_l->config_transport.io_timeout); + self_l->timer_start (std::chrono::seconds (self_l->config_transport.io_timeout)); boost::asio::async_write (self_l->socket, bufs, self_l->writer_strand.wrap ( boost::bind ( @@ -185,9 +186,10 @@ class session : public std::enable_shared_from_this> { auto this_l = this->shared_from_this (); - // Await preamble - buffer.resize (4); - async_read_exactly (buffer.data (), buffer.size (), std::numeric_limits::max (), [this_l]() { + // Await next request indefinitely. + // The request format is four bytes; ['N', payload-type, reserved, reserved] + buffer.resize (sizeof (buffer_size)); + async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { if (this_l->buffer[0] != 'N') { BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; @@ -195,7 +197,7 @@ class session : public std::enable_shared_from_this> else if (this_l->buffer[1] == static_cast (nano::ipc_encoding::json_legacy)) { // Length of query - this_l->async_read_exactly (&this_l->buffer_size, 4, [this_l]() { + this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() { boost::endian::big_to_native_inplace (this_l->buffer_size); this_l->buffer.resize (this_l->buffer_size); // Query @@ -211,6 +213,7 @@ class session : public std::enable_shared_from_this> }); } + /** Shut down and close socket */ void close () { socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); @@ -220,13 +223,13 @@ class session : public std::enable_shared_from_this> protected: /** * Start IO timer. - * @param sec Seconds to wait. To wait indefinitely, use std::numeric_limits::max(). + * @param timeout_t Seconds to wait. To wait indefinitely, use std::numeric_limits::max(). */ - void timer_start (size_t sec) + void timer_start (std::chrono::seconds timeout_t) { - if (sec < std::numeric_limits::max ()) + if (timeout_t < std::chrono::seconds::max ()) { - io_timer.expires_from_now (boost::posix_time::seconds (sec)); + io_timer.expires_from_now (boost::posix_time::seconds (timeout_t.count ())); io_timer.async_wait ([this](const boost::system::error_code & ec) { if (!ec) { @@ -236,13 +239,13 @@ class session : public std::enable_shared_from_this> } } - void timer_expired (void) + void timer_expired () { close (); BOOST_LOG (node.log) << "IPC: IO timeout"; } - void timer_cancel (void) + void timer_cancel () { boost::system::error_code ec; this->io_timer.cancel (ec); @@ -273,7 +276,7 @@ class session : public std::enable_shared_from_this> */ boost::asio::io_context::strand writer_strand; - /** Receives the size of header/query payloads */ + /** Buffer sizes are read into this */ uint32_t buffer_size = 0; /** Buffer used to store data received from the client */ @@ -291,11 +294,11 @@ template class socket_transport : public nano::ipc::transport { public: - socket_transport (nano::ipc::ipc_server & server, ENDPOINT_TYPE ep, nano::ipc::ipc_config_transport & config_transport_a, int concurrency) : - server (server), config_transport (config_transport_a) + socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE ep, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : + server (server_a), config_transport (config_transport_a) { // Using a per-transport event dispatcher? - if (concurrency > 0) + if (concurrency_a > 0) { io_context = std::make_unique (); } @@ -308,9 +311,10 @@ class socket_transport : public nano::ipc::transport accept (); // Start serving IO requests. If concurrency is 0, the node's thread pool is used instead. - if (concurrency > 0) + // A separate io_context for domain sockets may facilitate better performance on some systems. + if (concurrency_a > 0) { - runner = std::make_unique (*io_context, concurrency); + runner = std::make_unique (*io_context, concurrency_a); } } @@ -328,15 +332,15 @@ class socket_transport : public nano::ipc::transport boost::asio::placeholders::error)); } - void handle_accept (std::shared_ptr> new_session, const boost::system::error_code & error) + void handle_accept (std::shared_ptr> new_session_a, const boost::system::error_code & error_a) { - if (!error) + if (!error_a) { - new_session->read_next_request (); + new_session_a->read_next_request (); } else { - BOOST_LOG (server.node.log) << "IPC acceptor error: " << error.message (); + BOOST_LOG (server.node.log) << "IPC acceptor error: " << error_a.message (); } if (acceptor->is_open ()) @@ -368,8 +372,8 @@ class socket_transport : public nano::ipc::transport class nano::ipc::dsock_file_remover { public: - dsock_file_remover (std::string file) : - filename (file) + dsock_file_remover (std::string file_a) : + filename (file_a) { std::remove (filename.c_str ()); } From 67e3062ac1ea4afaa5c3bdb4acbf6b35fc723bd4 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 10 Jan 2019 19:41:55 +0100 Subject: [PATCH 06/15] Convert bind to lambda, use nano::timer, id dispenser and misc improvements --- nano/node/ipc.cpp | 165 ++++++++++++++++++++++----------------------- nano/node/ipc.hpp | 7 +- nano/node/node.cpp | 4 ++ 3 files changed, 91 insertions(+), 85 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index ee35bb2416..a9f71d8a85 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -60,19 +61,28 @@ bool nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & tree_a) return error; } -/** A client session that manages its own lifetime */ +/** + * A session represents a client connection over which multiple requests/reponses are transmittet. + */ template class session : public std::enable_shared_from_this> { public: session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_context_a, nano::ipc::ipc_config_transport & config_transport_a) : - server (server_a), node (server_a.node), io_context (io_context_a), socket (io_context_a), writer_strand (io_context_a), io_timer (io_context_a), config_transport (config_transport_a) + server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_context (io_context_a), socket (io_context_a), io_timer (io_context_a), config_transport (config_transport_a) { + if (node.config.logging.log_rpc ()) + { + BOOST_LOG (node.log) << "IPC: created session with id: " << session_id; + } } ~session () { - BOOST_LOG (node.log) << "IPC: session ended"; + if (node.config.logging.log_rpc ()) + { + BOOST_LOG (node.log) << "IPC: ended session with id: " << session_id; + } } SOCKET_TYPE & get_socket () @@ -97,27 +107,21 @@ class session : public std::enable_shared_from_this> void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function callback_a) { timer_start (timeout_a); + + auto this_l (this->shared_from_this ()); boost::asio::async_read (socket, boost::asio::buffer (buff_a, size_a), boost::asio::transfer_exactly (size_a), - boost::bind (&session::handle_read_or_error, - this->shared_from_this (), - callback_a, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } - - void handle_read_or_error (std::function callback_a, const boost::system::error_code & error_a, size_t bytes_transferred_a) - { - timer_cancel (); - if ((boost::asio::error::connection_aborted == error_a) || (boost::asio::error::connection_reset == error_a)) - { - BOOST_LOG (node.log) << boost::str (boost::format ("IPC: error reading %1% ") % error_a.message ()); - } - else if (bytes_transferred_a > 0) - { - callback_a (); - } + [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { + if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) + { + BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC: error reading %1% ") % ec.message ()); + } + else if (bytes_transferred_a > 0) + { + callback_a (); + } + }); } /** @@ -140,36 +144,40 @@ class session : public std::enable_shared_from_this> /** Handler for payloads of type nano::ipc_encoding::json_legacy */ void rpc_handle_query () { - auto start (std::chrono::steady_clock::now ()); - request_id_str = (boost::str (boost::format ("%1%") % boost::io::group (std::hex, std::showbase, request_id.fetch_add (1)))); + session_timer.restart (); + auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1))); // This is called when the nano::rpc_handler#process_request is done. We convert to // json and write the response to the ipc socket with a length prefix. - auto self_l (this->shared_from_this ()); - auto response_handler ([self_l, start](boost::property_tree::ptree const & tree_a) { + auto this_l (this->shared_from_this ()); + auto response_handler_l ([this_l, request_id_l](boost::property_tree::ptree const & tree_a) { std::stringstream ostream; boost::property_tree::write_json (ostream, tree_a); ostream.flush (); - self_l->request_body = ostream.str (); + std::string request_body = ostream.str (); - uint32_t size_response = boost::endian::native_to_big ((uint32_t)self_l->request_body.size ()); + uint32_t size_response = boost::endian::native_to_big ((uint32_t)request_body.size ()); std::vector bufs = { boost::asio::buffer (&size_response, sizeof (size_response)), - boost::asio::buffer (self_l->request_body) + boost::asio::buffer (request_body) }; - self_l->timer_start (std::chrono::seconds (self_l->config_transport.io_timeout)); - boost::asio::async_write (self_l->socket, bufs, - self_l->writer_strand.wrap ( - boost::bind ( - &session::handle_write, - self_l, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); + this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); + boost::asio::async_write (this_l->socket, bufs, [this_l](boost::system::error_code const & error_a, size_t size_a) { + this_l->timer_cancel (); + if (!error_a) + { + this_l->read_next_request (); + } + else + { + BOOST_LOG (this_l->node.log) << "IPC: Write failed: " << error_a.message (); + } + }); - if (self_l->node.config.logging.log_rpc ()) + if (this_l->node.config.logging.log_rpc ()) { - BOOST_LOG (self_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% microseconds") % self_l->request_id_str % std::chrono::duration_cast (std::chrono::steady_clock::now () - start).count ()); + BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ()); } }); @@ -177,8 +185,8 @@ class session : public std::enable_shared_from_this> auto body (std::string ((char *)buffer.data (), buffer.size ())); // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler - auto handler (std::make_shared (node, server.rpc, body, request_id_str, response_handler)); - handler->process_request (); + nano::rpc_handler handler (node, server.rpc, body, request_id_l, response_handler_l); + handler.process_request (); } /** Async request reader */ @@ -223,13 +231,13 @@ class session : public std::enable_shared_from_this> protected: /** * Start IO timer. - * @param timeout_t Seconds to wait. To wait indefinitely, use std::numeric_limits::max(). + * @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max () */ - void timer_start (std::chrono::seconds timeout_t) + void timer_start (std::chrono::seconds timeout_a) { - if (timeout_t < std::chrono::seconds::max ()) + if (timeout_a < std::chrono::seconds::max ()) { - io_timer.expires_from_now (boost::posix_time::seconds (timeout_t.count ())); + io_timer.expires_from_now (boost::posix_time::seconds (timeout_a.count ())); io_timer.async_wait ([this](const boost::system::error_code & ec) { if (!ec) { @@ -255,10 +263,11 @@ class session : public std::enable_shared_from_this> nano::ipc::ipc_server & server; nano::node & node; - // Request data for ipc_encoding::json_legacy payloads - std::atomic request_id{ 0 }; - std::string request_body; - std::string request_id_str; + /** Unique session id used for logging */ + uint64_t session_id; + + /** Timer for measuring operations */ + nano::timer session_timer; /** * IO context from node, or per-transport, depending on configuration. @@ -270,14 +279,8 @@ class session : public std::enable_shared_from_this> /** A socket of the given asio type */ SOCKET_TYPE socket; - /** - * Allow multiple threads to write simultaniously. This allows for future extensions like - * async callback feeds without locking. - */ - boost::asio::io_context::strand writer_strand; - /** Buffer sizes are read into this */ - uint32_t buffer_size = 0; + uint32_t buffer_size{ 0 }; /** Buffer used to store data received from the client */ std::vector buffer; @@ -294,65 +297,61 @@ template class socket_transport : public nano::ipc::transport { public: - socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE ep, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : + socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : server (server_a), config_transport (config_transport_a) { // Using a per-transport event dispatcher? if (concurrency_a > 0) { - io_context = std::make_unique (); + io_ctx = std::make_unique (); } boost::asio::socket_base::reuse_address option (true); boost::asio::socket_base::keep_alive option_keepalive (true); - acceptor = std::make_unique (context (), ep); + acceptor = std::make_unique (context (), endpoint_a); acceptor->set_option (option); acceptor->set_option (option_keepalive); accept (); - // Start serving IO requests. If concurrency is 0, the node's thread pool is used instead. + // Start serving IO requests. If concurrency_a is 0, the node's thread pool/io_context is used instead. // A separate io_context for domain sockets may facilitate better performance on some systems. if (concurrency_a > 0) { - runner = std::make_unique (*io_context, concurrency_a); + runner = std::make_unique (*io_ctx, concurrency_a); } } boost::asio::io_context & context () const { - return io_context ? *io_context : server.node.io_ctx; + return io_ctx ? *io_ctx : server.node.io_ctx; } void accept () { - std::shared_ptr> new_session (new session (server, io_context ? *io_context : server.node.io_ctx, config_transport)); - - acceptor->async_accept (new_session->get_socket (), - boost::bind (&socket_transport::handle_accept, this, new_session, - boost::asio::placeholders::error)); - } + // Prepare the next session + auto new_session (std::make_shared> (server, context (), config_transport)); - void handle_accept (std::shared_ptr> new_session_a, const boost::system::error_code & error_a) - { - if (!error_a) - { - new_session_a->read_next_request (); - } - else - { - BOOST_LOG (server.node.log) << "IPC acceptor error: " << error_a.message (); - } + acceptor->async_accept (new_session->get_socket (), [this, new_session](boost::system::error_code const & ec) { + if (!ec) + { + new_session->read_next_request (); + } + else + { + BOOST_LOG (server.node.log) << "IPC acceptor error: " << ec.message (); + } - if (acceptor->is_open ()) - { - accept (); - } + if (acceptor->is_open () && ec != boost::asio::error::operation_aborted) + { + this->accept (); + } + }); } void stop () { acceptor->close (); - io_context->stop (); + io_ctx->stop (); if (runner) { @@ -364,7 +363,7 @@ class socket_transport : public nano::ipc::transport nano::ipc::ipc_server & server; nano::ipc::ipc_config_transport & config_transport; std::unique_ptr runner; - std::unique_ptr io_context; + std::unique_ptr io_ctx; std::unique_ptr acceptor; }; diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 9a1467ef23..455a77a03e 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -22,7 +22,7 @@ enum class error_ipc invalid_preamble = 2, }; -/** Payload encodings, add protobuf, etc as needed. */ +/** Payload encodings; add protobuf, etc as needed. */ enum class ipc_encoding : uint8_t { json_legacy = 1 @@ -80,7 +80,7 @@ namespace ipc ipc_config_tcp_socket transport_tcp; }; - /** IPC server */ + /** The IPC server accepts connections on one or more configured transports */ class ipc_server { public: @@ -91,6 +91,9 @@ namespace ipc nano::node & node; nano::rpc & rpc; + /** Unique counter/id shared across sessions */ + std::atomic id_dispenser{ 0 }; + private: std::atomic stopped; std::unique_ptr file_remover; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 8ea60eb31c..3db3bc8fd8 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -3664,6 +3664,10 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned { io_ctx_a.run (); } + catch (std::exception const & ex) + { + std::cerr << ex.what () << std::endl; + } catch (...) { #ifndef NDEBUG From 7876a73bbe725bf91cae180f99fa01c53d8ea51e Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 17 Jan 2019 02:56:52 +0100 Subject: [PATCH 07/15] Configuration upgrade --- lmdb | 2 +- nano/node/ipc.cpp | 28 ++++++++++++++++++++-------- nano/node/ipc.hpp | 6 +++--- nano/node/nodeconfig.cpp | 12 ++++++++++++ nano/node/nodeconfig.hpp | 2 +- 5 files changed, 37 insertions(+), 13 deletions(-) diff --git a/lmdb b/lmdb index 163c273b46..58d12fb3ab 160000 --- a/lmdb +++ b/lmdb @@ -1 +1 @@ -Subproject commit 163c273b46fd40b49f522820c2a7e2753fa889ac +Subproject commit 58d12fb3abd8d50b7a81d435c6409bb1bc1af9f5 diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index a9f71d8a85..12339b33bd 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -30,16 +30,28 @@ std::string nano::error_ipc_messages::message (int error_code_a) const } return "Invalid error"; } -void nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) +nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) const { - + nano::jsonconfig tcp_l; + tcp_l.put ("io_threads", transport_tcp.io_threads); + tcp_l.put ("enable", transport_tcp.enabled); + tcp_l.put ("address", transport_tcp.address); + tcp_l.put ("port", transport_tcp.port); + tcp_l.put ("io_timeout", transport_tcp.io_timeout); + json.put_child ("tcp", tcp_l); + + nano::jsonconfig domain_l; + domain_l.put ("io_threads", transport_domain.io_threads); + domain_l.put ("enable", transport_domain.enabled); + domain_l.put ("path", transport_domain.path); + domain_l.put ("io_timeout", transport_domain.io_timeout); + json.put_child ("domain", domain_l); + return json.get_error (); } -bool nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & tree_a) +nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json) { - bool error = false; - - auto tcp_l (tree_a.get_optional_child ("tcp")); + auto tcp_l (json.get_optional_child ("tcp")); if (tcp_l) { tcp_l->get ("io_threads", transport_tcp.io_threads); @@ -49,7 +61,7 @@ bool nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & tree_a) tcp_l->get ("io_timeout", transport_tcp.io_timeout); } - auto domain_l (tree_a.get_optional_child ("local")); + auto domain_l (json.get_optional_child ("local")); if (domain_l) { domain_l->get ("io_threads", transport_domain.io_threads); @@ -58,7 +70,7 @@ bool nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & tree_a) domain_l->get ("io_timeout", transport_domain.io_timeout); } - return error; + return json.get_error (); } /** diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 455a77a03e..dfe93e2319 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -47,7 +47,7 @@ namespace ipc public: bool enabled{ false }; size_t io_timeout{ 15 }; - size_t io_threads{std::max (4u, std::thread::hardware_concurrency ())}; + size_t io_threads{ std::max (4u, std::thread::hardware_concurrency ()) }; }; /** Domain socket specific transport config */ @@ -74,8 +74,8 @@ namespace ipc { public: /** Reads the JSON "ipc" node from the config, if present */ - bool deserialize_json (nano::jsonconfig & json_a); - void serialize_json (nano::jsonconfig & json); + nano::error deserialize_json (nano::jsonconfig & json_a); + nano::error serialize_json (nano::jsonconfig & json) const; ipc_config_domain_socket transport_domain; ipc_config_tcp_socket transport_tcp; }; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 71c051299a..a4bb79900d 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -115,6 +115,11 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put ("lmdb_max_dbs", lmdb_max_dbs); json.put ("block_processor_batch_max_time", block_processor_batch_max_time.count ()); json.put ("allow_local_peers", allow_local_peers); + + nano::jsonconfig ipc_l; + ipc_config.serialize_json (ipc_l); + json.put_child ("ipc", ipc_l); + return json.get_error (); } @@ -225,6 +230,13 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso upgraded = true; } case 16: + { + nano::jsonconfig ipc_l; + ipc_config.serialize_json (ipc_l); + json.put_child ("ipc", ipc_l); + upgraded = true; + } + case 17: break; default: throw std::runtime_error ("Unknown node_config version"); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index aabae5b3bf..482c4a4169 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -54,7 +54,7 @@ class node_config static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5); static int json_version () { - return 16; + return 17; } }; From b5c60eb9b56d3692656b26787082a02612b97ac1 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 17 Jan 2019 16:57:08 +0100 Subject: [PATCH 08/15] Config upgrade and ipc logging flag --- lmdb | 2 +- nano/node/ipc.hpp | 7 ++++++- nano/node/logging.cpp | 9 +++++++++ nano/node/logging.hpp | 2 ++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lmdb b/lmdb index 58d12fb3ab..163c273b46 160000 --- a/lmdb +++ b/lmdb @@ -1 +1 @@ -Subproject commit 58d12fb3abd8d50b7a81d435c6409bb1bc1af9f5 +Subproject commit 163c273b46fd40b49f522820c2a7e2753fa889ac diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index dfe93e2319..98e27c4ac3 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -22,9 +22,14 @@ enum class error_ipc invalid_preamble = 2, }; -/** Payload encodings; add protobuf, etc as needed. */ +/** + * Payload encodings; add protobuf, flatbuffers and so on as needed. + * The IPC framing format is preamble followed by an encoding specific payload. + * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes must be zero. + */ enum class ipc_encoding : uint8_t { + /** json_legacy request format is: preamble followed by 32-bit BE payload length. Response is 32-bit BE payload length followed by payload. */ json_legacy = 1 }; diff --git a/nano/node/logging.cpp b/nano/node/logging.cpp index 4d41dc6298..fd8e2a898a 100644 --- a/nano/node/logging.cpp +++ b/nano/node/logging.cpp @@ -18,6 +18,7 @@ network_node_id_handshake_logging_value (false), node_lifetime_tracing_value (false), insufficient_work_logging_value (true), log_rpc_value (true), +log_ipc_value (true), bulk_pull_logging_value (false), work_generation_time_value (true), upnp_details_logging_value (false), @@ -58,6 +59,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const json.put ("node_lifetime_tracing", node_lifetime_tracing_value); json.put ("insufficient_work", insufficient_work_logging_value); json.put ("log_rpc", log_rpc_value); + json.put ("log_ipc", log_ipc_value); json.put ("bulk_pull", bulk_pull_logging_value); json.put ("work_generation_time", work_generation_time_value); json.put ("upnp_details", upnp_details_logging_value); @@ -94,6 +96,7 @@ bool nano::logging::upgrade_json (unsigned version_a, nano::jsonconfig & json) json.get ("max_size", config_max_size); max_size = std::max (max_size, config_max_size); json.put ("max_size", max_size); + json.put ("log_ipc", true); upgraded_l = true; case 6: break; @@ -138,6 +141,7 @@ nano::error nano::logging::deserialize_json (bool & upgraded_a, nano::jsonconfig json.get ("node_lifetime_tracing", node_lifetime_tracing_value); json.get ("insufficient_work", insufficient_work_logging_value); json.get ("log_rpc", log_rpc_value); + json.get ("log_ipc", log_ipc_value); json.get ("bulk_pull", bulk_pull_logging_value); json.get ("work_generation_time", work_generation_time_value); json.get ("upnp_details", upnp_details_logging_value); @@ -210,6 +214,11 @@ bool nano::logging::log_rpc () const return network_logging () && log_rpc_value; } +bool nano::logging::log_ipc () const +{ + return network_logging () && log_ipc_value; +} + bool nano::logging::bulk_pull_logging () const { return network_logging () && bulk_pull_logging_value; diff --git a/nano/node/logging.hpp b/nano/node/logging.hpp index f4788e7d03..49c5acfa39 100644 --- a/nano/node/logging.hpp +++ b/nano/node/logging.hpp @@ -32,6 +32,7 @@ class logging bool upnp_details_logging () const; bool timing_logging () const; bool log_rpc () const; + bool log_ipc () const; bool bulk_pull_logging () const; bool callback_logging () const; bool work_generation_time () const; @@ -50,6 +51,7 @@ class logging bool node_lifetime_tracing_value; bool insufficient_work_logging_value; bool log_rpc_value; + bool log_ipc_value; bool bulk_pull_logging_value; bool work_generation_time_value; bool upnp_details_logging_value; From 954db53fd0f071871c5f0fda74408a49d7fc7f8c Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 17 Jan 2019 19:52:51 +0100 Subject: [PATCH 09/15] log_ipc flag, reserved bytes in preamble, misc cleanup --- nano/node/ipc.cpp | 86 ++++++++++++++++++++++++++++++----------------- nano/node/ipc.hpp | 21 ------------ 2 files changed, 56 insertions(+), 51 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 12339b33bd..f38d124f93 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -18,17 +18,36 @@ #include using namespace boost::log; +namespace +{ +/** + * Payload encodings; add protobuf, flatbuffers and so on as needed. + */ +enum class payload_encoding : uint8_t +{ + /** + * json_legacy request format is: preamble followed by 32-bit BE payload length. + * Response is 32-bit BE payload length followed by payload. + */ + json_legacy = 1 +}; -std::string nano::error_ipc_messages::message (int error_code_a) const +/** + * The IPC framing format is preamble followed by an encoding specific payload. + * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes must be zero. + * @note This is intentionally not an enum class as the values are only used as vector indices. + */ +enum preamble_offset { - switch (static_cast (error_code_a)) - { - case nano::error_ipc::generic: - return "Unknown error"; - case nano::error_ipc::invalid_preamble: - return "Invalid preamble"; - } - return "Invalid error"; + /** Always 'N' */ + lead = 0, + /** One of the payload_encoding values */ + encoding = 1, + /** Always zero */ + reserved_1 = 2, + /** Always zero */ + reserved_2 = 3, +}; } nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) const { @@ -45,7 +64,7 @@ nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) cons domain_l.put ("enable", transport_domain.enabled); domain_l.put ("path", transport_domain.path); domain_l.put ("io_timeout", transport_domain.io_timeout); - json.put_child ("domain", domain_l); + json.put_child ("local", domain_l); return json.get_error (); } @@ -83,7 +102,7 @@ class session : public std::enable_shared_from_this> session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_context_a, nano::ipc::ipc_config_transport & config_transport_a) : server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_context (io_context_a), socket (io_context_a), io_timer (io_context_a), config_transport (config_transport_a) { - if (node.config.logging.log_rpc ()) + if (node.config.logging.log_ipc ()) { BOOST_LOG (node.log) << "IPC: created session with id: " << session_id; } @@ -91,7 +110,7 @@ class session : public std::enable_shared_from_this> ~session () { - if (node.config.logging.log_rpc ()) + if (node.config.logging.log_ipc ()) { BOOST_LOG (node.log) << "IPC: ended session with id: " << session_id; } @@ -127,7 +146,10 @@ class session : public std::enable_shared_from_this> [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) { - BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC: error reading %1% ") % ec.message ()); + if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC: error reading %1% ") % ec.message ()); + } } else if (bytes_transferred_a > 0) { @@ -138,7 +160,6 @@ class session : public std::enable_shared_from_this> /** * Write callback. If no error occurs, the session starts waiting for another request. - * Clients are expected to implement reconnect logic. */ void handle_write (const boost::system::error_code & error_a, size_t bytes_transferred_a) { @@ -147,13 +168,13 @@ class session : public std::enable_shared_from_this> { read_next_request (); } - else + else if (node.config.logging.log_ipc ()) { BOOST_LOG (node.log) << "IPC: Write failed: " << error_a.message (); } } - /** Handler for payloads of type nano::ipc_encoding::json_legacy */ + /** Handler for payload_encoding::json_legacy */ void rpc_handle_query () { session_timer.restart (); @@ -181,13 +202,13 @@ class session : public std::enable_shared_from_this> { this_l->read_next_request (); } - else + else if (this_l->node.config.logging.log_ipc ()) { BOOST_LOG (this_l->node.log) << "IPC: Write failed: " << error_a.message (); } }); - if (this_l->node.config.logging.log_rpc ()) + if (this_l->node.config.logging.log_ipc ()) { BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ()); } @@ -206,27 +227,30 @@ class session : public std::enable_shared_from_this> { auto this_l = this->shared_from_this (); - // Await next request indefinitely. - // The request format is four bytes; ['N', payload-type, reserved, reserved] + // Await next request indefinitely. The request format is as follows: + // u8['N', payload-type, reserved, reserved] buffer.resize (sizeof (buffer_size)); async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { - if (this_l->buffer[0] != 'N') + if (this_l->buffer[preamble_offset::lead] != 'N' || this_l->buffer[preamble_offset::reserved_1] != 0 || this_l->buffer[preamble_offset::reserved_2] != 0) { - BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; + if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; + } } - else if (this_l->buffer[1] == static_cast (nano::ipc_encoding::json_legacy)) + else if (this_l->buffer[preamble_offset::encoding] == static_cast (payload_encoding::json_legacy)) { - // Length of query + // Length of payload this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() { boost::endian::big_to_native_inplace (this_l->buffer_size); this_l->buffer.resize (this_l->buffer_size); - // Query + // Payload (ptree compliant JSON string) this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l]() { this_l->rpc_handle_query (); }); }); } - else + else if (this_l->node.config.logging.log_ipc ()) { BOOST_LOG (this_l->node.log) << "IPC: Unsupported payload encoding"; } @@ -262,7 +286,10 @@ class session : public std::enable_shared_from_this> void timer_expired () { close (); - BOOST_LOG (node.log) << "IPC: IO timeout"; + if (node.config.logging.log_ipc ()) + { + BOOST_LOG (node.log) << "IPC: IO timeout"; + } } void timer_cancel () @@ -283,8 +310,7 @@ class session : public std::enable_shared_from_this> /** * IO context from node, or per-transport, depending on configuration. - * Certain transport configurations (tcp+ssl) may scale better if they use a - * separate context. + * Certain transports may scale better if they use a separate context. */ boost::asio::io_context & io_context; @@ -350,7 +376,7 @@ class socket_transport : public nano::ipc::transport } else { - BOOST_LOG (server.node.log) << "IPC acceptor error: " << ec.message (); + BOOST_LOG (server.node.log) << "IPC: acceptor error: " << ec.message (); } if (acceptor->is_open () && ec != boost::asio::error::operation_aborted) diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 98e27c4ac3..5f06f54923 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -15,24 +15,6 @@ class rpc; namespace nano { -/** IPC errors. Do not change or reuse enum values as these propagate to clients. */ -enum class error_ipc -{ - generic = 1, - invalid_preamble = 2, -}; - -/** - * Payload encodings; add protobuf, flatbuffers and so on as needed. - * The IPC framing format is preamble followed by an encoding specific payload. - * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes must be zero. - */ -enum class ipc_encoding : uint8_t -{ - /** json_legacy request format is: preamble followed by 32-bit BE payload length. Response is 32-bit BE payload length followed by payload. */ - json_legacy = 1 -}; - namespace ipc { /** Removes domain socket files on startup and shutdown */ @@ -78,7 +60,6 @@ namespace ipc class ipc_config { public: - /** Reads the JSON "ipc" node from the config, if present */ nano::error deserialize_json (nano::jsonconfig & json_a); nano::error serialize_json (nano::jsonconfig & json) const; ipc_config_domain_socket transport_domain; @@ -106,5 +87,3 @@ namespace ipc }; } } - -REGISTER_ERROR_CODES (nano, error_ipc) From 869b177fc7e82cc679ef6214f45cf7169439d60c Mon Sep 17 00:00:00 2001 From: cryptocode Date: Sun, 20 Jan 2019 01:51:59 +0100 Subject: [PATCH 10/15] Add ipc client and tests --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/ipc.cpp | 87 ++++++++ nano/node/ipc.cpp | 407 ++++++++++++++++++++++++++-------- nano/node/ipc.hpp | 74 ++++++- 4 files changed, 467 insertions(+), 102 deletions(-) create mode 100644 nano/core_test/ipc.cpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 33c49ce401..e4c0979af7 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable (core_test block.cpp block_store.cpp interface.cpp + ipc.cpp conflicts.cpp daemon.cpp entry.cpp diff --git a/nano/core_test/ipc.cpp b/nano/core_test/ipc.cpp new file mode 100644 index 0000000000..c53cac4047 --- /dev/null +++ b/nano/core_test/ipc.cpp @@ -0,0 +1,87 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +TEST (ipc, asynchronous) +{ + nano::system system (24000, 1); + nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true)); + system.nodes[0]->config.ipc_config.transport_tcp.enabled = true; + system.nodes[0]->config.ipc_config.transport_tcp.port = 24077; + nano::ipc::ipc_server ipc (*system.nodes[0], rpc); + nano::ipc::ipc_client client (system.nodes[0]->io_ctx); + + auto req (client.prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})"))); + auto res (std::make_shared> ()); + std::atomic call_completed{ false }; + client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) { + client.async_write (req, [&client, &req, &res, &call_completed](nano::error err_a, size_t size_a) { + ASSERT_NO_ERROR (static_cast (err_a)); + ASSERT_EQ (size_a, req->size ()); + // Read length + client.async_read (res, sizeof (uint32_t), [&client, &res, &call_completed](nano::error err_read_a, size_t size_read_a) { + ASSERT_NO_ERROR (static_cast (err_read_a)); + ASSERT_EQ (size_read_a, sizeof (uint32_t)); + uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast (res->data ())); + // Read json payload + client.async_read (res, payload_size_l, [&res, &call_completed](nano::error err_read_a, size_t size_read_a) { + std::string payload (res->begin (), res->end ()); + std::stringstream ss; + ss << payload; + + // Make sure the response is valid json + boost::property_tree::ptree blocks; + boost::property_tree::read_json (ss, blocks); + ASSERT_EQ (blocks.get ("count"), 1); + call_completed = true; + }); + }); + }); + }); + system.deadline_set (5s); + while (!call_completed) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (ipc, synchronous) +{ + nano::system system (24000, 1); + nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true)); + system.nodes[0]->config.ipc_config.transport_tcp.enabled = true; + system.nodes[0]->config.ipc_config.transport_tcp.port = 24077; + nano::ipc::ipc_server ipc (*system.nodes[0], rpc); + nano::ipc::rpc_ipc_client client (system.nodes[0]->io_ctx); + + // Start blocking IPC client in a separate thread + std::atomic call_completed{ false }; + std::thread client_thread ([&client, &call_completed]() { + client.connect ("::1", 24077); + std::string response (client.request (std::string (R"({"action": "block_count"})"))); + std::stringstream ss; + ss << response; + // Make sure the response is valid json + boost::property_tree::ptree blocks; + boost::property_tree::read_json (ss, blocks); + ASSERT_EQ (blocks.get ("count"), 1); + + call_completed = true; + }); + client_thread.detach (); + + system.deadline_set (5s); + while (!call_completed) + { + ASSERT_NO_ERROR (system.poll ()); + } +} diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index f38d124f93..42ccf136b7 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -1,14 +1,15 @@ #include #include -#include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -21,20 +22,8 @@ using namespace boost::log; namespace { /** - * Payload encodings; add protobuf, flatbuffers and so on as needed. - */ -enum class payload_encoding : uint8_t -{ - /** - * json_legacy request format is: preamble followed by 32-bit BE payload length. - * Response is 32-bit BE payload length followed by payload. - */ - json_legacy = 1 -}; - -/** - * The IPC framing format is preamble followed by an encoding specific payload. - * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes must be zero. + * The IPC framing format is simple: preamble followed by an encoding specific payload. + * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes MUST be zero. * @note This is intentionally not an enum class as the values are only used as vector indices. */ enum preamble_offset @@ -52,15 +41,21 @@ enum preamble_offset nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) const { nano::jsonconfig tcp_l; - tcp_l.put ("io_threads", transport_tcp.io_threads); + // Only write out experimental config values if they're previously set explicitly in the config file + if (transport_tcp.io_threads >= 0) + { + tcp_l.put ("io_threads", transport_tcp.io_threads); + } tcp_l.put ("enable", transport_tcp.enabled); - tcp_l.put ("address", transport_tcp.address); tcp_l.put ("port", transport_tcp.port); tcp_l.put ("io_timeout", transport_tcp.io_timeout); json.put_child ("tcp", tcp_l); nano::jsonconfig domain_l; - domain_l.put ("io_threads", transport_domain.io_threads); + if (transport_domain.io_threads >= 0) + { + domain_l.put ("io_threads", transport_domain.io_threads); + } domain_l.put ("enable", transport_domain.enabled); domain_l.put ("path", transport_domain.path); domain_l.put ("io_timeout", transport_domain.io_timeout); @@ -73,9 +68,8 @@ nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json) auto tcp_l (json.get_optional_child ("tcp")); if (tcp_l) { - tcp_l->get ("io_threads", transport_tcp.io_threads); + tcp_l->get_optional ("io_threads", transport_tcp.io_threads, -1); tcp_l->get ("enable", transport_tcp.enabled); - tcp_l->get ("address", transport_tcp.address); tcp_l->get ("port", transport_tcp.port); tcp_l->get ("io_timeout", transport_tcp.io_timeout); } @@ -83,7 +77,7 @@ nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json) auto domain_l (json.get_optional_child ("local")); if (domain_l) { - domain_l->get ("io_threads", transport_domain.io_threads); + domain_l->get_optional ("io_threads", transport_domain.io_threads, -1); domain_l->get ("enable", transport_domain.enabled); domain_l->get ("path", transport_domain.path); domain_l->get ("io_timeout", transport_domain.io_timeout); @@ -92,15 +86,63 @@ nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json) return json.get_error (); } +/** Abstract base type for sockets, implementing timer logic and a close operation */ +class socket_base +{ +public: + socket_base (boost::asio::io_context & io_ctx_a) : + io_timer (io_ctx_a) + { + } + virtual ~socket_base () = default; + + /** Close socket */ + virtual void close () = 0; + + /** + * Start IO timer. + * @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max () + */ + void timer_start (std::chrono::seconds timeout_a) + { + if (timeout_a < std::chrono::seconds::max ()) + { + io_timer.expires_from_now (boost::posix_time::seconds (timeout_a.count ())); + io_timer.async_wait ([this](const boost::system::error_code & ec) { + if (!ec) + { + this->timer_expired (); + } + }); + } + } + + void timer_expired () + { + close (); + } + + void timer_cancel () + { + boost::system::error_code ec; + this->io_timer.cancel (ec); + } + +private: + /** IO operation timer */ + boost::asio::deadline_timer io_timer; +}; + /** - * A session represents a client connection over which multiple requests/reponses are transmittet. + * A session represents an inbound connection over which multiple requests/reponses are transmitted. */ template -class session : public std::enable_shared_from_this> +class session : public socket_base, public std::enable_shared_from_this> { public: - session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_context_a, nano::ipc::ipc_config_transport & config_transport_a) : - server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_context (io_context_a), socket (io_context_a), io_timer (io_context_a), config_transport (config_transport_a) + session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) : + socket_base (io_ctx_a), + server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_ctx (io_ctx_a), socket (io_ctx_a), config_transport (config_transport_a) { if (node.config.logging.log_ipc ()) { @@ -110,10 +152,6 @@ class session : public std::enable_shared_from_this> ~session () { - if (node.config.logging.log_ipc ()) - { - BOOST_LOG (node.log) << "IPC: ended session with id: " << session_id; - } } SOCKET_TYPE & get_socket () @@ -122,7 +160,7 @@ class session : public std::enable_shared_from_this> } /** - * Async read of exactly \p size bytes. The callback is called only when all the data is available and + * Async read of exactly \p size_a bytes. The callback is invoked only when all the data is available and * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients * are expected to implement reconnect logic. */ @@ -132,18 +170,18 @@ class session : public std::enable_shared_from_this> } /** - * Async read of exactly \p size bytes and a specific timeout. + * Async read of exactly \p size_a bytes and a specific \p timeout_a. * @see async_read_exactly (void *, size_t, std::function) */ void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function callback_a) { timer_start (timeout_a); - auto this_l (this->shared_from_this ()); boost::asio::async_read (socket, boost::asio::buffer (buff_a, size_a), boost::asio::transfer_exactly (size_a), [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { + this_l->timer_cancel (); if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) { if (this_l->node.config.logging.log_ipc ()) @@ -158,41 +196,25 @@ class session : public std::enable_shared_from_this> }); } - /** - * Write callback. If no error occurs, the session starts waiting for another request. - */ - void handle_write (const boost::system::error_code & error_a, size_t bytes_transferred_a) - { - timer_cancel (); - if (!error_a) - { - read_next_request (); - } - else if (node.config.logging.log_ipc ()) - { - BOOST_LOG (node.log) << "IPC: Write failed: " << error_a.message (); - } - } - /** Handler for payload_encoding::json_legacy */ void rpc_handle_query () { session_timer.restart (); auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1))); - // This is called when the nano::rpc_handler#process_request is done. We convert to + // This is called when nano::rpc_handler#process_request is done. We convert to // json and write the response to the ipc socket with a length prefix. auto this_l (this->shared_from_this ()); auto response_handler_l ([this_l, request_id_l](boost::property_tree::ptree const & tree_a) { std::stringstream ostream; boost::property_tree::write_json (ostream, tree_a); ostream.flush (); - std::string request_body = ostream.str (); + std::string response_body = ostream.str (); - uint32_t size_response = boost::endian::native_to_big ((uint32_t)request_body.size ()); + uint32_t size_response = boost::endian::native_to_big ((uint32_t)response_body.size ()); std::vector bufs = { boost::asio::buffer (&size_response, sizeof (size_response)), - boost::asio::buffer (request_body) + boost::asio::buffer (response_body) }; this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); @@ -227,8 +249,7 @@ class session : public std::enable_shared_from_this> { auto this_l = this->shared_from_this (); - // Await next request indefinitely. The request format is as follows: - // u8['N', payload-type, reserved, reserved] + // Await next request indefinitely buffer.resize (sizeof (buffer_size)); async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { if (this_l->buffer[preamble_offset::lead] != 'N' || this_l->buffer[preamble_offset::reserved_1] != 0 || this_l->buffer[preamble_offset::reserved_2] != 0) @@ -238,7 +259,7 @@ class session : public std::enable_shared_from_this> BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; } } - else if (this_l->buffer[preamble_offset::encoding] == static_cast (payload_encoding::json_legacy)) + else if (this_l->buffer[preamble_offset::encoding] == static_cast (nano::ipc::payload_encoding::json_legacy)) { // Length of payload this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() { @@ -264,40 +285,6 @@ class session : public std::enable_shared_from_this> socket.close (); } -protected: - /** - * Start IO timer. - * @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max () - */ - void timer_start (std::chrono::seconds timeout_a) - { - if (timeout_a < std::chrono::seconds::max ()) - { - io_timer.expires_from_now (boost::posix_time::seconds (timeout_a.count ())); - io_timer.async_wait ([this](const boost::system::error_code & ec) { - if (!ec) - { - this->timer_expired (); - } - }); - } - } - - void timer_expired () - { - close (); - if (node.config.logging.log_ipc ()) - { - BOOST_LOG (node.log) << "IPC: IO timeout"; - } - } - - void timer_cancel () - { - boost::system::error_code ec; - this->io_timer.cancel (ec); - } - private: nano::ipc::ipc_server & server; nano::node & node; @@ -305,14 +292,14 @@ class session : public std::enable_shared_from_this> /** Unique session id used for logging */ uint64_t session_id; - /** Timer for measuring operations */ + /** Timer for measuring the duration of ipc calls */ nano::timer session_timer; /** * IO context from node, or per-transport, depending on configuration. * Certain transports may scale better if they use a separate context. */ - boost::asio::io_context & io_context; + boost::asio::io_context & io_ctx; /** A socket of the given asio type */ SOCKET_TYPE socket; @@ -323,9 +310,6 @@ class session : public std::enable_shared_from_this> /** Buffer used to store data received from the client */ std::vector buffer; - /** IO operation timer */ - boost::asio::deadline_timer io_timer; - /** Transport configuration */ nano::ipc::ipc_config_transport & config_transport; }; @@ -351,7 +335,7 @@ class socket_transport : public nano::ipc::transport acceptor->set_option (option_keepalive); accept (); - // Start serving IO requests. If concurrency_a is 0, the node's thread pool/io_context is used instead. + // Start serving IO requests. If concurrency_a is < 1, the node's thread pool/io_context is used instead. // A separate io_context for domain sockets may facilitate better performance on some systems. if (concurrency_a > 0) { @@ -383,13 +367,20 @@ class socket_transport : public nano::ipc::transport { this->accept (); } + else + { + BOOST_LOG (server.node.log) << "IPC: shutting down"; + } }); } void stop () { acceptor->close (); - io_ctx->stop (); + if (io_ctx) + { + io_ctx->stop (); + } if (runner) { @@ -405,7 +396,7 @@ class socket_transport : public nano::ipc::transport std::unique_ptr acceptor; }; -/** Domain socket file remover */ +/** The domain socket file is attemped removed at both startup and shutdown. */ class nano::ipc::dsock_file_remover { public: @@ -442,7 +433,7 @@ stopped (false) if (node_a.config.ipc_config.transport_tcp.enabled) { size_t threads = node_a.config.ipc_config.transport_tcp.io_threads; - transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_domain, threads)); + transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_tcp, threads)); } BOOST_LOG (node.log) << "IPC: server started"; @@ -466,3 +457,223 @@ void nano::ipc::ipc_server::stop () } stopped = true; } + +/** Socket agnostic IO interface */ +class channel +{ +public: + virtual void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) = 0; + virtual void async_write (std::shared_ptr> buffer_a, std::function callback_a) = 0; +}; + +/** Domain and TCP client socket */ +template +class socket_client : public socket_base, public channel +{ +public: + socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) : + socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a) + { + } + + void async_resolve (std::string host_a, uint16_t port_a, std::function callback_a) + { + this->timer_start (io_timeout); + resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) { + this->timer_cancel (); + boost::asio::ip::tcp::resolver::iterator end; + if (!ec && endpoint_iterator_a != end) + { + endpoint = *endpoint_iterator_a; + callback_a (ec, *endpoint_iterator_a); + } + else + { + callback_a (ec, *end); + } + }); + } + + void async_connect (std::function callback_a) + { + this->timer_start (io_timeout); + socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) { + this->timer_cancel (); + callback_a (ec); + }); + } + + void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) override + { + this->timer_start (io_timeout); + //buffer_a->clear (); + buffer_a->resize (size_a); + boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) { + this->timer_cancel (); + callback_a (ec, size_a); + }); + } + + void async_write (std::shared_ptr> buffer_a, std::function callback_a) override + { + this->timer_start (io_timeout); + boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { + this->timer_cancel (); + callback_a (ec, size_a); + }); + } + + /** Shut down and close socket */ + void close () override + { + socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); + socket.close (); + } + +private: + ENDPOINT_TYPE endpoint; + SOCKET_TYPE socket; + boost::asio::ip::tcp::resolver resolver; + std::chrono::seconds io_timeout{ 60 }; +}; + +/** + * PIMPL class for ipc_client. This ensures that socket_client and boost details can + * stay out of the header file. + */ +class client_impl : public nano::ipc::ipc_client_impl +{ +public: + client_impl (boost::asio::io_context & io_ctx_a) : + io_ctx (io_ctx_a) + { + } + ~client_impl () + { + } + + void connect (std::string host_a, uint16_t port_a, std::function callback_a) + { + tcp_client = std::make_shared> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a)); + + tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) { + if (!ec_resolve_a) + { + this->tcp_client->async_connect ([callback_a](const boost::system::error_code & ec_connect_a) { + callback_a (nano::error (ec_connect_a)); + }); + } + else + { + callback_a (nano::error (ec_resolve_a)); + } + }); + } + + nano::error connect (std::string path_a) + { + nano::error err; +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + domain_client = std::make_shared> (io_ctx, boost::asio::local::stream_protocol::endpoint (path_a)); +#else + err = nano::error ("Domain sockets are not supported by this platform"); +#endif + return err; + } + + channel & channel () + { + return tcp_client ? static_cast (*tcp_client) : static_cast (*domain_client); + } + +private: + boost::asio::io_context & io_ctx; + std::shared_ptr> tcp_client; + std::shared_ptr> domain_client; +}; + +nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) : +io_ctx (io_ctx_a) +{ +} + +nano::ipc::ipc_client::~ipc_client () +{ +} + +nano::error nano::ipc::ipc_client::connect (std::string path_a) +{ + impl = std::make_unique (io_ctx); + return boost::polymorphic_downcast (impl.get ())->connect (path_a); +} + +void nano::ipc::ipc_client::async_connect (std::string host_a, uint16_t port_a, std::function callback_a) +{ + impl = std::make_unique (io_ctx); + auto client (boost::polymorphic_downcast (impl.get ())); + client->connect (host_a, port_a, callback_a); +} + +nano::error nano::ipc::ipc_client::connect (std::string host, uint16_t port) +{ + std::promise result_l; + async_connect (host, port, [&result_l](nano::error err_a) { + result_l.set_value (err_a); + }); + return result_l.get_future ().get (); +} + +void nano::ipc::ipc_client::async_write (std::shared_ptr> buffer_a, std::function callback_a) +{ + auto client (boost::polymorphic_downcast (impl.get ())); + client->channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) { + callback_a (nano::error (ec_a), bytes_written_a); + }); +} + +void nano::ipc::ipc_client::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) +{ + auto client (boost::polymorphic_downcast (impl.get ())); + client->channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { + callback_a (nano::error (ec_a), bytes_read_a); + }); +} + +std::shared_ptr> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string payload_a) +{ + auto buffer_l (std::make_shared> ()); + if (encoding_a == nano::ipc::payload_encoding::json_legacy) + { + buffer_l->push_back ('N'); + buffer_l->push_back (static_cast (encoding_a)); + buffer_l->push_back (0); + buffer_l->push_back (0); + + uint32_t payload_length = payload_a.size (); + uint32_t be = boost::endian::native_to_big (payload_length); + char * chars = reinterpret_cast (&be); + buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t)); + buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ()); + } + return buffer_l; +} + +std::string nano::ipc::rpc_ipc_client::request (std::string rpc_action_a) +{ + auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a)); + auto res (std::make_shared> ()); + + std::promise result_l; + async_write (req, [this, &res, &result_l](nano::error err_a, size_t size_a) { + // Read length + this->async_read (res, sizeof (uint32_t), [this, &res, &result_l](nano::error err_read_a, size_t size_read_a) { + uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast (res->data ())); + // Read json payload + this->async_read (res, payload_size_l, [&res, &result_l](nano::error err_read_a, size_t size_read_a) { + result_l.set_value (std::string (res->begin (), res->end ())); + }); + }); + }); + + return result_l.get_future ().get (); +} diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 5f06f54923..00546e7eb9 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -17,6 +18,18 @@ namespace nano { namespace ipc { + /** + * Payload encodings; add protobuf, flatbuffers and so on as needed. + */ + enum class payload_encoding : uint8_t + { + /** + * Request is preamble followed by 32-bit BE payload length and payload bytes. + * Response is 32-bit BE payload length followed by payload bytes. + */ + json_legacy = 1 + }; + /** Removes domain socket files on startup and shutdown */ class dsock_file_remover; @@ -34,7 +47,7 @@ namespace ipc public: bool enabled{ false }; size_t io_timeout{ 15 }; - size_t io_threads{ std::max (4u, std::thread::hardware_concurrency ()) }; + long io_threads{ -1 }; }; /** Domain socket specific transport config */ @@ -42,8 +55,8 @@ namespace ipc { public: /** - * Default domain socket path for Unix systems. Once we support Windows 10 usocks, this value - * will be conditional on OS. + * Default domain socket path for Unix systems. Once Boost supports Windows 10 usocks, + * this value will be conditional on OS. */ std::string path{ "/tmp/nano" }; }; @@ -52,7 +65,7 @@ namespace ipc class ipc_config_tcp_socket : public ipc_config_transport { public: - std::string address{ "::1" }; + /** Listening port */ uint16_t port{ 7077 }; }; @@ -85,5 +98,58 @@ namespace ipc std::unique_ptr file_remover; std::vector> transports; }; + + class ipc_client_impl + { + public: + virtual ~ipc_client_impl () = default; + }; + + /** IPC client */ + class ipc_client + { + public: + ipc_client (boost::asio::io_context & io_ctx_a); + ~ipc_client (); + + /** Connect to a domain socket */ + nano::error connect (std::string path); + + /** Connect to a tcp socket synchronously */ + nano::error connect (std::string host, uint16_t port); + + /** Connect to a tcp socket asynchronously */ + void async_connect (std::string host, uint16_t port, std::function callback); + + /** Write buffer asynchronously */ + void async_write (std::shared_ptr> buffer_a, std::function callback_a); + + /** Read \p size_a bytes asynchronously */ + void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a); + + /** + * Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding, + * the buffer may contain a payload length or end sentinel. + */ + std::shared_ptr> prepare_request (nano::ipc::payload_encoding encoding_a, std::string payload_a); + + private: + boost::asio::io_context & io_ctx; + + // PIMPL pattern to hide implementation details + std::unique_ptr impl; + }; + + /** Convenience wrapper for making synchronous RPC calls via IPC */ + class rpc_ipc_client : public ipc_client + { + public: + rpc_ipc_client (boost::asio::io_context & io_ctx_a) : + ipc_client (io_ctx_a) + { + } + /** Calls the RPC server via IPC and waits for the result. The client must be connected. */ + std::string request (std::string rpc_action_a); + }; } } From dee720823b9c134583a31222b2f29e7651a27c57 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Sun, 20 Jan 2019 02:10:51 +0100 Subject: [PATCH 11/15] Compile-time OS guard for domain sockets --- nano/node/ipc.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 42ccf136b7..fead2d2167 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -581,15 +581,21 @@ class client_impl : public nano::ipc::ipc_client_impl return err; } - channel & channel () + channel & get_channel () { - return tcp_client ? static_cast (*tcp_client) : static_cast (*domain_client); +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + return tcp_client ? static_cast (*tcp_client) : static_cast (*domain_client); +#else + return static_cast (*tcp_client); +#endif } private: boost::asio::io_context & io_ctx; std::shared_ptr> tcp_client; +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) std::shared_ptr> domain_client; +#endif }; nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) : @@ -626,7 +632,7 @@ nano::error nano::ipc::ipc_client::connect (std::string host, uint16_t port) void nano::ipc::ipc_client::async_write (std::shared_ptr> buffer_a, std::function callback_a) { auto client (boost::polymorphic_downcast (impl.get ())); - client->channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) { + client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) { callback_a (nano::error (ec_a), bytes_written_a); }); } @@ -634,7 +640,7 @@ void nano::ipc::ipc_client::async_write (std::shared_ptr> b void nano::ipc::ipc_client::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) { auto client (boost::polymorphic_downcast (impl.get ())); - client->channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { + client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { callback_a (nano::error (ec_a), bytes_read_a); }); } From 13659dfd01648004314f08e6d8edc97ae99ff4d3 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Wed, 23 Jan 2019 00:44:09 +0100 Subject: [PATCH 12/15] Remove debug catch, const string refs where applicable --- nano/node/ipc.cpp | 23 ++++++++++------------- nano/node/ipc.hpp | 11 +++++------ nano/node/node.cpp | 4 ---- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index fead2d2167..0cafeca8e5 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -400,7 +400,7 @@ class socket_transport : public nano::ipc::transport class nano::ipc::dsock_file_remover { public: - dsock_file_remover (std::string file_a) : + dsock_file_remover (std::string const & file_a) : filename (file_a) { std::remove (filename.c_str ()); @@ -413,8 +413,7 @@ class nano::ipc::dsock_file_remover }; nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::rpc & rpc_a) : -node (node_a), rpc (rpc_a), -stopped (false) +node (node_a), rpc (rpc_a) { try { @@ -455,7 +454,6 @@ void nano::ipc::ipc_server::stop () { transport->stop (); } - stopped = true; } /** Socket agnostic IO interface */ @@ -476,7 +474,7 @@ class socket_client : public socket_base, public channel { } - void async_resolve (std::string host_a, uint16_t port_a, std::function callback_a) + void async_resolve (std::string const & host_a, uint16_t port_a, std::function callback_a) { this->timer_start (io_timeout); resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) { @@ -506,7 +504,6 @@ class socket_client : public socket_base, public channel void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) override { this->timer_start (io_timeout); - //buffer_a->clear (); buffer_a->resize (size_a); boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) { this->timer_cancel (); @@ -552,7 +549,7 @@ class client_impl : public nano::ipc::ipc_client_impl { } - void connect (std::string host_a, uint16_t port_a, std::function callback_a) + void connect (std::string const & host_a, uint16_t port_a, std::function callback_a) { tcp_client = std::make_shared> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a)); @@ -570,7 +567,7 @@ class client_impl : public nano::ipc::ipc_client_impl }); } - nano::error connect (std::string path_a) + nano::error connect (std::string const & path_a) { nano::error err; #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) @@ -607,20 +604,20 @@ nano::ipc::ipc_client::~ipc_client () { } -nano::error nano::ipc::ipc_client::connect (std::string path_a) +nano::error nano::ipc::ipc_client::connect (std::string const & path_a) { impl = std::make_unique (io_ctx); return boost::polymorphic_downcast (impl.get ())->connect (path_a); } -void nano::ipc::ipc_client::async_connect (std::string host_a, uint16_t port_a, std::function callback_a) +void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t port_a, std::function callback_a) { impl = std::make_unique (io_ctx); auto client (boost::polymorphic_downcast (impl.get ())); client->connect (host_a, port_a, callback_a); } -nano::error nano::ipc::ipc_client::connect (std::string host, uint16_t port) +nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port) { std::promise result_l; async_connect (host, port, [&result_l](nano::error err_a) { @@ -645,7 +642,7 @@ void nano::ipc::ipc_client::async_read (std::shared_ptr> bu }); } -std::shared_ptr> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string payload_a) +std::shared_ptr> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a) { auto buffer_l (std::make_shared> ()); if (encoding_a == nano::ipc::payload_encoding::json_legacy) @@ -664,7 +661,7 @@ std::shared_ptr> nano::ipc::ipc_client::prepare_request (na return buffer_l; } -std::string nano::ipc::rpc_ipc_client::request (std::string rpc_action_a) +std::string nano::ipc::rpc_ipc_client::request (std::string const & rpc_action_a) { auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a)); auto res (std::make_shared> ()); diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 00546e7eb9..83c78edf3d 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -94,7 +94,6 @@ namespace ipc std::atomic id_dispenser{ 0 }; private: - std::atomic stopped; std::unique_ptr file_remover; std::vector> transports; }; @@ -113,13 +112,13 @@ namespace ipc ~ipc_client (); /** Connect to a domain socket */ - nano::error connect (std::string path); + nano::error connect (std::string const & path); /** Connect to a tcp socket synchronously */ - nano::error connect (std::string host, uint16_t port); + nano::error connect (std::string const & host, uint16_t port); /** Connect to a tcp socket asynchronously */ - void async_connect (std::string host, uint16_t port, std::function callback); + void async_connect (std::string const & host, uint16_t port, std::function callback); /** Write buffer asynchronously */ void async_write (std::shared_ptr> buffer_a, std::function callback_a); @@ -131,7 +130,7 @@ namespace ipc * Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding, * the buffer may contain a payload length or end sentinel. */ - std::shared_ptr> prepare_request (nano::ipc::payload_encoding encoding_a, std::string payload_a); + std::shared_ptr> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a); private: boost::asio::io_context & io_ctx; @@ -149,7 +148,7 @@ namespace ipc { } /** Calls the RPC server via IPC and waits for the result. The client must be connected. */ - std::string request (std::string rpc_action_a); + std::string request (std::string const & rpc_action_a); }; } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 3db3bc8fd8..8ea60eb31c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -3664,10 +3664,6 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned { io_ctx_a.run (); } - catch (std::exception const & ex) - { - std::cerr << ex.what () << std::endl; - } catch (...) { #ifndef NDEBUG From 0eb4e8c6f2d8e56584eb08dbf32c31ce810b2022 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 24 Jan 2019 23:25:00 +0100 Subject: [PATCH 13/15] Destructor and cast fixes, use version 16 in nodeconfig --- nano/node/ipc.cpp | 18 ++++-------------- nano/node/ipc.hpp | 5 +++-- nano/node/nodeconfig.cpp | 7 ++----- nano/node/nodeconfig.hpp | 2 +- 4 files changed, 10 insertions(+), 22 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 0cafeca8e5..cf156b2280 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -125,7 +125,8 @@ class socket_base void timer_cancel () { boost::system::error_code ec; - this->io_timer.cancel (ec); + io_timer.cancel (ec); + assert (!ec); } private: @@ -150,10 +151,6 @@ class session : public socket_base, public std::enable_shared_from_this (response_body.size ())); std::vector bufs = { boost::asio::buffer (&size_response, sizeof (size_response)), boost::asio::buffer (response_body) @@ -237,7 +234,7 @@ class session : public socket_base, public std::enable_shared_from_this (buffer.data ()), buffer.size ())); // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler nano::rpc_handler handler (node, server.rpc, body, request_id_l, response_handler_l); @@ -545,9 +542,6 @@ class client_impl : public nano::ipc::ipc_client_impl io_ctx (io_ctx_a) { } - ~client_impl () - { - } void connect (std::string const & host_a, uint16_t port_a, std::function callback_a) { @@ -600,10 +594,6 @@ io_ctx (io_ctx_a) { } -nano::ipc::ipc_client::~ipc_client () -{ -} - nano::error nano::ipc::ipc_client::connect (std::string const & path_a) { impl = std::make_unique (io_ctx); diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 83c78edf3d..4f4d58973e 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -45,6 +45,7 @@ namespace ipc class ipc_config_transport { public: + ~ipc_config_transport () = default; bool enabled{ false }; size_t io_timeout{ 15 }; long io_threads{ -1 }; @@ -84,7 +85,7 @@ namespace ipc { public: ipc_server (nano::node & node, nano::rpc & rpc); - ~ipc_server (); + virtual ~ipc_server (); void stop (); nano::node & node; @@ -109,7 +110,7 @@ namespace ipc { public: ipc_client (boost::asio::io_context & io_ctx_a); - ~ipc_client (); + virtual ~ipc_client () = default; /** Connect to a domain socket */ nano::error connect (std::string const & path); diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index a4bb79900d..719a1561ad 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -227,16 +227,13 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso }); json.replace_child (preconfigured_peers_key, peers); - upgraded = true; - } - case 16: - { + nano::jsonconfig ipc_l; ipc_config.serialize_json (ipc_l); json.put_child ("ipc", ipc_l); upgraded = true; } - case 17: + case 16: break; default: throw std::runtime_error ("Unknown node_config version"); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 482c4a4169..aabae5b3bf 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -54,7 +54,7 @@ class node_config static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5); static int json_version () { - return 17; + return 16; } }; From 94f3ab6c438636a937b8c4e2748bed8fc5121140 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 24 Jan 2019 23:26:42 +0100 Subject: [PATCH 14/15] Virtual dtor in ipc_config_transport --- nano/node/ipc.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp index 4f4d58973e..ec4ae7acce 100644 --- a/nano/node/ipc.hpp +++ b/nano/node/ipc.hpp @@ -45,7 +45,7 @@ namespace ipc class ipc_config_transport { public: - ~ipc_config_transport () = default; + virtual ~ipc_config_transport () = default; bool enabled{ false }; size_t io_timeout{ 15 }; long io_threads{ -1 }; From 986892c353847477883754725668a70edbc0c4e8 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Thu, 24 Jan 2019 23:30:57 +0100 Subject: [PATCH 15/15] Typo fix and final specifier --- nano/node/ipc.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index cf156b2280..ae0af5868a 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -393,8 +393,8 @@ class socket_transport : public nano::ipc::transport std::unique_ptr acceptor; }; -/** The domain socket file is attemped removed at both startup and shutdown. */ -class nano::ipc::dsock_file_remover +/** The domain socket file is attempted removed at both startup and shutdown. */ +class nano::ipc::dsock_file_remover final { public: dsock_file_remover (std::string const & file_a) :