Skip to content

Commit

Permalink
tests: filter_multiline: Add test for in_emitter pause by using multi…
Browse files Browse the repository at this point in the history
…line

This commit will add a test for pause functionality of in_emitter. The test
uses a small emitter buffer size, so the in_emitter will definitely be paused.

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work authored and edsiper committed Apr 12, 2024
1 parent 0785d76 commit 33ce918
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions tests/runtime/filter_multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <fluent-bit.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_time.h>
#include "flb_tests_runtime.h"

struct filter_test {
Expand Down Expand Up @@ -120,7 +121,34 @@ static int cb_check_str_list(void *record, size_t size, void *data)
return 0;
}

void wait_with_timeout(uint32_t timeout_ms, int *output_num, int expected)
{
struct flb_time start_time;
struct flb_time end_time;
struct flb_time diff_time;
uint64_t elapsed_time_flb = 0;

flb_time_get(&start_time);

while (true) {
*output_num = get_output_num();

if (*output_num == expected) {
break;
}

flb_time_msleep(100);
flb_time_get(&end_time);
flb_time_diff(&end_time, &start_time, &diff_time);
elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;

if (elapsed_time_flb > timeout_ms) {
flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb);
// Reached timeout.
break;
}
}
}

static struct filter_test *filter_test_create(struct flb_lib_out_cb *data)
{
Expand Down Expand Up @@ -682,6 +710,100 @@ static void flb_test_ml_buffered_16_streams()
filter_test_destroy(ctx);
}

/* This test will test the pausing of in_emitter */
static void flb_test_ml_buffered_16_streams_pausing()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffds[16] = {0};
int ffd_num = sizeof(i_ffds)/sizeof(int);
int ret;
int i;
int j;
int bytes;
int len;
char line_buf[2048] = {0};
char tag_buf[32] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"};

struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

line_num = sizeof(ml_logs)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}
flb_service_set(ctx->flb,
"Flush", "0.100000000",
"Grace", "2",
NULL);

i_ffds[0] = ctx->i_ffd;
for (i=1; i<ffd_num; i++) {
i_ffds[i] = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffds[i] >= 0);
sprintf(&tag_buf[0], "test%d", i);
flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL);
}

/* Configure filter */
/* Set mem_buf_limit small, so in_emitter will be paused */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
"emitter_mem_buf_limit", "1k",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs[i]);
len = strlen(line_buf);
for (j=0; j<ffd_num; j++) {
bytes = flb_lib_push(ctx->flb, i_ffds[j], &line_buf[0], len);
TEST_CHECK(bytes == len);
}
}
wait_with_timeout(20000, &num, ffd_num);

if (!TEST_CHECK(num > 0)) {
TEST_MSG("output error. got %d expect more than 0 records.", num);
/* The internal flb_lib_push cannot be paused, so records may be lost */
/* However, there should be at least some records */
}

filter_test_destroy(ctx);
}




Expand All @@ -695,5 +817,7 @@ TEST_LIST = {

{"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat },
{"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids },

{"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing },
{NULL, NULL}
};

0 comments on commit 33ce918

Please sign in to comment.