Skip to content

Commit

Permalink
Merge pull request grpc#11333 from ctiller/faster_timer_pool
Browse files Browse the repository at this point in the history
Get rid of zero-length poll case now that timer pool exists
  • Loading branch information
ctiller authored Jun 7, 2017
2 parents 2f6715d + 6d7c49f commit 6b19d3b
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 129 deletions.
3 changes: 2 additions & 1 deletion src/core/lib/iomgr/iomgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) ==
GRPC_TIMERS_FIRED) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();
Expand Down
10 changes: 8 additions & 2 deletions src/core/lib/iomgr/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,21 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);

/* iomgr internal api for dealing with timers */

/* Outcome of one grpc_timer_check() pass over the timer list. */
typedef enum {
/* The timer list was not inspected this pass (e.g. another thread held the
   checker lock -- see timer_manager.c, which sleeps and relies on that other
   thread -- or the platform implementation never checks manually). */
GRPC_TIMERS_NOT_CHECKED,
/* The timer list was inspected and no timer callbacks were ready to run. */
GRPC_TIMERS_CHECKED_AND_EMPTY,
/* At least one timer callback was executed during the check. */
GRPC_TIMERS_FIRED,
} grpc_timer_check_result;

/* Check for timers to be run, and run them.
Returns GRPC_TIMERS_FIRED if timer callbacks were executed.
If next is non-null, TRY to update *next with the next running timer
IF that timer occurs before *next current value.
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next);
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);

Expand Down
58 changes: 27 additions & 31 deletions src/core/lib/iomgr/timer_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
return a + b;
}

static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error);
static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
gpr_atm now,
gpr_atm *next,
grpc_error *error);

static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
Expand Down Expand Up @@ -421,19 +423,22 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
return n;
}

static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error) {
size_t n = 0;
static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
gpr_atm now,
gpr_atm *next,
grpc_error *error) {
grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
return 0;
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}

if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu);
result = GRPC_TIMERS_CHECKED_AND_EMPTY;

if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
Expand All @@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
n +=
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
error) > 0) {
result = GRPC_TIMERS_FIRED;
}

if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR
", shard[%d]->min_deadline %" PRIdPTR
" --> %" PRIdPTR ", now=%" PRIdPTR,
n, (int)(g_shard_queue[0] - g_shards),
gpr_log(GPR_DEBUG,
" .. result --> %d"
", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
", now=%" PRIdPTR,
result, (int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}

Expand All @@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
} else if (next != NULL) {
/* TODO(ctiller): this forces calling code to do an short poll, and
then retry the timer check (because this time through the timer list was
contended).
We could reduce the cost here dramatically by keeping a count of how
many currently active pollers got through the uncontended case above
successfully, and waking up other pollers IFF that count drops to zero.
Once that count is in place, this entire else branch could disappear. */
*next = GPR_MIN(*next, now + 1);
}

GRPC_ERROR_UNREF(error);

return (int)n;
return result;
}

bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
Expand All @@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
now_atm, min_timer);
}
return 0;
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}

grpc_error *shutdown_error =
Expand All @@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_free(next_str);
}
// actual code
bool r;
grpc_timer_check_result r;
gpr_atm next_atm;
if (next == NULL) {
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
Expand All @@ -556,11 +553,10 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, next_atm);
}
gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r,
next_str);
gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
}
return r > 0;
return r;
}

#endif /* GRPC_TIMER_USE_GENERIC */
187 changes: 117 additions & 70 deletions src/core/lib/iomgr/timer_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,86 +107,124 @@ void grpc_timer_manager_tick() {
grpc_exec_ctx_finish(&exec_ctx);
}

static void timer_thread(void *unused) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
/* Called by a timer manager thread after grpc_timer_check() returned
   GRPC_TIMERS_FIRED: flush the fired timer callbacks queued on exec_ctx,
   while making sure some other thread keeps watching the timer list in the
   meantime. */
static void run_some_timers(grpc_exec_ctx *exec_ctx) {
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
--g_waiter_count;
if (g_waiter_count == 0 && g_threaded) {
// nobody else is waiting: spawn a replacement waiter thread
// (start_timer_thread_and_unlock also releases g_mu for this branch)
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing
// waiter
// so that the next deadline is not missed
if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
}
gpr_cv_signal(&g_cv_wait);
}
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx: this is where the timer callbacks
// actually execute, and they may take arbitrarily long
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&g_mu);
// garbage collect any threads hanging out that are dead
gc_completed_threads();
// get ready to wait again
++g_waiter_count;
gpr_mu_unlock(&g_mu);
}

// wait until 'next' (or forever if there is already a timed waiter in the pool)
// returns true if the thread should continue executing (false if it should
// shutdown)
static bool wait_until(gpr_timespec next) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&g_mu);
// if we're not threaded anymore, leave
if (!g_threaded) {
gpr_mu_unlock(&g_mu);
return false;
}
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire
// all other timers wait forever
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
g_has_timed_waiter = true;
// we use a generation counter to track the timed waiter so we can
// cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
wait_time.tv_sec, wait_time.tv_nsec);
}
} else {
next = inf_future;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
}
gpr_cv_wait(&g_cv_wait, &g_mu, next);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
}
// if this was a kick from the timer system, consume it (and don't stop
// this thread yet)
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
}
gpr_mu_unlock(&g_mu);
return true;
}

static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
if (grpc_timer_check(&exec_ctx, now, &next)) {
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
--g_waiter_count;
if (g_waiter_count == 0 && g_threaded) {
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing waiter
// so that the next deadline is not missed
if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
}
gpr_cv_signal(&g_cv_wait);
}
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(&g_mu);
// garbage collect any threads hanging out that are dead
gc_completed_threads();
// get ready to wait again
++g_waiter_count;
gpr_mu_unlock(&g_mu);
} else {
gpr_mu_lock(&g_mu);
// if we're not threaded anymore, leave
if (!g_threaded) break;
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire
// all other timers wait forever
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
if (!g_has_timed_waiter) {
g_has_timed_waiter = true;
// we use a generation counter to track the timed waiter so we can
// cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation;
switch (grpc_timer_check(exec_ctx, now, &next)) {
case GRPC_TIMERS_FIRED:
run_some_timers(exec_ctx);
break;
case GRPC_TIMERS_NOT_CHECKED:
/* This case only happens under contention, meaning more than one timer
manager thread checked timers concurrently.
If that happens, we're guaranteed that some other thread has just
checked timers, and this will avalanche into some other thread seeing
empty timers and doing a timed sleep.
Consequently, we can just sleep forever here and be happy at some
saved wakeup cycles. */
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep for a while");
gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
} else {
next = inf_future;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep until kicked");
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
if (!wait_until(next)) {
return;
}
}
gpr_cv_wait(&g_cv_wait, &g_mu, next);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
}
// if this was a kick from the timer system, consume it (and don't stop
// this thread yet)
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
}
gpr_mu_unlock(&g_mu);
break;
}
}
}

static void timer_thread_cleanup(void) {
gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
Expand All @@ -199,12 +237,21 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_finish(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}

/* Thread entry point for a timer manager thread: run the main timer loop
   with a thread-local exec_ctx, then tear down this thread's bookkeeping. */
static void timer_thread(void *unused) {
  /* Each timer thread carries its own exec_ctx so closures can be run
     through to completion here; spinning up new threads is cheap. */
  grpc_exec_ctx ctx =
      GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
  timer_main_loop(&ctx);
  grpc_exec_ctx_finish(&ctx);
  timer_thread_cleanup();
}

static void start_threads(void) {
gpr_mu_lock(&g_mu);
if (!g_threaded) {
Expand Down
6 changes: 3 additions & 3 deletions src/core/lib/iomgr/timer_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
}
}

bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
return false;
/* libuv stub: this implementation never inspects a timer list manually
   (timers are presumably driven by libuv's own callbacks -- NOTE(review):
   confirm against the uv timer setup in this file), so every call reports
   that timers were not checked. `now` and `next` are ignored; *next is
   never updated. */
grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
gpr_timespec now, gpr_timespec *next) {
return GRPC_TIMERS_NOT_CHECKED;
}

void grpc_timer_list_init(gpr_timespec now) {}
Expand Down
Loading

0 comments on commit 6b19d3b

Please sign in to comment.