From 15ead6b561e0cce87d5bcbaecac308bf8f7f49c2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Wed, 16 Dec 2020 21:42:28 -0500
Subject: [PATCH 1/4] Add a way to shield a stream's underlying channel

Add a ``tractor._portal.StreamReceiveChannel.shield_channel()`` context
manager which prevents an IPC stream's underlying channel from being
closed, for the purposes of task re-spawning. Sometimes you might want
to cancel a task consuming a stream but not tear down the IPC between
actors (which is the default behaviour). A common use case might be
where the task's "setup" work needs to be redone but you want to keep
the established portal / channel intact despite the task restart.
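
For example, a consumer task can be cancelled and re-spawned over the
same stream roughly like so (a sketch based on the included test;
``stream_data`` is the async generator from the test module and the
actor name is arbitrary)::

    import trio
    import tractor

    async def consume(stream, task_status=trio.TASK_STATUS_IGNORED):
        with trio.CancelScope() as cs:
            task_status.started(cs)
            # keep the IPC channel open across local task cancellation
            with stream.shield_channel():
                async for value in stream:
                    print(value)

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.run_in_actor('streamer', stream_data, seed=11)
            stream = await portal.result()

            async with trio.open_nursery() as ln:
                cs = await ln.start(consume, stream)

                # cancel only the local consumer task, then respawn it
                # over the still-open stream
                cs.cancel()
                cs = await ln.start(consume, stream)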

Includes a test.
---
 tests/test_streaming.py | 73 +++++++++++++++++++++++++++++++++++++++++
 tractor/_portal.py      | 20 +++++++++++
 2 files changed, 93 insertions(+)

diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index 919b2787b..c7ea4e0e4 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -7,6 +7,7 @@
 
 import trio
 import tractor
+from tractor.testing import tractor_test
 import pytest
 
 
@@ -53,6 +54,7 @@ async def stream_from_single_subactor(stream_func_name):
     """Verify we can spawn a daemon actor and retrieve streamed data.
     """
     async with tractor.find_actor('streamerd') as portals:
+
         if not portals:
             # only one per host address, spawns an actor if None
             async with tractor.open_nursery() as nursery:
@@ -73,8 +75,10 @@ async def stream_from_single_subactor(stream_func_name):
                 # it'd sure be nice to have an asyncitertools here...
                 iseq = iter(seq)
                 ival = next(iseq)
+
                 async for val in stream:
                     assert val == ival
+
                     try:
                         ival = next(iseq)
                     except StopIteration:
@@ -83,6 +87,7 @@ async def stream_from_single_subactor(stream_func_name):
                         await stream.aclose()
 
                 await trio.sleep(0.3)
+
                 try:
                     await stream.__anext__()
                 except StopAsyncIteration:
@@ -109,8 +114,11 @@ def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
 
 # this is the first 2 actors, streamer_1 and streamer_2
 async def stream_data(seed):
+
     for i in range(seed):
+
         yield i
+
         # trigger scheduler to simulate practical usage
         await trio.sleep(0)
 
@@ -246,3 +254,68 @@ def test_not_fast_enough_quad(
     else:
         # should be cancelled mid-streaming
         assert results is None
+
+
+@tractor_test
+async def test_respawn_consumer_task(
+    arb_addr,
+    spawn_backend,
+    loglevel,
+):
+    """Verify that ``._portal.StreamReceiveChannel.shield_channel()``
+    successfully protects the underlying IPC channel from being closed
+    when cancelling and respawning a consumer task.
+
+    This also serves to verify that all values from the stream can be
+    received despite the respawns.
+
+    """
+    stream = None
+
+    async with tractor.open_nursery() as n:
+
+        stream = await (await n.run_in_actor(
+            'streamer',
+            stream_data,
+            seed=11,
+        )).result()
+
+        expect = set(range(11))
+        received = []
+
+        # this is the re-spawn task routine
+        async def consume(task_status=trio.TASK_STATUS_IGNORED):
+            print('starting consume task..')
+            nonlocal stream
+
+            with trio.CancelScope() as cs:
+                task_status.started(cs)
+
+                # shield stream's underlying channel from cancellation
+                with stream.shield_channel():
+
+                    async for v in stream:
+                        print(f'from stream: {v}')
+                        expect.remove(v)
+                        received.append(v)
+
+                print('exited consume')
+
+        async with trio.open_nursery() as ln:
+            cs = await ln.start(consume)
+
+            while True:
+
+                await trio.sleep(0.1)
+
+                if received[-1] % 2 == 0:
+
+                    print('cancelling consume task..')
+                    cs.cancel()
+
+                    # respawn
+                    cs = await ln.start(consume)
+
+                if not expect:
+                    print("all values streamed, BREAKING")
+                    break
diff --git a/tractor/_portal.py b/tractor/_portal.py
index af3c1b5c1..50036a841 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -7,6 +7,7 @@
 from typing import Tuple, Any, Dict, Optional, Set
 from functools import partial
 from dataclasses import dataclass
+from contextlib import contextmanager
 
 import trio
 from async_generator import asynccontextmanager
@@ -59,6 +60,7 @@ def __init__(
         self._cid = cid
         self._rx_chan = rx_chan
         self._portal = portal
+        self._shielded = False
 
     # delegate directly to underlying mem channel
     def receive_nowait(self):
@@ -83,6 +85,18 @@ async def receive(self):
                 "Received internal error at portal?")
             raise unpack_error(msg, self._portal.channel)
 
+    @contextmanager
+    def shield_channel(
+        self
+    ) -> typing.AsyncGenerator['StreamReceiveChannel', None]:
+        """Shield this stream's underlying channel such that a local consumer task
+        can be cancelled (and possibly restarted) using ``trio.Cancelled``.
+
+        """
+        self._shielded = True
+        yield self
+        self._shielded = False
+
     async def aclose(self):
         """Cancel associated remote actor task and local memory channel
         on close.
@@ -90,12 +104,18 @@ async def aclose(self):
         if self._rx_chan._closed:
             log.warning(f"{self} is already closed")
             return
+
+        if self._shielded:
+            log.warning(f"{self} is shielded, portal channel being kept alive")
+            return
+
         cid = self._cid
         with trio.move_on_after(0.5) as cs:
             cs.shield = True
             log.warning(
                 f"Cancelling stream {cid} to "
                 f"{self._portal.channel.uid}")
+
             # NOTE: we're telling the far end actor to cancel a task
             # corresponding to *this actor*. The far end local channel
             # instance is passed to `Actor._cancel_task()` implicitly.

From 201771a521b409f7665a356a4dfa68bca8b43151 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Thu, 17 Dec 2020 11:58:48 -0500
Subject: [PATCH 2/4] Fix mypy, change internal type name to `ReceiveStream`,
 settle on `.shield()`
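
The public shielding API is now just ``ReceiveStream.shield()``. The
usage sketch from the previous patch's message becomes (same
assumptions, only the method name changes)::

    # keep the IPC channel open while only the local task is cancelled
    with stream.shield():
        async for value in stream:
            ...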

---
 tests/test_streaming.py |  4 ++--
 tractor/_portal.py      | 14 +++++++-------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index c7ea4e0e4..a64515f01 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -262,7 +262,7 @@ async def test_respawn_consumer_task(
     spawn_backend,
     loglevel,
 ):
-    """Verify that ``._portal.StreamReceiveChannel.shield_channel()``
+    """Verify that ``._portal.ReceiveStream.shield()``
     successfully protects the underlying IPC channel from being closed
     when cancelling and respawning a consumer task.
 
@@ -292,7 +292,7 @@ async def consume(task_status=trio.TASK_STATUS_IGNORED):
                 task_status.started(cs)
 
                 # shield stream's underlying channel from cancellation
-                with stream.shield_channel():
+                with stream.shield():
 
                     async for v in stream:
                         print(f'from stream: {v}')
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 50036a841..15d86e3d5 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -4,7 +4,7 @@
 import importlib
 import inspect
 import typing
-from typing import Tuple, Any, Dict, Optional, Set
+from typing import Tuple, Any, Dict, Optional, Set, Iterator
 from functools import partial
 from dataclasses import dataclass
 from contextlib import contextmanager
@@ -38,7 +38,7 @@ async def maybe_open_nursery(
             yield nursery
 
 
-class StreamReceiveChannel(trio.abc.ReceiveChannel):
+class ReceiveStream(trio.abc.ReceiveChannel):
     """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
     special behaviour for signalling stream termination across an
     inter-actor ``Channel``. This is the type returned to a local task
@@ -86,9 +86,9 @@ async def receive(self):
             raise unpack_error(msg, self._portal.channel)
 
     @contextmanager
-    def shield_channel(
+    def shield(
         self
-    ) -> typing.AsyncGenerator['StreamReceiveChannel', None]:
+    ) -> Iterator['ReceiveStream']:  # noqa
         """Shield this stream's underlying channel such that a local consumer task
         can be cancelled (and possibly restarted) using ``trio.Cancelled``.
 
@@ -156,7 +156,7 @@ def __init__(self, channel: Channel) -> None:
         self._expect_result: Optional[
             Tuple[str, Any, str, Dict[str, Any]]
         ] = None
-        self._streams: Set[StreamReceiveChannel] = set()
+        self._streams: Set[ReceiveStream] = set()
         self.actor = current_actor()
 
     async def _submit(
@@ -219,7 +219,7 @@ async def _return_from_resptype(
         # to make async-generators the fundamental IPC API over channels!
         # (think `yield from`, `gen.send()`, and functional reactive stuff)
         if resptype == 'yield':  # stream response
-            rchan = StreamReceiveChannel(cid, recv_chan, self)
+            rchan = ReceiveStream(cid, recv_chan, self)
             self._streams.add(rchan)
             return rchan
 
@@ -322,7 +322,7 @@ class LocalPortal:
     A compatibility shim for normal portals but for invoking functions
     using an in process actor instance.
     """
-    actor: 'Actor'  # type: ignore
+    actor: 'Actor'  # type: ignore # noqa
     channel: Channel
 
     async def run(self, ns: str, func_name: str, **kwargs) -> Any:

From 797bcc1df2c33580494f57bc3f3531a28956d852 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Thu, 17 Dec 2020 13:35:45 -0500
Subject: [PATCH 3/4] Handle early timeouts on last debugger test

---
 tests/test_debugger.py | 30 +++++++++++++++++++++++++-----
 tractor/_ipc.py        |  3 +++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/tests/test_debugger.py b/tests/test_debugger.py
index 53c3c84be..d63a1bedf 100644
--- a/tests/test_debugger.py
+++ b/tests/test_debugger.py
@@ -6,6 +6,7 @@
 
 TODO: None of these tests have been run successfully on windows yet.
 """
+import time
 from os import path
 
 import pytest
@@ -370,6 +371,8 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method
     child has unblocked (which can happen when it has the tty lock and
     is engaged in pdb) it is indeed cancelled after exiting the debugger.
     """
+    timed_out_early = False
+
     child = spawn('root_cancelled_but_child_is_in_tty_lock')
 
     child.expect(r"\(Pdb\+\+\)")
@@ -377,9 +380,13 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method
     before = str(child.before.decode())
     assert "NameError: name 'doggypants' is not defined" in before
     assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before
+    time.sleep(0.5)
+
     child.sendline('c')
 
-    for _ in range(4):
+
+    for i in range(4):
+        time.sleep(0.5)
         try:
             child.expect(r"\(Pdb\+\+\)")
         except TimeoutError:
@@ -390,13 +397,26 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method
             else:
                 raise
 
+        except pexpect.exceptions.EOF:
+            print(f"Failed early on {i}?")
+            before = str(child.before.decode())
+
+            timed_out_early = True
+
+            # race conditions on how fast the continue is sent?
+            break
+
+
         before = str(child.before.decode())
         assert "NameError: name 'doggypants' is not defined" in before
 
         child.sendline('c')
 
     child.expect(pexpect.EOF)
-    before = str(child.before.decode())
-    assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
-    assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
-    assert "NameError: name 'doggypants' is not defined" in before
+
+    if not timed_out_early:
+
+        before = str(child.before.decode())
+        assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
+        assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
+        assert "NameError: name 'doggypants' is not defined" in before
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index a3a271d4c..7f6a49813 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -214,9 +214,12 @@ async def _aiter_recv(
                     #     # time is pointless
                     #     await self.msgstream.send(sent)
             except trio.BrokenResourceError:
+
                 if not self._autorecon:
                     raise
+
             await self.aclose()
+
             if self._autorecon:  # attempt reconnect
                 await self._reconnect()
                 continue

From 47f68a05323993f076cb8647266d9294f75f486c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Thu, 17 Dec 2020 16:37:05 -0500
Subject: [PATCH 4/4] Skip debugger tests on non-trio backends

---
 tests/test_debugger.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/test_debugger.py b/tests/test_debugger.py
index d63a1bedf..8a0423b90 100644
--- a/tests/test_debugger.py
+++ b/tests/test_debugger.py
@@ -42,10 +42,16 @@ def mk_cmd(ex_name: str) -> str:
 
 @pytest.fixture
 def spawn(
+    start_method,
     testdir,
     arb_addr,
 ) -> 'pexpect.spawn':
 
+    if start_method != 'trio':
+        pytest.skip(
+            "Debugger tests are only supported on the trio backend"
+        )
+
     def _spawn(cmd):
         return testdir.spawn(
             cmd=mk_cmd(cmd),