Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ml: clear last recently used parser to match next parser(#5524) #5564

Merged
merged 2 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,11 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,
}
}
else if (lru_parser && lru_parser->last_stream_id > 0) {
flb_ml_flush_parser_instance(ml,
lru_parser,
lru_parser->last_stream_id);
/*
* Clear last recently used parser to match new parser.
* Do not flush last_stream_id since it should continue to parsing.
*/
lru_parser = NULL;
}
}

Expand Down Expand Up @@ -819,9 +821,11 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,
}
}
else if (lru_parser && lru_parser->last_stream_id > 0) {
flb_ml_flush_parser_instance(ml,
lru_parser,
lru_parser->last_stream_id);
/*
* Clear last recently used parser to match new parser.
* Do not flush last_stream_id since it should continue to parsing.
*/
lru_parser = NULL;
Comment on lines +824 to +828
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I removed flb_ml_flush_parser_instance.
Since last_stream_id may be parsing.

}
}

Expand Down
265 changes: 263 additions & 2 deletions tests/runtime/filter_multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,78 @@ static int cb_check_result(void *record, size_t size, void *data)
}


pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
int num_output = 0;
static int get_output_num()
{
int ret;
pthread_mutex_lock(&result_mutex);
ret = num_output;
pthread_mutex_unlock(&result_mutex);

return ret;
}

static void set_output_num(int num)
{
pthread_mutex_lock(&result_mutex);
num_output = num;
pthread_mutex_unlock(&result_mutex);
}

static void clear_output_num()
{
set_output_num(0);
}

struct str_list {
size_t size; /* size of lists */
int ignore_min_line_num; /* ignore line if the length is less than this value */
char **lists; /* string lists */
};

/* Callback to check expected results */
static int cb_check_str_list(void *record, size_t size, void *data)
{
char *p;
char *out_line = record;
int num = get_output_num();
int count = 0;
size_t i;
struct str_list *l = (struct str_list *)data;

if (!TEST_CHECK(out_line != NULL)) {
TEST_MSG("out_line is NULL");
return -1;
}

if (!TEST_CHECK(l != NULL)) {
TEST_MSG("l is NULL");
flb_free(out_line);
return -1;
}

if (strlen(out_line) < l->ignore_min_line_num) {
flb_free(out_line);
return 0;
}

for (i=0; i<l->size; i++) {
p = strstr(out_line, l->lists[i]);
if (p != NULL) {
count++;
}
}
if(!TEST_CHECK(count != 0)) {
TEST_MSG("%s is not matched", out_line);
}
set_output_num(num+count);

flb_free(out_line);
return 0;
}



static struct filter_test *filter_test_create(struct flb_lib_out_cb *data)
{
Expand Down Expand Up @@ -86,7 +158,7 @@ static struct filter_test *filter_test_create(struct flb_lib_out_cb *data)
o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data);
TEST_CHECK(o_ffd >= 0);
flb_output_set(ctx->flb, o_ffd,
"match", "test",
"match", "test*",
"format", "json",
NULL);

Expand Down Expand Up @@ -426,12 +498,201 @@ static void flb_test_multiline_partial_message_concat_two_ids()
filter_test_destroy(ctx);
}


/*
* create 2 in_lib instances and pass multiline
* https://github.com/fluent/fluent-bit/issues/5524
*/
static void flb_test_ml_buffered_two_streams()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffd_2;
int ret;
int i;
int bytes;
int len;
char line_buf[2048] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more",
"Dec 14 06:41:08 Exception in thread main java.lang.RuntimeException: Something has gone wrong, aborting!\\n at com.myproject.module.MyProject.badMethod(MyProject.java:22)\\n at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)\\n at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)\\n at com.myproject.module.MyProject.someMethod(MyProject.java:10)\\n at com.myproject.module.MyProject.main(MyProject.java:6)"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs_1[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};
char *ml_logs_2[] = {
"single line...",
"Dec 14 06:41:08 Exception in thread main java.lang.RuntimeException: Something has gone wrong, aborting!",
" at com.myproject.module.MyProject.badMethod(MyProject.java:22)",
" at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)",
" at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)",
" at com.myproject.module.MyProject.someMethod(MyProject.java:10)",
" at com.myproject.module.MyProject.main(MyProject.java:6)",
"another line..."};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

TEST_CHECK(sizeof(ml_logs_1)/sizeof(char*) == sizeof(ml_logs_2)/sizeof(char*));
line_num = sizeof(ml_logs_1)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}
i_ffd_2 = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffd_2 >= 0);
flb_input_set(ctx->flb, i_ffd_2, "tag", "test2", NULL);

/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs_1[i]);
len = strlen(line_buf);
bytes = flb_lib_push(ctx->flb, ctx->i_ffd, &line_buf[0], len);
TEST_CHECK(bytes == len);


sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs_2[i]);
len = strlen(line_buf);
bytes = flb_lib_push(ctx->flb, i_ffd_2, &line_buf[0], len);
TEST_CHECK(bytes == len);
}
sleep(3);

num = get_output_num();
if (!TEST_CHECK(num == 2)) {
TEST_MSG("output error. got %d expect 2", num);
}

filter_test_destroy(ctx);
}

static void flb_test_ml_buffered_16_streams()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffds[16] = {0};
int ffd_num = sizeof(i_ffds)/sizeof(int);
int ret;
int i;
int j;
int bytes;
int len;
char line_buf[2048] = {0};
char tag_buf[32] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"};

struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

line_num = sizeof(ml_logs)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}

i_ffds[0] = ctx->i_ffd;
for (i=1; i<ffd_num; i++) {
i_ffds[i] = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffds[i] >= 0);
sprintf(&tag_buf[0], "test%d", i);
flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL);
}

/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs[i]);
len = strlen(line_buf);
for (j=0; j<ffd_num; j++) {
bytes = flb_lib_push(ctx->flb, i_ffds[j], &line_buf[0], len);
TEST_CHECK(bytes == len);
}
}
sleep(3);

num = get_output_num();
if (!TEST_CHECK(num == ffd_num)) {
TEST_MSG("output error. got %d expect %d", num, ffd_num);
}

filter_test_destroy(ctx);
}




TEST_LIST = {
{"ml_buffered_two_streams" , flb_test_ml_buffered_two_streams},
{"ml_buffered_16_streams" , flb_test_ml_buffered_16_streams},

{"multiline_buffered_one_record" , flb_test_multiline_buffered_one_output_record },
{"multiline_buffered_two_record" , flb_test_multiline_buffered_two_output_record },
{"flb_test_multiline_unbuffered" , flb_test_multiline_unbuffered },

{"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat },
{"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids },
{NULL, NULL}
};
};