Skip to content

Commit

Permalink
Rename header_msgs -> header_events (more accurate)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 committed Dec 15, 2023
1 parent c09db6c commit 84ae0d0
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 43 deletions.
39 changes: 20 additions & 19 deletions py/farm_ng/core/event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ def __init__(
self,
service_name: str,
config_list: EventServiceConfigList,
header_msgs: list[tuple[event_pb2.Event, bytes]] | None = None,
header_events: list[tuple[event_pb2.Event, bytes]] | None = None,
) -> None:
"""Initializes the service.
Args:
service_name (str): the name of the service.
config_list (EventServiceConfigList): the configuration data structure.
header_events (list[tuple[event_pb2.Event, bytes]], optional): the initial list header events.
"""
self.service_name: str = service_name
self.config_list: EventServiceConfigList = config_list
Expand Down Expand Up @@ -102,12 +103,12 @@ def __init__(
maxsize=self.QUEUE_MAX_SIZE,
)
self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque()
if header_msgs is not None:
for event, payload in header_msgs:
self.add_header_msg(event, payload)
if header_events is not None:
for event, payload in header_events:
self.add_header_event(event, payload)

def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
"""Add a header message to the header_deque.
def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
"""Add a header event to the header_deque.
Args:
event: Event to add.
Expand Down Expand Up @@ -148,16 +149,16 @@ async def record(
file_base,
extension,
max_file_mb,
header_msgs=list(self.header_deque),
header_events=list(self.header_deque),
) as writer:
self.header_deque.clear()
event: event_pb2.Event
payload: bytes
while True:
# Add any header messages added during recording
# Add any header events added during recording
while self.header_deque:
event, payload = self.header_deque.popleft()
writer.add_header_msg(event, payload, write=True)
writer.add_header_event(event, payload, write=True)
# await a new event and payload, and write it to the file
event, payload = await self.record_queue.get()
event.timestamps.append(
Expand All @@ -176,13 +177,13 @@ async def subscribe(
Args:
client (EventClient): the client to subscribe to.
subscription (SubscribeRequest): the subscription request.
header_uris (list[str]): the list of URIs (as strings) to consider as header messages.
header_uris (list[str]): the list of URIs (as strings) to consider as header events.
"""
event: event_pb2.Event
payload: bytes
async for event, payload in client.subscribe(subscription, decode=False):
if uri_to_string(event.uri) in header_uris:
# Handle header messages - typically these will be metadata or calibrations
# Handle header events - typically these will be metadata or calibrations
self.header_deque.append((event, payload))
try:
self.record_queue.put_nowait((event, payload))
Expand Down Expand Up @@ -311,8 +312,8 @@ def __init__(self, event_service: EventServiceGrpc) -> None:
# the recorder task
self._recorder_task: asyncio.Task | None = None

# For tracking header messages (e.g. metadata, calibrations) to be logged in the recordings
self.header_msgs: dict[str, tuple[event_pb2.Event, bytes]] = {}
# For tracking header events (e.g. metadata, calibrations) to be logged in the recordings
self.header_events: dict[str, tuple[event_pb2.Event, bytes]] = {}

# public methods

Expand All @@ -338,7 +339,7 @@ async def start_recording(
self._recorder = EventServiceRecorder(
config_name or "record_default",
config_list,
list(self.header_msgs.values()),
list(self.header_events.values()),
)
self._recorder_task = asyncio.create_task(
self._recorder.subscribe_and_record(file_base=file_base),
Expand Down Expand Up @@ -417,16 +418,16 @@ async def _request_reply_handler(
payload_to_protobuf(request.event, request.payload),
)
self._event_service.logger.info("with uri:\n%s", request.event.uri)
self.add_header_msg(request.event, request.payload)
self.add_header_event(request.event, request.payload)
elif cmd == "/clear_headers":
self._event_service.logger.info("/clear_headers")
self.header_msgs.clear()
self.header_events.clear()
if self._recorder is not None:
self._recorder.header_deque.clear()
return Empty()

def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
"""Adds a header message to the header_msgs list.
def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
"""Adds a header event to the header_events list.
If this is a duplicate (per URI), it will replace the existing message.
Args:
Expand All @@ -439,7 +440,7 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
self.header_msgs[uri_to_string(event.uri)] = (event, payload)
self.header_events[uri_to_string(event.uri)] = (event, payload)
if self._recorder is not None:
self._recorder.header_deque.append((event, payload))

Expand Down
45 changes: 25 additions & 20 deletions py/farm_ng/core/events_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ def __init__(
file_base: str | Path,
extension: str = ".bin",
max_file_mb: int = 0,
header_msgs: list[tuple[Event, bytes]] | None = None,
header_events: list[tuple[Event, bytes]] | None = None,
) -> None:
"""Create a new EventsFileWriter.
Args:
file_base: Path to and base of file name (without extension) where the events file will be logged.
extension: Extension of the file to be logged. E.g., '.bin' or '.log'
max_file_mb: Maximum log size in MB. Logging will roll over to new file when reached. Ignored if <= 0.
header_msgs: Tuples of events & payloads to include in every split of the log file
header_events: Tuples of events & payloads to include in every split of the log file
"""
if isinstance(file_base, str):
file_base = Path(file_base)
Expand All @@ -82,9 +82,9 @@ def __init__(
self._max_file_length = int(max(0, max_file_mb) * 1e6)
self._file_idx: int = 0

self._header_msgs: dict[str, tuple[Event, bytes]] = {}
for (event, payload) in header_msgs or []:
self.add_header_msg(event, payload)
self._header_events: dict[str, tuple[Event, bytes]] = {}
for (event, payload) in header_events or []:
self.add_header_event(event, payload)

def __enter__(self) -> EventsFileWriter:
"""Open the file for writing and return self."""
Expand Down Expand Up @@ -142,54 +142,59 @@ def _increment_file_idx(self) -> None:
self._file_idx += 1

@property
def header_msgs(self) -> dict[str, tuple[Event, bytes]]:
"""Return the list of header messages.
def header_events(self) -> dict[str, tuple[Event, bytes]]:
"""Return the dictionary of header events.
Returns:
dict[str, tuple[Event, bytes]]: Dictionary of header messages.
dict[str, tuple[Event, bytes]]: Dictionary of header events and payloads.
key: string representation of the uri
value: tuple of event and payload
"""
return self._header_msgs
return self._header_events

def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None:
"""Add a header message, and optionally writes it to the file.
def add_header_event(
self,
event: Event,
payload: bytes,
write: bool = False,
) -> None:
"""Add a header event, and optionally writes it to the file.
NOTE: Writing to file will fail if the file is not open.
Args:
event: Event to write.
payload: Payload to write.
write: If True, write the header message to the file. Defaults to False.
write: If True, write the header event to the file. Defaults to False.
"""
if not isinstance(event, Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
# Once a header message is written to the file, it cannot be changed
if uri_to_string(event.uri) not in self.header_msgs:
self._header_msgs[uri_to_string(event.uri)] = (event, payload)
# Once a header event is written to the file, it cannot be changed
if uri_to_string(event.uri) not in self.header_events:
self._header_events[uri_to_string(event.uri)] = (event, payload)
if write:
self.write_event_payload(event, payload)

def write_header_msgs(self) -> None:
"""Write the header messages to the file, without getting stuck in a loop
def write_header_events(self) -> None:
"""Write the header events to the file, without getting stuck in a loop
if the headers are larger than the max file size."""
true_max_file_length = self.max_file_length
self._max_file_length = 0
for (event, payload) in self.header_msgs.values():
for (event, payload) in self.header_events.values():
self.write_event_payload(event, payload)
self._max_file_length = true_max_file_length
if self.max_file_length and self.file_length > self.max_file_length:
msg = f"Header messages are too large to fit in a file of size {self.max_file_length}"
msg = f"Header events are too large to fit in a file of size {self.max_file_length}"
raise RuntimeError(msg)

def open(self) -> bool:
"""Open the file for writing. Return True if successful."""
self._file_stream = Path(self.file_name).open("wb")
self._file_length = 0
self.write_header_msgs()
self.write_header_events()
return self.is_open()

def close(self) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions py/tests/test_events_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def test_headers(self, file_base: Path):
approx_header_size: int = 0
max_file_bytes: int = 10000

# Create the header messages
# Create the header events
for i in range(header_count):
message = Int32Value(value=i)
event_payload: bytes = message.SerializeToString()
Expand Down Expand Up @@ -169,7 +169,7 @@ def test_headers(self, file_base: Path):
with EventsFileWriter(
file_base=file_base,
max_file_mb=max_file_bytes * 1e-6,
header_msgs=headers,
header_events=headers,
) as opened_writer:
assert opened_writer.file_idx == 0
assert opened_writer.file_length == pytest.approx(
Expand All @@ -182,7 +182,7 @@ def test_headers(self, file_base: Path):
with EventsFileWriter(
file_base=file_base,
max_file_mb=(approx_header_size / 2) * 1e-6,
header_msgs=headers,
header_events=headers,
) as opened_writer:
pass

Expand All @@ -191,7 +191,7 @@ def test_headers(self, file_base: Path):
with EventsFileWriter(
file_base=file_base,
max_file_mb=max_file_bytes * 1e-6,
header_msgs=headers,
header_events=headers,
) as opened_writer:
assert opened_writer.file_idx == 0
assert opened_writer.file_length == pytest.approx(
Expand Down

0 comments on commit 84ae0d0

Please sign in to comment.