Skip to content

Commit

Permalink
feat: add queue visibility information (#712)
Browse files Browse the repository at this point in the history
* feat: add queue position for tasks (#299)

* feat: add queue position for tasks

* update docstring for queue_position

* update docstrings

* update package_name

* update to enums and dataclass

* test: add integ test for task queue_position (#300)

* refactor: dataclass and file naming for queue info (#301)

* refactor: dataclass and file naming for queue info

* apply suggestions

* update: task queue position after refactor (#309)

* update: task queue position after refactor

* add queue_position type hint details, change order of info

* feat: queue position for hybrid jobs (#302)

* feat: queue position for hybrid jobs

* handle message return

* add docstring changes

* update docstring, and return None

* add context in dataclass

* update docstrings

* indent fix

* test: add integ test for jobs queue position (#310)

* feat: queue position for hybrid jobs

* handle message return

* add docstring changes

* update docstring, and return None

* add context in dataclass

* test: add integ test for jobs queue position

* remove comment

* minor fix

* remove unnecessary branching

* fix docstring merge

* remove dataclass redefinition after merge

* feat: queue depth for devices (#306)

* feat: queue depth for devices (dataclass version)

* add unit-test for queue depth

* modify docstrings, remove helper funcs

* add more info to docstrings

* minor test edit

* docstrings

* indent

* update QueuePriority to QueueType

* test: add integ test for queue_depth (#311)

* refactor: job and quantum_task keywords for queue_depth (#312)

* refactor: job and quantum_task keywords for queue_depth

* Update src/braket/aws/queue_information.py

Co-authored-by: Kshitij Chhabra <[email protected]>

---------

Co-authored-by: Kshitij Chhabra <[email protected]>

* deps: update boto3 version for queue visibility (#319)

* sync: public-main changes into feature/queue_visibility (#320)

* feat: add Aria2 enum (#653)

Co-authored-by: Viraj Chaudhari <[email protected]>
Co-authored-by: Cody Wang <[email protected]>

* infra: bump actions/checkout from 3.6.0 to 4.0.0 (#696)

Bumps [actions/checkout](https://github.com/actions/checkout) from 3.6.0 to 4.0.0.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@f43a0e5...3df4ab1)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* prepare release v1.55.0

* update development version to v1.55.1.dev0

* Revert "update: restricting parameter names to not collide with ones we use for OpenQASM generation. (#675)" (#701)

This reverts commit b158736.

* infra: update codeowner file to amazon-braket/braket-dev (#699)

Co-authored-by: Abe Coull <[email protected]>

* doc: Replace aws org with amazon-braket (#705)

* prepare release v1.55.1

* update development version to v1.55.2.dev0

* doc: change the sphinx requirement to be greater than 7.0.0 (#704)

Co-authored-by: Cody Wang <[email protected]>

* doc: add code contributors to the readme (#703)

Co-authored-by: Cody Wang <[email protected]>

* doc: Remove trailing backquotes (#706)

* infra: update the pre-commit hook with linters (#678)

* infra: update the pre-commit hook with linters and secrets check

Co-authored-by: Abe Coull <[email protected]>
Co-authored-by: Cody Wang <[email protected]>

* prepare release v1.55.1.post0

* update development version to v1.55.2.dev0

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: ashlhans <[email protected]>
Co-authored-by: Cody Wang <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ci <ci>
Co-authored-by: Milan <[email protected]>
Co-authored-by: Angela Guo <[email protected]>
Co-authored-by: Abe Coull <[email protected]>
Co-authored-by: Abe Coull <[email protected]>

* Revert "sync: public-main changes into feature/queue_visibility (#320)"

This reverts commit be6460c.

* update github script

* minor fix

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Kshitij Chhabra <[email protected]>
Co-authored-by: ashlhans <[email protected]>
Co-authored-by: Cody Wang <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Milan <[email protected]>
Co-authored-by: Angela Guo <[email protected]>
Co-authored-by: Abe Coull <[email protected]>
Co-authored-by: Abe Coull <[email protected]>
  • Loading branch information
9 people authored Sep 26, 2023
1 parent e2d5899 commit c95fddf
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 16 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"setuptools",
"backoff",
"boltons",
"boto3>=1.22.3",
"boto3>=1.28.53",
"nest-asyncio",
"networkx",
"numpy",
Expand Down
49 changes: 49 additions & 0 deletions src/braket/aws/aws_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from braket.aws.aws_quantum_task import AwsQuantumTask
from braket.aws.aws_quantum_task_batch import AwsQuantumTaskBatch
from braket.aws.aws_session import AwsSession
from braket.aws.queue_information import QueueDepthInfo, QueueType
from braket.circuits import Circuit, Gate, QubitSet
from braket.circuits.gate_calibrations import GateCalibrations
from braket.device_schema import DeviceCapabilities, ExecutionDay, GateModelQpuParadigmProperties
Expand Down Expand Up @@ -667,6 +668,54 @@ def get_device_region(device_arn: str) -> str:
"see 'https://docs.aws.amazon.com/braket/latest/developerguide/braket-devices.html'"
)

def queue_depth(self) -> QueueDepthInfo:
"""
Task queue depth refers to the total number of quantum tasks currently waiting
to run on a particular device.
Returns:
QueueDepthInfo: Instance of the QueueDepth class representing queue depth
information for quantum tasks and hybrid jobs.
Queue depth refers to the number of quantum tasks and hybrid jobs queued on a particular
device. The normal tasks refers to the quantum tasks not submitted via Hybrid Jobs.
Whereas, the priority tasks refers to the total number of quantum tasks waiting to run
submitted through Amazon Braket Hybrid Jobs. These tasks run before the normal tasks.
If the queue depth for normal or priority quantum tasks is greater than 4000, we display
their respective queue depth as '>4000'. Similarly, for hybrid jobs if there are more
than 1000 jobs queued on a device, display the hybrid jobs queue depth as '>1000'.
Additionally, for QPUs if hybrid jobs queue depth is 0, we display information about
priority and count of the running hybrid job.
Example:
Queue depth information for a running job.
>>> device = AwsDevice(Device.Amazon.SV1)
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '0',
<QueueType.PRIORITY: 'Priority'>: '1'}, jobs='0 (1 prioritized job(s) running)')
If more than 4000 quantum tasks queued on a device.
>>> device = AwsDevice(Device.Amazon.DM1)
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '>4000',
<QueueType.PRIORITY: 'Priority'>: '2000'}, jobs='100')
"""
metadata = self.aws_session.get_device(arn=self.arn)
queue_metadata = metadata.get("deviceQueueInfo")
queue_info = {}

for response in queue_metadata:
queue_name = response.get("queue")
queue_priority = response.get("queuePriority")
queue_size = response.get("queueSize")

if queue_name == "QUANTUM_TASKS_QUEUE":
priority_enum = QueueType(queue_priority)
queue_info.setdefault("quantum_tasks", {})[priority_enum] = queue_size
else:
queue_info["jobs"] = queue_size

return QueueDepthInfo(**queue_info)

def refresh_gate_calibrations(self) -> Optional[GateCalibrations]:
"""
Refreshes the gate calibration data upon request.
Expand Down
33 changes: 33 additions & 0 deletions src/braket/aws/aws_quantum_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from braket.aws import AwsDevice
from braket.aws.aws_session import AwsSession
from braket.aws.queue_information import HybridJobQueueInfo
from braket.jobs import logs
from braket.jobs.config import (
CheckpointConfig,
Expand Down Expand Up @@ -278,6 +279,38 @@ def state(self, use_cached_value: bool = False) -> str:
"""
return self.metadata(use_cached_value).get("status")

def queue_position(self) -> HybridJobQueueInfo:
"""
The queue position details for the hybrid job.
Returns:
HybridJobQueueInfo: Instance of HybridJobQueueInfo class representing
the queue position information for the hybrid job. The queue_position is
only returned when the hybrid job is not in RUNNING/CANCELLING/TERMINAL states,
else queue_position is returned as None. If the queue position of the hybrid
job is greater than 15, we return '>15' as the queue_position return value.
Examples:
job status = QUEUED and position is 2 in the queue.
>>> job.queue_position()
HybridJobQueueInfo(queue_position='2', message=None)
job status = QUEUED and position is 18 in the queue.
>>> job.queue_position()
HybridJobQueueInfo(queue_position='>15', message=None)
job status = COMPLETED
>>> job.queue_position()
HybridJobQueueInfo(queue_position=None,
message='Job is in COMPLETED status. AmazonBraket does
not show queue position for this status.')
"""
response = self.metadata()["queueInfo"]
queue_position = None if response.get("position") == "None" else response.get("position")
message = response.get("message")

return HybridJobQueueInfo(queue_position=queue_position, message=message)

def logs(self, wait: bool = False, poll_interval_seconds: int = 5) -> None:
"""Display logs for a given hybrid job, optionally tailing them until hybrid job is
complete.
Expand Down
35 changes: 35 additions & 0 deletions src/braket/aws/aws_quantum_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing.problem import Problem
from braket.aws.aws_session import AwsSession
from braket.aws.queue_information import QuantumTaskQueueInfo, QueueType
from braket.circuits import Instruction
from braket.circuits.circuit import Circuit, Gate, QubitSet
from braket.circuits.circuit_helpers import validate_circuit_and_shots
Expand Down Expand Up @@ -314,6 +315,40 @@ def state(self, use_cached_value: bool = False) -> str:
"""
return self._status(use_cached_value)

def queue_position(self) -> QuantumTaskQueueInfo:
"""
The queue position details for the quantum task.
Returns:
QuantumTaskQueueInfo: Instance of QuantumTaskQueueInfo class
representing the queue position information for the quantum task.
The queue_position is only returned when quantum task is not in
RUNNING/CANCELLING/TERMINAL states, else queue_position is returned as None.
The normal tasks refers to the quantum tasks not submitted via Hybrid Jobs.
Whereas, the priority tasks refers to the total number of quantum tasks waiting to run
submitted through Amazon Braket Hybrid Jobs. These tasks run before the normal tasks.
If the queue position for normal or priority quantum tasks is greater than 2000,
we display their respective queue position as '>2000'.
Examples:
task status = QUEUED and queue position is 2050
>>> task.queue_position()
QuantumTaskQueueInfo(queue_type=<QueueType.NORMAL: 'Normal'>,
queue_position='>2000', message=None)
task status = COMPLETED
>>> task.queue_position()
QuantumTaskQueueInfo(queue_type=<QueueType.NORMAL: 'Normal'>,
queue_position=None, message='Task is in COMPLETED status. AmazonBraket does
not show queue position for this status.')
"""
response = self.metadata()["queueInfo"]
queue_type = QueueType(response["queuePriority"])
queue_position = None if response.get("position") == "None" else response.get("position")
message = response.get("message")

return QuantumTaskQueueInfo(queue_type, queue_position, message)

def _status(self, use_cached_value: bool = False) -> str:
metadata = self.metadata(use_cached_value)
status = metadata.get("status")
Expand Down
6 changes: 4 additions & 2 deletions src/braket/aws/aws_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ def get_quantum_task(self, arn: str) -> Dict[str, Any]:
Returns:
Dict[str, Any]: The response from the Amazon Braket `GetQuantumTask` operation.
"""
response = self.braket_client.get_quantum_task(quantumTaskArn=arn)
response = self.braket_client.get_quantum_task(
quantumTaskArn=arn, additionalAttributeNames=["QueueInfo"]
)
broadcast_event(_TaskStatusEvent(arn=response["quantumTaskArn"], status=response["status"]))
return response

Expand Down Expand Up @@ -324,7 +326,7 @@ def get_job(self, arn: str) -> Dict[str, Any]:
Returns:
Dict[str, Any]: The response from the Amazon Braket `GetQuantumJob` operation.
"""
return self.braket_client.get_job(jobArn=arn)
return self.braket_client.get_job(jobArn=arn, additionalAttributeNames=["QueueInfo"])

def cancel_job(self, arn: str) -> Dict[str, Any]:
"""
Expand Down
85 changes: 85 additions & 0 deletions src/braket/aws/queue_information.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class QueueType(str, Enum):
"""
Enumerates the possible priorities for the queue.
Values:
NORMAL: Represents normal queue for the device.
PRIORITY: Represents priority queue for the device.
"""

NORMAL = "Normal"
PRIORITY = "Priority"


@dataclass()
class QueueDepthInfo:
"""
Represents quantum tasks and hybrid jobs queue depth information.
Attributes:
quantum_tasks (Dict[QueueType, str]): number of quantum tasks waiting
to run on a device. This includes both 'Normal' and 'Priority' tasks.
For Example, {'quantum_tasks': {QueueType.NORMAL: '7', QueueType.PRIORITY: '3'}}
jobs (str): number of hybrid jobs waiting to run on a device. Additionally, for QPUs if
hybrid jobs queue depth is 0, we display information about priority and count of the
running hybrid jobs. Example, 'jobs': '0 (1 prioritized job(s) running)'
"""

quantum_tasks: Dict[QueueType, str]
jobs: str


@dataclass
class QuantumTaskQueueInfo:
"""
Represents quantum tasks queue information.
Attributes:
queue_type (QueueType): type of the quantum_task queue either 'Normal'
or 'Priority'.
queue_position (Optional[str]): current position of your quantum task within a respective
device queue. This value can be None based on the state of the task. Default: None.
message (Optional[str]): Additional message information. This key is present only
if 'queue_position' is None. Default: None.
"""

queue_type: QueueType
queue_position: Optional[str] = None
message: Optional[str] = None


@dataclass
class HybridJobQueueInfo:
"""
Represents hybrid job queue information.
Attributes:
queue_position (Optional[str]): current position of your hybrid job within a respective
device queue. If the queue position of the hybrid job is greater than 15, we
return '>15' as the queue_position return value. The queue_position is only
returned when hybrid job is not in RUNNING/CANCELLING/TERMINAL states, else
queue_position is returned as None.
message (Optional[str]): Additional message information. This key is present only
if 'queue_position' is None. Default: None.
"""

queue_position: Optional[str] = None
message: Optional[str] = None
84 changes: 84 additions & 0 deletions test/integ_tests/test_queue_information.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from braket.aws import AwsDevice, AwsQuantumJob
from braket.aws.queue_information import (
HybridJobQueueInfo,
QuantumTaskQueueInfo,
QueueDepthInfo,
QueueType,
)
from braket.circuits import Circuit
from braket.devices import Devices


def test_task_queue_position():
device = AwsDevice(Devices.Amazon.SV1)

bell = Circuit().h(0).cnot(0, 1)
task = device.run(bell, shots=10)

# call the queue_position method.
queue_information = task.queue_position()

# data type validations
assert isinstance(queue_information, QuantumTaskQueueInfo)
assert isinstance(queue_information.queue_type, QueueType)
assert isinstance(queue_information.queue_position, (str, type(None)))

# assert queue priority
assert queue_information.queue_type in [QueueType.NORMAL, QueueType.PRIORITY]

# assert message
if queue_information.queue_position is None:
assert queue_information.message is not None
assert isinstance(queue_information.message, (str, type(None)))
else:
assert queue_information.message is None


def test_job_queue_position(aws_session):
job = AwsQuantumJob.create(
device=Devices.Amazon.SV1,
source_module="test/integ_tests/job_test_script.py",
entry_point="job_test_script:start_here",
aws_session=aws_session,
wait_until_complete=True,
hyperparameters={"test_case": "completed"},
)

# call the queue_position method.
queue_information = job.queue_position()

# data type validations
assert isinstance(queue_information, HybridJobQueueInfo)

# assert message
assert queue_information.queue_position is None
assert isinstance(queue_information.message, str)


def test_queue_depth():
device = AwsDevice(Devices.Amazon.SV1)

# call the queue_depth method.
queue_information = device.queue_depth()

# data type validations
assert isinstance(queue_information, QueueDepthInfo)
assert isinstance(queue_information.quantum_tasks, dict)
assert isinstance(queue_information.jobs, str)

for key, value in queue_information.quantum_tasks.items():
assert isinstance(key, QueueType)
assert isinstance(value, str)
Loading

0 comments on commit c95fddf

Please sign in to comment.