Skip to content

Commit

Permalink
threading: fixup scheduler statepoints for GC (#32238)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson authored Jun 10, 2019
1 parent 5d02c59 commit aaaa6a1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ struct _jl_tls_states_t {
int16_t tid;
uint64_t rngseed;
volatile size_t *safepoint;
volatile int8_t sleep_check_state;
// Whether it is safe to execute GC at the same time.
#define JL_GC_STATE_WAITING 1
// gc_state = 1 means the thread is doing GC or is waiting for the GC to
Expand Down
56 changes: 30 additions & 26 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ static void wake_thread(int16_t self, int16_t tid)
{
if (self != tid) {
jl_ptls_t other = jl_all_tls_states[tid];
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
}
}
}

Expand Down Expand Up @@ -386,19 +389,26 @@ JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
// get the next runnable task from the multiq
static jl_task_t *get_next_task(jl_value_t *getsticky)
{
jl_gc_safepoint();
jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1);
if (jl_typeis(task, jl_task_type)) {
int self = jl_get_ptls_states()->tid;
jl_set_task_tid(task, self);
return task;
}
jl_gc_safepoint();
#ifdef JULIA_ENABLE_THREADING
return multiq_deletemin();
#else
return NULL;
#endif
}

static int may_sleep(jl_ptls_t ptls)
{
return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping;
}

extern volatile unsigned _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
Expand All @@ -408,12 +418,12 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
jl_task_t *task;

while (1) {
jl_gc_safepoint();
task = get_next_task(getsticky);
if (task)
return task;

#ifdef JULIA_ENABLE_THREADING
// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
if (!multiq_check_empty()) {
start_cycles = 0;
Expand All @@ -425,6 +435,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) {
if (!sleep_check_now(ptls->tid))
continue;
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
task = get_next_task(getsticky);
if (task)
return task;
Expand All @@ -449,29 +460,26 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
JL_UV_UNLOCK();
}
else {
// otherwise, block until someone asks us for the lock
task = get_next_task(getsticky);
if (task) {
JL_UV_UNLOCK();
return task;
}
// otherwise, we may block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
active = uv_run(loop, UV_RUN_ONCE);
jl_gc_safepoint();
if (may_sleep(ptls)) {
loop->stop_flag = 0;
active = uv_run(loop, UV_RUN_ONCE);
}
JL_UV_UNLOCK();
// optimization: check again first if we added work for ourself
task = get_next_task(getsticky);
if (task)
return task;
// or someone else might have
if (jl_atomic_load(&sleep_check_state) != sleeping) {
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
start_cycles = 0;
continue;
}
// otherwise, we got a spurious wakeup since some other
// thread just wanted to steal libuv from us,
// thread that just wanted to steal libuv from us,
// just go right back to sleep on the other wake signal
// to let them take it from us without conflict
// TODO: this relinquishes responsibility for all event
// to the last thread to do an explicit operation,
// which may starve other threads of critical work
}
if (!_threadedregion && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
Expand All @@ -480,20 +488,16 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
continue;
}
}

// the other threads will just wait for on signal to resume
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (jl_atomic_load(&sleep_check_state) == sleeping) {
task = get_next_task(getsticky);
if (task)
break;
while (may_sleep(ptls)) {
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
uv_mutex_unlock(&ptls->sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
if (task)
return task;
}
else {
// maybe check the kernel for new messages too
Expand Down

0 comments on commit aaaa6a1

Please sign in to comment.