
Add our version of the std lib's "worker pool" #176

Merged: 12 commits into master from eg_worker_poolz, Feb 22, 2021
Conversation

@goodboy (Owner) commented Dec 23, 2020

This is a draft of the tractor way to implement the example from the
"process pool" section of the stdlib's concurrent.futures docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

Our runtime is of course slower to start up, but once running we get the same performance. This confirms that we need to focus some effort on warm-up and teardown times. The mp forkserver method definitely improves startup delay; rolling our own will likely be a good hot spot to play with.
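For context, opting multiprocessing into the forkserver start method is a one-liner; a minimal sketch (Unix only):

```python
import multiprocessing as mp

# "forkserver" keeps a single warm server process around and forks
# workers from it on demand, avoiding a full interpreter spin-up per
# child; the returned context is a drop-in for the mp module API.
ctx = mp.get_context("forkserver")
print(ctx.get_start_method())  # forkserver
```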

What's really nice is our implementation is done in approximately one tenth of the code ;)

Also, do we want to offer an interface that yields results as they arrive?

Relates to #175

What y'all think?

Results from running both scripts:

Ours:

(3.9)  >>> python examples/concurrent_actors_primes.py  
112272535095293 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
112582705942171 is prime: True
1099726899285419 is prime: False
115797848077099 is prime: True
processing took 1.8208930492401123 seconds
script took 2.9907846450805664 seconds

stdlib's:

(3.9)  >>> python examples/concurrent_futures_primes.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
processing took 1.8223650455474854 seconds
script took 1.845200538635254 seconds
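For reference, the stdlib script being compared against is essentially the documented ProcessPoolExecutor example; a paraphrased sketch (the PRIMES values are the ones from the output above):

```python
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]

def is_prime(n: int) -> bool:
    # simple trial division up to sqrt(n)
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

def main() -> None:
    # fan the checks out over a process pool; map() preserves input order
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')

if __name__ == '__main__':
    main()
```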

for i in range(workers):

# this starts a new sub-actor (process + trio runtime) and
# stores it's "portal" for later use to "submit jobs" (ugh).

typo it's..

portals.append(
await tn.start_actor(
f'worker_{i}',
rpc_module_paths=[__name__],

I was thinking about changing this name to exposed_modules.. what y'all think?

async def map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:

gah should be List[Tuple[int, bool]] pretty sure
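For illustration of that (value, result) pairing, a minimal asyncio analogue (no tractor involved; map_pairs and is_even are hypothetical names):

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

async def map_pairs(
    worker_func: Callable[[int], Awaitable[bool]],
    sequence: List[int],
) -> List[Tuple[int, bool]]:
    # run all workers concurrently; gather() preserves input order,
    # so zip pairs each input value with its own result
    results = await asyncio.gather(*(worker_func(n) for n in sequence))
    return list(zip(sequence, results))

async def is_even(n: int) -> bool:
    await asyncio.sleep(0)  # stand-in for real async work
    return n % 2 == 0

pairs = asyncio.run(map_pairs(is_even, [1, 2, 3]))
print(pairs)  # [(1, False), (2, True), (3, False)]
```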

# define an async (local) task to collect results from workers
async def collect_portal_result(func, value, portal):

results.append((value, await portal.run(func, n=value)))

This is a use case for keeping sync functions as per #77?

I realize we could just have wrapped func here inside an async function wrapper; is the extra overhead something we should help avoid? Does it really matter?

portal
)

return results

We could offer an incremental yielding system here but it will require adding a trio mem channel.

Not sure if this will make the example too complicated?
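A rough shape of that incremental style, using an asyncio.Queue as a dependency-free stand-in for a trio memory channel (all names hypothetical; squaring stands in for real work):

```python
import asyncio
from typing import AsyncIterator, List, Tuple

async def squares_as_completed(values: List[int]) -> AsyncIterator[Tuple[int, int]]:
    # workers push (input, result) pairs into the queue as they finish;
    # the consumer yields them immediately instead of waiting for input order
    queue: asyncio.Queue = asyncio.Queue()

    async def worker(n: int) -> None:
        await asyncio.sleep(0.01 * n)  # simulate varying work times
        await queue.put((n, n * n))

    tasks = [asyncio.ensure_future(worker(n)) for n in values]
    try:
        for _ in values:
            yield await queue.get()
    finally:
        for t in tasks:
            t.cancel()

async def main() -> list:
    return [pair async for pair in squares_as_completed([3, 1, 2])]

out = asyncio.run(main())
print(out)  # [(1, 1), (2, 4), (3, 9)] -- completion order, not input order
```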

with concurrent.futures.ProcessPoolExecutor() as executor:
start = time.time()

for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):

Technically this is yielding results as they arrive underneath, so I guess to be fair we should do that too?


I definitely agree with this. It is trivial for a user to reorder their results when necessary from an as-completed stream but impossible to get results as fast as possible when the library is holding them hostage for ordering!
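The stdlib already exposes that as-completed style via concurrent.futures.as_completed; a minimal sketch (hypothetical work function; threads used just to keep the demo light):

```python
import concurrent.futures
import time

def work(n: int) -> int:
    time.sleep(0.05 * n)  # larger inputs take longer here
    return n * n

# submit all jobs, then consume results in completion order rather than
# submission order; the futures dict maps each future back to its input
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(work, n): n for n in [3, 1, 2]}
    results = [
        (futures[fut], fut.result())
        for fut in concurrent.futures.as_completed(futures)
    ]

print(results)  # [(1, 1), (2, 4), (3, 9)] -- arrival order
```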


goodboy commented Jan 17, 2021

Give that new example a shot with
$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!

@goodboy marked this pull request as ready for review January 18, 2021 15:51

goodboy commented Jan 18, 2021

Still TODO:

  • aclosing() around the yielded async gen
  • yield a separate (cloned?) recv_chan and a new submit_job_chan or something like that to get Executor.submit() style

    To make this work, snd_chan should be closed when the jobs are exhausted, so each send_result should receive a clone of snd_chan and do the async with snd_chan: await snd_chan.send(...) trick. (from @richardsheridan)


goodboy commented Jan 24, 2021

@richardsheridan I think that gets aclosing() in.

I'll make a new issue for a more "serious" worker pool recipe to continue the .submit() style API, yah?

@richardsheridan

I'm a little perplexed by the state of this PR vs. #163. It seems you made the aclosing changes over there but not over here?


goodboy commented Feb 22, 2021

@richardsheridan haven't rebased that PR yet onto this history 😂

This one is just to get the examples in; that one is to document them in the README.


goodboy commented Feb 22, 2021

Yah, aclosing() is right here.

@goodboy merged commit 35775c6 into master Feb 22, 2021
@goodboy deleted the eg_worker_poolz branch February 22, 2021 14:55