Add batching support to sdk (#647)
* Add batching to sdk by uploading qcschema files to a container.

* Refactor/simplify tests to use pytest regressions

* Change input_data_uri back to blob based uri

* Add toc blob to submission

* Add further xyz validation

* Correct the qcschema format.

* Add pytest-regressions to conda env

* Add pytest-regressions to ci

* Add support for submitting batches of qcschema

---------

Co-authored-by: Adam Grofe <[email protected]>
Co-authored-by: Xinyi Joffre <[email protected]>
Co-authored-by: kikomiss <[email protected]>
4 people authored Dec 9, 2024
1 parent 6420d3e commit 20dc5cb
Showing 33 changed files with 10,054 additions and 11 deletions.
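
To see the new path end-to-end, here is a hedged usage sketch of submitting a batch of geometry files through the SDK. The workspace connection values, the target name `microsoft.dft`, and the `input_params` keys (`driver`, `model`) are illustrative assumptions based on typical azure-quantum usage, not values taken from this commit.

```python
# Hypothetical usage sketch of the batching support added in this commit.
# Workspace credentials, the target name, and the input_params layout are
# assumptions; adjust them to your own workspace and the DFT documentation.
from azure.quantum import Workspace

workspace = Workspace(
    resource_id="/subscriptions/.../resourceGroups/.../Workspaces/my-ws",  # placeholder
    location="eastus",                                                      # placeholder
)

target = workspace.get_targets("microsoft.dft")  # assumed target name

# Passing a list of files triggers the new batch path: each file is converted
# to a qcschema document, uploaded as its own blob, and described by a
# table-of-contents blob.
job = target.submit(
    input_data=["molecule_1.xyz", "molecule_2.xyz"],
    name="dft-batch-example",
    input_params={
        "driver": "energy",                                  # assumed parameter layout
        "model": {"method": "m06-2x", "basis": "def2-svp"},
    },
)

job.wait_until_completed()
results = job.get_results()
```
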
2 changes: 1 addition & 1 deletion .ado/ci.yml
@@ -58,7 +58,7 @@ jobs:
displayName: Set Python version

- script: |
pip install pytest pytest-azurepipelines pytest-cov
pip install pytest pytest-azurepipelines pytest-cov pytest-regressions
displayName: Install pytest dependencies
- script: |
110 changes: 108 additions & 2 deletions azure-quantum/azure/quantum/target/microsoft/elements/dft/job.py
@@ -1,8 +1,13 @@
import collections.abc
from typing import Any, Dict, Union
import logging
from typing import Any, Dict, Union, Optional
from azure.quantum.job import JobFailedWithResultsError
from azure.quantum.job.base_job import BaseJob, ContentType
from azure.quantum.job.job import Job, DEFAULT_TIMEOUT
from azure.quantum._client.models import JobDetails
from azure.quantum.workspace import Workspace

logger = logging.getLogger(__name__)

class MicrosoftElementsDftJob(Job):
"""
@@ -62,4 +67,105 @@ def _is_dft_failure_results(failure_results: Union[Dict[str, Any], str]) -> bool
and "error" in failure_results["results"][0] \
and isinstance(failure_results["results"][0]["error"], dict) \
and "error_type" in failure_results["results"][0]["error"] \
and "error_message" in failure_results["results"][0]["error"]
and "error_message" in failure_results["results"][0]["error"]

@classmethod
def from_input_data_container(
cls,
workspace: "Workspace",
name: str,
target: str,
input_data: bytes,
batch_input_blobs: Dict[str, bytes],
content_type: ContentType = ContentType.json,
blob_name: str = "inputData",
encoding: str = "",
job_id: str = None,
container_name: str = None,
provider_id: str = None,
input_data_format: str = None,
output_data_format: str = None,
input_params: Dict[str, Any] = None,
session_id: Optional[str] = None,
**kwargs
) -> "BaseJob":
"""Create a new Azure Quantum job based on a list of input_data.
:param workspace: Azure Quantum workspace to submit the input_data to
:type workspace: Workspace
:param name: Name of the job
:type name: str
:param target: Azure Quantum target
:type target: str
:param input_data: Raw data for the table-of-contents blob
:type input_data: bytes
:param batch_input_blobs: Dict of QcSchema data where the key is the name of the blob in which it is stored in the container
:type batch_input_blobs: Dict[str, bytes]
:param blob_name: Name of the blob that stores the table of contents, defaults to "inputData"
:type blob_name: str
:param content_type: Content type, e.g. "application/json"
:type content_type: ContentType
:param encoding: input_data encoding, e.g. "gzip", defaults to empty string
:type encoding: str
:param job_id: Job ID, defaults to None
:type job_id: str
:param container_name: Container name, defaults to None
:type container_name: str
:param provider_id: Provider ID, defaults to None
:type provider_id: str
:param input_data_format: Input data format, defaults to None
:type input_data_format: str
:param output_data_format: Output data format, defaults to None
:type output_data_format: str
:param input_params: Input parameters, defaults to None
:type input_params: Dict[str, Any]
:param session_id: Session ID, defaults to None
:type session_id: Optional[str]
:return: Azure Quantum Job
:rtype: Job
"""
# Generate job ID if not specified
if job_id is None:
job_id = cls.create_job_id()

# Create container if it does not yet exist
container_uri = workspace.get_container_uri(
job_id=job_id,
container_name=container_name
)
logger.debug(f"Container URI: {container_uri}")

# Upload Input Data
input_data_uri = cls.upload_input_data(
container_uri=container_uri,
input_data=input_data,
content_type=content_type,
blob_name=blob_name,
encoding=encoding,
)

# Upload data to container
for blob_name, input_data_item in batch_input_blobs.items():
cls.upload_input_data(
container_uri=container_uri,
input_data=input_data_item,
content_type=content_type,
blob_name=blob_name,
encoding=encoding,
)

# Create and submit job
return cls.from_storage_uri(
workspace=workspace,
job_id=job_id,
target=target,
input_data_uri=input_data_uri,
container_uri=container_uri,
name=name,
input_data_format=input_data_format,
output_data_format=output_data_format,
provider_id=provider_id,
input_params=input_params,
session_id=session_id,
**kwargs
)
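
For orientation, the sketch below mirrors the data layout `from_input_data_container` works with: one blob per qcschema document, keyed by blob name, plus a table-of-contents payload passed as `input_data`. The `encode_blob` helper and its gzip-compressed JSON encoding are assumptions standing in for the SDK's own `_encode_input_data`; only the blob naming and the table-of-contents shape follow the diff.

```python
import gzip
import json
from typing import Any, Dict, List, Tuple

def encode_blob(payload: Dict[str, Any]) -> bytes:
    """Illustrative stand-in for the SDK's _encode_input_data (gzip-compressed JSON assumed)."""
    return gzip.compress(json.dumps(payload).encode("utf-8"))

def build_batch(qcschema_docs: List[Dict[str, Any]],
                file_names: List[str]) -> Tuple[bytes, Dict[str, bytes]]:
    # One blob per qcschema document, named inputData_0, inputData_1, ...
    blobs = {f"inputData_{i}": encode_blob(doc) for i, doc in enumerate(qcschema_docs)}

    # The table of contents maps the original file names to the blob names.
    toc = {
        "description": "Mapping between submitted file names and qcschema blobs.",
        "tableOfContents": [
            {"inputFileName": name, "qcschemaBlobName": blob}
            for name, blob in zip(file_names, blobs.keys())
        ],
    }
    return encode_blob(toc), blobs

toc_blob, batch_input_blobs = build_batch(
    [{"schema_name": "qcschema_input", "schema_version": 1}],  # toy document
    ["molecule_1.xyz"],
)
```
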
165 changes: 157 additions & 8 deletions azure-quantum/azure/quantum/target/microsoft/elements/dft/target.py
@@ -5,8 +5,11 @@
from azure.quantum.target.target import Target
from azure.quantum.workspace import Workspace
from azure.quantum.target.params import InputParams
from typing import Any, Dict, Type, Union
from typing import Any, Dict, Type, Union, List
from .job import MicrosoftElementsDftJob
from pathlib import Path
import copy
import json


class MicrosoftElementsDft(Target):
@@ -73,15 +76,161 @@ def submit(self,
if shots is not None:
warnings.warn("The 'shots' parameter is ignored in Microsoft Elements Dft job.")

return super().submit(
input_data=input_data,
name=name,
shots=shots,
input_params=input_params,
**kwargs
)
if isinstance(input_data, list):

qcschema_data = self._assemble_qcshema_from_files(input_data, input_params)

qcschema_blobs = {}
for i in range(len(qcschema_data)):
qcschema_blobs[f"inputData_{i}"] = self._encode_input_data(qcschema_data[i])

toc_str = self._create_table_of_contents(input_data, list(qcschema_blobs.keys()))
toc = self._encode_input_data(toc_str)

return self._get_job_class().from_input_data_container(
workspace=self.workspace,
name=name,
target=self.name,
input_data=toc,
batch_input_blobs=qcschema_blobs,
input_params={ 'numberOfFiles': len(qcschema_data), "inputFiles": list(qcschema_blobs.keys()), **input_params },
content_type=kwargs.pop('content_type', self.content_type),
encoding=kwargs.pop('encoding', self.encoding),
provider_id=self.provider_id,
input_data_format=kwargs.pop('input_data_format', 'microsoft.qc-schema.v1'),
output_data_format=kwargs.pop('output_data_format', self.output_data_format),
session_id=self.get_latest_session_id(),
**kwargs
)
else:
return super().submit(
input_data=input_data,
name=name,
shots=shots,
input_params=input_params,
**kwargs
)



@classmethod
def _assemble_qcshema_from_files(self, input_data: List[str], input_params: Dict) -> List[Dict[str, Any]]:
"""
Convert a list of input files to a list of qcschema objects.
"""

qcshema_objects = []
for file in input_data:
file_path = Path(file)
if not file_path.exists():
raise FileNotFoundError(f"File {file} does not exist.")

file_data = file_path.read_text()
if file_path.suffix == '.xyz':
mol = self._xyz_to_qcschema_mol(file_data)
new_qcschema = self._new_qcshema( input_params, mol )
qcshema_objects.append(new_qcschema)
elif file_path.suffix == '.json':
if input_params is not None and len(input_params.keys()) > 0:
warnings.warn('Input parameters were provided along with a QcSchema file that already contains parameters; the QcSchema parameters are used as-is.')
with open(file_path, 'r') as f:
qcshema_objects.append( json.load(f) )
else:
raise ValueError(f"File type '{file_path.suffix}' for file '{file_path}' is not supported. Please use xyz or QcSchema file formats.")

return qcshema_objects

@classmethod
def _new_qcshema(self, input_params: Dict[str, Any], mol: Dict[str, Any]) -> Dict[str, Any]:
"""
Create a new default qcschema object.
"""

if input_params.get("driver") == "go":
copy_input_params = copy.deepcopy(input_params)
copy_input_params["driver"] = "gradient"
new_object = {
"schema_name": "qcschema_optimization_input",
"schema_version": 1,
"initial_molecule": mol,
}
if copy_input_params.get("keywords") and copy_input_params["keywords"].get("geometryOptimization"):
new_object["keywords"] = copy_input_params["keywords"].pop("geometryOptimization")
new_object["input_specification"] = copy_input_params
return new_object
elif input_params.get("driver") == "bomd":
copy_input_params = copy.deepcopy(input_params)
copy_input_params["driver"] = "gradient"
new_object = {
"schema_name": "madft_molecular_dynamics_input",
"schema_version": 1,
"initial_molecule": mol,
}
if copy_input_params.get("keywords") and copy_input_params["keywords"].get("molecularDynamics"):
new_object["keywords"] = copy_input_params["keywords"].pop("molecularDynamics")
new_object["input_specification"] = copy_input_params
return new_object
else:
new_object = copy.deepcopy(input_params)
new_object.update({
"schema_name": "qcschema_input",
"schema_version": 1,
"molecule": mol,
})
return new_object


@classmethod
def _xyz_to_qcschema_mol(self, file_data: str ) -> Dict[str, Any]:
"""
Convert xyz format to qcschema molecule.
"""

lines = file_data.split("\n")
if len(lines) < 3:
raise ValueError("Invalid xyz format.")
n_atoms = int(lines.pop(0))
comment = lines.pop(0)
mol = {
"geometry": [],
"symbols": [],
}
for line in lines:
if line:
elements = line.split()
if len(elements) < 4:
raise ValueError("Invalid xyz format.")
symbol, x, y, z = elements[:4]
mol["symbols"].append(symbol)
mol["geometry"] += [float(x), float(y), float(z)]
else:
break

if len(mol["symbols"]) != n_atoms:
raise ValueError("Number of inputs does not match the number of atoms in xyz file.")

return mol

@classmethod
def _get_job_class(cls) -> Type[Job]:
return MicrosoftElementsDftJob

@classmethod
def _create_table_of_contents(cls, input_files: List[str], input_blobs: List[str]) -> Dict[str,Any]:
"""Create the table of contents for a batched job that contains a description of file and the mapping between the file names and the blob names"""

assert len(input_files) == len(input_blobs), "Internal error: number of blobs is not the same as the number of files."

toc = []
for i in range(len(input_files)):
toc.append(
{
"inputFileName": input_files[i],
"qcschemaBlobName": input_blobs[i],
}
)

return {
"description": "This files contains the mapping between the xyz file name that were submitted and the qcschema blobs that are used for the calculation.",
"tableOfContents": toc,
}
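
As a standalone illustration of the xyz parsing done by `_xyz_to_qcschema_mol`, the sketch below converts a small xyz string into the qcschema molecule dictionary shape used above. It is a simplified re-implementation for illustration only, not the SDK method itself; coordinates are copied through unchanged, as in the diff.

```python
from typing import Any, Dict

def xyz_to_molecule(xyz_text: str) -> Dict[str, Any]:
    """Standalone sketch of the xyz -> qcschema molecule conversion shown above."""
    lines = xyz_text.strip().splitlines()
    n_atoms = int(lines[0])          # first line: atom count
    # second line is the comment line and is ignored
    mol: Dict[str, Any] = {"symbols": [], "geometry": []}
    for line in lines[2:2 + n_atoms]:
        symbol, x, y, z = line.split()[:4]
        mol["symbols"].append(symbol)
        mol["geometry"] += [float(x), float(y), float(z)]
    if len(mol["symbols"]) != n_atoms:
        raise ValueError("Atom count in header does not match the number of atom lines.")
    return mol

water = """3
water molecule
O  0.0000  0.0000  0.1173
H  0.0000  0.7572 -0.4692
H  0.0000 -0.7572 -0.4692
"""
print(xyz_to_molecule(water))
# {'symbols': ['O', 'H', 'H'], 'geometry': [0.0, 0.0, 0.1173, 0.0, 0.7572, -0.4692, 0.0, -0.7572, -0.4692]}
```
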
1 change: 1 addition & 0 deletions azure-quantum/environment.yml
@@ -6,5 +6,6 @@ dependencies:
- python=3.9
- pip>=22.3.1
- pytest>=7.1.2
- pytest-regressions
- pip:
- -e .[all]
