Skip to content

Commit

Permalink
WIP: processor_content_modifier: add support for metrics scope
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Oct 27, 2024
1 parent 7e3073d commit 5bb47ac
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 166 deletions.
3 changes: 3 additions & 0 deletions plugins/processor_content_modifier/cm_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ static int set_context(struct content_modifier_ctx *ctx)
else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) {
context = CM_CONTEXT_OTEL_RESOURCE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) {
context = CM_CONTEXT_OTEL_SCOPE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) {
/*
* scope name is restricted to specific actions, make sure the user
Expand Down
111 changes: 3 additions & 108 deletions plugins/processor_content_modifier/cm_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "cm.h"
#include "cm_utils.h"
#include "cm_opentelemetry.h"

#include <stdio.h>

Expand Down Expand Up @@ -268,133 +269,27 @@ static int run_action_convert(struct content_modifier_ctx *ctx,
return 0;
}

static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist)
{
int ret;
struct cfl_list *head;
struct cfl_list *tmp;
struct cfl_kvpair *kvpair;
struct cfl_variant *val;
struct cfl_kvlist *kvlist_tmp;

/* iterate resource to find the attributes field */
cfl_list_foreach_safe(head, tmp, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
if (cfl_sds_len(kvpair->key) != 10) {
continue;
}

if (strncmp(kvpair->key, "attributes", 10) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

/* create an empty kvlist as the value of attributes */
kvlist_tmp = cfl_kvlist_create();
if (!kvlist_tmp) {
return NULL;
}

/* create the attributes kvpair */
ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp);
if (ret != 0) {
cfl_kvlist_destroy(kvlist_tmp);
return NULL;
}

/* get the last kvpair from the list */
kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head);
if (!kvpair) {
return NULL;
}

return kvpair->val;
}


static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record)
{
int key_len;
const char *key_buf;
struct cfl_list *head;
struct cfl_object *obj = NULL;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;
struct cfl_variant *var_attr;

if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) {
key_buf = "resource";
key_len = 8;
}
else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) {
key_buf = "scope";
key_len = 5;
}
else {
return NULL;
}

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != key_len) {
continue;
}

if (strncmp(kvpair->key, key_buf, key_len) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

var_attr = otel_get_or_create_attributes(val->data.as_kvlist);
if (!var_attr) {
return NULL;
}

return var_attr;
}
}

return NULL;
return cm_otel_get_attributes(CM_TELEMETRY_LOGS, context, kvlist);
}

static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record)
{
struct cfl_list *head;
struct cfl_object *obj;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != 5) {
continue;
}

if (strncmp(kvpair->key, "scope", 5) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

return NULL;
return cm_otel_get_scope_metadata(CM_TELEMETRY_LOGS, kvlist);
}

int cm_logs_process(struct flb_processor_instance *ins,
Expand Down
54 changes: 15 additions & 39 deletions plugins/processor_content_modifier/cm_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,7 @@ int cm_metrics_process(struct flb_processor_instance *ins,
const char *tag, int tag_len)
{
int ret = -1;
struct cfl_list *head;
struct cfl_variant *var = NULL;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;
struct cfl_variant *var_attr;

printf("\n\n==== BEFORE =====\n");
cfl_kvlist_print(stdout, in_cmt->internal_metadata);
Expand Down Expand Up @@ -297,74 +292,55 @@ int cm_metrics_process(struct flb_processor_instance *ins,
}

var = NULL;
cfl_list_foreach(head, &in_cmt->external_metadata->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != 8 /* resource */) {
continue;
}

if (strncmp(kvpair->key, "resource", 8) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return FLB_PROCESSOR_FAILURE;
}

var_attr = cm_otel_get_or_create_attributes(val->data.as_kvlist);
if (!var_attr) {
return FLB_PROCESSOR_FAILURE;
}

if (var_attr->type != CFL_VARIANT_KVLIST) {
return FLB_PROCESSOR_FAILURE;
}

var = var_attr;
break; //FiXME return ;
}

var = cm_otel_get_attributes(CM_TELEMETRY_METRICS, ctx->context_type, in_cmt->external_metadata);
if (!var) {
return FLB_PROCESSOR_FAILURE;
}
}
else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR) {
var = cm_otel_get_attributes(CM_TELEMETRY_METRICS, ctx->context_type, in_cmt->external_metadata);
}
else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION)) {
var = cm_otel_get_scope(in_cmt->external);
var = cm_otel_get_scope_metadata(CM_TELEMETRY_METRICS, in_cmt->external_metadata);
}


if (!var) {
return FLB_PROCESSOR_FAILURE;
}

if (ctx->action_type == CM_ACTION_INSERT) {
ret = run_action_insert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value);
printf("\n run action insert: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_UPSERT) {
ret = run_action_upsert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value);
printf("\n run action upsert: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_DELETE) {
ret = run_action_delete(ctx, var->data.as_kvlist, tag, tag_len, ctx->key);
printf("\n run action delete: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_RENAME) {
ret = run_action_rename(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value);
printf("\n run action rename: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_HASH) {
ret = run_action_hash(ctx, var->data.as_kvlist, tag, tag_len, ctx->key);
printf("\n run action hash: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_EXTRACT) {
ret = run_action_extract(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->regex);
printf("\n run action extract: %i\n", ret);
}
else if (ctx->action_type == CM_ACTION_CONVERT) {
ret = run_action_convert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->converted_type);
printf("\n run action convert: %i\n", ret);
}

if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}

printf("\n\n==== AFTER =====\n");
cfl_kvlist_print(stdout, in_cmt->internal_metadata);
printf("\n");
printf("-----external----\n");
cfl_kvlist_print(stdout, in_cmt->external_metadata);
fflush(stdout);

return FLB_PROCESSOR_SUCCESS;
}
Loading

0 comments on commit 5bb47ac

Please sign in to comment.