Skip to content

Commit

Permalink
Drop the "main" task via kwarg idea
Browse files Browse the repository at this point in the history
Stop worrying about a "main task" in each actor and instead add an
additional `ActorNursery.run_in_actor()` method which wraps calls
to create an actor and run a lone RPC task inside it. Note this
adjusts the public API of `ActorNursery.start_actor()` to drop
its `main` kwarg.

The dirty deats of making this possible:
- each spawned RPC task is now tracked with a specific cancel scope such
  that when the actor is cancelled all ongoing responders are cancelled
  before any IPC/channel machinery is closed (turns out that spawning
  new actors from `outlive_main=True` actors was probably borked before
  finally getting this working).
- make each initial RPC response be a packet which describes the
  `functype` (eg. `{'functype': 'asyncfunction'}`) allowing for async
  calls/submissions by client actors (this was required to make
  `run_in_actor()` work - `Portal._submit()` is the new async method).
- hooray we can stop faking "main task" results for daemon actors
- add better handling/raising of internal errors caught in the bowels of
  the `Actor` itself.
- drop the rpc spawning nursery; just use the `Actor._root_nursery`
- only wait on `_no_more_peers` if there are existing peer channels that
  are actually still connected.
- an `ActorNursery.__aexit__()` now implicitly waits on `Portal.result()` on close
  for each `run_in_actor()` spawned actor.
- handle cancelling partial started actors which haven't yet connected
  back to the parent

Resolves #24
  • Loading branch information
Tyler Goodlet committed Aug 2, 2018
1 parent f726bd8 commit bb13b79
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 257 deletions.
7 changes: 3 additions & 4 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,22 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
log.info(f"Arbiter seems to exist @ {host}:{port}")
actor = Actor(
name or 'anonymous',
main=main,
arbiter_addr=arbiter_addr,
**kwargs
)
host, port = (host, 0)
else:
# start this local actor as the arbiter
# this should eventually get passed `outlive_main=True`?
actor = Arbiter(
name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs)
name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)

# ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until it's main task completes.
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
return await _start_actor(
actor, main, host, port, arbiter_addr=arbiter_addr)


def run(
Expand Down
Loading

0 comments on commit bb13b79

Please sign in to comment.