Skip to content

Commit

Permalink
processor_content_modifier: add traces support through processor attr…
Browse files Browse the repository at this point in the history
…ibutes

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Mar 12, 2024
1 parent e0dec43 commit 830545a
Show file tree
Hide file tree
Showing 6 changed files with 752 additions and 40 deletions.
1 change: 1 addition & 0 deletions plugins/processor_content_modifier/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
set(src
cm_config.c
cm_logs.c
cm_traces.c
cm.c
)

Expand Down
23 changes: 21 additions & 2 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,27 @@ static int cb_process_logs(struct flb_processor_instance *ins,
if (!ins->context) {
return FLB_PROCESSOR_FAILURE;
}
ctx = ins->context;

ret = cm_logs_process(ins, ctx, chunk_cobj, tag, tag_len);
return ret;

}

static int cb_process_traces(struct flb_processor_instance *ins,
struct ctrace *traces_context,
const char *tag,
int tag_len)
{
int ret;
struct content_modifier_ctx *ctx;

if (!ins->context) {
return FLB_PROCESSOR_FAILURE;
}
ctx = ins->context;

ret = cm_logs_process(ins, ins->context, chunk_cobj, tag, tag_len);
ret = cm_traces_process(ins, ctx, traces_context, tag, tag_len);
return ret;

}
Expand Down Expand Up @@ -125,7 +144,7 @@ struct flb_processor_plugin processor_content_modifier_plugin = {
.cb_init = cb_init,
.cb_process_logs = cb_process_logs,
.cb_process_metrics = NULL,
.cb_process_traces = NULL,
.cb_process_traces = cb_process_traces,
.cb_exit = cb_exit,
.config_map = config_map,
.flags = 0
Expand Down
5 changes: 5 additions & 0 deletions plugins/processor_content_modifier/cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,10 @@ int cm_logs_process(struct flb_processor_instance *ins,
const char *tag,
int tag_len);

int cm_traces_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct ctrace *traces_context,
const char *tag, int tag_len);


#endif
9 changes: 1 addition & 8 deletions plugins/processor_content_modifier/cm_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static int set_context(struct content_modifier_ctx *ctx)
/* if no context is set, use span attributes */
context = CM_CONTEXT_TRACE_SPAN_ATTRIBUTES;
}
if (strcasecmp(ctx->context_str, "span_name") == 0) {
else if (strcasecmp(ctx->context_str, "span_name") == 0) {
context = CM_CONTEXT_TRACE_SPAN_NAME;
}
else if (strcasecmp(ctx->context_str, "span_kind") == 0) {
Expand Down Expand Up @@ -257,19 +257,12 @@ struct content_modifier_ctx *cm_config_create(struct flb_processor_instance *ins
}
}

// ret = set_converted_type(ctx);
// if (ret == -1) {
// flb_free(ctx);
// return NULL;
// }

/* Certain actions needs extra configuration, e.g: insert -> requires a key and a value */
ret = check_action_requirements(ctx);
if (ret == -1) {
flb_free(ctx);
return NULL;
}

return ctx;
}

Expand Down
56 changes: 26 additions & 30 deletions plugins/processor_content_modifier/cm_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ static struct cfl_kvpair *cfl_object_kvpair_get(struct cfl_object *obj, cfl_sds_
return NULL;
}

int run_action_insert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
static int run_action_insert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
{
int ret;
struct cfl_kvlist *kvlist;
Expand All @@ -360,10 +360,10 @@ int run_action_insert(struct content_modifier_ctx *ctx,
return 0;
}

int run_action_upsert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
static int run_action_upsert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
{
int ret;
struct cfl_kvlist *kvlist;
Expand All @@ -386,10 +386,10 @@ int run_action_upsert(struct content_modifier_ctx *ctx,
return 0;
}

int run_action_delete(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
static int run_action_delete(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key)
{
struct cfl_kvpair *kvpair;

Expand All @@ -403,10 +403,10 @@ int run_action_delete(struct content_modifier_ctx *ctx,
return -1;
}

int run_action_rename(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
static int run_action_rename(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, cfl_sds_t value)
{
cfl_sds_t tmp;
struct cfl_kvpair *kvpair;
Expand All @@ -431,10 +431,10 @@ int run_action_rename(struct content_modifier_ctx *ctx,
return 0;
}

int run_action_hash(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key)
static int run_action_hash(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key)
{
int ret;
struct cfl_kvpair *kvpair;
Expand Down Expand Up @@ -504,10 +504,10 @@ int run_action_extract(struct content_modifier_ctx *ctx,
return 0;
}

int run_action_convert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, int converted_type)
static int run_action_convert(struct content_modifier_ctx *ctx,
struct cfl_object *obj,
const char *tag, int tag_len,
cfl_sds_t key, int converted_type)
{
int ret;
struct cfl_kvlist *kvlist;
Expand Down Expand Up @@ -556,13 +556,9 @@ int cm_logs_process(struct flb_processor_instance *ins,
/* retrieve the target cfl object */
if (ctx->context_type == CM_CONTEXT_LOG_METADATA) {
obj = record->cobj_metadata;
// printf("> IN : ");
// cfl_object_print(stdout, obj);
}
else if (ctx->context_type == CM_CONTEXT_LOG_BODY) {
obj = record->cobj_record;
// printf("> IN : ");
// cfl_object_print(stdout, obj);
}

/* the operation on top of the data type is unsupported */
Expand All @@ -579,7 +575,7 @@ int cm_logs_process(struct flb_processor_instance *ins,
ret = run_action_upsert(ctx, obj, tag, tag_len, ctx->key, ctx->value);
}
else if (ctx->action_type == CM_ACTION_DELETE) {
ret = run_action_delete(ctx, obj, tag, tag_len, ctx->key, ctx->value);
ret = run_action_delete(ctx, obj, tag, tag_len, ctx->key);
}
else if (ctx->action_type == CM_ACTION_RENAME) {
ret = run_action_rename(ctx, obj, tag, tag_len, ctx->key, ctx->value);
Expand All @@ -600,4 +596,4 @@ int cm_logs_process(struct flb_processor_instance *ins,
}

return FLB_PROCESSOR_SUCCESS;
}
}
Loading

0 comments on commit 830545a

Please sign in to comment.