Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel prepare - open file on needed #355

Merged
merged 60 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
ac0c9e6
WIP
TingDaoK Sep 19, 2023
43d792b
hook it up with s3
TingDaoK Sep 19, 2023
6c23fd6
io/future.h
TingDaoK Sep 19, 2023
55d025d
fix compile error
TingDaoK Sep 19, 2023
f36e660
fix use after free
TingDaoK Sep 19, 2023
88e36fc
fix tests
TingDaoK Sep 19, 2023
15b1dc4
disable the test for now
TingDaoK Sep 19, 2023
bc350f0
only allow one read per time
TingDaoK Sep 20, 2023
392ee38
close the file handler on success as well
TingDaoK Sep 20, 2023
71caee4
type case
TingDaoK Sep 20, 2023
b5754ff
add a test about mpu from file path
TingDaoK Sep 20, 2023
c7c16ed
Merge branch 'main' into parallel-read
TingDaoK Sep 20, 2023
f75c45a
use another key to avoid override
TingDaoK Sep 20, 2023
771369d
hack to prepare parts in parallel
TingDaoK Sep 20, 2023
15026da
updates
TingDaoK Sep 21, 2023
54401cd
use after free
TingDaoK Sep 21, 2023
7fc8dce
add one more test for multiple meta requests
TingDaoK Sep 22, 2023
0b8c393
const
TingDaoK Sep 22, 2023
2a0626c
use define
TingDaoK Sep 22, 2023
7630d1c
update comments
TingDaoK Sep 22, 2023
e6a4ae1
try to use mmap
TingDaoK Sep 29, 2023
d6d6318
update the length of the buffer
TingDaoK Sep 29, 2023
524c568
unused lable
TingDaoK Sep 29, 2023
2106654
forgot to destroy the string
TingDaoK Sep 29, 2023
8fc1bda
a wrapper around mmap
TingDaoK Oct 2, 2023
6565dba
add windows impl
TingDaoK Oct 2, 2023
04667c6
a bit more error handling
TingDaoK Oct 2, 2023
8b18679
renaming and comments
TingDaoK Oct 2, 2023
c9b56b2
map and unmap
TingDaoK Oct 3, 2023
d74a4ee
let's fix windows later
TingDaoK Oct 3, 2023
e08da93
windows impl
TingDaoK Oct 3, 2023
b1b0a98
fix warning
TingDaoK Oct 3, 2023
4d75985
add a comment about how that the in-page-offset is not needed
TingDaoK Oct 4, 2023
c6a1011
rcbc
TingDaoK Oct 4, 2023
afe61b8
red code best code
TingDaoK Oct 4, 2023
11141c5
clean up
TingDaoK Oct 5, 2023
cd666c7
check for file modified
TingDaoK Oct 5, 2023
c9c3dce
update the use uint64_t for offset
TingDaoK Oct 5, 2023
d4f531f
one missing
TingDaoK Oct 5, 2023
88b5de8
add test for eos
TingDaoK Oct 5, 2023
b500eb7
heap use after free
TingDaoK Oct 5, 2023
84fff59
fix compile issue
TingDaoK Oct 5, 2023
0b9a4da
Merge branch 'main' into para-pre-rcbc
TingDaoK Oct 6, 2023
f204570
renaming
TingDaoK Oct 6, 2023
8c8a5bb
add two more test with unknown content length set or not
TingDaoK Oct 6, 2023
e2718c7
delete the extra
TingDaoK Oct 6, 2023
25f0140
oops, forgot to update the object size
TingDaoK Oct 6, 2023
1461481
add comment
TingDaoK Oct 6, 2023
bca43d2
oops, test file was committed by accident
TingDaoK Oct 6, 2023
1cc2d01
more test
TingDaoK Oct 6, 2023
52d2d94
whatever, it's not a big deal
TingDaoK Oct 6, 2023
540e5fa
Apply suggestions from code review
TingDaoK Oct 8, 2023
9c96251
address comments
TingDaoK Oct 8, 2023
8614ea5
Merge branch 'para-pre-rcbc' of github.com:awslabs/aws-c-s3 into para…
TingDaoK Oct 8, 2023
0401db5
clean up
TingDaoK Oct 9, 2023
4914373
Apply suggestions from code review
TingDaoK Oct 9, 2023
68aa652
still needs errno.h
TingDaoK Oct 9, 2023
9b78f79
why it fails??
TingDaoK Oct 10, 2023
00a533c
fix the error
TingDaoK Oct 10, 2023
122ac29
fix compile wraning
TingDaoK Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ struct aws_s3_client_vtable {
void (*endpoint_shutdown_callback)(struct aws_s3_client *client);

void (*finish_destroy)(struct aws_s3_client *client);

struct aws_parallel_input_stream *(
*parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name);
};

/* Represents the state of the S3 client. */
Expand Down
10 changes: 8 additions & 2 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ struct aws_s3_meta_request {
/* Initial HTTP Message that this meta request is based on. */
struct aws_http_message *initial_request_message;

/* Async stream for meta request's body.
* NULL if using initial_request_message's synchronous body stream instead. */
/* The meta request's outgoing body comes from one of these:
* 1) request_body_async_stream: if set, then async stream 1 part at a time
* 2) request_body_parallel_stream: if set, then stream multiple parts in parallel
* 3) initial_request_message's body_stream: else synchronously stream parts */
struct aws_async_input_stream *request_body_async_stream;
struct aws_parallel_input_stream *request_body_parallel_stream;

/* Part size to use for uploads and downloads. Passed down by the creating client. */
const size_t part_size;
Expand Down Expand Up @@ -363,10 +366,13 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
* It may read from the underlying stream multiple times, if that's what it takes to fill the buffer.
* Returns a future whose result bool indicates whether end of stream was reached.
* This future may complete on any thread, and may complete synchronously.
*
* Read from offset to fill the buffer
*/
AWS_S3_API
struct aws_future_bool *aws_s3_meta_request_read_body(
struct aws_s3_meta_request *meta_request,
uint64_t offset,
struct aws_byte_buf *buffer);

bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request);
Expand Down
105 changes: 105 additions & 0 deletions include/aws/s3/private/s3_parallel_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_S3_PARALLEL_INPUT_STREAM_H
#define AWS_S3_PARALLEL_INPUT_STREAM_H

#include <aws/s3/s3.h>

#include <aws/common/ref_count.h>

AWS_PUSH_SANE_WARNING_LEVEL

struct aws_byte_buf;
struct aws_future_bool;
struct aws_input_stream;

struct aws_event_loop_group;

/**
* "Base class" for a parallel input stream.
* Set up via aws_parallel_input_stream_init_base(), which takes the allocator, vtable and impl pointer.
*/
struct aws_parallel_input_stream {
/* Implementation-provided operations (destroy, read). */
const struct aws_parallel_input_stream_vtable *vtable;
/* Allocator supplied at init time. */
struct aws_allocator *alloc;
/* Reference count, adjusted via aws_parallel_input_stream_acquire()/aws_parallel_input_stream_release().
* Per the vtable docs, destroy is for when the count has reached 0. */
struct aws_ref_count ref_count;

/* Opaque pointer supplied at init time; presumably the implementation's own state - confirm against the impl. */
void *impl;
};

/**
* Operations that a parallel input stream implementation provides.
*/
struct aws_parallel_input_stream_vtable {
/**
* Destroy the stream; its refcount has reached 0.
*/
void (*destroy)(struct aws_parallel_input_stream *stream);

/**
* Read into the dest buffer, starting at the given offset in the stream.
* Implementations must support being invoked concurrently from multiple threads.
* NOTE(review): presumably the returned future follows the same contract as
* aws_parallel_input_stream_read() (error code, or bool for EOF) - confirm.
*/
struct aws_future_bool *(
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, struct aws_byte_buf *dest);
};

AWS_EXTERN_C_BEGIN

/**
* Initialize aws_parallel_input_stream "base class"
*/
AWS_S3_API
void aws_parallel_input_stream_init_base(
struct aws_parallel_input_stream *stream,
struct aws_allocator *alloc,
const struct aws_parallel_input_stream_vtable *vtable,
void *impl);

/**
* Increment reference count.
* You may pass in NULL (has no effect).
* Returns whatever pointer was passed in.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream);

/**
* Decrement reference count.
* You may pass in NULL (has no effect).
* Always returns NULL.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream);

/**
* Read from the offset until dest is filled or EOF is reached.
* It is thread-safe to call this from multiple threads without waiting for other reads to complete.
*
* @param stream The stream to read from
* @param offset The offset in the stream from beginning to start reading
* @param dest The output buffer to read into
* @return a future, which will contain an error code if something went wrong,
* or a result bool indicating whether EOF has been reached.
*/
AWS_S3_API
struct aws_future_bool *aws_parallel_input_stream_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest);

/**
* Create a new file based parallel input stream.
*
* This implementation opens a file handle when a read happens and seeks to the offset to start reading. It closes
* the file handle when the read finishes.
*
* @param allocator memory allocator
* @param file_name The file path to read from
* @return aws_parallel_input_stream
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name);

AWS_EXTERN_C_END
AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_S3_PARALLEL_INPUT_STREAM_H */
7 changes: 0 additions & 7 deletions include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
size_t excluded_headers_size,
bool exclude_x_amz_meta);

/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
AWS_S3_API
struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor filepath);

/* Copy headers from one message to the other and exclude specific headers.
* exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
AWS_S3_API
Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE,
AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED,
AWS_ERROR_S3_FILE_MODIFIED,
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

Expand Down
1 change: 1 addition & 0 deletions source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."),
};
/* clang-format on */

Expand Down
56 changes: 40 additions & 16 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,10 @@ static bool s_s3_auto_ranged_put_update(
* Basically returns either part size or if content is not equally divisible into parts, the size of the remaining last
* part.
*/
static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta_request, uint32_t part_number) {
static size_t s_compute_request_body_size(
const struct aws_s3_meta_request *meta_request,
uint32_t part_number,
uint64_t *offset_out) {
AWS_PRECONDITION(meta_request);

const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
Expand All @@ -731,6 +734,8 @@ static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta
request_body_size = content_remainder;
}
}
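/* part_number starts at 1, so part N begins at offset (N - 1) * part_size.
 * NOTE(review): the product is computed in size_t before being widened to uint64_t;
 * confirm it cannot wrap on targets where size_t is 32 bits. */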
*offset_out = (part_number - 1) * meta_request->part_size;

return request_body_size;
}
Expand Down Expand Up @@ -895,8 +900,9 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
AWS_ASSERT(skip_job->part_being_skipped != NULL);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
uint64_t offset = 0;

size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1);
size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1, &offset);
if (request_body_size != skip_job->part_being_skipped->size) {
error_code = AWS_ERROR_S3_RESUME_FAILED;
AWS_LOGF_ERROR(
Expand All @@ -915,7 +921,8 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
aws_byte_buf_reset(temp_body_buf, false);
}

skip_job->asyncstep_read_each_part = aws_s3_meta_request_read_body(skip_job->meta_request, temp_body_buf);
skip_job->asyncstep_read_each_part =
aws_s3_meta_request_read_body(skip_job->meta_request, offset, temp_body_buf);

/* the read may or may not complete immediately */
if (aws_future_bool_register_callback_if_not_done(
Expand Down Expand Up @@ -1090,6 +1097,7 @@ struct aws_future_http_message *s_s3_prepare_create_multipart_upload(struct aws_
}
return future;
}

/* Prepare an UploadPart request */
struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *request) {
struct aws_s3_meta_request *meta_request = request->meta_request;
Expand All @@ -1109,10 +1117,16 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
* Next async step: read through the body stream until we've
* skipped over parts that were already uploaded (in case we're resuming
* from an upload that had been paused) */
part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream(
meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1);
aws_future_void_register_callback(
part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep);

if (meta_request->request_body_parallel_stream) {
/* A parallel input stream is seekable, so there is no need to skip previous parts by reading through the stream. */
s_s3_prepare_upload_part_on_skipping_done((void *)part_prep);
} else {
part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream(
meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1);
aws_future_void_register_callback(
part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep);
}
} else {
/* Not the first time preparing request (e.g. retry).
* We can skip over the async steps that read the body stream */
Expand All @@ -1128,21 +1142,22 @@ static void s_s3_prepare_upload_part_on_skipping_done(void *user_data) {
struct aws_s3_request *request = part_prep->request;
struct aws_s3_meta_request *meta_request = request->meta_request;

int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts);

/* If skipping failed, the prepare-upload-part job has failed. */
if (error_code) {
s_s3_prepare_upload_part_finish(part_prep, error_code);
return;
if (part_prep->asyncstep1_skip_prev_parts) {
int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts);
/* If skipping failed, the prepare-upload-part job has failed. */
if (error_code) {
s_s3_prepare_upload_part_finish(part_prep, error_code);
return;
}
}

/* Skipping succeeded.
* Next async step: read body stream for this part into a buffer */
uint64_t offset = 0;

size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number);
size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);

part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, &request->request_body);
part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body);
aws_future_bool_register_callback(
part_prep->asyncstep2_read_part, s_s3_prepare_upload_part_on_read_done, part_prep);
}
Expand Down Expand Up @@ -1195,6 +1210,15 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
if (!request->is_noop) {
auto_ranged_put->prepare_data.part_index_for_skipping = request->part_number;

/* The part can finish out of order. Resize array-list to be long enough to hold this part,
* filling any intermediate slots with NULL. */

aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
struct aws_s3_mpu_part_info *null_part = NULL;
aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
}

/* Add part to array-list */
struct aws_s3_mpu_part_info *part =
aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
Expand Down
2 changes: 2 additions & 0 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "aws/s3/private/s3_copy_object.h"
#include "aws/s3/private/s3_default_meta_request.h"
#include "aws/s3/private/s3_meta_request_impl.h"
#include "aws/s3/private/s3_parallel_input_stream.h"
#include "aws/s3/private/s3_request_messages.h"
#include "aws/s3/private/s3_util.h"

Expand Down Expand Up @@ -126,6 +127,7 @@ static struct aws_s3_client_vtable s_s3_client_default_vtable = {
.process_work = s_s3_client_process_work_default,
.endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
.finish_destroy = s_s3_client_finish_destroy_default,
.parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file,
};

void aws_s3_set_dns_ttl(size_t ttl) {
Expand Down
3 changes: 2 additions & 1 deletion source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ static struct aws_future_void *s_s3_default_prepare_request(struct aws_s3_reques
aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request_default->content_length);

/* Kick off the async read */
request_prep->step1_read_body = aws_s3_meta_request_read_body(meta_request, &request->request_body);
request_prep->step1_read_body =
aws_s3_meta_request_read_body(meta_request, 0 /*offset*/, &request->request_body);
aws_future_bool_register_callback(
request_prep->step1_read_body, s_s3_default_prepare_request_on_read_done, request_prep);
} else {
Expand Down
Loading