diff --git a/.travis.yml b/.travis.yml index 0afd53d5d6..ee29d1fc23 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,14 +11,6 @@ dist: trusty matrix: include: - - os: linux - language: python - python: 3.6 - env: DOC_BUILD=1 - - os: linux - language: python - python: 3.6 - env: FORMATTING=1 - os: linux language: generic env: USE_PYPY_NIGHTLY=1 @@ -31,6 +23,14 @@ matrix: - os: osx language: generic env: MACPYTHON=3.6.0 + - os: linux + language: python + python: 3.6 + env: CHECK_DOCS=1 + - os: linux + language: python + python: 3.6 + env: CHECK_FORMATTING=1 script: - ci/travis.sh diff --git a/ci/travis.sh b/ci/travis.sh index a5e6378dd2..ec4f0d92a1 100755 --- a/ci/travis.sh +++ b/ci/travis.sh @@ -35,9 +35,10 @@ if [ "$USE_PYPY_NIGHTLY" = "1" ]; then # something like "pypy-c-jit-89963-748aa3022295-linux64" PYPY_DIR=$(echo pypy-c-jit-*) PYTHON_EXE=$PYPY_DIR/bin/pypy3 - $PYTHON_EXE -m ensurepip - $PYTHON_EXE -m pip install virtualenv - $PYTHON_EXE -m virtualenv testenv + ($PYTHON_EXE -m ensurepip \ + && $PYTHON_EXE -m pip install virtualenv \ + && $PYTHON_EXE -m virtualenv testenv) \ + || echo "pypy nightly is broken; skipping tests"; exit 0 source testenv/bin/activate fi @@ -55,36 +56,37 @@ fi pip install -U pip setuptools wheel -python setup.py sdist --formats=zip -pip install dist/*.zip - -if [ "$DOC_BUILD" = "1" ]; then - pip install -Ur ci/rtd-requirements.txt - cd docs - # -n (nit-picky): warn on missing references - # -W: turn warnings into errors - sphinx-build -nW -b html source build -elif [ "$FORMATTING" = "1" ]; then +if [ "$CHECK_FORMATTING" = "1" ]; then pip install -U yapf - yapf -rdp setup.py trio > formatting-fixes.patch - if [ -s formatting-fixes.patch ]; then + if ! yapf -rpd setup.py trio; then cat <".format(id(self)) + + def started(self, value=None): + if self._called_started: + raise RuntimeError( + "called 'started' twice on the same task status" + ) + self._called_started = True + self._value = value + + # If the old nursery is cancelled, then quietly quit now; the child + # will eventually exit on its own, and we don't want to risk moving + # the children into a different scope while they might have + # propagating Cancelled exceptions that assume they're under the old + # scope. + if _pending_cancel_scope(self._old_nursery._cancel_stack) is not None: + return + + # Can't be closed, b/c we checked in start() and then _pending_starts + # should keep it open. + assert not self._new_nursery._closed + + # otherwise, find all the tasks under the old nursery, and move them + # under the new nursery instead. This means: + # - changing parents of direct children + # - changing cancel stack of all direct+indirect children + # - changing cancel stack of all direct+indirect children's nurseries + # - checking for cancellation in all changed cancel stacks + old_stack = self._old_nursery._cancel_stack + new_stack = self._new_nursery._cancel_stack + # LIFO todo stack for depth-first traversal + todo = list(self._old_nursery._children) + munged_tasks = [] + while todo: + task = todo.pop() + # Direct children need to be reparented + if task._parent_nursery is self._old_nursery: + self._old_nursery._children.remove(task) + task._parent_nursery = self._new_nursery + self._new_nursery._children.add(task) + # Everyone needs their cancel scopes fixed up... + assert task._cancel_stack[:len(old_stack)] == old_stack + task._cancel_stack[:len(old_stack)] = new_stack + # ...and their nurseries' cancel scopes fixed up. + for nursery in task._child_nurseries: + assert nursery._cancel_stack[:len(old_stack)] == old_stack + nursery._cancel_stack[:len(old_stack)] = new_stack + # And then add all the nursery's children to our todo list + todo.extend(nursery._children) + # And make a note to check for cancellation later + munged_tasks.append(task) + + # Tell all the cancel scopes about the change. (There are probably + # some scopes in common between the two stacks, so some scopes will + # get the same tasks removed and then immediately re-added. This is + # fine though.) + for cancel_scope in old_stack: + cancel_scope._tasks_removed_by_adoption(munged_tasks) + for cancel_scope in new_stack: + cancel_scope._tasks_added_by_adoption(munged_tasks) + + # That should have removed all the children from the old nursery + assert not self._old_nursery._children + + # After all the delicate surgery is done, check for cancellation in + # all the tasks that had their cancel scopes munged. This can trigger + # arbitrary abort() callbacks, so we put it off until our internal + # data structures are all self-consistent again. + for task in munged_tasks: + task._attempt_delivery_of_any_pending_cancel() + + # And finally, we cancel the old nursery's scope, so that its + # __aexit__ notices that all the children are gone and it can exit. + # (This is a bit of a hack.) + self._old_nursery.cancel_scope.cancel() + + @acontextmanager @async_generator @enable_ki_protection @@ -223,13 +320,13 @@ async def open_nursery(): # async def __aenter__(self): # self._scope_manager = open_cancel_scope() # scope = self._scope_manager.__enter__() -# self._nursery = Nursery(current_task(), scope) -# return self._nursery +# self._parent_nursery = Nursery(current_task(), scope) +# return self._parent_nursery # # @enable_ki_protection # async def __aexit__(self, etype, exc, tb): # try: -# await self._nursery._clean_up(exc) +# await self._parent_nursery._clean_up(exc) # except BaseException as new_exc: # if not self._scope_manager.__exit__( # type(new_exc), new_exc, new_exc.__traceback__): @@ -250,6 +347,7 @@ def __init__(self, parent, cancel_scope): # the parent task -- only used for introspection, to implement # task.parent_task self._parent = parent + parent._child_nurseries.append(self) # the cancel stack that children inherit - we take a snapshot, so it # won't be affected by any changes in the parent. self._cancel_stack = list(parent._cancel_stack) @@ -258,6 +356,7 @@ def __init__(self, parent, cancel_scope): self.cancel_scope = cancel_scope assert self.cancel_scope is self._cancel_stack[-1] self._children = set() + self._pending_starts = 0 self._zombies = set() self.monitor = _core.UnboundedQueue() self._closed = False @@ -275,9 +374,35 @@ def _child_finished(self, task): self._zombies.add(task) self.monitor.put_nowait(task) + def start_soon(self, async_fn, *args, name=None): + GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) + + # Returns the task, unlike start_soon + #@deprecated("nursery.spawn", version="0.2.0", "nursery.start_soon") def spawn(self, async_fn, *args, name=None): return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) + async def start(self, async_fn, *args, name=None): + if self._closed: + raise RuntimeError("Nursery is closed to new arrivals") + try: + self._pending_starts += 1 + async with open_nursery() as old_nursery: + task_status = _TaskStatus(old_nursery, self) + thunk = functools.partial(async_fn, task_status=task_status) + old_nursery.start_soon(thunk, *args, name=name) + # If we get here, then the child either got reparented or exited + # normally. The complicated logic is all in __TaskStatus.started(). + # (Any exceptions propagate directly out of the above.) + if not task_status._called_started: + raise RuntimeError( + "child exited without calling task_status.started()" + ) + return task_status._value + finally: + self._pending_starts -= 1 + self.monitor.put_nowait(None) + def reap(self, task): try: self._zombies.remove(task) @@ -303,7 +428,7 @@ async def _clean_up(self, pending_exc): await _core.yield_briefly() except BaseException as exc: exceptions.append(exc) - while self._children or self._zombies: + while self._children or self._zombies or self._pending_starts: # First, reap any zombies. They may or may not still be in the # monitor queue, and they may or may not trigger cancellation # of remaining tasks, so we have to check first before @@ -318,7 +443,7 @@ async def _clean_up(self, pending_exc): clean_up_scope.shield = True cancelled_children = True - if self.children: + if self.children or self._pending_starts: try: # We ignore the return value here, and will pick up # the actual tasks from the zombies set after looping @@ -329,6 +454,8 @@ async def _clean_up(self, pending_exc): exceptions.append(exc) self._closed = True + popped = self._parent._child_nurseries.pop() + assert popped is self if exceptions: mexc = MultiError(exceptions) if (pending_exc and mexc.__cause__ is None @@ -363,9 +490,22 @@ def __del__(self): ################################################################ +def _pending_cancel_scope(cancel_stack): + # Return the outermost exception that is is not outside a shield. + pending_scope = None + for scope in cancel_stack: + # Check shield before _exc, because shield should not block + # processing of *this* scope's exception + if scope.shield: + pending_scope = None + if pending_scope is None and scope.cancel_called: + pending_scope = scope + return pending_scope + + @attr.s(slots=True, cmp=False, hash=False, repr=False) class Task: - _nursery = attr.ib() + _parent_nursery = attr.ib() coro = attr.ib() _runner = attr.ib() name = attr.ib() @@ -380,6 +520,9 @@ class Task: _next_send = attr.ib(default=None) _abort_func = attr.ib(default=None) + # For introspection and nursery.start() + _child_nurseries = attr.ib(default=attr.Factory(list)) + # Task-local values, see _local.py _locals = attr.ib(default=attr.Factory(dict)) @@ -399,10 +542,10 @@ def parent_task(self): Example use case: drawing a visualization of the task tree. """ - if self._nursery is None: + if self._parent_nursery is None: return None else: - return self._nursery._parent + return self._parent_nursery._parent ################ # Monitoring task exit @@ -469,16 +612,7 @@ async def wait(self): _cancel_stack = attr.ib(default=attr.Factory(list), repr=False) def _pending_cancel_scope(self): - # Return the outermost exception that is is not outside a shield. - pending_scope = None - for scope in self._cancel_stack: - # Check shield before _exc, because shield should not block - # processing of *this* scope's exception - if scope.shield: - pending_scope = None - if pending_scope is None and scope.cancel_called: - pending_scope = scope - return pending_scope + return _pending_cancel_scope(self._cancel_stack) def _attempt_abort(self, raise_cancel): # Either the abort succeeds, in which case we will reschedule the @@ -757,7 +891,7 @@ def _return_value_looks_like_wrong_library(value): name = "{}.{}".format(name.__module__, name.__qualname__) except AttributeError: name = repr(name) - task = Task(coro=coro, nursery=nursery, runner=self, name=name) + task = Task(coro=coro, parent_nursery=nursery, runner=self, name=name) self.tasks.add(task) if nursery is not None: nursery._children.add(task) @@ -784,11 +918,11 @@ def task_exited(self, task, result): while task._cancel_stack: task._cancel_stack[-1]._remove_task(task) self.tasks.remove(task) - if task._nursery is None: + if task._parent_nursery is None: # the init task should be the last task to exit assert not self.tasks else: - task._nursery._child_finished(task) + task._parent_nursery._child_finished(task) for monitor in task._monitors: monitor.put_nowait(task) task._monitors.clear() @@ -1443,6 +1577,17 @@ def run_impl(runner, async_fn, args): ################################################################ +class _StatusIgnored: + def __repr__(self): + return "STATUS_IGNORED" + + def started(self, value=None): + pass + + +STATUS_IGNORED = _StatusIgnored() + + def current_task(): """Return the :class:`Task` object representing the current task. diff --git a/trio/_core/tests/conftest.py b/trio/_core/tests/conftest.py index a60b6d16a2..551e650456 100644 --- a/trio/_core/tests/conftest.py +++ b/trio/_core/tests/conftest.py @@ -10,6 +10,11 @@ def mock_clock(): return MockClock() +@pytest.fixture +def autojump_clock(): + return MockClock(autojump_threshold=0) + + # FIXME: split off into a package (or just make part of trio's public # interface?), with config file to enable? and I guess a mark option too; I # guess it's useful with the class- and file-level marking machinery (where diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 48a96141a4..435544c8e5 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -1625,3 +1625,182 @@ async def trivial(): assert t.result is not None with assert_yields(): await t.wait() + + +async def test_nursery_start(autojump_clock): + async def no_args(): # pragma: no cover + pass + + # Errors in calling convention get raised immediately from start + async with _core.open_nursery() as nursery: + with pytest.raises(TypeError): + await nursery.start(no_args) + + async def sleep_then_start(seconds, *, task_status=_core.STATUS_IGNORED): + repr(task_status) # smoke test + await sleep(seconds) + task_status.started(seconds) + await sleep(seconds) + + # Basic happy-path check: start waits for the task to call started(), then + # returns, passes back the value, and the given nursery then waits for it + # to exit. + for seconds in [1, 2]: + async with _core.open_nursery() as nursery: + assert len(nursery.children) == 0 + t0 = _core.current_time() + assert await nursery.start(sleep_then_start, seconds) == seconds + assert _core.current_time() - t0 == seconds + assert len(nursery.children) == 1 + assert _core.current_time() - t0 == 2 * seconds + + # Make sure STATUS_IGNORED works so task function can be called directly + t0 = _core.current_time() + await sleep_then_start(3) + assert _core.current_time() - t0 == 2 * 3 + + # calling started twice + async def double_started(task_status=_core.STATUS_IGNORED): + task_status.started() + with pytest.raises(RuntimeError): + task_status.started() + + async with _core.open_nursery() as nursery: + await nursery.start(double_started) + + # child crashes before calling started -> error comes out of .start() + async def raise_keyerror(task_status=_core.STATUS_IGNORED): + raise KeyError("oops") + + async with _core.open_nursery() as nursery: + with pytest.raises(KeyError): + await nursery.start(raise_keyerror) + + # child exiting cleanly before calling started -> triggers a RuntimeError + async def nothing(task_status=_core.STATUS_IGNORED): + return + + async with _core.open_nursery() as nursery: + with pytest.raises(RuntimeError) as excinfo: + await nursery.start(nothing) + assert "exited without calling" in str(excinfo.value) + + # if the call to start() is cancelled, then the call to started() does + # nothing -- the child keeps executing under start(). The value it passed + # is ignored; start() raises Cancelled. + async def just_started(task_status=_core.STATUS_IGNORED): + task_status.started("hi") + + async with _core.open_nursery() as nursery: + with _core.open_cancel_scope() as cs: + cs.cancel() + with pytest.raises(_core.Cancelled): + await nursery.start(just_started) + + # and if after the no-op started(), the child crashes, the error comes out + # of start() + async def raise_keyerror_after_started(task_status=_core.STATUS_IGNORED): + task_status.started() + raise KeyError("whoopsiedaisy") + + async with _core.open_nursery() as nursery: + with _core.open_cancel_scope() as cs: + cs.cancel() + with pytest.raises(_core.MultiError) as excinfo: + await nursery.start(raise_keyerror_after_started) + assert set(type(e) for e in excinfo.value.exceptions) == { + _core.Cancelled, KeyError + } + + # trying to start in a closed nursery raises an error immediately + async with _core.open_nursery() as closed_nursery: + pass + t0 = _core.current_time() + with pytest.raises(RuntimeError): + await closed_nursery.start(sleep_then_start, 7) + assert _core.current_time() == t0 + + +async def test_task_nursery_stack(): + task = _core.current_task() + assert task._child_nurseries == [] + async with _core.open_nursery() as nursery1: + assert task._child_nurseries == [nursery1] + with pytest.raises(KeyError): + async with _core.open_nursery() as nursery2: + assert task._child_nurseries == [nursery1, nursery2] + raise KeyError + assert task._child_nurseries == [nursery1] + assert task._child_nurseries == [] + + +async def test_nursery_start_with_cancelled_nursery(): + # This function isn't testing task_status, it's using task_status as a + # convenient way to get a nursery that we can test spawning stuff into. + async def setup_nursery(task_status=_core.STATUS_IGNORED): + async with _core.open_nursery() as nursery: + task_status.started(nursery) + await sleep_forever() + + # Calls started() while children are asleep, so we can make sure + # that the cancellation machinery notices and aborts when a sleeping task + # is moved into a cancelled scope. + async def sleeping_children(fn, *, task_status=_core.STATUS_IGNORED): + async with _core.open_nursery() as nursery: + nursery.start_soon(sleep_forever) + nursery.start_soon(sleep_forever) + await wait_all_tasks_blocked() + fn() + task_status.started() + + # Cancelling the setup_nursery just *before* calling started() + async with _core.open_nursery() as nursery: + target_nursery = await nursery.start(setup_nursery) + await target_nursery.start( + sleeping_children, target_nursery.cancel_scope.cancel + ) + + # Cancelling the setup_nursery just *after* calling started() + async with _core.open_nursery() as nursery: + target_nursery = await nursery.start(setup_nursery) + await target_nursery.start(sleeping_children, lambda: None) + target_nursery.cancel_scope.cancel() + + +async def test_nursery_start_keeps_nursery_open(autojump_clock): + async def sleep_a_bit(task_status=_core.STATUS_IGNORED): + await sleep(2) + task_status.started() + await sleep(3) + + async with _core.open_nursery() as nursery1: + t0 = _core.current_time() + async with _core.open_nursery() as nursery2: + # Start the 'start' call running in the background + nursery1.start_soon(nursery2.start, sleep_a_bit) + # Sleep a bit + await sleep(1) + # Start another one. + nursery1.start_soon(nursery2.start, sleep_a_bit) + # Then exit this nursery. At this point, there are no tasks + # present in this nursery -- the only thing keeping it open is + # that the tasks will be placed into it soon, when they call + # started(). + assert _core.current_time() - t0 == 6 + + # Check that it still works even if the task that the nursery is waiting + # for ends up crashing, and never actually enters the nursery. + async def sleep_then_crash(task_status=_core.STATUS_IGNORED): + await sleep(7) + raise KeyError + + async def start_sleep_then_crash(nursery): + with pytest.raises(KeyError): + await nursery.start(sleep_then_crash) + + async with _core.open_nursery() as nursery1: + t0 = _core.current_time() + async with _core.open_nursery() as nursery2: + nursery1.start_soon(start_sleep_then_crash, nursery2) + await wait_all_tasks_blocked() + assert _core.current_time() - t0 == 7 diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index cce863e0fc..a087347cad 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -53,9 +53,7 @@ TRIO_TEST_CA = trustme.CA() TRIO_TEST_1_CERT = TRIO_TEST_CA.issue_server_cert("trio-test-1.example.org") -SERVER_CTX = stdlib_ssl.create_default_context( - stdlib_ssl.Purpose.CLIENT_AUTH, -) +SERVER_CTX = stdlib_ssl.create_default_context(stdlib_ssl.Purpose.CLIENT_AUTH) TRIO_TEST_1_CERT.configure_cert(SERVER_CTX) CLIENT_CTX = stdlib_ssl.create_default_context()