-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bidir streaming #209
Bidir streaming #209
Changes from all commits
4371a0a
babe62a
b706cd9
3625a8c
ebf5315
3f1adc0
68d600d
d59e9ed
be022b8
ddc6c85
bebe26c
18135b4
20e73c5
8017e55
732b9fe
a4a6df5
910df13
b3437da
79c8b75
7069035
f8e2d40
0af5852
0083145
83c4b93
201392a
87f1af0
f2b1ef3
43ce533
197d291
59c8f72
288e2b5
17dc6aa
ced5d42
627f107
17fca76
6f22ee8
6e75913
377b8c1
8371621
e1533d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
""" | ||
Advanced streaming patterns using bidirectional streams and contexts. | ||
|
||
""" | ||
import itertools | ||
from typing import Set, Dict, List | ||
|
||
import trio | ||
import tractor | ||
|
||
|
||
_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { | ||
'even': set(), | ||
'odd': set(), | ||
} | ||
|
||
|
||
async def publisher( | ||
|
||
seed: int = 0, | ||
|
||
) -> None: | ||
|
||
global _registry | ||
|
||
def is_even(i): | ||
return i % 2 == 0 | ||
|
||
for val in itertools.count(seed): | ||
|
||
sub = 'even' if is_even(val) else 'odd' | ||
|
||
for sub_stream in _registry[sub].copy(): | ||
await sub_stream.send(val) | ||
|
||
# throttle send rate to ~1kHz | ||
# making it readable to a human user | ||
await trio.sleep(1/1000) | ||
|
||
|
||
@tractor.context | ||
async def subscribe( | ||
|
||
ctx: tractor.Context, | ||
|
||
) -> None: | ||
|
||
global _registry | ||
|
||
# syn caller | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
await ctx.started(None) | ||
|
||
async with ctx.open_stream() as stream: | ||
|
||
# update subs list as consumer requests | ||
async for new_subs in stream: | ||
|
||
new_subs = set(new_subs) | ||
remove = new_subs - _registry.keys() | ||
|
||
print(f'setting sub to {new_subs} for {ctx.chan.uid}') | ||
|
||
# remove old subs | ||
for sub in remove: | ||
_registry[sub].remove(stream) | ||
|
||
# add new subs for consumer | ||
for sub in new_subs: | ||
_registry[sub].add(stream) | ||
|
||
|
||
async def consumer( | ||
|
||
subs: List[str], | ||
|
||
) -> None: | ||
|
||
uid = tractor.current_actor().uid | ||
|
||
async with tractor.wait_for_actor('publisher') as portal: | ||
async with portal.open_context(subscribe) as (ctx, first): | ||
async with ctx.open_stream() as stream: | ||
|
||
# flip between the provided subs dynamically | ||
if len(subs) > 1: | ||
|
||
for sub in itertools.cycle(subs): | ||
print(f'setting dynamic sub to {sub}') | ||
await stream.send([sub]) | ||
|
||
count = 0 | ||
async for value in stream: | ||
print(f'{uid} got: {value}') | ||
if count > 5: | ||
break | ||
count += 1 | ||
|
||
else: # static sub | ||
|
||
await stream.send(subs) | ||
async for value in stream: | ||
print(f'{uid} got: {value}') | ||
|
||
|
||
def test_dynamic_pub_sub(): | ||
|
||
global _registry | ||
|
||
from multiprocessing import cpu_count | ||
cpus = cpu_count() | ||
|
||
async def main(): | ||
async with tractor.open_nursery() as n: | ||
|
||
# name of this actor will be same as target func | ||
await n.run_in_actor(publisher) | ||
|
||
for i, sub in zip( | ||
range(cpus - 2), | ||
itertools.cycle(_registry.keys()) | ||
): | ||
await n.run_in_actor( | ||
consumer, | ||
name=f'consumer_{sub}', | ||
subs=[sub], | ||
) | ||
|
||
# make one dynamic subscriber | ||
await n.run_in_actor( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I'm thinking we could just move the dynamic case into the immediate task here instead of sleeping? |
||
consumer, | ||
name='consumer_dynamic', | ||
subs=list(_registry.keys()), | ||
) | ||
|
||
# block until cancelled by user | ||
goodboy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
with trio.fail_after(3): | ||
await trio.sleep_forever() | ||
|
||
try: | ||
trio.run(main) | ||
except trio.TooSlowError: | ||
pass | ||
|
||
|
||
@tractor.context | ||
async def one_task_streams_and_one_handles_reqresp( | ||
|
||
ctx: tractor.Context, | ||
|
||
) -> None: | ||
|
||
await ctx.started() | ||
|
||
async with ctx.open_stream() as stream: | ||
|
||
async def pingpong(): | ||
'''Run a simple req/response service. | ||
|
||
''' | ||
async for msg in stream: | ||
print('rpc server ping') | ||
assert msg == 'ping' | ||
print('rpc server pong') | ||
await stream.send('pong') | ||
|
||
async with trio.open_nursery() as n: | ||
n.start_soon(pingpong) | ||
|
||
for _ in itertools.count(): | ||
await stream.send('yo') | ||
await trio.sleep(0.01) | ||
|
||
|
||
def test_reqresp_ontopof_streaming(): | ||
'''Test a subactor that both streams with one task and | ||
spawns another which handles a small requests-response | ||
dialogue over the same bidir-stream. | ||
|
||
''' | ||
async def main(): | ||
|
||
with trio.move_on_after(2): | ||
async with tractor.open_nursery() as n: | ||
|
||
# name of this actor will be same as target func | ||
portal = await n.start_actor( | ||
'dual_tasks', | ||
enable_modules=[__name__] | ||
) | ||
|
||
# flat to make sure we get at least one pong | ||
got_pong: bool = False | ||
|
||
async with portal.open_context( | ||
one_task_streams_and_one_handles_reqresp, | ||
|
||
) as (ctx, first): | ||
|
||
assert first is None | ||
|
||
async with ctx.open_stream() as stream: | ||
|
||
await stream.send('ping') | ||
|
||
async for msg in stream: | ||
print(f'client received: {msg}') | ||
|
||
assert msg in {'pong', 'yo'} | ||
|
||
if msg == 'pong': | ||
got_pong = True | ||
await stream.send('ping') | ||
print('client sent ping') | ||
|
||
assert got_pong | ||
|
||
try: | ||
trio.run(main) | ||
except trio.TooSlowError: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Linking comment f9dd2ad#r50700925.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gc-ss mentioned,
Yup can do that, I just didn't bc turns out the consumers can update via sending updates on their individual streams.
This can also be done though I'm not sure what you mean by environment. A
Context
isn't really required explicitly here but could be used to control cancellation if wanted.For an alt to
_registry
we could do something like,kinda thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By environment or Context, I meant the context/environment/configuration due to which the function might behave in a non-deterministic manner but the values of which don't necessarily change between function invocations - like a remote host address:port or username/password.
In this case - think about two actors communicating with each other. The exact same actors (types) might behave in very different ways if their context/environment/configuration were different with everything else (eg: arguments) being the same.
Unlike arguments though, their context/environment/configuration dont change between function invocations (like arguments do).
For example, the registry does not need to change between function invocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, yeah a service actor can easily create wtv objects they need and mutate them as actor-local state.
They can go further and offer a seperate api to mutate that object/state from other actors if needed.
I'm just not sure we need to show that in this example since the point is just a super basic dynamic pubsub system.
We do have a pretty terrible example in the docs that could be improved.
Bonus points for sweet PRs 😉