
Commit

Merge branch 'para-pre-rcbc' into para-pre-keep-reading-checksum
TingDaoK committed Oct 12, 2023
2 parents e712ba6 + 122ac29 commit 9bcdfda
Showing 10 changed files with 139 additions and 15 deletions.
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
@@ -162,6 +162,9 @@ struct aws_s3_client_vtable {
void (*endpoint_shutdown_callback)(struct aws_s3_client *client);

void (*finish_destroy)(struct aws_s3_client *client);

struct aws_parallel_input_stream *(
*parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name);
};

/* Represents the state of the S3 client. */
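The new vtable entry mirrors the free function declared in s3_parallel_input_stream.h (next file); routing stream creation through the vtable lets tests substitute their own factory, as the new test further down does. A minimal sketch of that pattern, assuming a hypothetical factory s_my_stream_factory with the matching signature:

#include "aws/s3/private/s3_client_impl.h"
#include "aws/s3/private/s3_parallel_input_stream.h"

/* Hypothetical factory matching the vtable member's signature. */
static struct aws_parallel_input_stream *s_my_stream_factory(
    struct aws_allocator *allocator,
    struct aws_byte_cursor file_name);

/* After the client is created, redirect file-backed stream creation to the custom
 * factory; production code keeps the default aws_parallel_input_stream_new_from_file. */
static void s_install_stream_factory(struct aws_s3_client *client) {
    client->vtable->parallel_input_stream_new_from_file = s_my_stream_factory;
}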
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_parallel_input_stream.h
@@ -69,7 +69,7 @@ AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream);

/**
* Read from the offset until fill the dest, or EOS reached.
* Read from the offset until the dest is filled, or EOF is reached.
* It is safe to call this from multiple threads without waiting for other reads to complete
*
* @param stream The stream to read from
@@ -85,7 +85,7 @@ struct aws_future_bool *aws_parallel_input_stream_read(
struct aws_byte_buf *dest);

/**
* Create a new file based parallel input stream implementation.
* Create a new file based parallel input stream.
*
* This implementation opens a file handle when a read happens, seeks to the offset to start reading, and closes
* the file handle when the read finishes.
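Putting the two declarations above together, a caller creates the stream once, issues reads at arbitrary offsets, and waits on the returned future. A minimal blocking-usage sketch, assuming aws-c-io's future API provides aws_future_bool_wait; the helper name, timeout, and error handling are illustrative only:

#include "aws/s3/private/s3_parallel_input_stream.h"

#include <aws/common/byte_buf.h>
#include <aws/io/future.h>

/* Hypothetical helper: read `size` bytes starting at `offset` from the file,
 * blocking until the read future completes. */
static int s_read_range_blocking(
    struct aws_allocator *allocator,
    struct aws_byte_cursor file_name,
    uint64_t offset,
    size_t size,
    struct aws_byte_buf *out_buf) {

    struct aws_parallel_input_stream *stream = aws_parallel_input_stream_new_from_file(allocator, file_name);
    if (stream == NULL) {
        return AWS_OP_ERR;
    }

    if (aws_byte_buf_init(out_buf, allocator, size)) {
        aws_parallel_input_stream_release(stream);
        return AWS_OP_ERR;
    }

    /* The read fills out_buf to capacity or stops at EOF; the future's bool
     * result reports whether EOF was reached. */
    struct aws_future_bool *read_future = aws_parallel_input_stream_read(stream, offset, out_buf);
    aws_future_bool_wait(read_future, UINT64_MAX /* timeout in nanoseconds */);

    int error_code = aws_future_bool_get_error(read_future);
    aws_future_bool_release(read_future);
    aws_parallel_input_stream_release(stream);

    return error_code == AWS_ERROR_SUCCESS ? AWS_OP_SUCCESS : AWS_OP_ERR;
}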
11 changes: 10 additions & 1 deletion source/s3_auto_ranged_put.c
@@ -722,7 +722,7 @@ static size_t s_compute_request_body_size(
request_body_size = content_remainder;
}
}
/* The part_number starts with 1 */
/* The part_number starts at 1 */
*offset_out = (part_number - 1) * meta_request->part_size;

return request_body_size;
@@ -1000,6 +1000,15 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {

if (!request->is_noop) {

/* Parts can finish out of order. Resize the array-list to be long enough to hold this part,
* filling any intermediate slots with NULL. */

aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
struct aws_s3_mpu_part_info *null_part = NULL;
aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
}

/* Add part to array-list */
struct aws_s3_mpu_part_info *part =
aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
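The padding above means the part list can be written sparsely as parts complete in any order. A self-contained sketch of the same pattern with a hypothetical helper name (the real list stores struct aws_s3_mpu_part_info * entries, so the elements are pointer-sized):

#include <aws/common/array_list.h>

/* Hypothetical helper: place `part` at 1-based `part_number`, growing the list
 * and filling any intermediate slots with NULL so parts may land out of order. */
static int s_part_list_set_sparse(struct aws_array_list *part_list, size_t part_number, void *part) {
    if (aws_array_list_ensure_capacity(part_list, part_number)) {
        return AWS_OP_ERR;
    }
    while (aws_array_list_length(part_list) < part_number) {
        void *null_placeholder = NULL;
        if (aws_array_list_push_back(part_list, &null_placeholder)) {
            return AWS_OP_ERR;
        }
    }
    /* The slot now exists; overwrite its NULL placeholder with the real part. */
    return aws_array_list_set_at(part_list, &part, part_number - 1);
}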
2 changes: 2 additions & 0 deletions source/s3_client.c
@@ -9,6 +9,7 @@
#include "aws/s3/private/s3_copy_object.h"
#include "aws/s3/private/s3_default_meta_request.h"
#include "aws/s3/private/s3_meta_request_impl.h"
#include "aws/s3/private/s3_parallel_input_stream.h"
#include "aws/s3/private/s3_request_messages.h"
#include "aws/s3/private/s3_util.h"

@@ -126,6 +127,7 @@ static struct aws_s3_client_vtable s_s3_client_default_vtable = {
.process_work = s_s3_client_process_work_default,
.endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
.finish_destroy = s_s3_client_finish_destroy_default,
.parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file,
};

void aws_s3_set_dns_ttl(size_t ttl) {
16 changes: 9 additions & 7 deletions source/s3_meta_request.c
@@ -246,8 +246,7 @@ int aws_s3_meta_request_init_base(

/* Client is currently optional to allow spinning up a meta_request without a client in a test. */
if (client != NULL) {
aws_s3_client_acquire(client);
meta_request->client = client;
meta_request->client = aws_s3_client_acquire(client);
meta_request->io_event_loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg);
meta_request->synced_data.read_window_running_total = client->initial_read_window;
}
@@ -256,13 +255,14 @@
* (we checked earlier that it's not being passed multiple ways) */
if (options->send_filepath.len > 0) {
/* Create parallel read stream from file */
meta_request->initial_request_message = aws_http_message_acquire(options->message);
AWS_ASSERT(client != NULL);
meta_request->request_body_parallel_stream =
aws_parallel_input_stream_new_from_file(allocator, options->send_filepath);
client->vtable->parallel_input_stream_new_from_file(allocator, options->send_filepath);
if (meta_request->request_body_parallel_stream == NULL) {
goto error;
}

/* but keep original message around for headers, method, etc */
meta_request->initial_request_message = aws_http_message_acquire(options->message);
} else if (options->send_async_stream != NULL) {
/* Read from async body-stream, but keep original message around for headers, method, etc */
meta_request->request_body_async_stream = aws_async_input_stream_acquire(options->send_async_stream);
@@ -631,8 +631,8 @@ static void s_s3_meta_request_schedule_prepare_request_default(
aws_task_init(
&payload->task, s_s3_meta_request_prepare_request_task, payload, "s3_meta_request_prepare_request_task");
if (meta_request->request_body_parallel_stream) {
/* The body stream supports to be read in parallel, so, we can prepare requests in parallel. Grab a thread from
* IO thread pool instead of using the thread to the meta request to prepare request. */
/* The body stream supports reading in parallel, so schedule the task on any I/O thread.
* If we always used the meta-request's dedicated io_event_loop, we wouldn't get any parallelism. */
struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg);
aws_event_loop_schedule_task_now(loop, &payload->task);
} else {
@@ -1711,6 +1711,8 @@ struct aws_future_bool *aws_s3_meta_request_read_body(
if (meta_request->request_body_async_stream != NULL) {
return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer);
}

/* If there's a parallel-stream, simply call read(), which must fill the buffer and/or reach EOF */
if (meta_request->request_body_parallel_stream != NULL) {
return aws_parallel_input_stream_read(meta_request->request_body_parallel_stream, offset, buffer);
}
5 changes: 0 additions & 5 deletions source/s3_parallel_input_stream.c
@@ -5,17 +5,12 @@

#include "aws/s3/private/s3_parallel_input_stream.h"

#include <aws/common/atomics.h>
#include <aws/common/file.h>
#include <aws/common/string.h>
#include <aws/common/task_scheduler.h>

#include <aws/io/event_loop.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>

#include <errno.h>
#include <sys/stat.h>

void aws_parallel_input_stream_init_base(
struct aws_parallel_input_stream *stream,
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -163,6 +163,7 @@ add_net_test_case(test_s3_error_missing_file)
add_net_test_case(test_s3_existing_host_entry)
add_net_test_case(test_s3_put_fail_object_invalid_request)
add_net_test_case(test_s3_put_fail_object_invalid_send_filepath)
add_net_test_case(test_s3_put_fail_object_bad_parallel_read_stream)
add_net_test_case(test_s3_put_fail_object_inputstream_fail_reading)
add_net_test_case(test_s3_put_fail_object_inputstream_mismatch_content_length)
add_net_test_case(test_s3_put_single_part_fail_object_inputstream_fail_reading)
42 changes: 42 additions & 0 deletions tests/s3_data_plane_tests.c
@@ -4573,6 +4573,48 @@ static int s_test_s3_put_fail_object_invalid_send_filepath(struct aws_allocator
return 0;
}

/* Test that the meta request fails when the parallel read stream fails to read the second part. */
AWS_TEST_CASE(test_s3_put_fail_object_bad_parallel_read_stream, s_test_s3_put_fail_object_bad_parallel_read_stream)
static int s_test_s3_put_fail_object_bad_parallel_read_stream(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_s3_tester_client_options client_options;
AWS_ZERO_STRUCT(client_options);
struct aws_s3_client *client = NULL;
ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));
/* Override the parallel input stream factory function so it creates a bad parallel input stream */
client->vtable->parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file_failure_tester;

struct aws_s3_meta_request_test_results meta_request_test_results;
aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator);

struct aws_s3_tester_meta_request_options options = {
.allocator = allocator,
.client = client,
.meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
.put_options =
{
.object_size_mb = 100,
.file_on_disk = true,
},
};

ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &options, &meta_request_test_results));

ASSERT_UINT_EQUALS(AWS_ERROR_UNIMPLEMENTED, meta_request_test_results.finished_error_code);

aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);
client = aws_s3_client_release(client);

aws_s3_tester_clean_up(&tester);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(
test_s3_put_single_part_fail_object_inputstream_fail_reading,
s_test_s3_put_single_part_fail_object_inputstream_fail_reading)
66 changes: 66 additions & 0 deletions tests/s3_test_parallel_stream.c
@@ -0,0 +1,66 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include "aws/s3/private/s3_parallel_input_stream.h"
#include "s3_tester.h"
#include <aws/common/atomics.h>

struct aws_parallel_input_stream_from_file_failure_impl {
struct aws_parallel_input_stream base;

struct aws_atomic_var number_read;
};

static void s_para_from_file_failure_destroy(struct aws_parallel_input_stream *stream) {
struct aws_parallel_input_stream_from_file_failure_impl *impl = stream->impl;

aws_mem_release(stream->alloc, impl);
}

struct aws_future_bool *s_para_from_file_failure_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest) {
(void)offset;

struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
struct aws_parallel_input_stream_from_file_failure_impl *impl = stream->impl;
size_t previous_number_read = aws_atomic_fetch_add(&impl->number_read, 1);
if (previous_number_read == 1) {
/* TODO: make the failure configurable */
aws_future_bool_set_error(future, AWS_ERROR_UNIMPLEMENTED);
} else {

struct aws_byte_cursor test_string = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("This is an S3 test.");
while (dest->len < dest->capacity) {
size_t remaining_in_buffer = dest->capacity - dest->len;
if (remaining_in_buffer < test_string.len) {
test_string.len = remaining_in_buffer;
}
aws_byte_buf_append(dest, &test_string);
}
aws_future_bool_set_result(future, false);
}
return future;
}

static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_failure_vtable = {
.destroy = s_para_from_file_failure_destroy,
.read = s_para_from_file_failure_read,
};

struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file_failure_tester(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name) {
(void)file_name;

struct aws_parallel_input_stream_from_file_failure_impl *impl =
aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_failure_impl));
aws_parallel_input_stream_init_base(
&impl->base, allocator, &s_parallel_input_stream_from_file_failure_vtable, impl);

aws_atomic_init_int(&impl->number_read, 0);
return &impl->base;
}
4 changes: 4 additions & 0 deletions tests/s3_tester.h
@@ -455,6 +455,10 @@ struct aws_string *aws_s3_tester_create_file(

int aws_s3_tester_get_content_length(const struct aws_http_headers *headers, uint64_t *out_content_length);

struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file_failure_tester(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name);

extern struct aws_s3_client_vtable g_aws_s3_client_mock_vtable;

extern const struct aws_byte_cursor g_mock_server_uri;
