import os
import sys

# Make the package importable when running this file from a source checkout:
# the parent directory of this file is put in front of sys.path.
curr_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.split(curr_dir)[0])

import itertools
import asyncio as aio
from asyncio_pool import AioPool

#<<from_here


async def worker(n):  # dummy worker
    await aio.sleep(1 / n)
    return n


async def spawn_n_usage(todo=[range(1,51), range(51,101), range(101,200)]):
    futures = []
    async with AioPool(size=20) as pool:
        for tasks in todo:
            for i in tasks:  # too many tasks
                # Returns quickly for all tasks, does not wait for pool space.
                # Workers are not spawned, they wait for pool space in their
                # own background tasks.
                fut = pool.spawn_n(worker(i))
                futures.append(fut)
        # At this point not a single worker should have started.

    # The context manager calls `join` at exit, so this line is reached only
    # when all workers have returned, crashed or been cancelled.
    assert sum(itertools.chain.from_iterable(todo)) == \
        sum(f.result() for f in futures)


async def spawn_usage(todo=range(1,4)):
    futures = []
    async with AioPool(size=2) as pool:
        for i in todo:  # 1, 2, 3
            # Returns quickly for 1 and 2, then waits for free pool space
            # for 3, spawns 3 and returns. Can save some resources I guess.
            fut = await pool.spawn(worker(i))
            futures.append(fut)
        # At this point some of the workers have already started.

    # The context manager calls `join` at exit, so this line is reached only
    # when all workers have returned, crashed or been cancelled.
    assert sum(todo) == sum(fut.result() for fut in futures)  # all done


async def map_usage(todo=range(100)):
    pool = AioPool(size=10)
    # Waits for and collects results from all spawned workers, returning them
    # in the same order as `todo`. If a worker crashes or is cancelled, the
    # exception object is returned as its result.
    # Basically, it wraps the `spawn_usage` code into one call.
    results = await pool.map(worker, todo)
    # await pool.join()  # not needed here, because no other tasks were spawned
    assert isinstance(results[0], ZeroDivisionError) \
        and sum(results[1:]) == sum(todo)


async def itermap_usage(todo=range(1,11)):
    result = 0
    async with AioPool(size=10) as pool:
        # Combines spawn_n and iterwait, which is a wrapper for asyncio.wait,
        # yielding results of finished workers according to the `timeout` and
        # `yield_when` params passed to asyncio.wait (see its docs for details).
        async for res in pool.itermap(worker, todo, timeout=0.5):
            result += res
        # technically, you can skip the join call
    assert result == sum(todo)
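

# A hedged sketch of using `iterwait` directly, which the comment above names
# as the building block of `itermap`. Assumptions (not confirmed by this file):
# `iterwait` accepts the futures returned by `map_n`/`spawn_n`, forwards
# `timeout`/`yield_when` to asyncio.wait, and by default yields individual
# results as workers finish, just like `itermap` above. The function name
# `iterwait_usage` is illustrative and is not wired into `__main__` below.
async def iterwait_usage(todo=range(1,11)):
    result = 0
    async with AioPool(size=10) as pool:
        futures = pool.map_n(worker, todo)  # spawn in background, keep futures
        async for res in pool.iterwait(futures, timeout=0.5):
            result += res
    assert result == sum(todo)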


async def callbacks_usage():

    async def wrk(n):  # custom dummy worker
        await aio.sleep(1 / n)
        return n

    async def cb(res, err, ctx):  # callback
        if err:  # error handling
            exc, tb = err
            assert tb  # the only purpose of this is logging
            return exc

        pool, n = ctx  # context can be anything you like
        await aio.sleep(1 / (n-1))
        return res + n

    todo = range(5)
    futures = []
    async with AioPool(size=2) as pool:
        for i in todo:
            fut = pool.spawn_n(wrk(i), cb, (pool, i))
            futures.append(fut)

    results = []
    for fut in futures:
        # There are helpers for result extraction. The `flat` one will do
        # exactly what's written below:
        #   from asyncio_pool import getres
        #   results.append(getres.flat(fut))
        try:
            results.append(fut.result())
        except BaseException as e:
            results.append(e)

    # The first error happens for n == 0 in `wrk`; its exception is passed to
    # the callback, and the callback returns it to us. The second one happens
    # in the callback itself (for n == 1) and is passed to us by the pool.
    assert all(isinstance(e, ZeroDivisionError) for e in results[:2])

    # All n's in `todo` are passed through `wrk` and `cb` (cb adds the wrk
    # result and the number passed inside the context), except for n == 0
    # and n == 1.
    assert sum(results[2:]) == 2 * (sum(todo) - 0 - 1)
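

# A hedged sketch of the `getres.flat` helper mentioned in the comments of
# `callbacks_usage`. Assumption (not confirmed by this file): for a finished
# future, `getres.flat` returns either the worker's result or the exception
# object, so no try/except is needed. The name `getres_usage` is illustrative
# and is not wired into `__main__` below.
async def getres_usage(todo=range(1,6)):
    from asyncio_pool import getres
    async with AioPool(size=2) as pool:
        futures = pool.map_n(worker, todo)
    # every worker succeeds for todo >= 1, so all extracted values are numbers
    results = [getres.flat(fut) for fut in futures]
    assert sum(results) == sum(todo)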


async def exec_usage(todo=range(1,11)):
    async with AioPool(size=4) as pool:
        futures = pool.map_n(worker, todo)

        # While other workers are waiting or active, you can "synchronously"
        # execute one task. It does not interrupt the others, it just waits
        # for pool space, then waits for the task to finish and returns its
        # result.
        important_res = await pool.exec(worker(2))
        assert 2 == important_res

        # You can continue working as usual:
        moar = await pool.spawn(worker(10))

    assert sum(todo) == sum(f.result() for f in futures)


async def cancel_usage():

    async def wrk(*arg, **kw):
        await aio.sleep(0.5)
        return 1

    pool = AioPool(size=2)

    f_quick = pool.spawn_n(aio.sleep(0.1))
    f12 = await pool.spawn(wrk()), pool.spawn_n(wrk())
    f35 = pool.map_n(wrk, range(3))

    # At this point, if you cancel the futures returned by pool methods, you
    # just won't be able to retrieve the spawned task results; the tasks
    # themselves will keep working. Don't do this:
    #   f_quick.cancel()
    # use `pool.cancel` instead:

    # cancel some
    await aio.sleep(0.1)
    cancelled, results = await pool.cancel(f12[0], f35[2])  # running and waiting
    assert 2 == cancelled  # neither of them had time to finish
    assert 2 == len(results) and \
        all(isinstance(res, aio.CancelledError) for res in results)

    # cancel all others
    await aio.sleep(0.1)

    # not interrupted and finished successfully
    assert f_quick.done() and f_quick.result() is None

    cancelled, results = await pool.cancel()  # all of the rest
    assert 3 == cancelled
    assert len(results) == 3 and \
        all(isinstance(res, aio.CancelledError) for res in results)

    assert await pool.join()  # joins successfully


async def details(todo=range(1,11)):
    pool = AioPool(size=5)

    # This code:
    f1 = []
    for i in todo:
        f1.append(pool.spawn_n(worker(i)))

    # is equivalent to one call of `map_n`:
    f2 = pool.map_n(worker, todo)

    # Afterwards you can await any given future:
    try:
        assert 3 == await f1[2]  # result of spawn_n(worker(3))
    except BaseException:
        # An exception raised in the worker (including CancelledError)
        # will be re-raised here.
        pass

    # Or use `asyncio.wait` to handle results in batches (see `iterwait` also):
    important_res = 0
    more_important = [f1[1], f2[1], f2[2]]
    while more_important:
        done, more_important = await aio.wait(more_important, timeout=0.5)
        # handle results; note that `result()` will re-raise exceptions
        important_res += sum(f.result() for f in done)
    assert important_res == 2 + 2 + 3

    # But you need to join to allow all spawned workers to finish
    # (of course you can `asyncio.wait` all of the futures instead, if you
    # want to).
    await pool.join()

    assert all(f.done() for f in itertools.chain(f1, f2))  # this is guaranteed
    assert 2 * sum(todo) == sum(f.result() for f in itertools.chain(f1, f2))

#>>to_here


if __name__ == "__main__":
    aio.get_event_loop().run_until_complete(aio.gather(
        spawn_n_usage(),
        spawn_usage(),
        map_usage(),
        itermap_usage(),
        callbacks_usage(),
        exec_usage(),
        cancel_usage(),
        details()
    ))