diff --git a/CHANGELOG.md b/CHANGELOG.md index 1576c58d9..6a649f684 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). not the number of samples per batch. It has been renamed to `batches`. * All MQTT clients upgraded and APIs updated. * MQTT broker may now be specified via DNS hostnames + * `hitl/streaming.py` no longer requires a prefix + * Streaming now supports UDP multicast addresses ## [v0.8.1](https://github.com/quartiq/stabilizer/compare/v0.8.0...v0.8.1) - 2022-11-14) diff --git a/hitl/streaming.py b/hitl/streaming.py index ea63fb2d2..1803d979d 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -3,9 +3,10 @@ import asyncio import logging +import ipaddress import argparse -from miniconf import Miniconf +import miniconf from stabilizer.stream import measure, StabilizerStream, get_local_ip logger = logging.getLogger(__name__) @@ -13,12 +14,12 @@ async def _main(): parser = argparse.ArgumentParser(description="Stabilizer Stream HITL test") - parser.add_argument("prefix", type=str, + parser.add_argument("prefix", type=str, nargs='?', help="The MQTT topic prefix of the target") parser.add_argument("--broker", "-b", default="mqtt", type=str, help="The MQTT broker address") - parser.add_argument("--host", default="0.0.0.0", - help="Local address to listen on") + parser.add_argument("--ip", default="0.0.0.0", + help="The IP address to listen on") parser.add_argument("--port", type=int, default=9293, help="Local port to listen on") parser.add_argument("--duration", type=float, default=10., @@ -27,18 +28,36 @@ async def _main(): help="Maximum loss for success") args = parser.parse_args() + prefix = args.prefix + if not args.prefix: + devices = await miniconf.discover(args.broker, 'dt/sinara/dual-iir/+', 1) + if not devices: + raise Exception('No Stabilizer (Dual-iir) devices found') + assert len(devices) == 1, \ + f'Multiple Stabilizers found: {devices}. Please specify one with --prefix' + + prefix = devices.pop() + logging.basicConfig(level=logging.INFO) - conf = await Miniconf.create(args.prefix, args.broker) - local_ip = get_local_ip(args.broker) + conf = await miniconf.Miniconf.create(prefix, args.broker) + + stream_target = [int(x) for x in args.ip.split('.')] + if ipaddress.ip_address(args.ip).is_unspecified: + stream_target = get_local_ip(args.broker) logger.info("Starting stream") await conf.set( - "/stream_target", {"ip": local_ip, "port": args.port}, retain=False) + "/stream_target", { + "ip": stream_target, + "port": args.port + }, retain=False) try: logger.info("Testing stream reception") - _transport, stream = await StabilizerStream.open((args.host, args.port)) + _transport, stream = await StabilizerStream.open(args.ip, + args.port, + args.broker) loss = await measure(stream, args.duration) if loss > args.max_loss: raise RuntimeError("High frame loss", loss) diff --git a/py/stabilizer/stream.py b/py/stabilizer/stream.py index 6adc6f66c..4452c9c34 100644 --- a/py/stabilizer/stream.py +++ b/py/stabilizer/stream.py @@ -6,6 +6,7 @@ import logging import struct import socket +import ipaddress from collections import namedtuple from dataclasses import dataclass @@ -87,19 +88,32 @@ class StabilizerStream(asyncio.DatagramProtocol): } @classmethod - async def open(cls, local_addr, maxsize=1): + async def open(cls, addr, port, broker, maxsize=1): """Open a UDP socket and start receiving frames""" loop = asyncio.get_running_loop() - transport, protocol = await loop.create_datagram_endpoint( - lambda: cls(maxsize), local_addr=local_addr) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # Increase the OS UDP receive buffer size to 4 MiB so that latency # spikes don't impact much. Achieving 4 MiB may require increasing # the max allowed buffer size, e.g. via # `sudo sysctl net.core.rmem_max=26214400` but nowadays the default # max appears to be ~ 50 MiB already. - sock = transport.get_extra_info("socket") - if sock is not None: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20) + + # We need to specify which interface to receive broadcasts from, or Windows may choose the + # wrong one. Thus, use the broker address to figure out our local address for the interface + # of interest. + if ipaddress.ip_address(addr).is_multicast: + print('Subscribing to multicast') + group = socket.inet_aton(addr) + iface = socket.inet_aton('.'.join([str(x) for x in get_local_ip(broker)])) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface) + sock.bind(('', port)) + else: + sock.bind((addr, port)) + + transport, protocol = await loop.create_datagram_endpoint(lambda: cls(maxsize), sock=sock) return transport, protocol def __init__(self, maxsize): @@ -155,7 +169,7 @@ async def _record(): pass logger.info("Received %g MB, %g MB/s", stat.bytes/1e6, - stat.bytes/1e6/duration) + stat.bytes/1e6/duration) sent = stat.received + stat.lost if sent: @@ -173,6 +187,8 @@ async def main(): help="Local port to listen on") parser.add_argument("--host", default="0.0.0.0", help="Local address to listen on") + parser.add_argument("--broker", default="mqtt", + help="The MQTT broker address") parser.add_argument("--maxsize", type=int, default=1, help="Frame queue size") parser.add_argument("--duration", type=float, default=1., @@ -181,7 +197,7 @@ async def main(): logging.basicConfig(level=logging.INFO) _transport, stream = await StabilizerStream.open( - (args.host, args.port), args.maxsize) + args.host, args.port, args.broker, args.maxsize) await measure(stream, args.duration)