Skip to content

Commit

Permalink
Merge pull request #78 from sysid/feat/performance
Browse files Browse the repository at this point in the history
feat: remove locks from hot path
  • Loading branch information
sysid authored Sep 30, 2023
2 parents 6d60f92 + a428eae commit 159bbec
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.6.5
current_version = 1.8.0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<build>\d+))?
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6.5
1.8.0
Empty file added examples/__init__.py
Empty file.
58 changes: 58 additions & 0 deletions examples/load_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
################################################################################
# Load test
# e.g. test for lock contention: https://github.com/sysid/sse-starlette/issues/77
#
# to run it:
# PYTHONPATH=. uvicorn examples.load_test:app
# curl http://localhost:8000/stream | pv --line-mode --average-rate > /dev/null
################################################################################

import uvicorn
import json
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

position = (
json.dumps(
{
"position_timestamp": "2023-09-19T11:25:35.286Z",
"x": 0,
"y": 0,
"z": 0,
"a": 0,
"b": 0,
"c": 0,
# some more fields
}
)
+ "\n"
)
positions = [position] * 500

sse_clients = 0

app = FastAPI()


@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
global sse_clients
sse_clients += 1
print(f"{sse_clients} sse clients connected", flush=True)
while True:
# If client closes connection, stop sending events
if await request.is_disconnected():
break

for p in positions:
# fixes socket.send() raised exception, but makes it very slow!!
if await request.is_disconnected():
break
yield p

return EventSourceResponse(event_generator())


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="error", log_config=None) # type: ignore
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = sse-starlette
version = 1.6.5
version = 1.8.0
description = "SSE plugin for Starlette"
long_description = file: README.md, LICENSE
long_description_content_type = text/markdown
Expand Down
2 changes: 1 addition & 1 deletion sse_starlette/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sse_starlette.sse import EventSourceResponse, ServerSentEvent

__all__ = ["ServerSentEvent", "EventSourceResponse"]
__version__ = "1.6.5"
__version__ = "1.8.0"
20 changes: 12 additions & 8 deletions sse_starlette/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def __init__(
self.active = True

self._ping_task = None

# https://github.com/sysid/sse-starlette/pull/55#issuecomment-1732374113
self._send_lock = anyio.Lock()

@staticmethod
Expand Down Expand Up @@ -226,22 +228,20 @@ async def stream_response(self, send) -> None:
_log.debug(f"chunk: {chunk.decode()}")
await send({"type": "http.response.body", "body": chunk, "more_body": True})

await send({"type": "http.response.body", "body": b"", "more_body": False})
async with self._send_lock:
self.active = False
await send({"type": "http.response.body", "body": b"", "more_body": False})

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
async def safe_send(message):
async with self._send_lock:
return await send(message)

async with anyio.create_task_group() as task_group:
# https://trio.readthedocs.io/en/latest/reference-core.html#custom-supervisors
async def wrap(func: Callable[[], Coroutine[None, None, None]]) -> None:
await func()
# noinspection PyAsyncCall
task_group.cancel_scope.cancel()

task_group.start_soon(wrap, partial(self.stream_response, safe_send))
task_group.start_soon(wrap, partial(self._ping, safe_send))
task_group.start_soon(wrap, partial(self.stream_response, send))
task_group.start_soon(wrap, partial(self._ping, send))
task_group.start_soon(wrap, self.listen_for_exit_signal)

if self.data_sender_callable:
Expand Down Expand Up @@ -290,4 +290,8 @@ async def _ping(self, send: Send) -> None:
else ensure_bytes(self.ping_message_factory(), self.sep)
)
_log.debug(f"ping: {ping.decode()}")
await send({"type": "http.response.body", "body": ping, "more_body": True})
async with self._send_lock:
if self.active:
await send(
{"type": "http.response.body", "body": ping, "more_body": True}
)
8 changes: 3 additions & 5 deletions tests/test_event_source_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ async def test_ping_concurrency(reset_appstatus_event):
# claiming the lock and going to sleep for 1 second so until t=1.5s.
# t=1.0s - ping task wakes up and tries to call send while we know
# that event_publisher is still blocked inside it and holding the lock
#
# If there are concurrent calls to `send` then we will raise the WouldBlock below
# and the test would fail so it merely not failing indicates that the behavior is good
lock = anyio.Lock()

async def event_publisher():
Expand All @@ -127,9 +124,10 @@ async def receive():
await anyio.lowlevel.checkpoint()
return {"type": "something"}

response = EventSourceResponse(event_publisher(), ping=1)
with pytest.raises(anyio.WouldBlock) as e:
response = EventSourceResponse(event_publisher(), ping=1)

await response({}, receive, send)
await response({}, receive, send)


def test_header_charset():
Expand Down

0 comments on commit 159bbec

Please sign in to comment.