Skip to content

Commit

Permalink
tests: runtime: filter_multiline: add multi stream test(fluent#5524)
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Jun 12, 2022
1 parent b98b935 commit ce53d0d
Showing 1 changed file with 263 additions and 2 deletions.
265 changes: 263 additions & 2 deletions tests/runtime/filter_multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,78 @@ static int cb_check_result(void *record, size_t size, void *data)
}


pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
int num_output = 0;
static int get_output_num()
{
int ret;
pthread_mutex_lock(&result_mutex);
ret = num_output;
pthread_mutex_unlock(&result_mutex);

return ret;
}

static void set_output_num(int num)
{
pthread_mutex_lock(&result_mutex);
num_output = num;
pthread_mutex_unlock(&result_mutex);
}

static void clear_output_num()
{
set_output_num(0);
}

struct str_list {
size_t size; /* size of lists */
int ignore_min_line_num; /* ignore line if the length is less than this value */
char **lists; /* string lists */
};

/* Callback to check expected results */
static int cb_check_str_list(void *record, size_t size, void *data)
{
char *p;
char *out_line = record;
int num = get_output_num();
int count = 0;
size_t i;
struct str_list *l = (struct str_list *)data;

if (!TEST_CHECK(out_line != NULL)) {
TEST_MSG("out_line is NULL");
return -1;
}

if (!TEST_CHECK(l != NULL)) {
TEST_MSG("l is NULL");
flb_free(out_line);
return -1;
}

if (strlen(out_line) < l->ignore_min_line_num) {
flb_free(out_line);
return 0;
}

for (i=0; i<l->size; i++) {
p = strstr(out_line, l->lists[i]);
if (p != NULL) {
count++;
}
}
if(!TEST_CHECK(count != 0)) {
TEST_MSG("%s is not matched", out_line);
}
set_output_num(num+count);

flb_free(out_line);
return 0;
}



static struct filter_test *filter_test_create(struct flb_lib_out_cb *data)
{
Expand Down Expand Up @@ -86,7 +158,7 @@ static struct filter_test *filter_test_create(struct flb_lib_out_cb *data)
o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data);
TEST_CHECK(o_ffd >= 0);
flb_output_set(ctx->flb, o_ffd,
"match", "test",
"match", "test*",
"format", "json",
NULL);

Expand Down Expand Up @@ -426,12 +498,201 @@ static void flb_test_multiline_partial_message_concat_two_ids()
filter_test_destroy(ctx);
}


/*
* create 2 in_lib instances and pass multiline
* https://github.com/fluent/fluent-bit/issues/5524
*/
static void flb_test_ml_buffered_two_streams()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffd_2;
int ret;
int i;
int bytes;
int len;
char line_buf[2048] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more",
"Dec 14 06:41:08 Exception in thread main java.lang.RuntimeException: Something has gone wrong, aborting!\\n at com.myproject.module.MyProject.badMethod(MyProject.java:22)\\n at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)\\n at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)\\n at com.myproject.module.MyProject.someMethod(MyProject.java:10)\\n at com.myproject.module.MyProject.main(MyProject.java:6)"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs_1[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};
char *ml_logs_2[] = {
"single line...",
"Dec 14 06:41:08 Exception in thread main java.lang.RuntimeException: Something has gone wrong, aborting!",
" at com.myproject.module.MyProject.badMethod(MyProject.java:22)",
" at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)",
" at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)",
" at com.myproject.module.MyProject.someMethod(MyProject.java:10)",
" at com.myproject.module.MyProject.main(MyProject.java:6)",
"another line..."};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

TEST_CHECK(sizeof(ml_logs_1)/sizeof(char*) == sizeof(ml_logs_2)/sizeof(char*));
line_num = sizeof(ml_logs_1)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}
i_ffd_2 = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffd_2 >= 0);
flb_input_set(ctx->flb, i_ffd_2, "tag", "test2", NULL);

/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs_1[i]);
len = strlen(line_buf);
bytes = flb_lib_push(ctx->flb, ctx->i_ffd, &line_buf[0], len);
TEST_CHECK(bytes == len);


sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs_2[i]);
len = strlen(line_buf);
bytes = flb_lib_push(ctx->flb, i_ffd_2, &line_buf[0], len);
TEST_CHECK(bytes == len);
}
sleep(4);

num = get_output_num();
if (!TEST_CHECK(num == 2)) {
TEST_MSG("output error. got %d expect 2", num);
}

filter_test_destroy(ctx);
}

static void flb_test_ml_buffered_16_streams()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffds[16] = {0};
int ffd_num = sizeof(i_ffds)/sizeof(int);
int ret;
int i;
int j;
int bytes;
int len;
char line_buf[2048] = {0};
char tag_buf[32] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"};

struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

line_num = sizeof(ml_logs)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}

i_ffds[0] = ctx->i_ffd;
for (i=1; i<ffd_num; i++) {
i_ffds[i] = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffds[i] >= 0);
sprintf(&tag_buf[0], "test%d", i);
flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL);
}

/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs[i]);
len = strlen(line_buf);
for (j=0; j<ffd_num; j++) {
bytes = flb_lib_push(ctx->flb, i_ffds[j], &line_buf[0], len);
TEST_CHECK(bytes == len);
}
}
sleep(4);

num = get_output_num();
if (!TEST_CHECK(num == ffd_num)) {
TEST_MSG("output error. got %d expect %d", num, ffd_num);
}

filter_test_destroy(ctx);
}




TEST_LIST = {
{"ml_buffered_two_streams" , flb_test_ml_buffered_two_streams},
{"ml_buffered_16_streams" , flb_test_ml_buffered_16_streams},

{"multiline_buffered_one_record" , flb_test_multiline_buffered_one_output_record },
{"multiline_buffered_two_record" , flb_test_multiline_buffered_two_output_record },
{"flb_test_multiline_unbuffered" , flb_test_multiline_unbuffered },

{"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat },
{"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids },
{NULL, NULL}
};
};

0 comments on commit ce53d0d

Please sign in to comment.