-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathmultiple_streams_one_portal.py
46 lines (28 loc) · 1.04 KB
/
multiple_streams_one_portal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import trio
import tractor
# Module-level logger obtained from tractor's logging helper under the
# name 'multiportal'; used by ``stream_data`` to announce stream start.
log = tractor.log.get_logger('multiportal')
async def stream_data(seed=10):
    """Async generator producing the integers ``0`` through ``seed - 1``.

    An info-level message is logged once, when iteration first begins.
    After every value handed to the consumer the generator performs a
    zero-length sleep so that other tasks get a chance to run.
    """
    log.info("Starting stream task")

    for value in range(seed):
        yield value
        # Zero-duration sleep: yields control back to the trio scheduler.
        await trio.sleep(0)
async def stream_from_portal(p, consumed):
    """Drain one ``stream_data`` stream opened through ``p``, toggling items.

    Every received item flips its membership in ``consumed``: it is
    appended when absent and removed when already present. An item that
    arrives an even number of times therefore leaves the list unchanged.

    Args:
        p: object providing ``open_stream_from`` as an async context
            manager (in this file, the value returned by ``start_actor``
            inside ``main``).
        consumed: list shared with the caller; mutated in place.
    """
    async with p.open_stream_from(stream_data) as remote_stream:
        async for received in remote_stream:
            if received not in consumed:
                # First sighting: record it and move on to the next item.
                consumed.append(received)
                continue

            # Already recorded by an earlier delivery: cancel it out.
            consumed.remove(received)
async def main():
    """Run two concurrent consumers of ``stream_data`` over a single portal.

    Starts one actor named ``'stream_boi'`` with this module enabled, then
    launches two ``stream_from_portal`` tasks that share one list. Once
    both tasks have finished, asserts the shared list is empty and cancels
    the actor nursery.
    """
    async with tractor.open_nursery(loglevel='info') as actor_nursery:
        portal = await actor_nursery.start_actor(
            'stream_boi',
            enable_modules=[__name__],
        )
        consumed = []

        # The inner nursery block only exits after both consumers return.
        async with trio.open_nursery() as task_nursery:
            for _ in range(2):
                task_nursery.start_soon(stream_from_portal, portal, consumed)

        # Each consumer toggles the same values in the shared list, so
        # every value is added once and removed once; with both tasks
        # running in a single thread the list must end up empty.
        assert not consumed

        await actor_nursery.cancel()
# Script entry point: when executed directly, hand ``main`` to trio's runner.
if __name__ == '__main__':
    trio.run(main)