Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cached clocks 3 #800

Merged
merged 11 commits into from
Dec 31, 2019
2 changes: 2 additions & 0 deletions aeron-driver/src/main/c/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ SET(SOURCE
util/aeron_parse_util.c
util/aeron_properties_util.c
util/aeron_http_util.c
util/aeron_clock.c
aeron_driver_context.c
aeron_alloc.c
aeron_driver.c
Expand Down Expand Up @@ -191,6 +192,7 @@ SET(HEADERS
util/aeron_parse_util.h
util/aeron_properties_util.h
util/aeron_http_util.h
util/aeron_clock.h
concurrent/aeron_thread.h
concurrent/aeron_atomic.h
concurrent/aeron_atomic64_gcc_x86_64.h
Expand Down
45 changes: 0 additions & 45 deletions aeron-driver/src/main/c/aeron_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,51 +113,6 @@ void aeron_log_func_none(const char *str)
{
}

int64_t aeron_nano_clock()
{
struct timespec ts;
#if defined(__CYGWIN__) || defined(__linux__)
if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0)
{
return -1;
}
#elif defined(AERON_COMPILER_MSVC)
if (aeron_clock_gettime_monotonic(&ts) < 0)
{
return -1;
}
#else
if (clock_gettime(CLOCK_MONOTONIC_RAW, &ts) < 0)
{
return -1;
}
#endif

return (ts.tv_sec * 1000000000) + ts.tv_nsec;
}

int64_t aeron_epoch_clock()
{
struct timespec ts;
#if defined(AERON_COMPILER_MSVC)
if (aeron_clock_gettime_realtime(&ts) < 0)
{
return -1;
}
#else
#if defined(CLOCK_REALTIME_COARSE)
if (clock_gettime(CLOCK_REALTIME_COARSE, &ts) < 0)
#else
if (clock_gettime(CLOCK_REALTIME, &ts) < 0)
#endif
{
return -1;
}
#endif

return (ts.tv_sec * 1000) + (ts.tv_nsec / 1000000);
}

extern int aeron_number_of_trailing_zeroes(int32_t value);
extern int aeron_number_of_trailing_zeroes_u64(uint64_t value);
extern int aeron_number_of_leading_zeroes(int32_t value);
Expand Down
45 changes: 30 additions & 15 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ int aeron_driver_conductor_init(aeron_driver_conductor_t *conductor, aeron_drive

int64_t now_ns = context->nano_clock();

conductor->nano_clock = context->nano_clock;
conductor->epoch_clock = context->epoch_clock;
conductor->clock_update_deadline_ns = 0;
conductor->time_of_last_timeout_check_ns = now_ns;
conductor->time_of_last_to_driver_position_change_ns = now_ns;
conductor->next_session_id = aeron_randomised_int32();
Expand Down Expand Up @@ -280,7 +279,8 @@ aeron_client_t *aeron_driver_conductor_get_or_add_client(aeron_driver_conductor_

client->heartbeat_timestamp.counter_id = client_heartbeat.counter_id;
client->heartbeat_timestamp.value_addr = client_heartbeat.value_addr;
aeron_counter_set_ordered(client->heartbeat_timestamp.value_addr, conductor->context->epoch_clock());
const int64_t now_ms = aeron_clock_cached_epoch_time(conductor->context->cached_clock);
aeron_counter_set_ordered(client->heartbeat_timestamp.value_addr, now_ms);

client->client_liveness_timeout_ms = conductor->context->client_liveness_timeout_ns < 1000000 ?
1 : conductor->context->client_liveness_timeout_ns / 1000000;
Expand Down Expand Up @@ -928,7 +928,8 @@ aeron_ipc_publication_t *aeron_driver_conductor_get_or_add_ipc_publication(
client->publication_links.length++;

conductor->ipc_publications.array[conductor->ipc_publications.length++].publication = publication;
publication->conductor_fields.managed_resource.time_of_last_state_change = conductor->nano_clock();
publication->conductor_fields.managed_resource.time_of_last_state_change =
aeron_clock_cached_nano_time(conductor->context->cached_clock);
}
}
}
Expand Down Expand Up @@ -1138,7 +1139,8 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
client->publication_links.length++;

conductor->network_publications.array[conductor->network_publications.length++].publication = publication;
publication->conductor_fields.managed_resource.time_of_last_state_change = conductor->nano_clock();
publication->conductor_fields.managed_resource.time_of_last_state_change =
aeron_clock_cached_nano_time(conductor->context->cached_clock);
}
}
}
Expand Down Expand Up @@ -1787,11 +1789,22 @@ void aeron_driver_conductor_on_check_for_blocked_driver_commands(aeron_driver_co
}
}

void aeron_driver_conductor_update_clocks(aeron_driver_conductor_t *conductor, int64_t now_ns)
{
if (conductor->clock_update_deadline_ns - now_ns <= 0)
{
conductor->clock_update_deadline_ns = now_ns + AERON_DRIVER_CONDUCTOR_CLOCK_UPDATE_DURATION_NS;
int64_t now_ms = conductor->context->epoch_clock();
aeron_clock_update_cached_time(conductor->context->cached_clock, now_ms, now_ns);
}
}

int aeron_driver_conductor_do_work(void *clientd)
{
aeron_driver_conductor_t *conductor = (aeron_driver_conductor_t *)clientd;
int work_count = 0;
int64_t now_ns = conductor->nano_clock();
const int64_t now_ns = conductor->context->nano_clock();
aeron_driver_conductor_update_clocks(conductor, now_ns);

work_count += (int)aeron_mpsc_rb_read(
&conductor->to_driver_commands, aeron_driver_conductor_on_command, conductor, 10);
Expand All @@ -1800,7 +1813,7 @@ int aeron_driver_conductor_do_work(void *clientd)

if (now_ns >= (conductor->time_of_last_timeout_check_ns + (int64_t)conductor->context->timer_interval_ns))
{
int64_t now_ms = conductor->epoch_clock();
const int64_t now_ms = aeron_clock_cached_epoch_time(conductor->context->cached_clock);

aeron_mpsc_rb_consumer_heartbeat_time(&conductor->to_driver_commands, now_ms);
aeron_driver_conductor_on_check_managed_resources(conductor, now_ns, now_ms);
Expand Down Expand Up @@ -2077,7 +2090,7 @@ int aeron_driver_conductor_on_add_ipc_publication(
publication->log_file_name,
publication->log_file_name_length);

int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0; i < conductor->ipc_subscriptions.length; i++)
{
Expand Down Expand Up @@ -2177,7 +2190,7 @@ int aeron_driver_conductor_on_add_network_publication(
publication->log_file_name,
publication->log_file_name_length);

int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0; i < conductor->spy_subscriptions.length; i++)
{
Expand Down Expand Up @@ -2297,7 +2310,7 @@ int aeron_driver_conductor_on_add_ipc_subscription(
aeron_driver_conductor_on_subscription_ready(
conductor, command->correlated.correlation_id, AERON_CHANNEL_STATUS_INDICATOR_NOT_ALLOCATED);

int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0; i < conductor->ipc_publications.length; i++)
{
Expand Down Expand Up @@ -2382,7 +2395,7 @@ int aeron_driver_conductor_on_add_spy_subscription(
aeron_driver_conductor_on_subscription_ready(
conductor, command->correlated.correlation_id, AERON_CHANNEL_STATUS_INDICATOR_NOT_ALLOCATED);

int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0, length = conductor->network_publications.length; i < length; i++)
{
Expand Down Expand Up @@ -2480,7 +2493,7 @@ int aeron_driver_conductor_on_add_network_subscription(
aeron_driver_conductor_on_subscription_ready(
conductor, command->correlated.correlation_id, endpoint->channel_status.counter_id);

int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0, length = conductor->publication_images.length; i < length; i++)
{
Expand Down Expand Up @@ -2609,7 +2622,8 @@ int aeron_driver_conductor_on_client_keepalive(aeron_driver_conductor_t *conduct
if ((index = aeron_driver_conductor_find_client(conductor, client_id)) >= 0)
{
aeron_client_t *client = &conductor->clients.array[index];
aeron_counter_set_ordered(client->heartbeat_timestamp.value_addr, conductor->epoch_clock());
int64_t now_ms = aeron_clock_cached_epoch_time(conductor->context->cached_clock);
aeron_counter_set_ordered(client->heartbeat_timestamp.value_addr, now_ms);
}

return 0;
Expand Down Expand Up @@ -2979,7 +2993,7 @@ void aeron_driver_conductor_on_create_publication_image(void *clientd, void *ite
}

conductor->publication_images.array[conductor->publication_images.length++].image = image;
int64_t now_ns = conductor->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock);

for (size_t i = 0, length = conductor->network_subscriptions.length; i < length; i++)
{
Expand Down Expand Up @@ -3030,7 +3044,8 @@ void aeron_driver_conductor_on_linger_buffer(void *clientd, void *item)

entry->buffer = command->item;
entry->has_reached_end_of_life = false;
entry->timeout_ns = conductor->nano_clock() + AERON_DRIVER_CONDUCTOR_LINGER_RESOURCE_TIMEOUT_NS;
entry->timeout_ns = aeron_clock_cached_nano_time(conductor->context->cached_clock) +
AERON_DRIVER_CONDUCTOR_LINGER_RESOURCE_TIMEOUT_NS;
}

if (AERON_THREADING_MODE_IS_SHARED_OR_INVOKER(conductor->context->threading_mode))
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/c/aeron_driver_conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "reports/aeron_loss_reporter.h"

#define AERON_DRIVER_CONDUCTOR_LINGER_RESOURCE_TIMEOUT_NS (5 * 1000 * 1000 * 1000L)
#define AERON_DRIVER_CONDUCTOR_CLOCK_UPDATE_DURATION_NS (1000 * 1000)

typedef struct aeron_publication_link_stct
{
Expand Down Expand Up @@ -268,8 +269,7 @@ typedef struct aeron_driver_conductor_stct
int64_t *unblocked_commands_counter;
int64_t *client_timeouts_counter;

aeron_clock_func_t nano_clock;
aeron_clock_func_t epoch_clock;
int64_t clock_update_deadline_ns;

int32_t next_session_id;
int32_t publication_reserved_session_id_low;
Expand Down
5 changes: 5 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,10 @@ int aeron_driver_context_init(aeron_driver_context_t **context)

_context->nano_clock = aeron_nano_clock;
_context->epoch_clock = aeron_epoch_clock;
if (aeron_clock_cache_alloc(&_context->cached_clock) < 0)
{
return -1;
}

_context->conductor_idle_strategy_name = aeron_strndup("backoff", AERON_MAX_PATH);
_context->shared_idle_strategy_name = aeron_strndup("backoff", AERON_MAX_PATH);
Expand Down Expand Up @@ -966,6 +970,7 @@ int aeron_driver_context_close(aeron_driver_context_t *context)
aeron_free((void *)context->receiver_idle_strategy_init_args);
aeron_free((void *)context->shared_idle_strategy_init_args);
aeron_free((void *)context->shared_network_idle_strategy_init_args);
aeron_clock_cache_free(context->cached_clock);
aeron_free(context);

return 0;
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_driver_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ typedef struct aeron_driver_context_stct

aeron_clock_func_t nano_clock;
aeron_clock_func_t epoch_clock;
aeron_clock_cache_t* cached_clock;

aeron_spsc_concurrent_array_queue_t sender_command_queue;
aeron_spsc_concurrent_array_queue_t receiver_command_queue;
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/c/aeron_driver_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ int aeron_driver_receiver_do_work(void *clientd)

aeron_counter_add_ordered(receiver->total_bytes_received_counter, bytes_received);

int64_t now_ns = receiver->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(receiver->context->cached_clock);

for (size_t i = 0, length = receiver->images.length; i < length; i++)
{
Expand Down Expand Up @@ -406,7 +406,7 @@ int aeron_driver_receiver_add_pending_setup(
entry->endpoint = endpoint;
entry->session_id = session_id;
entry->stream_id = stream_id;
entry->time_of_status_message_ns = receiver->context->nano_clock();
entry->time_of_status_message_ns = aeron_clock_cached_nano_time(receiver->context->cached_clock);
entry->is_periodic = false;
if (NULL != control_addr)
{
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_driver_sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ int aeron_driver_sender_do_work(void *clientd)
work_count += aeron_spsc_concurrent_array_queue_drain(
sender->sender_proxy.command_queue, aeron_driver_sender_on_command, sender, 10);

int64_t now_ns = sender->context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(sender->context->cached_clock);
int64_t bytes_received = 0;
int bytes_sent = aeron_driver_sender_do_send(sender, now_ns);
int poll_result;
Expand Down
3 changes: 1 addition & 2 deletions aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int aeron_ipc_publication_create(
aeron_ipc_publication_t *_pub = NULL;
const uint64_t usable_fs_space = context->usable_fs_space_func(context->aeron_dir);
const uint64_t log_length = aeron_logbuffer_compute_log_length(params->term_length, context->file_page_size);
const int64_t now_ns = context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

*publication = NULL;

Expand Down Expand Up @@ -126,7 +126,6 @@ int aeron_ipc_publication_create(
aeron_logbuffer_fill_default_header(
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);

_pub->nano_clock = context->nano_clock;
_pub->conductor_fields.subscribable.array = NULL;
_pub->conductor_fields.subscribable.length = 0;
_pub->conductor_fields.subscribable.capacity = 0;
Expand Down
1 change: 0 additions & 1 deletion aeron-driver/src/main/c/aeron_ipc_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ typedef struct aeron_ipc_publication_stct
aeron_logbuffer_metadata_t *log_meta_data;
aeron_position_t pub_lmt_position;
aeron_position_t pub_pos_position;
aeron_clock_func_t nano_clock;

struct aeron_ipc_publication_conductor_fields_stct
{
Expand Down
11 changes: 6 additions & 5 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int aeron_network_publication_create(
aeron_network_publication_t *_pub = NULL;
const uint64_t usable_fs_space = context->usable_fs_space_func(context->aeron_dir);
const uint64_t log_length = aeron_logbuffer_compute_log_length(params->term_length, context->file_page_size);
const int64_t now_ns = context->nano_clock();
int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

*publication = NULL;

Expand Down Expand Up @@ -161,7 +161,7 @@ int aeron_network_publication_create(

_pub->endpoint = endpoint;
_pub->flow_control = flow_control_strategy;
_pub->nano_clock = context->nano_clock;
_pub->cached_clock = context->cached_clock;
_pub->conductor_fields.subscribable.array = NULL;
_pub->conductor_fields.subscribable.length = 0;
_pub->conductor_fields.subscribable.capacity = 0;
Expand Down Expand Up @@ -581,7 +581,7 @@ void aeron_network_publication_on_nak(
term_offset,
(size_t)length,
(size_t)(publication->term_length_mask + 1L),
publication->nano_clock(),
aeron_clock_cached_nano_time(publication->cached_clock),
aeron_network_publication_resend,
publication);
}
Expand All @@ -603,7 +603,7 @@ inline static void aeron_network_publication_update_connected_status(
void aeron_network_publication_on_status_message(
aeron_network_publication_t *publication, const uint8_t *buffer, size_t length, struct sockaddr_storage *addr)
{
const int64_t time_ns = publication->nano_clock();
const int64_t time_ns = aeron_clock_cached_nano_time(publication->cached_clock);
publication->status_message_deadline_ns = time_ns + publication->connection_timeout_ns;

if (!publication->has_receivers)
Expand Down Expand Up @@ -779,7 +779,8 @@ void aeron_network_publication_decref(void *clientd)
const int64_t producer_position = aeron_network_publication_producer_position(publication);

publication->conductor_fields.state = AERON_NETWORK_PUBLICATION_STATE_DRAINING;
publication->conductor_fields.time_of_last_activity_ns = publication->nano_clock();
publication->conductor_fields.time_of_last_activity_ns =
aeron_clock_cached_nano_time(publication->cached_clock);

aeron_counter_set_ordered(publication->pub_lmt_position.value_addr, producer_position);
AERON_PUT_ORDERED(publication->log_meta_data->end_of_stream_position, producer_position);
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_network_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ typedef struct aeron_network_publication_stct
aeron_logbuffer_metadata_t *log_meta_data;
aeron_send_channel_endpoint_t *endpoint;
aeron_flow_control_strategy_t *flow_control;
aeron_clock_func_t nano_clock;
aeron_clock_cache_t *cached_clock;

char *log_file_name;
int64_t term_window_length;
Expand Down
Loading