-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for realtime endpoint SSE disconnects #8
Conversation
@matthieuEv would you be so kind as to test if this indeed fixes your bug? |
I've just run a test file but it don't seems to work with the example file: import asyncio
from datetime import datetime
from pocketbase import PocketBase
from pocketbase.models.dtos import RealtimeEvent
async def callback(event: RealtimeEvent) -> None:
"""Callback function for handling Realtime events.
Args:
event (RealtimeEvent): The event object containing information about the record change.
"""
# This will get called for every event
# Lets print what is going on
at = datetime.now().isoformat()
print(f"[{at}] {event['action'].upper()}: {event['record']}")
async def realtime_updates():
"""Establishes a PocketBase connection, authenticates, and subscribes to Realtime events."""
unsubscribe = None
try:
# Instantiate the PocketBase connector
pb = PocketBase(CONNECTION_URL)
# Authenticate as an admin
await pb.admins.auth.with_password(ADMIN_EMAIL, ADMIN_PASSWORD)
# Get the collection object
col = pb.collection(COLLECTION_NAME)
# Subscribe to Realtime events for the specific record ID in the collection
unsubscribe = await col.subscribe_all(callback=callback)
# Infinite loop to wait for events (adjusted from the second snippet)
while True:
await asyncio.sleep(60 * 60) # Sleep for an hour to avoid hitting PocketBase's rate limits
except Exception as e:
print(f"Error: {e}")
finally:
# Unsubscribe if still active
if unsubscribe:
try:
await unsubscribe()
except Exception as e:
print(f"Error unsubscribing: {e}")
if __name__ == "__main__":
asyncio.run(realtime_updates()) And i have the same error. I just run the file and wait for like 5min. ERROR:root:Connection to realtime endpoint lost
Traceback (most recent call last):
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
yield
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 254, in __aiter__
async for part in self._httpcore_stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 367, in __aiter__
raise exc from None
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 363, in __aiter__
async for part in self._stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 349, in __aiter__
raise exc
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 341, in __aiter__
async for chunk in self._connection._receive_response_body(**kwargs):
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 210, in _receive_response_body
event = await self._receive_event(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 220, in _receive_event
with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 1624, in stream
yield response
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 70, in aconnect_sse
yield EventSource(response)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/pocketbase/services/realtime.py", line 53, in _make_connection
async for message in sse.aiter_sse():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 39, in aiter_sse
async for line in self._response.aiter_lines():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 963, in aiter_lines
async for text in self.aiter_text():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 950, in aiter_text
async for byte_content in self.aiter_bytes():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 929, in aiter_bytes
async for raw_bytes in self.aiter_raw():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 987, in aiter_raw
async for raw_stream_bytes in self.stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 149, in __aiter__
async for chunk in self._stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 253, in __aiter__
with map_httpcore_exceptions():
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-3' coro=<RealtimeService._make_connection() done, defined at /mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/pocketbase/services/realtime.py:44> exception=RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)')>
Traceback (most recent call last):
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
yield
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 254, in __aiter__
async for part in self._httpcore_stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 367, in __aiter__
raise exc from None
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 363, in __aiter__
async for part in self._stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 349, in __aiter__
raise exc
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 341, in __aiter__
async for chunk in self._connection._receive_response_body(**kwargs):
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 210, in _receive_response_body
event = await self._receive_event(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 220, in _receive_event
with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/usr/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 1624, in stream
yield response
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 70, in aconnect_sse
yield EventSource(response)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/pocketbase/services/realtime.py", line 53, in _make_connection
async for message in sse.aiter_sse():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 39, in aiter_sse
async for line in self._response.aiter_lines():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 963, in aiter_lines
async for text in self.aiter_text():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 950, in aiter_text
async for byte_content in self.aiter_bytes():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 929, in aiter_bytes
async for raw_bytes in self.aiter_raw():
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_models.py", line 987, in aiter_raw
async for raw_stream_bytes in self.stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_client.py", line 149, in __aiter__
async for chunk in self._stream:
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 253, in __aiter__
with map_httpcore_exceptions():
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/mnt/d/Documents/Projets/myProject/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
^CTraceback (most recent call last):
File "/mnt/d/Documents/Projets/myProject/test.py", line 57, in <module>
asyncio.run(realtime_updates())
File "/usr/lib/python3.11/asyncio/runners.py", line 188, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/runners.py", line 120, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/mnt/d/Documents/Projets/myProject/test.py", line 42, in realtime_updates
await asyncio.sleep(60 * 60) # Sleep for an hour to avoid hitting PocketBase's rate limits
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/tasks.py", line 644, in sleep
return await future
^^^^^^^^^^^^
asyncio.exceptions.CancelledError
|
Ah, you seem to be getting a RemoteProtocolError which is different, I get a ReadError. That is very strange, but I think I'll add an exception case for it and it should work. |
Works like a charm, thanks! |
Checklist
Description of Changes
This ensures the realtime client reconnects in the case connection is lost.
Related Issues
Fixes #7