From 53f2e45291bb798e3bc942d039f9b1596aed50f4 Mon Sep 17 00:00:00 2001
From: Andy Wang
Date: Tue, 21 Jun 2022 14:41:07 -0700
Subject: [PATCH] out_s3: add gzip compression support for multipart uploads

Fix the logic that chooses the upload method when compression is
enabled: compress the data first, then decide which method (PutObject
or multipart upload) to use based on the compressed size. Compression
is moved out of s3_put_object() and performed in upload_data() before
the method is chosen, and compression is also added to
put_all_chunks(). If compression fails, the original uncompressed data
is uploaded so no data is lost.

This patch has been tested with valgrind.

Signed-off-by: Andy Wang
---
 plugins/out_s3/s3.c | 132 +++++++++++++++++++++++++++-----------------
 plugins/out_s3/s3.h |   1 +
 2 files changed, 83 insertions(+), 50 deletions(-)

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index bba11849d58..933176ff9bb 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -138,7 +138,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -168,7 +168,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         s3_headers[n] = content_encoding_header;
         n++;
     }
@@ -637,6 +637,25 @@ static int cb_s3_init(struct flb_output_instance *ins,
         ctx->use_put_object = FLB_TRUE;
     }
 
+    tmp = flb_output_get_property("compression", ins);
+    if (tmp) {
+        ret = flb_aws_compression_get_type(tmp);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+            return -1;
+        }
+        if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+            flb_plg_error(ctx->ins,
+                          "use_put_object must be enabled when Apache Arrow is enabled");
+            return -1;
+        }
+        ctx->compression = ret;
+    }
+
+    tmp = flb_output_get_property("content_type", ins);
+    if (tmp) {
+        ctx->content_type = (char *) tmp;
+    }
     if (ctx->use_put_object == FLB_FALSE) {
         /* upload_chunk_size */
         if (ctx->upload_chunk_size <= 0) {
@@ -652,9 +671,16 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
             return -1;
         }
-        if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
-            flb_plg_error(ctx->ins, "Max upload_chunk_size is 50M");
-            return -1;
+        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+                flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
+                return -1;
+            }
+        } else {
+            if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
+                flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
+                return -1;
+            }
         }
     }
 
@@ -737,26 +763,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
         ctx->canned_acl = (char *) tmp;
     }
 
-    tmp = flb_output_get_property("compression", ins);
-    if (tmp) {
-        if (ctx->use_put_object == FLB_FALSE) {
-            flb_plg_error(ctx->ins,
-                          "use_put_object must be enabled when compression is enabled");
-            return -1;
-        }
-        ret = flb_aws_compression_get_type(tmp);
-        if (ret == -1) {
-            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
-            return -1;
-        }
-        ctx->compression = ret;
-    }
-
-    tmp = flb_output_get_property("content_type", ins);
-    if (tmp) {
-        ctx->content_type = (char *) tmp;
-    }
-
     tmp = flb_output_get_property("storage_class", ins);
flb_output_get_property("storage_class", ins); if (tmp) { ctx->storage_class = (char *) tmp; @@ -978,6 +984,22 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, int timeout_check = FLB_FALSE; time_t create_time; int ret; + void *payload_buf = NULL; + size_t payload_size = 0; + size_t preCompress_size = 0; + + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + /* Map payload */ + ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data"); + return FLB_RETRY; + } else { + preCompress_size = body_size; + body = (void *) payload_buf; + body_size = payload_size; + } + } if (ctx->use_put_object == FLB_TRUE) { goto put_object; @@ -1009,6 +1031,10 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, goto multipart; } else { + if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %d, After compression, chunk is only %d bytes, " + "the chunk was too small, using PutObject to upload", preCompress_size, body_size); + } goto put_object; } } @@ -1035,6 +1061,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, } ret = s3_put_object(ctx, tag, create_time, body, body_size); + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } if (ret < 0) { /* re-add chunk to list */ if (chunk) { @@ -1059,6 +1088,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { s3_store_file_unlock(chunk); } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } return FLB_RETRY; } } @@ -1070,6 +1102,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { s3_store_file_unlock(chunk); } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } return FLB_RETRY; } m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED; @@ -1077,6 +1112,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, ret = upload_part(ctx, m_upload, body, body_size); if (ret < 0) { + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } m_upload->upload_errors += 1; /* re-add chunk to list */ if (chunk) { @@ -1095,13 +1133,14 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, return FLB_RETRY; } m_upload->part_number += 1; - /* data was sent successfully- delete the local buffer */ if (chunk) { s3_store_file_delete(ctx, chunk); chunk = NULL; } - + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } if (m_upload->bytes >= ctx->file_size) { size_check = FLB_TRUE; flb_plg_info(ctx->ins, "Will complete upload for %s because uploaded data is greater" @@ -1144,6 +1183,8 @@ static int put_all_chunks(struct flb_s3 *ctx) struct mk_list *f_head; struct flb_fstore_file *fsf; struct flb_fstore_stream *fs_stream; + void *payload_buf = NULL; + size_t payload_size = 0; char *buffer = NULL; size_t buffer_size; int ret; @@ -1186,6 +1227,18 @@ static int put_all_chunks(struct flb_s3 *ctx) return -1; } + if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + /* Map payload */ + ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss"); + } else { + flb_plg_info(ctx->ins, "Pre-compression chunk size is %d, After compression, chunk is %d bytes", 
buffer_size, payload_size); + buffer = (void *) payload_buf; + buffer_size = payload_size; + } + } + ret = s3_put_object(ctx, (const char *) fsf->meta_buf, chunk->create_time, buffer, buffer_size); @@ -1283,9 +1336,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time char *final_key; flb_sds_t uri; flb_sds_t tmp; - void *compressed_body; - char *final_body; - size_t final_body_size; char final_body_md5[25]; s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters, @@ -1332,24 +1382,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_destroy(s3_key); uri = tmp; - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { - ret = flb_aws_compression_compress(ctx->compression, body, body_size, - &compressed_body, &final_body_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to compress data"); - flb_sds_destroy(uri); - return -1; - } - final_body = (char *) compressed_body; - } - else { - final_body = body; - final_body_size = body_size; - } - memset(final_body_md5, 0, sizeof(final_body_md5)); if (ctx->send_content_md5 == FLB_TRUE) { - ret = get_md5_base64(final_body, final_body_size, + ret = get_md5_base64(body, body_size, final_body_md5, sizeof(final_body_md5)); if (ret != 0) { flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); @@ -1383,11 +1418,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time goto decrement_index; } c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, - uri, final_body, final_body_size, + uri, body, body_size, headers, num_headers); - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { - flb_free(compressed_body); - } flb_free(headers); } if (c) { diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index bb781d60ccc..326959c1bd0 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -29,6 +29,7 @@ /* Upload data to S3 in 5MB chunks */ #define MIN_CHUNKED_UPLOAD_SIZE 5242880 #define MAX_CHUNKED_UPLOAD_SIZE 50000000 +#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000 #define UPLOAD_TIMER_MAX_WAIT 60000 #define UPLOAD_TIMER_MIN_WAIT 6000