Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header messages (including latched) to EventsFileWriter #196

Merged
merged 28 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e17732f
Add concept of header messages to EventsFileWriter
Hackerman342 Dec 8, 2023
903653c
Fix new mypy issues
Hackerman342 Dec 12, 2023
9598106
Plumb through header msgs as list[(event, payload),...]
Hackerman342 Dec 12, 2023
bee805c
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 12, 2023
8ca0bcd
Add 'header_uris' field to EventServiceConfig
Hackerman342 Dec 12, 2023
a36c989
Fix startup bug when no max file size is specified
Hackerman342 Dec 12, 2023
b1e47c2
Add test for header messages through EventServiceRecorder
Hackerman342 Dec 13, 2023
0e5317d
Add TestEventsFileWriter
Hackerman342 Dec 13, 2023
7e02b15
Last rollover file may only have 1 message
Hackerman342 Dec 13, 2023
b56eeb6
EventsFileWriter headers as dict - prevent unbounded growth
Hackerman342 Dec 13, 2023
2aae43a
More lenient on rough file size comp
Hackerman342 Dec 13, 2023
baad5ab
Bring dict logic for headers into EventServiceRecorder
Hackerman342 Dec 13, 2023
1a79d2e
message_count logic was flipped
Hackerman342 Dec 13, 2023
1b81fdc
EventsFileWriter headers cannot be changed
Hackerman342 Dec 13, 2023
8902d8f
Reduce test dependency on inaccurate file length estimation
Hackerman342 Dec 13, 2023
5fda933
Match on uri_to_string -- seems safer than direct proto comparison
Hackerman342 Dec 13, 2023
1b4def0
Tweak test comments
Hackerman342 Dec 13, 2023
d473696
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 14, 2023
2903251
Set self._recorder = None in _safe_done_callback
Hackerman342 Dec 14, 2023
378b98d
Require addtl arg for header cmd (e.g. 'header/metadata')
Hackerman342 Dec 14, 2023
03293c2
Consistency - leading slash for req-rep (e.g., '/start')
Hackerman342 Dec 14, 2023
c09db6c
Add more type checking on header events / payloads
Hackerman342 Dec 15, 2023
84ae0d0
Rename header_msgs -> header_events (more accurate)
Hackerman342 Dec 15, 2023
e790025
Add method for getting header_messages as protobufs w/ test
Hackerman342 Dec 15, 2023
3e1c957
Simplification - just start uri path's with /header
Hackerman342 Dec 15, 2023
d3863d5
Add handling of --max-file-mb=<###> arg in recorder EventServiceConfig
Hackerman342 Dec 15, 2023
17d815b
Merge remote-tracking branch 'origin/main' into headers-split-logs
Hackerman342 Dec 15, 2023
130924e
Meet new docstring formatting requirements
Hackerman342 Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protos/farm_ng/core/event_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ message EventServiceConfig {
DEBUG = 10;
}
LogLevel log_level = 7;
// URIs to treat as headers for the EventServiceRecorder
repeated Uri header_uris = 8;
}

// list of event service configurations
Expand Down
76 changes: 65 additions & 11 deletions py/farm_ng/core/event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import asyncio
import logging
import sys
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
Expand All @@ -43,7 +44,7 @@
from farm_ng.core.events_file_reader import payload_to_protobuf, proto_from_json_file
from farm_ng.core.events_file_writer import EventsFileWriter
from farm_ng.core.stamp import StampSemantics, get_monotonic_now
from farm_ng.core.uri import get_host_name, uri_query_to_dict
from farm_ng.core.uri import get_host_name, uri_query_to_dict, uri_to_string
from google.protobuf.empty_pb2 import Empty
from google.protobuf.wrappers_pb2 import StringValue

Expand All @@ -60,7 +61,12 @@ class EventServiceRecorder:
# the maximum size of the queue
QUEUE_MAX_SIZE: int = 50

def __init__(self, service_name: str, config_list: EventServiceConfigList) -> None:
def __init__(
self,
service_name: str,
config_list: EventServiceConfigList,
header_msgs: list[tuple[event_pb2.Event, bytes]] | None = None,
) -> None:
"""Initializes the service.

Args:
Expand Down Expand Up @@ -95,6 +101,9 @@ def __init__(self, service_name: str, config_list: EventServiceConfigList) -> No
self.record_queue: asyncio.Queue[tuple[event_pb2.Event, bytes]] = asyncio.Queue(
maxsize=self.QUEUE_MAX_SIZE,
)
self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque()
if header_msgs:
Hackerman342 marked this conversation as resolved.
Show resolved Hide resolved
self.header_deque.extend(header_msgs)

@property
def logger(self) -> logging.Logger:
Expand All @@ -119,10 +128,20 @@ async def record(
extension (str, optional): the extension of the file. Defaults to ".bin".
max_file_mb (int, optional): the maximum size of the file in MB. Defaults to 0.
"""
with EventsFileWriter(file_base, extension, max_file_mb) as writer:
with EventsFileWriter(
file_base,
extension,
max_file_mb,
header_msgs=list(self.header_deque),
) as writer:
self.header_deque.clear()
event: event_pb2.Event
payload: bytes
while True:
# Add any header messages added during recording
while self.header_deque:
event, payload = self.header_deque.popleft()
writer.add_header_msg(event, payload, write=True)
# await a new event and payload, and write it to the file
event, payload = await self.record_queue.get()
event.timestamps.append(
Expand All @@ -134,16 +153,21 @@ async def subscribe(
self,
client: EventClient,
subscription: SubscribeRequest,
header_uris: list[str],
) -> None:
"""Subscribes to a service and puts the events in the queue.

Args:
client (EventClient): the client to subscribe to.
subscription (SubscribeRequest): the subscription request.
header_uris (list[str]): the list of URIs (as strings) to consider as header messages.
"""
event: event_pb2.Event
payload: bytes
async for event, payload in client.subscribe(subscription, decode=False):
if uri_to_string(event.uri) in header_uris:
# Handle header messages - typically these will be metadata or calibrations
self.header_deque.append((event, payload))
Hackerman342 marked this conversation as resolved.
Show resolved Hide resolved
try:
self.record_queue.put_nowait((event, payload))
except asyncio.QueueFull:
Expand Down Expand Up @@ -187,8 +211,21 @@ async def subscribe_and_record(
self.logger.warning("Invalid subscription: %s", query_service_name)
continue
client: EventClient = self.clients[query_service_name]
# Build a list of header URIs matching the service_name for this subscription
header_uris: list[str] = []
for header_uri in self.recorder_config.header_uris:
header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri)
header_service_name: str = header_dict["service_name"]
if header_service_name == query_service_name:
header_uris.append(uri_to_string(header_uri))
async_tasks.append(
asyncio.create_task(self.subscribe(client, subscription)),
asyncio.create_task(
self.subscribe(
client,
subscription,
header_uris,
),
),
)
try:
await asyncio.gather(*async_tasks)
Expand Down Expand Up @@ -258,6 +295,9 @@ def __init__(self, event_service: EventServiceGrpc) -> None:
# the recorder task
self._recorder_task: asyncio.Task | None = None

# For tracking header messages (e.g. metadata, calibrations) to be logged in the recordings
self.header_msgs: dict[str, tuple[event_pb2.Event, bytes]] = {}

# public methods

async def start_recording(
Expand All @@ -282,6 +322,7 @@ async def start_recording(
self._recorder = EventServiceRecorder(
config_name or "record_default",
config_list,
list(self.header_msgs.values()),
)
self._recorder_task = asyncio.create_task(
self._recorder.subscribe_and_record(file_base=file_base),
Expand Down Expand Up @@ -345,16 +386,29 @@ async def _request_reply_handler(
return StringValue(value=str(file_base))
if cmd == "stop":
await self.stop_recording()
elif cmd == "metadata":
elif cmd == "add_header_msg":
self._event_service.logger.info("add_header_msg: %s", request.payload)
self.add_header_msg(request.event, request.payload)
elif cmd == "clear_headers":
self._event_service.logger.info("clear_headers")
self.header_msgs.clear()
if self._recorder is not None:
self._event_service.logger.info("send_metadata: %s", request.payload)
await self._recorder.record_queue.put((request.event, request.payload))
else:
self._event_service.logger.warning(
"requested to send metadata but not recording",
)
self._recorder.header_deque.clear()
return Empty()

def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
    """Add a header message to the ``header_msgs`` dict, keyed by URI string.

    If a message with the same URI already exists, it is replaced. When a
    recording is currently active, the message is also appended to the
    recorder's ``header_deque`` so it gets written into the live log file.

    Args:
        event (event_pb2.Event): the event.
        payload (bytes): the payload.
    """
    self.header_msgs[uri_to_string(event.uri)] = (event, payload)
    if self._recorder is not None:
        self._event_service.logger.info("Adding header msg to active recording")
        self._recorder.header_deque.append((event, payload))


def service_command(_args):
config_list, service_config = load_service_config(args)
Expand Down
56 changes: 52 additions & 4 deletions py/farm_ng/core/events_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# https://github.com/protocolbuffers/protobuf/issues/10372
from farm_ng.core.event_pb2 import Event
from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now
from farm_ng.core.uri import make_proto_uri
from farm_ng.core.uri import make_proto_uri, uri_to_string
from google.protobuf.json_format import MessageToJson

if TYPE_CHECKING:
Expand Down Expand Up @@ -57,13 +57,15 @@ def __init__(
file_base: str | Path,
extension: str = ".bin",
max_file_mb: int = 0,
header_msgs: list[tuple[Event, bytes]] | None = None,
) -> None:
"""Create a new EventsFileWriter.

Args:
file_base: Path to and base of file name (without extension) where the events file will be logged.
extension: Extension of the file to be logged. E.g., '.bin' or '.log'
max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0.
header_msgs: Tuples of events & payloads to include in every split of the log file
"""
if isinstance(file_base, str):
file_base = Path(file_base)
Expand All @@ -80,6 +82,10 @@ def __init__(
self._max_file_length = int(max(0, max_file_mb) * 1e6)
self._file_idx: int = 0

self._header_msgs: dict[str, tuple[Event, bytes]] = {}
for (event, payload) in header_msgs or []:
self.add_header_msg(event, payload)

def __enter__(self) -> EventsFileWriter:
"""Open the file for writing and return self."""
success: bool = self.open()
Expand Down Expand Up @@ -135,10 +141,54 @@ def _increment_file_idx(self) -> None:
"""Increment the file index."""
self._file_idx += 1

@property
def header_msgs(self) -> dict[str, tuple[Event, bytes]]:
    """Return the registered header messages.

    Returns:
        dict[str, tuple[Event, bytes]]: dictionary of header messages.
            key: string representation of the URI
            value: tuple of event and payload
    """
    return self._header_msgs

def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None:
    """Add a header message, and optionally writes it to the file.

    NOTE: Writing to file will fail if the file is not open.

    Args:
        event: Event to write.
        payload: Payload to write.
        write: If True, write the header message to the file. Defaults to False.

    Raises:
        TypeError: if ``event`` is not an ``Event`` or ``payload`` is not ``bytes``.
    """
    if not isinstance(event, Event):
        error_msg = f"event must be Event, not {type(event)}"
        raise TypeError(error_msg)
    if not isinstance(payload, bytes):
        error_msg = f"payload must be bytes, not {type(payload)}"
        raise TypeError(error_msg)
    # Once a header message is written to the file, it cannot be changed
    # NOTE(review): the write is assumed to apply only to newly registered
    # URIs (duplicates are ignored entirely) — confirm nesting upstream.
    if uri_to_string(event.uri) not in self.header_msgs:
        self._header_msgs[uri_to_string(event.uri)] = (event, payload)
        if write:
            self.write_event_payload(event, payload)

def write_header_msgs(self) -> None:
    """Write the header messages to the file.

    The max-file-length check is temporarily disabled while the headers are
    written so that the headers themselves can never trigger a rollover loop
    (each new split starts with the headers again). The limit is restored in
    a ``finally`` block so a failing write cannot leave rollover permanently
    disabled.

    Raises:
        RuntimeError: if the header messages alone exceed the maximum file
            size, propagated from the post-write size check.
    """
    true_max_file_length = self.max_file_length
    self._max_file_length = 0  # suspend rollover during header writes
    try:
        for event, payload in self.header_msgs.values():
            self.write_event_payload(event, payload)
    finally:
        # Restore the limit even if a write raised, so subsequent writes
        # still roll over at the configured size.
        self._max_file_length = true_max_file_length
    if self.max_file_length and self.file_length > self.max_file_length:
        msg = f"Header messages are too large to fit in a file of size {self.max_file_length}"
        raise RuntimeError(msg)

def open(self) -> bool:
    """Open a fresh file stream for writing and emit the header messages.

    Returns:
        bool: True if the file stream is open after setup.
    """
    # Start a new binary stream at the current file name and reset the
    # running byte count before anything is written to it.
    stream = Path(self.file_name).open("wb")
    self._file_stream = stream
    self._file_length = 0
    # Every file (including each rollover split) begins with the headers.
    self.write_header_msgs()
    return self.is_open()

def close(self) -> bool:
Expand All @@ -157,9 +207,7 @@ def write_event_payload(self, event: Event, payload: bytes) -> None:

if event.payload_length != len(payload):
msg = f"Payload length mismatch {event.payload_length} != {len(payload)}"
raise RuntimeError(
msg,
)
raise RuntimeError(msg)

file_stream = cast(IO, self._file_stream)

Expand Down
3 changes: 2 additions & 1 deletion py/farm_ng/core/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ def uri_to_string(uri: uri_pb2.Uri) -> str:
>>> uri.scheme = "protobuf"
>>> uri.authority = "farm-ng-1"
>>> uri.query = "type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto"
>>> uri.path = "/stamp_stream"
>>> uri_to_string(uri)
'protobuf://farm-ng-1//?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto'
'protobuf://farm-ng-1//stamp_stream?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto'
"""
return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}"

Expand Down
66 changes: 66 additions & 0 deletions py/tests/_asyncio/test_event_service_recorder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

import pytest
from farm_ng.core.event_pb2 import Event
from farm_ng.core.event_service import EventServiceGrpc
from farm_ng.core.event_service_pb2 import (
EventServiceConfigList,
RequestReplyRequest,
)
from farm_ng.core.event_service_recorder import EventServiceRecorder, RecorderService
from farm_ng.core.events_file_reader import EventsFileReader, payload_to_protobuf
from farm_ng.core.uri import make_proto_uri
from google.protobuf.message import Message
from google.protobuf.wrappers_pb2 import Int32Value

if TYPE_CHECKING:
from farm_ng.core.uri_pb2 import Uri


async def request_reply_handler(
request: RequestReplyRequest,
Expand Down Expand Up @@ -79,6 +85,66 @@ async def test_event_service_recorder(
event_message = event_log.read_message()
assert event_message == message

@pytest.mark.anyio()
async def test_file_headers(
    self,
    tmp_path: Path,
    event_service: EventServiceGrpc,
    recorder_service: EventServiceRecorder,
) -> None:
    """Header events queued on the recorder must appear in the recorded file.

    Queues several Int32Value header events on the recorder's deque, records
    briefly, then reads the log back and verifies every header was written.
    """
    # reset the counts
    event_service.reset()
    event_service.request_reply_handler = request_reply_handler

    # Define some test values
    header_uri_base: str = "baz_header"
    header_count: int = 3

    # Create the header and pass it to EventServiceRecorder
    for i in range(10, 10 + header_count):
        message = Int32Value(value=i)
        payload: bytes = message.SerializeToString()
        uri: Uri = make_proto_uri(
            path=f"/{header_uri_base}_{i}",
            message=message,
            service_name=event_service.config.name,
        )
        event = Event(
            uri=uri,
            timestamps=[],
            payload_length=len(payload),
            sequence=i,
        )
        recorder_service.header_deque.append((event, payload))

    # start the recording
    file_name = tmp_path / "test_headers"
    task = asyncio.create_task(
        recorder_service.subscribe_and_record(file_name),
    )
    # give the recorder time to open the file and flush the queued headers
    await asyncio.sleep(0.1)

    # Cancel the recording
    task.cancel()
    await asyncio.sleep(0.1)

    # the first (and only) split is named <base>.0000.bin
    file_name_bin = file_name.with_suffix(".0000.bin")
    assert file_name_bin.exists()

    # read the file
    reader = EventsFileReader(file_name_bin)
    assert reader.open()

    # Check the headers
    logged_headers: int = 0
    for event_log in reader.get_index():
        # Check headers - skip any other events
        if header_uri_base in event_log.event.uri.path:
            logged_headers += 1
            event_message = event_log.read_message()
            assert isinstance(event_message, Int32Value)
    assert logged_headers == header_count


class TestRecorderService:
@pytest.mark.anyio()
Expand Down
Loading