Skip to content

Commit

Permalink
in_tail: read from tail (duh!) (#1667 #1761 #474 #1645 #1330)
Browse files Browse the repository at this point in the history
From now on, Tail plugin implements the following behavior:

1. If a file is already registered in the database and contains an offset,
   the file is consumed from that offset position.

2. Upon start, if a file is not known by the database, read from it tail.

3. If the new 'read_from_head' property (default: false) is enabled, for newly
   discovered files read from the beginning. This flag don't override the
   behavior of a file that already exists in the database.

Additional fix:

When a file is being monitored in 'static mode', handle truncation properly.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Oct 1, 2020
1 parent 0f6b4a7 commit 70e33fa
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 33 deletions.
8 changes: 7 additions & 1 deletion plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@ static struct flb_config_map config_map[] = {
"as a string under the key name log. This option allows to define an "
"alternative name for that key."
},
{
FLB_CONFIG_MAP_BOOL, "read_from_head", "false",
0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head),
"For new discovered files on start (without a database offset/position), read the "
"content from the head of the file, not tail."
},
{
FLB_CONFIG_MAP_STR, "refresh_interval", "60",
0, FLB_FALSE, 0,
Expand All @@ -481,7 +487,7 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval),
},
{
FLB_CONFIG_MAP_INT, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait),
"specify the number of extra time in seconds to monitor a file once is "
"rotated in case some pending data is flushed."
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct flb_tail_config {
#endif
int refresh_interval_sec; /* seconds to re-scan */
long refresh_interval_nsec;/* nanoseconds to re-scan */
int read_from_head; /* read new files from head */
int rotate_wait; /* sec to wait on rotated files */
int watcher_interval; /* watcher interval */
int ignore_older; /* ignore fields older than X seconds */
Expand Down
133 changes: 101 additions & 32 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,56 @@ static inline int flb_tail_file_exists(struct stat *st,
return FLB_FALSE;
}

/*
* Based in the configuration or database offset, set the proper 'offset' for the
* file in question.
*/
static int set_file_position(struct flb_tail_config *ctx,
struct flb_tail_file *file)
{
int64_t ret;

#ifdef FLB_HAVE_SQLDB
/*
* If the database option is enabled, try to gather the file position. The
* database function updates the file->offset entry.
*/
if (ctx->db) {
flb_tail_db_file_set(file, ctx);
if (file->offset > 0) {
ret = lseek(file->fd, file->offset, SEEK_SET);
if (ret == -1) {
flb_errno();
return -1;
}
}
/* no need to seek */
return 0;
}
#endif

if (ctx->read_from_head == FLB_TRUE) {
/* no need to seek, offset position is already zero */
return 0;
}

/* tail... */
ret = lseek(file->fd, 0, SEEK_END);
if (ret == -1) {
flb_errno();
return -1;
}
file->offset = ret;

return 0;
}

int flb_tail_file_append(char *path, struct stat *st, int mode,
struct flb_tail_config *ctx)
{
int fd;
int ret;
size_t len;
int64_t offset;
char *tag;
size_t tag_len;
struct flb_tail_file *file;
Expand Down Expand Up @@ -685,35 +728,23 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
}
}

/*
* Register or update the file entry, likely if the entry already exists
* into the database, the offset may be updated.
*/
#ifdef FLB_HAVE_SQLDB
if (ctx->db) {
flb_tail_db_file_set(file, ctx);
}
#endif

/* Seek if required */
if (file->offset > 0) {
flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended file following on offset=%"PRId64,
file->inode, file->offset);
offset = lseek(file->fd, file->offset, SEEK_SET);
if (offset == -1) {
flb_errno();
flb_tail_file_remove(file);
goto error;
}
/* Set the file position (database offset, head or tail) */
ret = set_file_position(ctx, file);
if (ret == -1) {
flb_tail_file_remove(file);
goto error;
}

/* Remaining bytes to read */
file->pending_bytes = file->size - file->offset;

#ifdef FLB_HAVE_METRICS
flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
#endif

flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended as %s", file->inode, path);
flb_plg_debug(ctx->ins,
"inode=%"PRIu64" with offset=%"PRId64" appended as %s",
file->inode, file->offset, path);
return 0;

error:
Expand Down Expand Up @@ -798,6 +829,46 @@ int flb_tail_file_remove_all(struct flb_tail_config *ctx)
return count;
}

static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file)
{
int ret;
int64_t offset;
struct stat st;

ret = fstat(file->fd, &st);
if (ret == -1) {
flb_errno();
return FLB_TAIL_ERROR;
}

/* Check if the file was truncated */
if (file->offset > st.st_size) {
offset = lseek(file->fd, 0, SEEK_SET);
if (offset == -1) {
flb_errno();
return FLB_TAIL_ERROR;
}

flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
file->inode, file->name);
file->offset = offset;
file->buf_len = 0;

/* Update offset in the database file */
#ifdef FLB_HAVE_SQLDB
if (ctx->db) {
flb_tail_db_file_offset(file, ctx);
}
#endif
}
else {
file->size = st.st_size;
file->pending_bytes = (st.st_size - file->offset);
}

return FLB_TAIL_OK;
}

int flb_tail_file_chunk(struct flb_tail_file *file)
{
int ret;
Expand Down Expand Up @@ -903,23 +974,21 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
return FLB_TAIL_ERROR;
}
else {
file->size = st.st_size;
file->pending_bytes = (st.st_size - file->offset);
/* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
ret = adjust_counters(ctx, file);
}
/* Data was consumed but likely some bytes still remain */
return FLB_TAIL_OK;
return ret;
}
else if (bytes == 0) {
/* We reached the end of file, let's wait for some incoming data */
ret = fstat(file->fd, &st);
if (ret == -1) {
flb_errno();
ret = adjust_counters(ctx, file);
if (ret == FLB_TAIL_OK) {
return FLB_TAIL_WAIT;
}
else {
return FLB_TAIL_ERROR;
}
file->size = st.st_size;
file->pending_bytes = (st.st_size - file->offset);

return FLB_TAIL_WAIT;
}
else {
/* error */
Expand Down

0 comments on commit 70e33fa

Please sign in to comment.