From 3e1c9572836c268d4e41bc29e72f4cdc57c6affe Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Fri, 15 Dec 2023 14:12:20 -0500 Subject: [PATCH] Simplification - just start uri path's with /header --- protos/farm_ng/core/event_service.proto | 2 -- py/farm_ng/core/event_service_recorder.py | 23 ++++------------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/protos/farm_ng/core/event_service.proto b/protos/farm_ng/core/event_service.proto index 7b1fdc98..1cd0a34e 100644 --- a/protos/farm_ng/core/event_service.proto +++ b/protos/farm_ng/core/event_service.proto @@ -104,8 +104,6 @@ message EventServiceConfig { DEBUG = 10; } LogLevel log_level = 7; - // URIs to treat as headers for the EventServiceRecorder - repeated Uri header_uris = 8; } // list of event service configurations diff --git a/py/farm_ng/core/event_service_recorder.py b/py/farm_ng/core/event_service_recorder.py index e525678a..b6f29f27 100644 --- a/py/farm_ng/core/event_service_recorder.py +++ b/py/farm_ng/core/event_service_recorder.py @@ -170,21 +170,19 @@ async def subscribe( self, client: EventClient, subscription: SubscribeRequest, - header_uris: list[str], ) -> None: """Subscribes to a service and puts the events in the queue. Args: client (EventClient): the client to subscribe to. subscription (SubscribeRequest): the subscription request. - header_uris (list[str]): the list of URIs (as strings) to consider as header events. """ event: event_pb2.Event payload: bytes async for event, payload in client.subscribe(subscription, decode=False): - if uri_to_string(event.uri) in header_uris: + if event.uri.path.startswith("/header"): # Handle header events - typically these will be metadata or calibrations - self.header_deque.append((event, payload)) + self.add_header_event(event, payload) try: self.record_queue.put_nowait((event, payload)) except asyncio.QueueFull: @@ -228,21 +226,8 @@ async def subscribe_and_record( self.logger.warning("Invalid subscription: %s", query_service_name) continue client: EventClient = self.clients[query_service_name] - # Build a list of header URIs matching the service_name for this subscription - header_uris: list[str] = [] - for header_uri in self.recorder_config.header_uris: - header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri) - header_service_name: str = header_dict["service_name"] - if header_service_name == query_service_name: - header_uris.append(uri_to_string(header_uri)) async_tasks.append( - asyncio.create_task( - self.subscribe( - client, - subscription, - header_uris, - ), - ), + asyncio.create_task(self.subscribe(client, subscription)), ) try: await asyncio.gather(*async_tasks) @@ -442,7 +427,7 @@ def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None: raise TypeError(error_msg) self.header_events[uri_to_string(event.uri)] = (event, payload) if self._recorder is not None: - self._recorder.header_deque.append((event, payload)) + self._recorder.add_header_event(event, payload) def service_command(_args):