From d192332967245761bf41cb26ba5e214e94292870 Mon Sep 17 00:00:00 2001
From: marsishandsome <marsishandsome@gmail.com>
Date: Fri, 10 Jan 2020 15:15:02 +0800
Subject: [PATCH] add prometheus

---
 .clang-format                         |   2 +-
 .gitmodules                           |   3 +
 CMakeLists.txt                        |   1 +
 cmake/find_prometheus.cmake           |   5 +
 contrib/CMakeLists.txt                |   2 +
 contrib/prometheus-cpp                |   1 +
 dbms/CMakeLists.txt                   |   2 +
 dbms/src/Server/CMakeLists.txt        |   1 +
 dbms/src/Server/MetricsPrometheus.cpp | 189 ++++++++++++++++++++++++++
 dbms/src/Server/MetricsPrometheus.h   |  62 +++++++++
 dbms/src/Server/Server.cpp            |   3 +
 docker/builder/Dockerfile             |   1 +
 12 files changed, 271 insertions(+), 1 deletion(-)
 create mode 100644 cmake/find_prometheus.cmake
 create mode 160000 contrib/prometheus-cpp
 create mode 100644 dbms/src/Server/MetricsPrometheus.cpp
 create mode 100644 dbms/src/Server/MetricsPrometheus.h

diff --git a/.clang-format b/.clang-format
index 5e988172ed8..dbf4adc1e5c 100644
--- a/.clang-format
+++ b/.clang-format
@@ -4,7 +4,7 @@ Language:        Cpp
 AlignAfterOpenBracket: false
 BreakBeforeBraces: Custom
 BraceWrapping: {
-    AfterCaseLabel: 'true'
+    #AfterCaseLabel: 'true'
     AfterClass: 'true'
     AfterControlStatement: 'true'
     AfterEnum : 'true'
diff --git a/.gitmodules b/.gitmodules
index ae7760da6b8..c3a44966d43 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -47,3 +47,6 @@
 [submodule "contrib/tiflash-proxy"]
 	path = contrib/tiflash-proxy
 	url = git@github.com:solotzg/tikv.git
+[submodule "contrib/prometheus-cpp"]
+	path = contrib/prometheus-cpp
+	url = git@github.com:jupp0r/prometheus-cpp.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f41f8373150..df39135c75b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -273,6 +273,7 @@ include (cmake/find_llvm.cmake)
 include (cmake/find_grpc.cmake)
 include (cmake/find_kvproto.cmake)
 include (cmake/find_tipb.cmake)
+include (cmake/find_prometheus.cmake)
 
 
 include (cmake/find_contrib_lib.cmake)
diff --git a/cmake/find_prometheus.cmake b/cmake/find_prometheus.cmake
new file mode 100644
index 00000000000..24f3a76e231
--- /dev/null
+++ b/cmake/find_prometheus.cmake
@@ -0,0 +1,5 @@
+# Currently prometheus cpp should always use bundled library.
+
+message(STATUS "Using prometheus666: ${ClickHouse_SOURCE_DIR}/contrib/prometheus-cpp")
+
+set (PROMETHEUS_CPP_FOUND TRUE)
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 8f12967777b..74d1d29d439 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -161,3 +161,5 @@ if (USE_INTERNAL_POCO_LIBRARY)
         target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR})
     endif ()
 endif ()
+
+add_subdirectory (prometheus-cpp)
diff --git a/contrib/prometheus-cpp b/contrib/prometheus-cpp
new file mode 160000
index 00000000000..ca1f3463e74
--- /dev/null
+++ b/contrib/prometheus-cpp
@@ -0,0 +1 @@
+Subproject commit ca1f3463e74d957d1cccddd4a1a29e3e5d34bd83
diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 018cf6ae0d6..cd76e41e650 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -152,6 +152,8 @@ target_link_libraries (dbms
     clickhouse_common_io
     flash_service
     kvproto
+    prometheus-cpp::core
+    prometheus-cpp::push
     kv_client
     tipb
     ${Protobuf_LIBRARIES}
diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt
index 5903d78b9d9..10cafe7fce3 100644
--- a/dbms/src/Server/CMakeLists.txt
+++ b/dbms/src/Server/CMakeLists.txt
@@ -18,6 +18,7 @@ add_library (clickhouse-server-lib
     HTTPHandler.cpp
     InterserverIOHTTPHandler.cpp
     MetricsTransmitter.cpp
+    MetricsPrometheus.cpp
     NotFoundHandler.cpp
     PingRequestHandler.cpp
     ReplicasStatusHandler.cpp
diff --git a/dbms/src/Server/MetricsPrometheus.cpp b/dbms/src/Server/MetricsPrometheus.cpp
new file mode 100644
index 00000000000..05f23cfc68b
--- /dev/null
+++ b/dbms/src/Server/MetricsPrometheus.cpp
@@ -0,0 +1,189 @@
+#include "MetricsPrometheus.h"
+
+#include <Interpreters/AsynchronousMetrics.h>
+
+#include <Common/CurrentMetrics.h>
+#include <Common/Exception.h>
+#include <Common/setThreadName.h>
+
+#include <daemon/BaseDaemon.h>
+#include <prometheus/gauge.h>
+
+
+namespace DB
+{
+std::shared_ptr<prometheus::Registry> MetricsPrometheus::registry_instance_ptr = nullptr;
+
+std::mutex MetricsPrometheus::registry_instance_mutex;
+
+std::shared_ptr<prometheus::Registry> MetricsPrometheus::getRegistry()
+{
+    if (registry_instance_ptr == nullptr)
+    {
+        std::lock_guard<std::mutex> lk(registry_instance_mutex);
+        if (registry_instance_ptr == nullptr)
+        {
+            registry_instance_ptr = std::make_shared<prometheus::Registry>();
+        }
+    }
+    return registry_instance_ptr;
+}
+
+MetricsPrometheus::MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_)
+    : context(context_), async_metrics(async_metrics_), log(&Logger::get("Prometheus"))
+{
+    registry = MetricsPrometheus::getRegistry();
+    metricsInterval = context.getConfigRef().getInt("status.metrics-interval", 15);
+
+    const std::string metricsAddr = context.getConfigRef().getString("status.metrics-addr", "");
+    if (0 == metricsAddr.compare(""))
+    {
+        metricsInterval = 0;
+        LOG_INFO(log, "Disable sending metrics to prometheus, cause status.metrics-addr is not set!");
+    }
+    else
+    {
+        auto pos = metricsAddr.find(':', 0);
+        auto host = metricsAddr.substr(0, pos);
+        auto port = metricsAddr.substr(pos + 1, metricsAddr.size());
+
+        auto serviceAddr = context.getConfigRef().getString("flash.service_addr");
+        std::string jobName = serviceAddr;
+        std::replace(jobName.begin(), jobName.end(), ':', '_');
+        std::replace(jobName.begin(), jobName.end(), '.', '_');
+        jobName = "tiflash_" + jobName;
+
+        char hostname[1024];
+        ::gethostname(hostname, sizeof(hostname));
+
+        gateway = std::make_shared<prometheus::Gateway>(host, port, jobName, prometheus::Gateway::GetInstanceLabel(hostname));
+        gateway->RegisterCollectable(registry);
+
+        LOG_INFO(log, "Enable sending metrics to prometheus; interval =" << metricsInterval << "; addr = " << serviceAddr);
+    }
+}
+
+MetricsPrometheus::~MetricsPrometheus()
+{
+    try
+    {
+        {
+            std::lock_guard<std::mutex> lock{mutex};
+            quit = true;
+        }
+
+        cond.notify_one();
+
+        thread.join();
+    }
+    catch (...)
+    {
+        DB::tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+}
+
+void MetricsPrometheus::run()
+{
+    const std::string thread_name = "MetricsPrometheus " + std::to_string(metricsInterval) + "s";
+    setThreadName(thread_name.c_str());
+
+    const auto get_next_time = [](size_t seconds) {
+        /// To avoid time drift and transmit values exactly each interval:
+        ///  next time aligned to system seconds
+        /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
+        return std::chrono::system_clock::time_point(
+            (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
+            + std::chrono::seconds(seconds));
+    };
+
+    std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
+
+    std::unique_lock<std::mutex> lock{mutex};
+
+    while (true)
+    {
+        if (metricsInterval > 0 && registry != nullptr && gateway != nullptr)
+            break;
+
+        if (cond.wait_until(lock, get_next_time(5), [this] { return quit; }))
+            break;
+    }
+
+    while (true)
+    {
+        if (cond.wait_until(lock, get_next_time(metricsInterval), [this] { return quit; }))
+            break;
+
+        sendMetricsToPrometheus(prev_counters);
+    }
+}
+
+void MetricsPrometheus::sendMetricsToPrometheus(std::vector<ProfileEvents::Count> & prev_counters)
+{
+    auto async_metrics_values = async_metrics.getValues();
+
+    GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
+    key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
+
+    for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
+    {
+        const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
+        const auto counter_increment = counter - prev_counters[i];
+        prev_counters[i] = counter;
+
+        std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
+        key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
+    }
+
+    for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
+    {
+        const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
+
+        std::string key{CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
+        key_vals.emplace_back(current_metrics_path_prefix + key, value);
+    }
+
+    for (const auto & name_value : async_metrics_values)
+    {
+        key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
+    }
+
+    if (!key_vals.empty())
+        doSendMetricsToPrometheus(key_vals);
+}
+
+void MetricsPrometheus::doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector<ssize_t> & key_vals)
+{
+    using namespace prometheus;
+
+    for (const auto & key_val : key_vals)
+    {
+        auto key = key_val.first;
+        auto it = gauge_map.find(key);
+        if (it != gauge_map.end())
+        {
+            auto & guage = it->second;
+            guage.Set(key_val.second);
+        }
+        else
+        {
+            auto & gauge_family = BuildGauge()
+                                      .Name(key_val.first)
+                                      .Help("Get from system.metrics, system.events and system.asynchronous_metrics tables")
+                                      .Register(*registry);
+
+            auto & guage = gauge_family.Add({});
+            guage.Set(key_val.second);
+
+            auto pair = std::pair<std::string, prometheus::Gauge &>(key, guage);
+            gauge_map.insert(pair);
+        }
+    }
+
+    auto returnCode = gateway->Push();
+    if (returnCode != 200)
+    {
+        LOG_WARNING(log, "Failed to push metrics to gateway, return code is " << returnCode);
+    }
+}
+} // namespace DB
diff --git a/dbms/src/Server/MetricsPrometheus.h b/dbms/src/Server/MetricsPrometheus.h
new file mode 100644
index 00000000000..84e1f0c4c88
--- /dev/null
+++ b/dbms/src/Server/MetricsPrometheus.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <Common/ProfileEvents.h>
+#include <Interpreters/Context.h>
+#include <daemon/GraphiteWriter.h>
+#include <prometheus/gateway.h>
+#include <prometheus/gauge.h>
+#include <prometheus/registry.h>
+#include <condition_variable>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+
+namespace DB
+{
+
+class AsynchronousMetrics;
+class Context;
+
+
+/**    Automatically sends
+  * - difference of ProfileEvents;
+  * - values of CurrentMetrics;
+  * - values of AsynchronousMetrics;
+  *  to Prometheus
+  */
+class MetricsPrometheus
+{
+public:
+    static std::shared_ptr<prometheus::Registry> getRegistry();
+
+    MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_);
+    ~MetricsPrometheus();
+
+private:
+    static std::shared_ptr<prometheus::Registry> registry_instance_ptr;
+    static std::mutex registry_instance_mutex;
+
+    static constexpr auto profile_events_path_prefix = "tiflash_system_profile_events_";
+    static constexpr auto current_metrics_path_prefix = "tiflash_system_metrics_";
+    static constexpr auto asynchronous_metrics_path_prefix = "tiflash_system_asynchronous_metrics_";
+
+    void run();
+    void sendMetricsToPrometheus(std::vector<ProfileEvents::Count> & prev_counters);
+    void doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector<ssize_t> & key_vals);
+
+    Context & context;
+    const AsynchronousMetrics & async_metrics;
+    bool quit = false;
+    std::mutex mutex;
+    std::condition_variable cond;
+    std::thread thread{&MetricsPrometheus::run, this};
+    std::shared_ptr<prometheus::Gateway> gateway;
+    std::shared_ptr<prometheus::Registry> registry;
+    std::map<std::string, prometheus::Gauge &> gauge_map;
+    int metricsInterval;
+    Logger * log;
+};
+
+} // namespace DB
diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp
index aaf053ec007..051ad47cad4 100644
--- a/dbms/src/Server/Server.cpp
+++ b/dbms/src/Server/Server.cpp
@@ -39,6 +39,7 @@
 #include "ClusterManagerService.h"
 #include "HTTPHandlerFactory.h"
 #include "MetricsTransmitter.h"
+#include "MetricsPrometheus.h"
 #include "StatusFile.h"
 #include "TCPHandlerFactory.h"
 
@@ -785,6 +786,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
             metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(*global_context, async_metrics, graphite_key));
         }
 
+        auto metrics_prometheus = std::make_unique<MetricsPrometheus>(*global_context, async_metrics);
+
         SessionCleaner session_cleaner(*global_context);
         ClusterManagerService cluster_manager_service(*global_context, config_path);
 
diff --git a/docker/builder/Dockerfile b/docker/builder/Dockerfile
index 0f801e124c7..399648d53f5 100644
--- a/docker/builder/Dockerfile
+++ b/docker/builder/Dockerfile
@@ -10,6 +10,7 @@ RUN apt update -y \
         build-essential autoconf libtool pkg-config \
         libgflags-dev libgtest-dev \
         golang curl python3-pip \
+        libcurl4-openssl-dev \
         # For tests: # bash expect python python-lxml python-termcolor curl perl sudo tzdata
  && curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain nightly \
  && pip3 install pybind11 pyinstaller dnspython uri requests urllib3 toml setuptools \