Default max_queue blocks websocket cancellation with high traffic #1555

Open
btschwertfeger opened this issue Nov 25, 2024 · 6 comments · May be fixed by #1556

@btschwertfeger

I'm working on the python-kraken-sdk and am encountering strange behavior: the websocket connection doesn't get closed properly while it is continuously receiving messages that fill up the queue (see max_queue, https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client).

This happens locally (Ubuntu 24.04, Python 3.11.9) and in CI (GitHub Actions; windows-latest and ubuntu-latest; Python 3.11 and 3.12), e.g. https://github.com/btschwertfeger/python-kraken-sdk/actions/runs/12016162113 and https://github.com/btschwertfeger/python-kraken-sdk/actions/runs/12016429517.

I'm using websockets==14.1.

The following is the log from Ubuntu (other runs that leave max_queue untouched look the same):

INFO     tests.spot.helper:helper.py:97 ["BTC/USD", {"channel": "book", "type": "update", "data": [{"symbol": "BTC/USD", "bids": [{"price": 95398.3, "qty": 0.25}], "asks": [], "checksum": 329006635, "timestamp": "2024-11-25T18:10:51.622559Z"}]}]
DEBUG    websockets.client:protocol.py:745 > CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:175 = connection is CLOSING
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.656589Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.656612Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660735Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660780Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660828Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661035Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661093Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661109Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.665449Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.666942Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667265Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667316Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667336Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667410Z"}]}' [216 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.669715Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675411Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675436Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675518Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675575Z"}]}' [181 bytes]
DEBUG    websockets.client:connection.py:799 % sending keepalive ping
=========================== short test summary info ============================
FAILED tests/spot/test_spot_orderbook.py::test_add_book - Failed: Timeout >60.0s

When max_queue is set to None (not recommended by the documentation), everything is fine:

INFO     tests.spot.helper:helper.py:97 ["BTC/USD", {"channel": "book", "type": "update", "data": [{"symbol": "BTC/USD", "bids": [{"price": 94794.3, "qty": 0.2}], "asks": [], "checksum": 3009784427, "timestamp": "2024-11-25T19:55:47.910197Z"}]}]
DEBUG    websockets.client:protocol.py:745 > CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:175 = connection is CLOSING
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.911987Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912155Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912337Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912350Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.915588Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918207Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918814Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918849Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918940Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.919616Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.920569Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.922159Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.922502Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.923636Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.929364Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.929945Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930777Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930900Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930995Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.932799Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.932858Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934082Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934498Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934839Z"}]}' [218 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938315Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938505Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938646Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.939445Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.939583Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.940378Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.941481Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.942796Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.947044Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.947067Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.949671Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.952241Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.952430Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.953434Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:649 < EOF
DEBUG    websockets.client:protocol.py:757 > EOF
DEBUG    websockets.client:protocol.py:175 = connection is CLOSED
DEBUG    websockets.client:connection.py:960 x closing TCP connection
WARNING  kraken.spot.websocket.connectors:connectors.py:259 Connection closed

I'm not familiar with the internals of your websockets implementation, but it doesn't seem to handle closing connections that are under high traffic. Proper cancellation should work in any case, regardless of the size or number of queued messages.

This can be reproduced by running the following:

git clone https://github.com/btschwertfeger/python-kraken-sdk
cd python-kraken-sdk
git switch 
uv venv --python=3.11
source .venv/bin/activate
uv pip install ".[test]"
python3 -m pytest -vv --log-cli-level=DEBUG tests/spot/test_spot_orderbook.py::test_add_book

One may need to run this test multiple times to hit the situation where too many messages arrive during cancellation. An alternative is to add more books to the test case (e.g. await orderbook.add_book(pairs=["BTC/USD", "DOT/USD", "ETH/USD", "MATIC/USD", "BTC/EUR"])).

@aaugustin
Member

This is a plausible bug. If I understand correctly, to reproduce, I need to:

  • saturate the queue (simply by not reading messages received by the connection)
  • close the connection <-- this is where I'm not 100% sure -- can you confirm it's a simple await websocket.close()? Or is it about cancelling recv() in the sense of cancelling an asyncio task?
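
The two steps above can be illustrated without a network by a toy model built only on asyncio. It is not the websockets implementation: `CLOSE`, `reader`, and `close_completes` are hypothetical names, the bound of 16 is just an example value, and the idea that a full queue stops the reader before it reaches the peer's close frame is an assumption about the mechanism, not something confirmed in this thread.

```python
import asyncio
from typing import Optional

CLOSE = "<close frame>"  # hypothetical stand-in for the peer's close frame


async def reader(wire: list, inbox: asyncio.Queue, closed: asyncio.Event) -> None:
    """Move frames from the wire into the inbox, in arrival order."""
    for frame in wire:
        if frame == CLOSE:
            closed.set()  # close frame seen: the closing handshake can finish
            return
        # With a bounded inbox this blocks once the inbox is full,
        # so the close frame queued behind the data is never reached.
        await inbox.put(frame)


async def close_completes(
    max_queue: Optional[int], pending: int = 100, timeout: float = 0.2
) -> bool:
    """Return True if the close frame is read within `timeout` seconds."""
    wire = [f"update {i}" for i in range(pending)] + [CLOSE]
    # asyncio.Queue treats maxsize=0 as unbounded, which models max_queue=None.
    inbox: asyncio.Queue = asyncio.Queue(maxsize=max_queue or 0)
    closed = asyncio.Event()
    task = asyncio.create_task(reader(wire, inbox, closed))
    try:
        # Step 1: nobody drains the inbox. Step 2: wait for the close to finish.
        await asyncio.wait_for(closed.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)


async def main() -> None:
    print("bounded (16):    ", await close_completes(16))
    print("unbounded (None):", await close_completes(None))


if __name__ == "__main__":
    asyncio.run(main())
```

In this model the bounded run reports False (the close never completes before the timeout) and the unbounded run reports True, which mirrors the two logs in the report. Whether the real library stalls for the same reason is exactly what needs confirming.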

@aaugustin
Member

As a mitigation, you can use websockets.legacy.client.connect instead of websockets.connect.

See https://websockets.readthedocs.io/en/latest/project/changelog.html and https://websockets.readthedocs.io/en/latest/howto/upgrade.html for context.
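
A minimal sketch of that mitigation, assuming the caller only needs to open a connection and read a few messages. The function name `read_some` and its `count` parameter are hypothetical; only the import path comes from the comment above. The import sits inside the function so the sketch can be loaded without importing the legacy module, which as far as I know is deprecated in recent releases.

```python
import asyncio


async def read_some(uri: str, count: int = 10) -> list:
    """Read `count` messages using the legacy asyncio client (mitigation sketch)."""
    # Mitigation: use the legacy client instead of websockets.connect.
    from websockets.legacy.client import connect

    messages = []
    async with connect(uri) as websocket:
        for _ in range(count):
            messages.append(await websocket.recv())
    return messages


# Usage (performs real network I/O, so it is left commented out):
# asyncio.run(read_some("wss://ws.kraken.com/v2", count=1))
```

The rest of the client code should not need to change for a simple receive loop like this one, but the upgrade guide linked above lists the differences between the two implementations.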

@btschwertfeger
Author

btschwertfeger commented Nov 26, 2024

> This is a plausible bug. If I understand correctly, to reproduce, I need to:
>
>   • saturate the queue (simply by not reading messages received by the connection)
>   • close the connection <-- this is where I'm not 100% sure -- can you confirm it's a simple await websocket.close()? Or is it about cancelling recv() in the sense of cancelling an asyncio task?

Unfortunately I'm not sure about websocket.close() and recv().

I tried debugging it with the following script, debugpy, and VS Code, but so far I haven't had much time and didn't manage to access the "asyncio0" thread. Do you have any recommendations on how to debug it?

# repro.py
import asyncio
import logging
import json
from websockets.asyncio.client import connect

logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)


class MyClient:

    def __init__(self):
        self.keep_alive = True

    async def start(self):
        if hasattr(self, "task") and not self.task.done():
            return
        self.task: asyncio.Task = asyncio.create_task(self.run())

    async def stop(self):
        self.keep_alive = False
        if hasattr(self, "task") and not self.task.done():
            await self.task

    async def run(self):

        event: asyncio.Event = asyncio.Event()
        async with connect(
            "wss://ws.kraken.com/v2",
            ping_interval=30,
            # max_queue=None,  # having this enabled doesn't cause problems
        ) as socket:

            if not event.is_set():
                # subscribe to some orderbook events
                # also try to reduce "depth" to 10 and just take DOT/EUR  
                await socket.send(
                    json.dumps(
                        {
                            "method": "subscribe",
                            "params": {
                                "channel": "book",
                                "symbol": [
                                    "BTC/USD",
                                    "DOT/USD",
                                    "ETH/USD",
                                    "MATIC/USD",
                                    "BTC/EUR",
                                ],
                                "depth": 100
                            },
                        }
                    )
                )
                event.set()

            while self.keep_alive:
                try:
                    _message = await asyncio.wait_for(socket.recv(), timeout=10)
                except asyncio.TimeoutError:  # same class as TimeoutError on Python 3.11+
                    pass
                except asyncio.CancelledError:
                    self.keep_alive = False
                else:
                    try:
                        message = json.loads(_message)
                    except ValueError:
                        pass
                    else:
                        pass

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args, **kwargs):
        await self.stop()


async def main():
    async with MyClient():
        await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(main())

BTW: Thanks for the note about the legacy asyncio implementation, but I'll just set max_queue=None until this is resolved.

@btschwertfeger
Author

I opened pull request #1556, which addresses this issue.

@aaugustin
Member

Thank you, that helps.

@aaugustin
Member

To make the best use of your time, I recommend that you wait until I have time to review the PR before doing more work (e.g. it isn't useful to polish the code and make CI green if we decide to take a different approach to fixing the issue).
