diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 933176ff9bb..83b3c0d032f 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -506,7 +506,6 @@ static int cb_s3_init(struct flb_output_instance *ins, int ret; flb_sds_t tmp_sds; int async_flags; - int len; char *role_arn = NULL; char *session_name; const char *tmp; @@ -540,6 +539,11 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } + if (ctx->ins->total_limit_size > 0) { + flb_plg_warn(ctx->ins, "Please use 'store_dir_limit_size' with s3 output instead of 'storage.total_limit_size'. " + "S3 has its own buffer files located in the store_dir."); + } + /* Date key */ ctx->date_key = ctx->json_date_key; tmp = flb_output_get_property("json_date_key", ins); @@ -570,15 +574,6 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } - tmp = flb_output_get_property("chunk_buffer_dir", ins); - if (tmp) { - len = strlen(tmp); - if (tmp[len - 1] == '/' || tmp[len - 1] == '\\') { - flb_plg_error(ctx->ins, "'chunk_buffer_dir' can not end in a / or \\"); - return -1; - } - } - /* * store_dir is the user input, buffer_dir is what the code uses * We append the bucket name to the dir, to support multiple instances @@ -2363,6 +2358,15 @@ static struct flb_config_map config_map[] = { " data will be locally buffered at any given point in time." }, + { + FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", (char *) NULL, + 0, FLB_TRUE, offsetof(struct flb_s3, store_dir_limit_size), + "S3 plugin has its own buffering system with files in the `store_dir`. " + "Use the `store_dir_limit_size` to limit the amount of data S3 buffers in " + "the `store_dir` to limit disk usage. If the limit is reached, " + "data will be discarded. Default is 0 which means unlimited." + }, + { FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S", 0, FLB_TRUE, offsetof(struct flb_s3, s3_key_format), diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 326959c1bd0..3be84823eee 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -120,6 +120,10 @@ struct flb_s3 { int compression; int port; int insecure; + size_t store_dir_limit_size; + + /* track the total amount of buffered data */ + size_t current_buffer_size; struct flb_aws_provider *provider; struct flb_aws_provider *base_provider; diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 7d60f7e1f97..050734a05f0 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -130,6 +130,13 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, int ret; flb_sds_t name; struct flb_fstore_file *fsf; + size_t space_remaining; + + if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) { + flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, bytes, ctx->store_dir_limit_size); + return -1; + } /* If no target file was found, create a new one */ if (!s3_file) { @@ -184,6 +191,17 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, return -1; } s3_file->size += bytes; + ctx->current_buffer_size += bytes; + + /* if buffer is 95% full, warn user */ + if (ctx->store_dir_limit_size > 0) { + space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size; + if ((space_remaining * 20) < ctx->store_dir_limit_size) { + flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, ctx->store_dir_limit_size); + return -1; + } + } return 0; } @@ -397,6 +415,7 @@ int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file) struct flb_fstore_file *fsf; fsf = s3_file->fsf; + ctx->current_buffer_size -= s3_file->size; /* permanent deletion */ flb_fstore_file_delete(ctx->fs, fsf);