Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the cutting workflow work with any Hashable as label #410

Merged
merged 3 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions circuit_knitting/cutting/cutting_decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
class PartitionedCuttingProblem(NamedTuple):
"""The result of decomposing and separating a circuit and observable(s)."""

subcircuits: dict[str | int, QuantumCircuit]
subcircuits: dict[Hashable, QuantumCircuit]
bases: list[QPDBasis]
subobservables: dict[str | int, PauliList] | None = None
subobservables: dict[Hashable, PauliList] | None = None


def partition_circuit_qubits(
Expand Down Expand Up @@ -162,7 +162,7 @@ def cut_gates(

def partition_problem(
circuit: QuantumCircuit,
partition_labels: Sequence[str | int] | None = None,
partition_labels: Sequence[Hashable] | None = None,
observables: PauliList | None = None,
) -> PartitionedCuttingProblem:
r"""
Expand Down Expand Up @@ -258,8 +258,8 @@ def partition_problem(


def decompose_observables(
observables: PauliList, partition_labels: Sequence[str | int]
) -> dict[str | int, PauliList]:
observables: PauliList, partition_labels: Sequence[Hashable]
) -> dict[Hashable, PauliList]:
"""
Decompose a list of observables with respect to some qubit partition labels.

Expand Down
18 changes: 9 additions & 9 deletions circuit_knitting/cutting/cutting_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from typing import NamedTuple
from collections.abc import Sequence
from collections.abc import Sequence, Hashable

from qiskit.circuit import QuantumCircuit
from qiskit.quantum_info import PauliList
Expand All @@ -28,15 +28,15 @@
class CuttingExperimentResults(NamedTuple):
"""Circuit cutting subexperiment results and sampling coefficients."""

results: SamplerResult | dict[str | int, SamplerResult]
results: SamplerResult | dict[Hashable, SamplerResult]
coeffs: Sequence[tuple[float, WeightType]]


def execute_experiments(
circuits: QuantumCircuit | dict[str | int, QuantumCircuit],
subobservables: PauliList | dict[str | int, PauliList],
circuits: QuantumCircuit | dict[Hashable, QuantumCircuit],
subobservables: PauliList | dict[Hashable, PauliList],
num_samples: int,
samplers: BaseSampler | dict[str | int, BaseSampler],
samplers: BaseSampler | dict[Hashable, BaseSampler],
) -> CuttingExperimentResults:
r"""
Generate the sampled circuits, append the observables, and run the sub-experiments.
Expand Down Expand Up @@ -90,7 +90,7 @@ def execute_experiments(

if isinstance(samplers, dict):
# Ensure that each sampler is unique
collision_dict: dict[int, str | int] = {}
collision_dict: dict[int, Hashable] = {}
for k, v in samplers.items():
if id(v) in collision_dict:
raise ValueError(
Expand All @@ -109,7 +109,7 @@ def execute_experiments(
)

# Set up subexperiments and samplers
subexperiments_dict: dict[str | int, list[QuantumCircuit]] = {}
subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = {}
if isinstance(subexperiments, list):
subexperiments_dict = {"A": subexperiments}
else:
Expand All @@ -124,7 +124,7 @@ def execute_experiments(
# Make sure the first two cregs in each circuit are for QPD and observable measurements
# Run a job for each partition and collect results
results = {}
for label in sorted(subexperiments_dict.keys()):
for label in subexperiments_dict.keys():
for circ in subexperiments_dict[label]:
if (
len(circ.cregs) != 2
Expand Down Expand Up @@ -152,7 +152,7 @@ def execute_experiments(
return CuttingExperimentResults(results=results_out, coeffs=coefficients)


def _validate_samplers(samplers: BaseSampler | dict[str | int, BaseSampler]) -> None:
def _validate_samplers(samplers: BaseSampler | dict[Hashable, BaseSampler]) -> None:
"""Replace unsupported statevector-based Samplers with ExactSampler."""
if isinstance(samplers, BaseSampler):
if (
Expand Down
62 changes: 30 additions & 32 deletions circuit_knitting/cutting/cutting_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Sequence
from collections.abc import Sequence, Hashable

import numpy as np
from qiskit.circuit import QuantumCircuit, ClassicalRegister
Expand All @@ -34,11 +34,11 @@


def generate_cutting_experiments(
circuits: QuantumCircuit | dict[str | int, QuantumCircuit],
observables: PauliList | dict[str | int, PauliList],
circuits: QuantumCircuit | dict[Hashable, QuantumCircuit],
observables: PauliList | dict[Hashable, PauliList],
num_samples: int | float,
) -> tuple[
list[QuantumCircuit] | dict[str | int, list[QuantumCircuit]],
list[QuantumCircuit] | dict[Hashable, list[QuantumCircuit]],
list[tuple[float, WeightType]],
]:
r"""
Expand Down Expand Up @@ -98,7 +98,7 @@ def generate_cutting_experiments(
# can be shared between both cases.
if isinstance(circuits, QuantumCircuit):
is_separated = False
subcircuit_list = [circuits]
subcircuit_dict: dict[Hashable, QuantumCircuit] = {"A": circuits}
subobservables_by_subsystem = decompose_observables(
observables, "A" * len(observables[0])
)
Expand All @@ -108,16 +108,16 @@ def generate_cutting_experiments(
}
# Gather the unique bases from the circuit
bases, qpd_gate_ids = _get_bases(circuits)
subcirc_qpd_gate_ids = [qpd_gate_ids]
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {"A": qpd_gate_ids}

else:
is_separated = True
subcircuit_list = [circuits[key] for key in sorted(circuits.keys())]
subcircuit_dict = circuits
# Gather the unique bases across the subcircuits
subcirc_qpd_gate_ids, subcirc_map_ids = _get_mapping_ids_by_partition(
subcircuit_list
subcircuit_dict
)
bases = _get_bases_by_partition(subcircuit_list, subcirc_qpd_gate_ids)
bases = _get_bases_by_partition(subcircuit_dict, subcirc_qpd_gate_ids)

# Create the commuting observable groups
subsystem_observables = {
Expand All @@ -135,7 +135,7 @@ def generate_cutting_experiments(
sorted_samples = sorted(random_samples.items(), key=lambda x: x[1][0], reverse=True)

# Generate the output experiments and weights
subexperiments_dict: dict[str | int, list[QuantumCircuit]] = defaultdict(list)
subexperiments_dict: dict[Hashable, list[QuantumCircuit]] = defaultdict(list)
weights: list[tuple[float, WeightType]] = []
for z, (map_ids, (redundancy, weight_type)) in enumerate(sorted_samples):
actual_coeff = np.prod(
Expand All @@ -144,22 +144,20 @@ def generate_cutting_experiments(
sampled_coeff = (redundancy / num_samples) * (kappa * np.sign(actual_coeff))
weights.append((sampled_coeff, weight_type))
map_ids_tmp = map_ids
for i, (subcircuit, label) in enumerate(
strict_zip(subcircuit_list, sorted(subsystem_observables.keys()))
):
for label, so in subsystem_observables.items():
subcircuit = subcircuit_dict[label]
if is_separated:
map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[i])
map_ids_tmp = tuple(map_ids[j] for j in subcirc_map_ids[label])
decomp_qc = decompose_qpd_instructions(
subcircuit, subcirc_qpd_gate_ids[i], map_ids_tmp
subcircuit, subcirc_qpd_gate_ids[label], map_ids_tmp
)
so = subsystem_observables[label]
for j, cog in enumerate(so.groups):
meas_qc = _append_measurement_circuit(decomp_qc, cog)
subexperiments_dict[label].append(meas_qc)

# If the input was a single quantum circuit, return the subexperiments as a list
subexperiments_out: list[QuantumCircuit] | dict[
str | int, list[QuantumCircuit]
Hashable, list[QuantumCircuit]
] = dict(subexperiments_dict)
assert isinstance(subexperiments_out, dict)
if isinstance(circuits, QuantumCircuit):
Expand All @@ -170,16 +168,16 @@ def generate_cutting_experiments(


def _get_mapping_ids_by_partition(
circuits: Sequence[QuantumCircuit],
) -> tuple[list[list[list[int]]], list[list[int]]]:
circuits: dict[Hashable, QuantumCircuit],
) -> tuple[dict[Hashable, list[list[int]]], dict[Hashable, list[int]]]:
"""Get indices to the QPD gates in each subcircuit and relevant map ids."""
# Collect QPDGate id's and relevant map id's for each subcircuit
subcirc_qpd_gate_ids: list[list[list[int]]] = []
subcirc_map_ids: list[list[int]] = []
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]] = {}
subcirc_map_ids: dict[Hashable, list[int]] = {}
decomp_ids = set()
for circ in circuits:
subcirc_qpd_gate_ids.append([])
subcirc_map_ids.append([])
for label, circ in circuits.items():
subcirc_qpd_gate_ids[label] = []
subcirc_map_ids[label] = []
for i, inst in enumerate(circ.data):
if isinstance(inst.operation, SingleQubitQPDGate):
try:
Expand All @@ -194,24 +192,24 @@ def _get_mapping_ids_by_partition(
"belonging to the same cut to be sampled jointly."
)
decomp_ids.add(decomp_id)
subcirc_qpd_gate_ids[-1].append([i])
subcirc_map_ids[-1].append(decomp_id)
subcirc_qpd_gate_ids[label].append([i])
subcirc_map_ids[label].append(decomp_id)

return subcirc_qpd_gate_ids, subcirc_map_ids


def _get_bases_by_partition(
circuits: Sequence[QuantumCircuit], subcirc_qpd_gate_ids: list[list[list[int]]]
circuits: dict[Hashable, QuantumCircuit],
subcirc_qpd_gate_ids: dict[Hashable, list[list[int]]],
) -> list[QPDBasis]:
"""Get a list of each unique QPD basis across the subcircuits."""
# Collect the bases corresponding to each decomposed operation
bases_dict = {}
for i, subcirc in enumerate(subcirc_qpd_gate_ids):
for label, subcirc in subcirc_qpd_gate_ids.items():
circuit = circuits[label]
for basis_id in subcirc:
decomp_id = int(
circuits[i].data[basis_id[0]].operation.label.split("_")[-1]
)
bases_dict[decomp_id] = circuits[i].data[basis_id[0]].operation.basis
decomp_id = int(circuit.data[basis_id[0]].operation.label.split("_")[-1])
bases_dict[decomp_id] = circuit.data[basis_id[0]].operation.basis
bases = [bases_dict[key] for key in sorted(bases_dict.keys())]

return bases
Expand Down
12 changes: 5 additions & 7 deletions circuit_knitting/cutting/cutting_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from __future__ import annotations

from collections.abc import Sequence
from collections.abc import Sequence, Hashable

import numpy as np
from qiskit.quantum_info import PauliList
Expand All @@ -26,9 +26,9 @@


def reconstruct_expectation_values(
results: SamplerResult | dict[str | int, SamplerResult],
results: SamplerResult | dict[Hashable, SamplerResult],
coefficients: Sequence[tuple[float, WeightType]],
observables: PauliList | dict[str | int, PauliList],
observables: PauliList | dict[Hashable, PauliList],
) -> list[float]:
r"""
Reconstruct an expectation value from the results of the sub-experiments.
Expand Down Expand Up @@ -85,7 +85,7 @@ def reconstruct_expectation_values(
subobservables_by_subsystem = decompose_observables(
observables, "A" * len(observables[0])
)
results_dict: dict[str | int, SamplerResult] = {"A": results}
results_dict: dict[Hashable, SamplerResult] = {"A": results}
expvals = np.zeros(len(observables))

else:
Expand All @@ -100,13 +100,11 @@ def reconstruct_expectation_values(
label: ObservableCollection(subobservables)
for label, subobservables in subobservables_by_subsystem.items()
}
sorted_subsystems = sorted(subsystem_observables.keys())

# Reconstruct the expectation values
for i, coeff in enumerate(coefficients):
current_expvals = np.ones((len(expvals),))
for label in sorted_subsystems:
so = subsystem_observables[label]
for label, so in subsystem_observables.items():
subsystem_expvals = [
np.zeros(len(cog.commuting_observables)) for cog in so.groups
]
Expand Down
53 changes: 52 additions & 1 deletion test/cutting/test_cutting_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@

"""Tests of various cutting workflows, particularly with regard to flexibility in transpilation."""

import pytest
from copy import deepcopy

from qiskit.circuit.library import EfficientSU2, CXGate
from qiskit.quantum_info import PauliList
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit.providers.fake_provider import FakeLagosV2
from qiskit_aer.primitives import Sampler

from circuit_knitting.cutting.qpd.instructions import SingleQubitQPDGate
from circuit_knitting.cutting.qpd import QPDBasis
from circuit_knitting.cutting import partition_problem
from circuit_knitting.cutting import (
partition_problem,
generate_cutting_experiments,
reconstruct_expectation_values,
)


def test_transpile_before_realizing_basis_id():
Expand Down Expand Up @@ -48,3 +54,48 @@ def test_transpile_before_realizing_basis_id():
label: pass_manager.run(subcircuits["A"])
for label, circuit in subcircuits.items()
}


@pytest.mark.parametrize(
"label1,label2",
[
("foo", frozenset({1, 2, 4})),
(42, "bar"),
],
)
def test_exotic_labels(label1, label2):
"""Test workflow with labels of non-uniform type."""
circuit = EfficientSU2(4, entanglement="linear", reps=2).decompose()
circuit.assign_parameters([0.8] * len(circuit.parameters), inplace=True)
observables = PauliList(["ZZII", "IZZI", "IIZZ", "XIXI", "ZIZZ", "IXIX"])
subcircuits, bases, subobservables = partition_problem(
circuit=circuit,
partition_labels=[label1, label1, label2, label2],
observables=observables,
)
assert set(subcircuits.keys()) == {label1, label2}

subexperiments, coefficients = generate_cutting_experiments(
subcircuits, subobservables, num_samples=1500
)
assert subexperiments.keys() == subcircuits.keys()

samplers = {
label1: Sampler(run_options={"shots": 10}),
label2: Sampler(run_options={"shots": 10}),
}
results = {
label: sampler.run(subexperiments[label]).result()
for label, sampler in samplers.items()
}

for label in results:
for i, subexperiment in enumerate(subexperiments[label]):
results[label].metadata[i]["num_qpd_bits"] = len(subexperiment.cregs[0])

simulated_expvals = reconstruct_expectation_values(
results,
coefficients,
subobservables,
)
assert len(simulated_expvals) == len(observables)