Skip to content

Commit

Permalink
pythongh-126914: Store the Preallocated Thread State's Pointer in a P…
Browse files Browse the repository at this point in the history
…yInterpreterState Field (pythongh-126989)

This approach eliminates the originally reported race. It also gets rid of the deadlock reported in pythongh-96071, so we can remove the workaround added then.
  • Loading branch information
ericsnowcurrently committed Nov 21, 2024
1 parent 8cdd636 commit 9581448
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ struct _is {
/* the initial PyInterpreterState.threads.head */
_PyThreadStateImpl _initial_thread;
Py_ssize_t _interactive_src_count;
// In 3.14+ this is interp->threads.preallocated.
_PyThreadStateImpl *threads_preallocated;
};


Expand Down
30 changes: 30 additions & 0 deletions Lib/test/test_interpreters/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_create_many_sequential(self):
alive.append(interp)

@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_create_many_threaded(self):
alive = []
def task():
Expand All @@ -32,6 +33,35 @@ def task():
with threading_helper.start_threads(threads):
pass

@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_many_threads_running_interp_in_other_interp(self):
interp = interpreters.create()

script = f"""if True:
import _interpreters
_interpreters.run_string({interp.id}, '1')
"""

def run():
interp = interpreters.create()
alreadyrunning = (f'{interpreters.InterpreterError}: '
'interpreter already running')
success = False
while not success:
try:
interp.exec(script)
except interpreters.ExecutionFailed as exc:
if exc.excinfo.msg != 'interpreter already running':
raise # re-raise
assert exc.excinfo.type.__name__ == 'InterpreterError'
else:
success = True

threads = (threading.Thread(target=run) for _ in range(200))
with threading_helper.start_threads(threads):
pass


if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
Expand Down
92 changes: 46 additions & 46 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ init_interpreter(PyInterpreterState *interp,
assert(next != NULL || (interp == runtime->interpreters.main));
interp->next = next;

interp->threads_preallocated = &interp->_initial_thread;

// We would call _PyObject_InitState() at this point
// if interp->feature_flags were alredy set.

Expand Down Expand Up @@ -767,7 +769,6 @@ PyInterpreterState_New(void)
return interp;
}


static void
interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
{
Expand Down Expand Up @@ -906,6 +907,8 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
// XXX Once we have one allocator per interpreter (i.e.
// per-interpreter GC) we must ensure that all of the interpreter's
// objects have been cleaned up at the point.

// If we had a freelist of thread states, we would clear it here.
}


Expand Down Expand Up @@ -1427,22 +1430,45 @@ allocate_chunk(int size_in_bytes, _PyStackChunk* previous)
return res;
}

static void
reset_threadstate(_PyThreadStateImpl *tstate)
{
// Set to _PyThreadState_INIT directly?
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}

static _PyThreadStateImpl *
alloc_threadstate(void)
alloc_threadstate(PyInterpreterState *interp)
{
return PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
_PyThreadStateImpl *tstate;

// Try the preallocated tstate first.
tstate = _Py_atomic_exchange_ptr(&interp->threads_preallocated, NULL);

// Fall back to the allocator.
if (tstate == NULL) {
tstate = PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
if (tstate == NULL) {
return NULL;
}
reset_threadstate(tstate);
}
return tstate;
}

static void
free_threadstate(_PyThreadStateImpl *tstate)
{
PyInterpreterState *interp = tstate->base.interp;
// The initial thread state of the interpreter is allocated
// as part of the interpreter state so should not be freed.
if (tstate == &tstate->base.interp->_initial_thread) {
// Restore to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
if (tstate == &interp->_initial_thread) {
// Make it available again.
reset_threadstate(tstate);
assert(interp->threads_preallocated == NULL);
_Py_atomic_store_ptr(&interp->threads_preallocated, tstate);
}
else {
PyMem_RawFree(tstate);
Expand Down Expand Up @@ -1533,68 +1559,42 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate,
static PyThreadState *
new_threadstate(PyInterpreterState *interp, int whence)
{
_PyThreadStateImpl *tstate;
_PyRuntimeState *runtime = interp->runtime;
// We don't need to allocate a thread state for the main interpreter
// (the common case), but doing it later for the other case revealed a
// reentrancy problem (deadlock). So for now we always allocate before
// taking the interpreters lock. See GH-96071.
_PyThreadStateImpl *new_tstate = alloc_threadstate();
int used_newtstate;
if (new_tstate == NULL) {
// Allocate the thread state.
_PyThreadStateImpl *tstate = alloc_threadstate(interp);
if (tstate == NULL) {
return NULL;
}

#ifdef Py_GIL_DISABLED
Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp);
if (qsbr_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
#endif

/* We serialize concurrent creation to protect global state. */
HEAD_LOCK(runtime);
HEAD_LOCK(interp->runtime);

// Initialize the new thread state.
interp->threads.next_unique_id += 1;
uint64_t id = interp->threads.next_unique_id;
init_threadstate(tstate, interp, id, whence);

// Allocate the thread state and add it to the interpreter.
// Add the new thread state to the interpreter.
PyThreadState *old_head = interp->threads.head;
if (old_head == NULL) {
// It's the interpreter's initial thread state.
used_newtstate = 0;
tstate = &interp->_initial_thread;
}
// XXX Re-use interp->_initial_thread if not in use?
else {
// Every valid interpreter must have at least one thread.
assert(id > 1);
assert(old_head->prev == NULL);
used_newtstate = 1;
tstate = new_tstate;
// Set to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}

init_threadstate(tstate, interp, id, whence);
add_threadstate(interp, (PyThreadState *)tstate, old_head);

HEAD_UNLOCK(runtime);
if (!used_newtstate) {
// Must be called with lock unlocked to avoid re-entrancy deadlock.
PyMem_RawFree(new_tstate);
}
else {
HEAD_UNLOCK(interp->runtime);
#ifdef Py_GIL_DISABLED
if (id == 1) {
if (_Py_atomic_load_int(&interp->gc.immortalize) == 0) {
// Immortalize objects marked as using deferred reference counting
// the first time a non-main thread is created.
_PyGC_ImmortalizeDeferredObjects(interp);
}
#endif
}
#endif

#ifdef Py_GIL_DISABLED
// Must be called with lock unlocked to avoid lock ordering deadlocks.
Expand Down

0 comments on commit 9581448

Please sign in to comment.