Is there an easier way to cancel groups of tasks? #141

Closed
merrellb opened this issue Apr 26, 2017 · 9 comments

@merrellb

merrellb commented Apr 26, 2017

I am struggling with a good way to manage groups of tasks. The best I've come up with seems a bit convoluted, involving a factory function. Something along the lines of:

def cancellable_factory(func, cancel_scopes, shutdown_event=None, desc=""):
    async def scope_func():
        try:
            print(desc,"started")
            with trio.open_cancel_scope() as cancel_scope:
                cancel_scopes.append(cancel_scope)
                await func()
        except Exception as exc:
            print(desc, "crashed: {!r}".format(exc))
            if shutdown_event:
                shutdown_event.set()
        print(desc, "stopped")
    return scope_func

async def wait_shutdown(cancel_scopes, shutdown_event):
    await shutdown_event.wait()
    for cancel_scope in cancel_scopes:
        cancel_scope.cancel()

cancel_scopes = []
shutdown_event = trio.Event()
nursery.spawn(cancellable_factory(blah_a, cancel_scopes, shutdown_event, "blahA"))
nursery.spawn(cancellable_factory(blah_b, cancel_scopes, shutdown_event, "blahB"))
nursery.spawn(cancellable_factory(blah_c, cancel_scopes, shutdown_event, "blahC"))
nursery.spawn(wait_shutdown, cancel_scopes, shutdown_event)

Am I missing an easy way to manage groups of tasks (e.g. the tasks associated with each connection to a server)?

Is it possible to cancel a task directly instead of tracking separate cancel_scopes? Something like:

def cancellable_factory(func, shutdown_event=None, desc=""):
    async def scope_func():
...
async def wait_shutdown(tasks, shutdown_event):
    await shutdown_event.wait()
    for task in tasks:
        task.cancel()

taska = nursery.spawn(cancellable_factory(blah_a, shutdown_event, "blahA"))
taskb = nursery.spawn(cancellable_factory(blah_b, shutdown_event, "blahB"))
taskc = nursery.spawn(cancellable_factory(blah_c, shutdown_event, "blahC"))
nursery.spawn(wait_shutdown, (taska, taskb, taskc), shutdown_event)
@njsmith
Member

njsmith commented Apr 26, 2017

That logic looks very much like the nursery logic... Would it work to make a dedicated nursery for these children, so that "this nursery's children" would be your group?

async def dedicated_supervisor_task(q):
    async with trio.open_nursery() as nursery:
        await q.put(nursery)
        # Hold this nursery open until it's cancelled
        nursery.spawn(trio.sleep_forever)

async def connection_listener(q):
    supervisor_nursery = await q.get()
    while True:
        conn = await accept_next_connection()
        supervisor_nursery.spawn(connection_handler, conn)

async def main():
    async with trio.open_nursery() as nursery:
        q = trio.Queue(1)
        nursery.spawn(dedicated_supervisor_task, q)
        nursery.spawn(connection_listener, q)

Then supervisor_nursery.cancel_scope.cancel() will cancel all the tasks spawned into that nursery and cause dedicated_supervisor_task to exit, as will any exceptions raised by the connection_handler tasks.

There are a few slightly awkward things about this that we might want to polish if it's a useful idiom. But the first question is: is it useful? The exact version above doesn't really accomplish anything useful; I don't know why you want a group of tasks like this, so I just took a set of tasks that in this case could have gone directly under main and put them under a subtask instead :-)

I've been thinking about something like this for cases like "graceful shutdown" in a web server, where you want to stop accepting new connections but let the existing ones finish. One way to do that would be to put listener tasks into one group, and connection handler tasks into another, and then do a graceful shutdown by cancelling the listener tasks then waiting for the connection handler tasks to finish (or optionally cancelling them too if they haven't finished after a timeout).

@merrellb
Author

merrellb commented Apr 26, 2017

That isn't quite my use case but probably could be adapted. I need a set of tasks associated with each connection to a listener, so I would need to create a nursery per connection. Do nurseries have much overhead?

A disadvantage of the nursery approach is that it gives up control of shutdown order.

Behind the scenes how does the nursery end the tasks? Does it inject a cancel or just abandon the tasks? I keep looking for a task.cancel() or nursery.task_cancel(task) method without any luck :-)

I am intrigued by the idea of grouping tasks. How might that look?

@njsmith
Member

njsmith commented Apr 27, 2017

I need a set of tasks associated with each connection to a listener

Ah, right. Then I think a very natural approach would be something like:

async def connection_handler(sock):
    with sock:
        async with trio.open_nursery() as nursery:
            nursery.spawn(first_worker, nursery)  # spawns more tasks into this nursery as needed

(adjust to fit)

Do nurseries have much overhead?

Not really. And if it becomes a problem I'd rather work on cutting down the overhead than try to create something that's almost-the-same but different :-)

A disadvantage of the nursery approach is that it gives up control of shutdown order.

Hmm, interesting point. I made the conscious decision to initially support only "group" cancellation with cancel scopes to keep things simple internally and keep cancellation and tasks more orthogonal, but I can see how targeting individual tasks might be useful for e.g. rest-for-one supervisors. Plus whatever your mysterious use case is :-)

Behind the scenes how does the nursery end the tasks? Does it inject a cancel or just abandon the tasks?

Eek, yes, it cancels them. Trio never ever ever abandons tasks, that's like the number one rule that the entire rest of the design is warped to fit :-).

I keep looking for a task.cancel() or nursery.task_cancel(task) method without any luck :-)

As alluded to above, there is none (currently) – just nursery.cancel_scope.cancel() which cancels everything inside the nursery. But I'm extremely interested to hear more about what you're trying to do.

@merrellb
Author

merrellb commented May 2, 2017

I've been able to greatly simplify my websocket example code (#124) using nurseries to manage per-connection tasks. I still think it might be useful to add a task.cancel() method for fine-grained control but will save that for another issue (when I can think of a good use case :-)

@merrellb merrellb closed this as completed May 2, 2017
@miracle2k
Contributor

I'm struggling with something similar, I think.

  • When I get a message from my websocket, I need to start a bunch of tasks.
  • When I get another message from the websocket, I need to shut down the previous tasks and replace them with a new set.
  • And so on.

I built the following based on the suggestion above:

async def generate_task_holder_nurseries(queue):
    while True:
        async with trio.open_nursery() as nursery:
            await queue.put(nursery)

            # Wait until the nursery is cancelled
            nursery.start_soon(trio.sleep_forever)

async with trio.open_nursery() as nursery:
    task_holder_nursery_queue = trio.Queue(1)
    nursery.start_soon(generate_task_holder_nurseries, task_holder_nursery_queue)

    current_task_holder = None
    async for message in websocket:
        if current_task_holder:
            current_task_holder.cancel_scope.cancel()

        current_task_holder = await task_holder_nursery_queue.get()
        for foobar in message:
            current_task_holder.start_soon(task, foobar)

That seems to work, but is there/should there be an easier way? I also think there might still be some KeyboardInterrupt/cancellation issues I need to handle correctly, so it's not even that straightforward.

@smurfix
Copy link
Contributor

smurfix commented May 4, 2018

Another way would be to drive the iterator explicitly:

    msg_iter = aiter(websocket)
    message = await anext(msg_iter)
    while True:
        async with trio.open_nursery() as nursery:
            for foobar in message:
                nursery.start_soon(task, foobar)
            try:
                message = await anext(msg_iter)
            finally:
                nursery.cancel_scope.cancel()

@miracle2k
Copy link
Contributor

@smurfix Clever!

@smurfix
Copy link
Contributor

smurfix commented May 5, 2018

@miracle2k except for forgetting the await in front of anext() :-/

@smurfix
Contributor

smurfix commented May 5, 2018

Another way would be for the tasks to pass back a local cancel scope for later cancellation, i.e.

async def task_runner(task, foobar, task_status=trio.TASK_STATUS_IGNORED):
    with trio.open_cancel_scope() as scope:
        task_status.started(scope)
        await task(foobar)

…
    async with trio.open_nursery() as nursery:
        scopes = []
        try:
            async for message in websocket:
                # Cancel the previous batch without waiting for it to finish
                for s in scopes:
                    s.cancel()
                scopes = []
                for foobar in message:
                    scopes.append(await nursery.start(task_runner, task, foobar))
        finally:
            nursery.cancel_scope.cancel()

The advantage (or disadvantage, depending on your actual use case) is that this way doesn't wait for all the old tasks to clean themselves up before starting new tasks.

4 participants