Skip to content

Commit

Permalink
Rename run_sync_in_worker_thread -> run_sync_in_thread
Browse files Browse the repository at this point in the history
  • Loading branch information
njsmith committed Jun 11, 2019
1 parent 13d27d6 commit c13c60e
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 83 deletions.
4 changes: 2 additions & 2 deletions docs/source/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0
functions that take and run a synchronous function. As part of this:

* ``run_in_worker_thread`` is becoming
:func:`run_sync_in_worker_thread`
``run_sync_in_worker_thread``

* We took the opportunity to refactor ``run_in_trio_thread`` and
``await_in_trio_thread`` into the new class
Expand Down Expand Up @@ -655,7 +655,7 @@ CPython, or PyPy3 5.9+.
Other changes
~~~~~~~~~~~~~

* :func:`run_sync_in_worker_thread` now has a :ref:`robust mechanism
* :func:`run_sync_in_thread` now has a :ref:`robust mechanism
for applying capacity limits to the number of concurrent threads
<worker-thread-limiting>` (`#10
<https://github.com/python-trio/trio/issues/170>`__, `#57
Expand Down
18 changes: 9 additions & 9 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1566,7 +1566,7 @@ In acknowledgment of this reality, Trio provides two useful utilities
for working with real, operating-system level,
:mod:`threading`\-module-style threads. First, if you're in Trio but
need to push some blocking I/O into a thread, there's
:func:`run_sync_in_worker_thread`. And if you're in a thread and need
:func:`run_sync_in_thread`. And if you're in a thread and need
to communicate back with Trio, you can use a
:class:`BlockingTrioPortal`.

Expand All @@ -1589,7 +1589,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
threads start executing the first N jobs, while the other
(100,000 - N) jobs sit in a queue and wait their turn. Which is
generally what you want, and this is how
:func:`trio.run_sync_in_worker_thread` works by default.
:func:`trio.run_sync_in_thread` works by default.

The downside of this kind of thread pool is that sometimes, you need
more sophisticated logic for controlling how many threads are run at
Expand Down Expand Up @@ -1636,10 +1636,10 @@ re-using threads, but has no admission control policy: if you give it
responsible for providing the policy to make sure that this doesn't
happen – but since it *only* has to worry about policy, it can be much
simpler. In fact, all there is to it is the ``limiter=`` argument
passed to :func:`run_sync_in_worker_thread`. This defaults to a global
passed to :func:`run_sync_in_thread`. This defaults to a global
:class:`CapacityLimiter` object, which gives us the classic fixed-size
thread pool behavior. (See
:func:`current_default_worker_thread_limiter`.) But if you want to use
:func:`current_default_thread_limiter`.) But if you want to use
"separate pools" for type A jobs and type B jobs, then it's just a
matter of creating two separate :class:`CapacityLimiter` objects and
passing them in when running these jobs. Or here's an example of
Expand Down Expand Up @@ -1679,7 +1679,7 @@ time::
return USER_LIMITERS[user_id]
except KeyError:
per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER)
global_limiter = trio.current_default_worker_thread_limiter()
global_limiter = trio.current_default_thread_limiter()
# IMPORTANT: acquire the per_user_limiter before the global_limiter.
# If we get 100 jobs for a user at the same time, we want
# to only allow 3 of them at a time to even compete for the
Expand All @@ -1690,17 +1690,17 @@ time::


async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
# *args belong to async_fn; **kwargs belong to run_sync_in_worker_thread
# *args belong to async_fn; **kwargs belong to run_sync_in_thread
kwargs["limiter"] = get_user_limiter(user_id)
return await trio.run_sync_in_worker_thread(asycn_fn, *args, **kwargs)
return await trio.run_sync_in_thread(asycn_fn, *args, **kwargs)


Putting blocking I/O into worker threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: run_sync_in_worker_thread
.. autofunction:: run_sync_in_thread

.. autofunction:: current_default_worker_thread_limiter
.. autofunction:: current_default_thread_limiter


Getting back into the Trio thread from another thread
Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference-core/blocking-trio-portal-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def main():
# In a background thread, run:
# thread_fn(portal, receive_from_trio, send_to_trio)
nursery.start_soon(
trio.run_sync_in_worker_thread,
trio.run_sync_in_thread,
thread_fn, portal, receive_from_trio, send_to_trio
)

Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference-hazmat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ This logic is a bit convoluted, but accomplishes all of the following:
loop outside of the ``except BlockingIOError:`` block.

These functions can also be useful in other situations. For example,
when :func:`trio.run_sync_in_worker_thread` schedules some work to run
when :func:`trio.run_sync_in_thread` schedules some work to run
in a worker thread, it blocks until the work is finished (so it's a
schedule point), but by default it doesn't allow cancellation. So to
make sure that the call always acts as a checkpoint, it calls
Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ To understand why, you need to know two things.
First, right now no mainstream operating system offers a generic,
reliable, native API for async file or filesystem operations, so we
have to fake it by using threads (specifically,
:func:`run_sync_in_worker_thread`). This is cheap but isn't free: on a
:func:`run_sync_in_thread`). This is cheap but isn't free: on a
typical PC, dispatching to a worker thread adds something like ~100 µs
of overhead to each operation. ("µs" is pronounced "microseconds", and
there are 1,000,000 µs in a second. Note that all the numbers here are
Expand Down
5 changes: 5 additions & 0 deletions newsfragments/810.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
``run_sync_in_worker_thread`` was too much of a mouthful – now it's
just called `run_sync_in_thread` (though the old name still works with
a deprecation warning, for now). Similarly,
``current_default_worker_thread_limiter`` is becoming
`current_default_thread_limiter`.
2 changes: 1 addition & 1 deletion notes-to-self/blocking-read-hack.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def kill_it_after_timeout(new_fd):
async with trio.open_nursery() as nursery:
nursery.start_soon(kill_it_after_timeout, new_fd)
try:
data = await trio.run_sync_in_worker_thread(os.read, new_fd, count)
data = await trio.run_sync_in_thread(os.read, new_fd, count)
except OSError as exc:
if cancel_requested and exc.errno == errno.ENOTCONN:
# Call was successfully cancelled. In a real version we'd
Expand Down
2 changes: 1 addition & 1 deletion notes-to-self/thread-dispatch-bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# minimal a fashion as possible.
#
# This is useful to get a sense of the *lower-bound* cost of
# run_sync_in_worker_thread
# run_sync_in_thread

import threading
from queue import Queue
Expand Down
16 changes: 15 additions & 1 deletion trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)

from ._threads import (
run_sync_in_worker_thread, current_default_worker_thread_limiter,
run_sync_in_thread, current_default_thread_limiter,
BlockingTrioPortal
)

Expand Down Expand Up @@ -100,6 +100,20 @@
"library 'subprocess' module"
),
),
"run_sync_in_worker_thread":
_deprecate.DeprecatedAttribute(
run_sync_in_thread,
"0.12.0",
issue=810,
instead=run_sync_in_thread,
),
"current_default_worker_thread_limiter":
_deprecate.DeprecatedAttribute(
current_default_thread_limiter,
"0.12.0",
issue=810,
instead=current_default_thread_limiter,
),
}

# Having the public path in .__module__ attributes is important for:
Expand Down
2 changes: 1 addition & 1 deletion trio/_core/_entry_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class TrioToken:
1. It lets you re-enter the Trio run loop from external threads or signal
handlers. This is the low-level primitive that
:func:`trio.run_sync_in_worker_thread` uses to receive results from
:func:`trio.run_sync_in_thread` uses to receive results from
worker threads, that :func:`trio.open_signal_receiver` uses to receive
notifications about signals, and so forth.
Expand Down
2 changes: 1 addition & 1 deletion trio/_core/_traps.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def abort_func(raise_cancel):
At that point there are again two possibilities. You can simply ignore
the cancellation altogether: wait for the operation to complete and
then reschedule and continue as normal. (For example, this is what
:func:`trio.run_sync_in_worker_thread` does if cancellation is disabled.)
:func:`trio.run_sync_in_thread` does if cancellation is disabled.)
The other possibility is that the ``abort_func`` does succeed in
cancelling the operation, but for some reason isn't able to report that
right away. (Example: on Windows, it's possible to request that an
Expand Down
4 changes: 2 additions & 2 deletions trio/_core/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from .tutil import check_sequence_matches, gc_collect_harder
from ... import _core
from ..._threads import run_sync_in_worker_thread
from ..._threads import run_sync_in_thread
from ..._timeouts import sleep, fail_after
from ..._util import aiter_compat
from ...testing import (
Expand Down Expand Up @@ -552,7 +552,7 @@ async def test_cancel_scope_repr(mock_clock):
scope.deadline = _core.current_time() + 10
assert "deadline is 10.00 seconds from now" in repr(scope)
# when not in async context, can't get the current time
assert "deadline" not in await run_sync_in_worker_thread(repr, scope)
assert "deadline" not in await run_sync_in_thread(repr, scope)
scope.cancel()
assert "cancelled" in repr(scope)
assert "exited" in repr(scope)
Expand Down
10 changes: 5 additions & 5 deletions trio/_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
class AsyncIOWrapper(AsyncResource):
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous
file object` interface. Wrapped methods that could block are executed in
:meth:`trio.run_sync_in_worker_thread`.
:meth:`trio.run_sync_in_thread`.
All properties and methods defined in in :mod:`~io` are exposed by this
wrapper, if they exist in the wrapped file object.
Expand All @@ -80,7 +80,7 @@ def __getattr__(self, name):
@async_wraps(self.__class__, self._wrapped.__class__, name)
async def wrapper(*args, **kwargs):
func = partial(meth, *args, **kwargs)
return await trio.run_sync_in_worker_thread(func)
return await trio.run_sync_in_thread(func)

# cache the generated method
setattr(self, name, wrapper)
Expand Down Expand Up @@ -115,7 +115,7 @@ async def detach(self):
"""

raw = await trio.run_sync_in_worker_thread(self._wrapped.detach)
raw = await trio.run_sync_in_thread(self._wrapped.detach)
return wrap_file(raw)

async def aclose(self):
Expand All @@ -128,7 +128,7 @@ async def aclose(self):

# ensure the underling file is closed during cancellation
with trio.CancelScope(shield=True):
await trio.run_sync_in_worker_thread(self._wrapped.close)
await trio.run_sync_in_thread(self._wrapped.close)

await trio.hazmat.checkpoint_if_cancelled()

Expand Down Expand Up @@ -165,7 +165,7 @@ async def open_file(
file = fspath(file)

_file = wrap_file(
await trio.run_sync_in_worker_thread(
await trio.run_sync_in_thread(
io.open, file, mode, buffering, encoding, errors, newline, closefd,
opener
)
Expand Down
10 changes: 5 additions & 5 deletions trio/_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def iter_wrapper_factory(cls, meth_name):
async def wrapper(self, *args, **kwargs):
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
items = await trio.run_sync_in_worker_thread(func)
items = await trio.run_sync_in_thread(func)
return (rewrap_path(item) for item in items)

return wrapper
Expand All @@ -70,7 +70,7 @@ async def wrapper(self, *args, **kwargs):
args = unwrap_paths(args)
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
value = await trio.run_sync_in_worker_thread(func)
value = await trio.run_sync_in_thread(func)
return rewrap_path(value)

return wrapper
Expand All @@ -83,7 +83,7 @@ async def wrapper(cls, *args, **kwargs):
args = unwrap_paths(args)
meth = getattr(cls._wraps, meth_name)
func = partial(meth, *args, **kwargs)
value = await trio.run_sync_in_worker_thread(func)
value = await trio.run_sync_in_thread(func)
return rewrap_path(value)

return wrapper
Expand Down Expand Up @@ -145,7 +145,7 @@ def generate_iter(cls, attrs):

class Path(metaclass=AsyncAutoWrapperType):
"""A :class:`pathlib.Path` wrapper that executes blocking methods in
:meth:`trio.run_sync_in_worker_thread`.
:meth:`trio.run_sync_in_thread`.
"""

Expand Down Expand Up @@ -185,7 +185,7 @@ async def open(self, *args, **kwargs):
"""

func = partial(self._wrapped.open, *args, **kwargs)
value = await trio.run_sync_in_worker_thread(func)
value = await trio.run_sync_in_thread(func)
return trio.wrap_file(value)


Expand Down
10 changes: 5 additions & 5 deletions trio/_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import idna as _idna

import trio
from ._threads import run_sync_in_worker_thread
from ._threads import run_sync_in_thread
from ._util import fspath
from ._core import RunVar, wait_socket_readable, wait_socket_writable

Expand Down Expand Up @@ -178,7 +178,7 @@ def numeric_only_failure(exc):
if hr is not None:
return await hr.getaddrinfo(host, port, family, type, proto, flags)
else:
return await run_sync_in_worker_thread(
return await run_sync_in_thread(
_stdlib_socket.getaddrinfo,
host,
port,
Expand All @@ -204,7 +204,7 @@ async def getnameinfo(sockaddr, flags):
if hr is not None:
return await hr.getnameinfo(sockaddr, flags)
else:
return await run_sync_in_worker_thread(
return await run_sync_in_thread(
_stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
)

Expand All @@ -215,7 +215,7 @@ async def getprotobyname(name):
Like :func:`socket.getprotobyname`, but async.
"""
return await run_sync_in_worker_thread(
return await run_sync_in_thread(
_stdlib_socket.getprotobyname, name, cancellable=True
)

Expand Down Expand Up @@ -463,7 +463,7 @@ async def bind(self, address):
):
# Use a thread for the filesystem traversal (unless it's an
# abstract domain socket)
return await run_sync_in_worker_thread(self._sock.bind, address)
return await run_sync_in_thread(self._sock.bind, address)
else:
# POSIX actually says that bind can return EWOULDBLOCK and
# complete asynchronously, like connect. But in practice AFAICT
Expand Down
4 changes: 2 additions & 2 deletions trio/_subprocess_platform/waitid.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from .. import _core, _subprocess
from .._sync import CapacityLimiter, Event
from .._threads import run_sync_in_worker_thread
from .._threads import run_sync_in_thread

try:
from os import waitid
Expand Down Expand Up @@ -74,7 +74,7 @@ async def _waitid_system_task(pid: int, event: Event) -> None:
# call to trio.run is shutting down.

try:
await run_sync_in_worker_thread(
await run_sync_in_thread(
sync_wait_reapable,
pid,
cancellable=True,
Expand Down
6 changes: 3 additions & 3 deletions trio/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ class CapacityLimiter:
fixed number of seats, and if they're all taken then you have to wait for
someone to get up before you can sit down.
By default, :func:`run_sync_in_worker_thread` uses a
By default, :func:`run_sync_in_thread` uses a
:class:`CapacityLimiter` to limit the number of threads running at once;
see :func:`current_default_worker_thread_limiter` for details.
see :func:`current_default_thread_limiter` for details.
If you're familiar with semaphores, then you can think of this as a
restricted semaphore that's specialized for one common use case, with
Expand Down Expand Up @@ -246,7 +246,7 @@ def acquire_on_behalf_of_nowait(self, borrower):
Args:
borrower: A :class:`trio.hazmat.Task` or arbitrary opaque object
used to record who is borrowing this token. This is used by
:func:`run_sync_in_worker_thread` to allow threads to "hold
:func:`run_sync_in_thread` to allow threads to "hold
tokens", with the intention in the future of using it to `allow
deadlock detection and other useful things
<https://github.com/python-trio/trio/issues/182>`__
Expand Down
Loading

0 comments on commit c13c60e

Please sign in to comment.