Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: reduce pod and communication dependencies #43

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
35 changes: 9 additions & 26 deletions bluesky_adaptive/adjudicators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,24 @@ def append(self, d):
self._set.remove(discarded)


class AdjudicatorBase(BlueskyConsumer, ABC):
class AdjudicatorBase(ABC):
"""
An agent adjudicator that listens to published suggestions by agents.
This Base approach (as per `process_document`) only retains the most recent suggestions by any named agents.
Other mechanisms for tracking can be provided as in example sub-classes.

Parameters
----------
topics : list of str
List of existing_topics as strings such as ["topic-1", "topic-2"]
bootstrap_servers : str
Comma-delimited list of Kafka server addresses as a string
such as ``'broker1:9092,broker2:9092,127.0.0.1:9092'``
group_id : str
Required string identifier for the consumer's Kafka Consumer group.
consumer : BlueskyConsumer
Consumer object to listen for suggestions from agents.
"""

_register_method = BaseAgent._register_method
_register_property = BaseAgent._register_property

def __init__(self, topics: list[str], bootstrap_servers: str, group_id: str, *args, **kwargs):
super().__init__(topics, bootstrap_servers, group_id, *args, **kwargs)
def __init__(self, consumer: BlueskyConsumer):
self._consumer = consumer
self._consumer.process_document = self.process_document
self._lock = Lock()
self._thread = None
self._current_suggestions = {} # agent_name: AdjudicatorMsg
Expand All @@ -70,10 +66,10 @@ def __init__(self, topics: list[str], bootstrap_servers: str, group_id: str, *ar

def start(self, *args, **kwargs):
self._thread = Thread(
target=BlueskyConsumer.start,
target=self._consumer.start,
name="adjudicator-loop",
daemon=True,
args=[self] + list(args),
args=list(args),
kwargs=kwargs,
)
self._thread.start()
Expand Down Expand Up @@ -195,16 +191,6 @@ class NonredundantAdjudicator(AdjudicatorBase):

Parameters
----------
topics : list of str
List of existing_topics as strings such as ["topic-1", "topic-2"]
bootstrap_servers : str
Comma-delimited list of Kafka server addresses as a string
such as ``'broker1:9092,broker2:9092,127.0.0.1:9092'``
group_id : str
Required string identifier for the consumer's Kafka Consumer group.
qservers : dict[str, API_Threads_Mixin]
Dictionary of objects to manage communication with Queue Server. These should be keyed by the beamline TLA
expected in AdjudicatorMsg.suggestions dictionary.
hash_suggestion : Callable
Function that takes the tla and Suggestion object, and returns a hashable object as ::

Expand All @@ -222,15 +208,12 @@ def hash_suggestion(tla: str, suggestion: Suggestion) -> Hashable: ...

def __init__(
self,
topics: list[str],
bootstrap_servers: str,
group_id: str,
*args,
qservers: dict[str, API_Threads_Mixin],
hash_suggestion: Callable,
**kwargs,
):
super().__init__(topics, bootstrap_servers, group_id, *args, **kwargs)
super().__init__(*args, **kwargs)
self.hash_suggestion = hash_suggestion
self.suggestion_set = set()
self._re_managers = qservers
Expand Down
7 changes: 6 additions & 1 deletion bluesky_adaptive/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from bluesky_kafka.tests.conftest import consume_documents_from_kafka_until_first_stop_document # noqa

from ophyd.tests.conftest import hw # noqa

from databroker import temp
from tiled.client import from_profile


Expand All @@ -30,3 +30,8 @@ def tiled_profile():
@pytest.fixture(scope="module")
def tiled_node():
return from_profile("testing_sandbox")


@pytest.fixture(scope="function")
def catalog():
return temp().v2
Loading
Loading