Skip to content

Commit

Permalink
Fix deadlock when secure tunnel sends multi-frame messages (#65)
Browse files Browse the repository at this point in the history
* Fix deadlock when secure tunnel sends multi-frame messages

* In commit 0119fb3 the secure tunneling library introduced
  a condition variable so that no more than one frame (15kb)
  is copied to the websocket event loop at a time.
* The synchronization mechanism will deadlock when more than
  one frame of data is sent and the thread used by the client
  to send data is the same thread as the websocket thread.
* Since the secure tunneling module makes redundant copies of
  each frame before queuing on the websocket event loop there
  is no apparent reason for limiting the number of frames
  copied to the websocket event loop.
* This commit removes all synchronization from the public api
  used to send data using a secure tunnel.
* Unit tests are added that exercise the public api using
  payloads consisting of one or more frames.
  • Loading branch information
marcoemorais-aws authored Feb 16, 2022
1 parent d5dacf8 commit e3ea832
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 43 deletions.
16 changes: 11 additions & 5 deletions include/aws/iotdevice/private/secure_tunneling_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ struct aws_secure_tunnel_vtable {
int (*close)(struct aws_secure_tunnel *secure_tunnel);
};

struct aws_websocket_client_connection_options;
struct aws_websocket_send_frame_options;

/*
 * Table of function pointers for the websocket operations the secure tunnel performs.
 * The secure tunnel calls the websocket through these pointers rather than calling the
 * aws_websocket_* functions directly; aws_secure_tunnel_new() fills the table with
 * aws_websocket_client_connect, aws_websocket_send_frame, aws_websocket_close and
 * aws_websocket_release.
 * NOTE(review): presumably this indirection exists so that tests can substitute their own
 * implementations - confirm against the unit tests.
 */
struct aws_websocket_vtable {
/* Starts a websocket client connection. The secure tunnel treats a non-zero return as failure. */
int (*client_connect)(const struct aws_websocket_client_connection_options *options);
/* Sends one frame over the websocket. Also used by the periodic ping task. */
int (*send_frame)(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options);
/* Closes the websocket. The secure tunnel passes false for free_scarce_resources_immediately. */
void (*close)(struct aws_websocket *websocket, bool free_scarce_resources_immediately);
/* Releases the secure tunnel's hold on the websocket; called right after close. */
void (*release)(struct aws_websocket *websocket);
};

struct aws_secure_tunnel {
/* Static settings */
struct aws_allocator *alloc;
Expand All @@ -34,6 +44,7 @@ struct aws_secure_tunnel {
struct aws_tls_ctx *tls_ctx;
struct aws_tls_connection_options tls_con_opt;
struct aws_secure_tunnel_vtable vtable;
struct aws_websocket_vtable websocket_vtable;

struct aws_ref_count ref_count;

Expand All @@ -50,11 +61,6 @@ struct aws_secure_tunnel {
/* The secure tunneling endpoint ELB drops idle connections after 1 minute. We need to send a ping periodically to
 * keep the connection alive */

/* Shared State, making websocket send data sync */
bool can_send_data;
struct aws_mutex send_data_mutex;
struct aws_condition_variable send_data_condition_variable;

struct ping_task_context *ping_task_context;
};

Expand Down
53 changes: 20 additions & 33 deletions source/secure_tunneling.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ struct aws_secure_tunnel_options_storage *aws_secure_tunnel_options_storage_new(
return NULL;
}

static void s_send_websocket_ping(struct aws_websocket *websocket) {
/*
 * Signature of the function used to send a single frame over a websocket.
 * This names a function type, not a pointer type, so it is used as
 * `websocket_send_frame *` wherever a callable is passed or stored.
 */
typedef int(
websocket_send_frame)(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options);

static void s_send_websocket_ping(struct aws_websocket *websocket, websocket_send_frame *send_frame) {
if (!websocket) {
return;
}
Expand All @@ -128,7 +131,7 @@ static void s_send_websocket_ping(struct aws_websocket *websocket) {
AWS_ZERO_STRUCT(frame_options);
frame_options.opcode = AWS_WEBSOCKET_OPCODE_PING;
frame_options.fin = true;
aws_websocket_send_frame(websocket, &frame_options);
send_frame(websocket, &frame_options);
}

struct ping_task_context {
Expand All @@ -138,6 +141,9 @@ struct ping_task_context {
struct aws_task ping_task;
struct aws_atomic_var task_cancelled;
struct aws_websocket *websocket;

/* The ping_task shares the vtable function used by the secure tunnel to send frames over the websocket. */
websocket_send_frame *send_frame;
};

static void s_ping_task(struct aws_task *task, void *user_data, enum aws_task_status task_status) {
Expand All @@ -159,7 +165,7 @@ static void s_ping_task(struct aws_task *task, void *user_data, enum aws_task_st
return;
}

s_send_websocket_ping(ping_task_context->websocket);
s_send_websocket_ping(ping_task_context->websocket, ping_task_context->send_frame);

/* Schedule the next task */
uint64_t now;
Expand Down Expand Up @@ -200,6 +206,7 @@ static void s_on_websocket_setup(
aws_event_loop_group_get_next_loop(secure_tunnel->options->bootstrap->event_loop_group);
aws_atomic_store_int(&ping_task_context->task_cancelled, 0);
ping_task_context->websocket = websocket;
ping_task_context->send_frame = secure_tunnel->websocket_vtable.send_frame;

aws_task_init(&ping_task_context->ping_task, s_ping_task, ping_task_context, "SecureTunnelingPingTask");
aws_event_loop_schedule_task_now(ping_task_context->event_loop, &ping_task_context->ping_task);
Expand Down Expand Up @@ -432,7 +439,7 @@ static int s_secure_tunneling_connect(struct aws_secure_tunnel *secure_tunnel) {

struct aws_websocket_client_connection_options websocket_options;
init_websocket_client_connection_options(secure_tunnel, &websocket_options);
if (aws_websocket_client_connect(&websocket_options)) {
if (secure_tunnel->websocket_vtable.client_connect(&websocket_options)) {
return AWS_OP_ERR;
}

Expand All @@ -445,8 +452,8 @@ static int s_secure_tunneling_close(struct aws_secure_tunnel *secure_tunnel) {
}

s_reset_secure_tunnel(secure_tunnel);
aws_websocket_close(secure_tunnel->websocket, false);
aws_websocket_release(secure_tunnel->websocket);
secure_tunnel->websocket_vtable.close(secure_tunnel->websocket, false);
secure_tunnel->websocket_vtable.release(secure_tunnel->websocket);
secure_tunnel->websocket = NULL;
return AWS_OP_SUCCESS;
}
Expand All @@ -461,12 +468,6 @@ static void s_secure_tunneling_on_send_data_complete_callback(
secure_tunnel->options->on_send_data_complete(error_code, pair->secure_tunnel->options->user_data);
aws_byte_buf_clean_up(&pair->buf);
aws_mem_release(secure_tunnel->alloc, pair);

aws_mutex_lock(&secure_tunnel->send_data_mutex);
secure_tunnel->can_send_data = true;
aws_mutex_unlock(&secure_tunnel->send_data_mutex);

aws_condition_variable_notify_one(&secure_tunnel->send_data_condition_variable);
}

bool secure_tunneling_send_data_call(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data) {
Expand Down Expand Up @@ -566,13 +567,7 @@ static int s_secure_tunneling_send(
if (secure_tunneling_init_send_frame(&frame_options, secure_tunnel, data, type) != AWS_OP_SUCCESS) {
return AWS_OP_ERR;
}
return aws_websocket_send_frame(secure_tunnel->websocket, &frame_options);
}

static bool s_can_send_data_status(void *user_data) {
struct aws_secure_tunnel *secure_tunnel = (struct aws_secure_tunnel *)user_data;
bool temp = secure_tunnel->can_send_data;
return temp;
return secure_tunnel->websocket_vtable.send_frame(secure_tunnel->websocket, &frame_options);
}

static int s_secure_tunneling_send_data(struct aws_secure_tunnel *secure_tunnel, const struct aws_byte_cursor *data) {
Expand All @@ -582,15 +577,6 @@ static int s_secure_tunneling_send_data(struct aws_secure_tunnel *secure_tunnel,
}
struct aws_byte_cursor new_data = *data;
while (new_data.len) {
aws_mutex_lock(&secure_tunnel->send_data_mutex);
aws_condition_variable_wait_pred(
&secure_tunnel->send_data_condition_variable,
&secure_tunnel->send_data_mutex,
s_can_send_data_status,
secure_tunnel);
secure_tunnel->can_send_data = false;
aws_mutex_unlock(&secure_tunnel->send_data_mutex);

size_t bytes_max = new_data.len;
size_t amount_to_send = bytes_max < AWS_IOT_ST_SPLIT_MESSAGE_SIZE ? bytes_max : AWS_IOT_ST_SPLIT_MESSAGE_SIZE;

Expand Down Expand Up @@ -670,21 +656,22 @@ struct aws_secure_tunnel *aws_secure_tunnel_new(const struct aws_secure_tunnel_o
goto error;
}

/* Setup vtable here */
/* Setup vtables here. */
secure_tunnel->vtable.connect = s_secure_tunneling_connect;
secure_tunnel->vtable.close = s_secure_tunneling_close;
secure_tunnel->vtable.send_data = s_secure_tunneling_send_data;
secure_tunnel->vtable.send_stream_start = s_secure_tunneling_send_stream_start;
secure_tunnel->vtable.send_stream_reset = s_secure_tunneling_send_stream_reset;

secure_tunnel->websocket_vtable.client_connect = aws_websocket_client_connect;
secure_tunnel->websocket_vtable.send_frame = aws_websocket_send_frame;
secure_tunnel->websocket_vtable.close = aws_websocket_close;
secure_tunnel->websocket_vtable.release = aws_websocket_release;

secure_tunnel->handshake_request = NULL;
secure_tunnel->stream_id = INVALID_STREAM_ID;
secure_tunnel->websocket = NULL;

secure_tunnel->can_send_data = true;
aws_mutex_init(&secure_tunnel->send_data_mutex);
aws_condition_variable_init(&secure_tunnel->send_data_condition_variable);

/* TODO: Release this buffer when there is no data to hold */
aws_byte_buf_init(&secure_tunnel->received_data, options->allocator, MAX_WEBSOCKET_PAYLOAD);

Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ if (UNIX AND NOT APPLE)
add_net_test_case(secure_tunneling_handle_data_receive_test)
add_net_test_case(secure_tunneling_handle_stream_reset_test)
add_net_test_case(secure_tunneling_handle_session_reset_test)
add_net_test_case(secure_tunneling_handle_session_reset_no_stream_test)
add_net_test_case(secure_tunneling_init_websocket_options_test)
add_net_test_case(secure_tunneling_handle_send_data)
add_net_test_case(secure_tunneling_handle_send_data_stream_start)
add_net_test_case(secure_tunneling_handle_send_data_stream_reset)
add_net_test_case(secure_tunneling_handle_send_data_public)
endif()

generate_test_driver(${PROJECT_NAME}-tests)
Expand Down
Loading

0 comments on commit e3ea832

Please sign in to comment.