diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index 31eec166..e525678a 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -65,13 +65,14 @@ def __init__( self, service_name: str, config_list: EventServiceConfigList, - header_msgs: list[tuple[event_pb2.Event, bytes]] | None = None, + header_events: list[tuple[event_pb2.Event, bytes]] | None = None, ) -> None: """Initializes the service. Args: service_name (str): the name of the service. config_list (EventServiceConfigList): the configuration data structure. + header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list header events. """ self.service_name: str = service_name self.config_list: EventServiceConfigList = config_list @@ -102,12 +103,12 @@ def __init__( maxsize=self.QUEUE_MAX_SIZE, ) self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque() - if header_msgs is not None: - for event, payload in header_msgs: - self.add_header_msg(event, payload) + if header_events is not None: + for event, payload in header_events: + self.add_header_event(event, payload) - def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: - """Add a header message to the header_deque. + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Add a header event to the header_deque. Args: event: Event to add. @@ -148,16 +149,16 @@ async def record( file_base, extension, max_file_mb, - header_msgs=list(self.header_deque), + header_events=list(self.header_deque), ) as writer: self.header_deque.clear() event: event_pb2.Event payload: bytes while True: - # Add any header messages added during recording + # Add any header events added during recording while self.header_deque: event, payload = self.header_deque.popleft() - writer.add_header_msg(event, payload, write=True) + writer.add_header_event(event, payload, write=True) # await a new event and payload, and write it to the file event, payload = await self.record_queue.get() event.timestamps.append( @@ -176,13 +177,13 @@ async def subscribe( Args: client (EventClient): the client to subscribe to. subscription (SubscribeRequest): the subscription request. - header_uris (list[str]): the list of URIs (as strings) to consider as header messages. + header_uris (list[str]): the list of URIs (as strings) to consider as header events. """ event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): if uri_to_string(event.uri) in header_uris: - # Handle header messages - typically these will be metadata or calibrations + # Handle header events - typically these will be metadata or calibrations self.header_deque.append((event, payload)) try: self.record_queue.put_nowait((event, payload)) @@ -311,8 +312,8 @@ def __init__(self, event_service: EventServiceGrpc) -> None: # the recorder task self._recorder_task: asyncio.Task | None = None - # For tracking header messages (e.g. metadata, calibrations) to be logged in the recordings - self.header_msgs: dict[str, tuple[event_pb2.Event, bytes]] = {} + # For tracking header events (e.g. metadata, calibrations) to be logged in the recordings + self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {} # public methods @@ -338,7 +339,7 @@ async def start_recording( self._recorder = EventServiceRecorder( config_name or "record_default", config_list, - list(self.header_msgs.values()), + list(self.header_events.values()), ) self._recorder_task = asyncio.create_task( self._recorder.subscribe_and_record(file_base=file_base), @@ -417,16 +418,16 @@ async def _request_reply_handler( payload_to_protobuf(request.event, request.payload), ) self._event_service.logger.info("with uri:\n%s", request.event.uri) - self.add_header_msg(request.event, request.payload) + self.add_header_event(request.event, request.payload) elif cmd == "/clear_headers": self._event_service.logger.info("/clear_headers") - self.header_msgs.clear() + self.header_events.clear() if self._recorder is not None: self._recorder.header_deque.clear() return Empty() - def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: - """Adds a header message to the header_msgs list. + def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: + """Adds a header event to the header_events list. If this is a duplicate (per URI), it will replace the existing message. Args: @@ -439,7 +440,7 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None: if not isinstance(payload, bytes): error_msg = f"header payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - self.header_msgs[uri_to_string(event.uri)] = (event, payload) + self.header_events[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: self._recorder.header_deque.append((event, payload)) diff --git a/py/farm_ng/core/events_file_writer.py b/py/farm_ng/core/events_file_writer.py index 019b8a3d..682eaa4e 100644 --- a/py/farm_ng/core/events_file_writer.py +++ b/py/farm_ng/core/events_file_writer.py @@ -57,7 +57,7 @@ def __init__( file_base: str | Path, extension: str = ".bin", max_file_mb: int = 0, - header_msgs: list[tuple[Event, bytes]] | None = None, + header_events: list[tuple[Event, bytes]] | None = None, ) -> None: """Create a new EventsFileWriter. @@ -65,7 +65,7 @@ def __init__( file_base: Path to and base of file name (without extension) where the events file will be logged. extension: Extension of the file to be logged. E.g., '.bin' or '.log' max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0. - header_msgs: Tuples of events & payloads to include in every split of the log file + header_events: Tuples of events & payloads to include in every split of the log file """ if isinstance(file_base, str): file_base = Path(file_base) @@ -82,9 +82,9 @@ def __init__( self._max_file_length = int(max(0, max_file_mb) * 1e6) self._file_idx: int = 0 - self._header_msgs: dict[str, tuple[Event, bytes]] = {} - for (event, payload) in header_msgs or []: - self.add_header_msg(event, payload) + self._header_events: dict[str, tuple[Event, bytes]] = {} + for (event, payload) in header_events or []: + self.add_header_event(event, payload) def __enter__(self) -> EventsFileWriter: """Open the file for writing and return self.""" @@ -142,24 +142,29 @@ def _increment_file_idx(self) -> None: self._file_idx += 1 @property - def header_msgs(self) -> dict[str, tuple[Event, bytes]]: - """Return the list of header messages. + def header_events(self) -> dict[str, tuple[Event, bytes]]: + """Return the dictionary of header events. Returns: - dict[str, tuple[Event, bytes]]: Dictionary of header messages. + dict[str, tuple[Event, bytes]]: Dictionary of header events and payloads. key: string representation of the uri value: tuple of event and payload """ - return self._header_msgs + return self._header_events - def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None: - """Add a header message, and optionally writes it to the file. + def add_header_event( + self, + event: Event, + payload: bytes, + write: bool = False, + ) -> None: + """Add a header event, and optionally writes it to the file. NOTE: Writing to file will fail if the file is not open. Args: event: Event to write. payload: Payload to write. - write: If True, write the header message to the file. Defaults to False. + write: If True, write the header event to the file. Defaults to False. """ if not isinstance(event, Event): error_msg = f"header event must be Event, not {type(event)}" @@ -167,29 +172,29 @@ def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> N if not isinstance(payload, bytes): error_msg = f"header payload must be bytes, not {type(payload)}" raise TypeError(error_msg) - # Once a header message is written to the file, it cannot be changed - if uri_to_string(event.uri) not in self.header_msgs: - self._header_msgs[uri_to_string(event.uri)] = (event, payload) + # Once a header event is written to the file, it cannot be changed + if uri_to_string(event.uri) not in self.header_events: + self._header_events[uri_to_string(event.uri)] = (event, payload) if write: self.write_event_payload(event, payload) - def write_header_msgs(self) -> None: - """Write the header messages to the file, without getting stuck in a loop + def write_header_events(self) -> None: + """Write the header events to the file, without getting stuck in a loop if the headers are larger than the max file size.""" true_max_file_length = self.max_file_length self._max_file_length = 0 - for (event, payload) in self.header_msgs.values(): + for (event, payload) in self.header_events.values(): self.write_event_payload(event, payload) self._max_file_length = true_max_file_length if self.max_file_length and self.file_length > self.max_file_length: - msg = f"Header messages are too large to fit in a file of size {self.max_file_length}" + msg = f"Header events are too large to fit in a file of size {self.max_file_length}" raise RuntimeError(msg) def open(self) -> bool: """Open the file for writing. Return True if successful.""" self._file_stream = Path(self.file_name).open("wb") self._file_length = 0 - self.write_header_msgs() + self.write_header_events() return self.is_open() def close(self) -> bool: diff --git a/py/tests/test_events_file_writer.py b/py/tests/test_events_file_writer.py index e88e61ca..643b185d 100644 --- a/py/tests/test_events_file_writer.py +++ b/py/tests/test_events_file_writer.py @@ -135,7 +135,7 @@ def test_headers(self, file_base: Path): approx_header_size: int = 0 max_file_bytes: int = 10000 - # Create the header messages + # Create the header events for i in range(header_count): message = Int32Value(value=i) event_payload: bytes = message.SerializeToString() @@ -169,7 +169,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: assert opened_writer.file_idx == 0 assert opened_writer.file_length == pytest.approx( @@ -182,7 +182,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=(approx_header_size / 2) * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: pass @@ -191,7 +191,7 @@ def test_headers(self, file_base: Path): with EventsFileWriter( file_base=file_base, max_file_mb=max_file_bytes * 1e-6, - header_msgs=headers, + header_events=headers, ) as opened_writer: assert opened_writer.file_idx == 0 assert opened_writer.file_length == pytest.approx(