Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_s3: add store_dir_limit_size to limit S3 disk usage #5902

Merged
merged 1 commit into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
int ret;
flb_sds_t tmp_sds;
int async_flags;
int len;
char *role_arn = NULL;
char *session_name;
const char *tmp;
Expand Down Expand Up @@ -540,6 +539,15 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

/* the check against -1 is works here because size_t is unsigned
* and (int) -1 == unsigned max value
* Fluent Bit uses -1 (which becomes max value) to indicate undefined
*/
if (ctx->ins->total_limit_size != -1) {
flb_plg_warn(ctx->ins, "Please use 'store_dir_limit_size' with s3 output instead of 'storage.total_limit_size'. "
"S3 has its own buffer files located in the store_dir.");
}

/* Date key */
ctx->date_key = ctx->json_date_key;
tmp = flb_output_get_property("json_date_key", ins);
Expand Down Expand Up @@ -570,15 +578,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

tmp = flb_output_get_property("chunk_buffer_dir", ins);
if (tmp) {
len = strlen(tmp);
if (tmp[len - 1] == '/' || tmp[len - 1] == '\\') {
flb_plg_error(ctx->ins, "'chunk_buffer_dir' can not end in a / or \\");
return -1;
}
}

/*
* store_dir is the user input, buffer_dir is what the code uses
* We append the bucket name to the dir, to support multiple instances
Expand Down Expand Up @@ -2363,6 +2362,15 @@ static struct flb_config_map config_map[] = {
" data will be locally buffered at any given point in time."
},

{
PettitWesley marked this conversation as resolved.
Show resolved Hide resolved
FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_s3, store_dir_limit_size),
"S3 plugin has its own buffering system with files in the `store_dir`. "
"Use the `store_dir_limit_size` to limit the amount of data S3 buffers in "
"the `store_dir` to limit disk usage. If the limit is reached, "
"data will be discarded. Default is 0 which means unlimited."
},

{
FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S",
0, FLB_TRUE, offsetof(struct flb_s3, s3_key_format),
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ struct flb_s3 {
int compression;
int port;
int insecure;
size_t store_dir_limit_size;

/* track the total amount of buffered data */
size_t current_buffer_size;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand Down
19 changes: 19 additions & 0 deletions plugins/out_s3/s3_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
int ret;
flb_sds_t name;
struct flb_fstore_file *fsf;
size_t space_remaining;

if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) {
flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes",
ctx->current_buffer_size, bytes, ctx->store_dir_limit_size);
return -1;
}

/* If no target file was found, create a new one */
if (!s3_file) {
Expand Down Expand Up @@ -184,6 +191,17 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
return -1;
}
s3_file->size += bytes;
ctx->current_buffer_size += bytes;

/* if buffer is 95% full, warn user */
if (ctx->store_dir_limit_size > 0) {
space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size;
if ((space_remaining * 20) < ctx->store_dir_limit_size) {
PettitWesley marked this conversation as resolved.
Show resolved Hide resolved
flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes",
ctx->current_buffer_size, ctx->store_dir_limit_size);
return -1;
}
}

return 0;
}
Expand Down Expand Up @@ -397,6 +415,7 @@ int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file)
struct flb_fstore_file *fsf;

fsf = s3_file->fsf;
ctx->current_buffer_size -= s3_file->size;

/* permanent deletion */
flb_fstore_file_delete(ctx->fs, fsf);
Expand Down