From fb9259245cb96cc4b7be974773ba2e4a4dc3dc63 Mon Sep 17 00:00:00 2001 From: Jim Garrison Date: Fri, 8 Sep 2023 23:12:05 -0400 Subject: [PATCH] Make the cutting workflow work with any `Hashable` as label Fixes #390 --- .../cutting/cutting_decomposition.py | 10 +-- .../cutting/cutting_evaluation.py | 18 +++--- .../cutting/cutting_experiments.py | 62 +++++++++---------- .../cutting/cutting_reconstruction.py | 12 ++-- 4 files changed, 49 insertions(+), 53 deletions(-) diff --git a/circuit_knitting/cutting/cutting_decomposition.py b/circuit_knitting/cutting/cutting_decomposition.py index 9e367c3f1..c0ef9b986 100644 --- a/circuit_knitting/cutting/cutting_decomposition.py +++ b/circuit_knitting/cutting/cutting_decomposition.py @@ -34,9 +34,9 @@ class PartitionedCuttingProblem(NamedTuple): """The result of decomposing and separating a circuit and observable(s).""" - subcircuits: dict[str | int, QuantumCircuit] + subcircuits: dict[Hashable, QuantumCircuit] bases: list[QPDBasis] - subobservables: dict[str | int, PauliList] | None = None + subobservables: dict[Hashable, PauliList] | None = None def partition_circuit_qubits( @@ -162,7 +162,7 @@ def cut_gates( def partition_problem( circuit: QuantumCircuit, - partition_labels: Sequence[str | int] | None = None, + partition_labels: Sequence[Hashable] | None = None, observables: PauliList | None = None, ) -> PartitionedCuttingProblem: r""" @@ -258,8 +258,8 @@ def partition_problem( def decompose_observables( - observables: PauliList, partition_labels: Sequence[str | int] -) -> dict[str | int, PauliList]: + observables: PauliList, partition_labels: Sequence[Hashable] +) -> dict[Hashable, PauliList]: """ Decompose a list of observables with respect to some qubit partition labels. diff --git a/circuit_knitting/cutting/cutting_evaluation.py b/circuit_knitting/cutting/cutting_evaluation.py index 589869263..5038c6faa 100644 --- a/circuit_knitting/cutting/cutting_evaluation.py +++ b/circuit_knitting/cutting/cutting_evaluation.py @@ -14,7 +14,7 @@ from __future__ import annotations from typing import NamedTuple -from collections.abc import Sequence +from collections.abc import Sequence, Hashable from qiskit.circuit import QuantumCircuit from qiskit.quantum_info import PauliList @@ -28,15 +28,15 @@ class CuttingExperimentResults(NamedTuple): """Circuit cutting subexperiment results and sampling coefficients.""" - results: SamplerResult | dict[str | int, SamplerResult] + results: SamplerResult | dict[Hashable, SamplerResult] coeffs: Sequence[tuple[float, WeightType]] def execute_experiments( - circuits: QuantumCircuit | dict[str | int, QuantumCircuit], - subobservables: PauliList | dict[str | int, PauliList], + circuits: QuantumCircuit | dict[Hashable, QuantumCircuit], + subobservables: PauliList | dict[Hashable, PauliList], num_samples: int, - samplers: BaseSampler | dict[str | int, BaseSampler], + samplers: BaseSampler | dict[Hashable, BaseSampler], ) -> CuttingExperimentResults: r""" Generate the sampled circuits, append the observables, and run the sub-experiments. @@ -90,7 +90,7 @@ def execute_experiments( if isinstance(samplers, dict): # Ensure that each sampler is unique - collision_dict: dict[int, str | int] = {} + collision_dict: dict[int, Hashable] = {} for k, v in samplers.items(): if id(v) in collision_dict: raise ValueError( @@ -109,7 +109,7 @@ def execute_experiments( ) # Set up subexperiments and samplers - subexperiments_dict: dict[str | int, list[QuantumCircuit]] = {} + subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = {} if isinstance(subexperiments, list): subexperiments_dict = {"A": subexperiments} else: @@ -124,7 +124,7 @@ def execute_experiments( # Make sure the first two cregs in each circuit are for QPD and observable measurements # Run a job for each partition and collect results results = {} - for label in sorted(subexperiments_dict.keys()): + for label in subexperiments_dict.keys(): for circ in subexperiments_dict[label]: if ( len(circ.cregs) != 2 @@ -152,7 +152,7 @@ def execute_experiments( return CuttingExperimentResults(results=results_out, coeffs=coefficients) -def _validate_samplers(samplers: BaseSampler | dict[str | int, BaseSampler]) -> None: +def _validate_samplers(samplers: BaseSampler | dict[Hashable, BaseSampler]) -> None: """Replace unsupported statevector-based Samplers with ExactSampler.""" if isinstance(samplers, BaseSampler): if ( diff --git a/circuit_knitting/cutting/cutting_experiments.py b/circuit_knitting/cutting/cutting_experiments.py index 053c49f36..64c9450b8 100644 --- a/circuit_knitting/cutting/cutting_experiments.py +++ b/circuit_knitting/cutting/cutting_experiments.py @@ -14,7 +14,7 @@ from __future__ import annotations from collections import defaultdict -from collections.abc import Sequence +from collections.abc import Sequence, Hashable import numpy as np from qiskit.circuit import QuantumCircuit, ClassicalRegister @@ -34,11 +34,11 @@ def generate_cutting_experiments( - circuits: QuantumCircuit | dict[str | int, QuantumCircuit], - observables: PauliList | dict[str | int, PauliList], + circuits: QuantumCircuit | dict[Hashable, QuantumCircuit], + observables: PauliList | dict[Hashable, PauliList], num_samples: int | float, ) -> tuple[ - list[QuantumCircuit] | dict[str | int, list[QuantumCircuit]], + list[QuantumCircuit] | dict[Hashable, list[QuantumCircuit]], list[tuple[float, WeightType]], ]: r""" @@ -98,7 +98,7 @@ def generate_cutting_experiments( # can be shared between both cases. if isinstance(circuits, QuantumCircuit): is_separated = False - subcircuit_list = [circuits] + subcircuit_dict: dict[Hashable, QuantumCircuit] = {"A": circuits} subobservables_by_subsystem = decompose_observables( observables, "A" * len(observables[0]) ) @@ -108,16 +108,16 @@ def generate_cutting_experiments( } # Gather the unique bases from the circuit bases, qpd_gate_ids = _get_bases(circuits) - subcirc_qpd_gate_ids = [qpd_gate_ids] + subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {"A": qpd_gate_ids} else: is_separated = True - subcircuit_list = [circuits[key] for key in sorted(circuits.keys())] + subcircuit_dict = circuits # Gather the unique bases across the subcircuits subcirc_qpd_gate_ids, subcirc_map_ids = _get_mapping_ids_by_partition( - subcircuit_list + subcircuit_dict ) - bases = _get_bases_by_partition(subcircuit_list, subcirc_qpd_gate_ids) + bases = _get_bases_by_partition(subcircuit_dict, subcirc_qpd_gate_ids) # Create the commuting observable groups subsystem_observables = { @@ -135,7 +135,7 @@ def generate_cutting_experiments( sorted_samples = sorted(random_samples.items(), key=lambda x: x[1][0], reverse=True) # Generate the output experiments and weights - subexperiments_dict: dict[str | int, list[QuantumCircuit]] = defaultdict(list) + subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = defaultdict(list) weights: list[tuple[float, WeightType]] = [] for z, (map_ids, (redundancy, weight_type)) in enumerate(sorted_samples): actual_coeff = np.prod( @@ -144,22 +144,20 @@ def generate_cutting_experiments( sampled_coeff = (redundancy / num_samples) * (kappa * np.sign(actual_coeff)) weights.append((sampled_coeff, weight_type)) map_ids_tmp = map_ids - for i, (subcircuit, label) in enumerate( - strict_zip(subcircuit_list, sorted(subsystem_observables.keys())) - ): + for label, so in subsystem_observables.items(): + subcircuit = subcircuit_dict[label] if is_separated: - map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[i]) + map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[label]) decomp_qc = decompose_qpd_instructions( - subcircuit, subcirc_qpd_gate_ids[i], map_ids_tmp + subcircuit, subcirc_qpd_gate_ids[label], map_ids_tmp ) - so = subsystem_observables[label] for j, cog in enumerate(so.groups): meas_qc = _append_measurement_circuit(decomp_qc, cog) subexperiments_dict[label].append(meas_qc) # If the input was a single quantum circuit, return the subexperiments as a list subexperiments_out: list[QuantumCircuit] | dict[ - str | int, list[QuantumCircuit] + Hashable, list[QuantumCircuit] ] = dict(subexperiments_dict) assert isinstance(subexperiments_out, dict) if isinstance(circuits, QuantumCircuit): @@ -170,16 +168,16 @@ def generate_cutting_experiments( def _get_mapping_ids_by_partition( - circuits: Sequence[QuantumCircuit], -) -> tuple[list[list[list[int]]], list[list[int]]]: + circuits: dict[Hashable, QuantumCircuit], +) -> tuple[dict[Hashable, list[list[int]]], dict[Hashable, list[int]]]: """Get indices to the QPD gates in each subcircuit and relevant map ids.""" # Collect QPDGate id's and relevant map id's for each subcircuit - subcirc_qpd_gate_ids: list[list[list[int]]] = [] - subcirc_map_ids: list[list[int]] = [] + subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {} + subcirc_map_ids: dict[Hashable, list[int]] = {} decomp_ids = set() - for circ in circuits: - subcirc_qpd_gate_ids.append([]) - subcirc_map_ids.append([]) + for label, circ in circuits.items(): + subcirc_qpd_gate_ids[label] = [] + subcirc_map_ids[label] = [] for i, inst in enumerate(circ.data): if isinstance(inst.operation, SingleQubitQPDGate): try: @@ -194,24 +192,24 @@ def _get_mapping_ids_by_partition( "belonging to the same cut to be sampled jointly." ) decomp_ids.add(decomp_id) - subcirc_qpd_gate_ids[-1].append([i]) - subcirc_map_ids[-1].append(decomp_id) + subcirc_qpd_gate_ids[label].append([i]) + subcirc_map_ids[label].append(decomp_id) return subcirc_qpd_gate_ids, subcirc_map_ids def _get_bases_by_partition( - circuits: Sequence[QuantumCircuit], subcirc_qpd_gate_ids: list[list[list[int]]] + circuits: dict[Hashable, QuantumCircuit], + subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]], ) -> list[QPDBasis]: """Get a list of each unique QPD basis across the subcircuits.""" # Collect the bases corresponding to each decomposed operation bases_dict = {} - for i, subcirc in enumerate(subcirc_qpd_gate_ids): + for label, subcirc in subcirc_qpd_gate_ids.items(): + circuit = circuits[label] for basis_id in subcirc: - decomp_id = int( - circuits[i].data[basis_id[0]].operation.label.split("_")[-1] - ) - bases_dict[decomp_id] = circuits[i].data[basis_id[0]].operation.basis + decomp_id = int(circuit.data[basis_id[0]].operation.label.split("_")[-1]) + bases_dict[decomp_id] = circuit.data[basis_id[0]].operation.basis bases = [bases_dict[key] for key in sorted(bases_dict.keys())] return bases diff --git a/circuit_knitting/cutting/cutting_reconstruction.py b/circuit_knitting/cutting/cutting_reconstruction.py index 9a01579b8..dbd297267 100644 --- a/circuit_knitting/cutting/cutting_reconstruction.py +++ b/circuit_knitting/cutting/cutting_reconstruction.py @@ -13,7 +13,7 @@ from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Sequence, Hashable import numpy as np from qiskit.quantum_info import PauliList @@ -26,9 +26,9 @@ def reconstruct_expectation_values( - results: SamplerResult | dict[str | int, SamplerResult], + results: SamplerResult | dict[Hashable, SamplerResult], coefficients: Sequence[tuple[float, WeightType]], - observables: PauliList | dict[str | int, PauliList], + observables: PauliList | dict[Hashable, PauliList], ) -> list[float]: r""" Reconstruct an expectation value from the results of the sub-experiments. @@ -85,7 +85,7 @@ def reconstruct_expectation_values( subobservables_by_subsystem = decompose_observables( observables, "A" * len(observables[0]) ) - results_dict: dict[str | int, SamplerResult] = {"A": results} + results_dict: dict[Hashable, SamplerResult] = {"A": results} expvals = np.zeros(len(observables)) else: @@ -100,13 +100,11 @@ def reconstruct_expectation_values( label: ObservableCollection(subobservables) for label, subobservables in subobservables_by_subsystem.items() } - sorted_subsystems = sorted(subsystem_observables.keys()) # Reconstruct the expectation values for i, coeff in enumerate(coefficients): current_expvals = np.ones((len(expvals),)) - for label in sorted_subsystems: - so = subsystem_observables[label] + for label, so in subsystem_observables.items(): subsystem_expvals = [ np.zeros(len(cog.commuting_observables)) for cog in so.groups ]