Skip to content

Commit

Permalink
Merge pull request #98 from rsocket/routing_improvements
Browse files Browse the repository at this point in the history
routing improvements
  • Loading branch information
jell-o-fishi authored Dec 10, 2022
2 parents 39c9be1 + 0e0e277 commit 785d980
Show file tree
Hide file tree
Showing 31 changed files with 522 additions and 141 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ v0.4.6
======
- fire_and_forget now only removes the stream id when the future denoting the frame was sent, is done
- API documentation auto generated at rsocket.readthedocs.io
- Request router changes:
- Raise error on empty or None route specified in request router
- Added the following methods to RequestRouter to allow specifying handlers of unknown routes:
- response_unknown
- stream_unknown
- channel_unknown
- fire_and_forget_unknown
- metadata_push_unknown
- Officially support route aliases by using the decorator multiple times on the same method
- Fix value mapping in request router:
-A parameter of any name (not just *payload*) specified on a routed method with a type-hint other than Payload will use the payload_mapper to decode the value
- Any parameter with the type CompositeMetadata will receive the composite metadata

v0.4.5
======
Expand Down
16 changes: 13 additions & 3 deletions docs/api.rst
Original file line number Diff line number Diff line change
@@ -1,27 +1,37 @@
Core API Reference
==================

Controls
--------

Server
------
~~~~~~

.. automodule:: rsocket.rsocket_server
:members:
:inherited-members:

Client
------
~~~~~~

.. automodule:: rsocket.rsocket_client
:members:
:inherited-members:

Handler
-------
~~~~~~~

.. automodule:: rsocket.request_handler
:members:


Models
------

.. automodule:: rsocket.payload
:members:


Interfaces
----------

Expand Down
1 change: 0 additions & 1 deletion docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ Quick start
.. autosummary::
:toctree: generated


A quick getting started guide is available at https://rsocket.io/guides/rsocket-py/simple
19 changes: 15 additions & 4 deletions examples/tutorial/reactivex/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from reactivex import operators

from examples.tutorial.reactivex.models import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics,
from examples.tutorial.reactivex.shared import (Message, chat_filename_mimetype, ServerStatistics, ClientStatistics,
ServerStatisticsRequest, encode_dataclass, dataclass_to_payload,
decode_dataclass)
from rsocket.extensions.helpers import composite, route, metadata_item
Expand Down Expand Up @@ -147,6 +147,12 @@ async def list_channels(self) -> List[str]:
).pipe(operators.map(lambda _: utf8_decode(_.data)),
operators.to_list())

async def list_channel_users(self, channel_name: str) -> List[str]:
request = Payload(ensure_bytes(channel_name), composite(route('channel.users')))
return await ReactiveXClient(self._rsocket).request_stream(
request
).pipe(operators.map(lambda _: utf8_decode(_.data)),
operators.to_list())

async def main():
connection1 = await asyncio.open_connection('localhost', 6565)
Expand Down Expand Up @@ -174,16 +180,21 @@ async def messaging_example(user1, user2):
user1.listen_for_messages()
user2.listen_for_messages()

await user1.join('channel1')
await user2.join('channel1')
channel_name = 'channel1'
await user1.join(channel_name)
await user2.join(channel_name)

print(f'Channels: {await user1.list_channels()}')
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

await user1.private_message('user2', 'private message from user1')
await user1.channel_message('channel1', 'channel message from user1')
await user1.channel_message(channel_name, 'channel message from user1')

await asyncio.sleep(1)

await user1.leave(channel_name)
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()

Expand Down
43 changes: 26 additions & 17 deletions examples/tutorial/reactivex/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from more_itertools import first
from reactivex import Observable, operators, Subject, Observer

from examples.tutorial.reactivex.models import (Message, chat_filename_mimetype, ClientStatistics,
from examples.tutorial.reactivex.shared import (Message, chat_filename_mimetype, ClientStatistics,
ServerStatisticsRequest, ServerStatistics, dataclass_to_payload,
decode_dataclass)
decode_dataclass, decode_payload)
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.extensions.helpers import composite, metadata_item
from rsocket.frame_helpers import ensure_bytes
Expand All @@ -36,15 +36,15 @@ class SessionId(str): # allow weak reference
@dataclass()
class UserSessionData:
username: str
session_id: str
session_id: SessionId
messages: Queue = field(default_factory=Queue)
statistics: Optional[ClientStatistics] = None
requested_statistics: ServerStatisticsRequest = field(default_factory=ServerStatisticsRequest)


@dataclass(frozen=True)
class ChatData:
channel_users: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(WeakSet))
channel_users: Dict[str, Set[SessionId]] = field(default_factory=lambda: defaultdict(WeakSet))
files: Dict[str, bytes] = field(default_factory=dict)
channel_messages: Dict[str, Queue] = field(default_factory=lambda: defaultdict(Queue))
user_session_by_id: Dict[str, UserSessionData] = field(default_factory=WeakValueDictionary)
Expand Down Expand Up @@ -95,6 +95,13 @@ def new_statistics_data(requested_statistics: ServerStatisticsRequest):
return ServerStatistics(**statistics_data)


def find_username_by_session(session_id: SessionId) -> Optional[str]:
session = chat_data.user_session_by_id.get(session_id)
if session is None:
return None
return session.username


class ChatUserSession:

def __init__(self):
Expand All @@ -105,11 +112,10 @@ def remove(self):
del chat_data.user_session_by_id[self._session.session_id]

def router_factory(self):
router = RequestRouter()
router = RequestRouter(payload_mapper=decode_payload)

@router.response('login')
async def login(payload: Payload) -> Observable:
username = utf8_decode(payload.data)
async def login(username: str) -> Observable:
logging.info(f'New user: {username}')
session_id = SessionId(uuid.uuid4())
self._session = UserSessionData(username, session_id)
Expand All @@ -118,15 +124,13 @@ async def login(payload: Payload) -> Observable:
return reactivex.just(Payload(ensure_bytes(session_id)))

@router.response('channel.join')
async def join_channel(payload: Payload) -> Observable:
channel_name = utf8_decode(payload.data)
async def join_channel(channel_name: str) -> Observable:
ensure_channel_exists(channel_name)
chat_data.channel_users[channel_name].add(self._session.session_id)
return reactivex.empty()

@router.response('channel.leave')
async def leave_channel(payload: Payload) -> Observable:
channel_name = utf8_decode(payload.data)
async def leave_channel(channel_name: str) -> Observable:
chat_data.channel_users[channel_name].discard(self._session.session_id)
return reactivex.empty()

Expand Down Expand Up @@ -157,10 +161,17 @@ async def get_channels() -> Observable:
return reactivex.from_iterable(
(Payload(ensure_bytes(channel)) for channel in chat_data.channel_messages.keys()))

@router.fire_and_forget('statistics')
async def receive_statistics(payload: Payload):
statistics = decode_dataclass(payload.data, ClientStatistics)
@router.stream('channel.users')
async def get_channel_users(channel_name: str) -> Observable:
if channel_name not in chat_data.channel_users:
return reactivex.empty()

return reactivex.from_iterable(Payload(ensure_bytes(find_username_by_session(session_id))) for
session_id in
chat_data.channel_users[channel_name])

@router.fire_and_forget('statistics')
async def receive_statistics(statistics: ClientStatistics):
logging.info('Received client statistics. memory usage: %s', statistics.memory_usage)

self._session.statistics = statistics
Expand Down Expand Up @@ -198,9 +209,7 @@ def on_next(payload: Payload):
limit_rate=2)

@router.response('message')
async def send_message(payload: Payload) -> Observable:
message = decode_dataclass(payload.data, Message)

async def send_message(message: Message) -> Observable:
logging.info('Received message for user: %s, channel: %s', message.user, message.channel)

target_message = Message(self._session.username, message.content, message.channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,14 @@ def dataclass_to_payload(obj) -> Payload:

def decode_dataclass(data: bytes, cls: Type[T]) -> T:
return cls(**json.loads(utf8_decode(data)))


def decode_payload(cls, payload: Payload):
data = payload.data

if cls is bytes:
return data
if cls is str:
return utf8_decode(data)

return decode_dataclass(data, cls)
2 changes: 1 addition & 1 deletion examples/tutorial/step3/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from typing import Optional

from examples.tutorial.step3.models import Message, encode_dataclass, decode_dataclass
from examples.tutorial.step3.shared import Message, encode_dataclass, decode_dataclass
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.extensions.helpers import composite, route
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/step3/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from more_itertools import first

from examples.tutorial.step3.models import Message, dataclass_to_payload, decode_dataclass
from examples.tutorial.step3.shared import Message, dataclass_to_payload, decode_dataclass
from reactivestreams.publisher import DefaultPublisher, Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import DefaultSubscription
Expand Down
File renamed without changes.
19 changes: 15 additions & 4 deletions examples/tutorial/step4/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from typing import List, Optional

from examples.tutorial.step4.models import Message, encode_dataclass, decode_dataclass
from examples.tutorial.step4.shared import Message, encode_dataclass, decode_dataclass
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
Expand Down Expand Up @@ -77,6 +77,11 @@ async def list_channels(self) -> List[str]:
response = await AwaitableRSocket(self._rsocket).request_stream(request)
return list(map(lambda _: utf8_decode(_.data), response))

async def list_channel_users(self, channel_name: str):
request = Payload(ensure_bytes(channel_name), composite(route('channel.users')))
response = await AwaitableRSocket(self._rsocket).request_stream(request)
return list(map(lambda _: utf8_decode(_.data), response))


async def main():
connection1 = await asyncio.open_connection('localhost', 6565)
Expand All @@ -100,16 +105,22 @@ async def messaging_example(user1: ChatClient, user2: ChatClient):
user1.listen_for_messages()
user2.listen_for_messages()

await user1.join('channel1')
await user2.join('channel1')
channel_name = 'channel1'

await user1.join(channel_name)
await user2.join(channel_name)

print(f'Channels: {await user1.list_channels()}')
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

await user1.private_message('user2', 'private message from user1')
await user1.channel_message('channel1', 'channel message from user1')
await user1.channel_message(channel_name, 'channel message from user1')

await asyncio.sleep(1)

await user1.leave(channel_name)
print(f'Channel {channel_name} users: {await user1.list_channel_users(channel_name)}')

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()

Expand Down
18 changes: 6 additions & 12 deletions examples/tutorial/step4/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from more_itertools import first

from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload, decode_dataclass)
from examples.tutorial.step4.shared import (Message, chat_filename_mimetype, dataclass_to_payload, decode_payload)
from reactivestreams.publisher import DefaultPublisher, Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import DefaultSubscription
Expand Down Expand Up @@ -88,7 +88,7 @@ def __init__(self):
self._session: Optional[UserSessionData] = None

def router_factory(self):
router = RequestRouter()
router = RequestRouter(payload_mapper=decode_payload)

@router.response('login')
async def login(payload: Payload) -> Awaitable[Payload]:
Expand All @@ -101,15 +101,13 @@ async def login(payload: Payload) -> Awaitable[Payload]:
return create_response(ensure_bytes(session_id))

@router.response('channel.join')
async def join_channel(payload: Payload) -> Awaitable[Payload]:
channel_name = utf8_decode(payload.data)
async def join_channel(channel_name: str) -> Awaitable[Payload]:
ensure_channel_exists(channel_name)
chat_data.channel_users[channel_name].add(self._session.session_id)
return create_response()

@router.response('channel.leave')
async def leave_channel(payload: Payload) -> Awaitable[Payload]:
channel_name = utf8_decode(payload.data)
async def leave_channel(channel_name: str) -> Awaitable[Payload]:
chat_data.channel_users[channel_name].discard(self._session.session_id)
return create_response()

Expand All @@ -121,9 +119,7 @@ async def get_channels() -> Publisher:
return StreamFromGenerator(lambda: generator)

@router.response('message')
async def send_message(payload: Payload) -> Awaitable[Payload]:
message = decode_dataclass(payload.data, Message)

async def send_message(message: Message) -> Awaitable[Payload]:
logging.info('Received message for user: %s, channel: %s', message.user, message.channel)

target_message = Message(self._session.username, message.content, message.channel)
Expand Down Expand Up @@ -158,9 +154,7 @@ async def _message_sender(self):
return MessagePublisher(self._session)

@router.stream('channel.users')
async def get_channel_users(payload: Payload) -> Publisher:
channel_name = utf8_decode(payload.data)

async def get_channel_users(channel_name: str) -> Publisher:
if channel_name not in chat_data.channel_users:
return EmptyStream()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ def dataclass_to_payload(obj) -> Payload:

def decode_dataclass(data: bytes, cls: Type[T]) -> T:
return cls(**json.loads(utf8_decode(data)))


def decode_payload(cls, payload: Payload):
data = payload.data

if cls is bytes:
return data
if cls is str:
return utf8_decode(data)

return decode_dataclass(data, cls)
Loading

0 comments on commit 785d980

Please sign in to comment.