Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send metrics to prometheus #387

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Language: Cpp
AlignAfterOpenBracket: false
BreakBeforeBraces: Custom
BraceWrapping: {
AfterCaseLabel: 'true'
#AfterCaseLabel: 'true'
AfterClass: 'true'
AfterControlStatement: 'true'
AfterEnum : 'true'
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@
[submodule "contrib/tiflash-proxy"]
path = contrib/tiflash-proxy
url = [email protected]:solotzg/tikv.git
[submodule "contrib/prometheus-cpp"]
path = contrib/prometheus-cpp
url = [email protected]:jupp0r/prometheus-cpp.git
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ include (cmake/find_llvm.cmake)
include (cmake/find_grpc.cmake)
include (cmake/find_kvproto.cmake)
include (cmake/find_tipb.cmake)
include (cmake/find_prometheus.cmake)


include (cmake/find_contrib_lib.cmake)
Expand Down
5 changes: 5 additions & 0 deletions cmake/find_prometheus.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Currently prometheus cpp should always use bundled library.

message(STATUS "Using prometheus666: ${ClickHouse_SOURCE_DIR}/contrib/prometheus-cpp")

set (PROMETHEUS_CPP_FOUND TRUE)
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,5 @@ if (USE_INTERNAL_POCO_LIBRARY)
target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()
endif ()

add_subdirectory (prometheus-cpp)
1 change: 1 addition & 0 deletions contrib/prometheus-cpp
Submodule prometheus-cpp added at ca1f34
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ target_link_libraries (dbms
clickhouse_common_io
flash_service
kvproto
prometheus-cpp::core
prometheus-cpp::push
kv_client
tipb
${Protobuf_LIBRARIES}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ add_library (clickhouse-server-lib
HTTPHandler.cpp
InterserverIOHTTPHandler.cpp
MetricsTransmitter.cpp
MetricsPrometheus.cpp
NotFoundHandler.cpp
PingRequestHandler.cpp
ReplicasStatusHandler.cpp
Expand Down
189 changes: 189 additions & 0 deletions dbms/src/Server/MetricsPrometheus.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#include "MetricsPrometheus.h"

#include <Interpreters/AsynchronousMetrics.h>

#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>

#include <daemon/BaseDaemon.h>
#include <prometheus/gauge.h>


namespace DB
{
std::shared_ptr<prometheus::Registry> MetricsPrometheus::registry_instance_ptr = nullptr;

std::mutex MetricsPrometheus::registry_instance_mutex;

std::shared_ptr<prometheus::Registry> MetricsPrometheus::getRegistry()
{
if (registry_instance_ptr == nullptr)
{
std::lock_guard<std::mutex> lk(registry_instance_mutex);
if (registry_instance_ptr == nullptr)
{
registry_instance_ptr = std::make_shared<prometheus::Registry>();
}
}
return registry_instance_ptr;
}

MetricsPrometheus::MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_)
: context(context_), async_metrics(async_metrics_), log(&Logger::get("Prometheus"))
{
registry = MetricsPrometheus::getRegistry();
metricsInterval = context.getConfigRef().getInt("status.metrics-interval", 15);

const std::string metricsAddr = context.getConfigRef().getString("status.metrics-addr", "");
if (0 == metricsAddr.compare(""))
{
metricsInterval = 0;
LOG_INFO(log, "Disable sending metrics to prometheus, cause status.metrics-addr is not set!");
}
else
{
auto pos = metricsAddr.find(':', 0);
auto host = metricsAddr.substr(0, pos);
auto port = metricsAddr.substr(pos + 1, metricsAddr.size());

auto serviceAddr = context.getConfigRef().getString("flash.service_addr");
std::string jobName = serviceAddr;
std::replace(jobName.begin(), jobName.end(), ':', '_');
std::replace(jobName.begin(), jobName.end(), '.', '_');
jobName = "tiflash_" + jobName;

char hostname[1024];
::gethostname(hostname, sizeof(hostname));

gateway = std::make_shared<prometheus::Gateway>(host, port, jobName, prometheus::Gateway::GetInstanceLabel(hostname));
gateway->RegisterCollectable(registry);

LOG_INFO(log, "Enable sending metrics to prometheus; interval =" << metricsInterval << "; addr = " << serviceAddr);
}
}

MetricsPrometheus::~MetricsPrometheus()
{
try
{
{
std::lock_guard<std::mutex> lock{mutex};
quit = true;
}

cond.notify_one();

thread.join();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

void MetricsPrometheus::run()
{
const std::string thread_name = "MetricsPrometheus " + std::to_string(metricsInterval) + "s";
setThreadName(thread_name.c_str());

const auto get_next_time = [](size_t seconds) {
/// To avoid time drift and transmit values exactly each interval:
/// next time aligned to system seconds
/// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
return std::chrono::system_clock::time_point(
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
+ std::chrono::seconds(seconds));
};

std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());

std::unique_lock<std::mutex> lock{mutex};

while (true)
{
if (metricsInterval > 0 && registry != nullptr && gateway != nullptr)
break;

if (cond.wait_until(lock, get_next_time(5), [this] { return quit; }))
break;
}

while (true)
{
if (cond.wait_until(lock, get_next_time(metricsInterval), [this] { return quit; }))
break;

sendMetricsToPrometheus(prev_counters);
}
}

void MetricsPrometheus::sendMetricsToPrometheus(std::vector<ProfileEvents::Count> & prev_counters)
{
auto async_metrics_values = async_metrics.getValues();

GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());

for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;

std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
}

for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);

std::string key{CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(current_metrics_path_prefix + key, value);
}

for (const auto & name_value : async_metrics_values)
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
}

if (!key_vals.empty())
doSendMetricsToPrometheus(key_vals);
}

void MetricsPrometheus::doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector<ssize_t> & key_vals)
{
using namespace prometheus;

for (const auto & key_val : key_vals)
{
auto key = key_val.first;
auto it = gauge_map.find(key);
if (it != gauge_map.end())
{
auto & guage = it->second;
guage.Set(key_val.second);
}
else
{
auto & gauge_family = BuildGauge()
.Name(key_val.first)
.Help("Get from system.metrics, system.events and system.asynchronous_metrics tables")
.Register(*registry);

auto & guage = gauge_family.Add({});
guage.Set(key_val.second);

auto pair = std::pair<std::string, prometheus::Gauge &>(key, guage);
gauge_map.insert(pair);
}
}

auto returnCode = gateway->Push();
if (returnCode != 200)
{
LOG_WARNING(log, "Failed to push metrics to gateway, return code is " << returnCode);
}
}
} // namespace DB
62 changes: 62 additions & 0 deletions dbms/src/Server/MetricsPrometheus.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include <Common/ProfileEvents.h>
#include <Interpreters/Context.h>
#include <daemon/GraphiteWriter.h>
#include <prometheus/gateway.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>


namespace DB
{

class AsynchronousMetrics;
class Context;


/** Automatically sends
* - difference of ProfileEvents;
* - values of CurrentMetrics;
* - values of AsynchronousMetrics;
* to Prometheus
*/
class MetricsPrometheus
{
public:
static std::shared_ptr<prometheus::Registry> getRegistry();

MetricsPrometheus(Context & context_, const AsynchronousMetrics & async_metrics_);
~MetricsPrometheus();

private:
static std::shared_ptr<prometheus::Registry> registry_instance_ptr;
static std::mutex registry_instance_mutex;

static constexpr auto profile_events_path_prefix = "tiflash_system_profile_events_";
static constexpr auto current_metrics_path_prefix = "tiflash_system_metrics_";
static constexpr auto asynchronous_metrics_path_prefix = "tiflash_system_asynchronous_metrics_";

void run();
void sendMetricsToPrometheus(std::vector<ProfileEvents::Count> & prev_counters);
void doSendMetricsToPrometheus(const GraphiteWriter::KeyValueVector<ssize_t> & key_vals);

Context & context;
const AsynchronousMetrics & async_metrics;
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread{&MetricsPrometheus::run, this};
std::shared_ptr<prometheus::Gateway> gateway;
std::shared_ptr<prometheus::Registry> registry;
std::map<std::string, prometheus::Gauge &> gauge_map;
int metricsInterval;
Logger * log;
};

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "ClusterManagerService.h"
#include "HTTPHandlerFactory.h"
#include "MetricsTransmitter.h"
#include "MetricsPrometheus.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"

Expand Down Expand Up @@ -785,6 +786,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(*global_context, async_metrics, graphite_key));
}

auto metrics_prometheus = std::make_unique<MetricsPrometheus>(*global_context, async_metrics);

SessionCleaner session_cleaner(*global_context);
ClusterManagerService cluster_manager_service(*global_context, config_path);

Expand Down
1 change: 1 addition & 0 deletions docker/builder/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RUN apt update -y \
build-essential autoconf libtool pkg-config \
libgflags-dev libgtest-dev \
golang curl python3-pip \
libcurl4-openssl-dev \
# For tests: # bash expect python python-lxml python-termcolor curl perl sudo tzdata
&& curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain nightly \
&& pip3 install pybind11 pyinstaller dnspython uri requests urllib3 toml setuptools \
Expand Down