
Add batching support to sdk #647

Merged: 14 commits, merged on Dec 9, 2024
Add toc blob to submission
Adam Grofe committed Nov 18, 2024
commit dc551d77ad1fe223f624b4ebe3c61dbbcde0b2e0
89 changes: 0 additions & 89 deletions azure-quantum/azure/quantum/job/base_job.py
@@ -149,95 +149,6 @@ def from_input_data(
**kwargs
)

@classmethod
def from_input_data_container(
cls,
workspace: "Workspace",
name: str,
target: str,
input_data: Dict[str, bytes],
content_type: ContentType = ContentType.json,
blob_name: str = "inputData",
encoding: str = "",
job_id: str = None,
container_name: str = None,
provider_id: str = None,
input_data_format: str = None,
output_data_format: str = None,
input_params: Dict[str, Any] = None,
session_id: Optional[str] = None,
**kwargs
) -> "BaseJob":
"""Create a new Azure Quantum job based on a list of input_data.

:param workspace: Azure Quantum workspace to submit the input_data to
:type workspace: Workspace
:param name: Name of the job
:type name: str
:param target: Azure Quantum target
:type target: str
:param input_data: Raw input data to submit
:type input_data: Dict
:param blob_name: Dict of Input data where the key is the blob name
:type blob_name: str
:param content_type: Content type, e.g. "application/json"
:type content_type: ContentType
:param encoding: input_data encoding, e.g. "gzip", defaults to empty string
:type encoding: str
:param job_id: Job ID, defaults to None
:type job_id: str
:param container_name: Container name, defaults to None
:type container_name: str
:param provider_id: Provider ID, defaults to None
:type provider_id: str
:param input_data_format: Input data format, defaults to None
:type input_data_format: str
:param output_data_format: Output data format, defaults to None
:type output_data_format: str
:param input_params: Input parameters, defaults to None
:type input_params: Dict[str, Any]
:param input_params: Input params for job
:type input_params: Dict[str, Any]
:return: Azure Quantum Job
:rtype: Job
"""
# Generate job ID if not specified
if job_id is None:
job_id = cls.create_job_id()

# Create container if it does not yet exist
container_uri = workspace.get_container_uri(
job_id=job_id,
container_name=container_name
)
logger.debug(f"Container URI: {container_uri}")

# Upload data to container
for blob_name, input_data_item in input_data.items():
input_data_uri = cls.upload_input_data(
container_uri=container_uri,
input_data=input_data_item,
content_type=content_type,
blob_name=blob_name,
encoding=encoding,
)

# Create and submit job
return cls.from_storage_uri(
workspace=workspace,
job_id=job_id,
target=target,
input_data_uri=input_data_uri,
container_uri=container_uri,
name=name,
input_data_format=input_data_format,
output_data_format=output_data_format,
provider_id=provider_id,
input_params=input_params,
session_id=session_id,
**kwargs
)

@classmethod
def from_storage_uri(
cls,
110 changes: 108 additions & 2 deletions azure-quantum/azure/quantum/target/microsoft/elements/dft/job.py
@@ -1,8 +1,13 @@
import collections.abc
from typing import Any, Dict, Union
import logging
from typing import Any, Dict, Union, Optional
from azure.quantum.job import JobFailedWithResultsError
from azure.quantum.job.base_job import BaseJob, ContentType
from azure.quantum.job.job import Job, DEFAULT_TIMEOUT
from azure.quantum._client.models import JobDetails
from azure.quantum.workspace import Workspace

logger = logging.getLogger(__name__)

class MicrosoftElementsDftJob(Job):
"""
@@ -62,4 +67,105 @@ def _is_dft_failure_results(failure_results: Union[Dict[str, Any], str]) -> bool
and "error" in failure_results["results"][0] \
and isinstance(failure_results["results"][0]["error"], dict) \
and "error_type" in failure_results["results"][0]["error"] \
and "error_message" in failure_results["results"][0]["error"]
and "error_message" in failure_results["results"][0]["error"]

@classmethod
def from_input_data_container(
cls,
workspace: "Workspace",
name: str,
target: str,
input_data: bytes,
batch_input_blobs: Dict[str, bytes],
content_type: ContentType = ContentType.json,
blob_name: str = "inputData",
encoding: str = "",
job_id: str = None,
container_name: str = None,
provider_id: str = None,
input_data_format: str = None,
output_data_format: str = None,
input_params: Dict[str, Any] = None,
session_id: Optional[str] = None,
**kwargs
) -> "BaseJob":
"""Create a new Azure Quantum job based on a list of input_data.

:param workspace: Azure Quantum workspace to submit the input_data to
:type workspace: Workspace
:param name: Name of the job
:type name: str
:param target: Azure Quantum target
:type target: str
:param input_data: Raw table-of-contents data to submit as the primary input blob
:type input_data: bytes
:param batch_input_blobs: Dict of QcSchema input data, where each key is the blob name used to store that item in the container
:type batch_input_blobs: Dict[str, bytes]
:param blob_name: Blob name for the table-of-contents data, defaults to "inputData"
:type blob_name: str
:param content_type: Content type, e.g. "application/json"
:type content_type: ContentType
:param encoding: input_data encoding, e.g. "gzip", defaults to empty string
:type encoding: str
:param job_id: Job ID, defaults to None
:type job_id: str
:param container_name: Container name, defaults to None
:type container_name: str
:param provider_id: Provider ID, defaults to None
:type provider_id: str
:param input_data_format: Input data format, defaults to None
:type input_data_format: str
:param output_data_format: Output data format, defaults to None
:type output_data_format: str
:param input_params: Input parameters, defaults to None
:type input_params: Dict[str, Any]
:param session_id: Session ID, defaults to None
:type session_id: Optional[str]
:return: Azure Quantum Job
:rtype: Job
"""
# Generate job ID if not specified
if job_id is None:
job_id = cls.create_job_id()

# Create container if it does not yet exist
container_uri = workspace.get_container_uri(
job_id=job_id,
container_name=container_name
)
logger.debug(f"Container URI: {container_uri}")

# Upload Input Data
input_data_uri = cls.upload_input_data(
container_uri=container_uri,
input_data=input_data,
content_type=content_type,
blob_name=blob_name,
encoding=encoding,
)

# Upload data to container
for blob_name, input_data_item in batch_input_blobs.items():
cls.upload_input_data(
container_uri=container_uri,
input_data=input_data_item,
content_type=content_type,
blob_name=blob_name,
encoding=encoding,
)

# Create and submit job
return cls.from_storage_uri(
workspace=workspace,
job_id=job_id,
target=target,
input_data_uri=input_data_uri,
container_uri=container_uri,
name=name,
input_data_format=input_data_format,
output_data_format=output_data_format,
provider_id=provider_id,
input_params=input_params,
session_id=session_id,
**kwargs
)
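
For orientation, a minimal sketch of how this method might be called. The workspace values, job name, target name, and blob payloads below are illustrative assumptions, not values taken from this PR:

    from azure.quantum import Workspace
    from azure.quantum.target.microsoft.elements.dft.job import MicrosoftElementsDftJob

    # Hypothetical workspace configuration; replace with real values.
    workspace = Workspace(
        subscription_id="<subscription-id>",
        resource_group="<resource-group>",
        name="<workspace-name>",
        location="<location>",
    )

    # The table-of-contents blob is uploaded as the primary input blob
    # ("inputData" by default); each batch entry is uploaded as its own
    # blob under the key given in batch_input_blobs.
    job = MicrosoftElementsDftJob.from_input_data_container(
        workspace=workspace,
        name="dft-batch-job",
        target="<target-name>",
        input_data=b'{"description": "...", "tableOfContents": []}',
        batch_input_blobs={
            "inputData_0": b"<encoded QcSchema payload>",
            "inputData_1": b"<encoded QcSchema payload>",
        },
    )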
@@ -81,14 +81,18 @@ def submit(self,

qcschema_blobs = {}
for i in range(len(qcschema_data)):
qcschema_blobs[f"input_data_{i}.json"] = self._encode_input_data(qcschema_data[i])
qcschema_blobs[f"inputData_{i}"] = self._encode_input_data(qcschema_data[i])

toc_str = self._create_table_of_contents(input_data, list(qcschema_blobs.keys()))
toc = self._encode_input_data(toc_str)

return self._get_job_class().from_input_data_container(
workspace=self.workspace,
name=name,
target=self.name,
- input_data=qcschema_blobs,
- input_params={ 'number_of_molecules': len(qcschema_data), "input_files": list(qcschema_blobs.keys()), **input_params },
+ input_data=toc,
+ batch_input_blobs=qcschema_blobs,
+ input_params={ 'numberOfFiles': len(qcschema_data), "inputFiles": list(qcschema_blobs.keys()), **input_params },
content_type=kwargs.pop('content_type', self.content_type),
encoding=kwargs.pop('encoding', self.encoding),
provider_id=self.provider_id,
@@ -194,3 +198,23 @@ def _xyz_to_qcschema_mol(self, file_data: str ) -> Dict[str, Any]:
@classmethod
def _get_job_class(cls) -> Type[Job]:
return MicrosoftElementsDftJob

@classmethod
def _create_table_of_contents(cls, input_files: List[str], input_blobs: List[str]) -> Dict[str,Any]:
"""Create the table of contents for a batched job that contains a description of file and the mapping between the file names and the blob names"""

assert len(input_files) == len(input_blobs), "Internal error: number of blobs is not the same as the number of files."

toc = []
for i in range(len(input_files)):
toc.append(
{
"xyzFileName": input_files[i],
"qcschemaFileName": input_blobs[i],
}
)

return {
"description": "This files contains the mapping between the xyz file name that were submitted and the qcschema blobs that are used for the calculation.",
"tableOfContents": toc,
}
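
To make the blob mapping concrete, a two-file batch would produce a table of contents like this (a sketch; the file and blob names are illustrative):

    toc = MicrosoftElementsDft._create_table_of_contents(
        ["molecule_1.xyz", "molecule_2.xyz"],
        ["inputData_0", "inputData_1"],
    )
    # toc == {
    #     "description": "This file contains the mapping between the xyz file names ...",
    #     "tableOfContents": [
    #         {"xyzFileName": "molecule_1.xyz", "qcschemaFileName": "inputData_0"},
    #         {"xyzFileName": "molecule_2.xyz", "qcschemaFileName": "inputData_1"},
    #     ],
    # }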
20 changes: 19 additions & 1 deletion azure-quantum/tests/unit/test_microsoft_elements_dft.py
@@ -195,4 +195,22 @@ def test_assemble_go_qcschema_from_files_success(data_regression, input_params, input_data):
def test_assemble_bomd_qcschema_from_files_success(data_regression, input_params, input_data):
target = MicrosoftElementsDft
qcschema_data = target._assemble_qcshema_from_files(input_data, input_params)
data_regression.check(qcschema_data)


@pytest.mark.parametrize(
'inputs', [
{
'input_files': ["molecule_1.xyz"],
'input_blobs': ["inputData0"],
},
{
'input_files': ["molecule_1.xyz","molecule_2.xyz"],
'input_blobs': ["inputData0","inputData1"],
},
]
)
def test_create_toc_data(data_regression, inputs):
target = MicrosoftElementsDft
toc_data = target._create_table_of_contents(inputs["input_files"], inputs["input_blobs"])
data_regression.check(toc_data)
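
End to end, a batched submission through the target would look roughly like the sketch below. The workspace values, target name, and input_params contents are assumptions; the numberOfFiles/inputFiles parameters and the inputData_{i} blob naming follow the submit changes above:

    from azure.quantum import Workspace

    # Hypothetical workspace configuration; replace with real values.
    workspace = Workspace(
        subscription_id="<subscription-id>",
        resource_group="<resource-group>",
        name="<workspace-name>",
        location="<location>",
    )

    # Assumption: the Microsoft Elements DFT target is published as "microsoft.dft".
    target = workspace.get_targets("microsoft.dft")

    # Each xyz file becomes one QcSchema blob (inputData_0, inputData_1, ...),
    # and a single table-of-contents blob is submitted as the job's input data.
    job = target.submit(
        input_data=["molecule_1.xyz", "molecule_2.xyz"],
        input_params={"driver": "energy", "model": {"method": "m06-2x", "basis": "def2-svp"}},
    )
    results = job.get_results()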