From a7b3d9b6f1da18d0014ebebecb19f4b3d0404000 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 22 Feb 2024 20:06:35 +0900 Subject: [PATCH] processor_metrics_selector: Implement native processor for metrics manipulations * Prevent dangling pointer if cmt context is recreated on input_metric and processor. * processor_labels: Follow change of the signature of metrics callback * lib: Support setter for processor to make testing processors easily Signed-off-by: Hiroshi Hatake --- CMakeLists.txt | 2 +- include/fluent-bit/flb_lib.h | 4 + include/fluent-bit/flb_metrics.h | 1 + include/fluent-bit/flb_processor.h | 3 +- plugins/CMakeLists.txt | 1 + plugins/processor_labels/labels.c | 19 + .../processor_metrics_selector/CMakeLists.txt | 4 + plugins/processor_metrics_selector/selector.c | 394 ++++++++++++ plugins/processor_metrics_selector/selector.h | 63 ++ src/flb_input_metric.c | 30 +- src/flb_lib.c | 36 ++ src/flb_processor.c | 13 +- tests/runtime/CMakeLists.txt | 3 + tests/runtime/processor_metrics_selector.c | 565 ++++++++++++++++++ 14 files changed, 1127 insertions(+), 11 deletions(-) create mode 100644 plugins/processor_metrics_selector/CMakeLists.txt create mode 100644 plugins/processor_metrics_selector/selector.c create mode 100644 plugins/processor_metrics_selector/selector.h create mode 100644 tests/runtime/processor_metrics_selector.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 41f8bf1e20b..abde8425e2d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -283,7 +283,7 @@ option(FLB_FILTER_GEOIP2 "Enable geoip2 filter" option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes) option(FLB_FILTER_WASM "Enable WASM filter" Yes) option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes) -option(FLB_PROCESSOR_ATTRIBUTES "Enable atributes manipulation processor" Yes) +option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes) option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes) if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 5e213c406f1..f96b06eff86 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -46,11 +46,15 @@ struct flb_lib_out_cb { /* For Fluent Bit library callers, we only export the following symbols */ typedef struct flb_lib_ctx flb_ctx_t; +struct flb_processor; + FLB_EXPORT void flb_init_env(); FLB_EXPORT flb_ctx_t *flb_create(); FLB_EXPORT void flb_destroy(flb_ctx_t *ctx); FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data); +FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val); diff --git a/include/fluent-bit/flb_metrics.h b/include/fluent-bit/flb_metrics.h index ccd4694ebb1..25b0443224c 100644 --- a/include/fluent-bit/flb_metrics.h +++ b/include/fluent-bit/flb_metrics.h @@ -39,6 +39,7 @@ #include #include #include +#include /* Metrics IDs for general purpose (used by core and Plugins */ #define FLB_METRIC_N_RECORDS 0 diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 1ad0d0ae813..84949664334 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -143,7 +143,8 @@ struct flb_processor_plugin { int); int (*cb_process_metrics) (struct flb_processor_instance *, - struct cmt *, + struct cmt *, /* in */ + struct cmt **, /* out */ const char *, int); diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 46295f14e5a..0f8090e7e09 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -282,6 +282,7 @@ REGISTER_IN_PLUGIN("in_random") # PROCESSORS # ========== REGISTER_PROCESSOR_PLUGIN("processor_labels") +REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") # OUTPUTS diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index c3fc929d027..f6e3b63f81c 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1696,15 +1696,23 @@ static int hash_labels(struct cmt *metrics_context, static int cb_process_metrics(struct flb_processor_instance *processor_instance, struct cmt *metrics_context, + struct cmt **out_context, const char *tag, int tag_len) { + struct cmt *out_cmt; struct internal_processor_context *processor_context; int result; processor_context = (struct internal_processor_context *) processor_instance->context; + out_cmt = cmt_create(); + if (out_cmt == NULL) { + flb_plg_error(processor_instance, "could not create out_cmt context"); + return FLB_PROCESSOR_FAILURE; + } + result = delete_labels(metrics_context, &processor_context->delete_labels); @@ -1728,6 +1736,17 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, &processor_context->hash_labels); } + if (result == FLB_PROCESSOR_SUCCESS) { + result = cmt_cat(out_cmt, metrics_context); + if (result != 0) { + cmt_destroy(out_cmt); + + return FLB_PROCESSOR_FAILURE; + } + + *out_context = out_cmt; + } + if (result != FLB_PROCESSOR_SUCCESS) { return FLB_PROCESSOR_FAILURE; } diff --git a/plugins/processor_metrics_selector/CMakeLists.txt b/plugins/processor_metrics_selector/CMakeLists.txt new file mode 100644 index 00000000000..d151570ea47 --- /dev/null +++ b/plugins/processor_metrics_selector/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + selector.c) + +FLB_PLUGIN(processor_metrics_selector "${src}" "") diff --git a/plugins/processor_metrics_selector/selector.c b/plugins/processor_metrics_selector/selector.c new file mode 100644 index 00000000000..1e30f905d7e --- /dev/null +++ b/plugins/processor_metrics_selector/selector.c @@ -0,0 +1,394 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "selector.h" + +static void delete_metrics_rules(struct selector_ctx *ctx) +{ + if (ctx->name_regex != NULL) { + flb_regex_destroy(ctx->name_regex); + } +} + +static void destroy_context(struct selector_ctx *context) +{ + if (context != NULL) { + delete_metrics_rules(context); + flb_free(context->selector_pattern); + flb_free(context); + } +} + +static int set_metrics_rules(struct selector_ctx *ctx, struct flb_processor_instance *p_ins) +{ + const char *action; + const char *metric_name; + const char *op_type; + const char *context; + size_t name_len = 0; + + action = flb_processor_instance_get_property("action", p_ins); + if (action == NULL) { + ctx->action_type = SELECTOR_INCLUDE; + } + else if (strncasecmp(action, "include", 7) == 0) { + flb_plg_debug(ctx->ins, "action type INCLUDE"); + ctx->action_type = SELECTOR_INCLUDE; + } + else if (strncasecmp(action, "exclude", 7) == 0) { + flb_plg_debug(ctx->ins, "action type EXCLUDE"); + ctx->action_type = SELECTOR_EXCLUDE; + } + else { + flb_plg_error(ctx->ins, "unknown action type '%s'", action); + return -1; + } + + metric_name = flb_processor_instance_get_property("metric_name", p_ins); + if (metric_name == NULL) { + flb_plg_error(ctx->ins, "metric_name is needed for selector"); + return -1; + } + ctx->selector_pattern = flb_strdup(metric_name); + name_len = strlen(metric_name); + + op_type = flb_processor_instance_get_property("operation_type", p_ins); + if (op_type == NULL) { + ctx->op_type = SELECTOR_OPERATION_PREFIX; + } + else if (strncasecmp(op_type, "prefix", 6) == 0) { + flb_plg_debug(ctx->ins, "operation type PREFIX"); + ctx->op_type = SELECTOR_OPERATION_PREFIX; + } + else if (strncasecmp(op_type, "substring", 9) == 0) { + flb_plg_debug(ctx->ins, "operation type SUBSTRING"); + ctx->op_type = SELECTOR_OPERATION_SUBSTRING; + } + else { + flb_plg_error(ctx->ins, "unknown action type '%s'", op_type); + return -1; + } + + if (ctx->selector_pattern[0] == '/' && ctx->selector_pattern[name_len-1] == '/') { + /* Convert string to regex pattern for metrics */ + ctx->name_regex = flb_regex_create(ctx->selector_pattern); + if (!ctx->name_regex) { + flb_plg_error(ctx->ins, "could not compile regex pattern '%s'", + ctx->selector_pattern); + return -1; + } + ctx->op_type = SELECTOR_OPERATION_REGEX; + } + + context = flb_processor_instance_get_property("context", p_ins); + if (context == NULL) { + ctx->context_type = SELECTOR_CONTEXT_FQNAME; + } + else if (strncasecmp(context, "metric_name", 11) == 0) { + ctx->context_type = SELECTOR_CONTEXT_FQNAME; + } + else { + flb_plg_error(ctx->ins, "unknown context '%s'", context); + delete_metrics_rules(ctx); + return -1; + } + + return 0; +} + +static struct selector_ctx * + create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + int result; + struct selector_ctx *ctx; + + ctx = flb_malloc(sizeof(struct selector_ctx)); + if (ctx != NULL) { + ctx->ins = processor_instance; + ctx->config = config; + ctx->name_regex = NULL; + + result = flb_processor_instance_config_map_set(processor_instance, (void *) ctx); + + if (result == 0) { + /* Load rules */ + result= set_metrics_rules(ctx, processor_instance); + if (result == -1) { + destroy_context(ctx); + ctx = NULL; + + return ctx; + } + } + + if (result != 0) { + destroy_context(ctx); + + ctx = NULL; + } + } + else { + flb_errno(); + } + + return ctx; +} + + +static int cb_selector_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + /* Create context */ + processor_instance->context = (void *) create_context( + processor_instance, config); + + if (processor_instance->context == NULL) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +#ifdef FLB_HAVE_METRICS +static int cmt_regex_match(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_TRUE; + } + else { + ret = CMT_FALSE; + } + + return ret; +} + +static int cmt_regex_exclude(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_FALSE; + } + else { + ret = CMT_TRUE; + } + + return ret; +} + +static inline int selector_metrics_process_fqname(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + int ret; + int found = FLB_FALSE; + struct cmt *filtered = NULL; + int flags = 0; + + /* On processor_selector, we only process one rule in each of contexts */ + + filtered = cmt_create(); + if (filtered == NULL) { + flb_plg_error(ctx->ins, "could not create filtered context"); + + return SELECTOR_FAILURE; + } + + if (ctx->op_type == SELECTOR_OPERATION_REGEX) { + if (ctx->action_type == SELECTOR_INCLUDE) { + ret = cmt_filter(filtered, cmt, NULL, NULL, ctx->name_regex, cmt_regex_match, 0); + } + else if (ctx->action_type == SELECTOR_EXCLUDE) { + ret = cmt_filter(filtered, cmt, NULL, NULL, ctx->name_regex, cmt_regex_exclude, 0); + } + } + else if (ctx->selector_pattern != NULL) { + if (ctx->action_type == SELECTOR_EXCLUDE) { + flags |= CMT_FILTER_EXCLUDE; + } + + if (ctx->op_type == SELECTOR_OPERATION_PREFIX) { + flags |= CMT_FILTER_PREFIX; + } + else if (ctx->op_type == SELECTOR_OPERATION_SUBSTRING) { + flags |= CMT_FILTER_SUBSTRING; + } + + ret = cmt_filter(filtered, cmt, ctx->selector_pattern, NULL, NULL, NULL, flags); + } + + if (ret == 0) { + found = FLB_TRUE; + } + else if (ret != 0) { + flb_plg_debug(ctx->ins, "not matched for rule = \"%s\"", ctx->selector_pattern); + } + + cmt_cat(out_cmt, filtered); + cmt_destroy(filtered); + + if (ctx->action_type == SELECTOR_INCLUDE) { + return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; + } + + /* The last rule is exclude */ + return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; +} + +/* Given a metrics context, do some select action based on the defined rules */ +static inline int selector_metrics(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + if (ctx->context_type == SELECTOR_CONTEXT_FQNAME) { + return selector_metrics_process_fqname(cmt, out_cmt, ctx); + } + + return 0; +} + +static int process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int ret; + struct selector_ctx *ctx; + struct cmt *out_cmt; + + ctx = (struct selector_ctx *) processor_instance->context; + + out_cmt = cmt_create(); + if (out_cmt == NULL) { + flb_plg_error(processor_instance, "could not create out_cmt context"); + return SELECTOR_FAILURE; + } + + ret = selector_metrics(metrics_context, out_cmt, ctx); + + if (ret == SELECTOR_RET_KEEP || ret == SELECTOR_RET_EXCLUDE) { + ret = SELECTOR_SUCCESS; + *out_context = out_cmt; + } + else { + /* destroy out_context contexts */ + cmt_destroy(out_cmt); + + ret = SELECTOR_FAILURE; + } + + return ret; +} +#endif + +static int cb_selector_process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int result = SELECTOR_SUCCESS; + +#ifdef FLB_HAVE_METRICS + result = process_metrics(processor_instance, + metrics_context, + out_context, + tag, tag_len); +#endif + + if (result != SELECTOR_SUCCESS) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_selector_exit(struct flb_processor_instance *processor_instance) +{ + if (processor_instance != NULL && + processor_instance->context != NULL) { + destroy_context(processor_instance->context); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "metric_name", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Keep metrics in which the metric of name matches with the actual name or the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "context", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Specify matching context. Currently, metric_name is only supported." + }, + { + FLB_CONFIG_MAP_STR, "action", NULL, + 0, FLB_FALSE, 0, + "Specify the action for specified metrics. INCLUDE and EXCLUDE are allowed." + }, + { + FLB_CONFIG_MAP_STR, "operation_type", NULL, + 0, FLB_FALSE, 0, + "Specify the operation type of action for metrics payloads. PREFIX and SUBSTRING are allowed." + }, /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_metrics_selector_plugin = { + .name = "metrics_selector", + .description = "select metrics by specified name", + .cb_init = cb_selector_init, + .cb_process_logs = NULL, + .cb_process_metrics = cb_selector_process_metrics, + .cb_process_traces = NULL, + .cb_exit = cb_selector_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/processor_metrics_selector/selector.h b/plugins/processor_metrics_selector/selector.h new file mode 100644 index 00000000000..502c34f9d83 --- /dev/null +++ b/plugins/processor_metrics_selector/selector.h @@ -0,0 +1,63 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_SELECTOR_H +#define FLB_PROCESSOR_SELECTOR_H + +#include +#include +#include +#include +#include + +/* rule types */ +#define SELECTOR_NO_RULE 0 +#define SELECTOR_INCLUDE 1 +#define SELECTOR_EXCLUDE 2 + +/* actions */ +#define SELECTOR_RET_KEEP 0 +#define SELECTOR_RET_EXCLUDE 1 + +#define SELECTOR_SUCCESS 0 +#define SELECTOR_NOTOUCH 1 +#define SELECTOR_FAILURE 2 + +#define SELECTOR_OPERATION_REGEX 0 +#define SELECTOR_OPERATION_PREFIX 1 +#define SELECTOR_OPERATION_SUBSTRING 2 + +/* context */ +#define SELECTOR_CONTEXT_FQNAME 0 +#define SELECTOR_CONTEXT_LABELS 1 +#define SELECTOR_CONTEXT_DESC 2 + +struct selector_ctx { + struct mk_list metrics_rules; + flb_sds_t action; + int action_type; + int op_type; + int context_type; + char *selector_pattern; + struct flb_regex *name_regex; + struct flb_processor_instance *ins; + struct flb_config *config; +}; + +#endif diff --git a/src/flb_input_metric.c b/src/flb_input_metric.c index 9c4fa478d6d..8b3652a85f9 100644 --- a/src/flb_input_metric.c +++ b/src/flb_input_metric.c @@ -32,6 +32,7 @@ static int input_metrics_append(struct flb_input_instance *ins, char *mt_buf; size_t mt_size; int processor_is_active; + struct cmt *out_context = NULL; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -51,20 +52,35 @@ static int input_metrics_append(struct flb_input_instance *ins, FLB_PROCESSOR_METRICS, tag, tag_len, - (char *) cmt, - 0, NULL, NULL); + (char *) cmt, 0, + (void **)&out_context, NULL); if (ret == -1) { return -1; } } - /* Convert metrics to msgpack */ - ret = cmt_encode_msgpack_create(cmt, &mt_buf, &mt_size); - if (ret != 0) { - flb_plg_error(ins, "could not encode metrics"); - return -1; + if (out_context != NULL) { + /* Convert metrics to msgpack */ + ret = cmt_encode_msgpack_create(out_context, &mt_buf, &mt_size); + if (ret != 0) { + flb_plg_error(ins, "could not encode metrics"); + cmt_destroy(out_context); + + return -1; + } + + cmt_destroy(out_context); + } + else { + /* Convert metrics to msgpack */ + ret = cmt_encode_msgpack_create(cmt, &mt_buf, &mt_size); + if (ret != 0) { + flb_plg_error(ins, "could not encode metrics"); + return -1; + + } } /* Append packed metrics */ diff --git a/src/flb_lib.c b/src/flb_lib.c index b77f29b7a1d..1821a2e61ad 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -329,6 +329,24 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) +{ + struct flb_input_instance *i_ins; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + + if (i_ins->processor) { + flb_processor_destroy(i_ins->processor); + } + + i_ins->processor = proc; + + return 0; +} + static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) { struct flb_kv *kv; @@ -465,6 +483,24 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + if (o_ins->processor) { + flb_processor_destroy(o_ins->processor); + } + + o_ins->processor = proc; + + return 0; +} + int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)) { diff --git a/src/flb_processor.c b/src/flb_processor.c index a7a6e2b20d9..8b9369eb5ea 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -430,9 +430,9 @@ int flb_processor_run(struct flb_processor *proc, { int ret; int finalize; - void *cur_buf; + void *cur_buf = NULL; size_t cur_size; - void *tmp_buf; + void *tmp_buf = NULL; size_t tmp_size; struct mk_list *head; struct mk_list *list = NULL; @@ -640,6 +640,7 @@ int flb_processor_run(struct flb_processor *proc, if (p_ins->p->cb_process_metrics != NULL) { ret = p_ins->p->cb_process_metrics(p_ins, (struct cmt *) cur_buf, + (struct cmt **) &tmp_buf, tag, tag_len); @@ -648,8 +649,16 @@ int flb_processor_run(struct flb_processor *proc, FLB_PROCESSOR_LOCK_RETRY_LIMIT, FLB_PROCESSOR_LOCK_RETRY_DELAY); + out_buf = NULL; + return -1; } + + if (cur_buf != data) { + cmt_destroy(cur_buf); + } + + cur_buf = (void *)tmp_buf; } } else if (type == FLB_PROCESSOR_TRACES) { diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f673a5fbe87..b3fd7d18684 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -130,6 +130,9 @@ if (FLB_CUSTOM_CALYPTIA) FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") endif() +if (FLB_PROCESSOR_METRICS_SELECTOR) + FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c") +endif() # HTTP Client Debug (requires -DFLB_HTTP_CLIENT_DEBUG=On) if(FLB_HTTP_CLIENT_DEBUG) diff --git a/tests/runtime/processor_metrics_selector.c b/tests/runtime/processor_metrics_selector.c new file mode 100644 index 00000000000..f25e9751a58 --- /dev/null +++ b/tests/runtime/processor_metrics_selector.c @@ -0,0 +1,565 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include "flb_tests_runtime.h" + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; + +static int cb_count_metrics_msgpack(void *record, size_t size, void *data) +{ + int i; + int ret; + size_t off = 0; + cfl_sds_t text = NULL; + struct cmt *cmt = NULL; + char *p; + + if (!TEST_CHECK(data != NULL)) { + flb_error("data is NULL"); + } + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *) record, size, &off); + if (ret != 0) { + flb_error("could not process metrics payload"); + return -1; + } + + /* convert to text representation */ + text = cmt_encode_text_create(cmt); + /* To inspect the metrics from the callback, just comment out below: */ + /* flb_info("[filter_grep][test] text = %s", text); */ + for (i = 0; i < strlen(text); i++) { + p = (char *)(text + i); + if (*p == '\n') { + num_output++; + } + } + + if (record) { + flb_free(record); + } + + /* destroy cmt context */ + cmt_destroy(cmt); + + cmt_encode_text_destroy(text); + + return 0; +} + + +static void clear_output_num() +{ + pthread_mutex_lock(&result_mutex); + num_output = 0; + pthread_mutex_unlock(&result_mutex); +} + +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +#ifdef FLB_HAVE_METRICS +void flb_test_selector_regex_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/storage/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + int got; + int n_metrics = 12; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_regex_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/input/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + int got; + int n_metrics = 19; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_prefix_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "fluentbit_input", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "prefix", + }; + int got; + int n_metrics = 11; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_prefix_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "fluentbit_storage", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "prefix", + }; + int got; + int n_metrics = 25; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_substring_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "dropped", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "substring", + }; + int got; + int n_metrics = 1; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_substring_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "connections", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "substring", + }; + int got; + int n_metrics = 28; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +#endif + +/* Test list */ +TEST_LIST = { +#ifdef FLB_HAVE_METRICS + {"regex_include", flb_test_selector_regex_include}, + {"regex_exclude", flb_test_selector_regex_exclude}, + {"prefix_include", flb_test_selector_prefix_include}, + {"prefix_exclude", flb_test_selector_prefix_exclude}, + {"substring_include", flb_test_selector_substring_include}, + {"substring_exclude", flb_test_selector_substring_exclude}, +#endif + {NULL, NULL} +};