Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: add connection metrics #7713

Merged
merged 3 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include <fluent-bit/flb_processor.h>

#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_gauge.h>
#include <cmetrics/cmt_counter.h>
#include <cmetrics/cmt_decode_msgpack.h>
#include <cmetrics/cmt_encode_msgpack.h>
Expand Down Expand Up @@ -360,6 +361,11 @@ struct flb_output_instance {
struct cmt_counter *cmt_dropped_records; /* m: output_dropped_records */
struct cmt_counter *cmt_retried_records; /* m: output_retried_records */

/* m: output_upstream_total_connections */
struct cmt_gauge *cmt_upstream_total_connections;
/* m: output_upstream_busy_connections */
struct cmt_gauge *cmt_upstream_busy_connections;

/* OLD Metrics API */
#ifdef FLB_HAVE_METRICS
struct flb_metrics *metrics; /* metrics */
Expand Down
22 changes: 22 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <fluent-bit/flb_upstream_queue.h>
#include <fluent-bit/flb_stream.h>

#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_gauge.h>

/*
* Upstream creation FLAGS set by Fluent Bit sub-components
* ========================================================
Expand Down Expand Up @@ -63,6 +66,11 @@ struct flb_upstream {
int ha_mode;
void *ha_ctx;

struct cmt_gauge *cmt_total_connections;
struct cmt_gauge *cmt_busy_connections;
const char *cmt_total_connections_label;
const char *cmt_busy_connections_label;

/*
* If the connections will be in separate threads, this flag is
* enabled and all lists management are protected through mutexes.
Expand Down Expand Up @@ -98,4 +106,18 @@ void flb_upstream_thread_safe(struct flb_upstream *u);
struct mk_list *flb_upstream_get_config_map(struct flb_config *config);
int flb_upstream_needs_proxy(const char *host, const char *proxy, const char *no_proxy);

void flb_upstream_set_total_connections_label(
struct flb_upstream *stream,
const char *label_value);
void flb_upstream_set_total_connections_gauge(
struct flb_upstream *stream,
struct cmt_gauge *gauge_instance);

void flb_upstream_set_busy_connections_label(
struct flb_upstream *stream,
const char *label_value);
void flb_upstream_set_busy_connections_gauge(
struct flb_upstream *stream,
struct cmt_gauge *gauge_instance);

#endif
36 changes: 36 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,30 @@ int flb_output_init_all(struct flb_config *config)
1, (char *[]) {"name"});
cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name});

/* output_upstream_total_connections */
ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt,
"fluentbit",
"output",
"upstream_total_connections",
"Total Connection count.",
1, (char *[]) {"name"});
cmt_gauge_set(ins->cmt_upstream_total_connections,
ts,
0,
1, (char *[]) {name});

/* output_upstream_total_connections */
ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt,
"fluentbit",
"output",
"upstream_busy_connections",
"Busy Connection count.",
1, (char *[]) {"name"});
cmt_gauge_set(ins->cmt_upstream_busy_connections,
ts,
0,
1, (char *[]) {name});

/* old API */
ins->metrics = flb_metrics_create(name);
if (ins->metrics) {
Expand Down Expand Up @@ -1365,6 +1389,18 @@ int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *
/* Set flags */
flb_stream_enable_flags(&u->base, flags);

flb_upstream_set_total_connections_label(u,
flb_output_name(ins));

flb_upstream_set_total_connections_gauge(u,
ins->cmt_upstream_total_connections);

flb_upstream_set_busy_connections_label(u,
flb_output_name(ins));

flb_upstream_set_busy_connections_gauge(u,
ins->cmt_upstream_busy_connections);

/*
* If the output plugin flush callbacks will run in multiple threads, enable
* the thread safe mode for the Upstream context.
Expand Down
153 changes: 153 additions & 0 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ struct flb_config_map upstream_net[] = {
int flb_upstream_needs_proxy(const char *host, const char *proxy,
const char *no_proxy);

static void flb_upstream_increment_busy_connections_count(
struct flb_upstream *stream);

static void flb_upstream_decrement_busy_connections_count(
struct flb_upstream *stream);

static void flb_upstream_increment_total_connections_count(
struct flb_upstream *stream);

static void flb_upstream_decrement_total_connections_count(
struct flb_upstream *stream);

/* Enable thread-safe mode for upstream connection */
void flb_upstream_thread_safe(struct flb_upstream *u)
{
Expand Down Expand Up @@ -463,6 +475,8 @@ static int prepare_destroy_conn(struct flb_connection *u_conn)
/* Add node to destroy queue */
mk_list_add(&u_conn->_head, &uq->destroy_queue);

flb_upstream_decrement_total_connections_count(u);

/*
* note: the connection context is destroyed by the engine once all events
* have been processed.
Expand Down Expand Up @@ -531,6 +545,8 @@ static struct flb_connection *create_conn(struct flb_upstream *u)
uq = flb_upstream_queue_get(u);
mk_list_add(&conn->_head, &uq->busy_queue);

flb_upstream_increment_total_connections_count(u);

flb_stream_release_lock(&u->base);

flb_connection_reset_connection_timeout(conn);
Expand Down Expand Up @@ -730,6 +746,7 @@ struct flb_connection *flb_upstream_conn_get(struct flb_upstream *u)

if (conn != NULL) {
flb_connection_reset_io_timeout(conn);
flb_upstream_increment_busy_connections_count(u);
}

return conn;
Expand Down Expand Up @@ -757,6 +774,8 @@ int flb_upstream_conn_release(struct flb_connection *conn)
struct flb_upstream *u = conn->upstream;
struct flb_upstream_queue *uq;

flb_upstream_decrement_busy_connections_count(u);

uq = flb_upstream_queue_get(u);

/* If this is a valid KA connection just recycle */
Expand Down Expand Up @@ -897,6 +916,8 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
u_conn->event.mask,
FLB_TRUE);
}

flb_upstream_decrement_busy_connections_count(u);
}
}

Expand Down Expand Up @@ -994,3 +1015,135 @@ int flb_upstream_is_async(struct flb_upstream *u)
{
return flb_stream_is_async(&u->base);
}

void flb_upstream_set_total_connections_label(
struct flb_upstream *stream,
const char *label_value)
{
stream->cmt_total_connections_label = label_value;
}

void flb_upstream_set_total_connections_gauge(
struct flb_upstream *stream,
struct cmt_gauge *gauge_instance)
{
stream->cmt_total_connections = gauge_instance;
}

static void flb_upstream_increment_total_connections_count(
struct flb_upstream *stream)
{
if (stream->parent_upstream != NULL) {
stream = (struct flb_upstream *) stream->parent_upstream;

flb_upstream_increment_total_connections_count(stream);
}
if (stream->cmt_total_connections != NULL) {
if (stream->cmt_total_connections_label != NULL) {
cmt_gauge_inc(
stream->cmt_total_connections,
cfl_time_now(),
1,
(char *[]) {
(char *) stream->cmt_total_connections_label
});
}
else {
cmt_gauge_inc(stream->cmt_total_connections,
cfl_time_now(),
0, NULL);
}
}
}

static void flb_upstream_decrement_total_connections_count(
struct flb_upstream *stream)
{
if (stream->parent_upstream != NULL) {
stream = (struct flb_upstream *) stream->parent_upstream;

flb_upstream_decrement_total_connections_count(stream);
}
else if (stream->cmt_total_connections != NULL) {
if (stream->cmt_total_connections_label != NULL) {
cmt_gauge_dec(
stream->cmt_total_connections,
cfl_time_now(),
1,
(char *[]) {
(char *) stream->cmt_total_connections_label
});
}
else {
cmt_gauge_dec(stream->cmt_total_connections,
cfl_time_now(),
0, NULL);
}
}
}

void flb_upstream_set_busy_connections_label(
struct flb_upstream *stream,
const char *label_value)
{
stream->cmt_busy_connections_label = label_value;
}

void flb_upstream_set_busy_connections_gauge(
struct flb_upstream *stream,
struct cmt_gauge *gauge_instance)
{
stream->cmt_busy_connections = gauge_instance;
}

static void flb_upstream_increment_busy_connections_count(
struct flb_upstream *stream)
{
if (stream->parent_upstream != NULL) {
stream = (struct flb_upstream *) stream->parent_upstream;

flb_upstream_increment_busy_connections_count(stream);
}
else if (stream->cmt_busy_connections != NULL) {
if (stream->cmt_busy_connections_label != NULL) {
cmt_gauge_inc(
stream->cmt_busy_connections,
cfl_time_now(),
1,
(char *[]) {
(char *) stream->cmt_busy_connections_label
});
}
else {
cmt_gauge_inc(stream->cmt_busy_connections,
cfl_time_now(),
0, NULL);
}
}
}

static void flb_upstream_decrement_busy_connections_count(
struct flb_upstream *stream)
{
if (stream->parent_upstream != NULL) {
stream = (struct flb_upstream *) stream->parent_upstream;

flb_upstream_decrement_busy_connections_count(stream);
}
else if (stream->cmt_busy_connections != NULL) {
if (stream->cmt_busy_connections_label != NULL) {
cmt_gauge_dec(
stream->cmt_busy_connections,
cfl_time_now(),
1,
(char *[]) {
(char *) stream->cmt_busy_connections_label
});
}
else {
cmt_gauge_dec(stream->cmt_busy_connections,
cfl_time_now(),
0, NULL);
}
}
}