From 33ce918351cdae2056b9e1e470ee163293f3fb5d Mon Sep 17 00:00:00 2001 From: Richard Treu Date: Fri, 12 Apr 2024 00:02:03 +0200 Subject: [PATCH] tests: filter_multiline: Add test for in_emitter pause by using multiline This commit will add a test for pause functionality of in_emitter. The test uses a small emitter buffer size, so the in_emitter will definitely be paused. Signed-off-by: Richard Treu --- tests/runtime/filter_multiline.c | 124 +++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/tests/runtime/filter_multiline.c b/tests/runtime/filter_multiline.c index 18253a5b2c7..ed6ffb6b7cb 100644 --- a/tests/runtime/filter_multiline.c +++ b/tests/runtime/filter_multiline.c @@ -2,6 +2,7 @@ #include #include +#include #include "flb_tests_runtime.h" struct filter_test { @@ -120,7 +121,34 @@ static int cb_check_str_list(void *record, size_t size, void *data) return 0; } +void wait_with_timeout(uint32_t timeout_ms, int *output_num, int expected) +{ + struct flb_time start_time; + struct flb_time end_time; + struct flb_time diff_time; + uint64_t elapsed_time_flb = 0; + + flb_time_get(&start_time); + + while (true) { + *output_num = get_output_num(); + + if (*output_num == expected) { + break; + } + + flb_time_msleep(100); + flb_time_get(&end_time); + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + if (elapsed_time_flb > timeout_ms) { + flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb); + // Reached timeout. + break; + } + } +} static struct filter_test *filter_test_create(struct flb_lib_out_cb *data) { @@ -682,6 +710,100 @@ static void flb_test_ml_buffered_16_streams() filter_test_destroy(ctx); } +/* This test will test the pausing of in_emitter */ +static void flb_test_ml_buffered_16_streams_pausing() +{ + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + int i_ffds[16] = {0}; + int ffd_num = sizeof(i_ffds)/sizeof(int); + int ret; + int i; + int j; + int bytes; + int len; + char line_buf[2048] = {0}; + char tag_buf[32] = {0}; + int line_num; + int num; + + char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"}; + + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + .ignore_min_line_num = 64, + }; + + char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property", + " at com.example.myproject.Author.getBookIds(xx.java:38)", + " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)", + "Caused by: java.lang.NullPointerException", + " at com.example.myproject.Book.getId(Book.java:22)", + " at com.example.myproject.Author.getBookIds(Author.java:35)", + " ... 1 more", + "single line"}; + + cb_data.cb = cb_check_str_list; + cb_data.data = (void *)&expected; + + clear_output_num(); + + line_num = sizeof(ml_logs)/sizeof(char*); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data); + if (!ctx) { + exit(EXIT_FAILURE); + } + flb_service_set(ctx->flb, + "Flush", "0.100000000", + "Grace", "2", + NULL); + + i_ffds[0] = ctx->i_ffd; + for (i=1; iflb, (char *) "lib", NULL); + TEST_CHECK(i_ffds[i] >= 0); + sprintf(&tag_buf[0], "test%d", i); + flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL); + } + + /* Configure filter */ + /* Set mem_buf_limit small, so in_emitter will be paused */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "multiline.key_content", "log", + "multiline.parser", "java", + "buffer", "on", + "debug_flush", "on", + "emitter_mem_buf_limit", "1k", + NULL); + TEST_CHECK(ret == 0); + + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + for (i=0; iflb, i_ffds[j], &line_buf[0], len); + TEST_CHECK(bytes == len); + } + } + wait_with_timeout(20000, &num, ffd_num); + + if (!TEST_CHECK(num > 0)) { + TEST_MSG("output error. got %d expect more than 0 records.", num); + /* The internal flb_lib_push cannot be paused, so records may be lost */ + /* However, there should be at least some records */ + } + + filter_test_destroy(ctx); +} + @@ -695,5 +817,7 @@ TEST_LIST = { {"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat }, {"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids }, + + {"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing }, {NULL, NULL} };