Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventSourceResponse performance #77

Closed
maves-knx opened this issue Sep 22, 2023 · 7 comments
Closed

EventSourceResponse performance #77

maves-knx opened this issue Sep 22, 2023 · 7 comments

Comments

@maves-knx
Copy link

Hey @sysid, thanks for providing this library :)

Overview

We (the company I work for) are currently using sse-starlette to build some of our services. With one somewhat high load one, we discovered a potential performance bottleneck.

It seems that this pull request https://github.com/sysid/sse-starlette/pull/55/files introduced an anyio.Lock to prevent a race condition between the _ping and the stream_reponse task from happening. This lock seems to be a bit slow.

Proposal

We experimented with two solutions:

1: Remove the ping task altogether

After reading (skimming? ^^) https://html.spec.whatwg.org/multipage/server-sent-events.html and the comment of the _ping function in your code, it seems that a ping is not really required by the SSE protocol, so we could provide an EventSourceResponse subclass, which just doesn't do it. If there is still some ping required, the user of the library could integrate it themselves in their stream_response

Example implementation

class EventSourceResponseNoPing(EventSourceResponse):
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:

        async with anyio.create_task_group() as task_group:
            # https://trio.readthedocs.io/en/latest/reference-core.html#custom-supervisors
            async def wrap(func: Callable[[], Coroutine[None, None, None]]) -> None:
                await func()
                # noinspection PyAsyncCall
                task_group.cancel_scope.cancel()

            task_group.start_soon(wrap, partial(self.stream_response, send))
            task_group.start_soon(wrap, self.listen_for_exit_signal)

            if self.data_sender_callable:
                task_group.start_soon(self.data_sender_callable)

            await wrap(partial(self.listen_for_disconnect, receive))

        if self.background is not None:  # pragma: no cover, tested in StreamResponse
            await self.background()

2: Only ping on timeout

An alternative approach we investigated was moving the ping inside the stream_response function, so that the same loop would send the data and the ping, therefore not requiring a lock. This turned out a bit tricky, since we used anyio.fail_after which cancels the running tasks and requires they async generator provided by the users of the library to be able to handle those cancellations. It seems that non-class based async generators struggle with this.

Example code:

class EventSourceResponsePingOnCancel(EventSourceResponse):
    async def stream_response(self, send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        it = aiter(self.body_iterator)
        while True:
            try:
                async with anyio.fail_after(self._ping_interval, False):
                    data = await anext(it)
                chunk = ensure_bytes(data, self.sep)
                _log.debug(f"chunk: {chunk.decode()}")
                await send({"type": "http.response.body", "body": chunk, "more_body": True})
            except TimeoutError:
                ping = (
                    ServerSentEvent(comment=f"ping - {datetime.utcnow()}").encode()
                    if self.ping_message_factory is None
                    else ensure_bytes(self.ping_message_factory(), self.sep)
                )
                _log.debug(f"ping: {ping.decode()}")
                await send({"type": "http.response.body", "body": ping, "more_body": True})
            except StopAsyncIteration:
                _log.debug("end of iterator")
                break

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:

        async with anyio.create_task_group() as task_group:
            # https://trio.readthedocs.io/en/latest/reference-core.html#custom-supervisors
            async def wrap(func: Callable[[], Coroutine[None, None, None]]) -> None:
                await func()
                # noinspection PyAsyncCall
                task_group.cancel_scope.cancel()

            task_group.start_soon(wrap, partial(self.stream_response, send))
            task_group.start_soon(wrap, self.listen_for_exit_signal)

            if self.data_sender_callable:
                task_group.start_soon(self.data_sender_callable)

            await wrap(partial(self.listen_for_disconnect, receive))

        if self.background is not None:  # pragma: no cover, tested in StreamResponse
            await self.background()

It seems the first proposal would be an easy option to implement and provide some flexibility to the users of the library, what do you think?

If we agree on the approach and some naming, we can provide a pull request :)

Here are some results of the tests we performed.

Tests

I ran the following tests:

I started this script via uvicorn sse_test.py:app (I removed some details here, I know I wouldn't have to repeat the position 500 times)

import asyncio
import json
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse, EventSourceResponsePingOnCancel, EventSourceResponseNoPing
from starlette.responses import StreamingResponse


position = (json.dumps({
  "position_timestamp": "2023-09-19T11:25:35.286Z",
  "x": 0,
  "y": 0,
  "z": 0,
  "a": 0,
  "b": 0,
  "c": 0,
  # some more fields
}) + '\n')
positions = [position] * 500

sse_clients = 0

app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):

    async def event_generator():
        global sse_clients
        sse_clients += 1
        print(f"{sse_clients} sse clients connected", flush=True)
        while True:
            # If client closes connection, stop sending events
            if await request.is_disconnected():
                break

            for p in positions:
                yield p

    return EventSourceResponse(event_generator())

And then I connect some clients and count the returned lines.

For example one client via curl http://localhost:8000/stream | pv --line-mode --average-rate > /dev/null

Or 20 clients with a custom go script.

Result 1: current implementation with anyio.Lock

test via one curl connection (avg over 5 min):

95k/s

test via go 20 clients (running for 5 min):

Average number of received events per second: 4922
Max number of received events per second: 4923
min number of received events per second: 4922

Result 2: removing the ping task and the lock

test via one curl connection (avg over 5 min):

263k/s

test via go 20 clients (running for 5 min):

Average number of received events per second: 13636
Max number of received events per second: 13660
min number of received events per second: 13630

Result 3: handling the ping when a timeout occurs

test via one curl connection (avg over 5 min):

129k/s

test via go 20 clients (running for 5 min):

Average number of received events per second: 6095
Max number of received events per second: 6115
min number of received events per second: 6090

Speed-up

Since the actual numbers are not too relevant, here the speed up:

Speedup from test 1 to test 2:

13660 / 4923 = 2.774730855169612
 263 / 95 = 2.768421052631579

Speedup from test 1 to test 3:

6115 / 4923 = 1.2421287832622385
129 / 95 = 1.3578947368421053
@sysid
Copy link
Owner

sysid commented Sep 23, 2023

@maves-knx , cool analysis, much appreciated!

Your first option, providing a dedicated high-performance class with no ping seems like a promising approach. I am happy to take a PR along this idea.

However, when I am running your example with EventSourceResponseNoPing I get socket.send() raised exception. on the uvicorn console when I CTRL-C the curl consumer.
When using EventSourceResponse this does not happen.

I did not have a lot of time diving into this, but I guess this requires some fine tuning.

@sysid
Copy link
Owner

sysid commented Sep 23, 2023

Ok, found the problem in your test code (see my comments):

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        global sse_clients
        sse_clients += 1
        print(f"{sse_clients} sse clients connected", flush=True)
        while True:
            # If client closes connection, stop sending events
            if await request.is_disconnected():
                break

            for p in positions:
                # fixes socket.send() raised exception, but makes it very slow!!
                # if await request.is_disconnected():
                #     break
                yield p

    return EventSourceResponse(event_generator())

@ejlangev
Copy link
Contributor

ejlangev commented Sep 23, 2023

See #55 (comment) for a suggestion of a way to improve performance roughly within the current setup. The code would roughly be (haven't run this):

    async def _ping(self, send: Send) -> None:
        # Legacy proxy servers are known to, in certain cases, drop HTTP connections after a short timeout.
        # To protect against such proxy servers, authors can send a custom (ping) event
        # every 15 seconds or so.
        # Alternatively one can send periodically a comment line
        # (one starting with a ':' character)
        while self.active:
            await anyio.sleep(self._ping_interval)
            if self.ping_message_factory:
                assert isinstance(self.ping_message_factory, Callable)  # type: ignore  # https://github.com/python/mypy/issues/6864
            ping = (
                ServerSentEvent(comment=f"ping - {datetime.utcnow()}").encode()
                if self.ping_message_factory is None
                else ensure_bytes(self.ping_message_factory(), self.sep)
            )
            _log.debug(f"ping: {ping.decode()}")
            async with self._send_lock:
                if self.active:
                    await send({"type": "http.response.body", "body": ping, "more_body": True})
    async def stream_response(self, send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for data in self.body_iterator:
            chunk = ensure_bytes(data, self.sep)
            _log.debug(f"chunk: {chunk.decode()}")
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        async with self._send_lock:
            self.active = False
            await send({"type": "http.response.body", "body": b"", "more_body": False})

And then you just get rid of safe_send in the __call__ method entirely.

@sysid
Copy link
Owner

sysid commented Sep 25, 2023

Thanks @ejlangev , this seems to be to do the trick. I am getting almost same throughput here as with the ping-free solution.

@maves-knx , can you confirm this observations with your tests as well?
Created branch for it: https://github.com/sysid/sse-starlette/tree/feat/performance

@maves-knx
Copy link
Author

Hey, sorry for the late reply :(

I gotta admit I am very surprised how I could have missed that socket exception. Sorry for that :(

But in my new tests, the socket exception also occurs on the feat/performance branch, if I don't adjust my test script above.

So I concluded, that I'll run two tests, one with the socket exception and one without. I also repeated the old test, to get "fresh" results:

(all tests via go 20 clients (running for 5 min))

With socket exception

Original

The original solution doesn't have the problem with the socket exception. No check if is_disconnected is needed (Or should that check always be present)?

Average number of received events per second: 4878
Max number of received events per second: 4878
min number of received events per second: 4878

Removing the ping

Average number of received events per second: 12531
Max number of received events per second: 12554
min number of received events per second: 12525

feat/performance solution

Average number of received events per second: 12388
Max number of received events per second: 12410
min number of received events per second: 12380

So both solutions seem pretty much equivalent here.

Without socket exception

I fixed the socket exception, as was mentioned above, by adding the is_disconnected check.

Original

Average number of received events per second: 3115
Max number of received events per second: 3115
min number of received events per second: 3115

Remove the ping

Average number of received events per second: 4508
Max number of received events per second: 4508
min number of received events per second: 4508

feat/performance solution

Average number of received events per second: 4460
Max number of received events per second: 4460
min number of received events per second: 4460

So, those numbers are a little meh? If there is no other way of preventing the socket error, we produced a slower result?
Or am I missing something, is the feat/performance not raising that exception for you?

sysid added a commit that referenced this issue Sep 30, 2023
@sysid
Copy link
Owner

sysid commented Sep 30, 2023

Lock contention has been resolved. Thanks @ejlangev for the solution and @maves-knx for bringing it to attention!

The socket exception is a different topic and actually works as designed.

@sysid sysid closed this as completed Sep 30, 2023
@whisper-bye
Copy link

I'm a little confused,
should I use is_disconnected or except asyncio.CancelledError to check for client shutdown?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants