diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index bba11849d58..933176ff9bb 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -138,7 +138,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -168,7 +168,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP && multipart_upload == FLB_FALSE) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         s3_headers[n] = content_encoding_header;
         n++;
     }
@@ -637,6 +637,25 @@ static int cb_s3_init(struct flb_output_instance *ins,
         ctx->use_put_object = FLB_TRUE;
     }
 
+    tmp = flb_output_get_property("compression", ins);
+    if (tmp) {
+        ret = flb_aws_compression_get_type(tmp);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+            return -1;
+        }
+        if (ctx->use_put_object == FLB_FALSE && ret == FLB_AWS_COMPRESS_ARROW) {
+            flb_plg_error(ctx->ins,
+                          "use_put_object must be enabled when Apache Arrow is enabled");
+            return -1;
+        }
+        ctx->compression = ret;
+    }
+
+    tmp = flb_output_get_property("content_type", ins);
+    if (tmp) {
+        ctx->content_type = (char *) tmp;
+    }
     if (ctx->use_put_object == FLB_FALSE) {
         /* upload_chunk_size */
         if (ctx->upload_chunk_size <= 0) {
@@ -652,9 +671,16 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
             return -1;
         }
-        if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
-            flb_plg_error(ctx->ins, "Max upload_chunk_size is 50M");
-            return -1;
+        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+                flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
+                return -1;
+            }
+        } else {
+            if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
+                flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
+                return -1;
+            }
         }
     }
 
@@ -737,26 +763,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
         ctx->canned_acl = (char *) tmp;
     }
 
-    tmp = flb_output_get_property("compression", ins);
-    if (tmp) {
-        if (ctx->use_put_object == FLB_FALSE) {
-            flb_plg_error(ctx->ins,
-                          "use_put_object must be enabled when compression is enabled");
-            return -1;
-        }
-        ret = flb_aws_compression_get_type(tmp);
-        if (ret == -1) {
-            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
-            return -1;
-        }
-        ctx->compression = ret;
-    }
-
-    tmp = flb_output_get_property("content_type", ins);
-    if (tmp) {
-        ctx->content_type = (char *) tmp;
-    }
-
     tmp = flb_output_get_property("storage_class", ins);
     if (tmp) {
         ctx->storage_class = (char *) tmp;
@@ -978,6 +984,22 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     int timeout_check = FLB_FALSE;
     time_t create_time;
     int ret;
+    void *payload_buf = NULL;
+    size_t payload_size = 0;
+    size_t preCompress_size = 0;
+
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        /* Map payload */
+        ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "Failed to compress data");
+            return FLB_RETRY;
+        } else {
+            preCompress_size = body_size;
+            body = (void *) payload_buf;
+            body_size = payload_size;
+        }
+    }
 
     if (ctx->use_put_object == FLB_TRUE) {
         goto put_object;
@@ -1009,6 +1031,10 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             goto multipart;
         }
         else {
+            if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+                flb_plg_info(ctx->ins, "Pre-compression chunk size was %zu bytes; after compression the chunk is only %zu bytes, "
+                             "which is too small for multipart upload, using PutObject to upload", preCompress_size, body_size);
+            }
             goto put_object;
         }
     }
@@ -1035,6 +1061,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
     }
 
     ret = s3_put_object(ctx, tag, create_time, body, body_size);
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        flb_free(payload_buf);
+    }
     if (ret < 0) {
         /* re-add chunk to list */
         if (chunk) {
@@ -1059,6 +1088,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
            if (chunk) {
                s3_store_file_unlock(chunk);
            }
+           if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+               flb_free(payload_buf);
+           }
            return FLB_RETRY;
         }
     }
@@ -1070,6 +1102,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             if (chunk) {
                 s3_store_file_unlock(chunk);
             }
+            if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+                flb_free(payload_buf);
+            }
             return FLB_RETRY;
         }
         m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED;
@@ -1077,6 +1112,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
 
     ret = upload_part(ctx, m_upload, body, body_size);
     if (ret < 0) {
+        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            flb_free(payload_buf);
+        }
         m_upload->upload_errors += 1;
         /* re-add chunk to list */
         if (chunk) {
@@ -1095,13 +1133,14 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         return FLB_RETRY;
     }
     m_upload->part_number += 1;
-
     /* data was sent successfully- delete the local buffer */
     if (chunk) {
         s3_store_file_delete(ctx, chunk);
         chunk = NULL;
     }
-
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        flb_free(payload_buf);
+    }
     if (m_upload->bytes >= ctx->file_size) {
         size_check = FLB_TRUE;
         flb_plg_info(ctx->ins, "Will complete upload for %s because uploaded data is greater"
@@ -1144,6 +1183,8 @@ static int put_all_chunks(struct flb_s3 *ctx)
     struct mk_list *f_head;
     struct flb_fstore_file *fsf;
     struct flb_fstore_stream *fs_stream;
+    void *payload_buf = NULL;
+    size_t payload_size = 0;
     char *buffer = NULL;
     size_t buffer_size;
     int ret;
@@ -1186,6 +1227,18 @@ static int put_all_chunks(struct flb_s3 *ctx)
                return -1;
            }
 
+           if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+               /* Map payload */
+               ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
+               if (ret == -1) {
+                   flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
+               } else {
+                   flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu bytes; after compression, chunk is %zu bytes", buffer_size, payload_size);
+                   buffer = (void *) payload_buf;
+                   buffer_size = payload_size;
+               }
+           }
+
            ret = s3_put_object(ctx, (const char *) fsf->meta_buf,
                                chunk->create_time, buffer,
                                buffer_size);
@@ -1283,9 +1336,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     char *final_key;
     flb_sds_t uri;
     flb_sds_t tmp;
-    void *compressed_body;
-    char *final_body;
-    size_t final_body_size;
     char final_body_md5[25];
 
     s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters,
@@ -1332,24 +1382,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     flb_sds_destroy(s3_key);
     uri = tmp;
 
-    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
-        ret = flb_aws_compression_compress(ctx->compression, body, body_size,
-                                           &compressed_body, &final_body_size);
-        if (ret == -1) {
-            flb_plg_error(ctx->ins, "Failed to compress data");
-            flb_sds_destroy(uri);
-            return -1;
-        }
-        final_body = (char *) compressed_body;
-    }
-    else {
-        final_body = body;
-        final_body_size = body_size;
-    }
-
     memset(final_body_md5, 0, sizeof(final_body_md5));
     if (ctx->send_content_md5 == FLB_TRUE) {
-        ret = get_md5_base64(final_body, final_body_size,
+        ret = get_md5_base64(body, body_size,
                              final_body_md5, sizeof(final_body_md5));
         if (ret != 0) {
             flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
@@ -1383,11 +1418,8 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
             goto decrement_index;
         }
         c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
-                                              uri, final_body, final_body_size,
+                                              uri, body, body_size,
                                               headers, num_headers);
-        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
-            flb_free(compressed_body);
-        }
         flb_free(headers);
     }
     if (c) {
diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h
index bb781d60ccc..326959c1bd0 100644
--- a/plugins/out_s3/s3.h
+++ b/plugins/out_s3/s3.h
@@ -29,6 +29,7 @@
 /* Upload data to S3 in 5MB chunks */
 #define MIN_CHUNKED_UPLOAD_SIZE 5242880
 #define MAX_CHUNKED_UPLOAD_SIZE 50000000
+#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000
 
 #define UPLOAD_TIMER_MAX_WAIT 60000
 #define UPLOAD_TIMER_MIN_WAIT 6000