Skip to content

Commit

Permalink
out_s3: add store_dir_limit_size to limit S3 disk usage
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley committed Oct 5, 2022
1 parent 5c03b2e commit fa45e91
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
28 changes: 18 additions & 10 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
int ret;
flb_sds_t tmp_sds;
int async_flags;
int len;
char *role_arn = NULL;
char *session_name;
const char *tmp;
Expand Down Expand Up @@ -540,6 +539,15 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

/* the check against -1 is works here because size_t is unsigned
* and (int) -1 == unsigned max value
* Fluent Bit uses -1 (which becomes max value) to indicate undefined
*/
if (ctx->ins->total_limit_size != -1) {
flb_plg_warn(ctx->ins, "Please use 'store_dir_limit_size' with s3 output instead of 'storage.total_limit_size'. "
"S3 has its own buffer files located in the store_dir.");
}

/* Date key */
ctx->date_key = ctx->json_date_key;
tmp = flb_output_get_property("json_date_key", ins);
Expand Down Expand Up @@ -570,15 +578,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

tmp = flb_output_get_property("chunk_buffer_dir", ins);
if (tmp) {
len = strlen(tmp);
if (tmp[len - 1] == '/' || tmp[len - 1] == '\\') {
flb_plg_error(ctx->ins, "'chunk_buffer_dir' can not end in a / or \\");
return -1;
}
}

/*
* store_dir is the user input, buffer_dir is what the code uses
* We append the bucket name to the dir, to support multiple instances
Expand Down Expand Up @@ -2363,6 +2362,15 @@ static struct flb_config_map config_map[] = {
" data will be locally buffered at any given point in time."
},

{
FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_s3, store_dir_limit_size),
"S3 plugin has its own buffering system with files in the `store_dir`. "
"Use the `store_dir_limit_size` to limit the amount of data S3 buffers in "
"the `store_dir` to limit disk usage. If the limit is reached, "
"data will be discarded. Default is 0 which means unlimited."
},

{
FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S",
0, FLB_TRUE, offsetof(struct flb_s3, s3_key_format),
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ struct flb_s3 {
int compression;
int port;
int insecure;
size_t store_dir_limit_size;

/* track the total amount of buffered data */
size_t current_buffer_size;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand Down
19 changes: 19 additions & 0 deletions plugins/out_s3/s3_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
int ret;
flb_sds_t name;
struct flb_fstore_file *fsf;
size_t space_remaining;

if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) {
flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes",
ctx->current_buffer_size, bytes, ctx->store_dir_limit_size);
return -1;
}

/* If no target file was found, create a new one */
if (!s3_file) {
Expand Down Expand Up @@ -184,6 +191,17 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
return -1;
}
s3_file->size += bytes;
ctx->current_buffer_size += bytes;

/* if buffer is 95% full, warn user */
if (ctx->store_dir_limit_size > 0) {
space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size;
if ((space_remaining * 20) < ctx->store_dir_limit_size) {
flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes",
ctx->current_buffer_size, ctx->store_dir_limit_size);
return -1;
}
}

return 0;
}
Expand Down Expand Up @@ -397,6 +415,7 @@ int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file)
struct flb_fstore_file *fsf;

fsf = s3_file->fsf;
ctx->current_buffer_size -= s3_file->size;

/* permanent deletion */
flb_fstore_file_delete(ctx->fs, fsf);
Expand Down

0 comments on commit fa45e91

Please sign in to comment.