Skip to content

Commit

Permalink
in_tail: Introducing the compare_filename option to db_file_exists (#…
Browse files Browse the repository at this point in the history
…8025)(2/2)

When checking the existence of a file's inode, if the 'compare_filename'
option is enabled, it is modified to compare the filename as well.
If the inode matches but the filename is different, it removes the stale
inode from the database.

Signed-off-by: jinyong.choi <[email protected]>
  • Loading branch information
jinyongchoi authored and edsiper committed Apr 9, 2024
1 parent fe57a87 commit e4d8d8c
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 2 deletions.
8 changes: 8 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,14 @@ static struct flb_config_map config_map[] = {
"provides higher performance. Note that WAL is not compatible with "
"shared network file systems."
},
{
FLB_CONFIG_MAP_BOOL, "db.compare_filename", "false",
0, FLB_TRUE, offsetof(struct flb_tail_config, compare_filename),
"This option determines whether to check both the inode and the filename "
"when retrieving file information from the db."
"'true' verifies both the inode and filename, while 'false' checks only "
"the inode (default)."
},
#endif

/* Multiline Options */
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct flb_tail_config {
struct flb_sqldb *db;
int db_sync;
int db_locking;
int compare_filename;
flb_sds_t db_journal_mode;
sqlite3_stmt *stmt_get_file;
sqlite3_stmt *stmt_insert_file;
Expand Down
58 changes: 56 additions & 2 deletions plugins/in_tail/tail_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,46 @@ int flb_tail_db_close(struct flb_sqldb *db)
return 0;
}

static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx,
uint64_t id)
{
int ret;

/* Bind parameters */
ret = sqlite3_bind_int64(ctx->stmt_delete_file, 1, id);
if (ret != SQLITE_OK) {
flb_plg_error(ctx->ins, "db: error binding id=%"PRIu64", ret=%d", id, ret);
return -1;
}

ret = sqlite3_step(ctx->stmt_delete_file);

sqlite3_clear_bindings(ctx->stmt_delete_file);
sqlite3_reset(ctx->stmt_delete_file);

if (ret != SQLITE_DONE) {
flb_plg_error(ctx->ins, "db: error deleting stale entry from database:"
" id=%"PRIu64, id);
return -1;
}

flb_plg_info(ctx->ins, "db: stale file deleted from database:"
" id=%"PRIu64, id);
return 0;
}

/*
* Check if an file inode exists in the database. Return FLB_TRUE or
* FLB_FALSE
* Check if an file inode exists in the database.
* If the 'compare_filename' option is enabled,
* it checks along with the filename. Return FLB_TRUE or FLB_FALSE
*/
static int db_file_exists(struct flb_tail_file *file,
struct flb_tail_config *ctx,
uint64_t *id, uint64_t *inode, off_t *offset)
{
int ret;
int exists = FLB_FALSE;
const unsigned char *name;

/* Bind parameters */
sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode);
Expand All @@ -116,11 +146,30 @@ static int db_file_exists(struct flb_tail_file *file,
/* id: column 0 */
*id = sqlite3_column_int64(ctx->stmt_get_file, 0);

/* name: column 1 */
name = sqlite3_column_text(ctx->stmt_get_file, 1);
if (ctx->compare_filename && name == NULL) {
flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id);
return -1;
}

/* offset: column 2 */
*offset = sqlite3_column_int64(ctx->stmt_get_file, 2);

/* inode: column 3 */
*inode = sqlite3_column_int64(ctx->stmt_get_file, 3);

/* Checking if the file's name and inode match exactly */
if (ctx->compare_filename) {
if (flb_tail_target_file_name_cmp((char *) name, file) != 0) {
exists = FLB_FALSE;
flb_plg_debug(ctx->ins, "db: exists stale file from database:"
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64
" name=%s file_inode=%"PRIu64" file_name=%s",
*id, *inode, *offset, name, file->inode,
file->name);
}
}
}
else if (ret == SQLITE_DONE) {
/* all good */
Expand Down Expand Up @@ -221,6 +270,11 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
}

if (ret == FLB_FALSE) {
/* Delete stale file of same inode */
if (ctx->compare_filename && id > 0) {
flb_tail_db_file_delete_by_id(ctx, id);
}

/* Get the database ID for this file */
file->db_id = db_file_insert(file, ctx);
}
Expand Down
148 changes: 148 additions & 0 deletions tests/runtime/in_tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,153 @@ void flb_test_db_delete_stale_file()
test_tail_ctx_destroy(ctx);
unlink(db);
}

void flb_test_db_compare_filename()
{
struct flb_lib_out_cb cb_data;
struct test_tail_ctx *ctx;
char *org_file[] = {"test_db.log"};
char *moved_file[] = {"test_db_moved.log"};
char *db = "test_db.db";
char *msg_init = "hello world";
char *msg_moved = "hello world moved";
char *msg_end = "hello db end";
int i;
int ret;
int num;
int unused;

unlink(db);

clear_output_num();

cb_data.cb = cb_count_msgpack;
cb_data.data = &unused;

ctx = test_tail_ctx_create(&cb_data,
&org_file[0],
sizeof(org_file)/sizeof(char *),
FLB_FALSE);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_input_set(ctx->flb, ctx->o_ffd,
"path", org_file[0],
"read_from_head", "true",
"db", db,
"db.sync", "full",
"db.compare_filename", "true",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ret = write_msg(ctx, msg_init, strlen(msg_init));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
unlink(db);
exit(EXIT_FAILURE);
}

/* waiting to flush */
flb_time_msleep(500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no output");
}

if (ctx->fds != NULL) {
for (i=0; i<ctx->fd_num; i++) {
close(ctx->fds[i]);
}
flb_free(ctx->fds);
}
flb_stop(ctx->flb);
flb_destroy(ctx->flb);
flb_free(ctx);

/* re-init to use db */
clear_output_num();

/*
* Changing the file name from 'test_db.log' to 'test_db_moved.log.'
* In this scenario, it is assumed that the FluentBit has been terminated,
* and the file has been recreated with the same inode, with offsets equal
* to or greater than the previous file.
*/
ret = rename(org_file[0], moved_file[0]);
TEST_CHECK(ret == 0);

cb_data.cb = cb_count_msgpack;
cb_data.data = &unused;

ctx = test_tail_ctx_create(&cb_data,
&moved_file[0],
sizeof(moved_file)/sizeof(char *),
FLB_FALSE);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
unlink(db);
exit(EXIT_FAILURE);
}

ret = flb_input_set(ctx->flb, ctx->o_ffd,
"path", moved_file[0],
"read_from_head", "true",
"db", db,
"db.sync", "full",
"db.compare_filename", "true",
NULL);
TEST_CHECK(ret == 0);

/*
* Start the engine
* The file has been newly created, and due to the 'db.compare_filename'
* option being set to true, it compares filenames to consider it a new
* file even if the inode is the same. If the option is set to false,
* it can be assumed to be the same file as before.
*/
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* waiting to flush */
flb_time_msleep(500);

ret = write_msg(ctx, msg_moved, strlen(msg_moved));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
unlink(db);
exit(EXIT_FAILURE);
}

ret = write_msg(ctx, msg_end, strlen(msg_end));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
unlink(db);
exit(EXIT_FAILURE);
}

/* waiting to flush */
flb_time_msleep(500);

num = get_output_num();
if (!TEST_CHECK(num == 3)) {
/* 3 = msg_init + msg_moved + msg_end */
TEST_MSG("num error. expect=3 got=%d", num);
}

test_tail_ctx_destroy(ctx);
unlink(db);
}
#endif /* FLB_HAVE_SQLDB */

/* Test list */
Expand All @@ -1828,6 +1975,7 @@ TEST_LIST = {
#ifdef FLB_HAVE_SQLDB
{"db", flb_test_db},
{"db_delete_stale_file", flb_test_db_delete_stale_file},
{"db_compare_filename", flb_test_db_compare_filename},
#endif

#ifdef in_tail
Expand Down

0 comments on commit e4d8d8c

Please sign in to comment.