From ab8d6bf9604d065c6e635e76f3b3462c87195351 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 16:37:32 +0200 Subject: [PATCH 1/8] test existing (unintended) functionality of route aliases. added ability to specify fallback route handler when route is unknown. explicitly disallow empty or None routes. --- rsocket/exceptions.py | 8 +++ rsocket/routing/request_router.py | 86 ++++++++++++++++++++++++++++--- tests/rsocket/test_routing.py | 41 +++++++++++++++ 3 files changed, 127 insertions(+), 8 deletions(-) diff --git a/rsocket/exceptions.py b/rsocket/exceptions.py index df95d5f7..d03936ff 100644 --- a/rsocket/exceptions.py +++ b/rsocket/exceptions.py @@ -35,6 +35,14 @@ class RSocketApplicationError(RSocketError): pass +class RSocketEmptyRoute(RSocketApplicationError): + def __init__(self, method_name: str): + self.method_name = method_name + + def __str__(self) -> str: + return f'Empty route set on method {self.method_name}' + + class RSocketUnknownRoute(RSocketApplicationError): def __init__(self, route_id: str): self.route_id = route_id diff --git a/rsocket/routing/request_router.py b/rsocket/routing/request_router.py index cfcfabd6..d6d07d33 100644 --- a/rsocket/routing/request_router.py +++ b/rsocket/routing/request_router.py @@ -1,17 +1,24 @@ +from dataclasses import dataclass from inspect import signature, Parameter from typing import Callable, Any -from rsocket.exceptions import RSocketUnknownRoute +from rsocket.exceptions import RSocketUnknownRoute, RSocketEmptyRoute from rsocket.extensions.composite_metadata import CompositeMetadata from rsocket.frame import FrameType +from rsocket.frame_helpers import safe_len from rsocket.payload import Payload from rsocket.rsocket import RSocket +__all__ = ['RequestRouter'] + decorated_method = Callable[[RSocket, Payload, CompositeMetadata], Any] def decorator_factory(container: dict, route: str): def decorator(function: decorated_method): + if safe_len(route) == 0: + raise RSocketEmptyRoute(function.__name__) + if route in container: raise KeyError('Duplicate route "%s" already registered', route) @@ -21,6 +28,15 @@ def decorator(function: decorated_method): return decorator +@dataclass +class Handlers: + response: callable = None + stream: callable = None + channel: callable = None + fire_and_forget: callable = None + metadata_push: callable = None + + class RequestRouter: __slots__ = ( '_channel_routes', @@ -29,7 +45,8 @@ class RequestRouter: '_fnf_routes', '_metadata_push', '_route_map_by_frame_type', - '_payload_mapper' + '_payload_mapper', + '_unknown' ) def __init__(self, payload_mapper=lambda cls, _: _): @@ -40,6 +57,8 @@ def __init__(self, payload_mapper=lambda cls, _: _): self._fnf_routes = {} self._metadata_push = {} + self._unknown = Handlers() + self._route_map_by_frame_type = { FrameType.REQUEST_CHANNEL: self._channel_routes, FrameType.REQUEST_FNF: self._fnf_routes, @@ -48,21 +67,68 @@ def __init__(self, payload_mapper=lambda cls, _: _): FrameType.METADATA_PUSH: self._metadata_push, } + def _get_unknown_route(self, frame_type): + if frame_type == FrameType.REQUEST_RESPONSE: + return self._unknown.response + if frame_type == FrameType.REQUEST_STREAM: + return self._unknown.stream + if frame_type == FrameType.REQUEST_CHANNEL: + return self._unknown.channel + if frame_type == FrameType.REQUEST_FNF: + return self._unknown.fire_and_forget + if frame_type == FrameType.METADATA_PUSH: + return self._unknown.metadata_push + def response(self, route: str): return decorator_factory(self._response_routes, route) + def response_unknown(self): + def wrapper(function): + self._unknown.response = function + return function + + return wrapper + def stream(self, route: str): return decorator_factory(self._stream_routes, route) + def stream_unknown(self): + def wrapper(function): + self._unknown.stream = function + return function + + return wrapper + def channel(self, route: str): return decorator_factory(self._channel_routes, route) + def channel_unknown(self): + def wrapper(function): + self._unknown.channel = function + return function + + return wrapper + def fire_and_forget(self, route: str): return decorator_factory(self._fnf_routes, route) + def fire_and_forget_unknown(self): + def wrapper(function): + self._unknown.fire_and_forget = function + return function + + return wrapper + def metadata_push(self, route: str): return decorator_factory(self._metadata_push, route) + def metadata_push_unknown(self): + def wrapper(function): + self._unknown.metadata_push = function + return function + + return wrapper + async def route(self, frame_type: FrameType, route: str, @@ -71,15 +137,19 @@ async def route(self, if route in self._route_map_by_frame_type[frame_type]: route_processor = self._route_map_by_frame_type[frame_type][route] - route_kwargs = await self._collect_route_arguments(route_processor, - payload, - composite_metadata) - - return await route_processor(**route_kwargs) else: + route_processor = self._get_unknown_route(frame_type) + + if route_processor is None: raise RSocketUnknownRoute(route) - async def _collect_route_arguments(self, route_processor, payload, composite_metadata): + route_kwargs = self._collect_route_arguments(route_processor, + payload, + composite_metadata) + + return await route_processor(**route_kwargs) + + def _collect_route_arguments(self, route_processor, payload, composite_metadata): route_signature = signature(route_processor) route_kwargs = {} diff --git a/tests/rsocket/test_routing.py b/tests/rsocket/test_routing.py index 5aaa06a7..9dc32ab4 100644 --- a/tests/rsocket/test_routing.py +++ b/tests/rsocket/test_routing.py @@ -61,6 +61,47 @@ async def response(): assert result.data == b'result' +async def test_routed_request_response_aliases(lazy_pipe): + router = RequestRouter() + + def handler_factory(): + return RoutingRequestHandler(router) + + @router.response('test.path') + @router.response('test.alternate_path') + async def response(): + return create_future(Payload(b'result')) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + result = await client.request_response(Payload(metadata=composite(route('test.path')))) + + assert result.data == b'result' + + result = await client.request_response(Payload(metadata=composite(route('test.alternate_path')))) + + assert result.data == b'result' + + +async def test_routed_request_response_unknown_route(lazy_pipe): + router = RequestRouter() + + def handler_factory(): + return RoutingRequestHandler(router) + + @router.response_unknown() + async def response(): + return create_future(Payload(b'fallback')) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + result = await client.request_response(Payload(metadata=composite(route('test.path')))) + + assert result.data == b'fallback' + + async def test_routed_request_response_with_payload_mapper(lazy_pipe): router = RequestRouter(lambda cls, _: json.loads(_.data.decode())) From c70446daf98f2bbdf351c4b91e59fbcc433aca4c Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 19:16:50 +0200 Subject: [PATCH 2/8] added unit tests for unknown stream and response path --- tests/rsocket/test_routing.py | 18 -------- tests/rsocket/test_routing_unknown_route.py | 47 +++++++++++++++++++++ 2 files changed, 47 insertions(+), 18 deletions(-) create mode 100644 tests/rsocket/test_routing_unknown_route.py diff --git a/tests/rsocket/test_routing.py b/tests/rsocket/test_routing.py index 9dc32ab4..3f3419dd 100644 --- a/tests/rsocket/test_routing.py +++ b/tests/rsocket/test_routing.py @@ -84,24 +84,6 @@ async def response(): assert result.data == b'result' -async def test_routed_request_response_unknown_route(lazy_pipe): - router = RequestRouter() - - def handler_factory(): - return RoutingRequestHandler(router) - - @router.response_unknown() - async def response(): - return create_future(Payload(b'fallback')) - - async with lazy_pipe( - client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, - server_arguments={'handler_factory': handler_factory}) as (server, client): - result = await client.request_response(Payload(metadata=composite(route('test.path')))) - - assert result.data == b'fallback' - - async def test_routed_request_response_with_payload_mapper(lazy_pipe): router = RequestRouter(lambda cls, _: json.loads(_.data.decode())) diff --git a/tests/rsocket/test_routing_unknown_route.py b/tests/rsocket/test_routing_unknown_route.py new file mode 100644 index 00000000..37d9d379 --- /dev/null +++ b/tests/rsocket/test_routing_unknown_route.py @@ -0,0 +1,47 @@ +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import route, composite +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import create_future +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from rsocket.routing.routing_request_handler import RoutingRequestHandler +from rsocket.streams.stream_from_generator import StreamFromGenerator + + +async def test_routed_request_response_unknown_route(lazy_pipe): + router = RequestRouter() + + def handler_factory(): + return RoutingRequestHandler(router) + + @router.response_unknown() + async def response(): + return create_future(Payload(b'fallback')) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + result = await client.request_response(Payload(metadata=composite(route('test.path')))) + + assert result.data == b'fallback' + + +async def test_routed_request_stream_unknown_route(lazy_pipe): + router = RequestRouter() + + def handler_factory(): + return RoutingRequestHandler(router) + + @router.stream_unknown() + async def response_stream(payload, composite_metadata): + return StreamFromGenerator( + lambda: ((Payload(ensure_bytes(str(i))), index == 9) for i, index in enumerate(range(10)))) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + received_messages = await AwaitableRSocket(client).request_stream( + Payload(metadata=composite(route('test.path')))) + + assert len(received_messages) == 10 From d9b5f2d70eae529118f479196f0d1c6f939c94ac Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 19:19:36 +0200 Subject: [PATCH 3/8] added unit tests for empty routes --- tests/rsocket/test_request_router.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/rsocket/test_request_router.py b/tests/rsocket/test_request_router.py index ec3b8a37..f3ef8ddd 100644 --- a/tests/rsocket/test_request_router.py +++ b/tests/rsocket/test_request_router.py @@ -1,5 +1,6 @@ import pytest +from rsocket.exceptions import RSocketEmptyRoute from rsocket.helpers import create_future from rsocket.local_typing import Awaitable from rsocket.payload import Payload @@ -17,3 +18,17 @@ async def request_response(payload, composite_metadata) -> Awaitable[Payload]: @router.response('path1') async def request_response2(payload, composite_metadata) -> Awaitable[Payload]: return create_future() + + +async def test_request_router_disallow_empty_routes(): + router = RequestRouter() + + with pytest.raises(RSocketEmptyRoute): + @router.response('') + async def request_response(payload, composite_metadata) -> Awaitable[Payload]: + return create_future() + + with pytest.raises(RSocketEmptyRoute): + @router.response(None) + async def request_response2(payload, composite_metadata) -> Awaitable[Payload]: + return create_future() From 6fcdeaac9d18c76941acd7ed7a74933993d0be26 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 19:50:16 +0200 Subject: [PATCH 4/8] fix request router payload mapping. --- CHANGELOG.rst | 9 ++++++ examples/tutorial/step4/chat_server.py | 9 +++--- examples/tutorial/step4/models.py | 4 +++ rsocket/routing/request_router.py | 40 +++++++++++++------------- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8530a67d..af9c6185 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,15 @@ Changelog v0.4.6 ====== - fire_and_forget now only removes the stream id when the future denoting the frame was sent, is done +- Raise error on empty or None route specified in request router +- Added the following methods to RequestRouter to allow specifying handlers of unknown routes: + - response_unknown + - stream_unknown + - channel_unknown + - fire_and_forget_unknown + - metadata_push_unknown +- Officially support route aliases by using the decorator multiple times on the same method +- Fix value mapping in request router. A parameter of any name (not just *payload*) specified on a routed method with a type-hint other than Payload will use the payload_mapper specified in the router instantiation to decode the value. v0.4.5 ====== diff --git a/examples/tutorial/step4/chat_server.py b/examples/tutorial/step4/chat_server.py index 422d6e34..3f76e095 100644 --- a/examples/tutorial/step4/chat_server.py +++ b/examples/tutorial/step4/chat_server.py @@ -9,7 +9,8 @@ from more_itertools import first -from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass) +from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass, + decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription @@ -88,7 +89,7 @@ def __init__(self): self._session: Optional[UserSessionData] = None def router_factory(self): - router = RequestRouter() + router = RequestRouter(decode_payload) @router.response('login') async def login(payload: Payload) -> Awaitable[Payload]: @@ -121,9 +122,7 @@ async def get_channels() -> Publisher: return StreamFromGenerator(lambda: generator) @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Awaitable[Payload]: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) diff --git a/examples/tutorial/step4/models.py b/examples/tutorial/step4/models.py index 7215b88b..bb49f71f 100644 --- a/examples/tutorial/step4/models.py +++ b/examples/tutorial/step4/models.py @@ -30,3 +30,7 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls: Type[T], payload: Payload) -> T: + return decode_dataclass(payload.data, cls) diff --git a/rsocket/routing/request_router.py b/rsocket/routing/request_router.py index d6d07d33..1dc394c9 100644 --- a/rsocket/routing/request_router.py +++ b/rsocket/routing/request_router.py @@ -67,18 +67,6 @@ def __init__(self, payload_mapper=lambda cls, _: _): FrameType.METADATA_PUSH: self._metadata_push, } - def _get_unknown_route(self, frame_type): - if frame_type == FrameType.REQUEST_RESPONSE: - return self._unknown.response - if frame_type == FrameType.REQUEST_STREAM: - return self._unknown.stream - if frame_type == FrameType.REQUEST_CHANNEL: - return self._unknown.channel - if frame_type == FrameType.REQUEST_FNF: - return self._unknown.fire_and_forget - if frame_type == FrameType.METADATA_PUSH: - return self._unknown.metadata_push - def response(self, route: str): return decorator_factory(self._response_routes, route) @@ -153,15 +141,27 @@ def _collect_route_arguments(self, route_processor, payload, composite_metadata) route_signature = signature(route_processor) route_kwargs = {} - if 'payload' in route_signature.parameters: - payload_expected_type = route_signature.parameters['payload'] - - if payload_expected_type is not Payload and payload_expected_type is not Parameter: - payload = self._payload_mapper(payload_expected_type, payload) + for parameter in route_signature.parameters: + parameter_type = route_signature.parameters[parameter] - route_kwargs['payload'] = payload + if 'composite_metadata' == parameter: + route_kwargs['composite_metadata'] = composite_metadata + else: + if parameter_type.annotation not in (Payload, parameter_type.empty): + payload = self._payload_mapper(parameter_type.annotation, payload) - if 'composite_metadata' in route_signature.parameters: - route_kwargs['composite_metadata'] = composite_metadata + route_kwargs[parameter] = payload return route_kwargs + + def _get_unknown_route(self, frame_type: FrameType) -> Callable: + if frame_type == FrameType.REQUEST_RESPONSE: + return self._unknown.response + elif frame_type == FrameType.REQUEST_STREAM: + return self._unknown.stream + elif frame_type == FrameType.REQUEST_CHANNEL: + return self._unknown.channel + elif frame_type == FrameType.REQUEST_FNF: + return self._unknown.fire_and_forget + elif frame_type == FrameType.METADATA_PUSH: + return self._unknown.metadata_push From b2f2c259c4139165d6a68cde874b61a2e7f8d86a Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 20:03:07 +0200 Subject: [PATCH 5/8] added test for request router payload mapping --- .../test_request_routing_decode_payload.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 tests/rsocket/test_request_routing_decode_payload.py diff --git a/tests/rsocket/test_request_routing_decode_payload.py b/tests/rsocket/test_request_routing_decode_payload.py new file mode 100644 index 00000000..08970a73 --- /dev/null +++ b/tests/rsocket/test_request_routing_decode_payload.py @@ -0,0 +1,35 @@ +import json +from dataclasses import dataclass + +from rsocket.extensions.helpers import route, composite +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode, create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from rsocket.routing.routing_request_handler import RoutingRequestHandler + + +async def test_request_response_type_hinted_payload(lazy_pipe): + @dataclass + class Message: + user: str + message: str + + router = RequestRouter(lambda cls, payload: cls(**json.loads(utf8_decode(payload.data)))) + + def handler_factory(): + return RoutingRequestHandler(router) + + @router.response('test.path') + async def response(message: Message): + return create_response(ensure_bytes(message.message)) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + result = await client.request_response(Payload( + data=ensure_bytes(json.dumps(Message('George', 'hello').__dict__)), + metadata=composite(route('test.path')))) + + assert result.data == b'hello' From fae813f21c3920f74c6ee13b1a74b5ddf1a87ed9 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 9 Dec 2022 21:30:12 +0200 Subject: [PATCH 6/8] refactoring and updating to match guide --- examples/tutorial/reactivex/chat_client.py | 2 +- examples/tutorial/reactivex/chat_server.py | 2 +- .../tutorial/reactivex/{models.py => shared.py} | 0 examples/tutorial/step3/chat_client.py | 2 +- examples/tutorial/step3/chat_server.py | 2 +- examples/tutorial/step3/{models.py => shared.py} | 0 examples/tutorial/step4/chat_client.py | 2 +- examples/tutorial/step4/chat_server.py | 15 +++++---------- examples/tutorial/step4/{models.py => shared.py} | 11 +++++++++-- examples/tutorial/step5/chat_client.py | 2 +- examples/tutorial/step5/chat_server.py | 2 +- examples/tutorial/step5/{models.py => shared.py} | 0 examples/tutorial/step6/chat_client.py | 2 +- examples/tutorial/step6/chat_server.py | 2 +- examples/tutorial/step6/{models.py => shared.py} | 0 examples/tutorial/step7/chat_client.py | 2 +- examples/tutorial/step7/chat_server.py | 2 +- examples/tutorial/step7/{models.py => shared.py} | 0 examples/tutorial/step8/chat_client.py | 2 +- examples/tutorial/step8/chat_server.py | 2 +- examples/tutorial/step8/{models.py => shared.py} | 0 21 files changed, 27 insertions(+), 25 deletions(-) rename examples/tutorial/reactivex/{models.py => shared.py} (100%) rename examples/tutorial/step3/{models.py => shared.py} (100%) rename examples/tutorial/step4/{models.py => shared.py} (77%) rename examples/tutorial/step5/{models.py => shared.py} (100%) rename examples/tutorial/step6/{models.py => shared.py} (100%) rename examples/tutorial/step7/{models.py => shared.py} (100%) rename examples/tutorial/step8/{models.py => shared.py} (100%) diff --git a/examples/tutorial/reactivex/chat_client.py b/examples/tutorial/reactivex/chat_client.py index c11877dc..e3f0f3b6 100644 --- a/examples/tutorial/reactivex/chat_client.py +++ b/examples/tutorial/reactivex/chat_client.py @@ -7,7 +7,7 @@ from reactivex import operators -from examples.tutorial.reactivex.models import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, +from examples.tutorial.reactivex.shared import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, ServerStatisticsRequest, encode_dataclass, dataclass_to_payload, decode_dataclass) from rsocket.extensions.helpers import composite, route, metadata_item diff --git a/examples/tutorial/reactivex/chat_server.py b/examples/tutorial/reactivex/chat_server.py index d3ae71fa..81f516d6 100644 --- a/examples/tutorial/reactivex/chat_server.py +++ b/examples/tutorial/reactivex/chat_server.py @@ -11,7 +11,7 @@ from more_itertools import first from reactivex import Observable, operators, Subject, Observer -from examples.tutorial.reactivex.models import (Message, chat_filename_mimetype, ClientStatistics, +from examples.tutorial.reactivex.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload, decode_dataclass) from rsocket.extensions.composite_metadata import CompositeMetadata diff --git a/examples/tutorial/reactivex/models.py b/examples/tutorial/reactivex/shared.py similarity index 100% rename from examples/tutorial/reactivex/models.py rename to examples/tutorial/reactivex/shared.py diff --git a/examples/tutorial/step3/chat_client.py b/examples/tutorial/step3/chat_client.py index d9662bb3..ae50a9c9 100644 --- a/examples/tutorial/step3/chat_client.py +++ b/examples/tutorial/step3/chat_client.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from examples.tutorial.step3.models import Message, encode_dataclass, decode_dataclass +from examples.tutorial.step3.shared import Message, encode_dataclass, decode_dataclass from reactivestreams.subscriber import DefaultSubscriber from reactivestreams.subscription import DefaultSubscription from rsocket.extensions.helpers import composite, route diff --git a/examples/tutorial/step3/chat_server.py b/examples/tutorial/step3/chat_server.py index 9af950db..2a4c4039 100644 --- a/examples/tutorial/step3/chat_server.py +++ b/examples/tutorial/step3/chat_server.py @@ -8,7 +8,7 @@ from more_itertools import first -from examples.tutorial.step3.models import Message, dataclass_to_payload, decode_dataclass +from examples.tutorial.step3.shared import Message, dataclass_to_payload, decode_dataclass from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription diff --git a/examples/tutorial/step3/models.py b/examples/tutorial/step3/shared.py similarity index 100% rename from examples/tutorial/step3/models.py rename to examples/tutorial/step3/shared.py diff --git a/examples/tutorial/step4/chat_client.py b/examples/tutorial/step4/chat_client.py index 00e04717..11f63a04 100644 --- a/examples/tutorial/step4/chat_client.py +++ b/examples/tutorial/step4/chat_client.py @@ -2,7 +2,7 @@ import logging from typing import List, Optional -from examples.tutorial.step4.models import Message, encode_dataclass, decode_dataclass +from examples.tutorial.step4.shared import Message, encode_dataclass, decode_dataclass from reactivestreams.subscriber import DefaultSubscriber from reactivestreams.subscription import DefaultSubscription from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket diff --git a/examples/tutorial/step4/chat_server.py b/examples/tutorial/step4/chat_server.py index 3f76e095..9ee16c43 100644 --- a/examples/tutorial/step4/chat_server.py +++ b/examples/tutorial/step4/chat_server.py @@ -9,8 +9,7 @@ from more_itertools import first -from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass, - decode_payload) +from examples.tutorial.step4.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription @@ -89,7 +88,7 @@ def __init__(self): self._session: Optional[UserSessionData] = None def router_factory(self): - router = RequestRouter(decode_payload) + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') async def login(payload: Payload) -> Awaitable[Payload]: @@ -102,15 +101,13 @@ async def login(payload: Payload) -> Awaitable[Payload]: return create_response(ensure_bytes(session_id)) @router.response('channel.join') - async def join_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Awaitable[Payload]: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return create_response() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() @@ -157,9 +154,7 @@ async def _message_sender(self): return MessagePublisher(self._session) @router.stream('channel.users') - async def get_channel_users(payload: Payload) -> Publisher: - channel_name = utf8_decode(payload.data) - + async def get_channel_users(channel_name: str) -> Publisher: if channel_name not in chat_data.channel_users: return EmptyStream() diff --git a/examples/tutorial/step4/models.py b/examples/tutorial/step4/shared.py similarity index 77% rename from examples/tutorial/step4/models.py rename to examples/tutorial/step4/shared.py index bb49f71f..eb6318a2 100644 --- a/examples/tutorial/step4/models.py +++ b/examples/tutorial/step4/shared.py @@ -32,5 +32,12 @@ def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) -def decode_payload(cls: Type[T], payload: Payload) -> T: - return decode_dataclass(payload.data, cls) +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) diff --git a/examples/tutorial/step5/chat_client.py b/examples/tutorial/step5/chat_client.py index d826eae2..b0a0eb02 100644 --- a/examples/tutorial/step5/chat_client.py +++ b/examples/tutorial/step5/chat_client.py @@ -2,7 +2,7 @@ import logging from typing import List, Optional -from examples.tutorial.step5.models import Message, chat_filename_mimetype, encode_dataclass, decode_dataclass +from examples.tutorial.step5.shared import Message, chat_filename_mimetype, encode_dataclass, decode_dataclass from reactivestreams.subscriber import DefaultSubscriber from reactivestreams.subscription import DefaultSubscription from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket diff --git a/examples/tutorial/step5/chat_server.py b/examples/tutorial/step5/chat_server.py index 4f117ca7..6492270b 100644 --- a/examples/tutorial/step5/chat_server.py +++ b/examples/tutorial/step5/chat_server.py @@ -9,7 +9,7 @@ from more_itertools import first -from examples.tutorial.step5.models import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass) +from examples.tutorial.step5.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription diff --git a/examples/tutorial/step5/models.py b/examples/tutorial/step5/shared.py similarity index 100% rename from examples/tutorial/step5/models.py rename to examples/tutorial/step5/shared.py diff --git a/examples/tutorial/step6/chat_client.py b/examples/tutorial/step6/chat_client.py index 4b2610d0..6e8507fc 100644 --- a/examples/tutorial/step6/chat_client.py +++ b/examples/tutorial/step6/chat_client.py @@ -5,7 +5,7 @@ from datetime import timedelta from typing import List, Optional -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, ServerStatisticsRequest, encode_dataclass, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher diff --git a/examples/tutorial/step6/chat_server.py b/examples/tutorial/step6/chat_server.py index 57ba6be6..bffdf9a3 100644 --- a/examples/tutorial/step6/chat_server.py +++ b/examples/tutorial/step6/chat_server.py @@ -9,7 +9,7 @@ from more_itertools import first -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber diff --git a/examples/tutorial/step6/models.py b/examples/tutorial/step6/shared.py similarity index 100% rename from examples/tutorial/step6/models.py rename to examples/tutorial/step6/shared.py diff --git a/examples/tutorial/step7/chat_client.py b/examples/tutorial/step7/chat_client.py index f5c2b904..5dc28792 100644 --- a/examples/tutorial/step7/chat_client.py +++ b/examples/tutorial/step7/chat_client.py @@ -6,7 +6,7 @@ from datetime import timedelta from typing import List, Optional -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, ServerStatisticsRequest, encode_dataclass, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher diff --git a/examples/tutorial/step7/chat_server.py b/examples/tutorial/step7/chat_server.py index 3cded2bd..9439fb90 100644 --- a/examples/tutorial/step7/chat_server.py +++ b/examples/tutorial/step7/chat_server.py @@ -9,7 +9,7 @@ from more_itertools import first -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber diff --git a/examples/tutorial/step7/models.py b/examples/tutorial/step7/shared.py similarity index 100% rename from examples/tutorial/step7/models.py rename to examples/tutorial/step7/shared.py diff --git a/examples/tutorial/step8/chat_client.py b/examples/tutorial/step8/chat_client.py index 0591610f..0fefc86e 100644 --- a/examples/tutorial/step8/chat_client.py +++ b/examples/tutorial/step8/chat_client.py @@ -8,7 +8,7 @@ import aiohttp -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics, ServerStatisticsRequest, encode_dataclass, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher diff --git a/examples/tutorial/step8/chat_server.py b/examples/tutorial/step8/chat_server.py index eb283fad..b1f78243 100644 --- a/examples/tutorial/step8/chat_server.py +++ b/examples/tutorial/step8/chat_server.py @@ -10,7 +10,7 @@ from aiohttp import web from more_itertools import first -from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, +from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload, decode_dataclass) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber diff --git a/examples/tutorial/step8/models.py b/examples/tutorial/step8/shared.py similarity index 100% rename from examples/tutorial/step8/models.py rename to examples/tutorial/step8/shared.py From 466d1782068acebe87f6487776be4026803210c5 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sat, 10 Dec 2022 11:00:11 +0200 Subject: [PATCH 7/8] updated guide examples --- examples/tutorial/reactivex/chat_client.py | 17 +++++++-- examples/tutorial/reactivex/chat_server.py | 41 +++++++++++++--------- examples/tutorial/reactivex/shared.py | 11 ++++++ examples/tutorial/step4/chat_client.py | 17 +++++++-- examples/tutorial/step5/chat_client.py | 14 ++++---- examples/tutorial/step5/chat_server.py | 21 ++++------- examples/tutorial/step5/shared.py | 11 ++++++ examples/tutorial/step6/chat_client.py | 19 +++++++--- examples/tutorial/step6/chat_server.py | 41 ++++++++++++++-------- examples/tutorial/step6/shared.py | 11 ++++++ examples/tutorial/step7/chat_client.py | 19 +++++++--- examples/tutorial/step7/chat_server.py | 41 ++++++++++++++-------- examples/tutorial/step7/shared.py | 11 ++++++ examples/tutorial/step8/chat_client.py | 19 +++++++--- examples/tutorial/step8/chat_server.py | 40 +++++++++++++-------- examples/tutorial/step8/shared.py | 11 ++++++ 16 files changed, 247 insertions(+), 97 deletions(-) diff --git a/examples/tutorial/reactivex/chat_client.py b/examples/tutorial/reactivex/chat_client.py index e3f0f3b6..f9007e22 100644 --- a/examples/tutorial/reactivex/chat_client.py +++ b/examples/tutorial/reactivex/chat_client.py @@ -147,6 +147,12 @@ async def list_channels(self) -> List[str]: ).pipe(operators.map(lambda _: utf8_decode(_.data)), operators.to_list()) + async def list_channel_users(self, channel_name: str) -> List[str]: + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + return await ReactiveXClient(self._rsocket).request_stream( + request + ).pipe(operators.map(lambda _: utf8_decode(_.data)), + operators.to_list()) async def main(): connection1 = await asyncio.open_connection('localhost', 6565) @@ -174,16 +180,21 @@ async def messaging_example(user1, user2): user1.listen_for_messages() user2.listen_for_messages() - await user1.join('channel1') - await user2.join('channel1') + channel_name = 'channel1' + await user1.join(channel_name) + await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') - await user1.channel_message('channel1', 'channel message from user1') + await user1.channel_message(channel_name, 'channel message from user1') await asyncio.sleep(1) + await user1.leave(channel_name) + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/reactivex/chat_server.py b/examples/tutorial/reactivex/chat_server.py index 81f516d6..1830bfaa 100644 --- a/examples/tutorial/reactivex/chat_server.py +++ b/examples/tutorial/reactivex/chat_server.py @@ -13,7 +13,7 @@ from examples.tutorial.reactivex.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload, - decode_dataclass) + decode_dataclass, decode_payload) from rsocket.extensions.composite_metadata import CompositeMetadata from rsocket.extensions.helpers import composite, metadata_item from rsocket.frame_helpers import ensure_bytes @@ -36,7 +36,7 @@ class SessionId(str): # allow weak reference @dataclass() class UserSessionData: username: str - session_id: str + session_id: SessionId messages: Queue = field(default_factory=Queue) statistics: Optional[ClientStatistics] = None requested_statistics: ServerStatisticsRequest = field(default_factory=ServerStatisticsRequest) @@ -44,7 +44,7 @@ class UserSessionData: @dataclass(frozen=True) class ChatData: - channel_users: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(WeakSet)) + channel_users: Dict[str, Set[SessionId]] = field(default_factory=lambda: defaultdict(WeakSet)) files: Dict[str, bytes] = field(default_factory=dict) channel_messages: Dict[str, Queue] = field(default_factory=lambda: defaultdict(Queue)) user_session_by_id: Dict[str, UserSessionData] = field(default_factory=WeakValueDictionary) @@ -95,6 +95,13 @@ def new_statistics_data(requested_statistics: ServerStatisticsRequest): return ServerStatistics(**statistics_data) +def find_username_by_session(session_id: SessionId) -> Optional[str]: + session = chat_data.user_session_by_id.get(session_id) + if session is None: + return None + return session.username + + class ChatUserSession: def __init__(self): @@ -105,11 +112,10 @@ def remove(self): del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): - router = RequestRouter() + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') - async def login(payload: Payload) -> Observable: - username = utf8_decode(payload.data) + async def login(username: str) -> Observable: logging.info(f'New user: {username}') session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) @@ -118,15 +124,13 @@ async def login(payload: Payload) -> Observable: return reactivex.just(Payload(ensure_bytes(session_id))) @router.response('channel.join') - async def join_channel(payload: Payload) -> Observable: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Observable: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return reactivex.empty() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Observable: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Observable: chat_data.channel_users[channel_name].discard(self._session.session_id) return reactivex.empty() @@ -157,10 +161,17 @@ async def get_channels() -> Observable: return reactivex.from_iterable( (Payload(ensure_bytes(channel)) for channel in chat_data.channel_messages.keys())) - @router.fire_and_forget('statistics') - async def receive_statistics(payload: Payload): - statistics = decode_dataclass(payload.data, ClientStatistics) + @router.stream('channel.users') + async def get_channel_users(channel_name: str) -> Observable: + if channel_name not in chat_data.channel_users: + return reactivex.empty() + return reactivex.from_iterable(Payload(ensure_bytes(find_username_by_session(session_id))) for + session_id in + chat_data.channel_users[channel_name]) + + @router.fire_and_forget('statistics') + async def receive_statistics(statistics: ClientStatistics): logging.info('Received client statistics. memory usage: %s', statistics.memory_usage) self._session.statistics = statistics @@ -198,9 +209,7 @@ def on_next(payload: Payload): limit_rate=2) @router.response('message') - async def send_message(payload: Payload) -> Observable: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Observable: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) diff --git a/examples/tutorial/reactivex/shared.py b/examples/tutorial/reactivex/shared.py index 6c6a2d5a..8c996665 100644 --- a/examples/tutorial/reactivex/shared.py +++ b/examples/tutorial/reactivex/shared.py @@ -47,3 +47,14 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) \ No newline at end of file diff --git a/examples/tutorial/step4/chat_client.py b/examples/tutorial/step4/chat_client.py index 11f63a04..7c726494 100644 --- a/examples/tutorial/step4/chat_client.py +++ b/examples/tutorial/step4/chat_client.py @@ -77,6 +77,11 @@ async def list_channels(self) -> List[str]: response = await AwaitableRSocket(self._rsocket).request_stream(request) return list(map(lambda _: utf8_decode(_.data), response)) + async def list_channel_users(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + async def main(): connection1 = await asyncio.open_connection('localhost', 6565) @@ -100,16 +105,22 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): user1.listen_for_messages() user2.listen_for_messages() - await user1.join('channel1') - await user2.join('channel1') + channel_name = 'channel1' + + await user1.join(channel_name) + await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') - await user1.channel_message('channel1', 'channel message from user1') + await user1.channel_message(channel_name, 'channel message from user1') await asyncio.sleep(1) + await user1.leave(channel_name) + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/step5/chat_client.py b/examples/tutorial/step5/chat_client.py index b0a0eb02..a3834af8 100644 --- a/examples/tutorial/step5/chat_client.py +++ b/examples/tutorial/step5/chat_client.py @@ -38,11 +38,6 @@ async def leave(self, channel_name: str): await self._rsocket.request_response(request) return self - async def get_users(self, channel_name: str) -> List[str]: - request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) - users = await AwaitableRSocket(self._rsocket).request_stream(request) - return [utf8_decode(user.data) for user in users] - def listen_for_messages(self): def print_message(data: bytes): message = decode_dataclass(data, Message) @@ -97,6 +92,11 @@ async def list_channels(self) -> List[str]: response = await AwaitableRSocket(self._rsocket).request_stream(request) return list(map(lambda _: utf8_decode(_.data), response)) + async def list_channel_users(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + async def main(): connection1 = await asyncio.open_connection('localhost', 6565) @@ -129,7 +129,7 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') - print(f'Channel {channel_name} users: {await user1.get_users(channel_name)}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') await user1.channel_message(channel_name, 'channel message from user1') @@ -137,7 +137,7 @@ async def messaging_example(user1: ChatClient, user2: ChatClient): await asyncio.sleep(1) await user1.leave(channel_name) - print(f'Channel {channel_name} users: {await user1.get_users(channel_name)}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/step5/chat_server.py b/examples/tutorial/step5/chat_server.py index 6492270b..2cf2bb1a 100644 --- a/examples/tutorial/step5/chat_server.py +++ b/examples/tutorial/step5/chat_server.py @@ -9,7 +9,7 @@ from more_itertools import first -from examples.tutorial.step5.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass) +from examples.tutorial.step5.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription @@ -95,11 +95,10 @@ def remove(self): del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): - router = RequestRouter() + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) + async def login(username: str) -> Awaitable[Payload]: logging.info(f'New user: {username}') session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) @@ -108,22 +107,18 @@ async def login(payload: Payload) -> Awaitable[Payload]: return create_response(ensure_bytes(session_id)) @router.response('channel.join') - async def join_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Awaitable[Payload]: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return create_response() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() @router.stream('channel.users') - async def get_channel_users(payload: Payload) -> Publisher: - channel_name = utf8_decode(payload.data) - + async def get_channel_users(channel_name: str) -> Publisher: if channel_name not in chat_data.channel_users: return EmptyStream() @@ -160,9 +155,7 @@ async def get_channels() -> Publisher: return StreamFromGenerator(lambda: generator) @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Awaitable[Payload]: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) diff --git a/examples/tutorial/step5/shared.py b/examples/tutorial/step5/shared.py index 7215b88b..e8dff324 100644 --- a/examples/tutorial/step5/shared.py +++ b/examples/tutorial/step5/shared.py @@ -30,3 +30,14 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) \ No newline at end of file diff --git a/examples/tutorial/step6/chat_client.py b/examples/tutorial/step6/chat_client.py index 6e8507fc..07b10b77 100644 --- a/examples/tutorial/step6/chat_client.py +++ b/examples/tutorial/step6/chat_client.py @@ -139,6 +139,11 @@ async def list_channels(self) -> List[str]: response = await AwaitableRSocket(self._rsocket).request_stream(request) return list(map(lambda _: utf8_decode(_.data), response)) + async def list_channel_users(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + async def main(): connection1 = await asyncio.open_connection('localhost', 6565) @@ -162,20 +167,26 @@ async def main(): await files_example(user1, user2) -async def messaging_example(user1, user2): +async def messaging_example(user1: ChatClient, user2: ChatClient): user1.listen_for_messages() user2.listen_for_messages() - await user1.join('channel1') - await user2.join('channel1') + channel_name = 'channel1' + + await user1.join(channel_name) + await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') - await user1.channel_message('channel1', 'channel message from user1') + await user1.channel_message(channel_name, 'channel message from user1') await asyncio.sleep(1) + await user1.leave(channel_name) + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/step6/chat_server.py b/examples/tutorial/step6/chat_server.py index bffdf9a3..6ca67e40 100644 --- a/examples/tutorial/step6/chat_server.py +++ b/examples/tutorial/step6/chat_server.py @@ -10,7 +10,7 @@ from more_itertools import first from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, - ServerStatistics, dataclass_to_payload, decode_dataclass) + ServerStatistics, dataclass_to_payload, decode_dataclass, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber from reactivestreams.subscription import DefaultSubscription @@ -22,6 +22,7 @@ from rsocket.routing.request_router import RequestRouter from rsocket.routing.routing_request_handler import RoutingRequestHandler from rsocket.rsocket_server import RSocketServer +from rsocket.streams.empty_stream import EmptyStream from rsocket.streams.stream_from_generator import StreamFromGenerator from rsocket.transports.tcp import TransportTCP @@ -79,6 +80,13 @@ def find_session_by_username(username: str) -> Optional[UserSessionData]: session.username == username), None) +def find_username_by_session(session_id: SessionId) -> Optional[str]: + session = chat_data.user_session_by_id.get(session_id) + if session is None: + return None + return session.username + + def new_statistics_data(statistics_request: ServerStatisticsRequest): statistics_data = {} @@ -101,11 +109,10 @@ def remove(self): del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): - router = RequestRouter() + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) + async def login(username: str) -> Awaitable[Payload]: logging.info(f'New user: {username}') session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) @@ -114,15 +121,13 @@ async def login(payload: Payload) -> Awaitable[Payload]: return create_response(ensure_bytes(session_id)) @router.response('channel.join') - async def join_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Awaitable[Payload]: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return create_response() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() @@ -152,9 +157,7 @@ async def get_channels() -> Publisher: return StreamFromGenerator(lambda: generator) @router.fire_and_forget('statistics') - async def receive_statistics(payload: Payload): - statistics = decode_dataclass(payload.data, ClientStatistics) - + async def receive_statistics(statistics: ClientStatistics): logging.info('Received client statistics. memory usage: %s', statistics.memory_usage) self._session.statistics = statistics @@ -203,9 +206,7 @@ def on_next(self, payload: Payload, is_complete=False): return response, response @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Awaitable[Payload]: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) @@ -239,6 +240,18 @@ async def _message_sender(self): return MessagePublisher(self._session) + @router.stream('channel.users') + async def get_channel_users(channel_name: str) -> Publisher: + if channel_name not in chat_data.channel_users: + return EmptyStream() + + count = len(chat_data.channel_users[channel_name]) + generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for + (index, session_id) in + enumerate(chat_data.channel_users[channel_name], 1)) + + return StreamFromGenerator(lambda: generator) + return router diff --git a/examples/tutorial/step6/shared.py b/examples/tutorial/step6/shared.py index 67e8976f..ce28789d 100644 --- a/examples/tutorial/step6/shared.py +++ b/examples/tutorial/step6/shared.py @@ -47,3 +47,14 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) \ No newline at end of file diff --git a/examples/tutorial/step7/chat_client.py b/examples/tutorial/step7/chat_client.py index 5dc28792..1be0a78a 100644 --- a/examples/tutorial/step7/chat_client.py +++ b/examples/tutorial/step7/chat_client.py @@ -142,6 +142,11 @@ async def list_channels(self) -> List[str]: response = await AwaitableRSocket(self._rsocket).request_stream(request) return list(map(lambda _: utf8_decode(_.data), response)) + async def list_channel_users(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + router = RequestRouter() @@ -179,20 +184,26 @@ async def main(): await files_example(user1, user2) -async def messaging_example(user1, user2): +async def messaging_example(user1: ChatClient, user2: ChatClient): user1.listen_for_messages() user2.listen_for_messages() - await user1.join('channel1') - await user2.join('channel1') + channel_name = 'channel1' + + await user1.join(channel_name) + await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') - await user1.channel_message('channel1', 'channel message from user1') + await user1.channel_message(channel_name, 'channel message from user1') await asyncio.sleep(1) + await user1.leave(channel_name) + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/step7/chat_server.py b/examples/tutorial/step7/chat_server.py index 9439fb90..dae93a17 100644 --- a/examples/tutorial/step7/chat_server.py +++ b/examples/tutorial/step7/chat_server.py @@ -10,7 +10,7 @@ from more_itertools import first from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, - ServerStatistics, dataclass_to_payload, decode_dataclass) + ServerStatistics, dataclass_to_payload, decode_dataclass, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber from reactivestreams.subscription import DefaultSubscription @@ -22,6 +22,7 @@ from rsocket.routing.request_router import RequestRouter from rsocket.routing.routing_request_handler import RoutingRequestHandler from rsocket.rsocket_server import RSocketServer +from rsocket.streams.empty_stream import EmptyStream from rsocket.streams.stream_from_generator import StreamFromGenerator from rsocket.transports.tcp import TransportTCP @@ -79,6 +80,13 @@ def find_session_by_username(username: str) -> Optional[UserSessionData]: session.username == username), None) +def find_username_by_session(session_id: SessionId) -> Optional[str]: + session = chat_data.user_session_by_id.get(session_id) + if session is None: + return None + return session.username + + class ChatUserSession: def __init__(self): @@ -89,11 +97,10 @@ def remove(self): del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): - router = RequestRouter() + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) + async def login(username: str) -> Awaitable[Payload]: logging.info(f'New user: {username}') session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) @@ -102,15 +109,13 @@ async def login(payload: Payload) -> Awaitable[Payload]: return create_response(ensure_bytes(session_id)) @router.response('channel.join') - async def join_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Awaitable[Payload]: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return create_response() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() @@ -140,9 +145,7 @@ async def get_channels() -> Publisher: return StreamFromGenerator(lambda: generator) @router.fire_and_forget('statistics') - async def receive_statistics(payload: Payload): - statistics = decode_dataclass(payload.data, ClientStatistics) - + async def receive_statistics(statistics: ClientStatistics): logging.info('Received client statistics. memory usage: %s', statistics.memory_usage) self._session.statistics = statistics @@ -202,9 +205,7 @@ def on_next(self, payload: Payload, is_complete=False): return response, response @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Awaitable[Payload]: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) @@ -238,6 +239,18 @@ async def _message_sender(self): return MessagePublisher(self._session) + @router.stream('channel.users') + async def get_channel_users(channel_name: str) -> Publisher: + if channel_name not in chat_data.channel_users: + return EmptyStream() + + count = len(chat_data.channel_users[channel_name]) + generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for + (index, session_id) in + enumerate(chat_data.channel_users[channel_name], 1)) + + return StreamFromGenerator(lambda: generator) + return router diff --git a/examples/tutorial/step7/shared.py b/examples/tutorial/step7/shared.py index 67e8976f..ce28789d 100644 --- a/examples/tutorial/step7/shared.py +++ b/examples/tutorial/step7/shared.py @@ -47,3 +47,14 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) \ No newline at end of file diff --git a/examples/tutorial/step8/chat_client.py b/examples/tutorial/step8/chat_client.py index 0fefc86e..0248dc2a 100644 --- a/examples/tutorial/step8/chat_client.py +++ b/examples/tutorial/step8/chat_client.py @@ -142,6 +142,11 @@ async def list_channels(self) -> List[str]: response = await AwaitableRSocket(self._rsocket).request_stream(request) return list(map(lambda _: utf8_decode(_.data), response)) + async def list_channel_users(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + @asynccontextmanager async def connect(): @@ -168,20 +173,26 @@ async def main(): await files_example(user1, user2) -async def messaging_example(user1, user2): +async def messaging_example(user1: ChatClient, user2: ChatClient): user1.listen_for_messages() user2.listen_for_messages() - await user1.join('channel1') - await user2.join('channel1') + channel_name = 'channel1' + + await user1.join(channel_name) + await user2.join(channel_name) print(f'Channels: {await user1.list_channels()}') + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') await user1.private_message('user2', 'private message from user1') - await user1.channel_message('channel1', 'channel message from user1') + await user1.channel_message(channel_name, 'channel message from user1') await asyncio.sleep(1) + await user1.leave(channel_name) + print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}') + user1.stop_listening_for_messages() user2.stop_listening_for_messages() diff --git a/examples/tutorial/step8/chat_server.py b/examples/tutorial/step8/chat_server.py index b1f78243..b4ae005b 100644 --- a/examples/tutorial/step8/chat_server.py +++ b/examples/tutorial/step8/chat_server.py @@ -11,7 +11,7 @@ from more_itertools import first from examples.tutorial.step6.shared import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, - ServerStatistics, dataclass_to_payload, decode_dataclass) + ServerStatistics, dataclass_to_payload, decode_dataclass, decode_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber from reactivestreams.subscription import DefaultSubscription @@ -23,6 +23,7 @@ from rsocket.routing.request_router import RequestRouter from rsocket.routing.routing_request_handler import RoutingRequestHandler from rsocket.rsocket_server import RSocketServer +from rsocket.streams.empty_stream import EmptyStream from rsocket.streams.stream_from_generator import StreamFromGenerator from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket @@ -91,6 +92,12 @@ def new_statistics_data(statistics_request: ServerStatisticsRequest): return ServerStatistics(**statistics_data) +def find_username_by_session(session_id: SessionId) -> Optional[str]: + session = chat_data.user_session_by_id.get(session_id) + if session is None: + return None + return session.username + class ChatUserSession: @@ -102,11 +109,10 @@ def remove(self): del chat_data.user_session_by_id[self._session.session_id] def router_factory(self): - router = RequestRouter() + router = RequestRouter(payload_mapper=decode_payload) @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) + async def login(username: str) -> Awaitable[Payload]: logging.info(f'New user: {username}') session_id = SessionId(uuid.uuid4()) self._session = UserSessionData(username, session_id) @@ -115,15 +121,13 @@ async def login(payload: Payload) -> Awaitable[Payload]: return create_response(ensure_bytes(session_id)) @router.response('channel.join') - async def join_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def join_channel(channel_name: str) -> Awaitable[Payload]: ensure_channel_exists(channel_name) chat_data.channel_users[channel_name].add(self._session.session_id) return create_response() @router.response('channel.leave') - async def leave_channel(payload: Payload) -> Awaitable[Payload]: - channel_name = utf8_decode(payload.data) + async def leave_channel(channel_name: str) -> Awaitable[Payload]: chat_data.channel_users[channel_name].discard(self._session.session_id) return create_response() @@ -153,9 +157,7 @@ async def get_channels() -> Publisher: return StreamFromGenerator(lambda: generator) @router.fire_and_forget('statistics') - async def receive_statistics(payload: Payload): - statistics = decode_dataclass(payload.data, ClientStatistics) - + async def receive_statistics(statistics: ClientStatistics): logging.info('Received client statistics. memory usage: %s', statistics.memory_usage) self._session.statistics = statistics @@ -204,9 +206,7 @@ def on_next(self, payload: Payload, is_complete=False): return response, response @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = decode_dataclass(payload.data, Message) - + async def send_message(message: Message) -> Awaitable[Payload]: logging.info('Received message for user: %s, channel: %s', message.user, message.channel) target_message = Message(self._session.username, message.content, message.channel) @@ -240,6 +240,18 @@ async def _message_sender(self): return MessagePublisher(self._session) + @router.stream('channel.users') + async def get_channel_users(channel_name: str) -> Publisher: + if channel_name not in chat_data.channel_users: + return EmptyStream() + + count = len(chat_data.channel_users[channel_name]) + generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for + (index, session_id) in + enumerate(chat_data.channel_users[channel_name], 1)) + + return StreamFromGenerator(lambda: generator) + return router diff --git a/examples/tutorial/step8/shared.py b/examples/tutorial/step8/shared.py index 67e8976f..ce28789d 100644 --- a/examples/tutorial/step8/shared.py +++ b/examples/tutorial/step8/shared.py @@ -47,3 +47,14 @@ def dataclass_to_payload(obj) -> Payload: def decode_dataclass(data: bytes, cls: Type[T]) -> T: return cls(**json.loads(utf8_decode(data))) + + +def decode_payload(cls, payload: Payload): + data = payload.data + + if cls is bytes: + return data + if cls is str: + return utf8_decode(data) + + return decode_dataclass(data, cls) \ No newline at end of file From 0e0e277dc16bc5e474f621cb371678070e2300fc Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sat, 10 Dec 2022 19:04:40 +0200 Subject: [PATCH 8/8] changelog update. allow different names for composite metadata routed method argument. add Payload class to documentation --- CHANGELOG.rst | 21 ++++++++++++--------- docs/api.rst | 16 +++++++++++++--- docs/quickstart.rst | 1 - rsocket/payload.py | 7 +++++++ rsocket/routing/request_router.py | 2 +- 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d92477bb..a89f2821 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,15 +5,18 @@ v0.4.6 ====== - fire_and_forget now only removes the stream id when the future denoting the frame was sent, is done - API documentation auto generated at rsocket.readthedocs.io -- Raise error on empty or None route specified in request router -- Added the following methods to RequestRouter to allow specifying handlers of unknown routes: - - response_unknown - - stream_unknown - - channel_unknown - - fire_and_forget_unknown - - metadata_push_unknown -- Officially support route aliases by using the decorator multiple times on the same method -- Fix value mapping in request router. A parameter of any name (not just *payload*) specified on a routed method with a type-hint other than Payload will use the payload_mapper specified in the router instantiation to decode the value. +- Request router changes: + - Raise error on empty or None route specified in request router + - Added the following methods to RequestRouter to allow specifying handlers of unknown routes: + - response_unknown + - stream_unknown + - channel_unknown + - fire_and_forget_unknown + - metadata_push_unknown + - Officially support route aliases by using the decorator multiple times on the same method + - Fix value mapping in request router: + -A parameter of any name (not just *payload*) specified on a routed method with a type-hint other than Payload will use the payload_mapper to decode the value + - Any parameter with the type CompositeMetadata will receive the composite metadata v0.4.5 ====== diff --git a/docs/api.rst b/docs/api.rst index 84614fa4..c71c97b8 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,27 +1,37 @@ Core API Reference ================== +Controls +-------- + Server ------- +~~~~~~ .. automodule:: rsocket.rsocket_server :members: :inherited-members: Client ------- +~~~~~~ .. automodule:: rsocket.rsocket_client :members: :inherited-members: Handler -------- +~~~~~~~ .. automodule:: rsocket.request_handler :members: +Models +------ + +.. automodule:: rsocket.payload + :members: + + Interfaces ---------- diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 35f5fd5b..ccf6948f 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -4,5 +4,4 @@ Quick start .. autosummary:: :toctree: generated - A quick getting started guide is available at https://rsocket.io/guides/rsocket-py/simple \ No newline at end of file diff --git a/rsocket/payload.py b/rsocket/payload.py index f3e99038..1fe9d0ee 100644 --- a/rsocket/payload.py +++ b/rsocket/payload.py @@ -5,6 +5,13 @@ class Payload: + """ + A stream message (upstream or downstream). Contains data and metadata, boty `bytes`. + + :param data: data segment of payload + :param metadata: metadata segment of payload + """ + __slots__ = ('data', 'metadata') @staticmethod diff --git a/rsocket/routing/request_router.py b/rsocket/routing/request_router.py index 56669c4a..6799d973 100644 --- a/rsocket/routing/request_router.py +++ b/rsocket/routing/request_router.py @@ -151,7 +151,7 @@ def _collect_route_arguments(self, route_processor, payload, composite_metadata) for parameter in route_signature.parameters: parameter_type = route_signature.parameters[parameter] - if 'composite_metadata' == parameter: + if 'composite_metadata' == parameter or parameter_type is CompositeMetadata: route_kwargs['composite_metadata'] = composite_metadata else: if parameter_type.annotation not in (Payload, parameter_type.empty):