diff --git a/.clang-format b/.clang-format index 5e988172ed8..dbf4adc1e5c 100644 --- a/.clang-format +++ b/.clang-format @@ -4,7 +4,7 @@ Language: Cpp AlignAfterOpenBracket: false BreakBeforeBraces: Custom BraceWrapping: { - AfterCaseLabel: 'true' + #AfterCaseLabel: 'true' AfterClass: 'true' AfterControlStatement: 'true' AfterEnum : 'true' diff --git a/.gitmodules b/.gitmodules index ae7760da6b8..c3a44966d43 100644 --- a/.gitmodules +++ b/.gitmodules @@ -47,3 +47,6 @@ [submodule "contrib/tiflash-proxy"] path = contrib/tiflash-proxy url = git@github.com:solotzg/tikv.git +[submodule "contrib/prometheus-cpp"] + path = contrib/prometheus-cpp + url = git@github.com:jupp0r/prometheus-cpp.git diff --git a/CMakeLists.txt b/CMakeLists.txt index f41f8373150..df39135c75b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -273,6 +273,7 @@ include (cmake/find_llvm.cmake) include (cmake/find_grpc.cmake) include (cmake/find_kvproto.cmake) include (cmake/find_tipb.cmake) +include (cmake/find_prometheus.cmake) include (cmake/find_contrib_lib.cmake) diff --git a/cmake/find_prometheus.cmake b/cmake/find_prometheus.cmake new file mode 100644 index 00000000000..24f3a76e231 --- /dev/null +++ b/cmake/find_prometheus.cmake @@ -0,0 +1,5 @@ +# Currently prometheus cpp should always use bundled library. + +message(STATUS "Using prometheus666: ${ClickHouse_SOURCE_DIR}/contrib/prometheus-cpp") + +set (PROMETHEUS_CPP_FOUND TRUE) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8f12967777b..74d1d29d439 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -161,3 +161,5 @@ if (USE_INTERNAL_POCO_LIBRARY) target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR}) endif () endif () + +add_subdirectory (prometheus-cpp) diff --git a/contrib/prometheus-cpp b/contrib/prometheus-cpp new file mode 160000 index 00000000000..ca1f3463e74 --- /dev/null +++ b/contrib/prometheus-cpp @@ -0,0 +1 @@ +Subproject commit ca1f3463e74d957d1cccddd4a1a29e3e5d34bd83 diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 018cf6ae0d6..cd76e41e650 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -152,6 +152,8 @@ target_link_libraries (dbms clickhouse_common_io flash_service kvproto + prometheus-cpp::core + prometheus-cpp::push kv_client tipb ${Protobuf_LIBRARIES} diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt index 5903d78b9d9..10cafe7fce3 100644 --- a/dbms/src/Server/CMakeLists.txt +++ b/dbms/src/Server/CMakeLists.txt @@ -18,6 +18,7 @@ add_library (clickhouse-server-lib HTTPHandler.cpp InterserverIOHTTPHandler.cpp MetricsTransmitter.cpp + MetricsPrometheus.cpp NotFoundHandler.cpp PingRequestHandler.cpp ReplicasStatusHandler.cpp diff --git a/dbms/src/Server/MetricsPrometheus.cpp b/dbms/src/Server/MetricsPrometheus.cpp new file mode 100644 index 00000000000..05f23cfc68b --- /dev/null +++ b/dbms/src/Server/MetricsPrometheus.cpp @@ -0,0 +1,189 @@ +#include "MetricsPrometheus.h" + +#include + +#include +#include +#include + +#include +#include + + +namespace DB +{ +std::shared_ptr MetricsPrometheus::registry_instance_ptr = nullptr; + +std::mutex MetricsPrometheus::registry_instance_mutex; + +std::shared_ptr MetricsPrometheus::getRegistry() +{ + if (registry_instance_ptr == nullptr) + { + std::lock_guard lk(registry_instance_mutex); + if (registry_instance_ptr == nullptr) + { + registry_instance_ptr = std::make_shared(); + } + } + return registry_instance_ptr; +} + +MetricsPrometheus::MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_) + : context(context_), async_metrics(async_metrics_), log(&Logger::get("Prometheus")) +{ + registry = MetricsPrometheus::getRegistry(); + metricsInterval = context.getConfigRef().getInt("status.metrics-interval", 15); + + const std::string metricsAddr = context.getConfigRef().getString("status.metrics-addr", ""); + if (0 == metricsAddr.compare("")) + { + metricsInterval = 0; + LOG_INFO(log, "Disable sending metrics to prometheus, cause status.metrics-addr is not set!"); + } + else + { + auto pos = metricsAddr.find(':', 0); + auto host = metricsAddr.substr(0, pos); + auto port = metricsAddr.substr(pos + 1, metricsAddr.size()); + + auto serviceAddr = context.getConfigRef().getString("flash.service_addr"); + std::string jobName = serviceAddr; + std::replace(jobName.begin(), jobName.end(), ':', '_'); + std::replace(jobName.begin(), jobName.end(), '.', '_'); + jobName = "tiflash_" + jobName; + + char hostname[1024]; + ::gethostname(hostname, sizeof(hostname)); + + gateway = std::make_shared(host, port, jobName, prometheus::Gateway::GetInstanceLabel(hostname)); + gateway->RegisterCollectable(registry); + + LOG_INFO(log, "Enable sending metrics to prometheus; interval =" << metricsInterval << "; addr = " << serviceAddr); + } +} + +MetricsPrometheus::~MetricsPrometheus() +{ + try + { + { + std::lock_guard lock{mutex}; + quit = true; + } + + cond.notify_one(); + + thread.join(); + } + catch (...) + { + DB::tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +void MetricsPrometheus::run() +{ + const std::string thread_name = "MetricsPrometheus " + std::to_string(metricsInterval) + "s"; + setThreadName(thread_name.c_str()); + + const auto get_next_time = [](size_t seconds) { + /// To avoid time drift and transmit values exactly each interval: + /// next time aligned to system seconds + /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00 + return std::chrono::system_clock::time_point( + (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds + + std::chrono::seconds(seconds)); + }; + + std::vector prev_counters(ProfileEvents::end()); + + std::unique_lock lock{mutex}; + + while (true) + { + if (metricsInterval > 0 && registry != nullptr && gateway != nullptr) + break; + + if (cond.wait_until(lock, get_next_time(5), [this] { return quit; })) + break; + } + + while (true) + { + if (cond.wait_until(lock, get_next_time(metricsInterval), [this] { return quit; })) + break; + + sendMetricsToPrometheus(prev_counters); + } +} + +void MetricsPrometheus::sendMetricsToPrometheus(std::vector & prev_counters) +{ + auto async_metrics_values = async_metrics.getValues(); + + GraphiteWriter::KeyValueVector key_vals{}; + key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size()); + + for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + { + const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed); + const auto counter_increment = counter - prev_counters[i]; + prev_counters[i] = counter; + + std::string key{ProfileEvents::getDescription(static_cast(i))}; + key_vals.emplace_back(profile_events_path_prefix + key, counter_increment); + } + + for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) + { + const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); + + std::string key{CurrentMetrics::getDescription(static_cast(i))}; + key_vals.emplace_back(current_metrics_path_prefix + key, value); + } + + for (const auto & name_value : async_metrics_values) + { + key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second); + } + + if (!key_vals.empty()) + doSendMetricsToPrometheus(key_vals); +} + +void MetricsPrometheus::doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector & key_vals) +{ + using namespace prometheus; + + for (const auto & key_val : key_vals) + { + auto key = key_val.first; + auto it = gauge_map.find(key); + if (it != gauge_map.end()) + { + auto & guage = it->second; + guage.Set(key_val.second); + } + else + { + auto & gauge_family = BuildGauge() + .Name(key_val.first) + .Help("Get from system.metrics, system.events and system.asynchronous_metrics tables") + .Register(*registry); + + auto & guage = gauge_family.Add({}); + guage.Set(key_val.second); + + auto pair = std::pair(key, guage); + gauge_map.insert(pair); + } + } + + auto returnCode = gateway->Push(); + if (returnCode != 200) + { + LOG_WARNING(log, "Failed to push metrics to gateway, return code is " << returnCode); + } +} +} // namespace DB diff --git a/dbms/src/Server/MetricsPrometheus.h b/dbms/src/Server/MetricsPrometheus.h new file mode 100644 index 00000000000..84e1f0c4c88 --- /dev/null +++ b/dbms/src/Server/MetricsPrometheus.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +class AsynchronousMetrics; +class Context; + + +/** Automatically sends + * - difference of ProfileEvents; + * - values of CurrentMetrics; + * - values of AsynchronousMetrics; + * to Prometheus + */ +class MetricsPrometheus +{ +public: + static std::shared_ptr getRegistry(); + + MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_); + ~MetricsPrometheus(); + +private: + static std::shared_ptr registry_instance_ptr; + static std::mutex registry_instance_mutex; + + static constexpr auto profile_events_path_prefix = "tiflash_system_profile_events_"; + static constexpr auto current_metrics_path_prefix = "tiflash_system_metrics_"; + static constexpr auto asynchronous_metrics_path_prefix = "tiflash_system_asynchronous_metrics_"; + + void run(); + void sendMetricsToPrometheus(std::vector & prev_counters); + void doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector & key_vals); + + Context & context; + const AsynchronousMetrics & async_metrics; + bool quit = false; + std::mutex mutex; + std::condition_variable cond; + std::thread thread{&MetricsPrometheus::run, this}; + std::shared_ptr gateway; + std::shared_ptr registry; + std::map gauge_map; + int metricsInterval; + Logger * log; +}; + +} // namespace DB diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index aaf053ec007..051ad47cad4 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -39,6 +39,7 @@ #include "ClusterManagerService.h" #include "HTTPHandlerFactory.h" #include "MetricsTransmitter.h" +#include "MetricsPrometheus.h" #include "StatusFile.h" #include "TCPHandlerFactory.h" @@ -785,6 +786,8 @@ int Server::main(const std::vector & /*args*/) metrics_transmitters.emplace_back(std::make_unique(*global_context, async_metrics, graphite_key)); } + auto metrics_prometheus = std::make_unique(*global_context, async_metrics); + SessionCleaner session_cleaner(*global_context); ClusterManagerService cluster_manager_service(*global_context, config_path); diff --git a/docker/builder/Dockerfile b/docker/builder/Dockerfile index 0f801e124c7..399648d53f5 100644 --- a/docker/builder/Dockerfile +++ b/docker/builder/Dockerfile @@ -10,6 +10,7 @@ RUN apt update -y \ build-essential autoconf libtool pkg-config \ libgflags-dev libgtest-dev \ golang curl python3-pip \ + libcurl4-openssl-dev \ # For tests: # bash expect python python-lxml python-termcolor curl perl sudo tzdata && curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain nightly \ && pip3 install pybind11 pyinstaller dnspython uri requests urllib3 toml setuptools \