diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h index 14aec8c6c..effdf8e03 100644 --- a/include/aws/s3/private/s3_client_impl.h +++ b/include/aws/s3/private/s3_client_impl.h @@ -162,6 +162,9 @@ struct aws_s3_client_vtable { void (*endpoint_shutdown_callback)(struct aws_s3_client *client); void (*finish_destroy)(struct aws_s3_client *client); + + struct aws_parallel_input_stream *( + *parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name); }; /* Represents the state of the S3 client. */ diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 6cdc01c67..0c1c9fd83 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -138,9 +138,12 @@ struct aws_s3_meta_request { /* Initial HTTP Message that this meta request is based on. */ struct aws_http_message *initial_request_message; - /* Async stream for meta request's body. - * NULL if using initial_request_message's synchronous body stream instead. */ + /* The meta request's outgoing body comes from one of these: + * 1) request_body_async_stream: if set, then async stream 1 part at a time + * 2) request_body_parallel_stream: if set, then stream multiple parts in parallel + * 3) initial_request_message's body_stream: else synchronously stream parts */ struct aws_async_input_stream *request_body_async_stream; + struct aws_parallel_input_stream *request_body_parallel_stream; /* Part size to use for uploads and downloads. Passed down by the creating client. */ const size_t part_size; @@ -363,10 +366,13 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r * It may read from the underlying stream multiple times, if that's what it takes to fill the buffer. * Returns a future whose result bool indicates whether end of stream was reached. * This future may complete on any thread, and may complete synchronously. + * + * Read from offset to fill the buffer */ AWS_S3_API struct aws_future_bool *aws_s3_meta_request_read_body( struct aws_s3_meta_request *meta_request, + uint64_t offset, struct aws_byte_buf *buffer); bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request); diff --git a/include/aws/s3/private/s3_parallel_input_stream.h b/include/aws/s3/private/s3_parallel_input_stream.h new file mode 100644 index 000000000..de7fa814a --- /dev/null +++ b/include/aws/s3/private/s3_parallel_input_stream.h @@ -0,0 +1,105 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#ifndef AWS_S3_PARALLEL_INPUT_STREAM_H +#define AWS_S3_PARALLEL_INPUT_STREAM_H + +#include + +#include + +AWS_PUSH_SANE_WARNING_LEVEL + +struct aws_byte_buf; +struct aws_future_bool; +struct aws_input_stream; + +struct aws_event_loop_group; + +struct aws_parallel_input_stream { + const struct aws_parallel_input_stream_vtable *vtable; + struct aws_allocator *alloc; + struct aws_ref_count ref_count; + + void *impl; +}; + +struct aws_parallel_input_stream_vtable { + /** + * Destroy the stream, its refcount has reached 0. + */ + void (*destroy)(struct aws_parallel_input_stream *stream); + + /** + * Read into the buffer in parallel. + * The implementation needs to support this to be invoked concurrently from multiple threads + */ + struct aws_future_bool *( + *read)(struct aws_parallel_input_stream *stream, uint64_t offset, struct aws_byte_buf *dest); +}; + +AWS_EXTERN_C_BEGIN + +/** + * Initialize aws_parallel_input_stream "base class" + */ +AWS_S3_API +void aws_parallel_input_stream_init_base( + struct aws_parallel_input_stream *stream, + struct aws_allocator *alloc, + const struct aws_parallel_input_stream_vtable *vtable, + void *impl); + +/** + * Increment reference count. + * You may pass in NULL (has no effect). + * Returns whatever pointer was passed in. + */ +AWS_S3_API +struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream); + +/** + * Decrement reference count. + * You may pass in NULL (has no effect). + * Always returns NULL. + */ +AWS_S3_API +struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream); + +/** + * Read from the offset until fill the dest, or EOF reached. + * It's thread safe to be called from multiple threads without waiting for other read to complete + * + * @param stream The stream to read from + * @param offset The offset in the stream from beginning to start reading + * @param dest The output buffer read to + * @return a future, which will contain an error code if something went wrong, + * or a result bool indicating whether EOF has been reached. + */ +AWS_S3_API +struct aws_future_bool *aws_parallel_input_stream_read( + struct aws_parallel_input_stream *stream, + uint64_t offset, + struct aws_byte_buf *dest); + +/** + * Create a new file based parallel input stream. + * + * This implementation will open a file handler when the read happens, and seek to the offset to start reading. Close + * the file handler as read finishes. + * + * @param allocator memory allocator + * @param file_name The file path to read from + * @return aws_parallel_input_stream + */ +AWS_S3_API +struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file( + struct aws_allocator *allocator, + struct aws_byte_cursor file_name); + +AWS_EXTERN_C_END +AWS_POP_SANE_WARNING_LEVEL + +#endif /* AWS_S3_PARALLEL_INPUT_STREAM_H */ diff --git a/include/aws/s3/private/s3_request_messages.h b/include/aws/s3/private/s3_request_messages.h index 76fee4c1b..f0096754c 100644 --- a/include/aws/s3/private/s3_request_messages.h +++ b/include/aws/s3/private/s3_request_messages.h @@ -37,13 +37,6 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he size_t excluded_headers_size, bool exclude_x_amz_meta); -/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */ -AWS_S3_API -struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers( - struct aws_allocator *allocator, - struct aws_http_message *message, - struct aws_byte_cursor filepath); - /* Copy headers from one message to the other and exclude specific headers. * exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/ AWS_S3_API diff --git a/include/aws/s3/s3.h b/include/aws/s3/s3.h index 1d0690ef9..72f91492a 100644 --- a/include/aws/s3/s3.h +++ b/include/aws/s3/s3.h @@ -40,6 +40,7 @@ enum aws_s3_errors { AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, + AWS_ERROR_S3_FILE_MODIFIED, AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID) }; diff --git a/source/s3.c b/source/s3.c index a08b6e812..ab2b8f0a3 100644 --- a/source/s3.c +++ b/source/s3.c @@ -39,6 +39,7 @@ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."), + AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."), }; /* clang-format on */ diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 26bcbd568..946d6137c 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -717,7 +717,10 @@ static bool s_s3_auto_ranged_put_update( * Basically returns either part size or if content is not equally divisible into parts, the size of the remaining last * part. */ -static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta_request, uint32_t part_number) { +static size_t s_compute_request_body_size( + const struct aws_s3_meta_request *meta_request, + uint32_t part_number, + uint64_t *offset_out) { AWS_PRECONDITION(meta_request); const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; @@ -731,6 +734,8 @@ static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta request_body_size = content_remainder; } } + /* The part_number starts at 1 */ + *offset_out = (part_number - 1) * meta_request->part_size; return request_body_size; } @@ -895,8 +900,9 @@ static void s_skip_parts_from_stream_loop(void *user_data) { AWS_ASSERT(skip_job->part_being_skipped != NULL); aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ + uint64_t offset = 0; - size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1); + size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1, &offset); if (request_body_size != skip_job->part_being_skipped->size) { error_code = AWS_ERROR_S3_RESUME_FAILED; AWS_LOGF_ERROR( @@ -915,7 +921,8 @@ static void s_skip_parts_from_stream_loop(void *user_data) { aws_byte_buf_reset(temp_body_buf, false); } - skip_job->asyncstep_read_each_part = aws_s3_meta_request_read_body(skip_job->meta_request, temp_body_buf); + skip_job->asyncstep_read_each_part = + aws_s3_meta_request_read_body(skip_job->meta_request, offset, temp_body_buf); /* the read may or may not complete immediately */ if (aws_future_bool_register_callback_if_not_done( @@ -1090,6 +1097,7 @@ struct aws_future_http_message *s_s3_prepare_create_multipart_upload(struct aws_ } return future; } + /* Prepare an UploadPart request */ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *request) { struct aws_s3_meta_request *meta_request = request->meta_request; @@ -1109,10 +1117,16 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request * * Next async step: read through the body stream until we've * skipped over parts that were already uploaded (in case we're resuming * from an upload that had been paused) */ - part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream( - meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1); - aws_future_void_register_callback( - part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep); + + if (meta_request->request_body_parallel_stream) { + /* For parallel read stream, which is seekable, don't need to skip the part by reading from the stream. */ + s_s3_prepare_upload_part_on_skipping_done((void *)part_prep); + } else { + part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream( + meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1); + aws_future_void_register_callback( + part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep); + } } else { /* Not the first time preparing request (e.g. retry). * We can skip over the async steps that read the body stream */ @@ -1128,21 +1142,22 @@ static void s_s3_prepare_upload_part_on_skipping_done(void *user_data) { struct aws_s3_request *request = part_prep->request; struct aws_s3_meta_request *meta_request = request->meta_request; - int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts); - - /* If skipping failed, the prepare-upload-part job has failed. */ - if (error_code) { - s_s3_prepare_upload_part_finish(part_prep, error_code); - return; + if (part_prep->asyncstep1_skip_prev_parts) { + int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts); + /* If skipping failed, the prepare-upload-part job has failed. */ + if (error_code) { + s_s3_prepare_upload_part_finish(part_prep, error_code); + return; + } } - /* Skipping succeeded. * Next async step: read body stream for this part into a buffer */ + uint64_t offset = 0; - size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number); + size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size); - part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, &request->request_body); + part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body); aws_future_bool_register_callback( part_prep->asyncstep2_read_part, s_s3_prepare_upload_part_on_read_done, part_prep); } @@ -1195,6 +1210,15 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { if (!request->is_noop) { auto_ranged_put->prepare_data.part_index_for_skipping = request->part_number; + /* The part can finish out of order. Resize array-list to be long enough to hold this part, + * filling any intermediate slots with NULL. */ + + aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number); + while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) { + struct aws_s3_mpu_part_info *null_part = NULL; + aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part); + } + /* Add part to array-list */ struct aws_s3_mpu_part_info *part = aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); diff --git a/source/s3_client.c b/source/s3_client.c index 81653fba4..405290466 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -9,6 +9,7 @@ #include "aws/s3/private/s3_copy_object.h" #include "aws/s3/private/s3_default_meta_request.h" #include "aws/s3/private/s3_meta_request_impl.h" +#include "aws/s3/private/s3_parallel_input_stream.h" #include "aws/s3/private/s3_request_messages.h" #include "aws/s3/private/s3_util.h" @@ -126,6 +127,7 @@ static struct aws_s3_client_vtable s_s3_client_default_vtable = { .process_work = s_s3_client_process_work_default, .endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback, .finish_destroy = s_s3_client_finish_destroy_default, + .parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file, }; void aws_s3_set_dns_ttl(size_t ttl) { diff --git a/source/s3_default_meta_request.c b/source/s3_default_meta_request.c index f5db6de6e..420eecf80 100644 --- a/source/s3_default_meta_request.c +++ b/source/s3_default_meta_request.c @@ -243,7 +243,8 @@ static struct aws_future_void *s_s3_default_prepare_request(struct aws_s3_reques aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request_default->content_length); /* Kick off the async read */ - request_prep->step1_read_body = aws_s3_meta_request_read_body(meta_request, &request->request_body); + request_prep->step1_read_body = + aws_s3_meta_request_read_body(meta_request, 0 /*offset*/, &request->request_body); aws_future_bool_register_callback( request_prep->step1_read_body, s_s3_default_prepare_request_on_read_done, request_prep); } else { diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index b529a57f3..5607439a9 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -7,6 +7,7 @@ #include "aws/s3/private/s3_checksums.h" #include "aws/s3/private/s3_client_impl.h" #include "aws/s3/private/s3_meta_request_impl.h" +#include "aws/s3/private/s3_parallel_input_stream.h" #include "aws/s3/private/s3_request_messages.h" #include "aws/s3/private/s3_util.h" #include @@ -243,16 +244,25 @@ int aws_s3_meta_request_init_base( meta_request->cached_signing_config = aws_cached_signing_config_new(allocator, options->signing_config); } + /* Client is currently optional to allow spinning up a meta_request without a client in a test. */ + if (client != NULL) { + meta_request->client = aws_s3_client_acquire(client); + meta_request->io_event_loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg); + meta_request->synced_data.read_window_running_total = client->initial_read_window; + } + /* Set initial_meta_request, based on how the request's body is being passed in * (we checked earlier that it's not being passed multiple ways) */ if (options->send_filepath.len > 0) { - /* Create copy of original message, but with body-stream that reads directly from file */ - meta_request->initial_request_message = aws_s3_message_util_copy_http_message_filepath_body_all_headers( - allocator, options->message, options->send_filepath); - if (meta_request->initial_request_message == NULL) { + /* Create parallel read stream from file */ + meta_request->request_body_parallel_stream = + client->vtable->parallel_input_stream_new_from_file(allocator, options->send_filepath); + if (meta_request->request_body_parallel_stream == NULL) { goto error; } + /* but keep original message around for headers, method, etc */ + meta_request->initial_request_message = aws_http_message_acquire(options->message); } else if (options->send_async_stream != NULL) { /* Read from async body-stream, but keep original message around for headers, method, etc */ meta_request->request_body_async_stream = aws_async_input_stream_acquire(options->send_async_stream); @@ -263,14 +273,6 @@ int aws_s3_meta_request_init_base( meta_request->initial_request_message = aws_http_message_acquire(options->message); } - /* Client is currently optional to allow spinning up a meta_request without a client in a test. */ - if (client != NULL) { - aws_s3_client_acquire(client); - meta_request->client = client; - meta_request->io_event_loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg); - meta_request->synced_data.read_window_running_total = client->initial_read_window; - } - meta_request->synced_data.next_streaming_part = 1; meta_request->meta_request_level_running_response_sum = NULL; @@ -628,7 +630,14 @@ static void s_s3_meta_request_schedule_prepare_request_default( aws_task_init( &payload->task, s_s3_meta_request_prepare_request_task, payload, "s3_meta_request_prepare_request_task"); - aws_event_loop_schedule_task_now(meta_request->io_event_loop, &payload->task); + if (meta_request->request_body_parallel_stream) { + /* The body stream supports reading in parallel, so schedule task on any I/O thread. + * If we always used the meta-request's dedicated io_event_loop, we wouldn't get any parallelism. */ + struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg); + aws_event_loop_schedule_task_now(loop, &payload->task); + } else { + aws_event_loop_schedule_task_now(meta_request->io_event_loop, &payload->task); + } } static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { @@ -1674,6 +1683,8 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request * that, the downstream high level language doesn't need to wait for shutdown to clean related resource (eg: input * stream) */ meta_request->request_body_async_stream = aws_async_input_stream_release(meta_request->request_body_async_stream); + meta_request->request_body_parallel_stream = + aws_parallel_input_stream_release(meta_request->request_body_parallel_stream); meta_request->initial_request_message = aws_http_message_release(meta_request->initial_request_message); if (meta_request->finish_callback != NULL) { @@ -1690,6 +1701,7 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request struct aws_future_bool *aws_s3_meta_request_read_body( struct aws_s3_meta_request *meta_request, + uint64_t offset, struct aws_byte_buf *buffer) { AWS_PRECONDITION(meta_request); @@ -1700,6 +1712,11 @@ struct aws_future_bool *aws_s3_meta_request_read_body( return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer); } + /* If parallel-stream, simply call read(), which must fill the buffer and/or EOF */ + if (meta_request->request_body_parallel_stream != NULL) { + return aws_parallel_input_stream_read(meta_request->request_body_parallel_stream, offset, buffer); + } + /* Else synchronous aws_input_stream */ struct aws_input_stream *synchronous_stream = aws_http_message_get_body_stream(meta_request->initial_request_message); diff --git a/source/s3_parallel_input_stream.c b/source/s3_parallel_input_stream.c new file mode 100644 index 000000000..461525762 --- /dev/null +++ b/source/s3_parallel_input_stream.c @@ -0,0 +1,140 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "aws/s3/private/s3_parallel_input_stream.h" + +#include + +#include +#include + +#include + +void aws_parallel_input_stream_init_base( + struct aws_parallel_input_stream *stream, + struct aws_allocator *alloc, + const struct aws_parallel_input_stream_vtable *vtable, + void *impl) { + + AWS_ZERO_STRUCT(*stream); + stream->alloc = alloc; + stream->vtable = vtable; + stream->impl = impl; + aws_ref_count_init(&stream->ref_count, stream, (aws_simple_completion_callback *)vtable->destroy); +} + +struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream) { + if (stream != NULL) { + aws_ref_count_acquire(&stream->ref_count); + } + return stream; +} + +struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream) { + if (stream != NULL) { + aws_ref_count_release(&stream->ref_count); + } + return NULL; +} + +struct aws_future_bool *aws_parallel_input_stream_read( + struct aws_parallel_input_stream *stream, + uint64_t offset, + struct aws_byte_buf *dest) { + /* Ensure the buffer has space available */ + if (dest->len == dest->capacity) { + struct aws_future_bool *future = aws_future_bool_new(stream->alloc); + aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER); + return future; + } + struct aws_future_bool *future = stream->vtable->read(stream, offset, dest); + return future; +} + +struct aws_parallel_input_stream_from_file_impl { + struct aws_parallel_input_stream base; + + struct aws_string *file_path; +}; + +static void s_para_from_file_destroy(struct aws_parallel_input_stream *stream) { + struct aws_parallel_input_stream_from_file_impl *impl = stream->impl; + + aws_string_destroy(impl->file_path); + + aws_mem_release(stream->alloc, impl); +} + +struct aws_future_bool *s_para_from_file_read( + struct aws_parallel_input_stream *stream, + uint64_t offset, + struct aws_byte_buf *dest) { + + struct aws_future_bool *future = aws_future_bool_new(stream->alloc); + struct aws_parallel_input_stream_from_file_impl *impl = stream->impl; + bool success = false; + struct aws_input_stream *file_stream = NULL; + struct aws_stream_status status = { + .is_end_of_stream = false, + .is_valid = true, + }; + + file_stream = aws_input_stream_new_from_file(stream->alloc, aws_string_c_str(impl->file_path)); + if (!file_stream) { + goto done; + } + + if (aws_input_stream_seek(file_stream, offset, AWS_SSB_BEGIN)) { + goto done; + } + /* Keep reading until fill the buffer. + * Note that we must read() after seek() to determine if we're EOF, the seek alone won't trigger it. */ + while ((dest->len < dest->capacity) && !status.is_end_of_stream) { + /* Read from stream */ + if (aws_input_stream_read(file_stream, dest) != AWS_OP_SUCCESS) { + goto done; + } + + /* Check if stream is done */ + if (aws_input_stream_get_status(file_stream, &status) != AWS_OP_SUCCESS) { + goto done; + } + } + success = true; +done: + if (success) { + aws_future_bool_set_result(future, status.is_end_of_stream); + } else { + aws_future_bool_set_error(future, aws_last_error()); + } + + aws_input_stream_release(file_stream); + + return future; +} + +static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_vtable = { + .destroy = s_para_from_file_destroy, + .read = s_para_from_file_read, +}; + +struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file( + struct aws_allocator *allocator, + struct aws_byte_cursor file_name) { + + struct aws_parallel_input_stream_from_file_impl *impl = + aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl)); + aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl); + impl->file_path = aws_string_new_from_cursor(allocator, &file_name); + if (!aws_path_exists(impl->file_path)) { + /* If file path not exists, raise error from errno. */ + aws_translate_and_raise_io_error(errno); + goto error; + } + return &impl->base; +error: + s_para_from_file_destroy(&impl->base); + return NULL; +} diff --git a/source/s3_request_messages.c b/source/s3_request_messages.c index 89d76e883..2549c208b 100644 --- a/source/s3_request_messages.c +++ b/source/s3_request_messages.c @@ -938,49 +938,6 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he return NULL; } -/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */ -struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers( - struct aws_allocator *allocator, - struct aws_http_message *base_message, - struct aws_byte_cursor filepath) { - - bool success = false; - struct aws_string *filepath_str = NULL; - struct aws_input_stream *body_stream = NULL; - struct aws_http_message *message = NULL; - - /* Copy message and retain all headers */ - message = aws_s3_message_util_copy_http_message_no_body_filter_headers( - allocator, - base_message, - NULL /*excluded_header_array*/, - 0 /*excluded_header_array_size*/, - false /*exclude_x_amz_meta*/); - if (!message) { - goto clean_up; - } - - /* Create body-stream that reads from file */ - filepath_str = aws_string_new_from_cursor(allocator, &filepath); - body_stream = aws_input_stream_new_from_file(allocator, aws_string_c_str(filepath_str)); - if (!body_stream) { - goto clean_up; - } - aws_http_message_set_body_stream(message, body_stream); - - success = true; - -clean_up: - aws_string_destroy(filepath_str); - aws_input_stream_release(body_stream); - if (success) { - return message; - } else { - aws_http_message_release(message); - return NULL; - } -} - void aws_s3_message_util_copy_headers( struct aws_http_message *source_message, struct aws_http_message *dest_message, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f455c0206..ff2311da1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -77,6 +77,7 @@ add_net_test_case(test_s3_put_object_tls_enabled) add_net_test_case(test_s3_put_object_tls_default) add_net_test_case(test_s3_multipart_put_object_with_acl) add_net_test_case(test_s3_put_object_multiple) +add_net_test_case(test_s3_put_object_multiple_with_filepath) add_net_test_case(test_s3_put_object_less_than_part_size) add_net_test_case(test_s3_put_object_less_than_part_size_with_content_encoding) add_net_test_case(test_s3_put_object_mpu_with_content_encoding) @@ -138,6 +139,9 @@ add_net_test_case(test_s3_round_trip_mpu_multipart_get_fc) add_net_test_case(test_s3_round_trip_mpu_multipart_get_with_list_algorithm_fc) add_net_test_case(test_s3_round_trip_mpu_default_get_fc) add_net_test_case(test_s3_round_trip_with_filepath) +add_net_test_case(test_s3_round_trip_mpu_with_filepath) +add_net_test_case(test_s3_round_trip_with_filepath_no_content_length) +add_net_test_case(test_s3_round_trip_mpu_with_filepath_no_content_length) add_net_test_case(test_s3_chunked_then_unchunked) add_net_test_case(test_s3_cancel_mpu_one_part_completed_fc) @@ -159,6 +163,7 @@ add_net_test_case(test_s3_error_missing_file) add_net_test_case(test_s3_existing_host_entry) add_net_test_case(test_s3_put_fail_object_invalid_request) add_net_test_case(test_s3_put_fail_object_invalid_send_filepath) +add_net_test_case(test_s3_put_fail_object_bad_parallel_read_stream) add_net_test_case(test_s3_put_fail_object_inputstream_fail_reading) add_net_test_case(test_s3_put_fail_object_inputstream_mismatch_content_length) add_net_test_case(test_s3_put_single_part_fail_object_inputstream_fail_reading) @@ -284,6 +289,9 @@ if(AWS_ENABLE_S3_ENDPOINT_RESOLVER) add_test_case(test_s3_endpoint_resolver_resolve_endpoint_force_path_style) endif() +add_test_case(parallel_read_stream_from_file_sanity_test) +add_test_case(parallel_read_stream_from_large_file_test) + set(TEST_BINARY_NAME ${PROJECT_NAME}-tests) generate_test_driver(${TEST_BINARY_NAME}) diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 11ae454ee..89a155030 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -1692,18 +1692,15 @@ static int s_test_s3_multipart_put_object_with_acl(struct aws_allocator *allocat return 0; } -AWS_TEST_CASE(test_s3_put_object_multiple, s_test_s3_put_object_multiple) -static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void *ctx) { - (void)ctx; +static int s_test_s3_put_object_multiple_helper(struct aws_allocator *allocator, bool file_on_disk) { - struct aws_s3_meta_request *meta_requests[2]; - struct aws_s3_meta_request_test_results meta_request_test_results[2]; - struct aws_http_message *messages[2]; - struct aws_input_stream *input_streams[2]; - struct aws_byte_buf input_stream_buffers[2]; - size_t num_meta_requests = AWS_ARRAY_SIZE(meta_requests); - - ASSERT_TRUE(num_meta_requests == AWS_ARRAY_SIZE(meta_request_test_results)); +#define NUM_REQUESTS 5 + struct aws_s3_meta_request *meta_requests[NUM_REQUESTS]; + struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS]; + struct aws_http_message *messages[NUM_REQUESTS]; + struct aws_input_stream *input_streams[NUM_REQUESTS]; + struct aws_byte_buf input_stream_buffers[NUM_REQUESTS]; + struct aws_string *filepath_str[NUM_REQUESTS]; struct aws_s3_tester tester; AWS_ZERO_STRUCT(tester); @@ -1720,7 +1717,9 @@ static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void * struct aws_string *host_name = aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region); - for (size_t i = 0; i < num_meta_requests; ++i) { + size_t content_length = MB_TO_BYTES(10); + + for (size_t i = 0; i < NUM_REQUESTS; ++i) { aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator); char object_path_buffer[128] = ""; snprintf( @@ -1730,18 +1729,26 @@ static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void * AWS_BYTE_CURSOR_PRI(g_put_object_prefix), i); AWS_ZERO_STRUCT(input_stream_buffers[i]); - aws_s3_create_test_buffer(allocator, 10 * 1024ULL * 1024ULL, &input_stream_buffers[i]); + aws_s3_create_test_buffer(allocator, content_length, &input_stream_buffers[i]); struct aws_byte_cursor test_body_cursor = aws_byte_cursor_from_buf(&input_stream_buffers[i]); input_streams[i] = aws_input_stream_new_from_cursor(allocator, &test_body_cursor); struct aws_byte_cursor test_object_path = aws_byte_cursor_from_c_str(object_path_buffer); struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name); - messages[i] = aws_s3_test_put_object_request_new( - allocator, &host_cur, test_object_path, g_test_body_content_type, input_streams[i], 0); + struct aws_s3_meta_request_options options; AWS_ZERO_STRUCT(options); options.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT; + if (file_on_disk) { + filepath_str[i] = aws_s3_tester_create_file(allocator, test_object_path, input_streams[i]); + messages[i] = aws_s3_test_put_object_request_new_without_body( + allocator, &host_cur, g_test_body_content_type, test_object_path, content_length, 0 /*flags*/); + options.send_filepath = aws_byte_cursor_from_string(filepath_str[i]); + } else { + filepath_str[i] = NULL; + messages[i] = aws_s3_test_put_object_request_new( + allocator, &host_cur, test_object_path, g_test_body_content_type, input_streams[i], 0); + } options.message = messages[i]; - ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i])); /* Trigger accelerating of our Put Object request. */ @@ -1757,21 +1764,25 @@ static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void * ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS); aws_s3_tester_unlock_synced_data(&tester); - for (size_t i = 0; i < num_meta_requests; ++i) { + for (size_t i = 0; i < NUM_REQUESTS; ++i) { meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]); } aws_s3_tester_wait_for_meta_request_shutdown(&tester); - for (size_t i = 0; i < num_meta_requests; ++i) { + for (size_t i = 0; i < NUM_REQUESTS; ++i) { aws_s3_tester_validate_get_object_results(&meta_request_test_results[i], 0); aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]); } - for (size_t i = 0; i < num_meta_requests; ++i) { + for (size_t i = 0; i < NUM_REQUESTS; ++i) { aws_http_message_release(messages[i]); aws_input_stream_release(input_streams[i]); aws_byte_buf_clean_up(&input_stream_buffers[i]); + if (filepath_str[i]) { + ASSERT_SUCCESS(aws_file_delete(filepath_str[i])); + aws_string_destroy(filepath_str[i]); + } } aws_string_destroy(host_name); @@ -1784,6 +1795,18 @@ static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void * return 0; } +AWS_TEST_CASE(test_s3_put_object_multiple, s_test_s3_put_object_multiple) +static int s_test_s3_put_object_multiple(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_put_object_multiple_helper(allocator, false); +} + +AWS_TEST_CASE(test_s3_put_object_multiple_with_filepath, s_test_s3_put_object_multiple_with_filepath) +static int s_test_s3_put_object_multiple_with_filepath(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_put_object_multiple_helper(allocator, true); +} + AWS_TEST_CASE(test_s3_put_object_less_than_part_size, s_test_s3_put_object_less_than_part_size) static int s_test_s3_put_object_less_than_part_size(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -3655,9 +3678,11 @@ static int s_test_s3_round_trip_mpu_default_get_fc(struct aws_allocator *allocat return 0; } -AWS_TEST_CASE(test_s3_round_trip_with_filepath, s_test_s3_round_trip_with_filepath) -static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, void *ctx) { - (void)ctx; +static int s_test_s3_round_trip_with_filepath_helper( + struct aws_allocator *allocator, + struct aws_byte_cursor key, + int object_size_mb, + bool unknown_content_length) { struct aws_s3_tester tester; ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); @@ -3677,8 +3702,7 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v struct aws_byte_buf path_buf; AWS_ZERO_STRUCT(path_buf); - ASSERT_SUCCESS(aws_s3_tester_upload_file_path_init( - allocator, &path_buf, aws_byte_cursor_from_c_str("/prefix/round_trip/with_filepath"))); + ASSERT_SUCCESS(aws_s3_tester_upload_file_path_init(allocator, &path_buf, key)); struct aws_byte_cursor object_path = aws_byte_cursor_from_buf(&path_buf); @@ -3688,9 +3712,10 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v .client = client, .put_options = { - .object_size_mb = 1, + .object_size_mb = object_size_mb, .object_path_override = object_path, .file_on_disk = true, + .skip_content_length = unknown_content_length, }, }; @@ -3722,6 +3747,45 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v return 0; } +AWS_TEST_CASE(test_s3_round_trip_with_filepath, s_test_s3_round_trip_with_filepath) +static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_round_trip_with_filepath_helper( + allocator, aws_byte_cursor_from_c_str("/prefix/round_trip/with_filepath"), 1, false /*unknown_content_length*/); +} + +AWS_TEST_CASE(test_s3_round_trip_mpu_with_filepath, s_test_s3_round_trip_mpu_with_filepath) +static int s_test_s3_round_trip_mpu_with_filepath(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_round_trip_with_filepath_helper( + allocator, + aws_byte_cursor_from_c_str("/prefix/round_trip/with_filepath_mpu"), + 50, + false /*unknown_content_length*/); +} + +AWS_TEST_CASE(test_s3_round_trip_with_filepath_no_content_length, s_test_s3_round_trip_with_filepath_no_content_length) +static int s_test_s3_round_trip_with_filepath_no_content_length(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_round_trip_with_filepath_helper( + allocator, + aws_byte_cursor_from_c_str("/prefix/round_trip/with_filepath_no_content_length"), + 1, + true /*unknown_content_length*/); +} + +AWS_TEST_CASE( + test_s3_round_trip_mpu_with_filepath_no_content_length, + s_test_s3_round_trip_mpu_with_filepath_no_content_length) +static int s_test_s3_round_trip_mpu_with_filepath_no_content_length(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + return s_test_s3_round_trip_with_filepath_helper( + allocator, + aws_byte_cursor_from_c_str("/prefix/round_trip/with_filepath_mpu_no_content_length"), + 50, + true /*unknown_content_length*/); +} + AWS_TEST_CASE(test_s3_chunked_then_unchunked, s_test_s3_chunked_then_unchunked) static int s_test_s3_chunked_then_unchunked(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -4509,6 +4573,48 @@ static int s_test_s3_put_fail_object_invalid_send_filepath(struct aws_allocator return 0; } +/* Test that the parallel read stream failed to send read the second part. */ +AWS_TEST_CASE(test_s3_put_fail_object_bad_parallel_read_stream, s_test_s3_put_fail_object_bad_parallel_read_stream) +static int s_test_s3_put_fail_object_bad_parallel_read_stream(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_tester_client_options client_options; + AWS_ZERO_STRUCT(client_options); + struct aws_s3_client *client = NULL; + ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client)); + /* Override the parallel input stream new function to create a bad parallel input stream */ + client->vtable->parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file_failure_tester; + + struct aws_s3_meta_request_test_results meta_request_test_results; + aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator); + + struct aws_s3_tester_meta_request_options options = { + .allocator = allocator, + .client = client, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE, + .put_options = + { + .object_size_mb = 100, + .file_on_disk = true, + }, + }; + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &options, &meta_request_test_results)); + + ASSERT_UINT_EQUALS(AWS_ERROR_UNIMPLEMENTED, meta_request_test_results.finished_error_code); + + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); + client = aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return AWS_OP_SUCCESS; +} + AWS_TEST_CASE( test_s3_put_single_part_fail_object_inputstream_fail_reading, s_test_s3_put_single_part_fail_object_inputstream_fail_reading) @@ -5882,7 +5988,7 @@ static void s_meta_request_finished_request_patched_for_pause_resume_tests( aws_atomic_fetch_add(&test_data->total_bytes_uploaded, request->request_body.len); size_t total_bytes_uploaded = aws_atomic_load_int(&test_data->total_bytes_uploaded); - size_t offset_to_pause = aws_atomic_load_int(&test_data->request_pause_offset); + uint64_t offset_to_pause = aws_atomic_load_int(&test_data->request_pause_offset); if (total_bytes_uploaded >= offset_to_pause) { /* offset of the upload at which we should pause was reached. let's pause the upload */ diff --git a/tests/s3_parallel_read_stream_test.c b/tests/s3_parallel_read_stream_test.c new file mode 100644 index 000000000..a79fb65ac --- /dev/null +++ b/tests/s3_parallel_read_stream_test.c @@ -0,0 +1,314 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "aws/s3/private/s3_parallel_input_stream.h" +#include "aws/s3/private/s3_util.h" +#include "aws/s3/s3_client.h" +#include "s3_tester.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#define TEST_CASE(NAME) \ + AWS_TEST_CASE(NAME, s_test_##NAME); \ + static int s_test_##NAME(struct aws_allocator *allocator, void *ctx) + +#define DEFINE_HEADER(NAME, VALUE) \ + { .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(NAME), .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(VALUE), } + +#define ONE_SEC_IN_NS ((uint64_t)AWS_TIMESTAMP_NANOS) +#define MAX_TIMEOUT_NS (600 * ONE_SEC_IN_NS) + +AWS_STATIC_STRING_FROM_LITERAL(s_parallel_stream_test, "SimpleParallelStreamTest"); + +static int s_create_read_file(const char *file_path, size_t length) { + remove(file_path); + + FILE *file = aws_fopen(file_path, "w"); + size_t loop = length / s_parallel_stream_test->len; + for (size_t i = 0; i < loop; ++i) { + fprintf(file, "%s", (char *)s_parallel_stream_test->bytes); + } + size_t reminder = length % s_parallel_stream_test->len; + if (reminder) { + fprintf(file, "%.*s", (int)reminder, s_parallel_stream_test->bytes); + } + fclose(file); + return AWS_OP_SUCCESS; +} + +struct aws_parallel_read_from_test_args { + struct aws_allocator *alloc; + + size_t buffer_start_pos; + size_t file_start_pos; + size_t read_length; + struct aws_future_bool *final_end_future; + struct aws_byte_buf *final_dest; + + struct aws_parallel_input_stream *parallel_read_stream; + struct aws_atomic_var *completed_count; + struct aws_atomic_var *end_of_stream; + + size_t split_num; +}; + +static void s_s3_parallel_from_file_read_test_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { + (void)task_status; + struct aws_parallel_read_from_test_args *test_args = arg; + + struct aws_byte_buf read_buf = { + .allocator = NULL, + .buffer = test_args->final_dest->buffer + test_args->buffer_start_pos, + .len = 0, + .capacity = test_args->read_length, + }; + struct aws_future_bool *read_future = + aws_parallel_input_stream_read(test_args->parallel_read_stream, test_args->file_start_pos, &read_buf); + aws_future_bool_wait(read_future, MAX_TIMEOUT_NS); + bool end_of_stream = aws_future_bool_get_result(read_future); + aws_future_bool_release(read_future); + + struct aws_future_bool *end_future = test_args->final_end_future; + size_t read_completed = aws_atomic_fetch_add(test_args->completed_count, 1); + if (end_of_stream) { + aws_atomic_store_int(test_args->end_of_stream, 1); + } + bool completed = read_completed == test_args->split_num - 1; + + bool reached_eos = aws_atomic_load_int(test_args->end_of_stream) == 1; + aws_mem_release(test_args->alloc, task); + aws_mem_release(test_args->alloc, test_args); + if (completed) { + aws_future_bool_set_result(end_future, reached_eos); + } + aws_future_bool_release(end_future); +} + +static int s_parallel_read_test_helper( + struct aws_allocator *alloc, + struct aws_parallel_input_stream *parallel_read_stream, + struct aws_byte_buf *read_buf, + struct aws_event_loop_group *elg, + size_t start_pos, + size_t total_length, + size_t split_num, + bool *out_eos) { + + struct aws_atomic_var completed_count; + aws_atomic_store_int(&completed_count, 0); + struct aws_atomic_var end_of_stream; + aws_atomic_store_int(&end_of_stream, 0); + size_t number_bytes_per_read = total_length / split_num; + if (number_bytes_per_read == 0) { + struct aws_future_bool *read_future = aws_parallel_input_stream_read(parallel_read_stream, 0, read_buf); + ASSERT_TRUE(aws_future_bool_wait(read_future, MAX_TIMEOUT_NS)); + aws_future_bool_release(read_future); + return AWS_OP_SUCCESS; + } + + struct aws_future_bool *future = aws_future_bool_new(alloc); + for (size_t i = 0; i < split_num; i++) { + struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(elg); + struct aws_parallel_read_from_test_args *test_args = + aws_mem_calloc(alloc, 1, sizeof(struct aws_parallel_read_from_test_args)); + + size_t read_length = number_bytes_per_read; + if (i == split_num - 1) { + /* Last part, adjust the size */ + read_length += total_length % split_num; + } + + test_args->alloc = alloc; + test_args->buffer_start_pos = i * number_bytes_per_read; + test_args->file_start_pos = start_pos + test_args->buffer_start_pos; + test_args->final_end_future = aws_future_bool_acquire(future); + test_args->read_length = read_length; + test_args->final_dest = read_buf; + test_args->parallel_read_stream = parallel_read_stream; + test_args->completed_count = &completed_count; + test_args->end_of_stream = &end_of_stream; + test_args->split_num = split_num; + + struct aws_task *read_task = aws_mem_calloc(alloc, 1, sizeof(struct aws_task)); + aws_task_init(read_task, s_s3_parallel_from_file_read_test_task, test_args, "s3_parallel_read_test_task"); + aws_event_loop_schedule_task_now(loop, read_task); + } + + ASSERT_TRUE(aws_future_bool_wait(future, MAX_TIMEOUT_NS)); + *out_eos = aws_future_bool_get_result(future); + aws_future_bool_release(future); + read_buf->len = total_length; + return AWS_OP_SUCCESS; +} + +TEST_CASE(parallel_read_stream_from_file_sanity_test) { + (void)ctx; + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + const char *file_path = "s3_test_parallel_input_stream_read.txt"; /* unique name */ + ASSERT_SUCCESS(s_create_read_file(file_path, s_parallel_stream_test->len)); + struct aws_byte_cursor path_cursor = aws_byte_cursor_from_c_str(file_path); + + struct aws_parallel_input_stream *parallel_read_stream = + aws_parallel_input_stream_new_from_file(allocator, path_cursor); + ASSERT_NOT_NULL(parallel_read_stream); + + aws_parallel_input_stream_acquire(parallel_read_stream); + aws_parallel_input_stream_release(parallel_read_stream); + struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(allocator, 0, NULL); + + { + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, s_parallel_stream_test->len); + bool eos_reached = false; + ASSERT_SUCCESS(s_parallel_read_test_helper( + allocator, parallel_read_stream, &read_buf, el_group, 0, s_parallel_stream_test->len, 8, &eos_reached)); + + /* Read the exact number of bytes will not reach to the EOS */ + ASSERT_FALSE(eos_reached); + ASSERT_TRUE(aws_string_eq_byte_buf(s_parallel_stream_test, &read_buf)); + aws_byte_buf_clean_up(&read_buf); + } + + { + size_t extra_byte_len = s_parallel_stream_test->len + 1; + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, extra_byte_len); + bool eos_reached = false; + ASSERT_SUCCESS(s_parallel_read_test_helper( + allocator, parallel_read_stream, &read_buf, el_group, 0, extra_byte_len, 8, &eos_reached)); + + /* Read the exact number of bytes will not reach to the EOS */ + ASSERT_TRUE(eos_reached); + aws_byte_buf_clean_up(&read_buf); + } + + { + /* Failure from short buffer */ + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, s_parallel_stream_test->len); + /* Set the buffer length to be capacity */ + read_buf.len = s_parallel_stream_test->len; + struct aws_future_bool *read_future = aws_parallel_input_stream_read(parallel_read_stream, 0, &read_buf); + ASSERT_TRUE(aws_future_bool_is_done(read_future)); + int error = aws_future_bool_get_error(read_future); + ASSERT_UINT_EQUALS(AWS_ERROR_SHORT_BUFFER, error); + aws_byte_buf_clean_up(&read_buf); + aws_future_bool_release(read_future); + } + + { + /* offset larger than the length of file, will read nothing and return EOS */ + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, s_parallel_stream_test->len); + struct aws_future_bool *read_future = + aws_parallel_input_stream_read(parallel_read_stream, 2 * s_parallel_stream_test->len, &read_buf); + ASSERT_TRUE(aws_future_bool_is_done(read_future)); + int error = aws_future_bool_get_error(read_future); + bool eos = aws_future_bool_get_result(read_future); + /* Seek to offset larger than the length will not fail. */ + ASSERT_UINT_EQUALS(AWS_ERROR_SUCCESS, error); + ASSERT_TRUE(eos); + ASSERT_UINT_EQUALS(0, read_buf.len); + aws_byte_buf_clean_up(&read_buf); + aws_future_bool_release(read_future); + } + + remove(file_path); + aws_parallel_input_stream_release(parallel_read_stream); + aws_event_loop_group_release(el_group); + aws_s3_tester_clean_up(&tester); + + return AWS_OP_SUCCESS; +} + +TEST_CASE(parallel_read_stream_from_large_file_test) { + (void)ctx; + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + size_t file_length = MB_TO_BYTES(10); + + const char *file_path = "s3_test_parallel_input_stream_read_large.txt"; /* unique name */ + ASSERT_SUCCESS(s_create_read_file(file_path, file_length)); + struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(allocator, 0, NULL); + struct aws_byte_cursor path_cursor = aws_byte_cursor_from_c_str(file_path); + + struct aws_parallel_input_stream *parallel_read_stream = + aws_parallel_input_stream_new_from_file(allocator, path_cursor); + ASSERT_NOT_NULL(parallel_read_stream); + + { + /* The whole file */ + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, file_length); + struct aws_byte_buf expected_read_buf; + aws_byte_buf_init(&expected_read_buf, allocator, file_length); + bool eos_reached = false; + + ASSERT_SUCCESS(s_parallel_read_test_helper( + allocator, parallel_read_stream, &read_buf, el_group, 0, file_length, 8, &eos_reached)); + + /* Read the exact number of bytes will not reach to the EOS */ + ASSERT_FALSE(eos_reached); + struct aws_input_stream *stream = aws_input_stream_new_from_file(allocator, file_path); + ASSERT_SUCCESS(aws_input_stream_read(stream, &expected_read_buf)); + + ASSERT_TRUE(aws_byte_buf_eq(&expected_read_buf, &read_buf)); + aws_byte_buf_clean_up(&read_buf); + aws_byte_buf_clean_up(&expected_read_buf); + aws_input_stream_release(stream); + } + + { + /* First string */ + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, file_length); + bool eos_reached = true; + + ASSERT_SUCCESS(s_parallel_read_test_helper( + allocator, parallel_read_stream, &read_buf, el_group, 0, s_parallel_stream_test->len, 8, &eos_reached)); + + ASSERT_FALSE(eos_reached); + ASSERT_TRUE(aws_string_eq_byte_buf(s_parallel_stream_test, &read_buf)); + aws_byte_buf_clean_up(&read_buf); + } + + { + /* Second string */ + struct aws_byte_buf read_buf; + aws_byte_buf_init(&read_buf, allocator, file_length); + + bool eos_reached = true; + ASSERT_SUCCESS(s_parallel_read_test_helper( + allocator, + parallel_read_stream, + &read_buf, + el_group, + s_parallel_stream_test->len, + s_parallel_stream_test->len, + 8, + &eos_reached)); + + ASSERT_FALSE(eos_reached); + ASSERT_TRUE(aws_string_eq_byte_buf(s_parallel_stream_test, &read_buf)); + aws_byte_buf_clean_up(&read_buf); + } + remove(file_path); + aws_event_loop_group_release(el_group); + aws_parallel_input_stream_release(parallel_read_stream); + aws_s3_tester_clean_up(&tester); + + return AWS_OP_SUCCESS; +} diff --git a/tests/s3_test_parallel_stream.c b/tests/s3_test_parallel_stream.c new file mode 100644 index 000000000..fc263ebbe --- /dev/null +++ b/tests/s3_test_parallel_stream.c @@ -0,0 +1,66 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "aws/s3/private/s3_parallel_input_stream.h" +#include "s3_tester.h" +#include + +struct aws_parallel_input_stream_from_file_failure_impl { + struct aws_parallel_input_stream base; + + struct aws_atomic_var number_read; +}; + +static void s_para_from_file_failure_destroy(struct aws_parallel_input_stream *stream) { + struct aws_parallel_input_stream_from_file_failure_impl *impl = stream->impl; + + aws_mem_release(stream->alloc, impl); +} + +struct aws_future_bool *s_para_from_file_failure_read( + struct aws_parallel_input_stream *stream, + uint64_t offset, + struct aws_byte_buf *dest) { + (void)offset; + + struct aws_future_bool *future = aws_future_bool_new(stream->alloc); + struct aws_parallel_input_stream_from_file_failure_impl *impl = stream->impl; + size_t previous_number_read = aws_atomic_fetch_add(&impl->number_read, 1); + if (previous_number_read == 1) { + /* TODO: make the failure configurable */ + aws_future_bool_set_error(future, AWS_ERROR_UNIMPLEMENTED); + } else { + + struct aws_byte_cursor test_string = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("This is an S3 test."); + while (dest->len < dest->capacity) { + size_t remaining_in_buffer = dest->capacity - dest->len; + if (remaining_in_buffer < test_string.len) { + test_string.len = remaining_in_buffer; + } + aws_byte_buf_append(dest, &test_string); + } + aws_future_bool_set_result(future, false); + } + return future; +} + +static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_failure_vtable = { + .destroy = s_para_from_file_failure_destroy, + .read = s_para_from_file_failure_read, +}; + +struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file_failure_tester( + struct aws_allocator *allocator, + struct aws_byte_cursor file_name) { + (void)file_name; + + struct aws_parallel_input_stream_from_file_failure_impl *impl = + aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_failure_impl)); + aws_parallel_input_stream_init_base( + &impl->base, allocator, &s_parallel_input_stream_from_file_failure_vtable, impl); + + aws_atomic_init_int(&impl->number_read, 0); + return &impl->base; +} diff --git a/tests/s3_tester.c b/tests/s3_tester.c index 76d1044e5..0cd26668b 100644 --- a/tests/s3_tester.c +++ b/tests/s3_tester.c @@ -11,7 +11,6 @@ #include "aws/s3/private/s3_util.h" #include #include -#include #include #include #include @@ -1548,35 +1547,7 @@ int aws_s3_tester_send_meta_request_with_options( /* if uploading via filepath, write input_stream out as tmp file on disk, and then upload that */ if (options->put_options.file_on_disk) { ASSERT_NOT_NULL(input_stream); - struct aws_byte_buf filepath_buf; - aws_byte_buf_init(&filepath_buf, allocator, 128); - struct aws_byte_cursor filepath_prefix = aws_byte_cursor_from_c_str("tmp"); - aws_byte_buf_append_dynamic(&filepath_buf, &filepath_prefix); - aws_byte_buf_append_dynamic(&filepath_buf, &test_object_path); - for (size_t i = 0; i < filepath_buf.len; ++i) { - if (!isalnum(filepath_buf.buffer[i])) { - filepath_buf.buffer[i] = '_'; /* sanitize filename */ - } - } - filepath_str = aws_string_new_from_buf(allocator, &filepath_buf); - aws_byte_buf_clean_up(&filepath_buf); - - FILE *file = aws_fopen(aws_string_c_str(filepath_str), "wb"); - ASSERT_NOT_NULL(file, "Cannot open file for write: %s", aws_string_c_str(filepath_str)); - - int64_t stream_length = 0; - ASSERT_SUCCESS(aws_input_stream_get_length(input_stream, &stream_length)); - - struct aws_byte_buf data_buf; - ASSERT_SUCCESS(aws_byte_buf_init(&data_buf, allocator, (size_t)stream_length)); - ASSERT_SUCCESS(aws_input_stream_read(input_stream, &data_buf)); - ASSERT_UINT_EQUALS((size_t)stream_length, data_buf.len); - - ASSERT_UINT_EQUALS(data_buf.len, fwrite(data_buf.buffer, 1, data_buf.len, file)); - fclose(file); - aws_byte_buf_clean_up(&data_buf); - - /* use filepath instead of input_stream */ + filepath_str = aws_s3_tester_create_file(allocator, test_object_path, input_stream); meta_request_options.send_filepath = aws_byte_cursor_from_string(filepath_str); input_stream = aws_input_stream_release(input_stream); } @@ -2047,3 +2018,38 @@ int aws_s3_tester_get_content_length(const struct aws_http_headers *headers, uin ASSERT_SUCCESS(aws_byte_cursor_utf8_parse_u64(value_cursor, out_content_length)); return AWS_OP_SUCCESS; } + +struct aws_string *aws_s3_tester_create_file( + struct aws_allocator *allocator, + struct aws_byte_cursor test_object_path, + struct aws_input_stream *input_stream) { + + struct aws_byte_buf filepath_buf; + aws_byte_buf_init(&filepath_buf, allocator, 128); + struct aws_byte_cursor filepath_prefix = aws_byte_cursor_from_c_str("tmp"); + aws_byte_buf_append_dynamic(&filepath_buf, &filepath_prefix); + aws_byte_buf_append_dynamic(&filepath_buf, &test_object_path); + for (size_t i = 0; i < filepath_buf.len; ++i) { + if (!isalnum(filepath_buf.buffer[i])) { + filepath_buf.buffer[i] = '_'; /* sanitize filename */ + } + } + struct aws_string *filepath_str = aws_string_new_from_buf(allocator, &filepath_buf); + aws_byte_buf_clean_up(&filepath_buf); + + FILE *file = aws_fopen(aws_string_c_str(filepath_str), "wb"); + AWS_FATAL_ASSERT(file != NULL); + + int64_t stream_length = 0; + AWS_FATAL_ASSERT(aws_input_stream_get_length(input_stream, &stream_length) == AWS_OP_SUCCESS); + + struct aws_byte_buf data_buf; + AWS_FATAL_ASSERT(aws_byte_buf_init(&data_buf, allocator, (size_t)stream_length) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT(aws_input_stream_read(input_stream, &data_buf) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT((size_t)stream_length == data_buf.len); + AWS_FATAL_ASSERT(data_buf.len == fwrite(data_buf.buffer, 1, data_buf.len, file)); + fclose(file); + aws_byte_buf_clean_up(&data_buf); + + return filepath_str; +} diff --git a/tests/s3_tester.h b/tests/s3_tester.h index b70bdeece..c35462f62 100644 --- a/tests/s3_tester.h +++ b/tests/s3_tester.h @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -446,8 +447,18 @@ int aws_s3_tester_upload_file_path_init( struct aws_byte_buf *out_path_buffer, struct aws_byte_cursor file_path); +/* Create a file on disk based on the input stream. Return the file path */ +struct aws_string *aws_s3_tester_create_file( + struct aws_allocator *allocator, + struct aws_byte_cursor test_object_path, + struct aws_input_stream *input_stream); + int aws_s3_tester_get_content_length(const struct aws_http_headers *headers, uint64_t *out_content_length); +struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file_failure_tester( + struct aws_allocator *allocator, + struct aws_byte_cursor file_name); + extern struct aws_s3_client_vtable g_aws_s3_client_mock_vtable; extern const struct aws_byte_cursor g_mock_server_uri;