Skip to content

Commit

Permalink
Add our version of the std lib's "worker pool"
Browse files Browse the repository at this point in the history
This is a draft of the `tractor` way to implement the example from the
"processs pool" in the stdlib's `concurrent.futures` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

Our runtime is of course slower to startup but once up we of course get
the same performance, this confirms that we need to focus some effort
not on warm up and teardown times.  The mp forkserver method definitely
improves startup delay; rolling our own will likely be a good hot spot
to play with.

What's really nice is our implementation is done in approx 10th the code ;)

Also, do we want offer and interface that yields results as they arrive?

Relates to #175
  • Loading branch information
goodboy committed Dec 23, 2020
1 parent f4f39c2 commit 69a4934
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
119 changes: 119 additions & 0 deletions examples/concurrent_actors_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example
This uses no extra threads or fancy semaphores besides ``tractor``'s
(TCP) channels.
"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time

import tractor
import trio


PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@asynccontextmanager
async def worker_pool(workers=4):
"""Though it's a trivial special case for ``tractor``, the well
known "worker pool" seems to be the defacto "I want this process
pattern" for most parallelism pilgrims.
"""

async with tractor.open_nursery() as tn:

portals = []
results = []

for i in range(workers):

# this starts a new sub-actor (process + trio runtime) and
# stores it's "portal" for later use to "submit jobs" (ugh).
portals.append(
await tn.start_actor(
f'worker_{i}',
rpc_module_paths=[__name__],
)
)

async def map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:

# define an async (local) task to collect results from workers
async def collect_portal_result(func, value, portal):

results.append((value, await portal.run(func, n=value)))

async with trio.open_nursery() as n:

for value, portal in zip(sequence, itertools.cycle(portals)):

n.start_soon(
collect_portal_result,
worker_func,
value,
portal
)

return results

yield map

# tear down all "workers"
await tn.cancel()


async def main():
async with worker_pool() as actor_map:

start = time.time()
# for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
for number, prime in await actor_map(is_prime, PRIMES):
print(f'{number} is prime: {prime}')

print(f'processing took {time.time() - start} seconds')

if __name__ == '__main__':
start = time.time()
tractor.run(
main,
loglevel='ERROR',

# uncomment to use ``multiprocessing`` fork server backend
# which gives a startup time boost at the expense of nested
# processs scalability
# start_method='forkserver')
)
print(f'script took {time.time() - start} seconds')
40 changes: 40 additions & 0 deletions examples/concurrent_futures_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import time
import concurrent.futures
import math

PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
start = time.time()

for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))

print(f'processing took {time.time() - start} seconds')

if __name__ == '__main__':

start = time.time()
main()
print(f'script took {time.time() - start} seconds')

0 comments on commit 69a4934

Please sign in to comment.