Skip to content

Commit

Permalink
Merge pull request #191 from rsocket/asyncwebsockets
Browse files Browse the repository at this point in the history
Asyncwebsockets
  • Loading branch information
jell-o-fishi authored Oct 20, 2023
2 parents ea2846f + 5262d37 commit ffd54e4
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ Changelog

v0.4.15
=======
- Websockets (https://github.com/python-websockets/websockets) server support
- Websockets server support (https://github.com/python-websockets/websockets)
- AsyncWebsockets client support (https://github.com/Fuyukai/asyncwebsockets)

v0.4.14
=======
Expand Down
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ pip install rsocket

You may also install using some **extras**:

| Extra | Functionality | Documentation |
|-------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|
| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) |
| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | |
| quic | [QUIC](https://github.com/aiortc/aioquic) transport | |
| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | |
| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) |
| optimized | Frame parse/serialize optimizations | |
| cloudevents | [CloudEvents](https://cloudevents.io/) integration | |
| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) |
| Extra | Functionality | Documentation |
|-----------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|
| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) |
| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | |
| quic | [QUIC](https://github.com/aiortc/aioquic) transport | |
| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | |
| asyncwebsockets | [Websockets](https://github.com/Fuyukai/asyncwebsockets) transport (client only) | |
| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) |
| optimized | Frame parse/serialize optimizations | |
| cloudevents | [CloudEvents](https://cloudevents.io/) integration | |
| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) |

For example:

Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pydantic==1.10.13
Werkzeug==3.0.0
graphql-core==3.2.3
gql==3.4.1
websockets==11.0.3
websockets==11.0.3
asyncwebsockets==0.9.4
56 changes: 56 additions & 0 deletions rsocket/transports/asyncwebsockets_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
from contextlib import asynccontextmanager

from rsocket.exceptions import RSocketTransportError
from rsocket.frame import Frame
from rsocket.helpers import wrap_transport_exception, single_transport_provider
from rsocket.logger import logger
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.abstract_messaging import AbstractMessagingTransport


@asynccontextmanager
async def websocket_client(url: str,
**kwargs) -> RSocketClient:
"""
Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client.
"""
from asyncwebsockets import open_websocket
async with open_websocket(url) as websocket:
async with RSocketClient(single_transport_provider(TransportAsyncWebsocketsClient(websocket)),
**kwargs) as client:
yield client


class TransportAsyncWebsocketsClient(AbstractMessagingTransport):
"""
RSocket transport over client side asyncwebsockets.
"""

def __init__(self, websocket):
super().__init__()
self._ws = websocket
self._message_handler = None

async def connect(self):
self._message_handler = asyncio.create_task(self.handle_incoming_ws_messages())

async def handle_incoming_ws_messages(self):
from wsproto.events import BytesMessage
try:
async for message in self._ws:
if isinstance(message, BytesMessage):
async for frame in self._frame_parser.receive_data(message.data, 0):
self._incoming_frame_queue.put_nowait(frame)
except asyncio.CancelledError:
logger().debug('Asyncio task canceled: incoming_data_listener')
except Exception:
self._incoming_frame_queue.put_nowait(RSocketTransportError())

async def send_frame(self, frame: Frame):
with wrap_transport_exception():
await self._ws.send(frame.serialize())

async def close(self):
self._message_handler.cancel()
await self._message_handler
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ cloudevents =
graphql =
graphql-core>=3.2.0
gql>=3.4.0
websockets =
websockets>=11.0.0
websockets = websockets>=11.0.0
asyncwebsockets = asyncwebsockets>=0.9.4

[options.entry_points]
cli.console_scripts =
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False):
setup_logging(logging.WARN)

tested_transports = [
'tcp'
'tcp',
'quart'
]

if sys.version_info[:3] < (3, 11, 5):
tested_transports += [
'aiohttp',
'quart',
'quic',
'http3',
# 'websockets'
Expand Down
4 changes: 2 additions & 2 deletions tests/tools/fixtures_quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
async def pipe_factory_quart_websocket(unused_tcp_port, client_arguments=None, server_arguments=None):
from quart import Quart
from rsocket.transports.quart_websocket import websocket_handler
from rsocket.transports.aiohttp_websocket import websocket_client
from rsocket.transports.asyncwebsockets_transport import websocket_client

app = Quart(__name__)
server: Optional[RSocketBase] = None
Expand All @@ -32,7 +32,7 @@ async def ws():
server_task = asyncio.create_task(app.run_task(port=unused_tcp_port))
await asyncio.sleep(0)

async with websocket_client('http://localhost:{}'.format(unused_tcp_port),
async with websocket_client('ws://localhost:{}'.format(unused_tcp_port),
**client_arguments) as client:
await wait_for_server.wait()
yield server, client
Expand Down

0 comments on commit ffd54e4

Please sign in to comment.