Skip to content

Commit

Permalink
Add more type checking on header events / payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 committed Dec 15, 2023
1 parent 03293c2 commit c09db6c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
28 changes: 25 additions & 3 deletions py/farm_ng/core/event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from typing import TYPE_CHECKING

import grpc
from farm_ng.core import event_pb2
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service import (
EventServiceGrpc,
Expand All @@ -49,7 +50,6 @@
from google.protobuf.wrappers_pb2 import StringValue

if TYPE_CHECKING:
from farm_ng.core import event_pb2
from google.protobuf.message import Message

__all__ = ["EventServiceRecorder", "RecorderService"]
Expand Down Expand Up @@ -102,8 +102,24 @@ def __init__(
maxsize=self.QUEUE_MAX_SIZE,
)
self.header_deque: deque[tuple[event_pb2.Event, bytes]] = deque()
if header_msgs:
self.header_deque.extend(header_msgs)
if header_msgs is not None:
for event, payload in header_msgs:
self.add_header_msg(event, payload)

def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
"""Add a header message to the header_deque.
Args:
event: Event to add.
payload: Payload to add.
"""
if not isinstance(event, event_pb2.Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
self.header_deque.append((event, payload))

@property
def logger(self) -> logging.Logger:
Expand Down Expand Up @@ -417,6 +433,12 @@ def add_header_msg(self, event: event_pb2.Event, payload: bytes) -> None:
event (event_pb2.Event): the event.
payload (bytes): the payload.
"""
if not isinstance(event, event_pb2.Event):
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
self.header_msgs[uri_to_string(event.uri)] = (event, payload)
if self._recorder is not None:
self._recorder.header_deque.append((event, payload))
Expand Down
7 changes: 4 additions & 3 deletions py/farm_ng/core/events_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def _increment_file_idx(self) -> None:
@property
def header_msgs(self) -> dict[str, tuple[Event, bytes]]:
"""Return the list of header messages.
Returns:
dict[str, tuple[Event, bytes]]: Dictionary of header messages.
key: string representation of the uri
Expand All @@ -153,18 +154,18 @@ def header_msgs(self) -> dict[str, tuple[Event, bytes]]:

def add_header_msg(self, event: Event, payload: bytes, write: bool = False) -> None:
"""Add a header message, and optionally writes it to the file.
NOTE: Writing to file will fail if the file is not open.
NOTE: Writing to file will fail if the file is not open.
Args:
event: Event to write.
payload: Payload to write.
write: If True, write the header message to the file. Defaults to False.
"""
if not isinstance(event, Event):
error_msg = f"event must be Event, not {type(event)}"
error_msg = f"header event must be Event, not {type(event)}"
raise TypeError(error_msg)
if not isinstance(payload, bytes):
error_msg = f"payload must be bytes, not {type(payload)}"
error_msg = f"header payload must be bytes, not {type(payload)}"
raise TypeError(error_msg)
# Once a header message is written to the file, it cannot be changed
if uri_to_string(event.uri) not in self.header_msgs:
Expand Down

0 comments on commit c09db6c

Please sign in to comment.