Skip to content

Commit

Permalink
Make the cutting workflow work with any Hashable as label
Browse files Browse the repository at this point in the history
Fixes #390
  • Loading branch information
garrison committed Sep 9, 2023
1 parent f33317f commit fb92592
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 53 deletions.
10 changes: 5 additions & 5 deletions circuit_knitting/cutting/cutting_decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
class PartitionedCuttingProblem(NamedTuple):
"""The result of decomposing and separating a circuit and observable(s)."""

subcircuits: dict[str | int, QuantumCircuit]
subcircuits: dict[Hashable, QuantumCircuit]
bases: list[QPDBasis]
subobservables: dict[str | int, PauliList] | None = None
subobservables: dict[Hashable, PauliList] | None = None


def partition_circuit_qubits(
Expand Down Expand Up @@ -162,7 +162,7 @@ def cut_gates(

def partition_problem(
circuit: QuantumCircuit,
partition_labels: Sequence[str | int] | None = None,
partition_labels: Sequence[Hashable] | None = None,
observables: PauliList | None = None,
) -> PartitionedCuttingProblem:
r"""
Expand Down Expand Up @@ -258,8 +258,8 @@ def partition_problem(


def decompose_observables(
observables: PauliList, partition_labels: Sequence[str | int]
) -> dict[str | int, PauliList]:
observables: PauliList, partition_labels: Sequence[Hashable]
) -> dict[Hashable, PauliList]:
"""
Decompose a list of observables with respect to some qubit partition labels.
Expand Down
18 changes: 9 additions & 9 deletions circuit_knitting/cutting/cutting_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from typing import NamedTuple
from collections.abc import Sequence
from collections.abc import Sequence, Hashable

from qiskit.circuit import QuantumCircuit
from qiskit.quantum_info import PauliList
Expand All @@ -28,15 +28,15 @@
class CuttingExperimentResults(NamedTuple):
"""Circuit cutting subexperiment results and sampling coefficients."""

results: SamplerResult | dict[str | int, SamplerResult]
results: SamplerResult | dict[Hashable, SamplerResult]
coeffs: Sequence[tuple[float, WeightType]]


def execute_experiments(
circuits: QuantumCircuit | dict[str | int, QuantumCircuit],
subobservables: PauliList | dict[str | int, PauliList],
circuits: QuantumCircuit | dict[Hashable, QuantumCircuit],
subobservables: PauliList | dict[Hashable, PauliList],
num_samples: int,
samplers: BaseSampler | dict[str | int, BaseSampler],
samplers: BaseSampler | dict[Hashable, BaseSampler],
) -> CuttingExperimentResults:
r"""
Generate the sampled circuits, append the observables, and run the sub-experiments.
Expand Down Expand Up @@ -90,7 +90,7 @@ def execute_experiments(

if isinstance(samplers, dict):
# Ensure that each sampler is unique
collision_dict: dict[int, str | int] = {}
collision_dict: dict[int, Hashable] = {}
for k, v in samplers.items():
if id(v) in collision_dict:
raise ValueError(
Expand All @@ -109,7 +109,7 @@ def execute_experiments(
)

# Set up subexperiments and samplers
subexperiments_dict: dict[str | int, list[QuantumCircuit]] = {}
subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = {}
if isinstance(subexperiments, list):
subexperiments_dict = {"A": subexperiments}
else:
Expand All @@ -124,7 +124,7 @@ def execute_experiments(
# Make sure the first two cregs in each circuit are for QPD and observable measurements
# Run a job for each partition and collect results
results = {}
for label in sorted(subexperiments_dict.keys()):
for label in subexperiments_dict.keys():
for circ in subexperiments_dict[label]:
if (
len(circ.cregs) != 2
Expand Down Expand Up @@ -152,7 +152,7 @@ def execute_experiments(
return CuttingExperimentResults(results=results_out, coeffs=coefficients)


def _validate_samplers(samplers: BaseSampler | dict[str | int, BaseSampler]) -> None:
def _validate_samplers(samplers: BaseSampler | dict[Hashable, BaseSampler]) -> None:
"""Replace unsupported statevector-based Samplers with ExactSampler."""
if isinstance(samplers, BaseSampler):
if (
Expand Down
62 changes: 30 additions & 32 deletions circuit_knitting/cutting/cutting_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Sequence
from collections.abc import Sequence, Hashable

import numpy as np
from qiskit.circuit import QuantumCircuit, ClassicalRegister
Expand All @@ -34,11 +34,11 @@


def generate_cutting_experiments(
circuits: QuantumCircuit | dict[str | int, QuantumCircuit],
observables: PauliList | dict[str | int, PauliList],
circuits: QuantumCircuit | dict[Hashable, QuantumCircuit],
observables: PauliList | dict[Hashable, PauliList],
num_samples: int | float,
) -> tuple[
list[QuantumCircuit] | dict[str | int, list[QuantumCircuit]],
list[QuantumCircuit] | dict[Hashable, list[QuantumCircuit]],
list[tuple[float, WeightType]],
]:
r"""
Expand Down Expand Up @@ -98,7 +98,7 @@ def generate_cutting_experiments(
# can be shared between both cases.
if isinstance(circuits, QuantumCircuit):
is_separated = False
subcircuit_list = [circuits]
subcircuit_dict: dict[Hashable, QuantumCircuit] = {"A": circuits}
subobservables_by_subsystem = decompose_observables(
observables, "A" * len(observables[0])
)
Expand All @@ -108,16 +108,16 @@ def generate_cutting_experiments(
}
# Gather the unique bases from the circuit
bases, qpd_gate_ids = _get_bases(circuits)
subcirc_qpd_gate_ids = [qpd_gate_ids]
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {"A": qpd_gate_ids}

else:
is_separated = True
subcircuit_list = [circuits[key] for key in sorted(circuits.keys())]
subcircuit_dict = circuits
# Gather the unique bases across the subcircuits
subcirc_qpd_gate_ids, subcirc_map_ids = _get_mapping_ids_by_partition(
subcircuit_list
subcircuit_dict
)
bases = _get_bases_by_partition(subcircuit_list, subcirc_qpd_gate_ids)
bases = _get_bases_by_partition(subcircuit_dict, subcirc_qpd_gate_ids)

# Create the commuting observable groups
subsystem_observables = {
Expand All @@ -135,7 +135,7 @@ def generate_cutting_experiments(
sorted_samples = sorted(random_samples.items(), key=lambda x: x[1][0], reverse=True)

# Generate the output experiments and weights
subexperiments_dict: dict[str | int, list[QuantumCircuit]] = defaultdict(list)
subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = defaultdict(list)
weights: list[tuple[float, WeightType]] = []
for z, (map_ids, (redundancy, weight_type)) in enumerate(sorted_samples):
actual_coeff = np.prod(
Expand All @@ -144,22 +144,20 @@ def generate_cutting_experiments(
sampled_coeff = (redundancy / num_samples) * (kappa * np.sign(actual_coeff))
weights.append((sampled_coeff, weight_type))
map_ids_tmp = map_ids
for i, (subcircuit, label) in enumerate(
strict_zip(subcircuit_list, sorted(subsystem_observables.keys()))
):
for label, so in subsystem_observables.items():
subcircuit = subcircuit_dict[label]
if is_separated:
map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[i])
map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[label])
decomp_qc = decompose_qpd_instructions(
subcircuit, subcirc_qpd_gate_ids[i], map_ids_tmp
subcircuit, subcirc_qpd_gate_ids[label], map_ids_tmp
)
so = subsystem_observables[label]
for j, cog in enumerate(so.groups):
meas_qc = _append_measurement_circuit(decomp_qc, cog)
subexperiments_dict[label].append(meas_qc)

# If the input was a single quantum circuit, return the subexperiments as a list
subexperiments_out: list[QuantumCircuit] | dict[
str | int, list[QuantumCircuit]
Hashable, list[QuantumCircuit]
] = dict(subexperiments_dict)
assert isinstance(subexperiments_out, dict)
if isinstance(circuits, QuantumCircuit):
Expand All @@ -170,16 +168,16 @@ def generate_cutting_experiments(


def _get_mapping_ids_by_partition(
circuits: Sequence[QuantumCircuit],
) -> tuple[list[list[list[int]]], list[list[int]]]:
circuits: dict[Hashable, QuantumCircuit],
) -> tuple[dict[Hashable, list[list[int]]], dict[Hashable, list[int]]]:
"""Get indices to the QPD gates in each subcircuit and relevant map ids."""
# Collect QPDGate id's and relevant map id's for each subcircuit
subcirc_qpd_gate_ids: list[list[list[int]]] = []
subcirc_map_ids: list[list[int]] = []
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {}
subcirc_map_ids: dict[Hashable, list[int]] = {}
decomp_ids = set()
for circ in circuits:
subcirc_qpd_gate_ids.append([])
subcirc_map_ids.append([])
for label, circ in circuits.items():
subcirc_qpd_gate_ids[label] = []
subcirc_map_ids[label] = []
for i, inst in enumerate(circ.data):
if isinstance(inst.operation, SingleQubitQPDGate):
try:
Expand All @@ -194,24 +192,24 @@ def _get_mapping_ids_by_partition(
"belonging to the same cut to be sampled jointly."
)
decomp_ids.add(decomp_id)
subcirc_qpd_gate_ids[-1].append([i])
subcirc_map_ids[-1].append(decomp_id)
subcirc_qpd_gate_ids[label].append([i])
subcirc_map_ids[label].append(decomp_id)

return subcirc_qpd_gate_ids, subcirc_map_ids


def _get_bases_by_partition(
circuits: Sequence[QuantumCircuit], subcirc_qpd_gate_ids: list[list[list[int]]]
circuits: dict[Hashable, QuantumCircuit],
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]],
) -> list[QPDBasis]:
"""Get a list of each unique QPD basis across the subcircuits."""
# Collect the bases corresponding to each decomposed operation
bases_dict = {}
for i, subcirc in enumerate(subcirc_qpd_gate_ids):
for label, subcirc in subcirc_qpd_gate_ids.items():
circuit = circuits[label]
for basis_id in subcirc:
decomp_id = int(
circuits[i].data[basis_id[0]].operation.label.split("_")[-1]
)
bases_dict[decomp_id] = circuits[i].data[basis_id[0]].operation.basis
decomp_id = int(circuit.data[basis_id[0]].operation.label.split("_")[-1])
bases_dict[decomp_id] = circuit.data[basis_id[0]].operation.basis
bases = [bases_dict[key] for key in sorted(bases_dict.keys())]

return bases
Expand Down
12 changes: 5 additions & 7 deletions circuit_knitting/cutting/cutting_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from __future__ import annotations

from collections.abc import Sequence
from collections.abc import Sequence, Hashable

import numpy as np
from qiskit.quantum_info import PauliList
Expand All @@ -26,9 +26,9 @@


def reconstruct_expectation_values(
results: SamplerResult | dict[str | int, SamplerResult],
results: SamplerResult | dict[Hashable, SamplerResult],
coefficients: Sequence[tuple[float, WeightType]],
observables: PauliList | dict[str | int, PauliList],
observables: PauliList | dict[Hashable, PauliList],
) -> list[float]:
r"""
Reconstruct an expectation value from the results of the sub-experiments.
Expand Down Expand Up @@ -85,7 +85,7 @@ def reconstruct_expectation_values(
subobservables_by_subsystem = decompose_observables(
observables, "A" * len(observables[0])
)
results_dict: dict[str | int, SamplerResult] = {"A": results}
results_dict: dict[Hashable, SamplerResult] = {"A": results}
expvals = np.zeros(len(observables))

else:
Expand All @@ -100,13 +100,11 @@ def reconstruct_expectation_values(
label: ObservableCollection(subobservables)
for label, subobservables in subobservables_by_subsystem.items()
}
sorted_subsystems = sorted(subsystem_observables.keys())

# Reconstruct the expectation values
for i, coeff in enumerate(coefficients):
current_expvals = np.ones((len(expvals),))
for label in sorted_subsystems:
so = subsystem_observables[label]
for label, so in subsystem_observables.items():
subsystem_expvals = [
np.zeros(len(cog.commuting_observables)) for cog in so.groups
]
Expand Down

0 comments on commit fb92592

Please sign in to comment.