From e17732f83e2be6b4e884bfd15fa2ce4127802606 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 8 Dec 2023 14:10:13 -0500 Subject: [PATCH 01/25] Add concept of header messages to EventsFileWriter --- py/farm_ng/core/events_file_writer.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index aaa35099..d6f3c025 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -57,6 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, + header_msgs: list[tuple(str, Message)] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -64,6 +65,7 @@ def __init__( file_base: Path to and base of file name (without extension) where the events file will be logged. extension: Extension of the file to be logged. E.g., '.bin' or '.log' max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0. + header_msgs: Tuples of paths & Messages to include in every split of the log file """ if isinstance(file_base, str): file_base = Path(file_base) @@ -80,6 +82,10 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 + if header_msgs is None: + header_msgs = [] + self._header_msgs: list[tuple(str, Message)] = header_msgs + def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" success: bool = self.open() @@ -135,10 +141,29 @@ def _increment_file_idx(self) -> None: """Increment the file index.""" self._file_idx += 1 + @property + def header_msgs(self) -> list[tuple(str, Message)]: + """Return the list of header messages.""" + return self._header_msgs + + def add_header_msg(self, msg: Message) -> None: + """Add a header message.""" + self._header_msgs.append(msg) + + def write_header_msgs(self) -> None: + """Write the header messages to the file, without getting stuck in a loop + if the headers are larger than the max file size.""" + true_max_file_length = self.max_file_length + self._max_file_length = 0 + for (path, msg) in self.header_msgs: + self.write(path=path, message=msg, write_stamps=False) + self._max_file_length = true_max_file_length + def open(self) -> bool: """Open the file for writing. 
Return True if successful.""" self._file_stream = Path(self.file_name).open("wb") self._file_length = 0 + self.write_header_msgs() return self.is_open() def close(self) -> bool: From 903653ccaeec49518ae6012a762eff1d98302b6d Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 12 Dec 2023 11:30:26 -0500 Subject: [PATCH 02/25] Fix new mypy issues --- py/farm_ng/core/events_file_writer.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index d6f3c025..2090ffff 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -10,11 +10,11 @@ from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now from farm_ng.core.uri import make_proto_uri from google.protobuf.json_format import MessageToJson +from google.protobuf.message import Message if TYPE_CHECKING: from farm_ng.core.timestamp_pb2 import Timestamp from farm_ng.core.uri_pb2 import Uri - from google.protobuf.message import Message # public symbols @@ -57,7 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, - header_msgs: list[tuple(str, Message)] | None = None, + header_msgs: list[tuple[str, Message]] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -84,7 +84,7 @@ def __init__( if header_msgs is None: header_msgs = [] - self._header_msgs: list[tuple(str, Message)] = header_msgs + self._header_msgs: list[tuple[str, Message]] = header_msgs def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" @@ -142,13 +142,19 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> list[tuple(str, Message)]: + def header_msgs(self) -> list[tuple[str, Message]]: """Return the list of header messages.""" return self._header_msgs - def add_header_msg(self, msg: Message) -> None: + def add_header_msg(self, path: str, msg: Message) -> None: """Add a header message.""" - self._header_msgs.append(msg) + if not isinstance(path, str): + error_msg = f"path must be a string, not {type(path)}" + raise TypeError(error_msg) + if not isinstance(msg, Message): + error_msg = f"msg must be a Message, not {type(msg)}" + raise TypeError(error_msg) + self._header_msgs.append((path, msg)) def write_header_msgs(self) -> None: """Write the header messages to the file, without getting stuck in a loop From 95981062bb91c36eb350d72aec30bb4dc24d7e60 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 12 Dec 2023 14:47:09 -0500 Subject: [PATCH 03/25] Plumb through header msgs as list[(event, payload),...] 
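
Header messages are now carried as the exact wire tuple the writer
logs: a farm_ng Event plus its serialized payload bytes. This lets the
recorder pass raw subscription frames straight through without decoding
them first. A minimal sketch of building one header tuple by hand (the
path, service name, and values are illustrative; the helpers are the
same ones the tests use):

    from farm_ng.core.event_pb2 import Event
    from farm_ng.core.events_file_writer import EventsFileWriter
    from farm_ng.core.uri import make_proto_uri
    from google.protobuf.wrappers_pb2 import Int32Value

    message = Int32Value(value=42)
    payload: bytes = message.SerializeToString()
    event = Event(
        uri=make_proto_uri(
            path="/metadata", message=message, service_name="recorder",
        ),
        timestamps=[],
        payload_length=len(payload),
        sequence=0,
    )
    # Every split of the log file begins with this (event, payload) pair.
    with EventsFileWriter("my_log", header_msgs=[(event, payload)]) as writer:
        writer.write("data", Int32Value(value=1))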
--- py/farm_ng/core/event_service_recorder.py | 53 +++++++++++++++++++---- py/farm_ng/core/events_file_writer.py | 47 ++++++++++++-------- 2 files changed, 74 insertions(+), 26 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 67a1721d..7656a21a 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -23,6 +23,7 @@ import asyncio import logging import sys +from collections import deque from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING @@ -60,7 +61,12 @@ class EventServiceRecorder: # the maximum size of the queue QUEUE_MAX_SIZE: int = 50 - def __init__(self, service_name: str, config_list: EventServiceConfigList) -> None: + def __init__( + self, + service_name: str, + config_list: EventServiceConfigList, + header_msgs: list[tuple[event_pb2.Event, bytes]] | None = None, + ) -> None: """Initializes the service. Args: @@ -95,6 +101,9 @@ def __init__(self, service_name: str, config_list: EventServiceConfigList) -> No self.record_queue: asyncio.Queue[tuple[event_pb2.Event, bytes]] = asyncio.Queue( maxsize=self.QUEUE_MAX_SIZE, ) + self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque() + if header_msgs: + self.header_deque.extend(header_msgs) @property def logger(self) -> logging.Logger: @@ -119,10 +128,20 @@ async def record( extension (str, optional): the extension of the file. Defaults to ".bin". max_file_mb (int, optional): the maximum size of the file in MB. Defaults to 0. """ - with EventsFileWriter(file_base, extension, max_file_mb) as writer: + with EventsFileWriter( + file_base, + extension, + max_file_mb, + header_msgs=list(self.header_deque), + ) as writer: + self.header_deque.clear() event: event_pb2.Event payload: bytes while True: + # Add any header messages added during recording + while self.header_deque: + event, payload = self.header_deque.popleft() + writer.add_header_msg(event, payload, write=True) # await a new event and payload, and write it to the file event, payload = await self.record_queue.get() event.timestamps.append( @@ -258,6 +277,9 @@ def __init__(self, event_service: EventServiceGrpc) -> None: # the recorder task self._recorder_task: asyncio.Task | None = None + # For tracking header messages (e.g. 
metadata, calibrations) to be logged in the recordings + self.header_msgs: list[tuple[event_pb2.Event, bytes]] = [] + # public methods async def start_recording( @@ -282,6 +304,7 @@ async def start_recording( self._recorder = EventServiceRecorder( config_name or "record_default", config_list, + self.header_msgs, ) self._recorder_task = asyncio.create_task( self._recorder.subscribe_and_record(file_base=file_base), @@ -345,16 +368,28 @@ async def _request_reply_handler( return StringValue(value=str(file_base)) if cmd == "stop": await self.stop_recording() - elif cmd == "metadata": + elif cmd == "add_header_msg": + self._event_service.logger.info("add_header_msg: %s", request.payload) + self.add_header_msg(request.event, request.payload) + elif cmd == "clear_headers": + self._event_service.logger.info("clear_headers") + self.header_msgs.clear() if self._recorder is not None: - self._event_service.logger.info("send_metadata: %s", request.payload) - await self._recorder.record_queue.put((request.event, request.payload)) - else: - self._event_service.logger.warning( - "requested to send metadata but not recording", - ) + self._recorder.header_deque.clear() return Empty() + def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: + """Adds a header message to the header_msgs list. + + Args: + event (event_pb2.Event): the event. + payload (bytes): the payload. + """ + self.header_msgs.append((event, payload)) + if self._recorder is not None: + self._event_service.logger.info("Adding header msg to active recording") + self._recorder.header_deque.append((event, payload)) + def service_command(_args): config_list, service_config = load_service_config(args) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 2090ffff..86952ffe 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -10,11 +10,11 @@ from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now from farm_ng.core.uri import make_proto_uri from google.protobuf.json_format import MessageToJson -from google.protobuf.message import Message if TYPE_CHECKING: from farm_ng.core.timestamp_pb2 import Timestamp from farm_ng.core.uri_pb2 import Uri + from google.protobuf.message import Message # public symbols @@ -57,7 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, - header_msgs: list[tuple[str, Message]] | None = None, + header_msgs: list[tuple[Event, bytes]] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -65,7 +65,7 @@ def __init__( file_base: Path to and base of file name (without extension) where the events file will be logged. extension: Extension of the file to be logged. E.g., '.bin' or '.log' max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0. 
- header_msgs: Tuples of paths & Messages to include in every split of the log file + header_msgs: Tuples of events & payloads to include in every split of the log file """ if isinstance(file_base, str): file_base = Path(file_base) @@ -82,9 +82,7 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 - if header_msgs is None: - header_msgs = [] - self._header_msgs: list[tuple[str, Message]] = header_msgs + self._header_msgs: list[tuple[Event, bytes]] = header_msgs or [] def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" @@ -142,28 +140,43 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> list[tuple[str, Message]]: - """Return the list of header messages.""" + def header_msgs(self) -> list[tuple[Event, bytes]]: + """Return the list of header messages. + Returns: + list[tuple[Event, bytes]]: List of header messages. + """ return self._header_msgs - def add_header_msg(self, path: str, msg: Message) -> None: - """Add a header message.""" - if not isinstance(path, str): - error_msg = f"path must be a string, not {type(path)}" + def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None: + """Add a header message, and optionally writes it to the file. + NOTE: Writing to file will fail if the file is not open. + + Args: + event: Event to write. + payload: Payload to write. + write: If True, write the header message to the file. Defaults to False. + """ + if not isinstance(event, Event): + error_msg = f"event must be Event, not {type(event)}" raise TypeError(error_msg) - if not isinstance(msg, Message): - error_msg = f"msg must be a Message, not {type(msg)}" + if not isinstance(payload, bytes): + error_msg = f"payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - self._header_msgs.append((path, msg)) + self._header_msgs.append((event, payload)) + if write: + self.write_event_payload(event, payload) def write_header_msgs(self) -> None: """Write the header messages to the file, without getting stuck in a loop if the headers are larger than the max file size.""" true_max_file_length = self.max_file_length self._max_file_length = 0 - for (path, msg) in self.header_msgs: - self.write(path=path, message=msg, write_stamps=False) + for (event, payload) in self.header_msgs: + self.write_event_payload(event, payload) self._max_file_length = true_max_file_length + if self.file_length > self.max_file_length: + msg = f"Header messages are too large to fit in a file of size {self.max_file_length}" + raise RuntimeError(msg) def open(self) -> bool: """Open the file for writing. 
Return True if successful.""" From 8ca0bcd5500b66607c59feb3fa2c8b3c59277a77 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 12 Dec 2023 15:53:58 -0500 Subject: [PATCH 04/25] Add 'header_uris' field to EventServiceConfig --- protos/farm_ng/core/event_service.proto | 2 ++ py/farm_ng/core/event_service_recorder.py | 25 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/protos/farm_ng/core/event_service.proto b/protos/farm_ng/core/event_service.proto index 1cd0a34e..7b1fdc98 100644 --- a/protos/farm_ng/core/event_service.proto +++ b/protos/farm_ng/core/event_service.proto @@ -104,6 +104,8 @@ message EventServiceConfig { DEBUG = 10; } LogLevel log_level = 7; + // URIs to treat as headers for the EventServiceRecorder + repeated Uri header_uris = 8; } // list of event service configurations diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 7656a21a..734b5ece 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from farm_ng.core import event_pb2 + from farm_ng.core.uri_pb2 import Uri from google.protobuf.message import Message __all__ = ["EventServiceRecorder", "RecorderService"] @@ -153,6 +154,7 @@ async def subscribe( self, client: EventClient, subscription: SubscribeRequest, + header_uris: list[Uri], ) -> None: """Subscribes to a service and puts the events in the queue. @@ -163,6 +165,14 @@ async def subscribe( event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): + if event.uri in header_uris: + # Handle header messages - typically these will be metadata or calibrations + # This assumes they are only published once (likely with latching=True), + # as we break the subscription after the first header message is received + self.logger.info("header received: %s", event) + self.header_deque.append((event, payload)) + self.logger.info("breaking subscription to %s", subscription.uri) + break try: self.record_queue.put_nowait((event, payload)) except asyncio.QueueFull: @@ -206,8 +216,21 @@ async def subscribe_and_record( self.logger.warning("Invalid subscription: %s", query_service_name) continue client: EventClient = self.clients[query_service_name] + # Build a list of header URIs matching the service_name for this subscription + header_uris: list[Uri] = [] + for header_uri in self.recorder_config.header_uris: + header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri) + header_service_name: str = header_dict["service_name"] + if header_service_name == query_service_name: + header_uris.append(header_uri) async_tasks.append( - asyncio.create_task(self.subscribe(client, subscription)), + asyncio.create_task( + self.subscribe( + client, + subscription, + header_uris, + ), + ), ) try: await asyncio.gather(*async_tasks) From a36c989c94b1abf337390fec84905747cb100841 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 12 Dec 2023 18:51:00 -0500 Subject: [PATCH 05/25] Fix startup bug when no max file size is specified --- py/farm_ng/core/events_file_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 86952ffe..adf1c9a5 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -174,7 +174,7 @@ def write_header_msgs(self) -> None: for (event, payload) in self.header_msgs: self.write_event_payload(event, payload) 
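        # All headers are on disk; restore the real rollover limit before the size check below.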
self._max_file_length = true_max_file_length - if self.file_length > self.max_file_length: + if self.max_file_length and self.file_length > self.max_file_length: msg = f"Header messages are too large to fit in a file of size {self.max_file_length}" raise RuntimeError(msg) From b1e47c2fe0a617f412d9698584c385f0eab0c5e5 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 11:16:26 -0500 Subject: [PATCH 06/25] Add test for header messages through EventServiceRecorder --- .../_asyncio/test_event_service_recorder.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/py/tests/_asyncio/test_event_service_recorder.py b/py/tests/_asyncio/test_event_service_recorder.py index ca608bf9..49a83b6b 100644 --- a/py/tests/_asyncio/test_event_service_recorder.py +++ b/py/tests/_asyncio/test_event_service_recorder.py @@ -1,7 +1,9 @@ import asyncio from pathlib import Path +from typing import TYPE_CHECKING import pytest +from farm_ng.core.event_pb2 import Event from farm_ng.core.event_service import EventServiceGrpc from farm_ng.core.event_service_pb2 import ( EventServiceConfigList, @@ -9,9 +11,13 @@ ) from farm_ng.core.event_service_recorder import EventServiceRecorder, RecorderService from farm_ng.core.events_file_reader import EventsFileReader, payload_to_protobuf +from farm_ng.core.uri import make_proto_uri from google.protobuf.message import Message from google.protobuf.wrappers_pb2 import Int32Value +if TYPE_CHECKING: + from farm_ng.core.uri_pb2 import Uri + async def request_reply_handler( request: RequestReplyRequest, @@ -79,6 +85,67 @@ async def test_event_service_recorder( event_message = event_log.read_message() assert event_message == message + @pytest.mark.anyio() + async def test_file_headers( + self, + tmp_path: Path, + event_service: EventServiceGrpc, + recorder_service: EventServiceRecorder, + ) -> None: + # reset the counts + event_service.reset() + event_service.request_reply_handler = request_reply_handler + + # Define some test values + header_uri_path: str = "/baz_header" + header_count: int = 3 + + # Create the header and pass it to EventServiceRecorder + for i in range(10, 10 + header_count): + message = Int32Value(value=i) + payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path=header_uri_path, + message=message, + service_name=event_service.config.name, + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(payload), + sequence=i, + ) + recorder_service.header_deque.append((event, payload)) + + # start the recording + file_name = tmp_path / "test_headers" + task = asyncio.create_task( + recorder_service.subscribe_and_record(file_name), + ) + await asyncio.sleep(0.1) + + # Cancel the recording + task.cancel() + await asyncio.sleep(0.1) + + file_name_bin = file_name.with_suffix(".0000.bin") + assert file_name_bin.exists() + + # read the file + reader = EventsFileReader(file_name_bin) + assert reader.open() + + # Check the headers + logged_headers: int = 0 + expected_path: str = f"{event_service.config.name}{header_uri_path}" + for event_log in reader.get_index(): + # Check headers - skip any other events + if event_log.event.uri.path == expected_path: + logged_headers += 1 + event_message = event_log.read_message() + assert isinstance(event_message, Int32Value) + assert logged_headers == header_count + class TestRecorderService: @pytest.mark.anyio() From 0e5317d97b4bb37cc9ddf7345fb70967c8583a53 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 14:32:52 -0500 Subject: [PATCH 07/25] Add 
TestEventsFileWriter --- py/tests/test_events_file_writer.py | 223 ++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 py/tests/test_events_file_writer.py diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py new file mode 100644 index 00000000..87ce1e07 --- /dev/null +++ b/py/tests/test_events_file_writer.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from farm_ng.core.event_pb2 import Event +from farm_ng.core.events_file_reader import EventsFileReader, payload_to_protobuf +from farm_ng.core.events_file_writer import EventsFileWriter +from farm_ng.core.uri import make_proto_uri +from google.protobuf.wrappers_pb2 import Int32Value, StringValue + +if TYPE_CHECKING: + from pathlib import Path + + from farm_ng.core.uri_pb2 import Uri + + +@pytest.fixture() +def file_base(tmp_path: Path) -> Path: + return tmp_path / "test_events_file" + + +@pytest.fixture() +def writer(file_base: Path): + return EventsFileWriter(file_base=file_base) + + +@pytest.fixture() +def reader(file_base: Path, writer: EventsFileWriter): + with writer as opened_writer: + opened_writer.write("test_path", Int32Value(value=123)) + return EventsFileReader(file_name=file_base.with_suffix(".0000.bin")) + + +class TestEventsFileWriter: + def test_open_and_close_file_writer(self, writer): + with writer as opened_writer: + assert opened_writer.is_open() + assert writer.is_closed() + + def test_write_and_read_messages(self, writer, reader): + event_count: int = 10 + with writer as opened_writer: + # Start at 1 to avoid payload length of 0 + for i in range(1, 1 + event_count): + opened_writer.write("test_path", Int32Value(value=i)) + + counted_events: int = 0 + with reader as opened_reader: + for event_log in opened_reader.get_index(): + event_message = event_log.read_message() + assert event_message == Int32Value(value=1 + counted_events) + assert event_log.event is not None + assert event_log.event.uri.path == "test_path" + assert event_log.event.payload_length == 2 + counted_events += 1 + assert counted_events == event_count + + def test_write_event_payload(self, writer, reader): + with writer as opened_writer: + message: StringValue = StringValue(value="test_payload") + event_payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path="/test_path", + message=message, + service_name="test_service", + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(event_payload), + sequence=0, + ) + opened_writer.write_event_payload(event, event_payload) + + with reader as opened_reader: + event_log = opened_reader.read_next_event() + message = opened_reader.read_message(event_log) + assert message == payload_to_protobuf(event, event_payload) + + def test_rollover_behavior(self, file_base: Path): + max_file_bytes: int = 1000 + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + ) as opened_writer: + expected_length: int = 0 + for i in range(100): + message = StringValue(value=f"test_payload_{i}") + event_payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path="/test_path", + message=message, + service_name="test_service", + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(event_payload), + sequence=i, + ) + opened_writer.write_event_payload(event, event_payload) + # NOTE: 1.03 is a consistent byte size overhead for writing the event and payload + expected_length += int( + 1.03 + * ( + 
len(message.SerializeToString()) + + len(event.SerializeToString()) + ), + ) + # Check that the file was rolled over the expected number of times + expected_files: int = expected_length // max_file_bytes + assert ( + expected_files > 5 + ) # If this fails, increase for loop size to trigger enough rollovers + assert expected_files == 1 + opened_writer.file_idx + + for i in range(expected_files): + file_name_bin = file_base.with_suffix(f".{i:04d}.bin") + assert file_name_bin.exists() + with EventsFileReader(file_name_bin) as opened_reader: + if i != expected_files - 1: + # Check that the length is roughly the max file size + assert opened_reader.file_length == pytest.approx( + max_file_bytes, + rel=0.1, + ) + for event_log in opened_reader.get_index(): + message = opened_reader.read_message(event_log) + assert isinstance(message, StringValue) + assert event_log.event.payload_length == len( + message.SerializeToString(), + ) + + def test_headers(self, file_base: Path): + # Build a list of test parameters + header_uri_path: str = "/baz_header" + header_uri_service: str = "test_service" + header_count: int = 10 + headers: list[tuple[Event, bytes]] = [] + header_size: int = 0 + max_file_bytes: int = 10000 + + # Create the header messages + for i in range(header_count): + message = Int32Value(value=i) + event_payload: bytes = message.SerializeToString() + uri: Uri = make_proto_uri( + path=header_uri_path, + message=message, + service_name=header_uri_service, + ) + event = Event( + uri=uri, + timestamps=[], + payload_length=len(event_payload), + sequence=i, + ) + headers.append((event, event_payload)) + # NOTE: 1.03 is a consistent byte size overhead for writing the event and payload + header_size += int( + 1.03 * (len(event.SerializeToString()) + len(event_payload)), + ) + + # Test that headers are written to the file + assert max_file_bytes > header_size # If this fails, edit the test parameters + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + header_msgs=headers, + ) as opened_writer: + assert opened_writer.file_idx == 0 + assert opened_writer.file_length == pytest.approx(header_size, rel=0.01) + + # Test that headers cannot exceed the max file size + with pytest.raises(RuntimeError): + with EventsFileWriter( + file_base=file_base, + max_file_mb=(header_size / 2) * 1e-6, + header_msgs=headers, + ) as opened_writer: + pass + + # Test that headers are added every rollover + file_count: int = 0 + with EventsFileWriter( + file_base=file_base, + max_file_mb=max_file_bytes * 1e-6, + header_msgs=headers, + ) as opened_writer: + assert opened_writer.file_idx == 0 + assert opened_writer.file_length == pytest.approx(header_size, rel=0.01) + for i in range(1000): + opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) + file_count = 1 + opened_writer.file_idx + assert ( + file_count > 5 + ) # If this fails, increase for loop size to trigger enough rollovers + + for i in range(file_count): + header_count_in_file: int = 0 + message_count_in_file: int = 0 + file_name_bin = file_base.with_suffix(f".{i:04d}.bin") + assert file_name_bin.exists() + with EventsFileReader(file_name_bin) as opened_reader: + if i != file_count - 1: + # Check that the length is roughly the max file size + assert opened_reader.file_length == pytest.approx( + max_file_bytes, + rel=0.1, + ) + for event_log in opened_reader.get_index(): + message = opened_reader.read_message(event_log) + if isinstance(message, StringValue): + message_count_in_file += 1 + elif isinstance(message, 
Int32Value): + header_count_in_file += 1 + else: + msg = f"Unexpected message type: {type(message)}" + raise TypeError(msg) + + assert header_count_in_file == header_count + assert message_count_in_file > 1 From 7e02b15db1e65b038d752fc9b69f010eff7185b6 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 14:35:49 -0500 Subject: [PATCH 08/25] Last rollover file may only have 1 message --- py/tests/test_events_file_writer.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 87ce1e07..8eca30fe 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -203,12 +203,6 @@ def test_headers(self, file_base: Path): file_name_bin = file_base.with_suffix(f".{i:04d}.bin") assert file_name_bin.exists() with EventsFileReader(file_name_bin) as opened_reader: - if i != file_count - 1: - # Check that the length is roughly the max file size - assert opened_reader.file_length == pytest.approx( - max_file_bytes, - rel=0.1, - ) for event_log in opened_reader.get_index(): message = opened_reader.read_message(event_log) if isinstance(message, StringValue): @@ -220,4 +214,12 @@ def test_headers(self, file_base: Path): raise TypeError(msg) assert header_count_in_file == header_count - assert message_count_in_file > 1 + if i != file_count - 1: + # Check that the length is roughly the max file size + assert opened_reader.file_length == pytest.approx( + max_file_bytes, + rel=0.1, + ) + assert message_count_in_file > 0 + else: + assert message_count_in_file > 10 From b56eeb60b94c91a255ec141e433f9de87edd3a7d Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 14:56:51 -0500 Subject: [PATCH 09/25] EventsFileWriter headers as dict - prevent unbounded growth --- py/farm_ng/core/events_file_writer.py | 14 ++++++++------ py/farm_ng/core/uri.py | 3 ++- py/tests/_asyncio/test_event_service_recorder.py | 7 +++---- py/tests/test_events_file_writer.py | 12 ++++++++---- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index adf1c9a5..420e45b9 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -8,7 +8,7 @@ # https://github.com/protocolbuffers/protobuf/issues/10372 from farm_ng.core.event_pb2 import Event from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now -from farm_ng.core.uri import make_proto_uri +from farm_ng.core.uri import make_proto_uri, uri_to_string from google.protobuf.json_format import MessageToJson if TYPE_CHECKING: @@ -82,7 +82,9 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 - self._header_msgs: list[tuple[Event, bytes]] = header_msgs or [] + self._header_msgs: dict[Uri, tuple[Event, bytes]] = {} + for (event, payload) in header_msgs or []: + self.add_header_msg(event, payload) def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" @@ -140,10 +142,10 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> list[tuple[Event, bytes]]: + def header_msgs(self) -> dict[Uri, tuple[Event, bytes]]: """Return the list of header messages. Returns: - list[tuple[Event, bytes]]: List of header messages. + dict[Uri, tuple[Event, bytes]]: List of header messages. 
""" return self._header_msgs @@ -162,7 +164,7 @@ def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> N if not isinstance(payload, bytes): error_msg = f"payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - self._header_msgs.append((event, payload)) + self._header_msgs[uri_to_string(event.uri)] = (event, payload) if write: self.write_event_payload(event, payload) @@ -171,7 +173,7 @@ def write_header_msgs(self) -> None: if the headers are larger than the max file size.""" true_max_file_length = self.max_file_length self._max_file_length = 0 - for (event, payload) in self.header_msgs: + for (event, payload) in self.header_msgs.values(): self.write_event_payload(event, payload) self._max_file_length = true_max_file_length if self.max_file_length and self.file_length > self.max_file_length: diff --git a/py/farm_ng/core/uri.py b/py/farm_ng/core/uri.py index 34d196de..ad9e6e00 100644 --- a/py/farm_ng/core/uri.py +++ b/py/farm_ng/core/uri.py @@ -140,8 +140,9 @@ def uri_to_string(uri: uri_pb2.Uri) -> str: >>> uri.scheme = "protobuf" >>> uri.authority = "farm-ng-1" >>> uri.query = "type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto" + >>> uri.path = "/stamp_stream" >>> uri_to_string(uri) - 'protobuf://farm-ng-1//?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto' + 'protobuf://farm-ng-1//stamp_stream?type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto' """ return f"{uri.scheme}://{uri.authority}/{uri.path}?{uri.query}" diff --git a/py/tests/_asyncio/test_event_service_recorder.py b/py/tests/_asyncio/test_event_service_recorder.py index 49a83b6b..310ddce4 100644 --- a/py/tests/_asyncio/test_event_service_recorder.py +++ b/py/tests/_asyncio/test_event_service_recorder.py @@ -97,7 +97,7 @@ async def test_file_headers( event_service.request_reply_handler = request_reply_handler # Define some test values - header_uri_path: str = "/baz_header" + header_uri_base: str = "baz_header" header_count: int = 3 # Create the header and pass it to EventServiceRecorder @@ -105,7 +105,7 @@ async def test_file_headers( message = Int32Value(value=i) payload: bytes = message.SerializeToString() uri: Uri = make_proto_uri( - path=header_uri_path, + path=f"/{header_uri_base}_{i}", message=message, service_name=event_service.config.name, ) @@ -137,10 +137,9 @@ async def test_file_headers( # Check the headers logged_headers: int = 0 - expected_path: str = f"{event_service.config.name}{header_uri_path}" for event_log in reader.get_index(): # Check headers - skip any other events - if event_log.event.uri.path == expected_path: + if header_uri_base in event_log.event.uri.path: logged_headers += 1 event_message = event_log.read_message() assert isinstance(event_message, Int32Value) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 8eca30fe..07bdc2f2 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -134,7 +134,7 @@ def test_rollover_behavior(self, file_base: Path): def test_headers(self, file_base: Path): # Build a list of test parameters - header_uri_path: str = "/baz_header" + header_uri_base: str = "baz_header" header_uri_service: str = "test_service" header_count: int = 10 headers: list[tuple[Event, bytes]] = [] @@ -146,7 +146,7 @@ def test_headers(self, file_base: Path): message = Int32Value(value=i) event_payload: bytes = message.SerializeToString() uri: Uri = make_proto_uri( - path=header_uri_path, + path=f"/{header_uri_base}_{i}", message=message, 
service_name=header_uri_service, ) @@ -162,6 +162,10 @@ def test_headers(self, file_base: Path): 1.03 * (len(event.SerializeToString()) + len(event_payload)), ) + # Add some headers with duplicate URI's to ensure they are filtered out + for i in range(header_count // 2): + headers.append(headers[i]) + # Test that headers are written to the file assert max_file_bytes > header_size # If this fails, edit the test parameters with EventsFileWriter( @@ -170,7 +174,7 @@ def test_headers(self, file_base: Path): header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.01) + assert opened_writer.file_length == pytest.approx(header_size, rel=0.1) # Test that headers cannot exceed the max file size with pytest.raises(RuntimeError): @@ -189,7 +193,7 @@ def test_headers(self, file_base: Path): header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.01) + assert opened_writer.file_length == pytest.approx(header_size, rel=0.1) for i in range(1000): opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) file_count = 1 + opened_writer.file_idx From 2aae43a34c0b56e41565112816362fff9651aa30 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 15:02:44 -0500 Subject: [PATCH 10/25] More lenient on rough file size comp --- py/tests/test_events_file_writer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 07bdc2f2..f3cd1dee 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -123,7 +123,7 @@ def test_rollover_behavior(self, file_base: Path): # Check that the length is roughly the max file size assert opened_reader.file_length == pytest.approx( max_file_bytes, - rel=0.1, + rel=0.5, ) for event_log in opened_reader.get_index(): message = opened_reader.read_message(event_log) @@ -174,7 +174,7 @@ def test_headers(self, file_base: Path): header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.1) + assert opened_writer.file_length == pytest.approx(header_size, rel=0.5) # Test that headers cannot exceed the max file size with pytest.raises(RuntimeError): @@ -193,7 +193,7 @@ def test_headers(self, file_base: Path): header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.1) + assert opened_writer.file_length == pytest.approx(header_size, rel=0.5) for i in range(1000): opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) file_count = 1 + opened_writer.file_idx @@ -222,7 +222,7 @@ def test_headers(self, file_base: Path): # Check that the length is roughly the max file size assert opened_reader.file_length == pytest.approx( max_file_bytes, - rel=0.1, + rel=0.5, ) assert message_count_in_file > 0 else: From baad5abf09e1e3153122d9dd8e6579bd0ba003d8 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 15:14:08 -0500 Subject: [PATCH 11/25] Bring dict logic for headers into EventServiceRecorder --- py/farm_ng/core/event_service_recorder.py | 15 ++++++--------- py/farm_ng/core/events_file_writer.py | 8 +++++--- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 
734b5ece..bef460df 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -44,7 +44,7 @@ from farm_ng.core.events_file_reader import payload_to_protobuf, proto_from_json_file from farm_ng.core.events_file_writer import EventsFileWriter from farm_ng.core.stamp import StampSemantics, get_monotonic_now -from farm_ng.core.uri import get_host_name, uri_query_to_dict +from farm_ng.core.uri import get_host_name, uri_query_to_dict, uri_to_string from google.protobuf.empty_pb2 import Empty from google.protobuf.wrappers_pb2 import StringValue @@ -167,12 +167,8 @@ async def subscribe( async for event, payload in client.subscribe(subscription, decode=False): if event.uri in header_uris: # Handle header messages - typically these will be metadata or calibrations - # This assumes they are only published once (likely with latching=True), - # as we break the subscription after the first header message is received - self.logger.info("header received: %s", event) + # If this is a duplicate (per URI), the EventsFileWriter will replace the existing message self.header_deque.append((event, payload)) - self.logger.info("breaking subscription to %s", subscription.uri) - break try: self.record_queue.put_nowait((event, payload)) except asyncio.QueueFull: @@ -301,7 +297,7 @@ def __init__(self, event_service: EventServiceGrpc) -> None: self._recorder_task: asyncio.Task | None = None # For tracking header messages (e.g. metadata, calibrations) to be logged in the recordings - self.header_msgs: list[tuple[event_pb2.Event, bytes]] = [] + self.header_msgs: dict[str, tuple[event_pb2.Event, bytes]] = {} # public methods @@ -327,7 +323,7 @@ async def start_recording( self._recorder = EventServiceRecorder( config_name or "record_default", config_list, - self.header_msgs, + list(self.header_msgs.values()), ) self._recorder_task = asyncio.create_task( self._recorder.subscribe_and_record(file_base=file_base), @@ -403,12 +399,13 @@ async def _request_reply_handler( def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: """Adds a header message to the header_msgs list. + If this is a duplicate (per URI), it will replace the existing message. Args: event (event_pb2.Event): the event. payload (bytes): the payload. """ - self.header_msgs.append((event, payload)) + self.header_msgs[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: self._event_service.logger.info("Adding header msg to active recording") self._recorder.header_deque.append((event, payload)) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 420e45b9..4d8bdbe3 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -82,7 +82,7 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 - self._header_msgs: dict[Uri, tuple[Event, bytes]] = {} + self._header_msgs: dict[str, tuple[Event, bytes]] = {} for (event, payload) in header_msgs or []: self.add_header_msg(event, payload) @@ -142,10 +142,12 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> dict[Uri, tuple[Event, bytes]]: + def header_msgs(self) -> dict[str, tuple[Event, bytes]]: """Return the list of header messages. Returns: - dict[Uri, tuple[Event, bytes]]: List of header messages. + dict[str, tuple[Event, bytes]]: Dictionary of header messages. 
+ key: string representation of the uri + value: tuple of event and payload """ return self._header_msgs From 1a79d2e3d8c159e1a35e3c2f8482eb9287ca2695 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 15:15:26 -0500 Subject: [PATCH 12/25] message_count logic was flipped --- py/tests/test_events_file_writer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index f3cd1dee..360fb6f5 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -218,12 +218,12 @@ def test_headers(self, file_base: Path): raise TypeError(msg) assert header_count_in_file == header_count - if i != file_count - 1: + if i == file_count - 1: + assert message_count_in_file > 0 + else: + assert message_count_in_file > 10 # Check that the length is roughly the max file size assert opened_reader.file_length == pytest.approx( max_file_bytes, rel=0.5, ) - assert message_count_in_file > 0 - else: - assert message_count_in_file > 10 From 1b81fdca3cb56d9d576b52966923a2cdd9d7b093 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 15:30:48 -0500 Subject: [PATCH 13/25] EventsFileWriter headers cannot be changed --- py/farm_ng/core/events_file_writer.py | 12 ++++++------ py/tests/test_events_file_writer.py | 16 +++++++++++++--- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 4d8bdbe3..48ca7f84 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -166,9 +166,11 @@ def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> N if not isinstance(payload, bytes): error_msg = f"payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - self._header_msgs[uri_to_string(event.uri)] = (event, payload) - if write: - self.write_event_payload(event, payload) + # Once a header message is written to the file, it cannot be changed + if uri_to_string(event.uri) not in self.header_msgs: + self._header_msgs[uri_to_string(event.uri)] = (event, payload) + if write: + self.write_event_payload(event, payload) def write_header_msgs(self) -> None: """Write the header messages to the file, without getting stuck in a loop @@ -205,9 +207,7 @@ def write_event_payload(self, event: Event, payload: bytes) -> None: if event.payload_length != len(payload): msg = f"Payload length mismatch {event.payload_length} != {len(payload)}" - raise RuntimeError( - msg, - ) + raise RuntimeError(msg) file_stream = cast(IO, self._file_stream) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 360fb6f5..343536b7 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -162,9 +162,16 @@ def test_headers(self, file_base: Path): 1.03 * (len(event.SerializeToString()) + len(event_payload)), ) - # Add some headers with duplicate URI's to ensure they are filtered out - for i in range(header_count // 2): - headers.append(headers[i]) + # Add some extra headers with duplicate URI's to ensure they are filtered out + dup_message = Int32Value(value=999 + i) + dup_payload: bytes = dup_message.SerializeToString() + dup_event = Event( + uri=uri, + timestamps=[], + payload_length=len(dup_payload), + sequence=i + header_count, + ) + headers.append((dup_event, dup_payload)) # Test that headers are written to the file assert max_file_bytes > header_size # If this fails, edit the test 
parameters @@ -212,6 +219,9 @@ def test_headers(self, file_base: Path): if isinstance(message, StringValue): message_count_in_file += 1 elif isinstance(message, Int32Value): + # Test that the duplicate header was filtered out + # And that the headers are in order + assert message.value == header_count_in_file header_count_in_file += 1 else: msg = f"Unexpected message type: {type(message)}" From 8902d8f5d1263aa9c631b9426978696e63f94204 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 15:56:48 -0500 Subject: [PATCH 14/25] Reduce test dependency on inaccurate file length estimation --- py/tests/test_events_file_writer.py | 53 ++++++++++++++--------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 343536b7..e9b63c54 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -84,10 +84,10 @@ def test_rollover_behavior(self, file_base: Path): file_base=file_base, max_file_mb=max_file_bytes * 1e-6, ) as opened_writer: - expected_length: int = 0 + approx_expected_length: int = 0 for i in range(100): message = StringValue(value=f"test_payload_{i}") - event_payload: bytes = message.SerializeToString() + payload: bytes = message.SerializeToString() uri: Uri = make_proto_uri( path="/test_path", message=message, @@ -96,30 +96,24 @@ def test_rollover_behavior(self, file_base: Path): event = Event( uri=uri, timestamps=[], - payload_length=len(event_payload), + payload_length=len(payload), sequence=i, ) - opened_writer.write_event_payload(event, event_payload) - # NOTE: 1.03 is a consistent byte size overhead for writing the event and payload - expected_length += int( - 1.03 - * ( - len(message.SerializeToString()) - + len(event.SerializeToString()) - ), - ) + opened_writer.write_event_payload(event, payload) + approx_expected_length += len(payload) + len(event.SerializeToString()) + # Check that the file was rolled over the expected number of times - expected_files: int = expected_length // max_file_bytes - assert ( - expected_files > 5 - ) # If this fails, increase for loop size to trigger enough rollovers - assert expected_files == 1 + opened_writer.file_idx + approx_expected_files: int = approx_expected_length // max_file_bytes + actual_files: int = 1 + opened_writer.file_idx + assert approx_expected_files > 5 # sufficient rollovers + # The expected logging size is not a perfect estimate, so allow some wiggle room + assert approx_expected_files == pytest.approx(actual_files, rel=0.5) - for i in range(expected_files): + for i in range(actual_files): file_name_bin = file_base.with_suffix(f".{i:04d}.bin") assert file_name_bin.exists() with EventsFileReader(file_name_bin) as opened_reader: - if i != expected_files - 1: + if i != actual_files - 1: # Check that the length is roughly the max file size assert opened_reader.file_length == pytest.approx( max_file_bytes, @@ -138,7 +132,7 @@ def test_headers(self, file_base: Path): header_uri_service: str = "test_service" header_count: int = 10 headers: list[tuple[Event, bytes]] = [] - header_size: int = 0 + approx_header_size: int = 0 max_file_bytes: int = 10000 # Create the header messages @@ -157,10 +151,7 @@ def test_headers(self, file_base: Path): sequence=i, ) headers.append((event, event_payload)) - # NOTE: 1.03 is a consistent byte size overhead for writing the event and payload - header_size += int( - 1.03 * (len(event.SerializeToString()) + len(event_payload)), - ) + approx_header_size += 
len(event.SerializeToString()) + len(event_payload) # Add some extra headers with duplicate URI's to ensure they are filtered out dup_message = Int32Value(value=999 + i) @@ -174,20 +165,23 @@ def test_headers(self, file_base: Path): headers.append((dup_event, dup_payload)) # Test that headers are written to the file - assert max_file_bytes > header_size # If this fails, edit the test parameters + assert max_file_bytes > approx_header_size * 1.5 # sufficient space for headers with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.5) + assert opened_writer.file_length == pytest.approx( + approx_header_size, + rel=0.5, + ) # Test that headers cannot exceed the max file size with pytest.raises(RuntimeError): with EventsFileWriter( file_base=file_base, - max_file_mb=(header_size / 2) * 1e-6, + max_file_mb=(approx_header_size / 2) * 1e-6, header_msgs=headers, ) as opened_writer: pass @@ -200,7 +194,10 @@ def test_headers(self, file_base: Path): header_msgs=headers, ) as opened_writer: assert opened_writer.file_idx == 0 - assert opened_writer.file_length == pytest.approx(header_size, rel=0.5) + assert opened_writer.file_length == pytest.approx( + approx_header_size, + rel=0.5, + ) for i in range(1000): opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) file_count = 1 + opened_writer.file_idx From 5fda9339d2666561430e82a391eee9d33cb62363 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 16:07:30 -0500 Subject: [PATCH 15/25] Match on uri_to_string -- seems safer than direct proto comparison --- py/farm_ng/core/event_service_recorder.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index bef460df..18ee7783 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -50,7 +50,6 @@ if TYPE_CHECKING: from farm_ng.core import event_pb2 - from farm_ng.core.uri_pb2 import Uri from google.protobuf.message import Message __all__ = ["EventServiceRecorder", "RecorderService"] @@ -154,20 +153,20 @@ async def subscribe( self, client: EventClient, subscription: SubscribeRequest, - header_uris: list[Uri], + header_uris: list[str], ) -> None: """Subscribes to a service and puts the events in the queue. Args: client (EventClient): the client to subscribe to. subscription (SubscribeRequest): the subscription request. + header_uris (list[str]): the list of URIs (as strings) to consider as header messages. 
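        Note:
            A header event is not removed from the normal stream; it is
            also forwarded to the record queue below, so it is logged
            in-line in addition to being replayed at the start of every
            file split.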
""" event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): - if event.uri in header_uris: + if uri_to_string(event.uri) in header_uris: # Handle header messages - typically these will be metadata or calibrations - # If this is a duplicate (per URI), the EventsFileWriter will replace the existing message self.header_deque.append((event, payload)) try: self.record_queue.put_nowait((event, payload)) @@ -213,12 +212,12 @@ async def subscribe_and_record( continue client: EventClient = self.clients[query_service_name] # Build a list of header URIs matching the service_name for this subscription - header_uris: list[Uri] = [] + header_uris: list[str] = [] for header_uri in self.recorder_config.header_uris: header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri) header_service_name: str = header_dict["service_name"] if header_service_name == query_service_name: - header_uris.append(header_uri) + header_uris.append(uri_to_string(header_uri)) async_tasks.append( asyncio.create_task( self.subscribe( From 1b4def09910484b365f68cbf886e7e95acd771a3 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 13 Dec 2023 16:15:04 -0500 Subject: [PATCH 16/25] Tweak test comments --- py/tests/test_events_file_writer.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index e9b63c54..e88e61ca 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -105,7 +105,7 @@ def test_rollover_behavior(self, file_base: Path): # Check that the file was rolled over the expected number of times approx_expected_files: int = approx_expected_length // max_file_bytes actual_files: int = 1 + opened_writer.file_idx - assert approx_expected_files > 5 # sufficient rollovers + assert approx_expected_files > 5 # sufficient number of rollovers (params) # The expected logging size is not a perfect estimate, so allow some wiggle room assert approx_expected_files == pytest.approx(actual_files, rel=0.5) @@ -165,7 +165,7 @@ def test_headers(self, file_base: Path): headers.append((dup_event, dup_payload)) # Test that headers are written to the file - assert max_file_bytes > approx_header_size * 1.5 # sufficient space for headers + assert max_file_bytes > approx_header_size * 1.5 # sufficient buffer (params) with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, @@ -201,9 +201,7 @@ def test_headers(self, file_base: Path): for i in range(1000): opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) file_count = 1 + opened_writer.file_idx - assert ( - file_count > 5 - ) # If this fails, increase for loop size to trigger enough rollovers + assert file_count > 5 # sufficient number of rollovers (params) for i in range(file_count): header_count_in_file: int = 0 From 2903251d7c815607e90544c5c7d416fa67f205c0 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Thu, 14 Dec 2023 14:32:23 -0500 Subject: [PATCH 17/25] Set self._recorder = None in _safe_done_callback --- py/farm_ng/core/event_service_recorder.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 18ee7783..0e57eb52 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -339,6 +339,7 @@ def _safe_done_callback( finally: del future del recorder_task + self._recorder = None self._recorder_task.add_done_callback( lambda f: 
_safe_done_callback(f, self._recorder_task), From 378b98d9798a573437c4863072ea5e455839ada7 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Thu, 14 Dec 2023 14:34:08 -0500 Subject: [PATCH 18/25] Require addtl arg for header cmd (e.g. 'header/metadata') --- py/farm_ng/core/event_service_recorder.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 0e57eb52..843b214d 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -372,11 +372,12 @@ async def _request_reply_handler( Message: the response message. """ cmd: str = request.event.uri.path - config_name: str | None = None + extra_arg: str | None = None if cmd.count("/") == 1: - cmd, config_name = cmd.split("/") + cmd, extra_arg = cmd.split("/") if cmd == "start": + config_name: str | None = extra_arg config_list: EventServiceConfigList = payload_to_protobuf( request.event, request.payload, @@ -387,8 +388,15 @@ async def _request_reply_handler( return StringValue(value=str(file_base)) if cmd == "stop": await self.stop_recording() - elif cmd == "add_header_msg": - self._event_service.logger.info("add_header_msg: %s", request.payload) + elif cmd == "header": + if not extra_arg: + msg = "header command requires an argument, e.g. 'header/metadata'" + raise ValueError(msg) + self._event_service.logger.info( + "add_header_msg:\n%s", + payload_to_protobuf(request.event, request.payload), + ) + self._event_service.logger.info("with uri:\n%s", request.event.uri) self.add_header_msg(request.event, request.payload) elif cmd == "clear_headers": self._event_service.logger.info("clear_headers") @@ -407,7 +415,6 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: """ self.header_msgs[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: - self._event_service.logger.info("Adding header msg to active recording") self._recorder.header_deque.append((event, payload)) From 03293c2891d20c8fb50a2f6e2c62f4cac981b73e Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Thu, 14 Dec 2023 15:41:11 -0500 Subject: [PATCH 19/25] Consistency - leading slash for req-rep (e.g., '/start') --- py/farm_ng/core/event_service_recorder.py | 34 +++++++++++++---------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 843b214d..94d39f63 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -372,34 +372,38 @@ async def _request_reply_handler( Message: the response message. 
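        Supported command paths (see the handlers below): "/start"
        (optionally "/start/<config_name>"), "/stop",
        "/header/<specifier>" (e.g. "/header/metadata"), and
        "/clear_headers".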
""" cmd: str = request.event.uri.path - extra_arg: str | None = None - if cmd.count("/") == 1: - cmd, extra_arg = cmd.split("/") + # Ensure path was sent with a leading slash + if not cmd.startswith("/"): + cmd = "/" + cmd + + if cmd.startswith("/start"): + # config_name is the optional part of the path after "/start/" + config_name: str | None = cmd[7:] or None - if cmd == "start": - config_name: str | None = extra_arg config_list: EventServiceConfigList = payload_to_protobuf( request.event, request.payload, ) - self._event_service.logger.info("start %s: %s", config_name, config_list) + self._event_service.logger.info("/start %s: %s", config_name, config_list) file_base = self._data_dir.joinpath(get_file_name_base()) await self.start_recording(file_base, config_list, config_name) return StringValue(value=str(file_base)) - if cmd == "stop": + if cmd == "/stop": + self._event_service.logger.info("/stop") await self.stop_recording() - elif cmd == "header": - if not extra_arg: - msg = "header command requires an argument, e.g. 'header/metadata'" + elif cmd.startswith("/header"): + header_detail: str = cmd[8:] or "" + if not header_detail: + msg = "/header command requires a specifier, e.g. '/header/metadata'" raise ValueError(msg) self._event_service.logger.info( - "add_header_msg:\n%s", + "header:\n%s", payload_to_protobuf(request.event, request.payload), ) self._event_service.logger.info("with uri:\n%s", request.event.uri) self.add_header_msg(request.event, request.payload) - elif cmd == "clear_headers": - self._event_service.logger.info("clear_headers") + elif cmd == "/clear_headers": + self._event_service.logger.info("/clear_headers") self.header_msgs.clear() if self._recorder is not None: self._recorder.header_deque.clear() @@ -446,7 +450,7 @@ def client_start_command(_args): async def job(): reply = await EventClient(service_config).request_reply( - f"start/{config_name}", + f"/start/{config_name}", config_list, ) print(payload_to_protobuf(reply.event, reply.payload)) @@ -457,7 +461,7 @@ async def job(): def client_stop_command(_args): config_list, service_config = load_service_config(args) loop = asyncio.get_event_loop() - loop.run_until_complete(EventClient(service_config).request_reply("stop", Empty())) + loop.run_until_complete(EventClient(service_config).request_reply("/stop", Empty())) def record_command(_args): From c09db6ce08054b2efd5fd496a8205c7b53df8654 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 11:43:56 -0500 Subject: [PATCH 20/25] Add more type checking on header events / payloads --- py/farm_ng/core/event_service_recorder.py | 28 ++++++++++++++++++++--- py/farm_ng/core/events_file_writer.py | 7 +++--- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 94d39f63..31eec166 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -29,6 +29,7 @@ from typing import TYPE_CHECKING import grpc +from farm_ng.core import event_pb2 from farm_ng.core.event_client import EventClient from farm_ng.core.event_service import ( EventServiceGrpc, @@ -49,7 +50,6 @@ from google.protobuf.wrappers_pb2 import StringValue if TYPE_CHECKING: - from farm_ng.core import event_pb2 from google.protobuf.message import Message __all__ = ["EventServiceRecorder", "RecorderService"] @@ -102,8 +102,24 @@ def __init__( maxsize=self.QUEUE_MAX_SIZE, ) self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque() - if header_msgs: - 
self.header_deque.extend(header_msgs) + if header_msgs is not None: + for event, payload in header_msgs: + self.add_header_msg(event, payload) + + def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: + """Add a header message to the header_deque. + + Args: + event: Event to add. + payload: Payload to add. + """ + if not isinstance(event, event_pb2.Event): + error_msg = f"header event must be Event, not {type(event)}" + raise TypeError(error_msg) + if not isinstance(payload, bytes): + error_msg = f"header payload must be bytes, not {type(payload)}" + raise TypeError(error_msg) + self.header_deque.append((event, payload)) @property def logger(self) -> logging.Logger: @@ -417,6 +433,12 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: event (event_pb2.Event): the event. payload (bytes): the payload. """ + if not isinstance(event, event_pb2.Event): + error_msg = f"header event must be Event, not {type(event)}" + raise TypeError(error_msg) + if not isinstance(payload, bytes): + error_msg = f"header payload must be bytes, not {type(payload)}" + raise TypeError(error_msg) self.header_msgs[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: self._recorder.header_deque.append((event, payload)) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 48ca7f84..019b8a3d 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -144,6 +144,7 @@ def _increment_file_idx(self) -> None: @property def header_msgs(self) -> dict[str, tuple[Event, bytes]]: """Return the list of header messages. + Returns: dict[str, tuple[Event, bytes]]: Dictionary of header messages. key: string representation of the uri @@ -153,18 +154,18 @@ def header_msgs(self) -> dict[str, tuple[Event, bytes]]: def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None: """Add a header message, and optionally writes it to the file. - NOTE: Writing to file will fail if the file is not open. + NOTE: Writing to file will fail if the file is not open. Args: event: Event to write. payload: Payload to write. write: If True, write the header message to the file. Defaults to False. 
""" if not isinstance(event, Event): - error_msg = f"event must be Event, not {type(event)}" + error_msg = f"header event must be Event, not {type(event)}" raise TypeError(error_msg) if not isinstance(payload, bytes): - error_msg = f"payload must be bytes, not {type(payload)}" + error_msg = f"header payload must be bytes, not {type(payload)}" raise TypeError(error_msg) # Once a header message is written to the file, it cannot be changed if uri_to_string(event.uri) not in self.header_msgs: From 84ae0d0536d721e172a6f501a3044331d92293f0 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 12:05:31 -0500 Subject: [PATCH 21/25] Rename header_msgs -> header_events (more accurate) --- py/farm_ng/core/event_service_recorder.py | 39 ++++++++++---------- py/farm_ng/core/events_file_writer.py | 45 +++++++++++++---------- py/tests/test_events_file_writer.py | 8 ++-- 3 files changed, 49 insertions(+), 43 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 31eec166..e525678a 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -65,13 +65,14 @@ def __init__( self, service_name: str, config_list: EventServiceConfigList, - header_msgs: list[tuple[event_pb2.Event, bytes]] | None = None, + header_events: list[tuple[event_pb2.Event, bytes]] | None = None, ) -> None: """Initializes the service. Args: service_name (str): the name of the service. config_list (EventServiceConfigList): the configuration data structure. + header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list header events. """ self.service_name: str = service_name self.config_list: EventServiceConfigList = config_list @@ -102,12 +103,12 @@ def __init__( maxsize=self.QUEUE_MAX_SIZE, ) self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque() - if header_msgs is not None: - for event, payload in header_msgs: - self.add_header_msg(event, payload) + if header_events is not None: + for event, payload in header_events: + self.add_header_event(event, payload) - def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: - """Add a header message to the header_deque. + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Add a header event to the header_deque. Args: event: Event to add. @@ -148,16 +149,16 @@ async def record( file_base, extension, max_file_mb, - header_msgs=list(self.header_deque), + header_events=list(self.header_deque), ) as writer: self.header_deque.clear() event: event_pb2.Event payload: bytes while True: - # Add any header messages added during recording + # Add any header events added during recording while self.header_deque: event, payload = self.header_deque.popleft() - writer.add_header_msg(event, payload, write=True) + writer.add_header_event(event, payload, write=True) # await a new event and payload, and write it to the file event, payload = await self.record_queue.get() event.timestamps.append( @@ -176,13 +177,13 @@ async def subscribe( Args: client (EventClient): the client to subscribe to. subscription (SubscribeRequest): the subscription request. - header_uris (list[str]): the list of URIs (as strings) to consider as header messages. + header_uris (list[str]): the list of URIs (as strings) to consider as header events. 
""" event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): if uri_to_string(event.uri) in header_uris: - # Handle header messages - typically these will be metadata or calibrations + # Handle header events - typically these will be metadata or calibrations self.header_deque.append((event, payload)) try: self.record_queue.put_nowait((event, payload)) @@ -311,8 +312,8 @@ def __init__(self, event_service: EventServiceGrpc) -> None: # the recorder task self._recorder_task: asyncio.Task | None = None - # For tracking header messages (e.g. metadata, calibrations) to be logged in the recordings - self.header_msgs: dict[str, tuple[event_pb2.Event, bytes]] = {} + # For tracking header events (e.g. metadata, calibrations) to be logged in the recordings + self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {} # public methods @@ -338,7 +339,7 @@ async def start_recording( self._recorder = EventServiceRecorder( config_name or "record_default", config_list, - list(self.header_msgs.values()), + list(self.header_events.values()), ) self._recorder_task = asyncio.create_task( self._recorder.subscribe_and_record(file_base=file_base), @@ -417,16 +418,16 @@ async def _request_reply_handler( payload_to_protobuf(request.event, request.payload), ) self._event_service.logger.info("with uri:\n%s", request.event.uri) - self.add_header_msg(request.event, request.payload) + self.add_header_event(request.event, request.payload) elif cmd == "/clear_headers": self._event_service.logger.info("/clear_headers") - self.header_msgs.clear() + self.header_events.clear() if self._recorder is not None: self._recorder.header_deque.clear() return Empty() - def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: - """Adds a header message to the header_msgs list. + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Adds a header event to the header_events list. If this is a duplicate (per URI), it will replace the existing message. Args: @@ -439,7 +440,7 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: if not isinstance(payload, bytes): error_msg = f"header payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - self.header_msgs[uri_to_string(event.uri)] = (event, payload) + self.header_events[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: self._recorder.header_deque.append((event, payload)) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 019b8a3d..682eaa4e 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -57,7 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, - header_msgs: list[tuple[Event, bytes]] | None = None, + header_events: list[tuple[Event, bytes]] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -65,7 +65,7 @@ def __init__( file_base: Path to and base of file name (without extension) where the events file will be logged. extension: Extension of the file to be logged. E.g., '.bin' or '.log' max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0. 
- header_msgs: Tuples of events & payloads to include in every split of the log file + header_events: Tuples of events & payloads to include in every split of the log file """ if isinstance(file_base, str): file_base = Path(file_base) @@ -82,9 +82,9 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 - self._header_msgs: dict[str, tuple[Event, bytes]] = {} - for (event, payload) in header_msgs or []: - self.add_header_msg(event, payload) + self._header_events: dict[str, tuple[Event, bytes]] = {} + for (event, payload) in header_events or []: + self.add_header_event(event, payload) def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" @@ -142,24 +142,29 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> dict[str, tuple[Event, bytes]]: - """Return the list of header messages. + def header_events(self) -> dict[str, tuple[Event, bytes]]: + """Return the dictionary of header events. Returns: - dict[str, tuple[Event, bytes]]: Dictionary of header messages. + dict[str, tuple[Event, bytes]]: Dictionary of header events and payloads. key: string representation of the uri value: tuple of event and payload """ - return self._header_msgs + return self._header_events - def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None: - """Add a header message, and optionally writes it to the file. + def add_header_event( + self, + event: Event, + payload: bytes, + write: bool = False, + ) -> None: + """Add a header event, and optionally writes it to the file. NOTE: Writing to file will fail if the file is not open. Args: event: Event to write. payload: Payload to write. - write: If True, write the header message to the file. Defaults to False. + write: If True, write the header event to the file. Defaults to False. """ if not isinstance(event, Event): error_msg = f"header event must be Event, not {type(event)}" @@ -167,29 +172,29 @@ def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> N if not isinstance(payload, bytes): error_msg = f"header payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - # Once a header message is written to the file, it cannot be changed - if uri_to_string(event.uri) not in self.header_msgs: - self._header_msgs[uri_to_string(event.uri)] = (event, payload) + # Once a header event is written to the file, it cannot be changed + if uri_to_string(event.uri) not in self.header_events: + self._header_events[uri_to_string(event.uri)] = (event, payload) if write: self.write_event_payload(event, payload) - def write_header_msgs(self) -> None: - """Write the header messages to the file, without getting stuck in a loop + def write_header_events(self) -> None: + """Write the header events to the file, without getting stuck in a loop if the headers are larger than the max file size.""" true_max_file_length = self.max_file_length self._max_file_length = 0 - for (event, payload) in self.header_msgs.values(): + for (event, payload) in self.header_events.values(): self.write_event_payload(event, payload) self._max_file_length = true_max_file_length if self.max_file_length and self.file_length > self.max_file_length: - msg = f"Header messages are too large to fit in a file of size {self.max_file_length}" + msg = f"Header events are too large to fit in a file of size {self.max_file_length}" raise RuntimeError(msg) def open(self) -> bool: """Open the file for writing. 
Return True if successful.""" self._file_stream = Path(self.file_name).open("wb") self._file_length = 0 - self.write_header_msgs() + self.write_header_events() return self.is_open() def close(self) -> bool: diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index e88e61ca..643b185d 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -135,7 +135,7 @@ def test_headers(self, file_base: Path): approx_header_size: int = 0 max_file_bytes: int = 10000 - # Create the header messages + # Create the header events for i in range(header_count): message = Int32Value(value=i) event_payload: bytes = message.SerializeToString() @@ -169,7 +169,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: assert opened_writer.file_idx == 0 assert opened_writer.file_length == pytest.approx( @@ -182,7 +182,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=(approx_header_size / 2) * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: pass @@ -191,7 +191,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: assert opened_writer.file_idx == 0 assert opened_writer.file_length == pytest.approx( From e790025f6295bb7ce46fa9e9a3837869343ff21c Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 12:11:05 -0500 Subject: [PATCH 22/25] Add method for getting header_messages as protobufs w/ test --- py/farm_ng/core/events_file_writer.py | 13 +++++++++++++ py/tests/test_events_file_writer.py | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 682eaa4e..39746d38 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -7,6 +7,7 @@ # pylint can't find Event or Uri in protobuf generated files # https://github.com/protocolbuffers/protobuf/issues/10372 from farm_ng.core.event_pb2 import Event +from farm_ng.core.events_file_reader import payload_to_protobuf from farm_ng.core.stamp import StampSemantics, get_monotonic_now, get_system_clock_now from farm_ng.core.uri import make_proto_uri, uri_to_string from google.protobuf.json_format import MessageToJson @@ -152,6 +153,18 @@ def header_events(self) -> dict[str, tuple[Event, bytes]]: """ return self._header_events + @property + def header_messages(self) -> list[Message]: + """Return the header_events, formatted as a list of protobuf messages. + + Returns: + list[Message]: List of header messages. 
+ """ + return [ + payload_to_protobuf(event, payload) + for (event, payload) in self.header_events.values() + ] + def add_header_event( self, event: Event, diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index 643b185d..2b18a05b 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -13,6 +13,7 @@ from pathlib import Path from farm_ng.core.uri_pb2 import Uri + from google.protobuf.message import Message @pytest.fixture() @@ -132,6 +133,7 @@ def test_headers(self, file_base: Path): header_uri_service: str = "test_service" header_count: int = 10 headers: list[tuple[Event, bytes]] = [] + header_messages: list[Message] = [] approx_header_size: int = 0 max_file_bytes: int = 10000 @@ -151,6 +153,7 @@ def test_headers(self, file_base: Path): sequence=i, ) headers.append((event, event_payload)) + header_messages.append(message) approx_header_size += len(event.SerializeToString()) + len(event_payload) # Add some extra headers with duplicate URI's to ensure they are filtered out @@ -176,6 +179,7 @@ def test_headers(self, file_base: Path): approx_header_size, rel=0.5, ) + assert opened_writer.header_messages == header_messages # Test that headers cannot exceed the max file size with pytest.raises(RuntimeError): @@ -202,6 +206,7 @@ def test_headers(self, file_base: Path): opened_writer.write("test_path", StringValue(value=f"test_payload_{i}")) file_count = 1 + opened_writer.file_idx assert file_count > 5 # sufficient number of rollovers (params) + assert opened_writer.header_messages == header_messages for i in range(file_count): header_count_in_file: int = 0 From 3e1c9572836c268d4e41bc29e72f4cdc57c6affe Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 14:12:20 -0500 Subject: [PATCH 23/25] Simplification - just start uri path's with /header --- protos/farm_ng/core/event_service.proto | 2 -- py/farm_ng/core/event_service_recorder.py | 23 ++++------------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/protos/farm_ng/core/event_service.proto b/protos/farm_ng/core/event_service.proto index 7b1fdc98..1cd0a34e 100644 --- a/protos/farm_ng/core/event_service.proto +++ b/protos/farm_ng/core/event_service.proto @@ -104,8 +104,6 @@ message EventServiceConfig { DEBUG = 10; } LogLevel log_level = 7; - // URIs to treat as headers for the EventServiceRecorder - repeated Uri header_uris = 8; } // list of event service configurations diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index e525678a..b6f29f27 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -170,21 +170,19 @@ async def subscribe( self, client: EventClient, subscription: SubscribeRequest, - header_uris: list[str], ) -> None: """Subscribes to a service and puts the events in the queue. Args: client (EventClient): the client to subscribe to. subscription (SubscribeRequest): the subscription request. - header_uris (list[str]): the list of URIs (as strings) to consider as header events. 
""" event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): - if uri_to_string(event.uri) in header_uris: + if event.uri.path.startswith("/header"): # Handle header events - typically these will be metadata or calibrations - self.header_deque.append((event, payload)) + self.add_header_event(event, payload) try: self.record_queue.put_nowait((event, payload)) except asyncio.QueueFull: @@ -228,21 +226,8 @@ async def subscribe_and_record( self.logger.warning("Invalid subscription: %s", query_service_name) continue client: EventClient = self.clients[query_service_name] - # Build a list of header URIs matching the service_name for this subscription - header_uris: list[str] = [] - for header_uri in self.recorder_config.header_uris: - header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri) - header_service_name: str = header_dict["service_name"] - if header_service_name == query_service_name: - header_uris.append(uri_to_string(header_uri)) async_tasks.append( - asyncio.create_task( - self.subscribe( - client, - subscription, - header_uris, - ), - ), + asyncio.create_task(self.subscribe(client, subscription)), ) try: await asyncio.gather(*async_tasks) @@ -442,7 +427,7 @@ def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: raise TypeError(error_msg) self.header_events[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: - self._recorder.header_deque.append((event, payload)) + self._recorder.add_header_event(event, payload) def service_command(_args): From d3863d578cef4e48f6419b3c24b58054e4a780df Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 14:45:10 -0500 Subject: [PATCH 24/25] Add handling of --max-file-mb=<###> arg in recorder EventServiceConfig --- py/farm_ng/core/event_service_recorder.py | 24 ++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index b6f29f27..a656ee73 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -326,8 +326,30 @@ async def start_recording( config_list, list(self.header_events.values()), ) + if self._recorder.recorder_config is None: + msg = "recorder_config is None" + raise ValueError(msg) + + # Handle if the max_file_mb is set in the record_config args + # Expectation: single string of "--max-file-mb=500" or "--max-file-mb 500" + max_file_mb: int = 0 + for arg in self._recorder.recorder_config.args: + if arg.startswith("--max-file-mb"): + try: + eq_arg = arg.replace(" ", "=").strip() + max_file_mb = int(eq_arg.split("=")[1]) + self._recorder.logger.info("Setting max_file_mb to %s", max_file_mb) + except ValueError: + self._recorder.logger.exception( + "Failed to parse max_file_mb from %s", + eq_arg, + ) + self._recorder_task = asyncio.create_task( - self._recorder.subscribe_and_record(file_base=file_base), + self._recorder.subscribe_and_record( + file_base=file_base, + max_file_mb=max_file_mb, + ), ) def _safe_done_callback( From 130924ee36714afab020ad7d935d26f978cbcbff Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 16:17:01 -0500 Subject: [PATCH 25/25] Meet new docstring formatting requirements --- py/farm_ng/core/event_service_recorder.py | 1 + py/farm_ng/core/events_file_writer.py | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 
ef616c9b..a97b9df1 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -440,6 +440,7 @@ async def _request_reply_handler( def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: """Adds a header event to the header_events list. + If this is a duplicate (per URI), it will replace the existing message. Args: diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 49e3aa02..402c6b96 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -191,8 +191,12 @@ def add_header_event( self.write_event_payload(event, payload) def write_header_events(self) -> None: - """Write the header events to the file, without getting stuck in a loop - if the headers are larger than the max file size.""" + """Write the header events to the file. + + NOTE: If the header events are too large to fit in the file, + this will raise a RuntimeError to avoid getting stuck in an infinite loop. + """ + true_max_file_length = self.max_file_length self._max_file_length = 0 for (event, payload) in self.header_events.values():
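
Taken together, these patches leave EventsFileWriter with a registry of header events that is re-written at the start of every rollover split, and a recorder service that treats any event whose URI path begins with /header as one of those headers. The snippet below is a minimal usage sketch, not part of the patch series: it assumes the farm-ng-core APIs exactly as they appear in the diffs above (EventsFileWriter(header_events=...), add_header_event(..., write=True), the header_messages property, and the make_proto_uri helper imported by events_file_writer.py); the Event field names follow the test fixtures, and the file path and message contents are illustrative only.

# Usage sketch (illustrative; assumes the APIs as patched above).
from pathlib import Path

from farm_ng.core.event_pb2 import Event
from farm_ng.core.events_file_writer import EventsFileWriter
from farm_ng.core.uri import make_proto_uri
from google.protobuf.wrappers_pb2 import StringValue

# Serialize a metadata message and describe it with an Event, following the
# pattern in test_events_file_writer.py. The "/header/" path prefix is the
# convention the recorder uses to recognize header events (PATCH 23).
metadata = StringValue(value="example-robot-0042")  # illustrative content
payload: bytes = metadata.SerializeToString()
event = Event(
    uri=make_proto_uri("/header/metadata", metadata),
    payload_length=len(payload),
    sequence=0,
)

with EventsFileWriter(
    file_base=Path("/tmp/example_log"),  # illustrative path
    max_file_mb=1,
    header_events=[(event, payload)],
) as writer:
    # Header events are written first in this file and in every rollover split.
    for i in range(10):
        writer.write("/data", StringValue(value=f"payload_{i}"))

    # A header added mid-recording is written immediately with write=True;
    # the writer keeps only the first header it sees for a given URI.
    late = StringValue(value="late-header")
    late_payload = late.SerializeToString()
    late_event = Event(
        uri=make_proto_uri("/header/late", late),
        payload_length=len(late_payload),
        sequence=0,
    )
    writer.add_header_event(late_event, late_payload, write=True)

    # Decoded protobufs for all registered headers (PATCH 22).
    print(writer.header_messages)

On the service side, the same convention drives the gRPC interface: a request_reply to a path such as '/header/metadata' registers the payload as a header for the current and any future recording, and '/clear_headers' drops the registered headers (PATCH 18, 19, and 23 above).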