Add header messages (including latched) to EventsFileWriter (#196)
Hackerman342 authored and strasdat committed Mar 28, 2024
1 parent 7eb19c5 commit 5273dcb
Showing 5 changed files with 499 additions and 25 deletions.
137 changes: 117 additions & 20 deletions py/farm_ng/core/event_service_recorder.py
@@ -28,11 +28,13 @@
import asyncio
import logging
import sys
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING

import grpc
from farm_ng.core import event_pb2
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service import (
EventServiceGrpc,
@@ -48,12 +50,11 @@
from farm_ng.core.events_file_reader import payload_to_protobuf, proto_from_json_file
from farm_ng.core.events_file_writer import EventsFileWriter
from farm_ng.core.stamp import StampSemantics, get_monotonic_now
from farm_ng.core.uri import get_host_name, uri_query_to_dict, uri_to_string
from google.protobuf.empty_pb2 import Empty
from google.protobuf.wrappers_pb2 import StringValue

if TYPE_CHECKING:
from google.protobuf.message import Message

__all__ = ["EventServiceRecorder", "RecorderService"]
@@ -65,12 +66,18 @@ class EventServiceRecorder:
# the maximum size of the queue
QUEUE_MAX_SIZE: int = 50

def __init__(
self,
service_name: str,
config_list: EventServiceConfigList,
header_events: list[tuple[event_pb2.Event, bytes]] | None = None,
) -> None:
"""Initializes the service.
Args:
service_name (str): the name of the service.
config_list (EventServiceConfigList): the configuration data structure.
header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list of header events.
"""
self.service_name: str = service_name
self.config_list: EventServiceConfigList = config_list
@@ -100,6 +107,25 @@ def __init__(self, service_name: str, config_list: EventServiceConfigList) -> None:
self.record_queue: asyncio.Queue[tuple[event_pb2.Event, bytes]] = asyncio.Queue(
maxsize=self.QUEUE_MAX_SIZE,
)
self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque()
if header_events is not None:
for event, payload in header_events:
self.add_header_event(event, payload)

def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
"""Add a header event to the header_deque.
Args:
event: Event to add.
payload: Payload to add.
"""
if not isinstance(event, event_pb2.Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
self.header_deque.append((event, payload))
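
A header event is just an ordinary event_pb2.Event paired with its serialized payload. Here is a minimal sketch of building one and seeding a recorder with it; the make_header_event helper, the "/header/metadata" path, and the StringValue payload are illustrative, not part of this commit:

from farm_ng.core import event_pb2
from farm_ng.core.event_service_recorder import EventServiceRecorder
from google.protobuf.message import Message
from google.protobuf.wrappers_pb2 import StringValue

def make_header_event(path: str, message: Message) -> tuple[event_pb2.Event, bytes]:
    # Hypothetical helper: wrap a protobuf message as an (Event, payload) pair.
    payload: bytes = message.SerializeToString()
    event = event_pb2.Event()
    event.uri.path = path
    event.payload_length = len(payload)
    return event, payload

# config_list: an EventServiceConfigList loaded elsewhere (assumed).
recorder = EventServiceRecorder(
    "record_default",
    config_list,
    header_events=[make_header_event("/header/metadata", StringValue(value="run-1"))],
)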

@property
def logger(self) -> logging.Logger:
@@ -124,10 +150,20 @@ async def record(
extension (str, optional): the extension of the file. Defaults to ".bin".
max_file_mb (int, optional): the maximum size of the file in MB. Defaults to 0.
"""
with EventsFileWriter(
file_base,
extension,
max_file_mb,
header_events=list(self.header_deque),
) as writer:
self.header_deque.clear()
event: event_pb2.Event
payload: bytes
while True:
# Write any header events latched since recording started
while self.header_deque:
event, payload = self.header_deque.popleft()
writer.add_header_event(event, payload, write=True)
# await a new event and payload, and write it to the file
event, payload = await self.record_queue.get()
event.timestamps.append(
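
The loop above drains header_deque before awaiting each queued event, so a header latched while a recording is already running still lands in the open file. A sketch, reusing the hypothetical make_header_event helper from above:

# recorder: an EventServiceRecorder mid-recording; calibration_msg: any
# protobuf message (both assumed). record() writes the pair on its next pass.
recorder.add_header_event(*make_header_event("/header/calibration", calibration_msg))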
@@ -149,6 +185,9 @@ async def subscribe(
event: event_pb2.Event
payload: bytes
async for event, payload in client.subscribe(subscription, decode=False):
if event.uri.path.startswith("/header"):
# Handle header events - typically these will be metadata or calibrations
self.add_header_event(event, payload)
try:
self.record_queue.put_nowait((event, payload))
except asyncio.QueueFull:
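
On the producing side, a service only needs to publish under a "/header/..." path for this subscriber to latch the message. A sketch, assuming the usual EventServiceGrpc.publish() coroutine (not shown in this diff):

# Inside a service coroutine; event_service is an EventServiceGrpc and
# calibration_msg a protobuf message (both assumed).
await event_service.publish("/header/calibration", calibration_msg)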
@@ -263,6 +302,9 @@ def __init__(self, event_service: EventServiceGrpc) -> None:
# the recorder task
self._recorder_task: asyncio.Task | None = None

# For tracking header events (e.g. metadata, calibrations) to be logged in the recordings
self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {}

# public methods

async def start_recording(
@@ -287,9 +329,32 @@ async def start_recording(
self._recorder = EventServiceRecorder(
config_name or "record_default",
config_list,
list(self.header_events.values()),
)
if self._recorder.recorder_config is None:
msg = "recorder_config is None"
raise ValueError(msg)

# Handle the case where max_file_mb is set in the record_config args
# Expectation: single string of "--max-file-mb=500" or "--max-file-mb 500"
max_file_mb: int = 0
for arg in self._recorder.recorder_config.args:
if arg.startswith("--max-file-mb"):
try:
eq_arg = arg.replace(" ", "=").strip()
max_file_mb = int(eq_arg.split("=")[1])
self._recorder.logger.info("Setting max_file_mb to %s", max_file_mb)
except ValueError:
self._recorder.logger.exception(
"Failed to parse max_file_mb from %s",
eq_arg,
)
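
The replace-then-split normalization above means both accepted spellings reduce to the same key=value form; a quick self-contained check:

# Both accepted forms normalize to "--max-file-mb=500".
for arg in ("--max-file-mb=500", "--max-file-mb 500"):
    eq_arg = arg.replace(" ", "=").strip()
    assert int(eq_arg.split("=")[1]) == 500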

self._recorder_task = asyncio.create_task(
self._recorder.subscribe_and_record(
file_base=file_base,
max_file_mb=max_file_mb,
),
)

def _safe_done_callback(
@@ -303,6 +368,7 @@ def _safe_done_callback(
finally:
del future
del recorder_task
self._recorder = None

self._recorder_task.add_done_callback(
lambda f: _safe_done_callback(f, self._recorder_task),
@@ -335,31 +401,62 @@ async def _request_reply_handler(
Message: the response message.
"""
cmd: str = request.event.uri.path
# Ensure path was sent with a leading slash
if not cmd.startswith("/"):
cmd = "/" + cmd

if cmd.startswith("/start"):
# config_name is the optional part of the path after "/start/"
config_name: str | None = cmd[7:] or None
config_list: EventServiceConfigList = payload_to_protobuf(
request.event,
request.payload,
)
self._event_service.logger.info("start %s: %s", config_name, config_list)
self._event_service.logger.info("/start %s: %s", config_name, config_list)
file_base = self._data_dir.joinpath(get_file_name_base())
await self.start_recording(file_base, config_list, config_name)
return StringValue(value=str(file_base))
if cmd == "stop":
if cmd == "/stop":
self._event_service.logger.info("/stop")
await self.stop_recording()
elif cmd == "metadata":
elif cmd.startswith("/header"):
header_detail: str = cmd[8:] or ""
if not header_detail:
msg = "/header command requires a specifier, e.g. '/header/metadata'"
raise ValueError(msg)
self._event_service.logger.info(
"header:\n%s",
payload_to_protobuf(request.event, request.payload),
)
self._event_service.logger.info("with uri:\n%s", request.event.uri)
self.add_header_event(request.event, request.payload)
elif cmd == "/clear_headers":
self._event_service.logger.info("/clear_headers")
self.header_events.clear()
if self._recorder is not None:
self._recorder.header_deque.clear()
return Empty()
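
Putting the routing together, a client can latch and clear headers over the same request/reply channel used for /start and /stop. A sketch; service_config (the recorder's EventServiceConfig) and the payload value are assumed:

import asyncio

from farm_ng.core.event_client import EventClient
from google.protobuf.empty_pb2 import Empty
from google.protobuf.wrappers_pb2 import StringValue

async def latch_and_clear(service_config) -> None:
    client = EventClient(service_config)
    # Latch a header; the part after "/header/" is a free-form specifier.
    await client.request_reply("/header/metadata", StringValue(value="orchard-7"))
    # Drop all latched headers (also clears an active recorder's deque).
    await client.request_reply("/clear_headers", Empty())

# service_config loaded elsewhere (assumed).
asyncio.get_event_loop().run_until_complete(latch_and_clear(service_config))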

def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
"""Adds a header event to the header_events list.
If this is a duplicate (per URI), it will replace the existing message.
Args:
event (event_pb2.Event): the event.
payload (bytes): the payload.
"""
if not isinstance(event, event_pb2.Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
self.header_events[uri_to_string(event.uri)] = (event, payload)
if self._recorder is not None:
self._recorder.add_header_event(event, payload)
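
Note the replace-on-duplicate semantics: header_events is keyed on uri_to_string(event.uri), so re-latching the same URI overwrites the earlier payload rather than appending. A small sketch (service and event assumed):

# service: a RecorderService; event: an event_pb2.Event with a fixed URI.
service.add_header_event(event, b"old")
service.add_header_event(event, b"new")
assert service.header_events[uri_to_string(event.uri)] == (event, b"new")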


def service_command(_args):
config_list, service_config = load_service_config(args)
@@ -389,7 +486,7 @@ def client_start_command(_args):

async def job():
reply = await EventClient(service_config).request_reply(
f"start/{config_name}",
f"/start/{config_name}",
config_list,
)
print(payload_to_protobuf(reply.event, reply.payload))
@@ -400,7 +497,7 @@ async def job():
def client_stop_command(_args):
config_list, service_config = load_service_config(args)
loop = asyncio.get_event_loop()
loop.run_until_complete(EventClient(service_config).request_reply("/stop", Empty()))


def record_command(_args):
79 changes: 75 additions & 4 deletions py/farm_ng/core/events_file_writer.py
@@ -7,8 +7,9 @@
# pylint can't find Event or Uri in protobuf generated files
# https://github.com/protocolbuffers/protobuf/issues/10372
from farm_ng.core.event_pb2 import Event
from farm_ng.core.events_file_reader import payload_to_protobuf
from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now
from farm_ng.core.uri import make_proto_uri
from farm_ng.core.uri import make_proto_uri, uri_to_string
from google.protobuf.json_format import MessageToJson

if TYPE_CHECKING:
@@ -56,13 +57,15 @@ def __init__(
file_base: str | Path,
extension: str = ".bin",
max_file_mb: int = 0,
header_events: list[tuple[Event, bytes]] | None = None,
) -> None:
"""Create a new EventsFileWriter.
Args:
file_base: Path to and base of file name (without extension) where the events file will be logged.
extension: Extension of the file to be logged. E.g., '.bin' or '.log'
max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0.
header_events: Tuples of events & payloads to include in every split of the log file
"""
if isinstance(file_base, str):
file_base = Path(file_base)
@@ -79,6 +82,10 @@ def __init__(
self._max_file_length = int(max(0, max_file_mb) * 1e6)
self._file_idx: int = 0

self._header_events: dict[str, tuple[Event, bytes]] = {}
for (event, payload) in header_events or []:
self.add_header_event(event, payload)
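
Standalone use follows directly from the new keyword argument: headers passed at construction are written at the top of the first file and, as shown further down, at the top of every rollover. A sketch reusing the hypothetical make_header_event helper from the recorder section:

from farm_ng.core.events_file_writer import EventsFileWriter
from google.protobuf.wrappers_pb2 import StringValue

header = make_header_event("/header/metadata", StringValue(value="plot-7"))
with EventsFileWriter(
    "events/recording",
    ".bin",
    max_file_mb=100,
    header_events=[header],
) as writer:
    # imu_msg: any protobuf message (assumed).
    event, payload = make_header_event("/imu", imu_msg)
    writer.write_event_payload(event, payload)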

def __enter__(self) -> EventsFileWriter:
"""Open the file for writing and return self."""
success: bool = self.open()
@@ -134,13 +141,79 @@ def _increment_file_idx(self) -> None:
"""Increment the file index."""
self._file_idx += 1

@property
def header_events(self) -> dict[str, tuple[Event, bytes]]:
"""Return the dictionary of header events.
Returns:
dict[str, tuple[Event, bytes]]: Dictionary of header events and payloads.
key: string representation of the uri
value: tuple of event and payload
"""
return self._header_events

@property
def header_messages(self) -> list[Message]:
"""Return the header_events, formatted as a list of protobuf messages.
Returns:
list[Message]: List of header messages.
"""
return [
payload_to_protobuf(event, payload)
for (event, payload) in self.header_events.values()
]

def add_header_event(
self,
event: Event,
payload: bytes,
write: bool = False,
) -> None:
"""Add a header event, and optionally writes it to the file.
NOTE: Writing to file will fail if the file is not open.
Args:
event: Event to write.
payload: Payload to write.
write: If True, write the header event to the file. Defaults to False.
"""
if not isinstance(event, Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
# Once a header event is written to the file, it cannot be changed
if uri_to_string(event.uri) not in self.header_events:
self._header_events[uri_to_string(event.uri)] = (event, payload)
if write:
self.write_event_payload(event, payload)
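
Unlike RecorderService.add_header_event, the writer keeps the first payload seen for a URI (the `not in` guard above), since a header already written to a file cannot be retracted. A small sketch of the contrast (writer and event assumed):

# writer: an open EventsFileWriter; event: an Event with a fixed URI.
writer.add_header_event(event, b"first")
writer.add_header_event(event, b"second")  # no-op: that URI is already latched
assert writer.header_events[uri_to_string(event.uri)] == (event, b"first")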

def write_header_events(self) -> None:
"""Write the header events to the file.
NOTE: If the header events are too large to fit in the file,
this will raise a RuntimeError to avoid getting stuck in an infinite loop.
"""

true_max_file_length = self.max_file_length
self._max_file_length = 0
for (event, payload) in self.header_events.values():
self.write_event_payload(event, payload)
self._max_file_length = true_max_file_length
if self.max_file_length and self.file_length > self.max_file_length:
msg = f"Header events are too large to fit in a file of size {self.max_file_length}"
raise RuntimeError(msg)
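
Zeroing _max_file_length for the duration of the loop keeps write_event_payload from triggering a rollover mid-header, which would recurse back through open() into write_header_events(). The net effect is that every split file begins with the full header set; a sketch with sizes exaggerated for illustration (header and sample_events assumed):

# A tiny max size forces rollovers; each new file still opens with the
# headers because open() calls write_header_events() before normal writes.
with EventsFileWriter("logs/run", ".bin", max_file_mb=1, header_events=[header]) as w:
    for event, payload in sample_events:  # iterable of (Event, bytes) pairs
        w.write_event_payload(event, payload)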

def open(self) -> bool:
"""Open the file for writing.
Return True if successful.
"""
self._file_stream = Path(self.file_name).open("wb")
self._file_length = 0
self.write_header_events()
return self.is_open()

def close(self) -> bool:
@@ -162,9 +235,7 @@ def write_event_payload(self, event: Event, payload: bytes) -> None:

if event.payload_length != len(payload):
msg = f"Payload length mismatch {event.payload_length} != {len(payload)}"
raise RuntimeError(msg)

file_stream = cast(IO, self._file_stream)

3 changes: 2 additions & 1 deletion py/farm_ng/core/uri.py
@@ -140,8 +140,9 @@ def uri_to_string(uri: uri_pb2.Uri) -> str:
>>> uri.scheme = "protobuf"
>>> uri.authority = "farm-ng-1"
>>> uri.query = "type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto"
>>> uri.path = "/stamp_stream"
>>> uri_to_string(uri)
'protobuf://farm-ng-1//stamp_stream?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto'
"""
return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}"
