-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add our version of the std lib's "worker pool"
This is a draft of the `tractor` way to implement the example from the "process pool" section of the stdlib's `concurrent.futures` docs: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example Our runtime is of course slower to start up, but once up we of course get the same performance; this confirms that we need to focus some effort now on warm-up and teardown times. The mp forkserver method definitely improves startup delay; rolling our own will likely be a good hot spot to play with. What's really nice is that our implementation is done in approximately a 10th of the code ;) Also, do we want to offer an interface that yields results as they arrive? Relates to #175
- Loading branch information
Showing
2 changed files
with
159 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
""" | ||
Demonstration of the prime number detector example from the | ||
``concurrent.futures`` docs: | ||
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example | ||
This uses no extra threads or fancy semaphores besides ``tractor``'s | ||
(TCP) channels. | ||
""" | ||
from contextlib import asynccontextmanager
from typing import Callable, List, Tuple
import itertools
import math
import time

import tractor
import trio
|
||
|
||
# Candidate numbers fed to the workers.  The duplicated first entry is
# kept as-is so the workload stays identical to the reference list.
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]
|
||
|
||
def is_prime(n):
    """Return ``True`` when ``n`` is prime, using odd-only trial division."""
    # 2 is the only even prime; anything below it is never prime.
    if n == 2:
        return True
    if n < 2 or n % 2 == 0:
        return False

    # A composite number always has a divisor no larger than its square
    # root, so odd candidates up to that bound are sufficient.
    limit = int(math.floor(math.sqrt(n)))
    return not any(
        n % candidate == 0 for candidate in range(3, limit + 1, 2)
    )
|
||
|
||
@asynccontextmanager
async def worker_pool(workers=4):
    """Spawn ``workers`` sub-actors and yield an async "map" callable.

    Though it's a trivial special case for ``tractor``, the well
    known "worker pool" seems to be the de facto "I want this process
    pattern" for most parallelism pilgrims.

    The yielded coroutine function has the signature
    ``actor_map(worker_func, sequence)``. Each value in ``sequence`` is
    handed to one of the workers (assigned round-robin) and passed to
    ``worker_func`` as the keyword argument ``n``. It returns a list of
    ``(value, result)`` pairs in the order the calls complete, which is
    not necessarily the order of ``sequence``.

    Once the body of the ``async with`` block finishes, all workers are
    cancelled.
    """
    async with tractor.open_nursery() as tn:

        portals = []

        for i in range(workers):

            # this starts a new sub-actor (process + trio runtime) and
            # stores its "portal" for later use to "submit jobs" (ugh).
            portals.append(
                await tn.start_actor(
                    f'worker_{i}',
                    rpc_module_paths=[__name__],
                )
            )

        # NOTE: deliberately not named ``map`` so the builtin is not shadowed.
        async def actor_map(
            worker_func: Callable[[int], bool],
            sequence: List[int]
        ) -> List[Tuple[int, bool]]:

            # A fresh list per call: sharing one list across calls would
            # make every later call also return the earlier calls' results.
            results = []

            # define an async (local) task to collect results from workers
            async def collect_portal_result(func, value, portal):
                results.append((value, await portal.run(func, n=value)))

            # the nursery block only exits once every submitted task is done
            async with trio.open_nursery() as n:

                for value, portal in zip(sequence, itertools.cycle(portals)):

                    n.start_soon(
                        collect_portal_result,
                        worker_func,
                        value,
                        portal
                    )

            return results

        yield actor_map

        # tear down all "workers"
        await tn.cancel()
|
||
|
||
async def main():
    """Check every number in ``PRIMES`` on the actor pool and report timing."""
    async with worker_pool() as actor_map:

        start = time.time()

        # The stdlib version of this loop zips ``PRIMES`` with
        # ``executor.map(is_prime, PRIMES)``; here the pool already hands
        # back ``(number, prime)`` pairs.
        pairs = await actor_map(is_prime, PRIMES)
        for number, prime in pairs:
            print(f'{number} is prime: {prime}')

        print(f'processing took {time.time() - start} seconds')
|
||
if __name__ == '__main__':
    # Wall-clock timing of the whole run, from before the runtime is
    # started until ``tractor.run`` returns.
    start = time.time()

    # To use the ``multiprocessing`` fork server backend, which gives a
    # startup time boost at the expense of nested process scalability,
    # additionally pass ``start_method='forkserver'``.
    tractor.run(main, loglevel='ERROR')

    print(f'script took {time.time() - start} seconds')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import time | ||
import concurrent.futures | ||
import math | ||
|
||
# Inputs for the primality checks; the first value appears twice and is
# kept that way so the workload is unchanged.
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]
|
||
def is_prime(n):
    """Tell whether ``n`` is a prime number via trial division."""
    if n < 2:
        return False
    if n % 2 == 0:
        # Among the even numbers only 2 itself is prime.
        return n == 2

    # Only odd divisors up to the integer square root need testing.
    root = int(math.floor(math.sqrt(n)))
    divisor = 3
    while divisor <= root:
        if n % divisor == 0:
            return False
        divisor += 2
    return True
|
||
def main():
    """Check every number in ``PRIMES`` on a process pool and report timing."""
    with concurrent.futures.ProcessPoolExecutor() as executor:
        start = time.time()

        # ``executor.map`` yields results in input order, so zipping them
        # with ``PRIMES`` pairs each number with its own verdict.
        verdicts = executor.map(is_prime, PRIMES)
        for number, prime in zip(PRIMES, verdicts):
            print('%d is prime: %s' % (number, prime))

        print(f'processing took {time.time() - start} seconds')
|
||
if __name__ == '__main__':

    # Time the whole script: unlike the figure printed inside ``main()``,
    # this also covers creating and shutting down the process pool.
    start = time.time()
    main()
    print(f'script took {time.time() - start} seconds')