diff --git a/.clang-format-ignore b/.clang-format-ignore new file mode 100644 index 00000000000..31cb9e05bde --- /dev/null +++ b/.clang-format-ignore @@ -0,0 +1,2 @@ +CMakeLists.txt +*.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index a603e5535a1..8c0b2ecd107 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ SET(CMAKE_CXX_STANDARD 17) SET(CMAKE_POSITION_INDEPENDENT_CODE TRUE) if (NOT MSVC) - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fdiagnostics-color=always") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fdiagnostics-color=always -fno-lto") ##-fno-lto to fix 'bytecode stream version' SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -Wno-error=deprecated-declarations") SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -Wextra -Wno-unused-parameter -Wno-deprecated-declarations -O0") else() diff --git a/irohad/ametsuchi/impl/postgres_wsv_query.cpp b/irohad/ametsuchi/impl/postgres_wsv_query.cpp index fcaf31821ea..419dbb7e602 100644 --- a/irohad/ametsuchi/impl/postgres_wsv_query.cpp +++ b/irohad/ametsuchi/impl/postgres_wsv_query.cpp @@ -6,6 +6,7 @@ #include "ametsuchi/impl/postgres_wsv_query.hpp" #include + #include "ametsuchi/impl/soci_std_optional.hpp" #include "ametsuchi/impl/soci_utils.hpp" #include "ametsuchi/ledger_state.hpp" @@ -79,6 +80,33 @@ namespace iroha { return getPeersFromSociRowSet(result); } + iroha::expected::Result PostgresWsvQuery::count( + std::string_view what) try { + int count; + sql_ << "SELECT count(*) FROM " << what, soci::into(count); + return count; + } catch (const std::exception &e) { + auto msg = + std::string{"Failed to count "}.append(what) + ", query: " + e.what(); + log_->error(msg); + return iroha::expected::makeError(msg); + } + + iroha::expected::Result + PostgresWsvQuery::countPeers() { + return count("peer"); + } + + iroha::expected::Result + PostgresWsvQuery::countDomains() { + return count("domain"); + } + + iroha::expected::Result + PostgresWsvQuery::countTransactions() { + return count("tx_positions"); + } + boost::optional> PostgresWsvQuery::getPeerByPublicKey( shared_model::interface::types::PublicKeyHexStringView public_key) { diff --git a/irohad/ametsuchi/impl/postgres_wsv_query.hpp b/irohad/ametsuchi/impl/postgres_wsv_query.hpp index 746541b716e..3d45de6d24c 100644 --- a/irohad/ametsuchi/impl/postgres_wsv_query.hpp +++ b/irohad/ametsuchi/impl/postgres_wsv_query.hpp @@ -6,9 +6,9 @@ #ifndef IROHA_POSTGRES_WSV_QUERY_HPP #define IROHA_POSTGRES_WSV_QUERY_HPP -#include "ametsuchi/wsv_query.hpp" - #include + +#include "ametsuchi/wsv_query.hpp" #include "logger/logger_fwd.hpp" namespace iroha { @@ -28,6 +28,10 @@ namespace iroha { std::vector>> getPeers() override; + iroha::expected::Result countPeers() override; + iroha::expected::Result countDomains() override; + iroha::expected::Result countTransactions() override; + boost::optional> getPeerByPublicKey(shared_model::interface::types::PublicKeyHexStringView public_key) override; @@ -43,6 +47,8 @@ namespace iroha { template auto execute(F &&f) -> boost::optional>; + iroha::expected::Result count(std::string_view); + // TODO andrei 24.09.2018: IR-1718 Consistent soci::session fields in // storage classes std::unique_ptr psql_; diff --git a/irohad/ametsuchi/impl/storage_impl.cpp b/irohad/ametsuchi/impl/storage_impl.cpp index b566b77c019..bb78559ce7a 100644 --- a/irohad/ametsuchi/impl/storage_impl.cpp +++ b/irohad/ametsuchi/impl/storage_impl.cpp @@ -5,14 +5,14 @@ #include "ametsuchi/impl/storage_impl.hpp" -#include - #include #include + #include #include #include #include + #include "ametsuchi/impl/mutable_storage_impl.hpp" #include "ametsuchi/impl/peer_query_wsv.hpp" #include "ametsuchi/impl/postgres_block_index.hpp" @@ -36,6 +36,7 @@ #include "logger/logger.hpp" #include "logger/logger_manager.hpp" #include "main/impl/pg_connection_init.hpp" +#include "main/subscription.hpp" namespace iroha { namespace ametsuchi { @@ -307,7 +308,11 @@ namespace iroha { if (not maybe_block) { return fmt::format("Failed to fetch block {}", height); } - notifier_.get_subscriber().on_next(*std::move(maybe_block)); + + std::shared_ptr block_ptr = + std::move(maybe_block.get()); + notifier_.get_subscriber().on_next(block_ptr); + getSubscription()->notify(EventTypes::kOnBlock, block_ptr); } return expected::makeValue(std::move(commit_result.ledger_state)); }; @@ -357,6 +362,9 @@ namespace iroha { } notifier_.get_subscriber().on_next(block); + getSubscription()->notify( + EventTypes::kOnBlock, + std::shared_ptr(block)); decltype(std::declval().getPeers()) opt_ledger_peers; { @@ -455,6 +463,9 @@ namespace iroha { std::shared_ptr block) { if (block_store_->insert(block)) { notifier_.get_subscriber().on_next(block); + getSubscription()->notify( + EventTypes::kOnBlock, + std::shared_ptr(block)); return {}; } return expected::makeError("Block insertion to storage failed"); diff --git a/irohad/ametsuchi/wsv_query.hpp b/irohad/ametsuchi/wsv_query.hpp index bc990996e41..8d2e52cf638 100644 --- a/irohad/ametsuchi/wsv_query.hpp +++ b/irohad/ametsuchi/wsv_query.hpp @@ -6,9 +6,9 @@ #ifndef IROHA_WSV_QUERY_HPP #define IROHA_WSV_QUERY_HPP +#include #include -#include #include "common/result.hpp" #include "interfaces/common_objects/peer.hpp" #include "interfaces/common_objects/string_view_types.hpp" @@ -40,6 +40,35 @@ namespace iroha { std::vector>> getPeers() = 0; + // ToDo? + // /** + // * @brief Fetch domains stored in ledger + // * @return list of domains in insertion to ledger order + // */ + // virtual iroha::expected::Result< + // std::vector>, + // std::string> + // getDomains() = 0; + + /** + * @brief Fetch number of domains in ledger + * @return number of domains in ledger + */ + virtual iroha::expected::Result countPeers() = 0; + + /** + * @brief Fetch number of domains in ledger + * @return number of domains in ledger + */ + virtual iroha::expected::Result countDomains() = 0; + + /** + * @brief Fetch number of valid transactions in ledger + * @return number of transactions in ledger + */ + virtual iroha::expected::Result + countTransactions() = 0; + /** * Fetch peer with given public key from ledger * @return the peer if found, none otherwise diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index 02be9fbeb19..20fe2214c3e 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -37,6 +37,7 @@ #include "main/impl/pg_connection_init.hpp" #include "main/impl/storage_init.hpp" #include "main/server_runner.hpp" +#include "main/subscription.hpp" #include "multi_sig_transactions/gossip_propagation_strategy.hpp" #include "multi_sig_transactions/mst_processor_impl.hpp" #include "multi_sig_transactions/mst_propagation_strategy_stub.hpp" @@ -114,7 +115,8 @@ Irohad::Irohad( const boost::optional &opt_mst_gossip_params, boost::optional inter_peer_tls_config) - : config_(config), + : se_(getSubscription()), + config_(config), listen_ip_(listen_ip), keypair_(keypair), startup_wsv_sync_policy_(startup_wsv_sync_policy), @@ -154,6 +156,7 @@ Irohad::~Irohad() { } consensus_gate_objects_lifetime.unsubscribe(); consensus_gate_events_subscription.unsubscribe(); + se_->dispose(); } /** @@ -1011,7 +1014,11 @@ Irohad::RunResult Irohad::run() { storage->on_commit().subscribe( ordering_init.commit_notifier.get_subscriber()); - ordering_init.commit_notifier.get_subscriber().on_next(std::move(block)); + std::shared_ptr sh_block{ + std::move(block)}; + ordering_init.commit_notifier.get_subscriber().on_next(sh_block); + getSubscription()->notify(EventTypes::kOnBlock, // kOnInitialBlock + sh_block); ordering_init.sync_event_notifier.get_subscriber().on_next( synchronizer::SynchronizationEvent{ diff --git a/irohad/main/application.hpp b/irohad/main/application.hpp index a8307b2f292..c7489b427ff 100644 --- a/irohad/main/application.hpp +++ b/irohad/main/application.hpp @@ -21,6 +21,7 @@ #include "main/iroha_conf_loader.hpp" #include "main/server_runner.hpp" #include "main/startup_params.hpp" +#include "main/subscription.hpp" #include "multi_sig_transactions/gossip_propagation_strategy_params.hpp" #include "torii/tls_params.hpp" @@ -215,6 +216,7 @@ class Irohad { virtual RunResult initWsvRestorer(); // constructor dependencies + std::shared_ptr se_; IrohadConfig config_; const std::string listen_ip_; boost::optional keypair_; diff --git a/irohad/main/irohad.cpp b/irohad/main/irohad.cpp index cb047b1bc9b..e3f615705dd 100644 --- a/irohad/main/irohad.cpp +++ b/irohad/main/irohad.cpp @@ -28,11 +28,11 @@ #include "main/iroha_conf_literals.hpp" #include "main/iroha_conf_loader.hpp" #include "main/raw_block_loader.hpp" +#include "maintenance/metrics.hpp" #include "network/impl/channel_factory.hpp" #include "util/status_notifier.hpp" #include "util/utility_service.hpp" #include "validators/field_validator.hpp" -#include "maintenance/metrics.hpp" #if defined(USE_LIBURSA) #include "cryptography/ed25519_ursa_impl/crypto_provider.hpp" @@ -108,8 +108,12 @@ DEFINE_string(verbosity, kLogSettingsFromConfigFile, "Log verbosity"); DEFINE_validator(verbosity, &validateVerbosity); /// Metrics. ToDo validator -DEFINE_string(metrics_addr, "127.0.0.1", "Prometeus HTTP server listen address"); -DEFINE_string(metrics_port, "", "Prometeus HTTP server listens port, disabled by default"); +DEFINE_string(metrics_addr, + "127.0.0.1", + "Prometeus HTTP server listen address"); +DEFINE_string(metrics_port, + "", + "Prometeus HTTP server listens port, disabled by default"); std::sig_atomic_t caught_signal = 0; std::promise exit_requested; @@ -260,12 +264,6 @@ int main(int argc, char *argv[]) { log_manager); } - if(FLAGS_metrics_port.size()) { - maintenance_metrics_init(FLAGS_metrics_addr + ":" + FLAGS_metrics_port); - }else if(config.metrics_addr_port.size()){ - maintenance_metrics_init(config.metrics_addr_port); - } - daemon_status_notifier->notify( ::iroha::utility_service::Status::kInitialization); @@ -375,6 +373,14 @@ int main(int argc, char *argv[]) { // clear previous storage if any irohad->dropStorage(); + // Check if iroha daemon storage was successfully re-initialized + if (not irohad->storage) { + // Abort execution if not + log->error("Failed to re-initialize storage"); + daemon_status_notifier->notify( + ::iroha::utility_service::Status::kFailed); + return EXIT_FAILURE; + } const auto txs_num = block->transactions().size(); if (auto e = iroha::expected::resultToOptionalError( @@ -431,6 +437,28 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + std::shared_ptr metrics; // Must be a pointer because 'this' is + // captured to lambdas in constructor. + std::string metrics_addr; + if (FLAGS_metrics_port.size()) { + metrics_addr = FLAGS_metrics_addr + ":" + FLAGS_metrics_port; + } else if (config.metrics_addr_port.size()) { + metrics_addr = config.metrics_addr_port; + } + if (metrics_addr.empty()) { + log->info("Skiping Metrics initialization."); + } else { + try { + metrics = + Metrics::create(metrics_addr, + irohad->storage, + log_manager->getChild("Metrics")->getLogger()); + log->info("Metrics listens on {}", metrics->getListenAddress()); + } catch (std::exception const &ex) { + log->warn("Failed to initialize Metrics: {}", ex.what()); + } + } + // init pipeline components auto init_result = irohad->init(); if (auto error = diff --git a/irohad/main/subscription.hpp b/irohad/main/subscription.hpp index 488675f1dc4..92b57b940c3 100644 --- a/irohad/main/subscription.hpp +++ b/irohad/main/subscription.hpp @@ -17,6 +17,7 @@ namespace iroha { kYac = 0, kRequestProposal, kVoteProcess, + kMetrics, //--------------- kTotalCount }; diff --git a/irohad/maintenance/CMakeLists.txt b/irohad/maintenance/CMakeLists.txt index fc275878397..b8b860ed8fb 100644 --- a/irohad/maintenance/CMakeLists.txt +++ b/irohad/maintenance/CMakeLists.txt @@ -6,4 +6,5 @@ add_library(maintenance metrics.cpp) target_link_libraries(maintenance prometheus-cpp::core prometheus-cpp::pull + async_subscription ) diff --git a/irohad/maintenance/metrics.cpp b/irohad/maintenance/metrics.cpp index b0b9ec89530..e43506a94ad 100644 --- a/irohad/maintenance/metrics.cpp +++ b/irohad/maintenance/metrics.cpp @@ -9,88 +9,144 @@ #include #include -#include -#include -#include #include -#include -#include -#include #include -std::shared_ptr maintenance_metrics_init(std::string const& listen_addr) -{ - using namespace prometheus; +#include "CivetServer.h" // for CivetCallbacks +#include "interfaces/commands/add_peer.hpp" +#include "interfaces/commands/command.hpp" +#include "interfaces/commands/create_domain.hpp" +#include "interfaces/commands/remove_peer.hpp" +#include "interfaces/iroha_internal/block.hpp" +#include "interfaces/transaction.hpp" +#include "logger/logger.hpp" +#include "main/subscription.hpp" - static const std::regex full_matcher("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]):[0-9]+$"); +using namespace iroha; +using namespace prometheus; + +Metrics::Metrics(std::string const &listen_addr, + std::shared_ptr storage, + logger::LoggerPtr const &logger) + : storage_(storage), logger_(logger) { + static const std::regex full_matcher( + "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-" + "9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]):[0-9]+$"); static const std::regex port_matcher("^:?([0-9]{1,5})$"); - std::string listen_addr_port; - if(std::regex_match(listen_addr,full_matcher)) { - listen_addr_port = listen_addr; - } else if(std::regex_match(listen_addr,port_matcher)) { - listen_addr_port = "127.0.0.1"; + if (std::regex_match(listen_addr, full_matcher)) { + listen_addr_port_ = listen_addr; + } else if (std::regex_match(listen_addr, port_matcher)) { + listen_addr_port_ = "127.0.0.1"; if (listen_addr[0] != ':') - listen_addr_port += ":"; - listen_addr_port += listen_addr; + listen_addr_port_ += ":"; + listen_addr_port_ += listen_addr; } else { - return nullptr; + throw std::runtime_error("Metrics does not accept listen address '" + + listen_addr + "'"); } - // create a metrics registry // @note it's the users responsibility to keep the object alive - auto registry = std::make_shared(); - - // Just for example - std::thread([registry,listen_addr_port](){ - // create an http server running on addr:port - Exposer exposer{listen_addr_port}; - - // ask the exposer to scrape the registry on incoming HTTP requests - exposer.RegisterCollectable(registry); - - // add a new counter family to the registry (families combine values with the - // same name, but distinct label dimensions) - // - // @note please follow the metric-naming best-practices: - // https://prometheus.io/docs/practices/naming/ - auto& packet_counter = BuildCounter() - .Name("observed_packets_total") - .Help("Number of observed packets") - .Register(*registry); - - // add and remember dimensional data, incrementing those is very cheap - auto& tcp_rx_counter = - packet_counter.Add({{"protocol", "tcp"}, {"direction", "rx"}}); - auto& tcp_tx_counter = - packet_counter.Add({{"protocol", "tcp"}, {"direction", "tx"}}); - auto& udp_rx_counter = - packet_counter.Add({{"protocol", "udp"}, {"direction", "rx"}}); - auto& udp_tx_counter = - packet_counter.Add({{"protocol", "udp"}, {"direction", "tx"}}); - - // add a counter whose dimensional data is not known at compile time - // nevertheless dimensional values should only occur in low cardinality: - // https://prometheus.io/docs/practices/naming/#labels - auto& http_requests_counter = BuildCounter() - .Name("http_requests_total") - .Help("Number of HTTP requests") - .Register(*registry); - - for (;;) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - const auto random_value = std::rand(); - - if (random_value & 1) tcp_rx_counter.Increment(); - if (random_value & 2) tcp_tx_counter.Increment(); - if (random_value & 4) udp_rx_counter.Increment(); - if (random_value & 8) udp_tx_counter.Increment(); - - const std::array methods = {"GET", "PUT", "POST", "HEAD"}; - auto method = methods.at(random_value % methods.size()); - // dynamically calling Family.Add() works but is slow and should be avoided - http_requests_counter.Add({{"method", method}}).Increment(); + registry_ = std::make_shared(); + + CivetCallbacks cvcbs; + cvcbs.log_message = [](const struct mg_connection *conn, + const char *message) { + // logger_->info("{}, conn={}",message,conn); + return 1; // return non-zero to disable civetweb logger + }; + cvcbs.log_access = [](const struct mg_connection *conn, const char *message) { + // logger_->debug("{}, conn={}",message,conn); + return 1; // return non-zero to disable civetweb logger + }; + // create an http server running on addr:port + exposer_ = std::make_shared(listen_addr_port_, + /*num_threads*/ 2, + &cvcbs); + + // ask the exposer_ to scrape the registry_ on incoming HTTP requests + exposer_->RegisterCollectable(registry_, "/metrics"); + + auto &block_height_gauge = BuildGauge() + .Name("blocks_height") + .Help("Total number of blocks in chain") + .Register(*registry_); + auto &block_height = block_height_gauge.Add({}); + block_height.Set(storage_->getBlockQuery()->getTopBlockHeight()); + + auto &peers_number_gauge = + BuildGauge() + .Name("peers_number") + .Help("Total number peers to send transactions and request proposals") + .Register(*registry_); + auto &number_of_peers = peers_number_gauge.Add({}); + number_of_peers.Set(storage_->getWsvQuery()->getPeers()->size()); + + auto &domains_number_gauge = BuildGauge() + .Name("number_of_domains") + .Help("Total number of domains in WSV") + .Register(*registry_); + auto &domains_number = domains_number_gauge.Add({}); + domains_number.Set(storage_->getWsvQuery()->countDomains().assumeValue()); + + auto &total_number_of_transactions_gauge = + BuildGauge() + .Name("total_number_of_transactions") + .Help("Total number of transactions in blockchain") + .Register(*registry_); + auto &total_number_of_transactions = + total_number_of_transactions_gauge.Add({}); + total_number_of_transactions.Set( + storage_->getWsvQuery()->countTransactions().assumeValue()); + + auto &number_of_signatures_in_last_block_gauge = + BuildGauge() + .Name("number_of_signatures_in_last_block") + .Help("Number of signatures in last block") + .Register(*registry_); + auto &number_of_signatures_in_last_block = + number_of_signatures_in_last_block_gauge.Add({}); + + auto calc_diffs = [](shared_model::interface::Block const &block) { + int domains_diff = 0, peers_diff = 0; + using namespace shared_model::interface; + for (Transaction const &trx : block.transactions()) { + for (Command const &cmd : trx.commands()) { + domains_diff += cmd.is() ? 1 : 0; + peers_diff += cmd.is() ? 1 : 0; + peers_diff -= cmd.is() ? 1 : 0; + } } - }).detach(); + return std::tuple{domains_diff, peers_diff}; + }; - return {registry}; + block_subscriber_ = SubscriberCreator::template create< + EventTypes::kOnBlock, + SubscriptionEngineHandlers::kMetrics>( + [&, registry = this->registry_] /// Values are stored in registry_, hold + /// strong reference to it here + (auto &, BlockPtr pblock) { + // block_height is captured by reference because it is stored inside + // registry_, which is shared_ptr + assert(pblock); + block_height.Set(pblock->height()); + number_of_signatures_in_last_block.Set( + boost::size(pblock->signatures())); + total_number_of_transactions.Increment( + boost::size(pblock->transactions())); + auto [domains_diff, peers_diff] = calc_diffs(*pblock); + number_of_peers.Increment(peers_diff); +#if 1 + domains_number.Increment(domains_diff); +#else // no need to querry DB but here is a way to do + if (domains_diff) { + assert(storage_); + assert(storage_->getWsvQuery()); + auto opt_n_domains = storage_->getWsvQuery()->getNumberOfDomains(); + if (opt_n_domains) + domains_number.Set(*opt_n_domains); + else + logger_->warn("Cannot getNumberOfDomains() from WSV"); + } +#endif + }); } diff --git a/irohad/maintenance/metrics.hpp b/irohad/maintenance/metrics.hpp index b8ca53cfa2a..c3cf39d5b2d 100644 --- a/irohad/maintenance/metrics.hpp +++ b/irohad/maintenance/metrics.hpp @@ -6,11 +6,53 @@ #ifndef IROHA_MAINTENANCE_METRICS_HPP #define IROHA_MAINTENANCE_METRICS_HPP -#include -#include +#include #include -std::shared_ptr - maintenance_metrics_init(std::string const& listen_addr); +#include +#include +#include +#include + +#include "ametsuchi/storage.hpp" +#include "ametsuchi/wsv_query.hpp" +#include "interfaces/common_objects/types.hpp" +#include "interfaces/iroha_internal/block.hpp" +#include "logger/logger_fwd.hpp" +#include "main/subscription.hpp" +#include "network/ordering_gate_common.hpp" + +class Metrics : public std::enable_shared_from_this { + using OnProposalSubscription = iroha::BaseSubscriber< + bool, + iroha::network::OrderingEvent>; // FixMe subscribtion ≠ subscriber + using BlockPtr = std::shared_ptr; + using BlockSubscriber = iroha::BaseSubscriber; + + std::string listen_addr_port_; + std::shared_ptr exposer_; + std::shared_ptr registry_; + std::shared_ptr storage_; + std::shared_ptr block_subscriber_; + std::shared_ptr on_proposal_subscription_; + logger::LoggerPtr logger_; + + Metrics(std::string const &listen_addr, + std::shared_ptr storage, + logger::LoggerPtr const &logger); + + public: + std::string const &getListenAddress() const { + return listen_addr_port_; + } + + template + static std::shared_ptr create(Ts &&... args) { + struct Resolver : Metrics { + Resolver(Ts &&... args) : Metrics(std::forward(args)...) {} + }; + return std::make_shared(std::forward(args)...); + } +}; -#endif //IROHA_MAINTENANCE_METRICS_HPP +#endif // IROHA_MAINTENANCE_METRICS_HPP diff --git a/shared_model/backend/protobuf/impl/block.cpp b/shared_model/backend/protobuf/impl/block.cpp index 62f242296c6..c2fa51f122f 100644 --- a/shared_model/backend/protobuf/impl/block.cpp +++ b/shared_model/backend/protobuf/impl/block.cpp @@ -6,6 +6,7 @@ #include "backend/protobuf/block.hpp" #include + #include "backend/protobuf/common_objects/signature.hpp" #include "backend/protobuf/transaction.hpp" #include "backend/protobuf/util.hpp" @@ -39,7 +40,7 @@ namespace shared_model { SignatureSetType signatures_{[this] { auto signatures = *proto_.mutable_signatures() | boost::adaptors::transformed( - [](auto &x) { return proto::Signature(x); }); + [](auto &x) { return proto::Signature(x); }); return SignatureSetType(signatures.begin(), signatures.end()); }()}; @@ -113,7 +114,7 @@ namespace shared_model { impl_->signatures_ = [this] { auto signatures = *impl_->proto_.mutable_signatures() | boost::adaptors::transformed( - [](auto &x) { return proto::Signature(x); }); + [](auto &x) { return proto::Signature(x); }); return SignatureSetType(signatures.begin(), signatures.end()); }(); @@ -131,6 +132,7 @@ namespace shared_model { } interface::types::TransactionsNumberType Block::txsNumber() const { + // return boost::size(transactions()); return impl_->payload_.tx_number(); } diff --git a/shared_model/interfaces/commands/command.hpp b/shared_model/interfaces/commands/command.hpp index 49f590cf743..d61f670e255 100644 --- a/shared_model/interfaces/commands/command.hpp +++ b/shared_model/interfaces/commands/command.hpp @@ -6,10 +6,10 @@ #ifndef IROHA_SHARED_MODEL_COMMAND_HPP #define IROHA_SHARED_MODEL_COMMAND_HPP -#include "interfaces/base/model_primitive.hpp" - #include +#include "interfaces/base/model_primitive.hpp" + namespace shared_model { namespace interface { @@ -77,6 +77,11 @@ namespace shared_model { std::string toString() const override; bool operator==(const ModelType &rhs) const override; + + template + bool is() const { + return boost::get(&get()) != nullptr; + } }; } // namespace interface diff --git a/shared_model/interfaces/iroha_internal/block.hpp b/shared_model/interfaces/iroha_internal/block.hpp index fa3b57c0ad0..1c83241c8ef 100644 --- a/shared_model/interfaces/iroha_internal/block.hpp +++ b/shared_model/interfaces/iroha_internal/block.hpp @@ -24,6 +24,7 @@ namespace shared_model { * @return hash of a previous block */ virtual const types::HashType &prevHash() const = 0; + /** * @return amount of transactions in block */ diff --git a/test/integration/validation/CMakeLists.txt b/test/integration/validation/CMakeLists.txt index e8f544ba76a..ca87614752a 100644 --- a/test/integration/validation/CMakeLists.txt +++ b/test/integration/validation/CMakeLists.txt @@ -8,4 +8,5 @@ target_link_libraries(chain_validator_storage_test yac chain_validator test_logger + sync_subscription ) diff --git a/test/module/irohad/CMakeLists.txt b/test/module/irohad/CMakeLists.txt index f74ee5187e6..27491c5268a 100644 --- a/test/module/irohad/CMakeLists.txt +++ b/test/module/irohad/CMakeLists.txt @@ -4,8 +4,9 @@ # # Reusable tests -add_subdirectory(ametsuchi) add_subdirectory(common) +add_subdirectory(subscription) +add_subdirectory(ametsuchi) add_subdirectory(consensus) add_subdirectory(logger) add_subdirectory(main) @@ -18,4 +19,3 @@ add_subdirectory(synchronizer) add_subdirectory(torii) add_subdirectory(validation) add_subdirectory(pending_txs_storage) -add_subdirectory(subscription) diff --git a/test/module/irohad/ametsuchi/CMakeLists.txt b/test/module/irohad/ametsuchi/CMakeLists.txt index 887177fa11a..0be05502c2b 100644 --- a/test/module/irohad/ametsuchi/CMakeLists.txt +++ b/test/module/irohad/ametsuchi/CMakeLists.txt @@ -11,26 +11,30 @@ target_link_libraries(ametsuchi_test flat_file_storage shared_model_stateless_validation test_logger + sync_subscription ) addtest(wsv_query_command_test wsv_query_command_test.cpp) target_link_libraries(wsv_query_command_test ametsuchi ametsuchi_fixture + sync_subscription test_logger ) addtest(wsv_query_test wsv_query_test.cpp) target_link_libraries(wsv_query_test - ametsuchi - ametsuchi_fixture - test_logger - ) + ametsuchi + ametsuchi_fixture + sync_subscription + test_logger + ) addtest(flat_file_test flat_file_test.cpp) target_link_libraries(flat_file_test ametsuchi test_logger + sync_subscription ) addtest(block_query_test block_query_test.cpp) @@ -38,6 +42,7 @@ target_link_libraries(block_query_test ametsuchi ametsuchi_fixture shared_model_stateless_validation + sync_subscription ) addtest(storage_init_test storage_init_test.cpp) @@ -47,12 +52,14 @@ target_link_libraries(storage_init_test shared_model_proto_backend pg_connection_init test_logger + sync_subscription ) addtest(postgres_options_test postgres_options_test.cpp) target_link_libraries(postgres_options_test ametsuchi test_logger + sync_subscription ) addtest(postgres_executor_test postgres_executor_test.cpp) @@ -65,6 +72,7 @@ target_link_libraries(postgres_executor_test pg_connection_init test_logger common_test_constants + sync_subscription ) addtest(postgres_query_executor_test postgres_query_executor_test.cpp) @@ -76,41 +84,47 @@ target_link_libraries(postgres_query_executor_test commands_mocks_factory test_logger RapidJSON::rapidjson + sync_subscription ) addtest(tx_presence_cache_test tx_presence_cache_test.cpp) target_link_libraries(tx_presence_cache_test ametsuchi shared_model_interfaces_factories + sync_subscription ) addtest(settings_test settings_test.cpp) target_link_libraries(settings_test - ametsuchi - ametsuchi_fixture - test_logger - commands_mocks_factory - ) + ametsuchi + ametsuchi_fixture + test_logger + commands_mocks_factory + sync_subscription + ) addtest(in_memory_block_storage_test in_memory_block_storage_test.cpp) target_link_libraries(in_memory_block_storage_test ametsuchi + sync_subscription ) addtest(flat_file_block_storage_test flat_file_block_storage_test.cpp) target_link_libraries(flat_file_block_storage_test ametsuchi test_logger + sync_subscription ) addtest(postgres_block_storage_test postgres_block_storage_test.cpp) target_link_libraries(postgres_block_storage_test - ametsuchi - generator - test_logger - integration_framework_config_helper - pg_connection_init - ) + ametsuchi + generator + test_logger + integration_framework_config_helper + pg_connection_init + sync_subscription + ) add_library(ametsuchi_fixture INTERFACE) target_link_libraries(ametsuchi_fixture INTERFACE @@ -126,11 +140,13 @@ target_link_libraries(ametsuchi_fixture INTERFACE addtest(k_times_reconnection_strategy_test k_times_reconnection_strategy_test.cpp) target_link_libraries(k_times_reconnection_strategy_test ametsuchi + sync_subscription ) addtest(peer_query_wsv_test peer_query_wsv_test.cpp) target_link_libraries(peer_query_wsv_test ametsuchi + sync_subscription ) if(USE_BURROW) diff --git a/test/module/irohad/ametsuchi/mock_wsv_query.hpp b/test/module/irohad/ametsuchi/mock_wsv_query.hpp index 4a81e0e3421..cf37ff27ba4 100644 --- a/test/module/irohad/ametsuchi/mock_wsv_query.hpp +++ b/test/module/irohad/ametsuchi/mock_wsv_query.hpp @@ -6,10 +6,10 @@ #ifndef IROHA_MOCK_WSV_QUERY_HPP #define IROHA_MOCK_WSV_QUERY_HPP -#include "ametsuchi/wsv_query.hpp" - #include + #include "ametsuchi/ledger_state.hpp" +#include "ametsuchi/wsv_query.hpp" namespace testing { // iroha::TopBlockInfo is not default-constructible, so this provides a @@ -50,6 +50,12 @@ namespace iroha { MOCK_CONST_METHOD0( getTopBlockInfo, iroha::expected::Result()); + + MOCK_METHOD0(countPeers, iroha::expected::Result()); + MOCK_METHOD0(countDomains, + iroha::expected::Result()); + MOCK_METHOD0(countTransactions, + iroha::expected::Result()); }; } // namespace ametsuchi diff --git a/test/module/irohad/ametsuchi/wsv_query_test.cpp b/test/module/irohad/ametsuchi/wsv_query_test.cpp index 02636988c6d..02d8403f759 100644 --- a/test/module/irohad/ametsuchi/wsv_query_test.cpp +++ b/test/module/irohad/ametsuchi/wsv_query_test.cpp @@ -5,6 +5,7 @@ #include +#include "ametsuchi/impl/postgres_indexer.hpp" #include "ametsuchi/impl/postgres_wsv_command.hpp" #include "ametsuchi/impl/postgres_wsv_query.hpp" #include "backend/plain/account.hpp" @@ -51,6 +52,8 @@ namespace iroha { * @then peer list successfully received */ TEST_F(WsvQueryTest, GetPeers) { + ASSERT_EQ(query->countPeers().assumeValue(), 0); + shared_model::plain::Peer peer1{"some-address", "0a", std::nullopt}; command->insertPeer(peer1); shared_model::plain::Peer peer2{"another-address", "0b", std::nullopt}; @@ -61,6 +64,45 @@ namespace iroha { ASSERT_THAT(*result, testing::ElementsAre(testing::Pointee(testing::Eq(peer1)), testing::Pointee(testing::Eq(peer2)))); + + ASSERT_EQ(query->countPeers().assumeValue(), 2); + } + + TEST_F(WsvQueryTest, countDomains) { + using shared_model::plain::Domain; + using namespace iroha::expected; + command->insertRole("user"); + ASSERT_EQ(query->countDomains().assumeValue(), 0); + ASSERT_FALSE(hasError(command->insertDomain(Domain{"aaa", "user"}))); + ASSERT_FALSE(hasError(command->insertDomain(Domain{"ccc", "user"}))); + ASSERT_EQ(query->countDomains().assumeValue(), 2); + } + + TEST_F(WsvQueryTest, countPeers) { + ASSERT_EQ(query->countPeers().assumeValue(), 0); + command->insertPeer( + shared_model::plain::Peer{"127.0.0.1", "111", std::nullopt}); + command->insertPeer( + shared_model::plain::Peer{"127.0.0.2", "222", std::nullopt}); + ASSERT_EQ(query->countPeers().assumeValue(), 2); + } + + TEST_F(WsvQueryTest, countTransactions) { + ASSERT_EQ(query->countTransactions().assumeValue(), 0); + auto indexer = iroha::ametsuchi::PostgresIndexer(*sql); + using shared_model::crypto::Hash, iroha::ametsuchi::Indexer; + indexer.txPositions("account_type", + Hash("abdef1"), + boost::none, + 123346, + Indexer::TxPosition{1, 2}); + indexer.txPositions("account_type", + Hash("abdef2"), + boost::none, + 123347, + Indexer::TxPosition{1, 3}); + indexer.flush(); + ASSERT_EQ(query->countTransactions().assumeValue(), 2); } /** diff --git a/test/module/irohad/torii/processor/query_processor_test.cpp b/test/module/irohad/torii/processor/query_processor_test.cpp index 7538004b03c..44dcab6d63a 100644 --- a/test/module/irohad/torii/processor/query_processor_test.cpp +++ b/test/module/irohad/torii/processor/query_processor_test.cpp @@ -4,6 +4,7 @@ */ #include + #include "backend/protobuf/block.hpp" #include "backend/protobuf/proto_query_response_factory.hpp" #include "backend/protobuf/query_responses/proto_error_query_response.hpp" @@ -17,7 +18,6 @@ #include "module/irohad/ametsuchi/mock_block_query.hpp" #include "module/irohad/ametsuchi/mock_query_executor.hpp" #include "module/irohad/ametsuchi/mock_storage.hpp" -#include "module/irohad/ametsuchi/mock_wsv_query.hpp" #include "module/irohad/validation/validation_mocks.hpp" #include "module/shared_model/builders/protobuf/test_block_builder.hpp" #include "module/shared_model/builders/protobuf/test_query_builder.hpp" diff --git a/vcpkg/VCPKG_DEPS_LIST b/vcpkg/VCPKG_DEPS_LIST index 3c282a59095..5eb986b9d92 100644 --- a/vcpkg/VCPKG_DEPS_LIST +++ b/vcpkg/VCPKG_DEPS_LIST @@ -22,4 +22,3 @@ boost-accumulators boost-property-tree boost-process iroha-ed25519 -prometheus-cpp diff --git a/vcpkg/VCPKG_HEAD_DEPS_LIST b/vcpkg/VCPKG_HEAD_DEPS_LIST index f81961fe406..a55faaa66bd 100644 --- a/vcpkg/VCPKG_HEAD_DEPS_LIST +++ b/vcpkg/VCPKG_HEAD_DEPS_LIST @@ -1 +1,2 @@ rxcpp +prometheus-cpp diff --git a/vcpkg/patches/0005-disable-prometheus-logs.patch b/vcpkg/patches/0005-disable-prometheus-logs.patch new file mode 100644 index 00000000000..1220b81b8f7 --- /dev/null +++ b/vcpkg/patches/0005-disable-prometheus-logs.patch @@ -0,0 +1,69 @@ +diff --git a/ports/prometheus-cpp/Add-CivetCallbacks-to-Exposer.patch b/ports/prometheus-cpp/Add-CivetCallbacks-to-Exposer.patch +new file mode 100644 +index 000000000..0a237dd54 +--- /dev/null ++++ b/ports/prometheus-cpp/Add-CivetCallbacks-to-Exposer.patch +@@ -0,0 +1,51 @@ ++diff --git a/pull/include/prometheus/exposer.h b/pull/include/prometheus/exposer.h ++index 3e4e01c..6a9c3ff 100644 ++--- a/pull/include/prometheus/exposer.h +++++ b/pull/include/prometheus/exposer.h ++@@ -10,6 +10,7 @@ ++ #include "prometheus/detail/pull_export.h" ++ ++ class CivetServer; +++class CivetCallbacks; ++ ++ namespace prometheus { ++ ++@@ -20,8 +21,9 @@ class Endpoint; ++ class PROMETHEUS_CPP_PULL_EXPORT Exposer { ++ public: ++ explicit Exposer(const std::string& bind_address, ++- const std::size_t num_threads = 2); ++- explicit Exposer(std::vector options); +++ const std::size_t num_threads = 2, +++ const CivetCallbacks *callbacks = nullptr); +++ explicit Exposer(std::vector options, const CivetCallbacks *callbacks = nullptr); ++ ~Exposer(); ++ void RegisterCollectable(const std::weak_ptr& collectable, ++ const std::string& uri = std::string("/metrics")); ++diff --git a/pull/src/exposer.cc b/pull/src/exposer.cc ++index ac53bc8..df1dbaa 100644 ++--- a/pull/src/exposer.cc +++++ b/pull/src/exposer.cc ++@@ -11,13 +11,18 @@ ++ ++ namespace prometheus { ++ ++-Exposer::Exposer(const std::string& bind_address, const std::size_t num_threads) +++Exposer::Exposer(const std::string& bind_address, +++ const std::size_t num_threads, +++ const CivetCallbacks *callbacks) ++ : Exposer(std::vector{"listening_ports", bind_address, ++ "num_threads", ++- std::to_string(num_threads)}) {} +++ std::to_string(num_threads)}, +++ callbacks) {} ++ ++-Exposer::Exposer(std::vector options) ++- : server_(detail::make_unique(std::move(options))) {} +++Exposer::Exposer(std::vector options, +++ const CivetCallbacks *callbacks) +++ : server_(detail::make_unique(std::move(options), +++ callbacks)) {} ++ ++ Exposer::~Exposer() = default; ++ +diff --git a/ports/prometheus-cpp/portfile.cmake b/ports/prometheus-cpp/portfile.cmake +index c6048af18..4a9a749d5 100644 +--- a/ports/prometheus-cpp/portfile.cmake ++++ b/ports/prometheus-cpp/portfile.cmake +@@ -8,6 +8,7 @@ vcpkg_from_github( + REF v0.9.0 + SHA512 d9d5fbbd8c8aad5dd6a5e872275324d689a0c57199e4158d74e13ea62b286fa71dee01bb4197b906b79792bf1ca4e67a46b5c04621d7070241ac32876f6de891 + HEAD_REF master ++ PATCHES Add-CivetCallbacks-to-Exposer.patch + ) + + macro(feature FEATURENAME OPTIONNAME) diff --git a/vcpkg/patches/0006-prometheus-remove-self-metrics.patch b/vcpkg/patches/0006-prometheus-remove-self-metrics.patch new file mode 100644 index 00000000000..c5b354a27ef --- /dev/null +++ b/vcpkg/patches/0006-prometheus-remove-self-metrics.patch @@ -0,0 +1,104 @@ +diff --git a/ports/prometheus-cpp/portfile.cmake b/ports/prometheus-cpp/portfile.cmake +index c6048af18..50ab2c3c6 100644 +--- a/ports/prometheus-cpp/portfile.cmake ++++ b/ports/prometheus-cpp/portfile.cmake +@@ -8,6 +8,7 @@ vcpkg_from_github( + REF v0.9.0 + SHA512 d9d5fbbd8c8aad5dd6a5e872275324d689a0c57199e4158d74e13ea62b286fa71dee01bb4197b906b79792bf1ca4e67a46b5c04621d7070241ac32876f6de891 + HEAD_REF master ++ PATCHES Add-CivetCallbacks-to-Exposer.patch remove-handler-self-metrics.patch + ) + + macro(feature FEATURENAME OPTIONNAME) +diff --git a/ports/prometheus-cpp/remove-handler-self-metrics.patch b/ports/prometheus-cpp/remove-handler-self-metrics.patch +new file mode 100644 +index 000000000..a26427035 +--- /dev/null ++++ b/ports/prometheus-cpp/remove-handler-self-metrics.patch +@@ -0,0 +1,86 @@ ++diff --git a/pull/src/handler.cc b/pull/src/handler.cc ++index cec37f3..d0b0bdf 100644 ++--- a/pull/src/handler.cc +++++ b/pull/src/handler.cc ++@@ -23,25 +23,25 @@ ++ namespace prometheus { ++ namespace detail { ++ ++-MetricsHandler::MetricsHandler(Registry& registry) ++- : bytes_transferred_family_( ++- BuildCounter() ++- .Name("exposer_transferred_bytes_total") ++- .Help("Transferred bytes to metrics services") ++- .Register(registry)), ++- bytes_transferred_(bytes_transferred_family_.Add({})), ++- num_scrapes_family_(BuildCounter() ++- .Name("exposer_scrapes_total") ++- .Help("Number of times metrics were scraped") ++- .Register(registry)), ++- num_scrapes_(num_scrapes_family_.Add({})), ++- request_latencies_family_( ++- BuildSummary() ++- .Name("exposer_request_latencies") ++- .Help("Latencies of serving scrape requests, in microseconds") ++- .Register(registry)), ++- request_latencies_(request_latencies_family_.Add( ++- {}, Summary::Quantiles{{0.5, 0.05}, {0.9, 0.01}, {0.99, 0.001}})) {} +++MetricsHandler::MetricsHandler(Registry& registry) {} +++ // : bytes_transferred_family_( +++ // BuildCounter() +++ // .Name("exposer_transferred_bytes_total") +++ // .Help("Transferred bytes to metrics services") +++ // .Register(registry)), +++ // bytes_transferred_(bytes_transferred_family_.Add({})), +++ // num_scrapes_family_(BuildCounter() +++ // .Name("exposer_scrapes_total") +++ // .Help("Number of times metrics were scraped") +++ // .Register(registry)), +++ // num_scrapes_(num_scrapes_family_.Add({})), +++ // request_latencies_family_( +++ // BuildSummary() +++ // .Name("exposer_request_latencies") +++ // .Help("Latencies of serving scrape requests, in microseconds") +++ // .Register(registry)), +++ // request_latencies_(request_latencies_family_.Add( +++ // {}, Summary::Quantiles{{0.5, 0.05}, {0.9, 0.01}, {0.99, 0.001}})) {} ++ ++ #ifdef HAVE_ZLIB ++ static bool IsEncodingAccepted(struct mg_connection* conn, ++@@ -158,10 +158,10 @@ bool MetricsHandler::handleGet(CivetServer*, struct mg_connection* conn) { ++ auto stop_time_of_request = std::chrono::steady_clock::now(); ++ auto duration = std::chrono::duration_cast( ++ stop_time_of_request - start_time_of_request); ++- request_latencies_.Observe(duration.count()); +++ // request_latencies_.Observe(duration.count()); ++ ++- bytes_transferred_.Increment(bodySize); ++- num_scrapes_.Increment(); +++ // bytes_transferred_.Increment(bodySize); +++ // num_scrapes_.Increment(); ++ return true; ++ } ++ ++diff --git a/pull/src/handler.h b/pull/src/handler.h ++index 10c90f9..94c433c 100644 ++--- a/pull/src/handler.h +++++ b/pull/src/handler.h ++@@ -28,12 +28,12 @@ class MetricsHandler : public CivetHandler { ++ ++ std::mutex collectables_mutex_; ++ std::vector> collectables_; ++- Family& bytes_transferred_family_; ++- Counter& bytes_transferred_; ++- Family& num_scrapes_family_; ++- Counter& num_scrapes_; ++- Family& request_latencies_family_; ++- Summary& request_latencies_; +++ // Family& bytes_transferred_family_; +++ // Counter& bytes_transferred_; +++ // Family& num_scrapes_family_; +++ // Counter& num_scrapes_; +++ // Family& request_latencies_family_; +++ // Summary& request_latencies_; ++ }; ++ } // namespace detail ++ } // namespace prometheus