
AsyncDict vs. cancellation #4

Closed
smurfix opened this issue May 12, 2020 · 17 comments

@smurfix

smurfix commented May 12, 2020

AsyncDict doesn't adequately cover the "send a request, wait for a reply" pattern: if the waiter is cancelled before the reply arrives, that reply will languish in the dict indefinitely.
Storing the reply can't depend on is_waiting either, because the reply could arrive before the requesting task has finished sending the request and started waiting for it.

Fixing this probably requires an async context manager.

@takluyver

I thought of an abstraction that could have avoided at least one issue I stumbled into:

```python
# Task 1 - making the request
async with foo.expecting(request_id) as find_reply:
    conn.send(request_id, data)
    return await find_reply()

# Task 2 - processing incoming replies
foo.fulfil(request_id, reply_data)
```

i.e. the .expecting() context manager would make a slot of some kind for the expected reply, and clean that slot up when it exits, whether it was fulfilled or not.
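A minimal sketch of the `.expecting()` / `.fulfil()` idea, assuming asyncio from the standard library instead of trio and an `asyncio.Future` as the per-request reply slot. The class `ReplySlots` and its `pending()` helper are hypothetical; only the method names `expecting` and `fulfil` come from the comment above. The slot is deleted in a `finally`, so it disappears whether the block exits normally, with an error, or through cancellation, and a reply that arrives afterwards is reported as unexpected.

```python
import asyncio
from contextlib import asynccontextmanager


class ReplySlots:
    """Hypothetical helper: one asyncio.Future ("reply slot") per request id."""

    def __init__(self):
        self._slots = {}

    @asynccontextmanager
    async def expecting(self, request_id):
        if request_id in self._slots:
            raise KeyError(f"already expecting {request_id!r}")
        slot = asyncio.get_running_loop().create_future()
        self._slots[request_id] = slot

        async def find_reply():
            # Suspends until fulfil() stores a reply for this id.
            return await slot

        try:
            yield find_reply
        finally:
            # Clean up whether the reply arrived, an error occurred, or the
            # waiting task was cancelled.
            del self._slots[request_id]

    def fulfil(self, request_id, reply):
        """Store a reply; return False if nobody is expecting it any more."""
        slot = self._slots.get(request_id)
        if slot is None or slot.done():
            return False
        slot.set_result(reply)
        return True

    def pending(self):
        return len(self._slots)
```

With this shape the receive loop can call `fulfil()` unconditionally and simply ignore a `False` result.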

@smurfix
Author

smurfix commented May 12, 2020

Exactly what I was thinking, though you probably don't need an async context manager for that.

@belm0
Contributor

belm0 commented May 13, 2020

this is supposed to be a mere dictionary, with no policy about keys

if we integrate this context manager, it would need a policy about not reusing keys-- which is strange for a dictionary

I wonder if this context manager can be built on top of AsyncDictionary.

@belm0
Contributor

belm0 commented May 13, 2020

> if the waiter is cancelled before the reply arrives, that reply will languish in the dict indefinitely

> i.e. the .expecting() context manager would make a slot of some kind for the expected reply, and clean that slot up when it exits, whether it was fulfilled or not

I think there is also the case where the reply never comes, e.g. if the request was lost in transit. The worst case is where a placeholder was created, the request was issued over the network, the caller was cancelled, and then the request was dropped (perhaps because the server was unavailable). So, in a simple implementation, this can still leave metadata in the dict indefinitely.

The context manager is probably best integrated into the request client, which knows details of the request lifetime and reply state.

@takluyver

If the task making the request is cancelled, then leaving the .expecting() context manager should clean up its 'reply slot'. If there's no timeout or cancellation, then it won't clean up, but that's not really about this API, it's a general hazard of waiting for something that may never happen.

The general bit I missed when trying to do this was the 'reply slot', aka what other frameworks call a future. With that, it's easy to make on top of a plain dict.

@belm0
Contributor

belm0 commented May 15, 2020

> If there's no timeout or cancellation, then it won't clean up, but that's not really about this API, it's a general hazard of waiting for something that may never happen.

But even when the requesting task is cancelled, the dict needs to remember that there is, say, an outstanding request already on the network, and to ignore its reply. I'm suggesting that this is better managed by the code that writes to the AsyncDictionary than by the AsyncDictionary itself.

@smurfix
Author

smurfix commented May 15, 2020

> the dict needs to remember that there is, say, an outstanding request already on the network, and to ignore the reply

Why should it? The client shall be required to use the context manager, which remembers all non-cancelled requests. Thus, any reply that arrives with an unknown key can safely be ignored.

In any case, the pattern I commonly use for async clients looks like this:

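```python
import trio

# gen_key(), send() and recv_message() are application-specific, defined elsewhere
requests = dict()
# …
async def send_request(data):
    key = gen_key()
    requests[key] = evt = trio.Event()
    try:
        await send("req", key, data)
        await evt.wait()
        # receive_loop() has replaced the Event with the reply
        return requests[key]
    finally:
        # runs on success, error and cancellation alike
        del requests[key]
# …
async def receive_loop():
    while True:
        key, reply = await recv_message()
        try:
            evt = requests[key]
        except KeyError:
            pass  # unknown key, e.g. the request was cancelled: drop the reply
        else:
            requests[key] = reply
            evt.set()
```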

(Bonus points for sending cancellations to the server when a single request is cancelled, packing results in an Outcome so that you can raise a client error when the request errored out, sending errors if the reader dies, and whatnot.)
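Two of those bonus points (turning a failed request into a client-side exception, and failing every waiter when the reader dies) can be sketched with asyncio Futures, which, like an `outcome.Outcome`, hold either a value or an exception. Assumptions: replies arrive as hypothetical `(key, ok, payload)` tuples on a queue, a `None` key simulates the connection dropping, `RemoteError` and `ErrorAwareClient` are made-up names, and sending the request is left out.

```python
import asyncio


class RemoteError(Exception):
    """Hypothetical: the server reported that one request failed."""


class ErrorAwareClient:
    def __init__(self, incoming):
        self.incoming = incoming   # queue of hypothetical (key, ok, payload)
        self.pending = {}          # key -> asyncio.Future

    async def wait_reply(self, key):
        # Sending the request itself is omitted from this sketch.
        slot = asyncio.get_running_loop().create_future()
        self.pending[key] = slot
        try:
            return await slot      # returns the value or raises the error
        finally:
            del self.pending[key]

    async def receive_loop(self):
        try:
            while True:
                key, ok, payload = await self.incoming.get()
                if key is None:
                    raise ConnectionError("connection closed")  # simulated
                slot = self.pending.get(key)
                if slot is None or slot.done():
                    continue       # nobody waiting: drop the reply
                if ok:
                    slot.set_result(payload)
                else:
                    slot.set_exception(RemoteError(payload))
        except BaseException as exc:
            # The reader died: fail every waiter instead of leaving it hanging.
            for slot in self.pending.values():
                if not slot.done():
                    slot.set_exception(ConnectionError(f"reader died: {exc!r}"))
            raise
```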

The send_request()/receive_loop() code above doesn't need an AsyncDictionary, and in fact cannot benefit from one that lacks a context manager.
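The `send_request()` / `receive_loop()` pattern above can be exercised end to end with a small asyncio translation. Assumptions: `asyncio.Event` replaces `trio.Event`, two `asyncio.Queue`s stand in for `send()` and `recv_message()`, `itertools.count()` stands in for `gen_key()`, and `echo_server()` is a made-up server that upper-cases its input. The `isinstance` check additionally guards against a duplicate reply for a key that has already been answered.

```python
import asyncio
import itertools


class Client:
    """asyncio translation of the pattern above; names are illustrative."""

    def __init__(self, outgoing, incoming):
        self.outgoing = outgoing          # stands in for send()
        self.incoming = incoming          # stands in for recv_message()
        self.requests = {}
        self._keys = itertools.count()    # stands in for gen_key()

    async def send_request(self, data):
        key = next(self._keys)
        self.requests[key] = evt = asyncio.Event()
        try:
            await self.outgoing.put(("req", key, data))
            await evt.wait()
            # receive_loop() has swapped the Event for the reply
            return self.requests[key]
        finally:
            # runs on success, error and cancellation
            del self.requests[key]

    async def receive_loop(self):
        while True:
            key, reply = await self.incoming.get()
            evt = self.requests.get(key)
            if not isinstance(evt, asyncio.Event):
                continue  # unknown key or duplicate reply: ignore it
            self.requests[key] = reply
            evt.set()


async def echo_server(requests, replies, delay=0.01):
    """Hypothetical server: answers each request after a short delay."""
    while True:
        _kind, key, data = await requests.get()
        await asyncio.sleep(delay)
        await replies.put((key, data.upper()))
```

A request abandoned through a timeout removes its own entry, and its late reply is then dropped by `receive_loop()`.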

@belm0
Contributor

belm0 commented Sep 16, 2020

I've come around to understanding the value of it, thank you 👍

I wonder if AsyncDictionary can cover it by only adding this simple expecting() context manager:

```python
replies = AsyncDictionary()

# send request
with replies.expecting(key):
    await ...  # some request causing `key` to eventually appear
    result = await replies.pop_wait(key)

# receive loop
async for key, result in channel:
    if replies.is_waiting(key):
        replies[key] = result
```

@takluyver

So is_waiting(k) would return True if a task is waiting in get_wait(k) or pop_wait(k), or if a task is currently inside the with x.expecting(k) block?

I think that works, though it would be easy to forget the .expecting() block - 99% of the time the sending task will get to pop_wait() before the receiving task checks is_waiting(), so you could accidentally rely on that. The workaround I've written for Jeepney forces you to use the .expecting() method as part of retrieving the key.

@takluyver

Would it make sense, rather than adding to AsyncDictionary, to define a more restrictive 'expecting' async dictionary (ExpectationManager 😛 ), which would guarantee:

  • You can only retrieve a value inside a with x.expecting(key) block
  • You can only set a key if any task is currently expecting it
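A sketch of such a restrictive type, assuming asyncio rather than trio and a plain (synchronous) context manager, since `expecting()` only needs to register and remove a slot. `ExpectationManager`, `is_expecting()` and `get_wait()` are hypothetical names; both guarantees are enforced by raising.

```python
import asyncio
from contextlib import contextmanager


class ExpectationManager:
    """Hypothetical restrictive 'expecting' dict for request/reply matching."""

    def __init__(self):
        self._slots = {}  # key -> asyncio.Future, only while someone expects it

    @contextmanager
    def expecting(self, key):
        if key in self._slots:
            raise KeyError(f"{key!r} is already expected")
        self._slots[key] = asyncio.get_running_loop().create_future()
        try:
            yield
        finally:
            # Leaving the block always discards the slot (and any late value).
            self._slots.pop(key).cancel()

    def is_expecting(self, key):
        return key in self._slots

    def __setitem__(self, key, value):
        # Guarantee 2: a key can only be set while some task expects it.
        slot = self._slots.get(key)
        if slot is None or slot.done():
            raise KeyError(f"no pending expectation for {key!r}")
        slot.set_result(value)

    async def get_wait(self, key):
        # Guarantee 1: values can only be retrieved inside expecting(key).
        slot = self._slots.get(key)
        if slot is None:
            raise RuntimeError("get_wait() must be called inside expecting()")
        # shield() so cancelling this waiter does not cancel the shared slot
        return await asyncio.shield(slot)
```

A receive loop would check `is_expecting(key)` (or catch `KeyError`) before storing a reply.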

@belm0
Contributor

belm0 commented Sep 16, 2020

> I think that works, though it would be easy to forget the .expecting() block

The same is true for the receive loop checking replies.is_waiting(key). But I assume this is fairly low-level code and a knowledgeable person is writing both the send request and receive loop. In my use case of a multiplexing client, each of these appears only once.

It's attractive because it fits within the existing AsyncDictionary API, and the use is optional. (I suspect there are valid uses of AsyncDictionary without expecting() or is_waiting() but can't yet prove it.)

@belm0
Contributor

belm0 commented Sep 19, 2020

> ```python
> replies = AsyncDictionary()
>
> # send request
> with replies.expecting(key):
>     await ...  # some request causing `key` to eventually appear
>     result = await replies.pop_wait(key)
>
> # receive loop
> async for key, result in channel:
>     if replies.is_waiting(key):
>         replies[key] = result
> ```

There is still a race here if the send task is cancelled just as the replies entry is inserted.

So I'd have to also change the semantics of pop_wait() to always delete the key on exit.

The request code would need to delete the entry in a manual finally, or expecting() itself would need to be responsible for deleting the key.

--> rename expecting() to expect_and_finalize()
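A sketch of the semantics discussed here, assuming asyncio instead of trio: `is_waiting(key)` is true while a task is inside `expecting(key)` or `pop_wait(key)`, and `expecting()` itself deletes any value still stored for the key when the last block for it exits, which closes the cancellation race. This is a hypothetical reimplementation for illustration, not the `AsyncDictionary` that shipped in trio-util.

```python
import asyncio
from collections import Counter
from contextlib import contextmanager


class AsyncDictionary:
    """Hypothetical sketch of the proposed semantics (not trio-util's code)."""

    def __init__(self):
        self._data = {}
        self._events = {}           # key -> asyncio.Event for pop_wait()
        self._expected = Counter()  # key -> number of open expecting() blocks
        self._popping = Counter()   # key -> number of tasks inside pop_wait()

    def __contains__(self, key):
        return key in self._data

    def __setitem__(self, key, value):
        self._data[key] = value
        evt = self._events.pop(key, None)
        if evt is not None:
            evt.set()

    def is_waiting(self, key):
        return self._expected[key] > 0 or self._popping[key] > 0

    async def pop_wait(self, key):
        self._popping[key] += 1
        try:
            while key not in self._data:
                evt = self._events.setdefault(key, asyncio.Event())
                await evt.wait()
            return self._data.pop(key)
        finally:
            self._popping[key] -= 1
            if not self._popping[key]:
                del self._popping[key]
                self._events.pop(key, None)

    @contextmanager
    def expecting(self, key):
        self._expected[key] += 1
        try:
            yield
        finally:
            self._expected[key] -= 1
            if not self._expected[key]:
                del self._expected[key]
                # A reply stored just before the sender was cancelled is
                # discarded here instead of lingering in the dict.
                self._data.pop(key, None)
```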

@smurfix
Author

smurfix commented Sep 19, 2020

I wouldn't rename that. To me the semantics of expecting state clearly enough that when the block exits there is no longer anything expecting a reply.

Personally I'd structure the API differently:

```python
async with replies.expecting(key) as result:
    await generate_the_request(key)
process_reply(result.value)
```

i.e. the async context returned by expecting would have an __aexit__ that's responsible for waiting for the reply and for storing the result in result.value. If the receive loop gets an error / is cancelled, accessing the value should raise the corresponding exception.

I wouldn't call this class a dictionary because you'd never use replies[key]. Getting is going to be mediated by the async context, or pop_wait(). Setting the actual value must also be async because AnyIO doesn't have a sync way to trigger an event.
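A sketch of that structure, assuming asyncio rather than AnyIO or trio: `__aexit__` waits for the reply and stores it, and `.value` either returns it or re-raises an error recorded by the receive side. `Replies`, `Result`, `set()` and `fail_all()` are hypothetical names. One difference from the comment: with asyncio a future can be resolved from synchronous code, so `set()` here is not async.

```python
import asyncio


class Result:
    """Holds the reply once the async-with block has exited (hypothetical)."""

    def __init__(self):
        self._slot = None

    @property
    def value(self):
        # Returns the reply, or raises the exception the receive side stored.
        if self._slot is None:
            raise RuntimeError("no reply yet")
        return self._slot.result()


class Replies:
    """Hypothetical API following the structure proposed above."""

    def __init__(self):
        self._pending = {}  # key -> asyncio.Future

    def expecting(self, key):
        return _Expecting(self, key)

    def set(self, key, value):
        # Plain synchronous call: asyncio futures can be resolved without await.
        slot = self._pending.get(key)
        if slot is not None and not slot.done():
            slot.set_result(value)

    def fail_all(self, exc):
        # Called if the receive loop dies: every waiter sees exc via .value
        for slot in self._pending.values():
            if not slot.done():
                slot.set_exception(exc)


class _Expecting:
    def __init__(self, replies, key):
        self._replies, self._key = replies, key
        self._result = Result()

    async def __aenter__(self):
        slot = asyncio.get_running_loop().create_future()
        self._replies._pending[self._key] = slot
        return self._result

    async def __aexit__(self, exc_type, exc, tb):
        slot = self._replies._pending[self._key]
        try:
            if exc_type is None:
                # Wait for the reply without raising a stored exception here.
                await asyncio.wait([slot])
                self._result._slot = slot
        finally:
            del self._replies._pending[self._key]
        return False
```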

@belm0
Contributor

belm0 commented Sep 20, 2020

I think I'm ready to give up on AsyncDictionary:

  1. though I suspect synchronous set/get and blocking get/pop might have some use cases-- particularly, the blocking get can be used for broadcast-- I don't have one myself. And a principle of trio-util is for things to actually be used (particularly by me, so I can understand the API space).
  2. though I prototyped support for the context manager on top of the existing API, the result is less than simple in terms of API and implementation. And I don't want to carry complexity due to the part of the API that doesn't have a proven use case (see 1).

So the next question is, is a multiplexed request/reply utility a good value for trio-util? A few points causing me to not be too thrilled about it at the moment:

  • my use case is more complex than what we've discussed here because I also need to support streaming replies (pubsub). So in addition to trio.Event and reply data, dictionary entries may also hold a memory send channel
  • the actual multiplexing pattern is fairly simple-- I don't know if it needs encapsulation
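One way to hold both kinds of entries in the same table is sketched below, assuming asyncio: single replies go to an `asyncio.Future`, streaming (pub/sub) replies go to an `asyncio.Queue` playing the role of the memory send channel, and the reader dispatches on the entry type. `Multiplexer`, `dispatch()` and the `None` end-of-stream marker are hypothetical.

```python
import asyncio


class Multiplexer:
    """Hypothetical: one dict holding single-reply futures and stream queues."""

    def __init__(self):
        self._routes = {}  # key -> asyncio.Future | asyncio.Queue

    async def request(self, key):
        fut = asyncio.get_running_loop().create_future()
        self._routes[key] = fut
        try:
            return await fut
        finally:
            del self._routes[key]

    async def subscribe(self, key):
        queue = asyncio.Queue()
        self._routes[key] = queue
        try:
            while True:
                item = await queue.get()
                if item is None:   # hypothetical end-of-stream marker
                    return
                yield item
        finally:
            del self._routes[key]

    def dispatch(self, key, payload):
        route = self._routes.get(key)
        if isinstance(route, asyncio.Queue):
            route.put_nowait(payload)
        elif route is not None and not route.done():
            route.set_result(payload)
        # otherwise nobody is listening: drop the message
```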

@smurfix
Author

smurfix commented Sep 20, 2020

IMHO the pattern is simple enough, but as you noted, the devil is in the details and implementation requirements differ wildly. For instance, if the request went to an SQL server, the first thing I'd implement is sending a kill command to the server when somebody cancels a request on the client.
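A sketch of notifying the server when the client gives up on a request, assuming asyncio and a queue standing in for the connection: on cancellation the client enqueues a hypothetical `("cancel", key, None)` message before re-raising. The message is queued with `put_nowait()` because the task is already being cancelled; in trio, a send that needs to await would typically run inside a `trio.CancelScope(shield=True)` instead. What the server does with the message (e.g. killing an SQL query) is outside this sketch; `CancellingClient` and `deliver()` are made-up names.

```python
import asyncio
import itertools


class CancellingClient:
    """Hypothetical client that tells the server when a request is abandoned."""

    def __init__(self, outgoing):
        self.outgoing = outgoing        # asyncio.Queue standing in for a socket
        self.pending = {}
        self._keys = itertools.count()

    async def request(self, data):
        key = next(self._keys)
        slot = asyncio.get_running_loop().create_future()
        self.pending[key] = slot
        try:
            self.outgoing.put_nowait(("req", key, data))
            return await slot
        except asyncio.CancelledError:
            # Best effort and non-blocking, since this task is being cancelled.
            self.outgoing.put_nowait(("cancel", key, None))
            raise
        finally:
            del self.pending[key]

    def deliver(self, key, reply):
        slot = self.pending.get(key)
        if slot is not None and not slot.done():
            slot.set_result(reply)
```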

My simple ValueEvent has the same problem: the code responsible for generating the value might want to be cancelled when the client decides that it no longer wants the result. I tried to solve that by adding an optional scope argument. While that enables a possible workaround it's still not a good fit to Structured Concurrency patterns and doesn't even try to propagate cancellations both ways.

@belm0
Contributor

belm0 commented Oct 27, 2020

AsyncDictionary has been removed from the package

belm0 closed this as completed Oct 27, 2020
@milahu

milahu commented Feb 15, 2024

for reference

class AsyncDictionary was removed in f884cf8

last version of class AsyncDictionary

use case

handle dependencies between events that arrive in random order

when a dependency is missing, the event handler waits
until another event handler adds the dependency
or fails on timeout
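A minimal sketch of that use case, assuming asyncio: `get_wait()` suspends until another handler stores the key, and the caller bounds the wait with `asyncio.wait_for()` so a dependency that never arrives ends in a timeout instead of hanging. `AsyncDict` and `handle_event()` are hypothetical names; for simplicity every store wakes all waiters, who then re-check their own key, so no per-key waiter state is left behind after a timeout.

```python
import asyncio


class AsyncDict:
    """Hypothetical dict whose readers can wait for a key to appear."""

    def __init__(self):
        self._data = {}
        self._changed = asyncio.Event()

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        # Wake every waiter, then start a fresh Event for the next change.
        self._changed.set()
        self._changed = asyncio.Event()

    async def get_wait(self, key):
        while key not in self._data:
            await self._changed.wait()
        return self._data[key]


async def handle_event(deps, name, needs=None, timeout=1.0):
    """Hypothetical handler: wait (bounded) for a dependency, then publish."""
    if needs is None:
        deps[name] = name
    else:
        dep = await asyncio.wait_for(deps.get_wait(needs), timeout)
        deps[name] = f"{name}({dep})"
```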

see also

python asyncio asynchronously fetch data by key from a dict when the key becomes available

async_dict

4 participants