From 7eb4da6510045b48e925e5a64ab5d6740ad8082b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 11 Oct 2023 12:11:15 +0200 Subject: [PATCH 1/7] Adding support for multicast stream data --- hitl/streaming.py | 13 +++++++++---- py/stabilizer/stream.py | 26 ++++++++++++++++++++------ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/hitl/streaming.py b/hitl/streaming.py index ea63fb2d2..e56b66a07 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -17,8 +17,8 @@ async def _main(): help="The MQTT topic prefix of the target") parser.add_argument("--broker", "-b", default="mqtt", type=str, help="The MQTT broker address") - parser.add_argument("--host", default="0.0.0.0", - help="Local address to listen on") + parser.add_argument("--multicast", "-m", default="239.192.1.100", type=str, + help="The multicast address to use for streaming") parser.add_argument("--port", type=int, default=9293, help="Local port to listen on") parser.add_argument("--duration", type=float, default=10., @@ -34,11 +34,16 @@ async def _main(): logger.info("Starting stream") await conf.set( - "/stream_target", {"ip": local_ip, "port": args.port}, retain=False) + "/stream_target", { + "ip": [int(x) for x in args.multicast.split('.')], + "port": args.port + }, retain=False) try: logger.info("Testing stream reception") - _transport, stream = await StabilizerStream.open((args.host, args.port)) + _transport, stream = await StabilizerStream.open(args.multicast, + args.port, + args.broker) loss = await measure(stream, args.duration) if loss > args.max_loss: raise RuntimeError("High frame loss", loss) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 6adc6f66c..7660f86a9 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -87,11 +87,23 @@ class StabilizerStream(asyncio.DatagramProtocol): } @classmethod - async def open(cls, local_addr, maxsize=1): - """Open a UDP socket and start receiving frames""" + async def open(cls, multicast_addr, multicast_port, broker, maxsize=1): + """Open a UDP multicast socket and start receiving frames""" loop = asyncio.get_running_loop() - transport, protocol = await loop.create_datagram_endpoint( - lambda: cls(maxsize), local_addr=local_addr) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) + + # We need to specify which interface to receive broadcasts from, or Windows may choose the + # wrong one. Thus, use the broker address to figure out our local address for the interface + # of interest. + group = socket.inet_aton(multicast_addr) + iface = socket.inet_aton('.'.join([str(x) for x in get_local_ip(broker)])) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface) + + sock.bind(('', multicast_port)) + + transport, protocol = await loop.create_datagram_endpoint(lambda: cls(maxsize), sock=sock) # Increase the OS UDP receive buffer size to 4 MiB so that latency # spikes don't impact much. Achieving 4 MiB may require increasing # the max allowed buffer size, e.g. via @@ -155,7 +167,7 @@ async def _record(): pass logger.info("Received %g MB, %g MB/s", stat.bytes/1e6, - stat.bytes/1e6/duration) + stat.bytes/1e6/duration) sent = stat.received + stat.lost if sent: @@ -173,6 +185,8 @@ async def main(): help="Local port to listen on") parser.add_argument("--host", default="0.0.0.0", help="Local address to listen on") + parser.add_argument("--broker", default="mqtt", + help="The MQTT broker address") parser.add_argument("--maxsize", type=int, default=1, help="Frame queue size") parser.add_argument("--duration", type=float, default=1., @@ -181,7 +195,7 @@ async def main(): logging.basicConfig(level=logging.INFO) _transport, stream = await StabilizerStream.open( - (args.host, args.port), args.maxsize) + (args.host, args.port), args.broker, args.maxsize) await measure(stream, args.duration) From 230e36e7ec572adc4e186272bce09743a10343fe Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 11 Oct 2023 12:13:27 +0200 Subject: [PATCH 2/7] Fixing args --- py/stabilizer/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 7660f86a9..3a4064cc8 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -195,7 +195,7 @@ async def main(): logging.basicConfig(level=logging.INFO) _transport, stream = await StabilizerStream.open( - (args.host, args.port), args.broker, args.maxsize) + args.host, args.port, args.broker, args.maxsize) await measure(stream, args.duration) From b7b0c8af86bfa07c37e868ee8c7e4a606d8875df Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 11 Oct 2023 18:20:02 +0200 Subject: [PATCH 3/7] Updating after review --- hitl/streaming.py | 14 +++++++++----- py/stabilizer/stream.py | 15 ++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/hitl/streaming.py b/hitl/streaming.py index e56b66a07..1ad4a51f7 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -3,6 +3,7 @@ import asyncio import logging +import ipaddress import argparse from miniconf import Miniconf @@ -17,8 +18,8 @@ async def _main(): help="The MQTT topic prefix of the target") parser.add_argument("--broker", "-b", default="mqtt", type=str, help="The MQTT broker address") - parser.add_argument("--multicast", "-m", default="239.192.1.100", type=str, - help="The multicast address to use for streaming") + parser.add_argument("--ip", default="0.0.0.0", + help="The address to use for streaming") parser.add_argument("--port", type=int, default=9293, help="Local port to listen on") parser.add_argument("--duration", type=float, default=10., @@ -30,18 +31,21 @@ async def _main(): logging.basicConfig(level=logging.INFO) conf = await Miniconf.create(args.prefix, args.broker) - local_ip = get_local_ip(args.broker) + + stream_target = [int(x) for x in args.ip.split('.')] + if ipaddress.ip_address(args.ip).is_unspecified: + stream_target = get_local_ip(args.broker) logger.info("Starting stream") await conf.set( "/stream_target", { - "ip": [int(x) for x in args.multicast.split('.')], + "ip": stream_target, "port": args.port }, retain=False) try: logger.info("Testing stream reception") - _transport, stream = await StabilizerStream.open(args.multicast, + _transport, stream = await StabilizerStream.open(args.ip, args.port, args.broker) loss = await measure(stream, args.duration) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 3a4064cc8..99cd0b9fb 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -6,6 +6,7 @@ import logging import struct import socket +import ipaddress from collections import namedtuple from dataclasses import dataclass @@ -87,21 +88,21 @@ class StabilizerStream(asyncio.DatagramProtocol): } @classmethod - async def open(cls, multicast_addr, multicast_port, broker, maxsize=1): + async def open(cls, addr, port, broker, maxsize=1): """Open a UDP multicast socket and start receiving frames""" loop = asyncio.get_running_loop() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # We need to specify which interface to receive broadcasts from, or Windows may choose the # wrong one. Thus, use the broker address to figure out our local address for the interface # of interest. - group = socket.inet_aton(multicast_addr) - iface = socket.inet_aton('.'.join([str(x) for x in get_local_ip(broker)])) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface) + if ipaddress.ip_address(addr).is_multicast: + print('Subscribing to multicast') + group = socket.inet_aton(addr) + iface = socket.inet_aton('.'.join([str(x) for x in get_local_ip(broker)])) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface) - sock.bind(('', multicast_port)) + sock.bind(('', port)) transport, protocol = await loop.create_datagram_endpoint(lambda: cls(maxsize), sock=sock) # Increase the OS UDP receive buffer size to 4 MiB so that latency From 389856b5de685aceeedb8309f1a117be7a797637 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 11 Oct 2023 18:21:10 +0200 Subject: [PATCH 4/7] Fixing help text --- hitl/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hitl/streaming.py b/hitl/streaming.py index 1ad4a51f7..36028f1c8 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -19,7 +19,7 @@ async def _main(): parser.add_argument("--broker", "-b", default="mqtt", type=str, help="The MQTT broker address") parser.add_argument("--ip", default="0.0.0.0", - help="The address to use for streaming") + help="The IP address to listen on") parser.add_argument("--port", type=int, default=9293, help="Local port to listen on") parser.add_argument("--duration", type=float, default=10., From 1bcfe74862e93519a9572b62e4396380326247d2 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 12 Oct 2023 12:21:20 +0200 Subject: [PATCH 5/7] Cleaning up impl after review --- py/stabilizer/stream.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 99cd0b9fb..27d817f83 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -89,9 +89,17 @@ class StabilizerStream(asyncio.DatagramProtocol): @classmethod async def open(cls, addr, port, broker, maxsize=1): - """Open a UDP multicast socket and start receiving frames""" + """Open a UDP socket and start receiving frames""" loop = asyncio.get_running_loop() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # Increase the OS UDP receive buffer size to 4 MiB so that latency + # spikes don't impact much. Achieving 4 MiB may require increasing + # the max allowed buffer size, e.g. via + # `sudo sysctl net.core.rmem_max=26214400` but nowadays the default + # max appears to be ~ 50 MiB already. + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20) # We need to specify which interface to receive broadcasts from, or Windows may choose the # wrong one. Thus, use the broker address to figure out our local address for the interface @@ -105,14 +113,6 @@ async def open(cls, addr, port, broker, maxsize=1): sock.bind(('', port)) transport, protocol = await loop.create_datagram_endpoint(lambda: cls(maxsize), sock=sock) - # Increase the OS UDP receive buffer size to 4 MiB so that latency - # spikes don't impact much. Achieving 4 MiB may require increasing - # the max allowed buffer size, e.g. via - # `sudo sysctl net.core.rmem_max=26214400` but nowadays the default - # max appears to be ~ 50 MiB already. - sock = transport.get_extra_info("socket") - if sock is not None: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20) return transport, protocol def __init__(self, maxsize): From 3f246f8409489c41f3f44101b58fb803e969b05a Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 12 Oct 2023 12:26:21 +0200 Subject: [PATCH 6/7] Updating CHANGELOG, simplifying streaming hitl script --- CHANGELOG.md | 2 ++ hitl/streaming.py | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1576c58d9..6a649f684 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). not the number of samples per batch. It has been renamed to `batches`. * All MQTT clients upgraded and APIs updated. * MQTT broker may now be specified via DNS hostnames + * `hitl/streaming.py` no longer requires a prefix + * Streaming now supports UDP multicast addresses ## [v0.8.1](https://github.com/quartiq/stabilizer/compare/v0.8.0...v0.8.1) - 2022-11-14) diff --git a/hitl/streaming.py b/hitl/streaming.py index 36028f1c8..1803d979d 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -6,7 +6,7 @@ import ipaddress import argparse -from miniconf import Miniconf +import miniconf from stabilizer.stream import measure, StabilizerStream, get_local_ip logger = logging.getLogger(__name__) @@ -14,7 +14,7 @@ async def _main(): parser = argparse.ArgumentParser(description="Stabilizer Stream HITL test") - parser.add_argument("prefix", type=str, + parser.add_argument("prefix", type=str, nargs='?', help="The MQTT topic prefix of the target") parser.add_argument("--broker", "-b", default="mqtt", type=str, help="The MQTT broker address") @@ -28,9 +28,19 @@ async def _main(): help="Maximum loss for success") args = parser.parse_args() + prefix = args.prefix + if not args.prefix: + devices = await miniconf.discover(args.broker, 'dt/sinara/dual-iir/+', 1) + if not devices: + raise Exception('No Stabilizer (Dual-iir) devices found') + assert len(devices) == 1, \ + f'Multiple Stabilizers found: {devices}. Please specify one with --prefix' + + prefix = devices.pop() + logging.basicConfig(level=logging.INFO) - conf = await Miniconf.create(args.prefix, args.broker) + conf = await miniconf.Miniconf.create(prefix, args.broker) stream_target = [int(x) for x in args.ip.split('.')] if ipaddress.ip_address(args.ip).is_unspecified: From ae66208487337bc7ae3e503cf2c69888b1c3ebdb Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 12 Oct 2023 12:44:13 +0200 Subject: [PATCH 7/7] Binding to local address for unicast --- py/stabilizer/stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 27d817f83..4452c9c34 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -109,8 +109,9 @@ async def open(cls, addr, port, broker, maxsize=1): group = socket.inet_aton(addr) iface = socket.inet_aton('.'.join([str(x) for x in get_local_ip(broker)])) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface) - - sock.bind(('', port)) + sock.bind(('', port)) + else: + sock.bind((addr, port)) transport, protocol = await loop.create_datagram_endpoint(lambda: cls(maxsize), sock=sock) return transport, protocol