diff --git a/.gitignore b/.gitignore index 5c4fe8e..78d03e3 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,6 @@ bin/ # PyBuilder target/ + +# PyCharm +.idea/ diff --git a/examples/async_simple_tcp_client.py b/examples/async_simple_tcp_client.py new file mode 100644 index 0000000..b5297c7 --- /dev/null +++ b/examples/async_simple_tcp_client.py @@ -0,0 +1,49 @@ +"""Small example Asynchronous OSC TCP client + +This program listens for incoming messages in one task, and +sends 10 random values between 0.0 and 1.0 to the /filter address, +waiting for 1 seconds between each value in a second task. +""" +import argparse +import asyncio +import random +import sys + +from pythonosc import tcp_client + + +async def get_messages(client): + async for msg in client.get_messages(60): + print(msg) + + +async def send_messages(client): + for x in range(10): + r = random.random() + print(f"Sending /filter {r}") + await client.send_message("/filter", r) + await asyncio.sleep(1) + + +async def init_main(): + parser = argparse.ArgumentParser() + parser.add_argument("--ip", default="127.0.0.1", + help="The ip of the OSC server") + parser.add_argument("--port", type=int, default=5005, + help="The port the OSC server is listening on") + parser.add_argument("--mode", default="1.1", + help="The OSC protocol version of the server (default is 1.1)") + args = parser.parse_args() + + async with tcp_client.AsyncSimpleTCPClient(args.ip, args.port, mode=args.mode) as client: + async with asyncio.TaskGroup() as tg: + tg.create_task(get_messages(client)) + tg.create_task(send_messages(client)) + +if sys.version_info >= (3, 7): + asyncio.run(init_main()) +else: + # TODO(python-upgrade): drop this once 3.6 is no longer supported + event_loop = asyncio.get_event_loop() + event_loop.run_until_complete(init_main()) + event_loop.close() diff --git a/examples/async_tcp_server.py b/examples/async_tcp_server.py new file mode 100644 index 0000000..873ac3d --- /dev/null +++ b/examples/async_tcp_server.py @@ -0,0 +1,46 @@ +import argparse +import asyncio +import sys + +from pythonosc.dispatcher import Dispatcher +from pythonosc.osc_tcp_server import AsyncOSCTCPServer + + +def filter_handler(address, *args): + print(f"{address}: {args}") + + +dispatcher = Dispatcher() +dispatcher.map("/filter", filter_handler) + + +async def loop(): + """Example main loop that only runs for 10 iterations before finishing""" + for i in range(10): + print(f"Loop {i}") + await asyncio.sleep(10) + + +async def init_main(): + parser = argparse.ArgumentParser() + parser.add_argument("--ip", default="127.0.0.1", + help="The ip of the OSC server") + parser.add_argument("--port", type=int, default=5005, + help="The port the OSC server is listening on") + parser.add_argument("--mode", default="1.1", + help="The OSC protocol version of the server (default is 1.1)") + args = parser.parse_args() + + async with AsyncOSCTCPServer(args.ip, args.port, dispatcher, mode=args.mode) as server: + async with asyncio.TaskGroup() as tg: + tg.create_task(server.start()) + tg.create_task(loop()) + + +if sys.version_info >= (3, 7): + asyncio.run(init_main()) +else: + # TODO(python-upgrade): drop this once 3.6 is no longer supported + event_loop = asyncio.get_event_loop() + event_loop.run_until_complete(init_main()) + event_loop.close() diff --git a/examples/simple_tcp_client.py b/examples/simple_tcp_client.py new file mode 100644 index 0000000..53d319c --- /dev/null +++ b/examples/simple_tcp_client.py @@ -0,0 +1,31 @@ +"""Small example OSC client + +This program sends 10 random values between 0.0 and 1.0 to the /filter address, +and listens for incoming messages for 1 second between each value. +""" +import argparse +import random + +from pythonosc import tcp_client + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ip", default="127.0.0.1", + help="The ip of the OSC server") + parser.add_argument("--port", type=int, default=5005, + help="The port the OSC server is listening on") + parser.add_argument("--mode", default="1.1", + help="The OSC protocol version of the server (default is 1.1)") + args = parser.parse_args() + + with tcp_client.SimpleTCPClient(args.ip, args.port, mode=args.mode) as client: + for x in range(10): + n = random.random() + print(f"Sending /filter {n}") + client.send_message("/filter", n) + resp = client.get_messages(1) + for r in resp: + try: + print(r) + except Exception as e: + print(f"oops {str(e)}: {r}") diff --git a/examples/simple_tcp_server.py b/examples/simple_tcp_server.py new file mode 100644 index 0000000..c5883ea --- /dev/null +++ b/examples/simple_tcp_server.py @@ -0,0 +1,43 @@ +"""Small example OSC server + +This program listens to the specified address and port, and prints some information about +received packets. +""" +import argparse +import math + +from pythonosc import osc_tcp_server +from pythonosc.dispatcher import Dispatcher + + +def print_volume_handler(unused_addr, args, volume): + print("[{0}] ~ {1}".format(args[0], volume)) + + +def print_compute_handler(unused_addr, args, volume): + try: + print("[{0}] ~ {1}".format(args[0], args[1](volume))) + except ValueError: + pass + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ip", + default="127.0.0.1", help="The ip to listen on") + parser.add_argument("--port", + type=int, default=5005, help="The port to listen on") + parser.add_argument("--mode", default="1.1", + help="The OSC protocol version of the server (default is 1.1)") + + args = parser.parse_args() + + dispatcher = Dispatcher() + dispatcher.map("/filter", print) + dispatcher.map("/volume", print_volume_handler, "Volume") + dispatcher.map("/logvolume", print_compute_handler, "Log volume", math.log) + + server = osc_tcp_server.ThreadingOSCTCPServer( + (args.ip, args.port), dispatcher, mode=args.mode) + print("Serving on {}".format(server.server_address)) + server.serve_forever() diff --git a/pythonosc/osc_message.py b/pythonosc/osc_message.py index 3d46551..180372d 100644 --- a/pythonosc/osc_message.py +++ b/pythonosc/osc_message.py @@ -22,6 +22,9 @@ def __init__(self, dgram: bytes) -> None: self._parameters = [] # type: List[Any] self._parse_datagram() + def __str__(self): + return f"{self.address} {' '.join(str(p) for p in self.params)}" + def _parse_datagram(self) -> None: try: self._address_regexp, index = osc_types.get_string(self._dgram, 0) diff --git a/pythonosc/osc_message_builder.py b/pythonosc/osc_message_builder.py index e91b76c..ffbb1e5 100644 --- a/pythonosc/osc_message_builder.py +++ b/pythonosc/osc_message_builder.py @@ -1,4 +1,5 @@ """Build OSC messages for client applications.""" +from typing import Iterable from pythonosc import osc_message from pythonosc.parsing import osc_types @@ -195,3 +196,16 @@ def build(self) -> osc_message.OscMessage: return osc_message.OscMessage(dgram) except osc_types.BuildError as be: raise BuildError("Could not build the message: {}".format(be)) + + +def build_msg(address: str, value: ArgValue): + builder = OscMessageBuilder(address=address) + if value is None: + values = [] + elif not isinstance(value, Iterable) or isinstance(value, (str, bytes)): + values = [value] + else: + values = value + for val in values: + builder.add_arg(val) + return builder.build() \ No newline at end of file diff --git a/pythonosc/osc_tcp_server.py b/pythonosc/osc_tcp_server.py new file mode 100644 index 0000000..c648638 --- /dev/null +++ b/pythonosc/osc_tcp_server.py @@ -0,0 +1,392 @@ +# TODO: timeouts! + + +"""OSC Servers that receive TCP packets and invoke handlers accordingly. + +Use like this: + +dispatcher = dispatcher.Dispatcher() +# This will print all parameters to stdout. +dispatcher.map("/bpm", print) +server = ForkingOSCTCPServer((ip, port), dispatcher) +server.serve_forever() + +or run the server on its own thread: +server = ForkingOSCTCPServer((ip, port), dispatcher) +server_thread = threading.Thread(target=server.serve_forever) +server_thread.start() +... +server.shutdown() + + +Those servers are using the standard socketserver from the standard library: +http://docs.python.org/library/socketserver.html + + +Alternatively, the AsyncIOOSCTCPServer server can be integrated with an +asyncio event loop: + +loop = asyncio.get_event_loop() +server = AsyncIOOSCTCPServer(server_address, dispatcher) +server.serve() +loop.run_forever() + +""" + +import asyncio +import inspect +import logging +import os +import socketserver +import struct +import time + +from pythonosc import osc_message_builder, osc_packet, slip + +LOG = logging.getLogger() +MODE_1_0 = "1.0" +MODE_1_1 = "1.1" + + +def _call_handlers_for_packet(data, dispatcher): + """ + This function calls the handlers registered to the dispatcher for + every message it found in the packet. + The process/thread granularity is thus the OSC packet, not the handler. + + If parameters were registered with the dispatcher, then the handlers are + called this way: + handler('/address that triggered the message', + registered_param_list, osc_msg_arg1, osc_msg_arg2, ...) + if no parameters were registered, then it is just called like this: + handler('/address that triggered the message', + osc_msg_arg1, osc_msg_arg2, osc_msg_param3, ...) + """ + + # Get OSC messages from all bundles or standalone message. + all_resp = [] + try: + LOG.debug("_call_handlers_for_packet: data ", data) + packet = osc_packet.OscPacket(data) + for timed_msg in packet.messages: + now = time.time() + handlers = dispatcher.handlers_for_address( + timed_msg.message.address) + if not handlers: + continue + # If the message is to be handled later, then so be it. + if timed_msg.time > now: + time.sleep(timed_msg.time - now) + for handler in handlers: + if handler.args: + resp = handler.callback( + timed_msg.message.address, handler.args, *timed_msg.message) + else: + resp = handler.callback(timed_msg.message.address, *timed_msg.message) + if resp: + all_resp.append(resp) + except osc_packet.ParseError: + pass + return all_resp + + +class _TCPHandler1_0(socketserver.BaseRequestHandler): + """Handles correct OSC1.0 messages. + + Whether this will be run on its own thread, the server's or a whole new + process depends on the server you instantiated, look at their documentation. + + This method is called after a basic sanity check was done on the datagram, + basically whether this datagram looks like an osc message or bundle, + if not the server won't even bother to call it and so no new + threads/processes will be spawned. + """ + def handle(self): + LOG.debug("handle OSC 1.0 protocol") + while True: + lengthbuf = self.recvall(4) + if lengthbuf is None: + break + length, = struct.unpack('!I', lengthbuf) + data = self.recvall(length) + if data is None: + break + + resp = _call_handlers_for_packet(data, self.server.dispatcher) + for r in resp: + if r is not None: + if not isinstance(r, list): + r = [r] + msg = osc_message_builder.build_msg(r[0], r[1:]) + b = struct.pack('!I', len(msg.dgram)) + self.request.sendall(b + msg.dgram) + + def recvall(self, count): + buf = b'' + while count > 0: + newbuf = self.request.recv(count) + if not newbuf: + return None + buf += newbuf + count -= len(newbuf) + return buf + + +class _TCPHandler1_1(socketserver.BaseRequestHandler): + """Handles correct OSC1.1 messages. + + Whether this will be run on its own thread, the server's or a whole new + process depends on the server you instantiated, look at their documentation. + + This method is called after a basic sanity check was done on the datagram, + basically whether this datagram looks like an osc message or bundle, + if not the server won't even bother to call it and so no new + threads/processes will be spawned. + """ + def handle(self): + LOG.debug("handle OSC 1.1 protocol") + while True: + packets = self.recvall() + if packets is None: + break + + for p in packets: + resp = _call_handlers_for_packet(p, self.server.dispatcher) + for r in resp: + if not isinstance(r, list): + r = [r] + msg = osc_message_builder.build_msg(r[0], r[1:]) + self.request.sendall(slip.encode(msg.dgram)) + + def recvall(self): + buf = self.request.recv(4096) + if not buf: + return None + # If the last byte is not an END marker there could be more data coming + while buf[-1] != 192: + newbuf = self.request.recv(4096) + if not newbuf: + # Maybe should raise an exception here? + break + buf += newbuf + + packets = [slip.decode(p) for p in buf.split(slip.END_END)] + return packets + + +class OSCTCPServer(socketserver.TCPServer): + """Superclass for different flavors of OSCTCPServer""" + + def __init__(self, server_address, dispatcher, mode: str = MODE_1_1): + self.request_queue_size = 300 + self.mode = mode + if mode not in [MODE_1_0, MODE_1_1]: + raise ValueError("OSC Mode must be '1.0' or '1.1'") + if self.mode == MODE_1_0: + super().__init__(server_address, _TCPHandler1_0) + else: + super().__init__(server_address, _TCPHandler1_1) + self._dispatcher = dispatcher + + def verify_request(self, request, client_address): + """Returns true if the data looks like a valid OSC TCP datagram.""" + # d = request.recv(9999).decode("utf-8") + # print("d:type=%s d=%s" % (type(d), d)) + return True + + @property + def dispatcher(self): + """Dispatcher accessor for handlers to dispatch osc messages.""" + return self._dispatcher + + +class BlockingOSCTCPServer(OSCTCPServer): + """Blocking version of the TCP server. + + Each message will be handled sequentially on the same thread. + Use this is you don't care about latency in your message handling or don't + have a multiprocess/multithread environment (really?). + """ + + +class ThreadingOSCTCPServer(socketserver.ThreadingMixIn, OSCTCPServer): + """Threading version of the OSC TCP server. + + Each message will be handled in its own new thread. + Use this when lightweight operations are done by each message handlers. + """ + + +if hasattr(os, "fork"): + class ForkingOSCTCPServer(socketserver.ForkingMixIn, OSCTCPServer): + """Forking version of the OSC TCP server. + + Each message will be handled in its own new process. + Use this when heavyweight operations are done by each message handlers + and forking a whole new process for each of them is worth it. + """ + + +class AsyncOSCTCPServer: + """Asyncio version of the OSC TCP Server. + Each TCP message is handled by _call_handlers_for_packet, the same method as in the + OSCTCPServer family of blocking, threading, and forking servers + """ + + def __init__(self, server_address: str, port: int, dispatcher, mode: str = MODE_1_1): + """ + :param server_address: tuple of (IP address to bind to, port) + :param dispatcher: a pythonosc.dispatcher.Dispatcher + """ + self._port = port + self._server_address = server_address + self._dispatcher = dispatcher + self._server = None + self._mode = mode + + # class _OSCProtocolFactory(asyncio.DatagramProtocol): + # """OSC protocol factory which passes datagrams to _call_handlers_for_packet""" + # + # def __init__(self, dispatcher): + # self.dispatcher = dispatcher + # + # def datagram_received(self, data, unused_addr): + # _call_handlers_for_packet(data, self.dispatcher) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.stop() + + async def start(self): + """creates a socket endpoint and registers it with our event loop""" + self._server = await asyncio.start_server( + self.handle, self._server_address, self._port) + + addrs = ', '.join(str(sock.getsockname()) for sock in self._server.sockets) + LOG.debug(f'Serving on {addrs}') + + async with self._server: + await self._server.serve_forever() + + async def stop(self): + await self._server.cancel() + + @property + def dispatcher(self): + return self._dispatcher + + async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + if self._mode == MODE_1_1: + await self.handle_1_1(reader, writer) + else: + await self.handle1_0(reader, writer) + writer.write_eof() + LOG.debug("Close the connection") + writer.close() + await writer.wait_closed() + + async def handle1_0(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + LOG.debug("Incoming socket open 1.0") + while True: + try: + buf = await reader.read(4) + except Exception as e: + LOG.exception("Read error", e) + return + if buf == b'': + break + length, = struct.unpack('!I', buf) + buf = b'' + while length > 0: + newbuf = await reader.read(length) + if not newbuf: + break + buf += newbuf + length -= len(newbuf) + + result = await self._call_handlers_for_packet(buf) + for r in result: + if r is not None: + if not isinstance(r, list): + r = [r] + msg = osc_message_builder.build_msg(r[0], r[1:]) + b = struct.pack('!I', len(msg.dgram)) + writer.write(b + msg.dgram) + await writer.drain() + + async def handle_1_1(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + LOG.debug("Incoming socket open 1.1") + while True: + try: + buf = await reader.read(4096) + except Exception as e: + LOG.exception("Read error", e) + return + if buf == b'': + break + while len(buf) > 0 and buf[-1] != 192: + newbuf = await reader.read(4096) + if not newbuf: + # Maybe should raise an exception here? + break + buf += newbuf + + packets = [slip.decode(p) for p in buf.split(slip.END_END)] + for p in packets: + result = await self._call_handlers_for_packet(p) + for r in result: + if r is not None: + if not isinstance(r, list): + r = [r] + msg = osc_message_builder.build_msg(r[0], r[1:]) + writer.write(slip.encode(msg.dgram)) + await writer.drain() + + async def _call_handlers_for_packet(self, data) -> list: + """ + This function calls the handlers registered to the dispatcher for + every message it found in the packet. + The process/thread granularity is thus the OSC packet, not the handler. + + If parameters were registered with the dispatcher, then the handlers are + called this way: + handler('/address that triggered the message', + registered_param_list, osc_msg_arg1, osc_msg_arg2, ...) + if no parameters were registered, then it is just called like this: + handler('/address that triggered the message', + osc_msg_arg1, osc_msg_arg2, osc_msg_param3, ...) + """ + + # Get OSC messages from all bundles or standalone message. + results = [] + try: + packet = osc_packet.OscPacket(data) + for timed_msg in packet.messages: + now = time.time() + handlers = self._dispatcher.handlers_for_address( + timed_msg.message.address) + if not handlers: + continue + # If the message is to be handled later, then so be it. + if timed_msg.time > now: + time.sleep(timed_msg.time - now) + for handler in handlers: + if inspect.iscoroutinefunction(handler.callback): + if handler.args: + result = await handler.callback( + timed_msg.message.address, handler.args, *timed_msg.message) + else: + result = await handler.callback(timed_msg.message.address, + *timed_msg.message) + else: + if handler.args: + result = handler.callback( + timed_msg.message.address, handler.args, *timed_msg.message) + else: + result = handler.callback(timed_msg.message.address, *timed_msg.message) + results.append(result) + except osc_packet.ParseError as e: + LOG.debug(f"Packet parse error: {str(e)}") + return results diff --git a/pythonosc/slip.py b/pythonosc/slip.py new file mode 100644 index 0000000..a3d2bcb --- /dev/null +++ b/pythonosc/slip.py @@ -0,0 +1,82 @@ +# Copyright (c) 2020. Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + +import re + +END = b'\xc0' +ESC = b'\xdb' +ESC_END = b'\xdc' +ESC_ESC = b'\xdd' +END_END = b'\xc0\xc0' +"""These constants represent the special SLIP bytes""" + + +class ProtocolError(ValueError): + """Exception to indicate that a SLIP protocol error has occurred. + + This exception is raised when an attempt is made to decode + a packet with an invalid byte sequence. + An invalid byte sequence is either an :const:`ESC` byte followed + by any byte that is not an :const:`ESC_ESC` or :const:`ESC_END` byte, + or a trailing :const:`ESC` byte as last byte of the packet. + + The :exc:`ProtocolError` carries the invalid packet + as the first (and only) element in in its :attr:`args` tuple. + """ + + +def encode(msg: bytes) -> bytes: + """Encodes a message (a byte sequence) into a SLIP-encoded packet. + + Args: + msg: The message that must be encoded + + Returns: + The SLIP-encoded message + """ + if msg: + msg = bytes(msg) + else: + msg = b'' + return END + msg.replace(ESC, ESC + ESC_ESC).replace(END, ESC + ESC_END) + END + + +def decode(packet: bytes) -> bytes: + """Retrieves the message from the SLIP-encoded packet. + + Args: + packet: The SLIP-encoded message. + Note that this must be exactly one complete packet. + The :func:`decode` function does not provide any buffering + for incomplete packages, nor does it provide support + for decoding data with multiple packets. + Returns: + The decoded message + + Raises: + ProtocolError: if the packet contains an invalid byte sequence. + """ + if not is_valid(packet): + raise ProtocolError(packet) + return packet.strip(END).replace(ESC + ESC_END, END).replace(ESC + ESC_ESC, ESC) + + +def is_valid(packet: bytes) -> bool: + """Indicates if the packet's contents conform to the SLIP specification. + + A packet is valid if: + + * It contains no :const:`END` bytes other than leading and/or trailing :const:`END` bytes, and + * Each :const:`ESC` byte is followed by either an :const:`ESC_END` or an :const:`ESC_ESC` byte. + + Args: + packet: The packet to inspect. + + Returns: + :const:`True` if the packet is valid, :const:`False` otherwise + """ + packet = packet.strip(END) + return not (END in packet or + packet.endswith(ESC) or + re.search(ESC + b'[^' + ESC_END + ESC_ESC + b']', packet)) diff --git a/pythonosc/tcp_client.py b/pythonosc/tcp_client.py new file mode 100644 index 0000000..e9c0107 --- /dev/null +++ b/pythonosc/tcp_client.py @@ -0,0 +1,213 @@ +"""TCP Clients for sending OSC messages to an OSC server""" +import asyncio +import socket +import struct +from typing import AsyncGenerator, Generator, List, Union + +from pythonosc import slip +from pythonosc.osc_bundle import OscBundle +from pythonosc.osc_message import OscMessage +from pythonosc.osc_message_builder import ArgValue, build_msg +from pythonosc.osc_tcp_server import MODE_1_1 + + +class TCPClient(object): + """Async OSC client to send :class:`OscMessage` or :class:`OscBundle` via TCP""" + + def __init__(self, address: str, port: int, + family: socket.AddressFamily = socket.AF_INET, mode: str = MODE_1_1) -> None: + """Initialize client + + Args: + address: IP address of server + port: Port of server + family: address family parameter (passed to socket.getaddrinfo) + """ + self.address = address + self.port = port + self.family = family + self.mode = mode + self.socket = socket.socket(self.family, socket.SOCK_STREAM) + self.socket.settimeout(30) + self.socket.connect((address, port)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def send(self, content: Union[OscMessage, OscBundle]) -> None: + """Sends an :class:`OscMessage` or :class:`OscBundle` via TCP + + Args: + content: Message or bundle to be sent + """ + if self.mode == MODE_1_1: + self.socket.sendall(slip.encode(content.dgram)) + else: + b = struct.pack('!I', len(content.dgram)) + self.socket.sendall(b + content.dgram) + + def receive(self, timeout: int = 30) -> List[bytes]: + self.socket.settimeout(timeout) + if self.mode == MODE_1_1: + try: + buf = self.socket.recv(4096) + except TimeoutError: + return [] + if not buf: + return [] + # If the last byte is not an END marker there could be more data coming + while buf[-1] != 192: + try: + newbuf = self.socket.recv(4096) + except TimeoutError: + break + if not newbuf: + # Maybe should raise an exception here? + break + buf += newbuf + return [slip.decode(p) for p in buf.split(slip.END_END)] + else: + buf = b'' + try: + lengthbuf = self.socket.recv(4) + except TimeoutError: + return [] + length, = struct.unpack('!I', lengthbuf) + while length > 0: + try: + newbuf = self.socket.recv(length) + except TimeoutError: + return [] + if not newbuf: + return [] + buf += newbuf + length -= len(newbuf) + return [buf] + + def close(self): + self.socket.close() + + +class SimpleTCPClient(TCPClient): + """Simple OSC client that automatically builds :class:`OscMessage` from arguments""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def send_message(self, address: str, value: ArgValue = None) -> None: + """Build :class:`OscMessage` from arguments and send to server + + Args: + address: OSC address the message shall go to + value: One or more arguments to be added to the message + """ + msg = build_msg(address, value) + return self.send(msg) + + def get_messages(self, timeout: int = 30) -> Generator: + r = self.receive(timeout) + while r: + yield OscMessage(r) + r = self.receive(timeout) + + +class AsyncOSCTCPClient: + """Async OSC client to send :class:`OscMessage` or :class:`OscBundle` via TCP""" + + def __init__(self, address: str, port: int, + family: socket.AddressFamily = socket.AF_INET, mode: str = MODE_1_1) -> None: + """Initialize client + + Args: + address: IP address of server + port: Port of server + family: address family parameter (passed to socket.getaddrinfo) + """ + self.reader: asyncio.StreamReader = None + self.writer: asyncio.StreamWriter = None + self.address: str = address + self.port: int = port + self.mode: str = mode + self.family: socket.AddressFamily = family + + def __await__(self): + async def closure(): + await self.__open__() + return self + + return closure().__await__() + + async def __aenter__(self): + await self.__open__() + return self + + async def __open__(self): + self.reader, self.writer = await asyncio.open_connection( + self.address, self.port) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def send(self, content: Union[OscMessage, OscBundle]) -> None: + """Sends an :class:`OscMessage` or :class:`OscBundle` via TCP + + Args: + content: Message or bundle to be sent + """ + if self.mode == MODE_1_1: + self.writer.write(slip.encode(content.dgram)) + else: + b = struct.pack('!I', len(content.dgram)) + self.writer.write(b + content.dgram) + await self.writer.drain() + + async def receive(self, timeout: int = 30) -> List[bytes]: + try: + async with asyncio.timeout(timeout): + buf = await self.reader.read(4096) + except TimeoutError: + return [] + if not buf: + return [] + # If the last byte is not an END marker there could be more data coming + while buf[-1] != 192: + try: + async with asyncio.timeout(timeout): + newbuf = await self.reader.read(4096) + except asyncio.TimeoutError: + break + if not newbuf: + # Maybe should raise an exception here? + break + buf += newbuf + return [slip.decode(p) for p in buf.split(slip.END_END)] + + async def close(self): + self.writer.write_eof() + self.writer.close() + await self.writer.wait_closed() + + +class AsyncSimpleTCPClient(AsyncOSCTCPClient): + """Simple OSC client that automatically builds :class:`OscMessage` from arguments""" + def __init__(self, address, port, family: socket.AddressFamily = socket.AF_INET, mode: str = MODE_1_1): + super().__init__(address, port, family, mode) + + async def send_message(self, address: str, value: ArgValue = None) -> None: + """Build :class:`OscMessage` from arguments and send to server + + Args: + address: OSC address the message shall go to + value: One or more arguments to be added to the message + """ + msg = build_msg(address, value) + return await self.send(msg) + + async def get_messages(self, timeout: int = 30) -> AsyncGenerator: + r = await self.receive(timeout) + while r: + for m in r: + yield OscMessage(m) + r = await self.receive(timeout) diff --git a/pythonosc/test/test_osc_tcp_server.py b/pythonosc/test/test_osc_tcp_server.py new file mode 100644 index 0000000..2743be8 --- /dev/null +++ b/pythonosc/test/test_osc_tcp_server.py @@ -0,0 +1,275 @@ +import struct +import unittest +import unittest.mock as mock + +from pythonosc import dispatcher, osc_tcp_server +from pythonosc.slip import END + +_SIMPLE_PARAM_INT_MSG = ( + b"/SYNC\x00\x00\x00" + b",i\x00\x00" + b"\x00\x00\x00\x04") + +LEN_SIMPLE_PARAM_INT_MSG = struct.pack('!I', len(_SIMPLE_PARAM_INT_MSG)) +_SIMPLE_PARAM_INT_MSG_1_1 = END + _SIMPLE_PARAM_INT_MSG + END + +# Regression test for a datagram that should NOT be stripped, ever... +_SIMPLE_PARAM_INT_9 = b'/debug\x00\x00,i\x00\x00\x00\x00\x00\t' +LEN_SIMPLE_PARAM_INT_9 = struct.pack('!I', len(_SIMPLE_PARAM_INT_9)) + +_SIMPLE_PARAM_INT_9_1_1 = END + _SIMPLE_PARAM_INT_9 + END + +_SIMPLE_MSG_NO_PARAMS = b"/SYNC\x00\x00\x00" +LEN_SIMPLE_MSG_NO_PARAMS = struct.pack('!I', len(_SIMPLE_MSG_NO_PARAMS)) +_SIMPLE_MSG_NO_PARAMS_1_1 = END + _SIMPLE_MSG_NO_PARAMS + END + + +class TestOscTcpServer(unittest.TestCase): + pass + + +class TestTCP_1_1_Handler(unittest.TestCase): + def setUp(self): + super().setUp() + self.dispatcher = dispatcher.Dispatcher() + # We do not want to create real UDP connections during unit tests. + self.server = unittest.mock.Mock(spec=osc_tcp_server.BlockingOSCTCPServer) + # Need to attach property mocks to types, not objects... weird. + type(self.server).dispatcher = unittest.mock.PropertyMock( + return_value=self.dispatcher) + self.client_address = ("127.0.0.1", 8080) + self.mock_meth = unittest.mock.MagicMock() + self.mock_meth.return_value = None + + def test_no_match(self): + self.dispatcher.map("/foobar", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, _SIMPLE_PARAM_INT_MSG_1_1, ""] + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + self.assertFalse(self.mock_meth.called) + + def test_match_with_args(self): + self.dispatcher.map("/SYNC", self.mock_meth, 1, 2, 3) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_PARAM_INT_MSG_1_1, ""] + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC", [1, 2, 3], 4) + + def test_match_int9(self): + self.dispatcher.map("/debug", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_PARAM_INT_9_1_1, ""] + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + self.assertTrue(self.mock_meth.called) + self.mock_meth.assert_called_with("/debug", 9) + + def test_match_without_args(self): + self.dispatcher.map("/SYNC", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, ""] + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC") + + def test_match_default_handler(self): + self.dispatcher.set_default_handler(self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, ""] + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC") + + def test_response_no_args(self): + def respond(*args, **kwargs): + return "/SYNC" + self.dispatcher.map("/SYNC", respond) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, ""] + mock_sock.sendall = mock.Mock() + mock_sock.sendall.return_value = None + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + mock_sock.sendall.assert_called_with(b'\xc0/SYNC\00\00\00,\00\00\00\xc0') + + def test_response_with_args(self): + def respond(*args, **kwargs): + return ["/SYNC", 1, "2", 3.0,] + self.dispatcher.map("/SYNC", respond) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, ""] + mock_sock.sendall = mock.Mock() + mock_sock.sendall.return_value = None + osc_tcp_server._TCPHandler1_1( + mock_sock, self.client_address, self.server) + mock_sock.sendall.assert_called_with(b'\xc0/SYNC\00\00\00,isf\x00\x00\x00\x00\x00\x00\x00\x012\x00\x00\x00@@\x00\x00\xc0') + + +class TestTCP_1_0_Handler(unittest.TestCase): + def setUp(self): + super().setUp() + self.dispatcher = dispatcher.Dispatcher() + # We do not want to create real UDP connections during unit tests. + self.server = unittest.mock.Mock(spec=osc_tcp_server.BlockingOSCTCPServer) + # Need to attach property mocks to types, not objects... weird. + type(self.server).dispatcher = unittest.mock.PropertyMock( + return_value=self.dispatcher) + self.client_address = ("127.0.0.1", 8080) + self.mock_meth = unittest.mock.MagicMock() + self.mock_meth.return_value = None + + def test_no_match(self): + self.dispatcher.map("/foobar", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_MSG_NO_PARAMS, _SIMPLE_MSG_NO_PARAMS, LEN_SIMPLE_PARAM_INT_MSG, + _SIMPLE_PARAM_INT_MSG, ""] + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + self.assertFalse(self.mock_meth.called) + + def test_match_with_args(self): + self.dispatcher.map("/SYNC", self.mock_meth, 1, 2, 3) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_PARAM_INT_MSG, _SIMPLE_PARAM_INT_MSG, ""] + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC", [1, 2, 3], 4) + + def test_match_int9(self): + self.dispatcher.map("/debug", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_PARAM_INT_9, _SIMPLE_PARAM_INT_9, ""] + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + self.assertTrue(self.mock_meth.called) + self.mock_meth.assert_called_with("/debug", 9) + + def test_match_without_args(self): + self.dispatcher.map("/SYNC", self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_MSG_NO_PARAMS, _SIMPLE_MSG_NO_PARAMS, ""] + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC") + + def test_match_default_handler(self): + self.dispatcher.set_default_handler(self.mock_meth) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_MSG_NO_PARAMS, _SIMPLE_MSG_NO_PARAMS, ""] + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + self.mock_meth.assert_called_with("/SYNC") + + def test_response_no_args(self): + def respond(*args, **kwargs): + return "/SYNC" + self.dispatcher.map("/SYNC", respond) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_MSG_NO_PARAMS, _SIMPLE_MSG_NO_PARAMS, ""] + mock_sock.sendall = mock.Mock() + mock_sock.sendall.return_value = None + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + mock_sock.sendall.assert_called_with(b'\x00\x00\x00\x0c/SYNC\00\00\00,\00\00\00') + + def test_response_with_args(self): + def respond(*args, **kwargs): + return ["/SYNC", 1, "2", 3.0,] + self.dispatcher.map("/SYNC", respond) + mock_sock = mock.Mock() + mock_sock.recv = mock.Mock() + mock_sock.recv.side_effect = [LEN_SIMPLE_MSG_NO_PARAMS, _SIMPLE_MSG_NO_PARAMS, ""] + mock_sock.sendall = mock.Mock() + mock_sock.sendall.return_value = None + osc_tcp_server._TCPHandler1_0( + mock_sock, self.client_address, self.server) + mock_sock.sendall.assert_called_with(b'\x00\x00\x00\x1c/SYNC\00\00\00,isf\x00\x00\x00\x00\x00\x00\x00\x012\x00\x00\x00@@\x00\x00') + + +class TestAsync1_1Handler(unittest.IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + self.dispatcher = dispatcher.Dispatcher() + # We do not want to create real UDP connections during unit tests. + self.server = unittest.mock.Mock(spec=osc_tcp_server.BlockingOSCTCPServer) + # Need to attach property mocks to types, not objects... weird. + type(self.server).dispatcher = unittest.mock.PropertyMock( + return_value=self.dispatcher) + self.client_address = ("127.0.0.1", 8080) + self.mock_writer = mock.Mock() + self.mock_writer.close = mock.Mock() + self.mock_writer.write = mock.Mock() + self.mock_writer.write_eof = mock.Mock() + self.mock_writer.drain = mock.AsyncMock() + self.mock_writer.wait_closed = mock.AsyncMock() + self.mock_reader = mock.Mock() + self.mock_reader.read = mock.AsyncMock() + self.server = osc_tcp_server.AsyncOSCTCPServer("127.0.0.1", 8008, self.dispatcher) + self.mock_meth = unittest.mock.MagicMock() + self.mock_meth.return_value = None + + async def test_no_match(self): + self.dispatcher.map("/foobar", self.mock_meth) + self.mock_reader.read.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, _SIMPLE_PARAM_INT_MSG_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.assertFalse(self.mock_meth.called) + + async def test_match_with_args(self): + self.dispatcher.map("/SYNC", self.mock_meth, 1, 2, 3) + self.mock_reader.read.side_effect = [_SIMPLE_PARAM_INT_MSG_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.mock_meth.assert_called_with("/SYNC", [1, 2, 3], 4) + + async def test_match_int9(self): + self.dispatcher.map("/debug", self.mock_meth) + self.mock_reader.read.side_effect = [_SIMPLE_PARAM_INT_9_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.assertTrue(self.mock_meth.called) + self.mock_meth.assert_called_with("/debug", 9) + + async def test_match_without_args(self): + self.dispatcher.map("/SYNC", self.mock_meth) + self.mock_reader.read.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.mock_meth.assert_called_with("/SYNC") + + async def test_match_default_handler(self): + self.dispatcher.set_default_handler(self.mock_meth) + self.mock_reader.read.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.mock_meth.assert_called_with("/SYNC") + + async def test_response_no_args(self): + def respond(*args, **kwargs): + return "/SYNC" + self.dispatcher.map("/SYNC", respond) + self.mock_reader.read.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.mock_writer.write.assert_called_with(b'\xc0/SYNC\00\00\00,\00\00\00\xc0') + + async def test_response_with_args(self): + def respond(*args, **kwargs): + return ["/SYNC", 1, "2", 3.0,] + self.dispatcher.map("/SYNC", respond) + self.mock_reader.read.side_effect = [_SIMPLE_MSG_NO_PARAMS_1_1, b''] + await osc_tcp_server.AsyncOSCTCPServer.handle(self.server, self.mock_reader, self.mock_writer) + self.mock_writer.write.assert_called_with(b'\xc0/SYNC\00\00\00,isf\x00\x00\x00\x00\x00\x00\x00\x012\x00\x00\x00@@\x00\x00\xc0') + + +if __name__ == "__main__": + unittest.main() diff --git a/pythonosc/test/test_tcp_client.py b/pythonosc/test/test_tcp_client.py new file mode 100644 index 0000000..37990e4 --- /dev/null +++ b/pythonosc/test/test_tcp_client.py @@ -0,0 +1,69 @@ +import asyncio +import unittest +from unittest import mock + +from pythonosc import osc_message_builder, slip, tcp_client + + +class TestTcpClient(unittest.TestCase): + @mock.patch('socket.socket') + def test_client(self, mock_socket_ctor): + mock_socket = mock_socket_ctor.return_value + mock_send = mock.Mock() + mock_recv = mock.Mock() + mock_send.return_value = None + mock_recv.return_value = "" + + mock_socket.sendall = mock_send + mock_socket.recv = mock_recv + msg = osc_message_builder.OscMessageBuilder('/').build() + with tcp_client.TCPClient('::1', 31337) as client: + client.send(msg) + mock_socket.sendall.assert_called_once_with(slip.encode(msg.dgram)) + + @mock.patch('socket.socket') + def test_simple_client(self, mock_socket_ctor): + mock_socket = mock_socket_ctor.return_value + mock_send = mock.Mock() + mock_recv = mock.Mock() + mock_send.return_value = None + mock_recv.return_value = "" + + mock_socket.sendall = mock_send + mock_socket.recv = mock_recv + with tcp_client.SimpleTCPClient('::1', 31337) as client: + client.send_message('/', []) + mock_socket.sendall.assert_called_once() + + +class TestAsyncTcpClient(unittest.IsolatedAsyncioTestCase): + @mock.patch('asyncio.open_connection') + async def test_send(self, mock_socket_ctor): + mock_reader = mock.Mock() + mock_writer = mock.Mock() + mock_writer.drain = mock.AsyncMock() + mock_writer.wait_closed = mock.AsyncMock() + mock_socket_ctor.return_value = (mock_reader, mock_writer) + loop = asyncio.get_running_loop() + loop.set_debug(False) + msg = osc_message_builder.OscMessageBuilder('/').build() + async with tcp_client.AsyncOSCTCPClient('::1', 31337) as client: + await client.send(msg) + + self.assertTrue(mock_writer.write.called) + mock_writer.write.assert_called_once_with(slip.encode(msg.dgram)) + + @mock.patch('asyncio.open_connection') + async def test_send_message_calls_send_with_msg(self, mock_socket_ctor): + mock_reader = mock.Mock() + mock_writer = mock.Mock() + mock_writer.drain = mock.AsyncMock() + mock_writer.wait_closed = mock.AsyncMock() + mock_socket_ctor.return_value = (mock_reader, mock_writer) + async with tcp_client.AsyncSimpleTCPClient('::1', 31337) as client: + await client.send_message('/address', 1) + self.assertTrue(mock_writer.write.called) + + +if __name__ == "__main__": + unittest.main()