From c03464e231614463a86c1a269c6a8e05548eb7ae Mon Sep 17 00:00:00 2001 From: Abe Coull Date: Mon, 20 Nov 2023 10:56:59 -0800 Subject: [PATCH 1/4] feat: add logging for queue depth --- src/braket/aws/aws_quantum_job.py | 19 ++++-- src/braket/aws/aws_quantum_task.py | 14 +++++ src/braket/jobs/hybrid_job.py | 7 ++- src/braket/jobs/logs.py | 11 +++- .../braket/aws/test_aws_quantum_job.py | 61 +++++++++++++++++++ .../braket/aws/test_aws_quantum_task.py | 59 ++++++++++++++++++ 6 files changed, 165 insertions(+), 6 deletions(-) diff --git a/src/braket/aws/aws_quantum_job.py b/src/braket/aws/aws_quantum_job.py index 8ae4bc129..c16480730 100644 --- a/src/braket/aws/aws_quantum_job.py +++ b/src/braket/aws/aws_quantum_job.py @@ -81,6 +81,7 @@ def create( aws_session: AwsSession | None = None, tags: dict[str, str] | None = None, logger: Logger = getLogger(__name__), + quiet: bool = False, ) -> AwsQuantumJob: """Creates a hybrid job by invoking the Braket CreateJob API. @@ -175,6 +176,9 @@ def create( while waiting for quantum task to be in a terminal state. Default is `getLogger(__name__)` + quiet (bool): Sets the verbosity of the logger to low and does not report queue + position. Default is `False`. + Returns: AwsQuantumJob: Hybrid job tracking the execution on Amazon Braket. @@ -204,7 +208,7 @@ def create( ) job_arn = aws_session.create_job(**create_job_kwargs) - job = AwsQuantumJob(job_arn, aws_session) + job = AwsQuantumJob(job_arn, aws_session, quiet) if wait_until_complete: print(f"Initializing Braket Job: {job_arn}") @@ -212,15 +216,18 @@ def create( return job - def __init__(self, arn: str, aws_session: AwsSession | None = None): + def __init__(self, arn: str, aws_session: AwsSession | None = None, quiet: bool = False): """ Args: arn (str): The ARN of the hybrid job. aws_session (AwsSession | None): The `AwsSession` for connecting to AWS services. Default is `None`, in which case an `AwsSession` object will be created with the region of the hybrid job. + quiet (bool): Sets the verbosity of the logger to low and does not report queue + position. Default is `False`. """ self._arn: str = arn + self._quiet = quiet if aws_session: if not self._is_valid_aws_session_region_for_job_arn(aws_session, arn): raise ValueError( @@ -365,10 +372,11 @@ def logs(self, wait: bool = False, poll_interval_seconds: int = 5) -> None: instance_count = self.metadata(use_cached_value=True)["instanceConfig"]["instanceCount"] has_streams = False color_wrap = logs.ColorWrap() + previous_state = self.state() while True: time.sleep(poll_interval_seconds) - + current_state = self.state() has_streams = logs.flush_log_streams( self._aws_session, log_group, @@ -378,14 +386,17 @@ def logs(self, wait: bool = False, poll_interval_seconds: int = 5) -> None: instance_count, has_streams, color_wrap, + [previous_state, current_state], + self.queue_position().queue_position if not self._quiet else None, ) + previous_state = current_state if log_state == AwsQuantumJob.LogState.COMPLETE: break if log_state == AwsQuantumJob.LogState.JOB_COMPLETE: log_state = AwsQuantumJob.LogState.COMPLETE - elif self.state() in AwsQuantumJob.TERMINAL_STATES: + elif current_state in AwsQuantumJob.TERMINAL_STATES: log_state = AwsQuantumJob.LogState.JOB_COMPLETE def metadata(self, use_cached_value: bool = False) -> dict[str, Any]: diff --git a/src/braket/aws/aws_quantum_task.py b/src/braket/aws/aws_quantum_task.py index fa58d911d..6ad07c196 100644 --- a/src/braket/aws/aws_quantum_task.py +++ b/src/braket/aws/aws_quantum_task.py @@ -105,6 +105,7 @@ def create( tags: dict[str, str] | None = None, inputs: dict[str, float] | None = None, gate_definitions: Optional[dict[tuple[Gate, QubitSet], PulseSequence]] | None = None, + quiet: bool = False, *args, **kwargs, ) -> AwsQuantumTask: @@ -151,6 +152,9 @@ def create( a `PulseSequence`. Default: None. + quiet (bool): Sets the verbosity of the logger to low and does not report queue + position. Default is `False`. + Returns: AwsQuantumTask: AwsQuantumTask tracking the quantum task execution on the device. @@ -196,6 +200,7 @@ def create( disable_qubit_rewiring, inputs, gate_definitions=gate_definitions, + quiet=quiet, *args, **kwargs, ) @@ -207,6 +212,7 @@ def __init__( poll_timeout_seconds: float = DEFAULT_RESULTS_POLL_TIMEOUT, poll_interval_seconds: float = DEFAULT_RESULTS_POLL_INTERVAL, logger: Logger = getLogger(__name__), + quiet: bool = False, ): """ Args: @@ -219,6 +225,8 @@ def __init__( logger (Logger): Logger object with which to write logs, such as quantum task statuses while waiting for quantum task to be in a terminal state. Default is `getLogger(__name__)` + quiet (bool): Sets the verbosity of the logger to low and does not report queue + position. Default is `False`. Examples: >>> task = AwsQuantumTask(arn='task_arn') @@ -240,6 +248,7 @@ def __init__( self._poll_interval_seconds = poll_interval_seconds self._logger = logger + self._quiet = quiet self._metadata: dict[str, Any] = {} self._result: Union[ @@ -458,6 +467,11 @@ async def _wait_for_completion( while (time.time() - start_time) < self._poll_timeout_seconds: # Used cached metadata if cached status is terminal task_status = self._update_status_if_nonterminal() + if not self._quiet and task_status == "QUEUED": + queue = self.queue_position() + self._logger.debug( + f"Task is in {queue.queue_type} queue position: {queue.queue_position}" + ) self._logger.debug(f"Task {self._arn}: task status {task_status}") if task_status in AwsQuantumTask.RESULTS_READY_STATES: return self._download_result() diff --git a/src/braket/jobs/hybrid_job.py b/src/braket/jobs/hybrid_job.py index ae17c2715..a87a5548f 100644 --- a/src/braket/jobs/hybrid_job.py +++ b/src/braket/jobs/hybrid_job.py @@ -63,6 +63,7 @@ def hybrid_job( aws_session: AwsSession | None = None, tags: dict[str, str] | None = None, logger: Logger = getLogger(__name__), + quiet: bool | None = None, ) -> Callable: """Defines a hybrid job by decorating the entry point function. The job will be created when the decorated function is called. @@ -70,7 +71,7 @@ def hybrid_job( The job created will be a `LocalQuantumJob` when `local` is set to `True`, otherwise an `AwsQuantumJob`. The following parameters will be ignored when running a job with `local` set to `True`: `wait_until_complete`, `instance_config`, `distribution`, - `copy_checkpoints_from_job`, `stopping_condition`, `tags`, and `logger`. + `copy_checkpoints_from_job`, `stopping_condition`, `tags`, `logger`, and `quiet`. Args: device (str | None): Device ARN of the QPU device that receives priority quantum @@ -152,6 +153,9 @@ def hybrid_job( logger (Logger): Logger object with which to write logs, such as task statuses while waiting for task to be in a terminal state. Default: `getLogger(__name__)` + quiet (bool | None): Sets the verbosity of the logger to low and does not report queue + position. Default is `False`. + Returns: Callable: the callable for creating a Hybrid Job. """ @@ -205,6 +209,7 @@ def job_wrapper(*args, **kwargs) -> Callable: "output_data_config": output_data_config, "aws_session": aws_session, "tags": tags, + "quiet": quiet, } for key, value in optional_args.items(): if value is not None: diff --git a/src/braket/jobs/logs.py b/src/braket/jobs/logs.py index e0f54458d..2e2b03664 100644 --- a/src/braket/jobs/logs.py +++ b/src/braket/jobs/logs.py @@ -155,7 +155,7 @@ def log_stream( yield ev -def flush_log_streams( +def flush_log_streams( # noqa C901 aws_session: AwsSession, log_group: str, stream_prefix: str, @@ -164,6 +164,8 @@ def flush_log_streams( stream_count: int, has_streams: bool, color_wrap: ColorWrap, + state: list[str], + queue_position: str | None = None, ) -> bool: """Flushes log streams to stdout. @@ -183,6 +185,9 @@ def flush_log_streams( been found. This value is possibly updated and returned at the end of execution. color_wrap (ColorWrap): An instance of ColorWrap to potentially color-wrap print statements from different streams. + state (list[str]): The previous and current state of the job. + queue_position (str | None): The current queue position. This is not passed in if the job + is ran with `quiet=True` Returns: bool: Returns 'True' if any streams have been flushed. @@ -225,6 +230,10 @@ def flush_log_streams( positions[stream_names[idx]] = Position(timestamp=ts, skip=count + 1) else: positions[stream_names[idx]] = Position(timestamp=event["timestamp"], skip=1) + elif queue_position is not None and state[1] == "QUEUED": + print(f"Job queue position: {queue_position}", end="\n", flush=True) + elif state[0] != state[1] and state[1] == "RUNNING" and queue_position is not None: + print("Running:", end="\n", flush=True) else: print(".", end="", flush=True) return has_streams diff --git a/test/unit_tests/braket/aws/test_aws_quantum_job.py b/test/unit_tests/braket/aws/test_aws_quantum_job.py index 19b46d72b..ff7ab04c5 100644 --- a/test/unit_tests/braket/aws/test_aws_quantum_job.py +++ b/test/unit_tests/braket/aws/test_aws_quantum_job.py @@ -93,6 +93,7 @@ def _get_job_response(**kwargs): "jobArn": "arn:aws:braket:us-west-2:875981177017:job/job-test-20210628140446", "jobName": "job-test-20210628140446", "outputDataConfig": {"s3Path": "s3://amazon-braket-jobs/job-path/data"}, + "queueInfo": {"position": "1", "queue": "JOBS_QUEUE"}, "roleArn": "arn:aws:iam::875981177017:role/AmazonBraketJobRole", "status": "RUNNING", "stoppingCondition": {"maxRuntimeInSeconds": 1200}, @@ -713,6 +714,14 @@ def test_logs( generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), generate_get_job_response(status="COMPLETED"), ) quantum_job._aws_session.describe_log_streams.side_effect = log_stream_responses @@ -733,6 +742,48 @@ def test_logs( ) +def test_logs_queue_progress( + quantum_job, + generate_get_job_response, + log_events_responses, + log_stream_responses, + capsys, +): + queue_info = {"queue": "JOBS_QUEUE", "position": "1"} + quantum_job._aws_session.get_job.side_effect = ( + generate_get_job_response(status="QUEUED", queue_info=queue_info), + generate_get_job_response(status="QUEUED", queue_info=queue_info), + generate_get_job_response(status="QUEUED", queue_info=queue_info), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + ) + quantum_job._aws_session.describe_log_streams.side_effect = log_stream_responses + quantum_job._aws_session.get_log_events.side_effect = log_events_responses + + quantum_job.logs(wait=True, poll_interval_seconds=0) + + captured = capsys.readouterr() + assert captured.out == "\n".join( + ( + f"Job queue position: {queue_info['position']}", + "Running:", + "", + "hi there #1", + "hi there #2", + "hi there #2a", + "hi there #3", + "", + ) + ) + + @patch.dict("os.environ", {"JPY_PARENT_PID": "True"}) def test_logs_multiple_instances( quantum_job, @@ -746,6 +797,15 @@ def test_logs_multiple_instances( generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="RUNNING"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), + generate_get_job_response(status="COMPLETED"), generate_get_job_response(status="COMPLETED"), ) log_stream_responses[-1]["logStreams"].append({"logStreamName": "stream-2"}) @@ -811,6 +871,7 @@ def get_log_events(log_group, log_stream, start_time, start_from_head, next_toke def test_logs_error(quantum_job, generate_get_job_response, capsys): quantum_job._aws_session.get_job.side_effect = ( + generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), generate_get_job_response(status="RUNNING"), generate_get_job_response(status="COMPLETED"), diff --git a/test/unit_tests/braket/aws/test_aws_quantum_task.py b/test/unit_tests/braket/aws/test_aws_quantum_task.py index 99270ad25..62b41e5c7 100644 --- a/test/unit_tests/braket/aws/test_aws_quantum_task.py +++ b/test/unit_tests/braket/aws/test_aws_quantum_task.py @@ -83,6 +83,11 @@ def quantum_task(aws_session): return AwsQuantumTask("foo:bar:arn", aws_session, poll_timeout_seconds=2) +@pytest.fixture +def quantum_task_not_quiet(aws_session): + return AwsQuantumTask("foo:bar:arn", aws_session, poll_timeout_seconds=2, quiet=True) + + @pytest.fixture def circuit_task(aws_session): return AwsQuantumTask("foo:bar:arn", aws_session, poll_timeout_seconds=2) @@ -220,6 +225,23 @@ def test_queue_position(quantum_task): ) +def test_queued_not_quiet(quantum_task_not_quiet): + state_1 = "QUEUED" + _mock_metadata(quantum_task_not_quiet._aws_session, state_1) + assert quantum_task_not_quiet.queue_position() == QuantumTaskQueueInfo( + queue_type=QueueType.NORMAL, queue_position="2", message=None + ) + + state_2 = "COMPLETED" + message = ( + f"'Task is in {state_2} status. AmazonBraket does not show queue position for this status.'" + ) + _mock_metadata(quantum_task_not_quiet._aws_session, state_2) + assert quantum_task_not_quiet.queue_position() == QuantumTaskQueueInfo( + queue_type=QueueType.NORMAL, queue_position=None, message=message + ) + + def test_state(quantum_task): state_1 = "RUNNING" _mock_metadata(quantum_task._aws_session, state_1) @@ -409,6 +431,43 @@ def set_result_from_callback(future): assert result_from_future == result +@pytest.mark.parametrize( + "status, result", + [ + ("COMPLETED", GateModelQuantumTaskResult.from_string(MockS3.MOCK_S3_RESULT_GATE_MODEL)), + ("FAILED", None), + ], +) +def test_async_result_queued(circuit_task, status, result): + def set_result_from_callback(future): + # Set the result_from_callback variable in the enclosing functions scope + nonlocal result_from_callback + result_from_callback = future.result() + + _mock_metadata(circuit_task._aws_session, "QUEUED") + _mock_s3(circuit_task._aws_session, MockS3.MOCK_S3_RESULT_GATE_MODEL) + + future = circuit_task.async_result() + + # test the different ways to get the result from async + + # via callback + result_from_callback = None + future.add_done_callback(set_result_from_callback) + + # via asyncio waiting for result + _mock_metadata(circuit_task._aws_session, status) + event_loop = asyncio.get_event_loop() + result_from_waiting = event_loop.run_until_complete(future) + + # via future.result(). Note that this would fail if the future is not complete. + result_from_future = future.result() + + assert result_from_callback == result + assert result_from_waiting == result + assert result_from_future == result + + def test_failed_task(quantum_task): _mock_metadata(quantum_task._aws_session, "FAILED") _mock_s3(quantum_task._aws_session, MockS3.MOCK_S3_RESULT_GATE_MODEL) From ea554a667564e72fa844fce84126ebbdcbea0426 Mon Sep 17 00:00:00 2001 From: Abe Coull Date: Wed, 29 Nov 2023 10:07:35 -0800 Subject: [PATCH 2/4] fix naming --- test/unit_tests/braket/aws/test_aws_quantum_task.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/unit_tests/braket/aws/test_aws_quantum_task.py b/test/unit_tests/braket/aws/test_aws_quantum_task.py index 62b41e5c7..d6afab5de 100644 --- a/test/unit_tests/braket/aws/test_aws_quantum_task.py +++ b/test/unit_tests/braket/aws/test_aws_quantum_task.py @@ -84,7 +84,7 @@ def quantum_task(aws_session): @pytest.fixture -def quantum_task_not_quiet(aws_session): +def quantum_task_quiet(aws_session): return AwsQuantumTask("foo:bar:arn", aws_session, poll_timeout_seconds=2, quiet=True) @@ -225,10 +225,10 @@ def test_queue_position(quantum_task): ) -def test_queued_not_quiet(quantum_task_not_quiet): +def test_queued_quiet(quantum_task_quiet): state_1 = "QUEUED" - _mock_metadata(quantum_task_not_quiet._aws_session, state_1) - assert quantum_task_not_quiet.queue_position() == QuantumTaskQueueInfo( + _mock_metadata(quantum_task_quiet._aws_session, state_1) + assert quantum_task_quiet.queue_position() == QuantumTaskQueueInfo( queue_type=QueueType.NORMAL, queue_position="2", message=None ) @@ -236,8 +236,8 @@ def test_queued_not_quiet(quantum_task_not_quiet): message = ( f"'Task is in {state_2} status. AmazonBraket does not show queue position for this status.'" ) - _mock_metadata(quantum_task_not_quiet._aws_session, state_2) - assert quantum_task_not_quiet.queue_position() == QuantumTaskQueueInfo( + _mock_metadata(quantum_task_quiet._aws_session, state_2) + assert quantum_task_quiet.queue_position() == QuantumTaskQueueInfo( queue_type=QueueType.NORMAL, queue_position=None, message=message ) From 6f5ce83926226860b5d9311b5d4faddf2f10d063 Mon Sep 17 00:00:00 2001 From: Abe Coull Date: Wed, 29 Nov 2023 10:34:10 -0800 Subject: [PATCH 3/4] py39 typing fix --- src/braket/jobs/logs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/braket/jobs/logs.py b/src/braket/jobs/logs.py index 2e2b03664..734d51123 100644 --- a/src/braket/jobs/logs.py +++ b/src/braket/jobs/logs.py @@ -20,7 +20,7 @@ # Support for reading logs # ############################################################################## -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple from botocore.exceptions import ClientError @@ -165,7 +165,7 @@ def flush_log_streams( # noqa C901 has_streams: bool, color_wrap: ColorWrap, state: list[str], - queue_position: str | None = None, + queue_position: Optional[str] = None, ) -> bool: """Flushes log streams to stdout. @@ -186,7 +186,7 @@ def flush_log_streams( # noqa C901 color_wrap (ColorWrap): An instance of ColorWrap to potentially color-wrap print statements from different streams. state (list[str]): The previous and current state of the job. - queue_position (str | None): The current queue position. This is not passed in if the job + queue_position (Optional[str]): The current queue position. This is not passed in if the job is ran with `quiet=True` Returns: From 66453937939dc2d484646de1008d677164868699 Mon Sep 17 00:00:00 2001 From: Abe Coull Date: Tue, 12 Dec 2023 10:40:54 -0800 Subject: [PATCH 4/4] tox fix --- src/braket/aws/aws_quantum_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/braket/aws/aws_quantum_job.py b/src/braket/aws/aws_quantum_job.py index 4d466023e..f04ee3c56 100644 --- a/src/braket/aws/aws_quantum_job.py +++ b/src/braket/aws/aws_quantum_job.py @@ -179,7 +179,7 @@ def create( quiet (bool): Sets the verbosity of the logger to low and does not report queue position. Default is `False`. - + reservation_arn (str | None): the reservation window arn provided by Braket Direct to reserve exclusive usage for the device to run the hybrid job on. Default: None.