From 70e33fa2618227882d48faf690848bb4117cdb3e Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 30 Sep 2020 18:20:22 -0600 Subject: [PATCH] in_tail: read from tail (duh!) (#1667 #1761 #474 #1645 #1330) From now on, Tail plugin implements the following behavior: 1. If a file is already registered in the database and contains an offset, the file is consumed from that offset position. 2. Upon start, if a file is not known by the database, read from it tail. 3. If the new 'read_from_head' property (default: false) is enabled, for newly discovered files read from the beginning. This flag don't override the behavior of a file that already exists in the database. Additional fix: When a file is being monitored in 'static mode', handle truncation properly. Signed-off-by: Eduardo Silva --- plugins/in_tail/tail.c | 8 +- plugins/in_tail/tail_config.h | 1 + plugins/in_tail/tail_file.c | 133 ++++++++++++++++++++++++++-------- 3 files changed, 109 insertions(+), 33 deletions(-) diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 92d4dd34775..6143a1332af 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -471,6 +471,12 @@ static struct flb_config_map config_map[] = { "as a string under the key name log. This option allows to define an " "alternative name for that key." }, + { + FLB_CONFIG_MAP_BOOL, "read_from_head", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head), + "For new discovered files on start (without a database offset/position), read the " + "content from the head of the file, not tail." + }, { FLB_CONFIG_MAP_STR, "refresh_interval", "60", 0, FLB_FALSE, 0, @@ -481,7 +487,7 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval), }, { - FLB_CONFIG_MAP_INT, "rotate_wait", FLB_TAIL_ROTATE_WAIT, + FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT, 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait), "specify the number of extra time in seconds to monitor a file once is " "rotated in case some pending data is flushed." diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index 85ba905e469..ae87bbb90d5 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -68,6 +68,7 @@ struct flb_tail_config { #endif int refresh_interval_sec; /* seconds to re-scan */ long refresh_interval_nsec;/* nanoseconds to re-scan */ + int read_from_head; /* read new files from head */ int rotate_wait; /* sec to wait on rotated files */ int watcher_interval; /* watcher interval */ int ignore_older; /* ignore fields older than X seconds */ diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 612f4d72da0..79c5610238e 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -539,13 +539,56 @@ static inline int flb_tail_file_exists(struct stat *st, return FLB_FALSE; } +/* + * Based in the configuration or database offset, set the proper 'offset' for the + * file in question. + */ +static int set_file_position(struct flb_tail_config *ctx, + struct flb_tail_file *file) +{ + int64_t ret; + +#ifdef FLB_HAVE_SQLDB + /* + * If the database option is enabled, try to gather the file position. The + * database function updates the file->offset entry. + */ + if (ctx->db) { + flb_tail_db_file_set(file, ctx); + if (file->offset > 0) { + ret = lseek(file->fd, file->offset, SEEK_SET); + if (ret == -1) { + flb_errno(); + return -1; + } + } + /* no need to seek */ + return 0; + } +#endif + + if (ctx->read_from_head == FLB_TRUE) { + /* no need to seek, offset position is already zero */ + return 0; + } + + /* tail... */ + ret = lseek(file->fd, 0, SEEK_END); + if (ret == -1) { + flb_errno(); + return -1; + } + file->offset = ret; + + return 0; +} + int flb_tail_file_append(char *path, struct stat *st, int mode, struct flb_tail_config *ctx) { int fd; int ret; size_t len; - int64_t offset; char *tag; size_t tag_len; struct flb_tail_file *file; @@ -685,35 +728,23 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, } } - /* - * Register or update the file entry, likely if the entry already exists - * into the database, the offset may be updated. - */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_set(file, ctx); - } -#endif - - /* Seek if required */ - if (file->offset > 0) { - flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended file following on offset=%"PRId64, - file->inode, file->offset); - offset = lseek(file->fd, file->offset, SEEK_SET); - if (offset == -1) { - flb_errno(); - flb_tail_file_remove(file); - goto error; - } + /* Set the file position (database offset, head or tail) */ + ret = set_file_position(ctx, file); + if (ret == -1) { + flb_tail_file_remove(file); + goto error; } + /* Remaining bytes to read */ file->pending_bytes = file->size - file->offset; #ifdef FLB_HAVE_METRICS flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); #endif - flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended as %s", file->inode, path); + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" with offset=%"PRId64" appended as %s", + file->inode, file->offset, path); return 0; error: @@ -798,6 +829,46 @@ int flb_tail_file_remove_all(struct flb_tail_config *ctx) return count; } +static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) +{ + int ret; + int64_t offset; + struct stat st; + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + /* Check if the file was truncated */ + if (file->offset > st.st_size) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s", + file->inode, file->name); + file->offset = offset; + file->buf_len = 0; + + /* Update offset in the database file */ +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + else { + file->size = st.st_size; + file->pending_bytes = (st.st_size - file->offset); + } + + return FLB_TAIL_OK; +} + int flb_tail_file_chunk(struct flb_tail_file *file) { int ret; @@ -903,23 +974,21 @@ int flb_tail_file_chunk(struct flb_tail_file *file) return FLB_TAIL_ERROR; } else { - file->size = st.st_size; - file->pending_bytes = (st.st_size - file->offset); + /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ + ret = adjust_counters(ctx, file); } /* Data was consumed but likely some bytes still remain */ - return FLB_TAIL_OK; + return ret; } else if (bytes == 0) { /* We reached the end of file, let's wait for some incoming data */ - ret = fstat(file->fd, &st); - if (ret == -1) { - flb_errno(); + ret = adjust_counters(ctx, file); + if (ret == FLB_TAIL_OK) { + return FLB_TAIL_WAIT; + } + else { return FLB_TAIL_ERROR; } - file->size = st.st_size; - file->pending_bytes = (st.st_size - file->offset); - - return FLB_TAIL_WAIT; } else { /* error */