update first byte timeout algo
TingDaoK committed Nov 15, 2024
1 parent c2784aa commit 452155b
Showing 3 changed files with 114 additions and 11 deletions.
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
@@ -26,6 +26,7 @@ struct aws_http_connection;
struct aws_http_connection_manager;
struct aws_host_resolver;
struct aws_s3_endpoint;
struct aws_priority_queue;

enum aws_s3_connection_finish_code {
AWS_S3_CONNECTION_FINISH_CODE_SUCCESS,
@@ -181,6 +182,8 @@ struct aws_s3_upload_part_timeout_stats {
struct {
uint64_t sum_ns;
uint64_t num_samples;
bool collecting_p90;
struct aws_priority_queue p90_samples;
} initial_request_time;

/* Track the timeout rate. */
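For context, here is the new stats fragment again with descriptive comments. The comments are explanatory additions, not part of the header; the field names and types are exactly those in the diff above.

struct {
    uint64_t sum_ns;                       /* running sum of the initial request durations, used for the average */
    uint64_t num_samples;                  /* number of initial samples collected so far */
    bool collecting_p90;                   /* true while p90_samples is initialized and still being filled */
    struct aws_priority_queue p90_samples; /* min-heap of the largest ~10% of samples; its top approximates the P90 */
} initial_request_time;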
64 changes: 56 additions & 8 deletions source/s3_client.c
@@ -22,6 +22,7 @@
#include <aws/common/clock.h>
#include <aws/common/device_random.h>
#include <aws/common/json.h>
#include <aws/common/priority_queue.h>
#include <aws/common/string.h>
#include <aws/common/system_info.h>
#include <aws/http/connection.h>
@@ -763,6 +764,9 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
void *shutdown_user_data = client->shutdown_callback_user_data;

aws_s3_buffer_pool_destroy(client->buffer_pool);
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
}

aws_mem_release(client->allocator, client->network_interface_names_cursor_array);
for (size_t i = 0; i < client->num_network_interface_names; i++) {
@@ -2528,6 +2532,13 @@ static uint64_t s_upload_timeout_threshold_ns = 5000000000; /* 5 Secs */
const size_t g_expect_timeout_offset_ms =
700; /* 0.7 Secs. From experiments on c5n.18xlarge machine for 30 GiB upload, it gave us best performance. */

static int s_compare_uint64_min_heap(const void *a, const void *b) {
uint64_t arg1 = *(const uint64_t *)a;
uint64_t arg2 = *(const uint64_t *)b;
/* Use a min-heap to get the P90, which will be the min of the largest 10% of data. */
return arg1 > arg2;
}

/**
* The upload timeout optimization: explained.
*
@@ -2548,10 +2559,10 @@ const size_t g_expect_timeout_offset_ms =
* would be better off waiting 5sec for the response, vs re-uploading the whole request.
*
* The current algorithm:
* 1. Start without a timeout value. After 10 requests completed, we know the average of how long the
* request takes. We decide if it's worth to set a timeout value or not. (If the average of request takes more than
* 5 secs or not) TODO: if the client have different part size, this doesn't make sense
* 2. If it is worth to retry, start with a default timeout value, 1 sec.
* 1. Start without a timeout value. Once a number of requests equal to the ideal connection count has completed, we
* know the average request duration and can decide whether a timeout value is worth setting (i.e., whether the
* average request takes less than 5 secs).
* 2. If a timeout is worth setting, start with the P90 of the initial samples as the timeout value.
* 3. If a request finishes successfully, use the average response_to_first_byte_time + g_expect_timeout_offset_ms as
* our expected timeout value. (TODO: The real expected timeout value should be a P99 of all the requests.)
* 3.1 Adjust the current timeout value against the expected timeout value, via 0.99 * <current timeout> + 0.01 *
@@ -2589,23 +2600,60 @@ void aws_s3_client_update_upload_part_timeout(
case AWS_ERROR_SUCCESS:
/* We are only interested in requests that succeeded */
stats->num_successful_upload_requests = aws_add_u64_saturating(stats->num_successful_upload_requests, 1);
if (stats->num_successful_upload_requests <= 10) {
if (stats->num_successful_upload_requests <= client->ideal_connection_count) {
/* Gather the data */
uint64_t request_time_ns =
metrics->time_metrics.receive_end_timestamp_ns - metrics->time_metrics.send_start_timestamp_ns;
stats->initial_request_time.sum_ns =
aws_add_u64_saturating(stats->initial_request_time.sum_ns, request_time_ns);
++stats->initial_request_time.num_samples;
if (stats->num_successful_upload_requests == 10) {
if (!stats->initial_request_time.collecting_p90) {
/* Initialize the priority queue to collect the p90 of the initial requests. */
stats->initial_request_time.collecting_p90 = true;
size_t queue_size = client->ideal_connection_count / 10;
/* take at least one sample */
queue_size = queue_size == 0 ? 1 : queue_size;
aws_priority_queue_init_dynamic(
&stats->initial_request_time.p90_samples,
client->allocator,
queue_size,
sizeof(uint64_t),
s_compare_uint64_min_heap);
} else {
/* Check whether the queue is full; if it is, compare the new sample against the top, and pop/push when the new sample is larger. */
size_t current_size = aws_priority_queue_size(&stats->initial_request_time.p90_samples);
size_t current_capacity = aws_priority_queue_capacity(&stats->initial_request_time.p90_samples);
if (current_size == current_capacity) {
uint64_t *min = NULL;
aws_priority_queue_top(&stats->initial_request_time.p90_samples, (void **)&min);
if (request_time_ns > *min) {
/* Push the data into the queue if it's larger than the min of the queue. */
uint64_t pop_val = 0;
aws_priority_queue_pop(&stats->initial_request_time.p90_samples, &pop_val);
aws_priority_queue_push(&stats->initial_request_time.p90_samples, &request_time_ns);
}
} else {
aws_priority_queue_push(&stats->initial_request_time.p90_samples, &request_time_ns);
}
}

if (stats->num_successful_upload_requests == client->ideal_connection_count) {
/* Decide whether we need a timeout or not */
uint64_t average_request_time_ns =
stats->initial_request_time.sum_ns / stats->initial_request_time.num_samples;
if (average_request_time_ns >= s_upload_timeout_threshold_ns) {
/* We don't need a timeout, as a retry would be slower than just waiting for the server to respond */
stats->stop_timeout = true;
} else {
/* Start the timeout at 1 secs */
aws_atomic_store_int(&client->upload_timeout_ms, 1000);
/* Start the timeout at the P90 of the initial samples. */
uint64_t *p90_ns = NULL;
aws_priority_queue_top(&stats->initial_request_time.p90_samples, (void **)&p90_ns);
uint64_t p90_ms =
aws_timestamp_convert(*p90_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
aws_atomic_store_int(&client->upload_timeout_ms, (size_t)p90_ms);
/* Clean up the queue now, as not needed anymore. */
aws_priority_queue_clean_up(&stats->initial_request_time.p90_samples);
stats->initial_request_time.collecting_p90 = false;
}
}
goto unlock;
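For reference, the sampling scheme the new code relies on can be sketched in isolation. The snippet below is illustrative only (s_p90_tracker and its helper functions are not part of this commit); it uses the same aws_priority_queue calls as the change above: keep a min-heap whose capacity is roughly 10% of the expected number of samples, fill it, and afterwards replace the heap top whenever a new sample is larger. Once sampling ends, the heap top is the smallest of the largest ~10% of samples, i.e. approximately the P90.

#include <aws/common/priority_queue.h>

struct s_p90_tracker {
    struct aws_priority_queue samples; /* min-heap holding the largest ~10% of observed samples */
};

static int s_p90_compare(const void *a, const void *b) {
    uint64_t lhs = *(const uint64_t *)a;
    uint64_t rhs = *(const uint64_t *)b;
    /* Positive when lhs is larger, so the smallest retained sample sits at the top of the heap. */
    return lhs > rhs;
}

static int s_p90_init(struct s_p90_tracker *tracker, struct aws_allocator *allocator, size_t total_samples) {
    size_t capacity = total_samples / 10;
    capacity = capacity == 0 ? 1 : capacity; /* keep at least one sample */
    return aws_priority_queue_init_dynamic(
        &tracker->samples, allocator, capacity, sizeof(uint64_t), s_p90_compare);
}

static void s_p90_record(struct s_p90_tracker *tracker, uint64_t sample_ns) {
    if (aws_priority_queue_size(&tracker->samples) < aws_priority_queue_capacity(&tracker->samples)) {
        aws_priority_queue_push(&tracker->samples, &sample_ns);
        return;
    }
    uint64_t *smallest_kept = NULL;
    aws_priority_queue_top(&tracker->samples, (void **)&smallest_kept);
    if (sample_ns > *smallest_kept) {
        /* The new sample belongs to the largest ~10%: evict the current minimum and keep the new one. */
        uint64_t evicted = 0;
        aws_priority_queue_pop(&tracker->samples, &evicted);
        aws_priority_queue_push(&tracker->samples, &sample_ns);
    }
}

static uint64_t s_p90_value_ns(struct s_p90_tracker *tracker) {
    /* The heap top is the minimum of the retained largest ~10% of samples, i.e. roughly the P90. */
    uint64_t *p90 = NULL;
    aws_priority_queue_top(&tracker->samples, (void **)&p90);
    return *p90;
}

Compared with storing and sorting every sample, this keeps only ideal_connection_count / 10 values and answers the P90 query in constant time. After this warm-up phase, the pre-existing feedback rule described in the comment above (0.99 * current + 0.01 * expected) keeps nudging the value: with a current timeout of 1000 ms and an expected timeout of 950 ms, the next value is 0.99 * 1000 + 0.01 * 950 = 999.5 ms, so the timeout drifts toward the expected value gradually rather than jumping.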
58 changes: 55 additions & 3 deletions tests/s3_client_test.c
@@ -41,11 +41,19 @@ static int s_starts_upload_retry(struct aws_s3_client *client, struct aws_s3_req
AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);

s_init_mock_s3_request_upload_part_timeout(mock_request, 0, average_time_ns, average_time_ns);
for (size_t i = 0; i < 10; i++) {
/* Mock a number of requests completed with the large time for the request */
size_t init_count = client->ideal_connection_count;
size_t p90_count = init_count / 10 + 1;
for (size_t i = 0; i < init_count - p90_count; i++) {
/* ~90% of the requests complete in the average request time. */
aws_s3_client_update_upload_part_timeout(client, mock_request, AWS_ERROR_SUCCESS);
}

uint64_t one_sec_time_ns = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); /* 1 Secs */
s_init_mock_s3_request_upload_part_timeout(mock_request, 0, one_sec_time_ns, one_sec_time_ns);
for (size_t i = 0; i < p90_count; i++) {
/* 10 percent of the requests take 1 sec */
aws_s3_client_update_upload_part_timeout(client, mock_request, AWS_ERROR_SUCCESS);
}
/* Check that retry should be turned off */
ASSERT_FALSE(client->synced_data.upload_part_stats.stop_timeout);
size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
@@ -78,7 +86,7 @@ TEST_CASE(client_update_upload_part_timeout) {
uint64_t average_time_ns = aws_timestamp_convert(
250, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); /* 0.25 Secs, close to average for upload a part */

size_t init_count = 10;
size_t init_count = client->ideal_connection_count;
{
/* 1. If the request time is larger than 5 secs, we don't do retry */
AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);
@@ -95,6 +103,35 @@
ASSERT_TRUE(client->synced_data.upload_part_stats.stop_timeout);
size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
ASSERT_UINT_EQUALS(0, current_timeout_ms);
/* clean up */
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
}
}
{
/* 2. Test that the P90 of the initial samples is used correctly */
AWS_ZERO_STRUCT(client->synced_data.upload_part_stats);
/* Hack around to set the ideal connection count for testing. */
size_t test_init_connection = 1000;
*(uint32_t *)(void *)&client->ideal_connection_count = test_init_connection;
for (size_t i = 0; i < test_init_connection; i++) {
/* Mock requests completing with an increasing request time of i milliseconds. */
uint64_t time_ns = aws_timestamp_convert(i, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
s_init_mock_s3_request_upload_part_timeout(&mock_request, 0, time_ns, time_ns);
aws_s3_client_update_upload_part_timeout(client, &mock_request, AWS_ERROR_SUCCESS);
}

/* Check that retry should be turned off */
size_t current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
ASSERT_UINT_EQUALS(900, current_timeout_ms);
/* clean up */
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
}
/* Change it back */
*(uint32_t *)(void *)&client->ideal_connection_count = init_count;
}
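/* A quick check of the arithmetic behind the 900 ms assertion above (illustrative, not part of the change):
 * the loop feeds 1000 samples of 0, 1, ..., 999 ms, and the P90 heap capacity is 1000 / 10 = 100, so the
 * heap ends up holding the largest 100 samples (900..999 ms). Its top, the smallest of those, is 900 ms,
 * which aws_s3_client_update_upload_part_timeout converts back to milliseconds and stores in upload_timeout_ms. */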

{
@@ -137,6 +174,11 @@ TEST_CASE(client_update_upload_part_timeout) {
aws_timestamp_convert(average_time_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL) +
g_expect_timeout_offset_ms,
current_timeout_ms);
/* clean up */
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
}
}

{
Expand All @@ -162,6 +204,11 @@ TEST_CASE(client_update_upload_part_timeout) {
current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
/* 1.1 secs, still */
ASSERT_UINT_EQUALS(1100, current_timeout_ms);
/* clean up */
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
}
}

{
@@ -224,6 +271,11 @@ TEST_CASE(client_update_upload_part_timeout) {
current_timeout_ms = aws_atomic_load_int(&client->upload_timeout_ms);
ASSERT_UINT_EQUALS(3000, current_timeout_ms);
ASSERT_FALSE(client->synced_data.upload_part_stats.stop_timeout);
/* clean up */
if (client->synced_data.upload_part_stats.initial_request_time.collecting_p90) {
aws_priority_queue_clean_up(&client->synced_data.upload_part_stats.initial_request_time.p90_samples);
client->synced_data.upload_part_stats.initial_request_time.collecting_p90 = false;
}
}

{
