Skip to content

Commit

Permalink
processor_metrics_selector: Implement native processor for metrics ma…
Browse files Browse the repository at this point in the history
…nipulations

* Prevent dangling pointer if cmt context is recreated on input_metric and processor.
* processor_labels: Follow change of the signature of metrics callback
* lib: Support setter for processor to make testing processors easily

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored and edsiper committed Mar 14, 2024
1 parent cac75dc commit a7b3d9b
Show file tree
Hide file tree
Showing 14 changed files with 1,127 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ option(FLB_FILTER_GEOIP2 "Enable geoip2 filter"
option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes)
option(FLB_FILTER_WASM "Enable WASM filter" Yes)
option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes)
option(FLB_PROCESSOR_ATTRIBUTES "Enable atributes manipulation processor" Yes)
option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes)
option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes)

if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "")
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ struct flb_lib_out_cb {
/* For Fluent Bit library callers, we only export the following symbols */
typedef struct flb_lib_ctx flb_ctx_t;

struct flb_processor;

FLB_EXPORT void flb_init_env();
FLB_EXPORT flb_ctx_t *flb_create();
FLB_EXPORT void flb_destroy(flb_ctx_t *ctx);
FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data);
FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb);
FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data);
FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...);
FLB_EXPORT int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val);
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <cmetrics/cmt_encode_msgpack.h>
#include <cmetrics/cmt_encode_splunk_hec.h>
#include <cmetrics/cmt_encode_cloudwatch_emf.h>
#include <cmetrics/cmt_filter.h>

/* Metrics IDs for general purpose (used by core and Plugins */
#define FLB_METRIC_N_RECORDS 0
Expand Down
3 changes: 2 additions & 1 deletion include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ struct flb_processor_plugin {
int);

int (*cb_process_metrics) (struct flb_processor_instance *,
struct cmt *,
struct cmt *, /* in */
struct cmt **, /* out */
const char *,
int);

Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ REGISTER_IN_PLUGIN("in_random")
# PROCESSORS
# ==========
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")

# OUTPUTS
Expand Down
19 changes: 19 additions & 0 deletions plugins/processor_labels/labels.c
Original file line number Diff line number Diff line change
Expand Up @@ -1696,15 +1696,23 @@ static int hash_labels(struct cmt *metrics_context,

static int cb_process_metrics(struct flb_processor_instance *processor_instance,
struct cmt *metrics_context,
struct cmt **out_context,
const char *tag,
int tag_len)
{
struct cmt *out_cmt;
struct internal_processor_context *processor_context;
int result;

processor_context =
(struct internal_processor_context *) processor_instance->context;

out_cmt = cmt_create();
if (out_cmt == NULL) {
flb_plg_error(processor_instance, "could not create out_cmt context");
return FLB_PROCESSOR_FAILURE;
}

result = delete_labels(metrics_context,
&processor_context->delete_labels);

Expand All @@ -1728,6 +1736,17 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance,
&processor_context->hash_labels);
}

if (result == FLB_PROCESSOR_SUCCESS) {
result = cmt_cat(out_cmt, metrics_context);
if (result != 0) {
cmt_destroy(out_cmt);

return FLB_PROCESSOR_FAILURE;
}

*out_context = out_cmt;
}

if (result != FLB_PROCESSOR_SUCCESS) {
return FLB_PROCESSOR_FAILURE;
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/processor_metrics_selector/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(src
selector.c)

FLB_PLUGIN(processor_metrics_selector "${src}" "")
Loading

0 comments on commit a7b3d9b

Please sign in to comment.