Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Best way to cancel websocket.recv #797

Closed
srkunze opened this issue Jul 17, 2020 · 14 comments
Closed

Best way to cancel websocket.recv #797

srkunze opened this issue Jul 17, 2020 · 14 comments

Comments

@srkunze
Copy link

srkunze commented Jul 17, 2020

Would it possible to somehow be able to make cancelation easier? The following raises an exception:

async with websockets.connect(uri) as websocket:
    while True:
        await asyncio.wait([recv, canceled.wait()], return_when=asyncio.FIRST_COMPLETED)
        if canceled.is_set():
            break
Task exception was never retrieved
future: <Task finished coro=<WebSocketCommonProtocol.recv() done, defined at .../lib/python3.6/site-packages/websockets/protocol.py:443> exception=ConnectionClosedOK('code = 1000 (OK), no reason',)>
Traceback (most recent call last):
  File ".../lib/python3.6/site-packages/websockets/protocol.py", line 509, in recv
    await self.ensure_open()
  File ".../lib/python3.6/site-packages/websockets/protocol.py", line 803, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason

After some research, I found the following work-around including some wrapping and manually cancelation:

async with websockets.connect(uri) as websocket:
    while True:
        recv = asyncio.Task(websocket.recv())
        await asyncio.wait([recv, canceled.wait()], return_when=asyncio.FIRST_COMPLETED)
        if canceled.is_set():
            recv.cancel()
            break

This was my first naive version without cancalation. And I actually would love to return to some similar form of this:

async with websockets.connect(uri) as websocket:
    while True:
        data = await websocket.recv()

Maybe something such as this, where canceled=asyncio.Event():

async with websockets.connect(uri) as websocket:
    while not canceled.is_set():
        data = await websocket.recv([canceled], default=None)

Maybe, there is a better concept of cancelation I am unaware of.

@aaugustin
Copy link
Member

Unless I missed something, this could happen with any coroutine that may raise an exception. As such, this isn't a websockets issue, but rather an asyncio issue.

At some point, asyncio automatically creates a task which wraps the recv() coroutine. If you forget about the task and recv() eventually raises an exception, the exception is logged.

I believe this may be fixed in more recent versions of Python.

This is also one of the things that can't happen on trio, which is better designed in this regard.

I'm not sure I can do anything about this in websockets. I'm leaving the issue open in case others feel like weighing in.

@srkunze
Copy link
Author

srkunze commented Jul 18, 2020

@aaugustin thanks for reaching out. I am going to look into trio right now to see if that solves the issue in a cleaner way. I will report back as soon as I converted my script to it. :-)

I'm not sure I can do anything about this in websockets.

If it fit in the general design, I would like to have some sort of a cancelation event as given in the last example above.
Why? Because the websocket was closed because of a ConnectionClosedOK. IMO, that one doesn't warrant an exception and therefore further exception handling. In my point of view, it's just more code to write without any benefit. :-/

@cjerdonek
Copy link
Collaborator

Instead of trying to "cancel" the connection, have you tried simply closing it?

My understanding of the API changes in 7.0 were that it eliminated the need to handle CancelledError, etc. I know this in part because I was one who had filed issues prior to 7.0 related to cancellations, and the API changes in 7.0 were made to address those issues (if I remember right).

Here is part of the warning note from the 7.0 changelog:

Previously, connections handlers were canceled. Now, connections are closed with close code 1001 (going away). From the perspective of the connection handler, this is the same as if the remote endpoint was disconnecting. This removes the need to prepare for CancelledError in connection handlers.

Lastly, if you think cancellation really is needed, can you explain why? What is the underlying goal you're trying to achieve?

@cjerdonek
Copy link
Collaborator

Also, in terms of the ConnectionClosedOK exception in your first message, you might want to consider using the async for message in websocket: approach mentioned in the docs.

That handles ConnectionClosedOK for you, as you can see in its code: https://github.com/aaugustin/websockets/blob/017a072705408d3df945e333e5edd93e0aa8c706/src/websockets/protocol.py#L435-L439

@srkunze
Copy link
Author

srkunze commented Jul 18, 2020

if you think cancellation really is needed, can you explain why? What is the underlying goal you're trying to achieve?

Sure:

uri = f'....{token}...'   # token which needs to regenerated from time to time
async with websockets.connect(uri) as websocket:
    # send init messages
    while True:
        await ws.recv()  # when token has reached its end of life, stop doing this and break loop
    # send last words
    # server closes connection BECAUSE of our last messages

consider using the async for message in websocket

That seem to handle the ConnectionClosedOK quite nicely. I still need to wrap my head around using this with trio.

@cjerdonek
Copy link
Collaborator

In your case, since you're not doing anything with the message you should just be able to do:

# send init messages
async for message in websocket:
    pass
# send last words

@srkunze
Copy link
Author

srkunze commented Jul 18, 2020

@cjerdonek Of course I do something with it but I guess it's irrelevant to the control flow. :-)

Additionally, the websocket would never be closed this way because the server will close it BECAUSE we send our last words. That is, we need to break the loop in order to do so and show the server we are ready to be closed.

    # send last words
    # server closes connection BECAUSE of our last messages

@cjerdonek
Copy link
Collaborator

Additionally, the websocket would never be closed this way

You can't be sure that the connection won't get closed before you close it below (e.g. due to the client closing it, network errors, etc). That's the reason you need to be prepared to handle ConnectionClosedOK earlier in your logic.

@srkunze
Copy link
Author

srkunze commented Jul 18, 2020

That's the reason you need to be prepared to handle ConnectionClosedOK earlier in your logic.

That's actually a very good point. Thanks for that hint. In all other cases, ConnectionClosedError should be raised then.

And still need to find a way to break that loop after X seconds.

@srkunze
Copy link
Author

srkunze commented Jul 18, 2020

😂 time to grab some sleep:

TypeError: trio.run received unrecognized yield message
<Future pending cb=[_chain_future.<locals>._call_check_cancel()
at /usr/lib/python3.8/asyncio/futures.py:360]>. Are you trying to use a library
written for some other framework like asyncio? That won't work without some
kind of compatibility shim.

Code looks like this this time:

uri = f'....{token}...'   # token needs to regenerated at 'refresh_at'
async with websockets.connect(uri) as websocket:
    # send request to start
    with trio.move_on_at(refresh_at):
        async for message in websocket:
            # do something useful
    # send request to close

Still thanks for pointing out some very important details!

@aaugustin
Copy link
Member

aaugustin commented Jul 19, 2020

Ah. I'm sorry, I sent you down a dead end :-( At this time websockets doesn't work with trio at all.

The reason why I mentioned trio is because it has cancellation built into its design, unlike asyncio where cancellation was bolted on late in the implementation process. As a consequence Task exception was never retrieved simply cannot happen in trio. However, this isn't a practical solution here.

Suggested solution:

def process_messages(ws):
    while True:
        await ws.recv()


uri = f'....{token}...'   # token which needs to regenerated from time to time
timeout = 60  # if tokens are valid for 60 seconds

async with websockets.connect(uri) as ws:
    # send init messages
    # when token has reached its end of life, cancel processing loop
    yield from asyncio.wait_for(process_messages(ws), timeout)
    # send last words
    # server closes connection BECAUSE of our last messages

    # if you really want to let the server close the connection, you can do this:
    await ws.wait_closed()
    # you can also no nothing, in which case the client will close the connexion
    # when exiting the `with connect(...)` block, which should be fine in practice.

The key is to use wait_for, which cancels the task it's waiting for if a timeout occurs, which prevents the Task exception was never retrieved warning.

@srkunze
Copy link
Author

srkunze commented Jul 19, 2020

Ah. I'm sorry, I sent you down a dead end :-(

Don't worry. I learned quite a few things in those hours. Thanks for pointing out the usage and solution with asyncio.wait_for. It seems I'm going to convert things back to asyncio. :-D

At this time websockets doesn't work with trio at all.

I was considering opening a new issue for trio compatibility but there's already one: #466

But if you say, trio won't happen that's fine and I will raise my point to python-idea/python-dev about introducing block-based cancelation like trio's one (a context manager).

@aaugustin
Copy link
Member

Support for trio isn't going happen next week, but it's definitely possible in the next few months.

@srkunze
Copy link
Author

srkunze commented Jul 19, 2020

@aaugustin If you need some specific help, let me know. :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants