Skip to content

Commit

Permalink
Merge branch 'grand_dispatch_queue' of github.com:awslabs/aws-c-io in…
Browse files Browse the repository at this point in the history
…to nw_socket
  • Loading branch information
xiazhvera committed Jan 8, 2025
2 parents 630d0bb + d986649 commit 1dc7c7d
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 116 deletions.
13 changes: 13 additions & 0 deletions include/aws/io/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ AWS_PUSH_SANE_WARNING_LEVEL
#define AWS_C_IO_PACKAGE_ID 1

struct aws_io_handle;
/* Attaches a platform queue (opaque `void *queue`) to the I/O handle; queue type is platform-specific. */
typedef void aws_io_set_queue_on_handle_fn(struct aws_io_handle *handle, void *queue);
/* Detaches any queue previously attached to the I/O handle. */
typedef void aws_io_clear_queue_on_handle_fn(struct aws_io_handle *handle);

/* Platform-independent wrapper around an OS-level I/O handle. */
struct aws_io_handle {
    union {
        int fd;
        /* on Apple systems, handle is of type nw_connection_t. On Windows, it's a SOCKET handle. */
        void *handle;
    } data;
    /* Extra per-handle data; semantics are defined by whoever owns the handle. */
    void *additional_data;
    /* Optional queue management callbacks; NOTE(review): presumably set only on platforms that
     * dispatch I/O via queues (e.g. Apple dispatch queues) — confirm against platform event loops. */
    aws_io_set_queue_on_handle_fn *set_queue;
    aws_io_clear_queue_on_handle_fn *clear_queue;
};

enum aws_io_message_type {
AWS_IO_MESSAGE_APPLICATION_DATA,
Expand Down
14 changes: 0 additions & 14 deletions include/aws/io/private/event_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,6 @@ AWS_PUSH_SANE_WARNING_LEVEL
struct aws_event_loop;
struct aws_overlapped;

/* Attaches a platform queue (opaque `void *queue`) to the I/O handle; queue type is platform-specific. */
typedef void aws_io_set_queue_on_handle_fn(struct aws_io_handle *handle, void *queue);
/* Detaches any queue previously attached to the I/O handle. */
typedef void aws_io_clear_queue_on_handle_fn(struct aws_io_handle *handle);

/* Platform-independent wrapper around an OS-level I/O handle. */
struct aws_io_handle {
    union {
        int fd;
        /* on Apple systems, handle is of type nw_connection_t. On Windows, it's a SOCKET handle. */
        void *handle;
    } data;
    /* Extra per-handle data; semantics are defined by whoever owns the handle. */
    void *additional_data;
    /* Optional queue management callbacks used by queue-based platform event loops. */
    aws_io_set_queue_on_handle_fn *set_queue;
    aws_io_clear_queue_on_handle_fn *clear_queue;
};

typedef void(aws_event_loop_on_completion_fn)(
struct aws_event_loop *event_loop,
struct aws_overlapped *overlapped,
Expand Down
1 change: 0 additions & 1 deletion include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#include <aws/io/channel.h>
#include <aws/io/io.h>
#include <aws/io/private/event_loop_impl.h>

AWS_PUSH_SANE_WARNING_LEVEL

Expand Down
165 changes: 90 additions & 75 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <unistd.h>

#include "dispatch_queue.h"
#include "./dispatch_queue_event_loop_private.h" // private header
#include <Block.h>
#include <dispatch/dispatch.h>
#include <dispatch/queue.h>
Expand Down Expand Up @@ -75,24 +75,35 @@ static struct aws_event_loop_vtable s_vtable = {
* Functions ************
* `s_run_iteration`: The function execute on each single iteration
* `begin_iteration`: Decide if we should run the iteration
 * `end_iteration`: Clean up the related resources and determine if we should schedule the next iteration
*
*/

/* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */

/* The dispatch_scheduling_state holds required information to schedule a "block" on the dispatch_queue. */
struct dispatch_scheduling_state {

    /**
     * The lock is used to protect the scheduled_services list across threads. It should be held while we add/remove
     * entries from the scheduled_services list.
     */
    struct aws_mutex services_lock;
    /**
     * List<scheduled_service_entry> in sorted order by timestamp. Each scheduled_service_entry represents a block
     * ALREADY SCHEDULED on the Apple dispatch queue.
     *
     * When we go to schedule a new iteration, we check here first to see if our scheduling attempt is redundant.
     */
    struct aws_linked_list scheduled_services;
};

/* Internal ref-counted dispatch loop context for processing Apple Dispatch Queue resources. */
struct dispatch_loop_context {
/**
 * The context lock is a read-write lock used to protect dispatch_loop.
 * The write lock is acquired when we make changes to dispatch_loop, and the read lock is acquired
 * when we need to verify that the dispatch_loop is alive. This makes sure that the dispatch_loop will not be
 * destroyed from another thread while we are using it.
*/
struct aws_rw_lock lock;
struct dispatch_loop *io_dispatch_loop;
struct dispatch_scheduling_state scheduling_state;
Expand All @@ -111,44 +122,44 @@ struct scheduled_service_entry {
struct dispatch_loop_context *dispatch_queue_context;
};

/* Acquires a reference on the dispatch loop context; returns the context pointer for call chaining. */
static void *s_acquire_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_ref_count_acquire(&context->ref_count);
}

/* Releases a reference on the dispatch loop context; returns the remaining reference count. */
static size_t s_release_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_ref_count_release(&context->ref_count);
}

/* Takes the context rw-lock for reading (used when we only need to check the dispatch loop's state). */
static int s_rlock_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_rw_lock_rlock(&context->lock);
}

/* Releases the read side of the context rw-lock. */
static int s_runlock_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_rw_lock_runlock(&context->lock);
}

/* Takes the context rw-lock for writing (used when mutating the dispatch loop pointer). */
static int s_wlock_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_rw_lock_wlock(&context->lock);
}

/* Releases the write side of the context rw-lock. */
static int s_wunlock_dispatch_loop_context(struct dispatch_loop_context *context) {
    return aws_rw_lock_wunlock(&context->lock);
}

/* Locks the dispatch loop's cross-thread synced data. */
static int s_lock_cross_thread_data(struct dispatch_loop *loop) {
    return aws_mutex_lock(&loop->synced_data.lock);
}

/* Unlocks the dispatch loop's cross-thread synced data. */
static int s_unlock_cross_thread_data(struct dispatch_loop *loop) {
    return aws_mutex_unlock(&loop->synced_data.lock);
}

/* Locks the scheduled_services list via the scheduling state's services_lock. */
static int s_lock_service_entries(struct dispatch_loop_context *context) {
    return aws_mutex_lock(&context->scheduling_state.services_lock);
}

/* Unlocks the scheduled_services list via the scheduling state's services_lock. */
static int s_unlock_service_entries(struct dispatch_loop_context *context) {
    return aws_mutex_unlock(&context->scheduling_state.services_lock);
}

static struct scheduled_service_entry *s_scheduled_service_entry_new(
Expand All @@ -159,8 +170,7 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new(

entry->allocator = context->allocator;
entry->timestamp = timestamp;
entry->dispatch_queue_context = context;
s_acquire_dispatch_loop_context(context);
entry->dispatch_queue_context = s_acquire_dispatch_loop_context(context);

return entry;
}
Expand All @@ -175,16 +185,18 @@ static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *en
aws_mem_release(entry->allocator, entry);
}

// checks to see if another scheduled iteration already exists that will either
// handle our needs or reschedule at the end to do so
static bool s_should_schedule_iteration(
struct aws_linked_list *scheduled_iterations,
uint64_t proposed_iteration_time) {
if (aws_linked_list_empty(scheduled_iterations)) {
/**
* Helper function to check if another scheduled iteration already exists that will handle our needs
*
* The function should be wrapped with the following locks:
* scheduled_services lock: To safely access the scheduled_services list
*/
static bool s_should_schedule_iteration(struct aws_linked_list *scheduled_services, uint64_t proposed_iteration_time) {
if (aws_linked_list_empty(scheduled_services)) {
return true;
}

struct aws_linked_list_node *head_node = aws_linked_list_front(scheduled_iterations);
struct aws_linked_list_node *head_node = aws_linked_list_front(scheduled_services);
struct scheduled_service_entry *entry = AWS_CONTAINER_OF(head_node, struct scheduled_service_entry, node);

// is the next scheduled iteration later than what we require?
Expand Down Expand Up @@ -227,14 +239,15 @@ static void s_dispatch_event_loop_destroy(void *context) {
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop);
}

/* Prefix used to build unique dispatch queue ids: "com.amazonaws.commonruntime.eventloop.<UUID>". */
static const char AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX[] = "com.amazonaws.commonruntime.eventloop.";
/*
 * Exclude the trailing NUL terminator: AWS_ARRAY_SIZE() counts it, and memcpy-ing it into the middle
 * of the id buffer would embed a '\0' and truncate the generated id at the prefix.
 */
static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH =
    AWS_ARRAY_SIZE(AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX) - 1;
/* Total id length; AWS_UUID_STR_LEN already accounts for the terminating NUL. */
static const size_t AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH =
    AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH + AWS_UUID_STR_LEN;

/**
* Generates a unique identifier for a dispatch queue in the format "com.amazonaws.commonruntime.eventloop.<UUID>".
* This identifier will be stored in the provided `result` buffer.
*/
static void s_get_unique_dispatch_queue_id(char result[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH]) {
struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);
Expand All @@ -243,7 +256,7 @@ static void s_get_unique_dispatch_queue_id(char result[AWS_IO_APPLE_DISPATCH_QUE
uuid_buf.len = 0;
aws_uuid_to_str(&uuid, &uuid_buf);

memcpy(result, AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX.ptr, AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH);
memcpy(result, AWS_LITERAL_APPLE_DISPATCH_QUEUE_ID_PREFIX, AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH);
memcpy(result + AWS_IO_APPLE_DISPATCH_QUEUE_ID_PREFIX_LENGTH, uuid_buf.buffer, uuid_buf.len);
}

Expand All @@ -265,6 +278,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));
dispatch_loop->allocator = alloc;
loop->impl_data = dispatch_loop;
dispatch_loop->base_loop = loop;

char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0};
s_get_unique_dispatch_queue_id(dispatch_queue_id);
Expand All @@ -288,8 +302,6 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
goto clean_up;
}

dispatch_loop->base_loop = loop;

aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks);

struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context));
Expand Down Expand Up @@ -403,8 +415,12 @@ static int s_stop(struct aws_event_loop *event_loop) {
return AWS_OP_SUCCESS;
}

// returns true if we should execute an iteration, false otherwise
// The function should be wrapped with dispatch_loop->context.lock
/**
* The function decides if we should run this iteration.
* Returns true if we should execute an iteration, false otherwise
*
* The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running.
*/
static bool begin_iteration(struct scheduled_service_entry *entry) {
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;

Expand All @@ -414,40 +430,35 @@ static bool begin_iteration(struct scheduled_service_entry *entry) {
return true;
}

// conditionally schedule another iteration as needed
// The function should be wrapped with dispatch_loop->context.lock
/**
* Clean up the related resource and determine if we should schedule next iteration.
* The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running.
* */
static void end_iteration(struct scheduled_service_entry *entry) {

struct dispatch_loop_context *contxt = entry->dispatch_queue_context;
struct dispatch_loop *dispatch_loop = contxt->io_dispatch_loop;
struct dispatch_loop_context *context = entry->dispatch_queue_context;
struct dispatch_loop *dispatch_loop = context->io_dispatch_loop;

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_data.is_executing = false;

// Remove the node before scheduling so the entry itself is not considered
aws_linked_list_remove(&entry->node);
// if there are any cross-thread tasks, reschedule an iteration for now

bool should_schedule = false;
uint64_t should_schedule_at_time = 0;
if (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
// added during service which means nothing was scheduled because will_schedule was true
s_lock_service_entries(contxt);
s_try_schedule_new_iteration(contxt, 0);
s_unlock_service_entries(contxt);
} else {
// no cross thread tasks, so check internal time-based scheduler
uint64_t next_task_time = 0;
/* we already know it has tasks, we just scheduled one. We just want the next run time. */
bool has_task = aws_task_scheduler_has_tasks(&dispatch_loop->scheduler, &next_task_time);

if (has_task) {
// only schedule an iteration if there isn't an existing dispatched iteration for the next task time or
// earlier
s_lock_service_entries(contxt);
if (s_should_schedule_iteration(
&dispatch_loop->context->scheduling_state.scheduled_services, next_task_time)) {
s_try_schedule_new_iteration(contxt, next_task_time);
}
s_unlock_service_entries(contxt);
}
should_schedule = true;
}
/* we already know there are tasks to be scheduled, we just want the next run time. */
else if (aws_task_scheduler_has_tasks(&dispatch_loop->scheduler, &should_schedule_at_time)) {
should_schedule = true;
}

if (should_schedule) {
s_lock_service_entries(context);
s_try_schedule_new_iteration(context, should_schedule_at_time);
s_unlock_service_entries(context);
}

s_unlock_cross_thread_data(dispatch_loop);
Expand Down Expand Up @@ -507,7 +518,10 @@ static void s_run_iteration(void *context) {
*
* If timestamp==0, the function will always schedule a new iteration as long as the event loop is not suspended.
*
* The function should be wrapped with dispatch_loop->context->lock & dispatch_loop->synced_data.lock
* The function should be wrapped with the following locks:
* dispatch_loop->context->lock: To retain the dispatch loop
* dispatch_loop->synced_data.lock : To verify if the dispatch loop is suspended
* dispatch_loop_context->scheduling_state->services_lock: To modify the scheduled_services list
*/
static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) {
struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop;
Expand All @@ -526,12 +540,13 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_
/**
* The Apple dispatch queue uses automatic reference counting (ARC). If an iteration remains in the queue, it will
* persist until it is executed. Scheduling a block far into the future can keep the dispatch queue alive
* unnecessarily, even if the app is destroyed. To avoid this, Ensure an iteration is scheduled within a 1-second
* interval to prevent it from remaining in the Apple dispatch queue indefinitely.
 * unnecessarily, even if the app has shut down. To avoid this, ensure an iteration is scheduled within a
* 1-second interval to prevent it from remaining in the Apple dispatch queue indefinitely.
*/
delta = MIN(delta, AWS_TIMESTAMP_NANOS);

if (delta == 0) {
// dispatch_after_f(0, ...) is functionally equivalent to dispatch_async_f(...), while
// dispatch_after_f(0 , ...) is not as optimal as dispatch_async_f(...)
// https://developer.apple.com/documentation/dispatch/1452878-dispatch_after_f
dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@
#include <aws/io/tls_channel_handler.h>
#include <dispatch/dispatch.h>

/* TLS context backed by Apple's Security framework. */
struct secure_transport_ctx {
    struct aws_tls_ctx ctx;
    /* CoreFoundation allocator; name suggests it wraps the aws allocator — confirm at creation site. */
    CFAllocatorRef wrapped_allocator;
    /* Certificates for this context; NOTE(review): presumably the local cert chain — confirm with usage. */
    CFArrayRef certs;
    SecIdentityRef secitem_identity;
    /* CA certificate(s) used for peer verification overrides. */
    CFArrayRef ca_cert;
    /* Minimum TLS protocol version to negotiate. */
    enum aws_tls_versions minimum_version;
    /* ALPN protocol list; NOTE(review): delimiter/format not visible here — confirm against implementation. */
    struct aws_string *alpn_list;
    /* Whether the peer's certificate chain should be verified. */
    bool verify_peer;
};

struct dispatch_loop;
struct dispatch_loop_context;

Expand All @@ -40,6 +29,10 @@ struct dispatch_loop {

/* Synced data handle cross thread tasks and events, and event loop operations*/
struct {
/**
* The lock is used to protect synced_data across the threads. It should be acquired whenever we touched the
* data in this synced_data struct.
*/
struct aws_mutex lock;
/*
* `is_executing` flag and `current_thread_id` together are used
Expand Down
Loading

0 comments on commit 1dc7c7d

Please sign in to comment.