Skip to content

Commit

Permalink
Simplification - just start uri path's with /header
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 committed Dec 15, 2023
1 parent e790025 commit 3e1c957
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 21 deletions.
2 changes: 0 additions & 2 deletions protos/farm_ng/core/event_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ message EventServiceConfig {
DEBUG = 10;
}
LogLevel log_level = 7;
// URIs to treat as headers for the EventServiceRecorder
repeated Uri header_uris = 8;
}

// list of event service configurations
Expand Down
23 changes: 4 additions & 19 deletions py/farm_ng/core/event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,19 @@ async def subscribe(
self,
client: EventClient,
subscription: SubscribeRequest,
header_uris: list[str],
) -> None:
"""Subscribes to a service and puts the events in the queue.
Args:
client (EventClient): the client to subscribe to.
subscription (SubscribeRequest): the subscription request.
header_uris (list[str]): the list of URIs (as strings) to consider as header events.
"""
event: event_pb2.Event
payload: bytes
async for event, payload in client.subscribe(subscription, decode=False):
if uri_to_string(event.uri) in header_uris:
if event.uri.path.startswith("/header"):
# Handle header events - typically these will be metadata or calibrations
self.header_deque.append((event, payload))
self.add_header_event(event, payload)
try:
self.record_queue.put_nowait((event, payload))
except asyncio.QueueFull:
Expand Down Expand Up @@ -228,21 +226,8 @@ async def subscribe_and_record(
self.logger.warning("Invalid subscription: %s", query_service_name)
continue
client: EventClient = self.clients[query_service_name]
# Build a list of header URIs matching the service_name for this subscription
header_uris: list[str] = []
for header_uri in self.recorder_config.header_uris:
header_dict: dict[str, str] = uri_query_to_dict(uri=header_uri)
header_service_name: str = header_dict["service_name"]
if header_service_name == query_service_name:
header_uris.append(uri_to_string(header_uri))
async_tasks.append(
asyncio.create_task(
self.subscribe(
client,
subscription,
header_uris,
),
),
asyncio.create_task(self.subscribe(client, subscription)),
)
try:
await asyncio.gather(*async_tasks)
Expand Down Expand Up @@ -442,7 +427,7 @@ def add_header_event(self, event: event_pb2.Event, payload: bytes) -> None:
raise TypeError(error_msg)
self.header_events[uri_to_string(event.uri)] = (event, payload)
if self._recorder is not None:
self._recorder.header_deque.append((event, payload))
self._recorder.add_header_event(event, payload)


def service_command(_args):
Expand Down

0 comments on commit 3e1c957

Please sign in to comment.