out_s3: add gzip compression support for multipart uploads
Fix the logic for choosing the upload method when compression is enabled.
The data is now compressed first, and the decision between PutObject and
multipart upload is made based on the compressed size:

- remove the compression step from s3_put_object()
- compress the data in upload_data() before the upload method is chosen
- add compression to put_all_chunks()
- if compression fails in put_all_chunks(), upload the original
  uncompressed data to prevent data loss

The patch has been tested using Valgrind.
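
For reference, a minimal output configuration that exercises the new compressed
multipart path; this is a sketch only, and the bucket, region, and size values
below are placeholders rather than part of this patch:

    [OUTPUT]
        Name                s3
        Match               *
        # placeholder bucket and region
        bucket              my-bucket
        region              us-east-1
        # gzip is now allowed together with multipart uploads
        compression         gzip
        use_put_object      Off
        # with compression, chunks may exceed the 50 MB plain limit,
        # up to MAX_CHUNKED_UPLOAD_COMPRESS_SIZE (5 GB)
        upload_chunk_size   10M
        total_file_size     100M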

Signed-off-by: Andy Wang <[email protected]>
andy-wang-f authored and PettitWesley committed Jul 21, 2022
1 parent 186b3e7 commit 53f2e45
Showing 2 changed files with 83 additions and 50 deletions.
132 changes: 82 additions & 50 deletions plugins/out_s3/s3.c
@@ -138,7 +138,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
@@ -168,7 +168,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
n++;
}
@@ -637,6 +637,25 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->use_put_object = FLB_TRUE;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when Apache Arrow is enabled");
return -1;
}
ctx->compression = ret;
}

tmp = flb_output_get_property("content_type", ins);
if (tmp) {
ctx->content_type = (char *) tmp;
}
if (ctx->use_put_object == FLB_FALSE) {
/* upload_chunk_size */
if (ctx->upload_chunk_size <= 0) {
@@ -652,9 +671,16 @@ static int cb_s3_init(struct flb_output_instance *ins,
flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
return -1;
}
if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
flb_plg_error(ctx->ins, "Max upload_chunk_size is 50M");
return -1;
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
return -1;
}
} else {
if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
return -1;
}
}
}

@@ -737,26 +763,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->canned_acl = (char *) tmp;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
ctx->compression = ret;
}

tmp = flb_output_get_property("content_type", ins);
if (tmp) {
ctx->content_type = (char *) tmp;
}

tmp = flb_output_get_property("storage_class", ins);
if (tmp) {
ctx->storage_class = (char *) tmp;
@@ -978,6 +984,22 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
int timeout_check = FLB_FALSE;
time_t create_time;
int ret;
void *payload_buf = NULL;
size_t payload_size = 0;
size_t preCompress_size = 0;

if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
/* Map payload */
ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
return FLB_RETRY;
} else {
preCompress_size = body_size;
body = (void *) payload_buf;
body_size = payload_size;
}
}

if (ctx->use_put_object == FLB_TRUE) {
goto put_object;
@@ -1009,6 +1031,10 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
goto multipart;
}
else {
if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %d, After compression, chunk is only %d bytes, "
"the chunk was too small, using PutObject to upload", preCompress_size, body_size);
}
goto put_object;
}
}
@@ -1035,6 +1061,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
}

ret = s3_put_object(ctx, tag, create_time, body, body_size);
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
if (ret < 0) {
/* re-add chunk to list */
if (chunk) {
@@ -1059,6 +1088,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (chunk) {
s3_store_file_unlock(chunk);
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
return FLB_RETRY;
}
}
@@ -1070,13 +1102,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (chunk) {
s3_store_file_unlock(chunk);
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
return FLB_RETRY;
}
m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED;
}

ret = upload_part(ctx, m_upload, body, body_size);
if (ret < 0) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
m_upload->upload_errors += 1;
/* re-add chunk to list */
if (chunk) {
@@ -1095,13 +1133,14 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
return FLB_RETRY;
}
m_upload->part_number += 1;

/* data was sent successfully- delete the local buffer */
if (chunk) {
s3_store_file_delete(ctx, chunk);
chunk = NULL;
}

if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
}
if (m_upload->bytes >= ctx->file_size) {
size_check = FLB_TRUE;
flb_plg_info(ctx->ins, "Will complete upload for %s because uploaded data is greater"
@@ -1144,6 +1183,8 @@ static int put_all_chunks(struct flb_s3 *ctx)
struct mk_list *f_head;
struct flb_fstore_file *fsf;
struct flb_fstore_stream *fs_stream;
void *payload_buf = NULL;
size_t payload_size = 0;
char *buffer = NULL;
size_t buffer_size;
int ret;
@@ -1186,6 +1227,18 @@ static int put_all_chunks(struct flb_s3 *ctx)
return -1;
}

if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
/* Map payload */
ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
} else {
flb_plg_info(ctx->ins, "Pre-compression chunk size is %d, After compression, chunk is %d bytes", buffer_size, payload_size);
buffer = (void *) payload_buf;
buffer_size = payload_size;
}
}

ret = s3_put_object(ctx, (const char *)
fsf->meta_buf,
chunk->create_time, buffer, buffer_size);
@@ -1283,9 +1336,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
char *final_key;
flb_sds_t uri;
flb_sds_t tmp;
void *compressed_body;
char *final_body;
size_t final_body_size;
char final_body_md5[25];

s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters,
@@ -1332,24 +1382,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
ret = flb_aws_compression_compress(ctx->compression, body, body_size,
&compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
}
else {
final_body = body;
final_body_size = body_size;
}

memset(final_body_md5, 0, sizeof(final_body_md5));
if (ctx->send_content_md5 == FLB_TRUE) {
ret = get_md5_base64(final_body, final_body_size,
ret = get_md5_base64(body, body_size,
final_body_md5, sizeof(final_body_md5));
if (ret != 0) {
flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
@@ -1383,11 +1418,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
goto decrement_index;
}
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
uri, body, body_size,
headers, num_headers);
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(compressed_body);
}
flb_free(headers);
}
if (c) {
1 change: 1 addition & 0 deletions plugins/out_s3/s3.h
@@ -29,6 +29,7 @@
/* Upload data to S3 in 5MB chunks */
#define MIN_CHUNKED_UPLOAD_SIZE 5242880
#define MAX_CHUNKED_UPLOAD_SIZE 50000000
#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000

#define UPLOAD_TIMER_MAX_WAIT 60000
#define UPLOAD_TIMER_MIN_WAIT 6000
