Skip to content

Commit

Permalink
Only send stop msg if not received from far end
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed May 12, 2021
1 parent 8dcdbfa commit 3f2b840
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def __init__(

# delegate directly to underlying mem channel
def receive_nowait(self):
return self._rx_chan.receive_nowait()
msg = self._rx_chan.receive_nowait()
return msg['yield']

async def receive(self):
try:
Expand Down Expand Up @@ -106,6 +107,7 @@ async def receive(self):
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()

# await self._ctx.send_stop()
raise StopAsyncIteration

Expand Down Expand Up @@ -326,10 +328,14 @@ async def open_stream(
try:
yield rchan

finally:
except trio.EndOfChannel:
raise

else:
# signal ``StopAsyncIteration`` on far end.
await self.send_stop()

finally:
if self._portal:
self._portal._streams.remove(rchan)

Expand Down

0 comments on commit 3f2b840

Please sign in to comment.