From ccf20e599273873303265ca7be5036f8eca743e9 Mon Sep 17 00:00:00 2001 From: Richard Treu Date: Tue, 20 Aug 2024 10:16:30 +0200 Subject: [PATCH] filter_log_to_metrics: Add timer callback for emitting metrics This commit will change the log_to_metrics filter to use a timer based metric inject and not directly inject metrics on every incoming log record anymore. This will lower the overall load and memory consumption especially in high-volume and high-cardinality situations. Signed-off-by: Richard Treu --- .../filter_log_to_metrics/log_to_metrics.c | 102 ++++++++++++++++-- .../filter_log_to_metrics/log_to_metrics.h | 10 +- 2 files changed, 104 insertions(+), 8 deletions(-) diff --git a/plugins/filter_log_to_metrics/log_to_metrics.c b/plugins/filter_log_to_metrics/log_to_metrics.c index e6c173f6794..f14fb57b5bc 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.c +++ b/plugins/filter_log_to_metrics/log_to_metrics.c @@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values, return label_counter; } +/* Timer callback to inject metrics into the pipeline */ +static void cb_send_metric_chunk(struct flb_config *config, void *data) +{ + int ret; + struct log_to_metrics_ctx *ctx = data; + + /* Check that metric context is not empty */ + if (ctx->cmt == NULL || ctx->input_ins == NULL) { + return; + } + + if (ctx->new_data == FLB_TRUE) { + ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, + strlen(ctx->tag), ctx->cmt); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not append metrics"); + } + } + + /* Check if we are shutting down. If so, stop our timer */ + if (config->is_shutting_down) { + if(ctx->timer && ctx->timer->active) { + flb_plg_debug(ctx->ins, "Stopping callback timer"); + flb_sched_timer_cb_disable(ctx->timer); + } + } + ctx->new_data = FLB_FALSE; +} + static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) { @@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, char metric_subsystem[MAX_METRIC_LENGTH]; char value_field[MAX_METRIC_LENGTH]; struct flb_input_instance *input_ins; + struct flb_sched *sched; int i; /* Create context */ @@ -729,6 +759,43 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } ctx->input_ins = input_ins; + + if (ctx->interval_sec <= 0) { + ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + } + if (ctx->interval_nsec <= 0) { + ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); + } + if (ctx->interval_sec == 0 && ctx->interval_nsec == 0) { + flb_plg_debug("Interval is set to 0, will not use timer and " + "send metrics immediately"); + ctx->timer_mode = FLB_FALSE; + return 0; + } + + /* Initialize timer for scheduled metric updates */ + sched = flb_sched_ctx_get(); + if(sched == 0) { + flb_plg_error(f_ins, "could not get scheduler context"); + log_to_metrics_destroy(ctx); + return -1; + } + //Convert interval_sec and interval_nsec to milliseconds + ctx->timer_interval = (ctx->interval_sec * 1000) + + (ctx->interval_nsec / 1000000); + flb_plg_debug(ctx->ins, + "Creating metric timer with frequency %d ms", + ctx->timer_interval); + + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_interval, cb_send_metric_chunk, + ctx, &ctx->timer); + if (ret < 0) { + flb_plg_error(f_ins, "could not create timer callback"); + log_to_metrics_destroy(ctx); + return -1; + } + ctx->timer_mode = FLB_TRUE; return 0; } @@ -920,9 +987,17 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, return -1; } - ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt); - if (ret != 0) { - flb_plg_error(ctx->ins, "could not append metrics"); + if (ctx->timer_mode == FLB_FALSE) { + ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, + strlen(ctx->tag), ctx->cmt); + + if (ret != 0) { + flb_plg_error(ctx->ins, "could not append metrics. " + "Please consider to use interval_sec and interval_nsec"); + } + } + else { + ctx->new_data = FLB_TRUE; } /* Cleanup */ @@ -941,6 +1016,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, } } + if (ctx->discard_logs) { *out_buf = NULL; *out_size = 0; @@ -958,7 +1034,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, static int cb_log_to_metrics_exit(void *data, struct flb_config *config) { struct log_to_metrics_ctx *ctx = data; - + if(ctx->timer != NULL) { + flb_plg_debug(ctx->ins, "Destroying callback timer"); + flb_sched_timer_destroy(ctx->timer); + } return log_to_metrics_destroy(ctx); } @@ -1037,13 +1116,24 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_name), "Name of the emitter (advanced users)" }, - { FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MEM_BUF_LIMIT_DEFAULT, 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_mem_buf_limit), "set a buffer limit to restrict memory usage of metrics emitter" }, - + { + FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC, + 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, interval_sec), + "Set the timer interval for metrics emission. If interval_sec and " + "interval_nsec are set to 0, the timer is disabled (default)." + }, + { + FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC, + 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, interval_nsec), + "Set the timer interval (subseconds) for metrics emission. " + "If interval_sec and interval_nsec are set to 0, the timer is disabled " + "(default)." + }, { FLB_CONFIG_MAP_BOOL, "discard_logs", "false", 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs), diff --git a/plugins/filter_log_to_metrics/log_to_metrics.h b/plugins/filter_log_to_metrics/log_to_metrics.h index 77ad06ece5e..0e8fc57aafd 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.h +++ b/plugins/filter_log_to_metrics/log_to_metrics.h @@ -50,13 +50,13 @@ #define FLB_MEM_BUF_LIMIT_DEFAULT "10M" #define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric" - +#define DEFAULT_INTERVAL_SEC "0" +#define DEFAULT_INTERVAL_NSEC "0" struct log_to_metrics_ctx { struct mk_list rules; struct flb_filter_instance *ins; struct cmt *cmt; - struct flb_input_instance *input_ins; char **label_keys; @@ -83,6 +83,12 @@ struct log_to_metrics_ctx { flb_sds_t tag; flb_sds_t emitter_name; size_t emitter_mem_buf_limit; + int interval_sec; + int interval_nsec; + int timer_interval; + int timer_mode; + struct flb_sched_timer *timer; + int new_data; }; struct grep_rule