diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 34a05ccd..a97b9df1 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -28,11 +28,13 @@ import asyncio import logging import sys +from collections import deque from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING import grpc +from farm_ng.core import event_pb2 from farm_ng.core.event_client import EventClient from farm_ng.core.event_service import ( EventServiceGrpc, @@ -48,12 +50,11 @@ from farm_ng.core.events_file_reader import payload_to_protobuf, proto_from_json_file from farm_ng.core.events_file_writer import EventsFileWriter from farm_ng.core.stamp import StampSemantics, get_monotonic_now -from farm_ng.core.uri import get_host_name, uri_query_to_dict +from farm_ng.core.uri import get_host_name, uri_query_to_dict, uri_to_string from google.protobuf.empty_pb2 import Empty from google.protobuf.wrappers_pb2 import StringValue if TYPE_CHECKING: - from farm_ng.core import event_pb2 from google.protobuf.message import Message __all__ = ["EventServiceRecorder", "RecorderService"] @@ -65,12 +66,18 @@ class EventServiceRecorder: # the maximum size of the queue QUEUE_MAX_SIZE: int = 50 - def __init__(self, service_name: str, config_list: EventServiceConfigList) -> None: + def __init__( + self, + service_name: str, + config_list: EventServiceConfigList, + header_events: list[tuple[event_pb2.Event, bytes]] | None = None, + ) -> None: """Initializes the service. Args: service_name (str): the name of the service. config_list (EventServiceConfigList): the configuration data structure. + header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list header events. """ self.service_name: str = service_name self.config_list: EventServiceConfigList = config_list @@ -100,6 +107,25 @@ def __init__(self, service_name: str, config_list: EventServiceConfigList) -> No self.record_queue: asyncio.Queue[tuple[event_pb2.Event, bytes]] = asyncio.Queue( maxsize=self.QUEUE_MAX_SIZE, ) + self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque() + if header_events is not None: + for event, payload in header_events: + self.add_header_event(event, payload) + + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Add a header event to the header_deque. + + Args: + event: Event to add. + payload: Payload to add. + """ + if not isinstance(event, event_pb2.Event): + error_msg = f"header event must be Event, not {type(event)}" + raise TypeError(error_msg) + if not isinstance(payload, bytes): + error_msg = f"header payload must be bytes, not {type(payload)}" + raise TypeError(error_msg) + self.header_deque.append((event, payload)) @property def logger(self) -> logging.Logger: @@ -124,10 +150,20 @@ async def record( extension (str, optional): the extension of the file. Defaults to ".bin". max_file_mb (int, optional): the maximum size of the file in MB. Defaults to 0. 
""" - with EventsFileWriter(file_base, extension, max_file_mb) as writer: + with EventsFileWriter( + file_base, + extension, + max_file_mb, + header_events=list(self.header_deque), + ) as writer: + self.header_deque.clear() event: event_pb2.Event payload: bytes while True: + # Add any header events added during recording + while self.header_deque: + event, payload = self.header_deque.popleft() + writer.add_header_event(event, payload, write=True) # await a new event and payload, and write it to the file event, payload = await self.record_queue.get() event.timestamps.append( @@ -149,6 +185,9 @@ async def subscribe( event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): + if event.uri.path.startswith("/header"): + # Handle header events - typically these will be metadata or calibrations + self.add_header_event(event, payload) try: self.record_queue.put_nowait((event, payload)) except asyncio.QueueFull: @@ -263,6 +302,9 @@ def __init__(self, event_service: EventServiceGrpc) -> None: # the recorder task self._recorder_task: asyncio.Task | None = None + # For tracking header events (e.g. metadata, calibrations) to be logged in the recordings + self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {} + # public methods async def start_recording( @@ -287,9 +329,32 @@ async def start_recording( self._recorder = EventServiceRecorder( config_name or "record_default", config_list, + list(self.header_events.values()), ) + if self._recorder.recorder_config is None: + msg = "recorder_config is None" + raise ValueError(msg) + + # Handle if the max_file_mb is set in the record_config args + # Expectation: single string of "--max-file-mb=500" or "--max-file-mb 500" + max_file_mb: int = 0 + for arg in self._recorder.recorder_config.args: + if arg.startswith("--max-file-mb"): + try: + eq_arg = arg.replace(" ", "=").strip() + max_file_mb = int(eq_arg.split("=")[1]) + self._recorder.logger.info("Setting max_file_mb to %s", max_file_mb) + except ValueError: + self._recorder.logger.exception( + "Failed to parse max_file_mb from %s", + eq_arg, + ) + self._recorder_task = asyncio.create_task( - self._recorder.subscribe_and_record(file_base=file_base), + self._recorder.subscribe_and_record( + file_base=file_base, + max_file_mb=max_file_mb, + ), ) def _safe_done_callback( @@ -303,6 +368,7 @@ def _safe_done_callback( finally: del future del recorder_task + self._recorder = None self._recorder_task.add_done_callback( lambda f: _safe_done_callback(f, self._recorder_task), @@ -335,31 +401,62 @@ async def _request_reply_handler( Message: the response message. 
""" cmd: str = request.event.uri.path - config_name: str | None = None - if cmd.count("/") == 1: - cmd, config_name = cmd.split("/") + # Ensure path was sent with a leading slash + if not cmd.startswith("/"): + cmd = "/" + cmd + + if cmd.startswith("/start"): + # config_name is the optional part of the path after "/start/" + config_name: str | None = cmd[7:] or None - if cmd == "start": config_list: EventServiceConfigList = payload_to_protobuf( request.event, request.payload, ) - self._event_service.logger.info("start %s: %s", config_name, config_list) + self._event_service.logger.info("/start %s: %s", config_name, config_list) file_base = self._data_dir.joinpath(get_file_name_base()) await self.start_recording(file_base, config_list, config_name) return StringValue(value=str(file_base)) - if cmd == "stop": + if cmd == "/stop": + self._event_service.logger.info("/stop") await self.stop_recording() - elif cmd == "metadata": + elif cmd.startswith("/header"): + header_detail: str = cmd[8:] or "" + if not header_detail: + msg = "/header command requires a specifier, e.g. '/header/metadata'" + raise ValueError(msg) + self._event_service.logger.info( + "header:\n%s", + payload_to_protobuf(request.event, request.payload), + ) + self._event_service.logger.info("with uri:\n%s", request.event.uri) + self.add_header_event(request.event, request.payload) + elif cmd == "/clear_headers": + self._event_service.logger.info("/clear_headers") + self.header_events.clear() if self._recorder is not None: - self._event_service.logger.info("send_metadata: %s", request.payload) - await self._recorder.record_queue.put((request.event, request.payload)) - else: - self._event_service.logger.warning( - "requested to send metadata but not recording", - ) + self._recorder.header_deque.clear() return Empty() + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Adds a header event to the header_events list. + + If this is a duplicate (per URI), it will replace the existing message. + + Args: + event (event_pb2.Event): the event. + payload (bytes): the payload. 
+ """ + if not isinstance(event, event_pb2.Event): + error_msg = f"header event must be Event, not {type(event)}" + raise TypeError(error_msg) + if not isinstance(payload, bytes): + error_msg = f"header payload must be bytes, not {type(payload)}" + raise TypeError(error_msg) + self.header_events[uri_to_string(event.uri)] = (event, payload) + if self._recorder is not None: + self._recorder.add_header_event(event, payload) + def service_command(_args): config_list, service_config = load_service_config(args) @@ -389,7 +486,7 @@ def client_start_command(_args): async def job(): reply = await EventClient(service_config).request_reply( - f"start/{config_name}", + f"/start/{config_name}", config_list, ) print(payload_to_protobuf(reply.event, reply.payload)) @@ -400,7 +497,7 @@ async def job(): def client_stop_command(_args): config_list, service_config = load_service_config(args) loop = asyncio.get_event_loop() - loop.run_until_complete(EventClient(service_config).request_reply("stop", Empty())) + loop.run_until_complete(EventClient(service_config).request_reply("/stop", Empty())) def record_command(_args): diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index aab3cb49..402c6b96 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -7,8 +7,9 @@ # pylint can't find Event or Uri in protobuf generated files # https://github.com/protocolbuffers/protobuf/issues/10372 from farm_ng.core.event_pb2 import Event +from farm_ng.core.events_file_reader import payload_to_protobuf from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now -from farm_ng.core.uri import make_proto_uri +from farm_ng.core.uri import make_proto_uri, uri_to_string from google.protobuf.json_format import MessageToJson if TYPE_CHECKING: @@ -56,6 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, + header_events: list[tuple[Event, bytes]] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -63,6 +65,7 @@ def __init__( file_base: Path to and base of file name (without extension) where the events file will be logged. extension: Extension of the file to be logged. E.g., '.bin' or '.log' max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0. + header_events: Tuples of events & payloads to include in every split of the log file """ if isinstance(file_base, str): file_base = Path(file_base) @@ -79,6 +82,10 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 + self._header_events: dict[str, tuple[Event, bytes]] = {} + for (event, payload) in header_events or []: + self.add_header_event(event, payload) + def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" success: bool = self.open() @@ -134,6 +141,71 @@ def _increment_file_idx(self) -> None: """Increment the file index.""" self._file_idx += 1 + @property + def header_events(self) -> dict[str, tuple[Event, bytes]]: + """Return the dictionary of header events. + + Returns: + dict[str, tuple[Event, bytes]]: Dictionary of header events and payloads. + key: string representation of the uri + value: tuple of event and payload + """ + return self._header_events + + @property + def header_messages(self) -> list[Message]: + """Return the header_events, formatted as a list of protobuf messages. + + Returns: + list[Message]: List of header messages. 
+ """ + return [ + payload_to_protobuf(event, payload) + for (event, payload) in self.header_events.values() + ] + + def add_header_event( + self, + event: Event, + payload: bytes, + write: bool = False, + ) -> None: + """Add a header event, and optionally writes it to the file. + + NOTE: Writing to file will fail if the file is not open. + Args: + event: Event to write. + payload: Payload to write. + write: If True, write the header event to the file. Defaults to False. + """ + if not isinstance(event, Event): + error_msg = f"header event must be Event, not {type(event)}" + raise TypeError(error_msg) + if not isinstance(payload, bytes): + error_msg = f"header payload must be bytes, not {type(payload)}" + raise TypeError(error_msg) + # Once a header event is written to the file, it cannot be changed + if uri_to_string(event.uri) not in self.header_events: + self._header_events[uri_to_string(event.uri)] = (event, payload) + if write: + self.write_event_payload(event, payload) + + def write_header_events(self) -> None: + """Write the header events to the file. + + NOTE: If the header events are too large to fit in the file, + this will raise a RuntimeError to avoid getting stuck in an infinite loop. + """ + + true_max_file_length = self.max_file_length + self._max_file_length = 0 + for (event, payload) in self.header_events.values(): + self.write_event_payload(event, payload) + self._max_file_length = true_max_file_length + if self.max_file_length and self.file_length > self.max_file_length: + msg = f"Header events are too large to fit in a file of size {self.max_file_length}" + raise RuntimeError(msg) + def open(self) -> bool: """Open the file for writing. @@ -141,6 +213,7 @@ def open(self) -> bool: """ self._file_stream = Path(self.file_name).open("wb") self._file_length = 0 + self.write_header_events() return self.is_open() def close(self) -> bool: @@ -162,9 +235,7 @@ def write_event_payload(self, event: Event, payload: bytes) -> None: if event.payload_length != len(payload): msg = f"Payload length mismatch {event.payload_length} != {len(payload)}" - raise RuntimeError( - msg, - ) + raise RuntimeError(msg) file_stream = cast(IO, self._file_stream) diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 34d196de..ad9e6e00 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -140,8 +140,9 @@ def uri_to_string(uri: uri_pb2.Uri) -> str: >>> uri.scheme = "protobuf" >>> uri.authority = "farm-ng-1" >>> uri.query = "type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto" + >>> uri.path = "/stamp_stream" >>> uri_to_string(uri) - 'protobuf://farm-ng-1//?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto' + 'protobuf://farm-ng-1//stamp_stream?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto' """ return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}" diff --git a/py/tests/_asyncio/test_event_service_recorder.py b/py/tests/_asyncio/test_event_service_recorder.py index ca608bf9..310ddce4 100644 --- a/py/tests/_asyncio/test_event_service_recorder.py +++ b/py/tests/_asyncio/test_event_service_recorder.py @@ -1,7 +1,9 @@ import asyncio from pathlib import Path +from typing import TYPE_CHECKING import pytest +from farm_ng.core.event_pb2 import Event from farm_ng.core.event_service import EventServiceGrpc from farm_ng.core.event_service_pb2 import ( EventServiceConfigList, @@ -9,9 +11,13 @@ ) from farm_ng.core.event_service_recorder import EventServiceRecorder, RecorderService from farm_ng.core.events_file_reader import 
EventsFileReader, payload_to_protobuf +from farm_ng.core.uri import make_proto_uri from google.protobuf.message import Message from google.protobuf.wrappers_pb2 import Int32Value +if TYPE_CHECKING: + from farm_ng.core.uri_pb2 import Uri + async def request_reply_handler( request: RequestReplyRequest, @@ -79,6 +85,66 @@ async def test_event_service_recorder( event_message = event_log.read_message() assert event_message == message + @pytest.mark.anyio() + async def test_file_headers( + self, + tmp_path: Path, + event_service: EventServiceGrpc, + recorder_service: EventServiceRecorder, + ) -> None: + # reset the counts + event_service.reset() + event_service.request_reply_handler = request_reply_handler + + # Define some test values + header_uri_base: str = "baz_header" + header_count: int = 3 + + # Create the header and pass it to EventServiceRecorder + for i in range(10, 10 + header_count): + message = Int32Value(value=i) + payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path=f"/{header_uri_base}_{i}", + message=message, + service_name=event_service.config.name, + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(payload), + sequence=i, + ) + recorder_service.header_deque.append((event, payload)) + + # start the recording + file_name = tmp_path / "test_headers" + task = asyncio.create_task( + recorder_service.subscribe_and_record(file_name), + ) + await asyncio.sleep(0.1) + + # Cancel the recording + task.cancel() + await asyncio.sleep(0.1) + + file_name_bin = file_name.with_suffix(".0000.bin") + assert file_name_bin.exists() + + # read the file + reader = EventsFileReader(file_name_bin) + assert reader.open() + + # Check the headers + logged_headers: int = 0 + for event_log in reader.get_index(): + # Check headers - skip any other events + if header_uri_base in event_log.event.uri.path: + logged_headers += 1 + event_message = event_log.read_message() + assert isinstance(event_message, Int32Value) + assert logged_headers == header_count + class TestRecorderService: @pytest.mark.anyio() diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py new file mode 100644 index 00000000..2b18a05b --- /dev/null +++ b/py/tests/test_events_file_writer.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from farm_ng.core.event_pb2 import Event +from farm_ng.core.events_file_reader import EventsFileReader, payload_to_protobuf +from farm_ng.core.events_file_writer import EventsFileWriter +from farm_ng.core.uri import make_proto_uri +from google.protobuf.wrappers_pb2 import Int32Value, StringValue + +if TYPE_CHECKING: + from pathlib import Path + + from farm_ng.core.uri_pb2 import Uri + from google.protobuf.message import Message + + +@pytest.fixture() +def file_base(tmp_path: Path) -> Path: + return tmp_path / "test_events_file" + + +@pytest.fixture() +def writer(file_base: Path): + return EventsFileWriter(file_base=file_base) + + +@pytest.fixture() +def reader(file_base: Path, writer: EventsFileWriter): + with writer as opened_writer: + opened_writer.write("test_path", Int32Value(value=123)) + return EventsFileReader(file_name=file_base.with_suffix(".0000.bin")) + + +class TestEventsFileWriter: + def test_open_and_close_file_writer(self, writer): + with writer as opened_writer: + assert opened_writer.is_open() + assert writer.is_closed() + + def test_write_and_read_messages(self, writer, reader): + event_count: int = 10 + with writer as opened_writer: + # Start at 1 to 
avoid payload length of 0 + for i in range(1, 1 + event_count): + opened_writer.write("test_path", Int32Value(value=i)) + + counted_events: int = 0 + with reader as opened_reader: + for event_log in opened_reader.get_index(): + event_message = event_log.read_message() + assert event_message == Int32Value(value=1 + counted_events) + assert event_log.event is not None + assert event_log.event.uri.path == "test_path" + assert event_log.event.payload_length == 2 + counted_events += 1 + assert counted_events == event_count + + def test_write_event_payload(self, writer, reader): + with writer as opened_writer: + message: StringValue = StringValue(value="test_payload") + event_payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path="/test_path", + message=message, + service_name="test_service", + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(event_payload), + sequence=0, + ) + opened_writer.write_event_payload(event, event_payload) + + with reader as opened_reader: + event_log = opened_reader.read_next_event() + message = opened_reader.read_message(event_log) + assert message == payload_to_protobuf(event, event_payload) + + def test_rollover_behavior(self, file_base: Path): + max_file_bytes: int = 1000 + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + ) as opened_writer: + approx_expected_length: int = 0 + for i in range(100): + message = StringValue(value=f"test_payload_{i}") + payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path="/test_path", + message=message, + service_name="test_service", + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(payload), + sequence=i, + ) + opened_writer.write_event_payload(event, payload) + approx_expected_length += len(payload) + len(event.SerializeToString()) + + # Check that the file was rolled over the expected number of times + approx_expected_files: int = approx_expected_length // max_file_bytes + actual_files: int = 1 + opened_writer.file_idx + assert approx_expected_files > 5 # sufficient number of rollovers (params) + # The expected logging size is not a perfect estimate, so allow some wiggle room + assert approx_expected_files == pytest.approx(actual_files, rel=0.5) + + for i in range(actual_files): + file_name_bin = file_base.with_suffix(f".{i:04d}.bin") + assert file_name_bin.exists() + with EventsFileReader(file_name_bin) as opened_reader: + if i != actual_files - 1: + # Check that the length is roughly the max file size + assert opened_reader.file_length == pytest.approx( + max_file_bytes, + rel=0.5, + ) + for event_log in opened_reader.get_index(): + message = opened_reader.read_message(event_log) + assert isinstance(message, StringValue) + assert event_log.event.payload_length == len( + message.SerializeToString(), + ) + + def test_headers(self, file_base: Path): + # Build a list of test parameters + header_uri_base: str = "baz_header" + header_uri_service: str = "test_service" + header_count: int = 10 + headers: list[tuple[Event, bytes]] = [] + header_messages: list[Message] = [] + approx_header_size: int = 0 + max_file_bytes: int = 10000 + + # Create the header events + for i in range(header_count): + message = Int32Value(value=i) + event_payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path=f"/{header_uri_base}_{i}", + message=message, + service_name=header_uri_service, + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(event_payload), + sequence=i, + ) + 
headers.append((event, event_payload)) + header_messages.append(message) + approx_header_size += len(event.SerializeToString()) + len(event_payload) + + # Add some extra headers with duplicate URI's to ensure they are filtered out + dup_message = Int32Value(value=999 + i) + dup_payload: bytes = dup_message.SerializeToString() + dup_event = Event( + uri=uri, + timestamps=[], + payload_length=len(dup_payload), + sequence=i + header_count, + ) + headers.append((dup_event, dup_payload)) + + # Test that headers are written to the file + assert max_file_bytes > approx_header_size * 1.5 # sufficient buffer (params) + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + header_events=headers, + ) as opened_writer: + assert opened_writer.file_idx == 0 + assert opened_writer.file_length == pytest.approx( + approx_header_size, + rel=0.5, + ) + assert opened_writer.header_messages == header_messages + + # Test that headers cannot exceed the max file size + with pytest.raises(RuntimeError): + with EventsFileWriter( + file_base=file_base, + max_file_mb=(approx_header_size / 2) * 1e-6, + header_events=headers, + ) as opened_writer: + pass + + # Test that headers are added every rollover + file_count: int = 0 + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + header_events=headers, + ) as opened_writer: + assert opened_writer.file_idx == 0 + assert opened_writer.file_length == pytest.approx( + approx_header_size, + rel=0.5, + ) + for i in range(1000): + opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) + file_count = 1 + opened_writer.file_idx + assert file_count > 5 # sufficient number of rollovers (params) + assert opened_writer.header_messages == header_messages + + for i in range(file_count): + header_count_in_file: int = 0 + message_count_in_file: int = 0 + file_name_bin = file_base.with_suffix(f".{i:04d}.bin") + assert file_name_bin.exists() + with EventsFileReader(file_name_bin) as opened_reader: + for event_log in opened_reader.get_index(): + message = opened_reader.read_message(event_log) + if isinstance(message, StringValue): + message_count_in_file += 1 + elif isinstance(message, Int32Value): + # Test that the duplicate header was filtered out + # And that the headers are in order + assert message.value == header_count_in_file + header_count_in_file += 1 + else: + msg = f"Unexpected message type: {type(message)}" + raise TypeError(msg) + + assert header_count_in_file == header_count + if i == file_count - 1: + assert message_count_in_file > 0 + else: + assert message_count_in_file > 10 + # Check that the length is roughly the max file size + assert opened_reader.file_length == pytest.approx( + max_file_bytes, + rel=0.5, + )
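
A minimal sketch of the writer-level header API introduced above, following py/tests/test_events_file_writer.py; the file path, service name, and StringValue payloads are placeholders chosen for illustration, and any protobuf message can serve as a header.

    from farm_ng.core.event_pb2 import Event
    from farm_ng.core.events_file_writer import EventsFileWriter
    from farm_ng.core.uri import make_proto_uri
    from google.protobuf.wrappers_pb2 import StringValue

    # Build a header event/payload pair by hand (the recorder service does this
    # automatically for anything arriving on a "/header/..." path).
    metadata = StringValue(value="camera_serial=ABC123")
    payload: bytes = metadata.SerializeToString()
    uri = make_proto_uri(
        path="/header/metadata",
        message=metadata,
        service_name="example_service",
    )
    header_event = Event(
        uri=uri,
        timestamps=[],
        payload_length=len(payload),
        sequence=0,
    )

    # Header events are written at the top of the first file and again at the
    # start of every rollover split, so each .bin file is self-describing.
    with EventsFileWriter(
        file_base="/tmp/example_recording",
        extension=".bin",
        max_file_mb=100,
        header_events=[(header_event, payload)],
    ) as writer:
        writer.write("/example_stream", StringValue(value="regular payload"))

Because EventsFileWriter.add_header_event ignores a URI it has already stored, a header is never silently replaced once it has been written to an open file; replacement by URI only happens at the RecorderService level, where the latest payload per URI wins for subsequent recordings.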
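
On the service side, headers can be registered over the new "/header/..." request-reply path while the recorder service is running. A rough client sketch follows, assuming service_config points at the recorder (e.g. as returned by load_service_config above); the "/header/metadata" suffix and the payload value are arbitrary examples, since any non-empty specifier after "/header" is accepted.

    from farm_ng.core.event_client import EventClient
    from google.protobuf.wrappers_pb2 import StringValue

    async def register_header(service_config) -> None:
        # The recorder's request/reply handler stores any "/header/<specifier>"
        # request keyed by its URI and forwards it to an in-progress recording,
        # if one is active.
        await EventClient(service_config).request_reply(
            "/header/metadata",
            StringValue(value="operator_notes: morning calibration run"),
        )

    # e.g. asyncio.get_event_loop().run_until_complete(register_header(service_config)),
    # mirroring the client_start_command / client_stop_command helpers above.

Headers registered this way persist across "/stop" and are applied to the next "/start"; "/clear_headers" drops them.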