diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 33c49ce401..e4c0979af7 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable (core_test block.cpp block_store.cpp interface.cpp + ipc.cpp conflicts.cpp daemon.cpp entry.cpp diff --git a/nano/core_test/ipc.cpp b/nano/core_test/ipc.cpp new file mode 100644 index 0000000000..c53cac4047 --- /dev/null +++ b/nano/core_test/ipc.cpp @@ -0,0 +1,87 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +TEST (ipc, asynchronous) +{ + nano::system system (24000, 1); + nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true)); + system.nodes[0]->config.ipc_config.transport_tcp.enabled = true; + system.nodes[0]->config.ipc_config.transport_tcp.port = 24077; + nano::ipc::ipc_server ipc (*system.nodes[0], rpc); + nano::ipc::ipc_client client (system.nodes[0]->io_ctx); + + auto req (client.prepare_request (nano::ipc::payload_encoding::json_legacy, std::string (R"({"action": "block_count"})"))); + auto res (std::make_shared> ()); + std::atomic call_completed{ false }; + client.async_connect ("::1", 24077, [&client, &req, &res, &call_completed](nano::error err) { + client.async_write (req, [&client, &req, &res, &call_completed](nano::error err_a, size_t size_a) { + ASSERT_NO_ERROR (static_cast (err_a)); + ASSERT_EQ (size_a, req->size ()); + // Read length + client.async_read (res, sizeof (uint32_t), [&client, &res, &call_completed](nano::error err_read_a, size_t size_read_a) { + ASSERT_NO_ERROR (static_cast (err_read_a)); + ASSERT_EQ (size_read_a, sizeof (uint32_t)); + uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast (res->data ())); + // Read json payload + client.async_read (res, payload_size_l, [&res, &call_completed](nano::error err_read_a, size_t size_read_a) { + std::string payload (res->begin (), res->end ()); + std::stringstream ss; + ss << payload; + + // Make sure the response is valid json + boost::property_tree::ptree blocks; + boost::property_tree::read_json (ss, blocks); + ASSERT_EQ (blocks.get ("count"), 1); + call_completed = true; + }); + }); + }); + }); + system.deadline_set (5s); + while (!call_completed) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (ipc, synchronous) +{ + nano::system system (24000, 1); + nano::rpc rpc (system.io_ctx, *system.nodes[0], nano::rpc_config (true)); + system.nodes[0]->config.ipc_config.transport_tcp.enabled = true; + system.nodes[0]->config.ipc_config.transport_tcp.port = 24077; + nano::ipc::ipc_server ipc (*system.nodes[0], rpc); + nano::ipc::rpc_ipc_client client (system.nodes[0]->io_ctx); + + // Start blocking IPC client in a separate thread + std::atomic call_completed{ false }; + std::thread client_thread ([&client, &call_completed]() { + client.connect ("::1", 24077); + std::string response (client.request (std::string (R"({"action": "block_count"})"))); + std::stringstream ss; + ss << response; + // Make sure the response is valid json + boost::property_tree::ptree blocks; + boost::property_tree::read_json (ss, blocks); + ASSERT_EQ (blocks.get ("count"), 1); + + call_completed = true; + }); + client_thread.detach (); + + system.deadline_set (5s); + while (!call_completed) + { + ASSERT_NO_ERROR (system.poll ()); + } +} diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index a9166646b5..6a7e0eded1 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -1,10 +1,10 @@ -#include -#include -#include - #include #include #include +#include +#include +#include +#include #include nano_daemon::daemon_config::daemon_config () : @@ -139,10 +139,11 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano:: node->flags = flags; node->start (); std::unique_ptr rpc = get_rpc (io_ctx, *node, config.rpc); - if (rpc && config.rpc_enable) + if (rpc) { - rpc->start (); + rpc->start (config.rpc_enable); } + nano::ipc::ipc_server ipc (*node, *rpc); runner = std::make_unique (io_ctx, node->config.io_threads); runner->join (); } diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index f0e8780eb5..6056c1e546 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -277,12 +278,14 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost update_config (config, config_path); node->start (); std::unique_ptr rpc = get_rpc (io_ctx, *node, config.rpc); - if (rpc && config.rpc_enable) + if (rpc) { - rpc->start (); + rpc->start (config.rpc_enable); } + nano::ipc::ipc_server ipc (*node, *rpc); nano::thread_runner runner (io_ctx, node->config.io_threads); QObject::connect (&application, &QApplication::aboutToQuit, [&]() { + ipc.stop (); rpc->stop (); node->stop (); }); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 07397c091e..d634c5f572 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -23,6 +23,8 @@ add_library (node cli.cpp common.cpp common.hpp + ipc.hpp + ipc.cpp lmdb.cpp lmdb.hpp logging.cpp diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp new file mode 100644 index 0000000000..ae0af5868a --- /dev/null +++ b/nano/node/ipc.cpp @@ -0,0 +1,672 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::log; +namespace +{ +/** + * The IPC framing format is simple: preamble followed by an encoding specific payload. + * Preamble is uint8_t {'N', encoding_type, reserved, reserved}. Reserved bytes MUST be zero. + * @note This is intentionally not an enum class as the values are only used as vector indices. + */ +enum preamble_offset +{ + /** Always 'N' */ + lead = 0, + /** One of the payload_encoding values */ + encoding = 1, + /** Always zero */ + reserved_1 = 2, + /** Always zero */ + reserved_2 = 3, +}; +} +nano::error nano::ipc::ipc_config::serialize_json (nano::jsonconfig & json) const +{ + nano::jsonconfig tcp_l; + // Only write out experimental config values if they're previously set explicitly in the config file + if (transport_tcp.io_threads >= 0) + { + tcp_l.put ("io_threads", transport_tcp.io_threads); + } + tcp_l.put ("enable", transport_tcp.enabled); + tcp_l.put ("port", transport_tcp.port); + tcp_l.put ("io_timeout", transport_tcp.io_timeout); + json.put_child ("tcp", tcp_l); + + nano::jsonconfig domain_l; + if (transport_domain.io_threads >= 0) + { + domain_l.put ("io_threads", transport_domain.io_threads); + } + domain_l.put ("enable", transport_domain.enabled); + domain_l.put ("path", transport_domain.path); + domain_l.put ("io_timeout", transport_domain.io_timeout); + json.put_child ("local", domain_l); + return json.get_error (); +} + +nano::error nano::ipc::ipc_config::deserialize_json (nano::jsonconfig & json) +{ + auto tcp_l (json.get_optional_child ("tcp")); + if (tcp_l) + { + tcp_l->get_optional ("io_threads", transport_tcp.io_threads, -1); + tcp_l->get ("enable", transport_tcp.enabled); + tcp_l->get ("port", transport_tcp.port); + tcp_l->get ("io_timeout", transport_tcp.io_timeout); + } + + auto domain_l (json.get_optional_child ("local")); + if (domain_l) + { + domain_l->get_optional ("io_threads", transport_domain.io_threads, -1); + domain_l->get ("enable", transport_domain.enabled); + domain_l->get ("path", transport_domain.path); + domain_l->get ("io_timeout", transport_domain.io_timeout); + } + + return json.get_error (); +} + +/** Abstract base type for sockets, implementing timer logic and a close operation */ +class socket_base +{ +public: + socket_base (boost::asio::io_context & io_ctx_a) : + io_timer (io_ctx_a) + { + } + virtual ~socket_base () = default; + + /** Close socket */ + virtual void close () = 0; + + /** + * Start IO timer. + * @param timeout_a Seconds to wait. To wait indefinitely, use std::chrono::seconds::max () + */ + void timer_start (std::chrono::seconds timeout_a) + { + if (timeout_a < std::chrono::seconds::max ()) + { + io_timer.expires_from_now (boost::posix_time::seconds (timeout_a.count ())); + io_timer.async_wait ([this](const boost::system::error_code & ec) { + if (!ec) + { + this->timer_expired (); + } + }); + } + } + + void timer_expired () + { + close (); + } + + void timer_cancel () + { + boost::system::error_code ec; + io_timer.cancel (ec); + assert (!ec); + } + +private: + /** IO operation timer */ + boost::asio::deadline_timer io_timer; +}; + +/** + * A session represents an inbound connection over which multiple requests/reponses are transmitted. + */ +template +class session : public socket_base, public std::enable_shared_from_this> +{ +public: + session (nano::ipc::ipc_server & server_a, boost::asio::io_context & io_ctx_a, nano::ipc::ipc_config_transport & config_transport_a) : + socket_base (io_ctx_a), + server (server_a), node (server_a.node), session_id (server_a.id_dispenser.fetch_add (1)), io_ctx (io_ctx_a), socket (io_ctx_a), config_transport (config_transport_a) + { + if (node.config.logging.log_ipc ()) + { + BOOST_LOG (node.log) << "IPC: created session with id: " << session_id; + } + } + + SOCKET_TYPE & get_socket () + { + return socket; + } + + /** + * Async read of exactly \p size_a bytes. The callback is invoked only when all the data is available and + * no error has occurred. On error, the error is logged, the read cycle stops and the session ends. Clients + * are expected to implement reconnect logic. + */ + void async_read_exactly (void * buff_a, size_t size_a, std::function callback_a) + { + async_read_exactly (buff_a, size_a, std::chrono::seconds (config_transport.io_timeout), callback_a); + } + + /** + * Async read of exactly \p size_a bytes and a specific \p timeout_a. + * @see async_read_exactly (void *, size_t, std::function) + */ + void async_read_exactly (void * buff_a, size_t size_a, std::chrono::seconds timeout_a, std::function callback_a) + { + timer_start (timeout_a); + auto this_l (this->shared_from_this ()); + boost::asio::async_read (socket, + boost::asio::buffer (buff_a, size_a), + boost::asio::transfer_exactly (size_a), + [this_l, callback_a](boost::system::error_code const & ec, size_t bytes_transferred_a) { + this_l->timer_cancel (); + if (ec == boost::asio::error::connection_aborted || ec == boost::asio::error::connection_reset) + { + if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC: error reading %1% ") % ec.message ()); + } + } + else if (bytes_transferred_a > 0) + { + callback_a (); + } + }); + } + + /** Handler for payload_encoding::json_legacy */ + void rpc_handle_query () + { + session_timer.restart (); + auto request_id_l (std::to_string (server.id_dispenser.fetch_add (1))); + + // This is called when nano::rpc_handler#process_request is done. We convert to + // json and write the response to the ipc socket with a length prefix. + auto this_l (this->shared_from_this ()); + auto response_handler_l ([this_l, request_id_l](boost::property_tree::ptree const & tree_a) { + std::stringstream ostream; + boost::property_tree::write_json (ostream, tree_a); + ostream.flush (); + std::string response_body = ostream.str (); + + uint32_t size_response = boost::endian::native_to_big (static_cast (response_body.size ())); + std::vector bufs = { + boost::asio::buffer (&size_response, sizeof (size_response)), + boost::asio::buffer (response_body) + }; + + this_l->timer_start (std::chrono::seconds (this_l->config_transport.io_timeout)); + boost::asio::async_write (this_l->socket, bufs, [this_l](boost::system::error_code const & error_a, size_t size_a) { + this_l->timer_cancel (); + if (!error_a) + { + this_l->read_next_request (); + } + else if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << "IPC: Write failed: " << error_a.message (); + } + }); + + if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << boost::str (boost::format ("IPC/RPC request %1% completed in: %2% %3%") % request_id_l % this_l->session_timer.stop ().count () % this_l->session_timer.unit ()); + } + }); + + node.stats.inc (nano::stat::type::ipc, nano::stat::detail::invocations); + auto body (std::string (reinterpret_cast (buffer.data ()), buffer.size ())); + + // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler + nano::rpc_handler handler (node, server.rpc, body, request_id_l, response_handler_l); + handler.process_request (); + } + + /** Async request reader */ + void read_next_request () + { + auto this_l = this->shared_from_this (); + + // Await next request indefinitely + buffer.resize (sizeof (buffer_size)); + async_read_exactly (buffer.data (), buffer.size (), std::chrono::seconds::max (), [this_l]() { + if (this_l->buffer[preamble_offset::lead] != 'N' || this_l->buffer[preamble_offset::reserved_1] != 0 || this_l->buffer[preamble_offset::reserved_2] != 0) + { + if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << "IPC: Invalid preamble"; + } + } + else if (this_l->buffer[preamble_offset::encoding] == static_cast (nano::ipc::payload_encoding::json_legacy)) + { + // Length of payload + this_l->async_read_exactly (&this_l->buffer_size, sizeof (this_l->buffer_size), [this_l]() { + boost::endian::big_to_native_inplace (this_l->buffer_size); + this_l->buffer.resize (this_l->buffer_size); + // Payload (ptree compliant JSON string) + this_l->async_read_exactly (this_l->buffer.data (), this_l->buffer_size, [this_l]() { + this_l->rpc_handle_query (); + }); + }); + } + else if (this_l->node.config.logging.log_ipc ()) + { + BOOST_LOG (this_l->node.log) << "IPC: Unsupported payload encoding"; + } + }); + } + + /** Shut down and close socket */ + void close () + { + socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); + socket.close (); + } + +private: + nano::ipc::ipc_server & server; + nano::node & node; + + /** Unique session id used for logging */ + uint64_t session_id; + + /** Timer for measuring the duration of ipc calls */ + nano::timer session_timer; + + /** + * IO context from node, or per-transport, depending on configuration. + * Certain transports may scale better if they use a separate context. + */ + boost::asio::io_context & io_ctx; + + /** A socket of the given asio type */ + SOCKET_TYPE socket; + + /** Buffer sizes are read into this */ + uint32_t buffer_size{ 0 }; + + /** Buffer used to store data received from the client */ + std::vector buffer; + + /** Transport configuration */ + nano::ipc::ipc_config_transport & config_transport; +}; + +/** Domain and TCP socket transport */ +template +class socket_transport : public nano::ipc::transport +{ +public: + socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : + server (server_a), config_transport (config_transport_a) + { + // Using a per-transport event dispatcher? + if (concurrency_a > 0) + { + io_ctx = std::make_unique (); + } + + boost::asio::socket_base::reuse_address option (true); + boost::asio::socket_base::keep_alive option_keepalive (true); + acceptor = std::make_unique (context (), endpoint_a); + acceptor->set_option (option); + acceptor->set_option (option_keepalive); + accept (); + + // Start serving IO requests. If concurrency_a is < 1, the node's thread pool/io_context is used instead. + // A separate io_context for domain sockets may facilitate better performance on some systems. + if (concurrency_a > 0) + { + runner = std::make_unique (*io_ctx, concurrency_a); + } + } + + boost::asio::io_context & context () const + { + return io_ctx ? *io_ctx : server.node.io_ctx; + } + + void accept () + { + // Prepare the next session + auto new_session (std::make_shared> (server, context (), config_transport)); + + acceptor->async_accept (new_session->get_socket (), [this, new_session](boost::system::error_code const & ec) { + if (!ec) + { + new_session->read_next_request (); + } + else + { + BOOST_LOG (server.node.log) << "IPC: acceptor error: " << ec.message (); + } + + if (acceptor->is_open () && ec != boost::asio::error::operation_aborted) + { + this->accept (); + } + else + { + BOOST_LOG (server.node.log) << "IPC: shutting down"; + } + }); + } + + void stop () + { + acceptor->close (); + if (io_ctx) + { + io_ctx->stop (); + } + + if (runner) + { + runner->join (); + } + } + +private: + nano::ipc::ipc_server & server; + nano::ipc::ipc_config_transport & config_transport; + std::unique_ptr runner; + std::unique_ptr io_ctx; + std::unique_ptr acceptor; +}; + +/** The domain socket file is attempted removed at both startup and shutdown. */ +class nano::ipc::dsock_file_remover final +{ +public: + dsock_file_remover (std::string const & file_a) : + filename (file_a) + { + std::remove (filename.c_str ()); + } + ~dsock_file_remover () + { + std::remove (filename.c_str ()); + } + std::string filename; +}; + +nano::ipc::ipc_server::ipc_server (nano::node & node_a, nano::rpc & rpc_a) : +node (node_a), rpc (rpc_a) +{ + try + { + if (node_a.config.ipc_config.transport_domain.enabled) + { +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + size_t threads = node_a.config.ipc_config.transport_domain.io_threads; + file_remover = std::make_unique (node_a.config.ipc_config.transport_domain.path); + boost::asio::local::stream_protocol::endpoint ep{ node_a.config.ipc_config.transport_domain.path }; + transports.push_back (std::make_shared> (*this, ep, node_a.config.ipc_config.transport_domain, threads)); +#else + BOOST_LOG (node.log) << "IPC: Domain sockets are not supported on this platform"; +#endif + } + + if (node_a.config.ipc_config.transport_tcp.enabled) + { + size_t threads = node_a.config.ipc_config.transport_tcp.io_threads; + transports.push_back (std::make_shared> (*this, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), node_a.config.ipc_config.transport_tcp.port), node_a.config.ipc_config.transport_tcp, threads)); + } + + BOOST_LOG (node.log) << "IPC: server started"; + } + catch (std::runtime_error const & ex) + { + BOOST_LOG (node.log) << "IPC: " << ex.what (); + } +} + +nano::ipc::ipc_server::~ipc_server () +{ + BOOST_LOG (node.log) << "IPC: server stopped"; +} + +void nano::ipc::ipc_server::stop () +{ + for (auto & transport : transports) + { + transport->stop (); + } +} + +/** Socket agnostic IO interface */ +class channel +{ +public: + virtual void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) = 0; + virtual void async_write (std::shared_ptr> buffer_a, std::function callback_a) = 0; +}; + +/** Domain and TCP client socket */ +template +class socket_client : public socket_base, public channel +{ +public: + socket_client (boost::asio::io_context & io_ctx_a, ENDPOINT_TYPE endpoint_a) : + socket_base (io_ctx_a), endpoint (endpoint_a), socket (io_ctx_a), resolver (io_ctx_a) + { + } + + void async_resolve (std::string const & host_a, uint16_t port_a, std::function callback_a) + { + this->timer_start (io_timeout); + resolver.async_resolve (boost::asio::ip::tcp::resolver::query (host_a, std::to_string (port_a)), [this, callback_a](boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator endpoint_iterator_a) { + this->timer_cancel (); + boost::asio::ip::tcp::resolver::iterator end; + if (!ec && endpoint_iterator_a != end) + { + endpoint = *endpoint_iterator_a; + callback_a (ec, *endpoint_iterator_a); + } + else + { + callback_a (ec, *end); + } + }); + } + + void async_connect (std::function callback_a) + { + this->timer_start (io_timeout); + socket.async_connect (endpoint, [this, callback_a](boost::system::error_code const & ec) { + this->timer_cancel (); + callback_a (ec); + }); + } + + void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) override + { + this->timer_start (io_timeout); + buffer_a->resize (size_a); + boost::asio::async_read (socket, boost::asio::buffer (buffer_a->data (), size_a), [this, callback_a](boost::system::error_code const & ec, size_t size_a) { + this->timer_cancel (); + callback_a (ec, size_a); + }); + } + + void async_write (std::shared_ptr> buffer_a, std::function callback_a) override + { + this->timer_start (io_timeout); + boost::asio::async_write (socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { + this->timer_cancel (); + callback_a (ec, size_a); + }); + } + + /** Shut down and close socket */ + void close () override + { + socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both); + socket.close (); + } + +private: + ENDPOINT_TYPE endpoint; + SOCKET_TYPE socket; + boost::asio::ip::tcp::resolver resolver; + std::chrono::seconds io_timeout{ 60 }; +}; + +/** + * PIMPL class for ipc_client. This ensures that socket_client and boost details can + * stay out of the header file. + */ +class client_impl : public nano::ipc::ipc_client_impl +{ +public: + client_impl (boost::asio::io_context & io_ctx_a) : + io_ctx (io_ctx_a) + { + } + + void connect (std::string const & host_a, uint16_t port_a, std::function callback_a) + { + tcp_client = std::make_shared> (io_ctx, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v6 (), port_a)); + + tcp_client->async_resolve (host_a, port_a, [this, callback_a](boost::system::error_code const & ec_resolve_a, boost::asio::ip::tcp::endpoint endpoint_a) { + if (!ec_resolve_a) + { + this->tcp_client->async_connect ([callback_a](const boost::system::error_code & ec_connect_a) { + callback_a (nano::error (ec_connect_a)); + }); + } + else + { + callback_a (nano::error (ec_resolve_a)); + } + }); + } + + nano::error connect (std::string const & path_a) + { + nano::error err; +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + domain_client = std::make_shared> (io_ctx, boost::asio::local::stream_protocol::endpoint (path_a)); +#else + err = nano::error ("Domain sockets are not supported by this platform"); +#endif + return err; + } + + channel & get_channel () + { +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + return tcp_client ? static_cast (*tcp_client) : static_cast (*domain_client); +#else + return static_cast (*tcp_client); +#endif + } + +private: + boost::asio::io_context & io_ctx; + std::shared_ptr> tcp_client; +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + std::shared_ptr> domain_client; +#endif +}; + +nano::ipc::ipc_client::ipc_client (boost::asio::io_context & io_ctx_a) : +io_ctx (io_ctx_a) +{ +} + +nano::error nano::ipc::ipc_client::connect (std::string const & path_a) +{ + impl = std::make_unique (io_ctx); + return boost::polymorphic_downcast (impl.get ())->connect (path_a); +} + +void nano::ipc::ipc_client::async_connect (std::string const & host_a, uint16_t port_a, std::function callback_a) +{ + impl = std::make_unique (io_ctx); + auto client (boost::polymorphic_downcast (impl.get ())); + client->connect (host_a, port_a, callback_a); +} + +nano::error nano::ipc::ipc_client::connect (std::string const & host, uint16_t port) +{ + std::promise result_l; + async_connect (host, port, [&result_l](nano::error err_a) { + result_l.set_value (err_a); + }); + return result_l.get_future ().get (); +} + +void nano::ipc::ipc_client::async_write (std::shared_ptr> buffer_a, std::function callback_a) +{ + auto client (boost::polymorphic_downcast (impl.get ())); + client->get_channel ().async_write (buffer_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_written_a) { + callback_a (nano::error (ec_a), bytes_written_a); + }); +} + +void nano::ipc::ipc_client::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) +{ + auto client (boost::polymorphic_downcast (impl.get ())); + client->get_channel ().async_read (buffer_a, size_a, [callback_a](const boost::system::error_code & ec_a, size_t bytes_read_a) { + callback_a (nano::error (ec_a), bytes_read_a); + }); +} + +std::shared_ptr> nano::ipc::ipc_client::prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a) +{ + auto buffer_l (std::make_shared> ()); + if (encoding_a == nano::ipc::payload_encoding::json_legacy) + { + buffer_l->push_back ('N'); + buffer_l->push_back (static_cast (encoding_a)); + buffer_l->push_back (0); + buffer_l->push_back (0); + + uint32_t payload_length = payload_a.size (); + uint32_t be = boost::endian::native_to_big (payload_length); + char * chars = reinterpret_cast (&be); + buffer_l->insert (buffer_l->end (), chars, chars + sizeof (uint32_t)); + buffer_l->insert (buffer_l->end (), payload_a.begin (), payload_a.end ()); + } + return buffer_l; +} + +std::string nano::ipc::rpc_ipc_client::request (std::string const & rpc_action_a) +{ + auto req (prepare_request (nano::ipc::payload_encoding::json_legacy, rpc_action_a)); + auto res (std::make_shared> ()); + + std::promise result_l; + async_write (req, [this, &res, &result_l](nano::error err_a, size_t size_a) { + // Read length + this->async_read (res, sizeof (uint32_t), [this, &res, &result_l](nano::error err_read_a, size_t size_read_a) { + uint32_t payload_size_l = boost::endian::big_to_native (*reinterpret_cast (res->data ())); + // Read json payload + this->async_read (res, payload_size_l, [&res, &result_l](nano::error err_read_a, size_t size_read_a) { + result_l.set_value (std::string (res->begin (), res->end ())); + }); + }); + }); + + return result_l.get_future ().get (); +} diff --git a/nano/node/ipc.hpp b/nano/node/ipc.hpp new file mode 100644 index 0000000000..ec4ae7acce --- /dev/null +++ b/nano/node/ipc.hpp @@ -0,0 +1,155 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class node; +class rpc; +} + +namespace nano +{ +namespace ipc +{ + /** + * Payload encodings; add protobuf, flatbuffers and so on as needed. + */ + enum class payload_encoding : uint8_t + { + /** + * Request is preamble followed by 32-bit BE payload length and payload bytes. + * Response is 32-bit BE payload length followed by payload bytes. + */ + json_legacy = 1 + }; + + /** Removes domain socket files on startup and shutdown */ + class dsock_file_remover; + + /** IPC transport interface */ + class transport + { + public: + virtual void stop () = 0; + virtual ~transport () = default; + }; + + /** Base class for transport configurations */ + class ipc_config_transport + { + public: + virtual ~ipc_config_transport () = default; + bool enabled{ false }; + size_t io_timeout{ 15 }; + long io_threads{ -1 }; + }; + + /** Domain socket specific transport config */ + class ipc_config_domain_socket : public ipc_config_transport + { + public: + /** + * Default domain socket path for Unix systems. Once Boost supports Windows 10 usocks, + * this value will be conditional on OS. + */ + std::string path{ "/tmp/nano" }; + }; + + /** TCP specific transport config */ + class ipc_config_tcp_socket : public ipc_config_transport + { + public: + /** Listening port */ + uint16_t port{ 7077 }; + }; + + /** IPC configuration */ + class ipc_config + { + public: + nano::error deserialize_json (nano::jsonconfig & json_a); + nano::error serialize_json (nano::jsonconfig & json) const; + ipc_config_domain_socket transport_domain; + ipc_config_tcp_socket transport_tcp; + }; + + /** The IPC server accepts connections on one or more configured transports */ + class ipc_server + { + public: + ipc_server (nano::node & node, nano::rpc & rpc); + virtual ~ipc_server (); + void stop (); + + nano::node & node; + nano::rpc & rpc; + + /** Unique counter/id shared across sessions */ + std::atomic id_dispenser{ 0 }; + + private: + std::unique_ptr file_remover; + std::vector> transports; + }; + + class ipc_client_impl + { + public: + virtual ~ipc_client_impl () = default; + }; + + /** IPC client */ + class ipc_client + { + public: + ipc_client (boost::asio::io_context & io_ctx_a); + virtual ~ipc_client () = default; + + /** Connect to a domain socket */ + nano::error connect (std::string const & path); + + /** Connect to a tcp socket synchronously */ + nano::error connect (std::string const & host, uint16_t port); + + /** Connect to a tcp socket asynchronously */ + void async_connect (std::string const & host, uint16_t port, std::function callback); + + /** Write buffer asynchronously */ + void async_write (std::shared_ptr> buffer_a, std::function callback_a); + + /** Read \p size_a bytes asynchronously */ + void async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a); + + /** + * Returns a buffer with an IPC preamble for the given \p encoding_a followed by the payload. Depending on encoding, + * the buffer may contain a payload length or end sentinel. + */ + std::shared_ptr> prepare_request (nano::ipc::payload_encoding encoding_a, std::string const & payload_a); + + private: + boost::asio::io_context & io_ctx; + + // PIMPL pattern to hide implementation details + std::unique_ptr impl; + }; + + /** Convenience wrapper for making synchronous RPC calls via IPC */ + class rpc_ipc_client : public ipc_client + { + public: + rpc_ipc_client (boost::asio::io_context & io_ctx_a) : + ipc_client (io_ctx_a) + { + } + /** Calls the RPC server via IPC and waits for the result. The client must be connected. */ + std::string request (std::string const & rpc_action_a); + }; +} +} diff --git a/nano/node/logging.cpp b/nano/node/logging.cpp index 4d41dc6298..fd8e2a898a 100644 --- a/nano/node/logging.cpp +++ b/nano/node/logging.cpp @@ -18,6 +18,7 @@ network_node_id_handshake_logging_value (false), node_lifetime_tracing_value (false), insufficient_work_logging_value (true), log_rpc_value (true), +log_ipc_value (true), bulk_pull_logging_value (false), work_generation_time_value (true), upnp_details_logging_value (false), @@ -58,6 +59,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const json.put ("node_lifetime_tracing", node_lifetime_tracing_value); json.put ("insufficient_work", insufficient_work_logging_value); json.put ("log_rpc", log_rpc_value); + json.put ("log_ipc", log_ipc_value); json.put ("bulk_pull", bulk_pull_logging_value); json.put ("work_generation_time", work_generation_time_value); json.put ("upnp_details", upnp_details_logging_value); @@ -94,6 +96,7 @@ bool nano::logging::upgrade_json (unsigned version_a, nano::jsonconfig & json) json.get ("max_size", config_max_size); max_size = std::max (max_size, config_max_size); json.put ("max_size", max_size); + json.put ("log_ipc", true); upgraded_l = true; case 6: break; @@ -138,6 +141,7 @@ nano::error nano::logging::deserialize_json (bool & upgraded_a, nano::jsonconfig json.get ("node_lifetime_tracing", node_lifetime_tracing_value); json.get ("insufficient_work", insufficient_work_logging_value); json.get ("log_rpc", log_rpc_value); + json.get ("log_ipc", log_ipc_value); json.get ("bulk_pull", bulk_pull_logging_value); json.get ("work_generation_time", work_generation_time_value); json.get ("upnp_details", upnp_details_logging_value); @@ -210,6 +214,11 @@ bool nano::logging::log_rpc () const return network_logging () && log_rpc_value; } +bool nano::logging::log_ipc () const +{ + return network_logging () && log_ipc_value; +} + bool nano::logging::bulk_pull_logging () const { return network_logging () && bulk_pull_logging_value; diff --git a/nano/node/logging.hpp b/nano/node/logging.hpp index f4788e7d03..49c5acfa39 100644 --- a/nano/node/logging.hpp +++ b/nano/node/logging.hpp @@ -32,6 +32,7 @@ class logging bool upnp_details_logging () const; bool timing_logging () const; bool log_rpc () const; + bool log_ipc () const; bool bulk_pull_logging () const; bool callback_logging () const; bool work_generation_time () const; @@ -50,6 +51,7 @@ class logging bool node_lifetime_tracing_value; bool insufficient_work_logging_value; bool log_rpc_value; + bool log_ipc_value; bool bulk_pull_logging_value; bool work_generation_time_value; bool upnp_details_logging_value; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 2ea7d073b0..719a1561ad 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -115,6 +115,11 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put ("lmdb_max_dbs", lmdb_max_dbs); json.put ("block_processor_batch_max_time", block_processor_batch_max_time.count ()); json.put ("allow_local_peers", allow_local_peers); + + nano::jsonconfig ipc_l; + ipc_config.serialize_json (ipc_l); + json.put_child ("ipc", ipc_l); + return json.get_error (); } @@ -222,6 +227,10 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso }); json.replace_child (preconfigured_peers_key, peers); + + nano::jsonconfig ipc_l; + ipc_config.serialize_json (ipc_l); + json.put_child ("ipc", ipc_l); upgraded = true; } case 16: @@ -315,6 +324,12 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco auto block_processor_batch_max_time_l (json.get ("block_processor_batch_max_time")); block_processor_batch_max_time = std::chrono::milliseconds (block_processor_batch_max_time_l); + auto ipc_config_l (json.get_optional_child ("ipc")); + if (ipc_config_l) + { + ipc_config.deserialize_json (ipc_config_l.get ()); + } + json.get ("peering_port", peering_port); json.get ("bootstrap_fraction_numerator", bootstrap_fraction_numerator); json.get ("online_weight_quorum", online_weight_quorum); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 7982a739cd..aabae5b3bf 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ class node_config int lmdb_max_dbs; bool allow_local_peers; nano::stat_config stat_config; + nano::ipc::ipc_config ipc_config; nano::uint256_union epoch_block_link; nano::account epoch_block_signer; std::chrono::milliseconds block_processor_batch_max_time; diff --git a/nano/node/rpc.cpp b/nano/node/rpc.cpp index 3ebbfb1168..685ab7949d 100644 --- a/nano/node/rpc.cpp +++ b/nano/node/rpc.cpp @@ -94,26 +94,38 @@ node (node_a) { } -void nano::rpc::start () +void nano::rpc::add_block_observer () { - auto endpoint (nano::tcp_endpoint (config.address, config.port)); - acceptor.open (endpoint.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + node.observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::uint128_t const &, bool) { + observer_action (account_a); + }); +} - boost::system::error_code ec; - acceptor.bind (endpoint, ec); - if (ec) +void nano::rpc::start (bool rpc_enabled_a) +{ + if (rpc_enabled_a) { - BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ()); - throw std::runtime_error (ec.message ()); + auto endpoint (nano::tcp_endpoint (config.address, config.port)); + acceptor.open (endpoint.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + + boost::system::error_code ec; + acceptor.bind (endpoint, ec); + if (ec) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Error while binding for RPC on port %1%: %2%") % endpoint.port () % ec.message ()); + throw std::runtime_error (ec.message ()); + } + + acceptor.listen (); } - acceptor.listen (); - node.observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::uint128_t const &, bool) { - observer_action (account_a); - }); + add_block_observer (); - accept (); + if (rpc_enabled_a) + { + accept (); + } } void nano::rpc::accept () diff --git a/nano/node/rpc.hpp b/nano/node/rpc.hpp index 772e65d32f..5dcc1f4a34 100644 --- a/nano/node/rpc.hpp +++ b/nano/node/rpc.hpp @@ -69,7 +69,13 @@ class rpc public: rpc (boost::asio::io_context &, nano::node &, nano::rpc_config const &); virtual ~rpc () = default; - void start (); + + /** + * Start serving RPC requests if \p rpc_enabled_a, otherwise this will only + * add a block observer since requests may still arrive via IPC. + */ + void start (bool rpc_enabled_a = true); + void add_block_observer (); virtual void accept (); void stop (); void observer_action (nano::account const &); diff --git a/nano/node/stats.cpp b/nano/node/stats.cpp index c043ea4255..8c33cf4615 100644 --- a/nano/node/stats.cpp +++ b/nano/node/stats.cpp @@ -316,6 +316,9 @@ std::string nano::stat::type_to_string (uint32_t key) std::string res; switch (type) { + case nano::stat::type::ipc: + res = "ipc"; + break; case nano::stat::type::block: res = "block"; break; @@ -410,6 +413,9 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::insufficient_work: res = "insufficient_work"; break; + case nano::stat::detail::invocations: + res = "invocations"; + break; case nano::stat::detail::keepalive: res = "keepalive"; break; diff --git a/nano/node/stats.hpp b/nano/node/stats.hpp index 202e40c815..e3b1d5e3ee 100644 --- a/nano/node/stats.hpp +++ b/nano/node/stats.hpp @@ -188,6 +188,7 @@ class stat vote, http_callback, peering, + ipc, udp }; @@ -249,6 +250,9 @@ class stat invalid_node_id_handshake_message, outdated_version, + // ipc + invocations, + // peering handshake, };