Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor_sql: new SQL processor #8580

Closed
wants to merge 12 commits into from
Closed
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,12 @@ option(FLB_FILTER_TENSORFLOW "Enable tensorflow filter"
option(FLB_FILTER_GEOIP2 "Enable geoip2 filter" Yes)
option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes)
option(FLB_FILTER_WASM "Enable WASM filter" Yes)

# Processors
option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes)
option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes)
option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes)
option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes)
option(FLB_PROCESSOR_SQL "Enable SQL processor" Yes)

if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "")
FLB_DEFINITION_VAL(FLB_NIGHTLY_BUILD ${FLB_NIGHTLY_BUILD})
Expand Down
6 changes: 4 additions & 2 deletions include/fluent-bit/flb_mp_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ struct flb_mp_chunk_cobj {


struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj *chunk_cobj);

int flb_mp_chunk_cobj_record_destroy(struct flb_mp_chunk_cobj *chunk_cobj,
struct flb_mp_chunk_record *record);
int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
struct flb_mp_chunk_record **out_record);

Expand All @@ -56,8 +59,7 @@ int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj);

int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size);

int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
struct flb_mp_chunk_record **out_record);



#endif
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ struct flb_processor_plugin {
const char *,
int);

int (*cb_exit) (struct flb_processor_instance *);
int (*cb_exit) (struct flb_processor_instance *, void *);

struct mk_list _head; /* Link to parent list (config->filters) */
};
Expand Down
7 changes: 5 additions & 2 deletions include/fluent-bit/flb_processor_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
* limitations under the License.
*/

#ifndef FLB_FILTER_PLUGIN_H
#define FLB_FILTER_PLUGIN_H
#ifndef FLB_PROCESSOR_PLUGIN_H
#define FLB_PROCESSOR_PLUGIN_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>

#define flb_plg_log(ctx, level, fmt, ...) \
if (flb_log_check_level(ctx->log_level, level)) \
Expand Down
3 changes: 2 additions & 1 deletion plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,10 @@ REGISTER_IN_PLUGIN("in_random")

# PROCESSORS
# ==========
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_sql")

# OUTPUTS
# =======
Expand Down
8 changes: 4 additions & 4 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ static int cb_init(struct flb_processor_instance *ins, void *source_plugin_insta
return FLB_PROCESSOR_SUCCESS;
}

static int cb_exit(struct flb_processor_instance *ins)
static int cb_exit(struct flb_processor_instance *ins, void *data)
{
struct content_modifier_ctx *ctx;

if (!ins) {
return FLB_PROCESSOR_SUCCESS;
}

ctx = ins->context;
ctx = data;
if (ctx) {
cm_config_destroy(ctx);
}
Expand All @@ -62,13 +62,13 @@ static int cb_exit(struct flb_processor_instance *ins)
}

static int cb_process_logs(struct flb_processor_instance *ins,
void *chunk,
void *chunk_data,
const char *tag,
int tag_len)
{
int ret;
struct content_modifier_ctx *ctx;
struct flb_mp_chunk_cobj *chunk_cobj = (struct flb_mp_chunk_cobj *)chunk;
struct flb_mp_chunk_cobj *chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data;

if (!ins->context) {
return FLB_PROCESSOR_FAILURE;
Expand Down
7 changes: 3 additions & 4 deletions plugins/processor_labels/labels.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,10 @@ static int cb_init(struct flb_processor_instance *processor_instance,
}


static int cb_exit(struct flb_processor_instance *processor_instance)
static int cb_exit(struct flb_processor_instance *processor_instance, void *data)
{
if (processor_instance != NULL &&
processor_instance->context != NULL) {
destroy_context(processor_instance->context);
if (processor_instance != NULL && data != NULL) {
destroy_context(data);
}

return FLB_PROCESSOR_SUCCESS;
Expand Down
12 changes: 12 additions & 0 deletions plugins/processor_sql/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR})

# find_package(FLEX REQUIRED)
# find_package(BISON REQUIRED)

add_subdirectory(parser)

set(src
sql_config.c
sql.c)

FLB_PLUGIN(processor_sql "${src}" "processor-sql-parser")
27 changes: 27 additions & 0 deletions plugins/processor_sql/parser/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
flex_target(lexer sql-parser.l "${CMAKE_CURRENT_BINARY_DIR}/processor-sql-parser_lex.c"
DEFINES_FILE "${CMAKE_CURRENT_BINARY_DIR}/processor-sql-parser_lex.h"
)
bison_target(parser sql-parser.y "${CMAKE_CURRENT_BINARY_DIR}/processor-sql_parser.c")

if(CMAKE_SYSTEM_NAME MATCHES "Windows")
FLB_DEFINITION(YY_NO_UNISTD_H)
message(STATUS "Specifying YY_NO_UNISTD_H")
endif()

set(sources
sql_parser.c
sql_expression.c
)

include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
)

add_library(processor-sql-parser STATIC
${sources}
"${CMAKE_CURRENT_BINARY_DIR}/processor-sql-parser_lex.c"
"${CMAKE_CURRENT_BINARY_DIR}/processor-sql_parser.c"
)

add_flex_bison_dependency(lexer parser)
79 changes: 79 additions & 0 deletions plugins/processor_sql/parser/sql-parser.l
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
%option caseless
%{
#include <stdio.h>
#include <fluent-bit/flb_str.h>

#include "processor-sql_parser.h"
#include "sql.h"

static inline char *remove_dup_qoutes(const char *s, size_t n)
{
char *str;
int dups;
int i, j;

dups = 0;
for (i = 0; i < n; i++) {
if (s[i] == '\'') {
dups++;
i++;
}
}

str = (char *) flb_malloc(n - dups + 1);
if (!str) {
return NULL;
}

j = 0;
for (i = 0; i < n; i++, j++) {
if (s[i] == '\'') {
str[j] = '\'';
i++;
} else {
str[j] = s[i];
}
}
str[j] = '\0';

return str;
}

%}

%option 8bit reentrant bison-bridge
%option warn noyywrap nodefault
%option nounput
%option noinput

%%

SELECT return SELECT;
AS return AS;
FROM return FROM;
STREAM return FROM_STREAM;
WHERE return WHERE;
AND return AND;
OR return OR;
NOT return NOT;

IS return IS;
NULL return NUL;

-?[1-9][0-9]*|0 { yylval->integer = atoi(yytext); return INTEGER; }
(-?[1-9][0-9]*|0)\.[0-9]+ { yylval->fval = atof(yytext); return FLOATING; }
\'([^']|'{2})*\' { yylval->string = remove_dup_qoutes(yytext + 1, yyleng - 2); return STRING; }
[_A-Za-z][A-Za-z0-9_.]* { yylval->string = flb_strdup(yytext); return IDENTIFIER; }

"*" |
"," |
"=" |
"(" |
")" |
";" { return yytext[0]; }

\' return QUOTE;
\n
[ \t]+ /* ignore whitespace */;

%%
Loading
Loading