Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header messages (including latched) to EventsFileWriter #196

Merged
merged 28 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e17732f
Add concept of header messages to EventsFileWriter
Hackerman342 Dec 8, 2023
903653c
Fix new mypy issues
Hackerman342 Dec 12, 2023
9598106
Plumb through header msgs as list[(event, payload),...]
Hackerman342 Dec 12, 2023
bee805c
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 12, 2023
8ca0bcd
Add 'header_uris' field to EventServiceConfig
Hackerman342 Dec 12, 2023
a36c989
Fix startup bug when no max file size is specified
Hackerman342 Dec 12, 2023
b1e47c2
Add test for header messages through EventServiceRecorder
Hackerman342 Dec 13, 2023
0e5317d
Add TestEventsFileWriter
Hackerman342 Dec 13, 2023
7e02b15
Last rollover file may only have 1 message
Hackerman342 Dec 13, 2023
b56eeb6
EventsFileWriter headers as dict - prevent unbounded growth
Hackerman342 Dec 13, 2023
2aae43a
Be more lenient in the rough file size comparison
Hackerman342 Dec 13, 2023
baad5ab
Bring dict logic for headers into EventServiceRecorder
Hackerman342 Dec 13, 2023
1a79d2e
message_count logic was flipped
Hackerman342 Dec 13, 2023
1b81fdc
EventsFileWriter headers cannot be changed
Hackerman342 Dec 13, 2023
8902d8f
Reduce test dependency on inaccurate file length estimation
Hackerman342 Dec 13, 2023
5fda933
Match on uri_to_string -- seems safer than direct proto comparison
Hackerman342 Dec 13, 2023
1b4def0
Tweak test comments
Hackerman342 Dec 13, 2023
d473696
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 14, 2023
2903251
Set self._recorder = None in _safe_done_callback
Hackerman342 Dec 14, 2023
378b98d
Require additional arg for header cmd (e.g. 'header/metadata')
Hackerman342 Dec 14, 2023
03293c2
Consistency - leading slash for req-rep (e.g., '/start')
Hackerman342 Dec 14, 2023
c09db6c
Add more type checking on header events / payloads
Hackerman342 Dec 15, 2023
84ae0d0
Rename header_msgs -> header_events (more accurate)
Hackerman342 Dec 15, 2023
e790025
Add method for getting header_messages as protobufs w/ test
Hackerman342 Dec 15, 2023
3e1c957
Simplification - just start URI paths with /header
Hackerman342 Dec 15, 2023
d3863d5
Add handling of --max-file-mb=<###> arg in recorder EventServiceConfig
Hackerman342 Dec 15, 2023
17d815b
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 15, 2023
130924e
Meet new docstring formatting requirements
Hackerman342 Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 117 additions & 20 deletions py/farm_ng/core/event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import asyncio
import logging
import sys
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING

import grpc
from farm_ng.core import event_pb2
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service import (
EventServiceGrpc,
Expand All @@ -48,12 +50,11 @@
from farm_ng.core.events_file_reader import payload_to_protobuf, proto_from_json_file
from farm_ng.core.events_file_writer import EventsFileWriter
from farm_ng.core.stamp import StampSemantics, get_monotonic_now
from farm_ng.core.uri import get_host_name, uri_query_to_dict
from farm_ng.core.uri import get_host_name, uri_query_to_dict, uri_to_string
from google.protobuf.empty_pb2 import Empty
from google.protobuf.wrappers_pb2 import StringValue

if TYPE_CHECKING:
from farm_ng.core import event_pb2
from google.protobuf.message import Message

__all__ = ["EventServiceRecorder", "RecorderService"]
Expand All @@ -65,12 +66,18 @@ class EventServiceRecorder:
# the maximum size of the queue
QUEUE_MAX_SIZE: int = 50

def __init__(self, service_name: str, config_list: EventServiceConfigList) -> None:
def __init__(
self,
service_name: str,
config_list: EventServiceConfigList,
header_events: list[tuple[event_pb2.Event, bytes]] | None = None,
) -> None:
"""Initializes the service.

Args:
service_name (str): the name of the service.
config_list (EventServiceConfigList): the configuration data structure.
header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list header events.
"""
self.service_name: str = service_name
self.config_list: EventServiceConfigList = config_list
Expand Down Expand Up @@ -100,6 +107,25 @@ def __init__(self, service_name: str, config_list: EventServiceConfigList) -> No
self.record_queue: asyncio.Queue[tuple[event_pb2.Event, bytes]] = asyncio.Queue(
maxsize=self.QUEUE_MAX_SIZE,
)
self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque()
if header_events is not None:
for event, payload in header_events:
self.add_header_event(event, payload)

def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
    """Validate and enqueue a header event for the recorder.

    The (event, payload) pair is appended to ``header_deque`` so the active
    writer can pick it up and persist it into every log-file split.

    Args:
        event: Event to add.
        payload: Payload to add.

    Raises:
        TypeError: if ``event`` is not an ``event_pb2.Event`` or ``payload``
            is not ``bytes``.
    """
    if not isinstance(event, event_pb2.Event):
        raise TypeError(f"header event must be Event, not {type(event)}")
    if not isinstance(payload, bytes):
        raise TypeError(f"header payload must be bytes, not {type(payload)}")
    self.header_deque.append((event, payload))

@property
def logger(self) -> logging.Logger:
Expand All @@ -124,10 +150,20 @@ async def record(
extension (str, optional): the extension of the file. Defaults to ".bin".
max_file_mb (int, optional): the maximum size of the file in MB. Defaults to 0.
"""
with EventsFileWriter(file_base, extension, max_file_mb) as writer:
with EventsFileWriter(
file_base,
extension,
max_file_mb,
header_events=list(self.header_deque),
) as writer:
self.header_deque.clear()
event: event_pb2.Event
payload: bytes
while True:
# Add any header events added during recording
while self.header_deque:
event, payload = self.header_deque.popleft()
writer.add_header_event(event, payload, write=True)
# await a new event and payload, and write it to the file
event, payload = await self.record_queue.get()
event.timestamps.append(
Expand All @@ -149,6 +185,9 @@ async def subscribe(
event: event_pb2.Event
payload: bytes
async for event, payload in client.subscribe(subscription, decode=False):
if event.uri.path.startswith("/header"):
# Handle header events - typically these will be metadata or calibrations
self.add_header_event(event, payload)
try:
self.record_queue.put_nowait((event, payload))
except asyncio.QueueFull:
Expand Down Expand Up @@ -263,6 +302,9 @@ def __init__(self, event_service: EventServiceGrpc) -> None:
# the recorder task
self._recorder_task: asyncio.Task | None = None

# For tracking header events (e.g. metadata, calibrations) to be logged in the recordings
self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {}

# public methods

async def start_recording(
Expand All @@ -287,9 +329,32 @@ async def start_recording(
self._recorder = EventServiceRecorder(
config_name or "record_default",
config_list,
list(self.header_events.values()),
)
if self._recorder.recorder_config is None:
msg = "recorder_config is None"
raise ValueError(msg)

# Handle if the max_file_mb is set in the record_config args
# Expectation: single string of "--max-file-mb=500" or "--max-file-mb 500"
max_file_mb: int = 0
for arg in self._recorder.recorder_config.args:
if arg.startswith("--max-file-mb"):
try:
eq_arg = arg.replace(" ", "=").strip()
max_file_mb = int(eq_arg.split("=")[1])
self._recorder.logger.info("Setting max_file_mb to %s", max_file_mb)
except ValueError:
self._recorder.logger.exception(
"Failed to parse max_file_mb from %s",
eq_arg,
)

self._recorder_task = asyncio.create_task(
self._recorder.subscribe_and_record(file_base=file_base),
self._recorder.subscribe_and_record(
file_base=file_base,
max_file_mb=max_file_mb,
),
)

def _safe_done_callback(
Expand All @@ -303,6 +368,7 @@ def _safe_done_callback(
finally:
del future
del recorder_task
self._recorder = None

self._recorder_task.add_done_callback(
lambda f: _safe_done_callback(f, self._recorder_task),
Expand Down Expand Up @@ -335,31 +401,62 @@ async def _request_reply_handler(
Message: the response message.
"""
cmd: str = request.event.uri.path
config_name: str | None = None
if cmd.count("/") == 1:
cmd, config_name = cmd.split("/")
# Ensure path was sent with a leading slash
if not cmd.startswith("/"):
cmd = "/" + cmd

if cmd.startswith("/start"):
# config_name is the optional part of the path after "/start/"
config_name: str | None = cmd[7:] or None

if cmd == "start":
config_list: EventServiceConfigList = payload_to_protobuf(
request.event,
request.payload,
)
self._event_service.logger.info("start %s: %s", config_name, config_list)
self._event_service.logger.info("/start %s: %s", config_name, config_list)
file_base = self._data_dir.joinpath(get_file_name_base())
await self.start_recording(file_base, config_list, config_name)
return StringValue(value=str(file_base))
if cmd == "stop":
if cmd == "/stop":
self._event_service.logger.info("/stop")
await self.stop_recording()
elif cmd == "metadata":
elif cmd.startswith("/header"):
header_detail: str = cmd[8:] or ""
if not header_detail:
msg = "/header command requires a specifier, e.g. '/header/metadata'"
raise ValueError(msg)
self._event_service.logger.info(
"header:\n%s",
payload_to_protobuf(request.event, request.payload),
)
self._event_service.logger.info("with uri:\n%s", request.event.uri)
self.add_header_event(request.event, request.payload)
elif cmd == "/clear_headers":
self._event_service.logger.info("/clear_headers")
self.header_events.clear()
if self._recorder is not None:
self._event_service.logger.info("send_metadata: %s", request.payload)
await self._recorder.record_queue.put((request.event, request.payload))
else:
self._event_service.logger.warning(
"requested to send metadata but not recording",
)
self._recorder.header_deque.clear()
return Empty()

def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
    """Store a header event, replacing any prior event with the same URI.

    Events are keyed by their stringified URI, so re-sending a header for an
    already-known URI overwrites the stored copy. If a recording is in
    progress, the event is also forwarded to the active recorder.

    Args:
        event (event_pb2.Event): the event.
        payload (bytes): the payload.

    Raises:
        TypeError: if ``event`` is not an ``event_pb2.Event`` or ``payload``
            is not ``bytes``.
    """
    if not isinstance(event, event_pb2.Event):
        raise TypeError(f"header event must be Event, not {type(event)}")
    if not isinstance(payload, bytes):
        raise TypeError(f"header payload must be bytes, not {type(payload)}")
    self.header_events[uri_to_string(event.uri)] = (event, payload)
    if self._recorder is not None:
        self._recorder.add_header_event(event, payload)


def service_command(_args):
config_list, service_config = load_service_config(args)
Expand Down Expand Up @@ -389,7 +486,7 @@ def client_start_command(_args):

async def job():
reply = await EventClient(service_config).request_reply(
f"start/{config_name}",
f"/start/{config_name}",
config_list,
)
print(payload_to_protobuf(reply.event, reply.payload))
Expand All @@ -400,7 +497,7 @@ async def job():
def client_stop_command(_args):
    """Send a '/stop' request-reply to the recorder service and wait for it."""
    # NOTE(review): reads module-level `args` (not `_args`), matching the
    # other *_command helpers in this file.
    config_list, service_config = load_service_config(args)
    loop = asyncio.get_event_loop()
    stop_request = EventClient(service_config).request_reply("/stop", Empty())
    loop.run_until_complete(stop_request)
loop.run_until_complete(EventClient(service_config).request_reply("/stop", Empty()))


def record_command(_args):
Expand Down
79 changes: 75 additions & 4 deletions py/farm_ng/core/events_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
# pylint can't find Event or Uri in protobuf generated files
# https://github.com/protocolbuffers/protobuf/issues/10372
from farm_ng.core.event_pb2 import Event
from farm_ng.core.events_file_reader import payload_to_protobuf
from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now
from farm_ng.core.uri import make_proto_uri
from farm_ng.core.uri import make_proto_uri, uri_to_string
from google.protobuf.json_format import MessageToJson

if TYPE_CHECKING:
Expand Down Expand Up @@ -56,13 +57,15 @@ def __init__(
file_base: str | Path,
extension: str = ".bin",
max_file_mb: int = 0,
header_events: list[tuple[Event, bytes]] | None = None,
) -> None:
"""Create a new EventsFileWriter.

Args:
file_base: Path to and base of file name (without extension) where the events file will be logged.
extension: Extension of the file to be logged. E.g., '.bin' or '.log'
max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0.
header_events: Tuples of events & payloads to include in every split of the log file
"""
if isinstance(file_base, str):
file_base = Path(file_base)
Expand All @@ -79,6 +82,10 @@ def __init__(
self._max_file_length = int(max(0, max_file_mb) * 1e6)
self._file_idx: int = 0

self._header_events: dict[str, tuple[Event, bytes]] = {}
for (event, payload) in header_events or []:
self.add_header_event(event, payload)

def __enter__(self) -> EventsFileWriter:
"""Open the file for writing and return self."""
success: bool = self.open()
Expand Down Expand Up @@ -134,13 +141,79 @@ def _increment_file_idx(self) -> None:
"""Increment the file index."""
self._file_idx += 1

@property
def header_events(self) -> dict[str, tuple[Event, bytes]]:
    """Mapping of registered header events.

    Returns:
        dict[str, tuple[Event, bytes]]: keyed by the stringified URI of each
        event; each value is the (event, payload) pair to log.
    """
    return self._header_events

@property
def header_messages(self) -> list[Message]:
    """Decoded protobuf messages for every registered header event.

    Returns:
        list[Message]: one decoded message per stored (event, payload) pair.
    """
    messages: list[Message] = []
    for event, payload in self.header_events.values():
        messages.append(payload_to_protobuf(event, payload))
    return messages

def add_header_event(
    self,
    event: Event,
    payload: bytes,
    write: bool = False,
) -> None:
    """Register a header event, and optionally write it to the file.

    NOTE: Writing to file will fail if the file is not open.

    Args:
        event: Event to register.
        payload: Serialized payload bytes for the event.
        write: If True, also write the pair to the current file. Defaults to False.

    Raises:
        TypeError: if ``event`` is not an ``Event`` or ``payload`` is not ``bytes``.
    """
    if not isinstance(event, Event):
        raise TypeError(f"header event must be Event, not {type(event)}")
    if not isinstance(payload, bytes):
        raise TypeError(f"header payload must be bytes, not {type(payload)}")
    # First registration wins: once stored, a header for this URI is immutable.
    self._header_events.setdefault(uri_to_string(event.uri), (event, payload))
    if write:
        self.write_event_payload(event, payload)

def write_header_events(self) -> None:
    """Write the header events to the file.

    The rollover size limit is temporarily disabled while the headers are
    written so a file split cannot trigger mid-header. The limit is restored
    in a ``finally`` block so a failing write cannot leave it disabled.

    Raises:
        RuntimeError: if the header events alone exceed the maximum file
            size, which would otherwise cause an infinite rollover loop.
    """
    true_max_file_length = self.max_file_length
    # Disable rollover while writing headers; restore even if a write fails.
    self._max_file_length = 0
    try:
        for event, payload in self.header_events.values():
            self.write_event_payload(event, payload)
    finally:
        self._max_file_length = true_max_file_length
    if self.max_file_length and self.file_length > self.max_file_length:
        msg = f"Header events are too large to fit in a file of size {self.max_file_length}"
        raise RuntimeError(msg)

def open(self) -> bool:
    """Open the file for writing.

    Resets the byte counter and immediately writes any registered header
    events, so every file (including rollover splits) begins with them.

    Returns:
        bool: True if the file stream is open for writing.
    """
    self._file_stream = Path(self.file_name).open("wb")
    self._file_length = 0
    self.write_header_events()
    return self.is_open()

def close(self) -> bool:
Expand All @@ -162,9 +235,7 @@ def write_event_payload(self, event: Event, payload: bytes) -> None:

if event.payload_length != len(payload):
msg = f"Payload length mismatch {event.payload_length} != {len(payload)}"
raise RuntimeError(
msg,
)
raise RuntimeError(msg)

file_stream = cast(IO, self._file_stream)

Expand Down
3 changes: 2 additions & 1 deletion py/farm_ng/core/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ def uri_to_string(uri: uri_pb2.Uri) -> str:
>>> uri.scheme = "protobuf"
>>> uri.authority = "farm-ng-1"
>>> uri.query = "type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto"
>>> uri.path = "/stamp_stream"
>>> uri_to_string(uri)
'protobuf://farm-ng-1//?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto'
'protobuf://farm-ng-1//stamp_stream?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto'
"""
return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}"

Expand Down
Loading