Skip to content

Commit

Permalink
processor_calyptia: new lua processor for logs/metrics/traces
Browse files Browse the repository at this point in the history
The new processor has a similar API to filter lua, with the following
differences:

- logs/metrics/traces are supported
- it is now possible to modify logs metadata
- when splitting logs, it is possible to also specify the
  timestamp/metadata of each split log.
- global variables for the return codes were added, these can make
  scripts more readable. The variables are:
  - DROP (-1)
  - KEEP (0)
  - MODIFY (1)
  - MODIFY_KEEP_TIMESTAMP (2)
- For metrics/traces, timestamp is not used (since timestamps are
  specified using internal fields)
- For metrics, metadata is not used for now

Signed-off-by: Thiago Padilha <[email protected]>
  • Loading branch information
tarruda committed Sep 2, 2024
1 parent c6e902a commit c434ae3
Show file tree
Hide file tree
Showing 29 changed files with 2,768 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DEFINE_OPTION(FLB_IN_WINSTAT "Enable Windows Stat input plugin"
# Processors
# ==========
DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_CALYPTIA "Enable calyptia core lua processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON)
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ REGISTER_IN_PLUGIN("in_random")
# PROCESSORS
# ==========
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_calyptia")
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_sql")
Expand Down
25 changes: 25 additions & 0 deletions plugins/processor_calyptia/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
set(LUA_HELPERS_SRC ${CMAKE_CURRENT_SOURCE_DIR}/processor_helpers.lua)
set(LUA_HELPERS_DST ${CMAKE_CURRENT_BINARY_DIR}/calyptia_processor_helpers.c)

add_custom_command(
OUTPUT ${LUA_HELPERS_DST}
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_SOURCE_DIR}/generate_lua_helpers.cmake ${LUA_HELPERS_SRC} ${LUA_HELPERS_DST}
DEPENDS generate_lua_helpers.cmake ${LUA_HELPERS_SRC}
)

set(src
calyptia.c
calyptia_logs.c
calyptia_logs_from_lua.c
calyptia_logs_to_lua.c
calyptia_metrics.c
calyptia_metrics_from_lua.c
calyptia_metrics_to_lua.c
calyptia_traces.c
calyptia_traces_to_lua.c
calyptia_traces_from_lua.c
cfl_to_lua.c
lua_to_cfl.c
${LUA_HELPERS_DST})

FLB_PLUGIN(processor_calyptia "${src}" "")
279 changes: 279 additions & 0 deletions plugins/processor_calyptia/calyptia.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <cmetrics/cmt_decode_prometheus.h>
#include <ctraces/ctr_encode_opentelemetry.h>
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_lib.h>
#include <fluent-bit/flb_lua.h>
#include <fluent-bit/flb_luajit.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_processor_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_hash.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#include "calyptia_defs.h"
#include "calyptia_logs.h"
#include "calyptia_metrics.h"
#include "calyptia_traces.h"
#include "cfl_to_lua.h"

static void calyptia_config_destroy(struct calyptia_context *ctx)
{
if (!ctx) {
return;
}

if (ctx->code) {
flb_sds_destroy(ctx->code);
ctx->code = NULL;
}

if (ctx->script) {
flb_sds_destroy(ctx->script);
ctx->script = NULL;
}

if (ctx->lua) {
flb_luajit_destroy(ctx->lua);
ctx->lua = NULL;
}

flb_free(ctx);
}

static struct calyptia_context *
calyptia_config_create(struct flb_processor_instance *ins,
struct flb_config *config)
{
int ret;
int err;
char buf[PATH_MAX];
const char *tmp = NULL;
const char *script = NULL;
struct stat st;
struct flb_luajit *lj;
(void) config;
struct calyptia_context *ctx;

/* Allocate context */
ctx = flb_calloc(1, sizeof(struct calyptia_context));
if (!ctx) {
flb_errno();
return NULL;
}
ret = flb_processor_instance_config_map_set(ins, (void *) ctx);
if (ret < 0) {
flb_errno();
flb_plg_error(ins, "configuration error");
flb_free(ctx);
return NULL;
}

ctx->ins = ins;

/* config: code */
tmp = flb_processor_instance_get_property("code", ins);
if (tmp) {
ctx->code = flb_sds_create(tmp);
} else {
/* config: script */
script = flb_processor_instance_get_property("script", ins);
if (!script) {
flb_plg_error(ins, "either \"script\" or \"code\" must be set");
calyptia_config_destroy(ctx);
return NULL;
}

/* Compose path */
ret = stat(script, &st);
if (ret == -1 && errno == ENOENT) {
if (script[0] == '/') {
flb_plg_error(ins, "cannot access script '%s'", script);
calyptia_config_destroy(ctx);
return NULL;
}

if (config->conf_path) {
snprintf(buf, sizeof(buf) - 1, "%s%s", config->conf_path,
script);
script = buf;
}
}

/* Validate script path */
ret = access(script, R_OK);
if (ret == -1) {
flb_plg_error(ins, "cannot access script '%s'", script);
calyptia_config_destroy(ctx);
return NULL;
}

ctx->script = flb_sds_create(script);
if (!ctx->script) {
flb_plg_error(ins, "could not allocate string");
calyptia_config_destroy(ctx);
return NULL;
}
}

if (!ctx->call) {
flb_plg_error(ctx->ins, "\"call\" is not set");
calyptia_config_destroy(ctx);
return NULL;
}

/* Create LuaJIT state/vm */
lj = flb_luajit_create(config);
if (!lj) {
calyptia_config_destroy(ctx);
return NULL;
}

ctx->lua = lj;

/* Load the lua helpers */
if (flb_luajit_load_buffer(ctx->lua, calyptia_processor_lua_helpers,
strlen(calyptia_processor_lua_helpers),
"processor_helpers.lua")) {
calyptia_config_destroy(ctx);
return NULL;
}

/* this is here to allow passing options to the lua helpers */
lua_createtable(ctx->lua->state, 0, 1);
lua_pushboolean(ctx->lua->state, ctx->disable_warnings);
lua_setfield(ctx->lua->state, -2, "disable_warnings");
lua_setglobal(ctx->lua->state, "LUA_HELPERS_OPTS");

/* execute the lua helpers script, we expect 3 helper functions as return
* value */
err = lua_pcall(ctx->lua->state, 0, 3, 0);
if (err) {
flb_error("[luajit] invalid lua content, error=%d: %s", err,
lua_tostring(lj->state, -1));
calyptia_config_destroy(ctx);
return NULL;
}

/* push registry key for logs helper */
lua_pushlightuserdata(ctx->lua->state, (void *) LUA_LOGS_HELPER_KEY);
/* push the logs helper function */
lua_pushvalue(ctx->lua->state, -4);
/* store it in the registry */
lua_settable(ctx->lua->state, LUA_REGISTRYINDEX);

/* push registry key for metrics helper */
lua_pushlightuserdata(ctx->lua->state, (void *) LUA_METRICS_HELPER_KEY);
/* push the metrics helper function */
lua_pushvalue(ctx->lua->state, -3);
/* store it in the registry */
lua_settable(ctx->lua->state, LUA_REGISTRYINDEX);

/* push registry key for traces helper */
lua_pushlightuserdata(ctx->lua->state, (void *) LUA_TRACES_HELPER_KEY);
/* push the traces helper function */
lua_pushvalue(ctx->lua->state, -2);
/* store it in the registry */
lua_settable(ctx->lua->state, LUA_REGISTRYINDEX);
/* pop the helpers */
lua_pop(ctx->lua->state, 3);

/* Load the lua script */
if (ctx->code) {
if (flb_luajit_load_buffer(ctx->lua, ctx->code, flb_sds_len(ctx->code),
"processor.lua")) {
calyptia_config_destroy(ctx);
return NULL;
}
} else if (flb_luajit_load_script(ctx->lua, ctx->script)) {
calyptia_config_destroy(ctx);
return NULL;
}

if (ctx->opts) {
push_variant(ctx->lua->state, ctx->opts);
}

/* Execute the lua script */
err = lua_pcall(ctx->lua->state, ctx->opts ? 1 : 0, 0, 0);
if (err) {
flb_error("[luajit] invalid lua content, error=%d: %s", err,
lua_tostring(lj->state, -1));
calyptia_config_destroy(ctx);
return NULL;
}

if (flb_lua_is_valid_func(ctx->lua->state, ctx->call) != FLB_TRUE) {
flb_plg_error(ctx->ins, "function %s is not found", ctx->call);
calyptia_config_destroy(ctx);
return NULL;
}

return ctx;
}

static int cb_init(struct flb_processor_instance *ins,
void *source_plugin_instance, int source_plugin_type,
struct flb_config *config)
{
struct calyptia_context *ctx;

ctx = calyptia_config_create(ins, config);
if (!ctx) {
return -1;
}

flb_processor_instance_set_context(ins, ctx);

return FLB_PROCESSOR_SUCCESS;
}

static int cb_exit(struct flb_processor_instance *ins, void *data)
{
struct calyptia_context *ctx;

if (!ins) {
return FLB_PROCESSOR_SUCCESS;
}

ctx = data;
if (ctx) {
calyptia_config_destroy(ctx);
}

return FLB_PROCESSOR_SUCCESS;
}

static struct flb_config_map config_map[]
= { { FLB_CONFIG_MAP_STR, "code", NULL, 0, FLB_FALSE, 0,
"String that contains the Lua script source code" },
{ FLB_CONFIG_MAP_STR, "script", NULL, 0, FLB_FALSE, 0,
"The path of lua script." },
{ FLB_CONFIG_MAP_STR, "call", NULL, 0, FLB_TRUE,
offsetof(struct calyptia_context, call),
"Lua function that will be called to process logs." },
{ FLB_CONFIG_MAP_BOOL, "disable_warnings", "false", 0, FLB_TRUE,
offsetof(struct calyptia_context, disable_warnings),
"Disable warnings from lua helpers." },
{ FLB_CONFIG_MAP_VARIANT, "opts", NULL, 0, FLB_TRUE,
offsetof(struct calyptia_context, opts),
"Arguments object passed to Lua script" },
{ 0 } };

struct flb_processor_plugin processor_calyptia_plugin
= { .name = "calyptia",
.description = "Use lua to process logs, metrics and traces",
.cb_init = cb_init,
.cb_process_logs = calyptia_process_logs,
.cb_process_metrics = calyptia_process_metrics,
.cb_process_traces = calyptia_process_traces,
.cb_exit = cb_exit,
.config_map = config_map,
.flags = 0 };
25 changes: 25 additions & 0 deletions plugins/processor_calyptia/calyptia_defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef FLB_CALYPTIA_DEFS_H
#define FLB_CALYPTIA_DEFS_H

#include <fluent-bit/flb_lua.h>
#include <fluent-bit/flb_luajit.h>
#include <fluent-bit/flb_processor_plugin.h>

extern char calyptia_processor_lua_helpers[];

#define LUA_LOGS_HELPER_KEY (calyptia_processor_lua_helpers + 1)
#define LUA_METRICS_HELPER_KEY (calyptia_processor_lua_helpers + 2)
#define LUA_TRACES_HELPER_KEY (calyptia_processor_lua_helpers + 3)

struct calyptia_context {
flb_sds_t code; /* lua script source code */
flb_sds_t script; /* lua script path */
flb_sds_t call; /* lua callback to process the event */
struct flb_luajit *lua; /* state context */
struct flb_processor_instance *ins; /* processor instance */
bool disable_warnings; /* disable warnings from lua helpers */
struct cfl_variant *opts; /* arbitrary object passed to lua script */
};


#endif
Loading

0 comments on commit c434ae3

Please sign in to comment.