From b23b4e00b1734428ca60b11e6b7f4e83f580fc01 Mon Sep 17 00:00:00 2001 From: Takashi Imamichi Date: Wed, 17 Jan 2024 19:09:35 +0900 Subject: [PATCH] Add Statevector-based BaseSamplerV2 implementation Co-authored-by: Ian Hincks --- qiskit/primitives/__init__.py | 1 + qiskit/primitives/base/base_sampler.py | 2 +- qiskit/primitives/statevector_sampler.py | 215 +++++++ .../primitives/test_statevector_sampler.py | 551 ++++++++++++++++++ 4 files changed, 768 insertions(+), 1 deletion(-) create mode 100644 qiskit/primitives/statevector_sampler.py create mode 100644 test/python/primitives/test_statevector_sampler.py diff --git a/qiskit/primitives/__init__.py b/qiskit/primitives/__init__.py index 2f54a3346d44..a33fba9870e4 100644 --- a/qiskit/primitives/__init__.py +++ b/qiskit/primitives/__init__.py @@ -62,3 +62,4 @@ from .containers import BindingsArray, ObservablesArray, PrimitiveResult, PubResult, SamplerPub from .estimator import Estimator from .sampler import Sampler +from .statevector_sampler import Sampler as StatevectorSampler diff --git a/qiskit/primitives/base/base_sampler.py b/qiskit/primitives/base/base_sampler.py index 9cf315e84fd8..afd9b44d1af5 100644 --- a/qiskit/primitives/base/base_sampler.py +++ b/qiskit/primitives/base/base_sampler.py @@ -278,7 +278,7 @@ class BaseSamplerV2: @abstractmethod def run( - self, pubs: Iterable[SamplerPubLike], shots: int | None = None + self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None ) -> BasePrimitiveJob[PrimitiveResult[PubResult]]: """Run and collect samples from each pub. diff --git a/qiskit/primitives/statevector_sampler.py b/qiskit/primitives/statevector_sampler.py new file mode 100644 index 000000000000..052558448cc7 --- /dev/null +++ b/qiskit/primitives/statevector_sampler.py @@ -0,0 +1,215 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2023, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +""" +Statevector Sampler class +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable + +import numpy as np +from numpy.typing import NDArray + +from qiskit import ClassicalRegister, QiskitError, QuantumCircuit +from qiskit.circuit import ControlFlowOp +from qiskit.quantum_info import Statevector + +from .base import BaseSamplerV2 +from .base.validation import _has_measure +from .containers import ( + BitArray, + PrimitiveResult, + PubResult, + SamplerPub, + SamplerPubLike, + make_data_bin, +) +from .containers.bit_array import _min_num_bytes +from .primitive_job import PrimitiveJob +from .utils import bound_circuit_to_instruction + + +@dataclass +class _MeasureInfo: + creg_name: str + num_bits: int + num_bytes: int + qreg_indices: list[int] + + +class Sampler(BaseSamplerV2): + """ + Simple implementation of :class:`BaseSamplerV2` with Statevector. + """ + + _DEFAULT_SHOTS: int = 512 + + def __init__(self, *, seed: np.random.Generator | int | None = None): + """ + Args: + seed: The seed for random number generator. + """ + self._seed = seed + if isinstance(self._seed, np.random.Generator): + self._rng = self._seed + else: + self._rng = np.random.default_rng(self._seed) + + @property + def seed(self) -> np.random.Generator | int | None: + """Return the seed for random number generator. + + Returns: + np.random.Generator | int | None: The seed for random number generator. + """ + return self._seed + + def run( + self, pubs: Iterable[SamplerPubLike], shots: int | None = None + ) -> PrimitiveJob[PrimitiveResult[PubResult]]: + job: PrimitiveJob[PubResult] = PrimitiveJob(self._run, pubs, shots) + job._submit() + return job + + def _run( + self, pubs: Iterable[SamplerPubLike], shots: int | None = None + ) -> PrimitiveResult[PrimitiveResult[PubResult]]: + if shots is None: + shots = self._DEFAULT_SHOTS + coerced_pubs = [SamplerPub.coerce(pub, shots) for pub in pubs] + for pub in coerced_pubs: + pub.validate() + + results = [] + for pub in coerced_pubs: + circuit, qargs, meas_info = _preprocess_circuit(pub.circuit) + bound_circuits = pub.parameter_values.bind_all(circuit) + arrays = { + item.creg_name: np.zeros( + bound_circuits.shape + (pub.shots, item.num_bytes), dtype=np.uint8 + ) + for item in meas_info + } + for index in np.ndindex(*bound_circuits.shape): + bound_circuit = bound_circuits[index] + final_state = Statevector(bound_circuit_to_instruction(bound_circuit)) + final_state.seed(self._rng) + if qargs: + samples = final_state.sample_memory(shots=pub.shots, qargs=qargs) + else: + samples = [""] * pub.shots + samples_array = np.array( + [np.fromiter(sample, dtype=np.uint8) for sample in samples] + ) + for item in meas_info: + ary = _samples_to_packed_array(samples_array, item.num_bits, item.qreg_indices) + arrays[item.creg_name][index] = ary + + data_bin_cls = make_data_bin( + [(item.creg_name, BitArray) for item in meas_info], + shape=bound_circuits.shape, + ) + meas = { + item.creg_name: BitArray(arrays[item.creg_name], item.num_bits) + for item in meas_info + } + data_bin = data_bin_cls(**meas) + results.append(PubResult(data_bin, metadata={"shots": pub.shots})) + return PrimitiveResult(results) + + +def _preprocess_circuit(circuit: QuantumCircuit): + num_bits_dict = {creg.name: creg.size for creg in circuit.cregs} + mapping = _final_measurement_mapping(circuit) + qargs = sorted(set(mapping.values())) + qargs_index = {v: k for k, v in enumerate(qargs)} + circuit = circuit.remove_final_measurements(inplace=False) + if _has_control_flow(circuit): + raise QiskitError("StatevectorSampler cannot handle ControlFlowOp") + if _has_measure(circuit): + raise QiskitError("StatevectorSampler cannot handle mid-circuit measurements") + # num_qubits is used as sentinel to fill 0 in _samples_to_packed_array + sentinel = len(qargs) + indices = {key: [sentinel] * val for key, val in num_bits_dict.items()} + for key, qreg in mapping.items(): + creg, ind = key + indices[creg.name][ind] = qargs_index[qreg] + meas_info = [ + _MeasureInfo( + creg_name=name, + num_bits=num_bits, + num_bytes=_min_num_bytes(num_bits), + qreg_indices=indices[name], + ) + for name, num_bits in num_bits_dict.items() + ] + return circuit, qargs, meas_info + + +def _samples_to_packed_array( + samples: NDArray[np.uint8], num_bits: int, indices: list[int] +) -> NDArray[np.uint8]: + # samples of `Statevector.sample_memory` will be in the order of + # qubit_last, ..., qubit_1, qubit_0. + # reverse the sample order into qubit_0, qubit_1, ..., qubit_last and + # pad 0 in the rightmost to be used for the sentinel introduced by _preprocess_circuit. + ary = np.pad(samples[:, ::-1], ((0, 0), (0, 1)), constant_values=0) + # place samples in the order of clbit_last, ..., clbit_1, clbit_0 + ary = ary[:, indices[::-1]] + # pad 0 in the left to align the number to be mod 8 + # since np.packbits(bitorder='big') pads 0 to the right. + pad_size = -num_bits % 8 + ary = np.pad(ary, ((0, 0), (pad_size, 0)), constant_values=0) + # pack bits in big endian order + ary = np.packbits(ary, axis=-1) + return ary + + +def _final_measurement_mapping(circuit: QuantumCircuit) -> dict[tuple[ClassicalRegister, int], int]: + """Return the final measurement mapping for the circuit. + + Parameters: + circuit: Input quantum circuit. + + Returns: + Mapping of classical bits to qubits for final measurements. + """ + active_qubits = set(range(circuit.num_qubits)) + active_cbits = set(range(circuit.num_clbits)) + + # Find final measurements starting in back + mapping = {} + for item in circuit[::-1]: + if item.operation.name == "measure": + loc = circuit.find_bit(item.clbits[0]) + cbit = loc.index + creg = loc.registers[0] + qbit = circuit.find_bit(item.qubits[0]).index + if cbit in active_cbits and qbit in active_qubits: + mapping[creg] = qbit + active_cbits.remove(cbit) + elif item.operation.name not in ["barrier", "delay"]: + for qq in item.qubits: + _temp_qubit = circuit.find_bit(qq).index + if _temp_qubit in active_qubits: + active_qubits.remove(_temp_qubit) + + if not active_cbits or not active_qubits: + break + + return mapping + + +def _has_control_flow(circuit: QuantumCircuit) -> bool: + return any(isinstance(instruction.operation, ControlFlowOp) for instruction in circuit) diff --git a/test/python/primitives/test_statevector_sampler.py b/test/python/primitives/test_statevector_sampler.py new file mode 100644 index 000000000000..ea3384df18df --- /dev/null +++ b/test/python/primitives/test_statevector_sampler.py @@ -0,0 +1,551 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2023, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for Statevector Sampler.""" + +from __future__ import annotations + +import unittest +from dataclasses import astuple + +import numpy as np +from numpy.typing import NDArray + +from qiskit import ClassicalRegister, QiskitError, QuantumCircuit, QuantumRegister +from qiskit.circuit import Parameter +from qiskit.circuit.library import RealAmplitudes, UnitaryGate +from qiskit.primitives import PrimitiveResult, PubResult, SamplerPub +from qiskit.primitives.containers import BitArray +from qiskit.primitives.containers.data_bin import DataBin +from qiskit.primitives.statevector_sampler import Sampler +from qiskit.providers import JobStatus +from qiskit.test import QiskitTestCase + + +class TestStatevectorSampler(QiskitTestCase): + """Test for Statevector Sampler""" + + def setUp(self): + super().setUp() + self._shots = 10000 + self._seed = 123 + + self._cases = [] + hadamard = QuantumCircuit(1, 1, name="Hadamard") + hadamard.h(0) + hadamard.measure(0, 0) + self._cases.append((hadamard, None, {0: 5000, 1: 5000})) # case 0 + + bell = QuantumCircuit(2, name="Bell") + bell.h(0) + bell.cx(0, 1) + bell.measure_all() + self._cases.append((bell, None, {0: 5000, 3: 5000})) # case 1 + + pqc = RealAmplitudes(num_qubits=2, reps=2) + pqc.measure_all() + self._cases.append((pqc, [0] * 6, {0: 10000})) # case 2 + self._cases.append((pqc, [1] * 6, {0: 168, 1: 3389, 2: 470, 3: 5973})) # case 3 + self._cases.append((pqc, [0, 1, 1, 2, 3, 5], {0: 1339, 1: 3534, 2: 912, 3: 4215})) # case 4 + self._cases.append((pqc, [1, 2, 3, 4, 5, 6], {0: 634, 1: 291, 2: 6039, 3: 3036})) # case 5 + + pqc2 = RealAmplitudes(num_qubits=2, reps=3) + pqc2.measure_all() + self._cases.append( + (pqc2, [0, 1, 2, 3, 4, 5, 6, 7], {0: 1898, 1: 6864, 2: 928, 3: 311}) + ) # case 6 + + def _assert_allclose(self, bitarray: BitArray, target: NDArray | BitArray, rtol=1e-1): + self.assertEqual(bitarray.shape, target.shape) + for idx in np.ndindex(bitarray.shape): + int_counts = bitarray.get_int_counts(idx) + target_counts = ( + target.get_int_counts(idx) if isinstance(target, BitArray) else target[idx] + ) + max_key = max(max(int_counts.keys()), max(target_counts.keys())) + ary = np.array([int_counts.get(i, 0) for i in range(max_key + 1)]) + tgt = np.array([target_counts.get(i, 0) for i in range(max_key + 1)]) + np.testing.assert_allclose(ary, tgt, rtol=rtol, err_msg=f"index: {idx}") + + def test_sampler_run(self): + """Test Sampler.run().""" + bell, _, target = self._cases[1] + + with self.subTest("single"): + sampler = Sampler(seed=self._seed) + job = sampler.run([bell], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array(target)) + + with self.subTest("single with param"): + sampler = Sampler(seed=self._seed) + job = sampler.run([(bell, ())], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array(target)) + + with self.subTest("single array"): + sampler = Sampler(seed=self._seed) + job = sampler.run([(bell, [()])], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array([target])) + + with self.subTest("multiple"): + sampler = Sampler(seed=self._seed) + job = sampler.run([(bell, [(), (), ()])], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array([target, target, target])) + + def test_sample_run_multiple_circuits(self): + """Test Sampler.run() with multiple circuits.""" + bell, _, target = self._cases[1] + sampler = Sampler(seed=self._seed) + result = sampler.run([bell, bell, bell], shots=self._shots).result() + self.assertEqual(len(result), 3) + self._assert_allclose(result[0].data.meas, np.array(target)) + self._assert_allclose(result[1].data.meas, np.array(target)) + self._assert_allclose(result[2].data.meas, np.array(target)) + + def test_sampler_run_with_parameterized_circuits(self): + """Test Sampler.run() with parameterized circuits.""" + + pqc1, param1, target1 = self._cases[4] + pqc2, param2, target2 = self._cases[5] + pqc3, param3, target3 = self._cases[6] + + sampler = Sampler(seed=self._seed) + result = sampler.run( + [(pqc1, param1), (pqc2, param2), (pqc3, param3)], shots=self._shots + ).result() + self.assertEqual(len(result), 3) + self._assert_allclose(result[0].data.meas, np.array(target1)) + self._assert_allclose(result[1].data.meas, np.array(target2)) + self._assert_allclose(result[2].data.meas, np.array(target3)) + + def test_run_1qubit(self): + """test for 1-qubit cases""" + qc = QuantumCircuit(1) + qc.measure_all() + qc2 = QuantumCircuit(1) + qc2.x(0) + qc2.measure_all() + + sampler = Sampler(seed=self._seed) + result = sampler.run([qc, qc2], shots=self._shots).result() + self.assertEqual(len(result), 2) + for i in range(2): + self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) + + def test_run_2qubit(self): + """test for 2-qubit cases""" + qc0 = QuantumCircuit(2) + qc0.measure_all() + qc1 = QuantumCircuit(2) + qc1.x(0) + qc1.measure_all() + qc2 = QuantumCircuit(2) + qc2.x(1) + qc2.measure_all() + qc3 = QuantumCircuit(2) + qc3.x([0, 1]) + qc3.measure_all() + + sampler = Sampler(seed=self._seed) + result = sampler.run([qc0, qc1, qc2, qc3], shots=self._shots).result() + self.assertEqual(len(result), 4) + for i in range(4): + self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) + + def test_run_single_circuit(self): + """Test for single circuit case.""" + + with self.subTest("No parameter"): + circuit, _, target = self._cases[1] + param_target = [ + (None, np.array(target)), + ((), np.array(target)), + ([], np.array(target)), + (np.array([]), np.array(target)), + (((),), np.array([target])), + (([],), np.array([target])), + ([[]], np.array([target])), + ([()], np.array([target])), + (np.array([[]]), np.array([target])), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + sampler = Sampler(seed=self._seed) + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target) + + with self.subTest("One parameter"): + circuit = QuantumCircuit(1, 1, name="X gate") + param = Parameter("x") + circuit.ry(param, 0) + circuit.measure(0, 0) + param_target = [ + ([np.pi], np.array({1: self._shots})), + ((np.pi,), np.array({1: self._shots})), + (np.array([np.pi]), np.array({1: self._shots})), + ([[np.pi]], np.array([{1: self._shots}])), + (((np.pi,),), np.array([{1: self._shots}])), + (np.array([[np.pi]]), np.array([{1: self._shots}])), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + sampler = Sampler(seed=self._seed) + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.c, target) + + with self.subTest("More than one parameter"): + circuit, param, target = self._cases[3] + param_target = [ + (param, np.array(target)), + (tuple(param), np.array(target)), + (np.array(param), np.array(target)), + ((param,), np.array([target])), + ([param], np.array([target])), + (np.array([param]), np.array([target])), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + sampler = Sampler(seed=self._seed) + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target) + + def test_run_reverse_meas_order(self): + """test for sampler with reverse measurement order""" + x = Parameter("x") + y = Parameter("y") + + qc = QuantumCircuit(3, 3) + qc.rx(x, 0) + qc.rx(y, 1) + qc.x(2) + qc.measure(0, 2) + qc.measure(1, 1) + qc.measure(2, 0) + + sampler = Sampler(seed=self._seed) + result = sampler.run([(qc, [0, 0]), (qc, [np.pi / 2, 0])], shots=self._shots).result() + self.assertEqual(len(result), 2) + + # qc({x: 0, y: 0}) + self._assert_allclose(result[0].data.c, np.array({1: self._shots})) + + # qc({x: pi/2, y: 0}) + self._assert_allclose(result[1].data.c, np.array({1: self._shots / 2, 5: self._shots / 2})) + + def test_run_errors(self): + """Test for errors with run method""" + qc1 = QuantumCircuit(1) + qc1.measure_all() + qc2 = RealAmplitudes(num_qubits=1, reps=1) + qc2.measure_all() + qc3 = QuantumCircuit(1) + qc4 = QuantumCircuit(1, 1) + with qc4.for_loop(range(5)): + qc4.h(0) + + sampler = Sampler() + with self.subTest("set parameter values to a non-parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc1, [1e2])]).result() + with self.subTest("missing all parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([qc2]).result() + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [])]).result() + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, None)]).result() + with self.subTest("missing some parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [1e2])]).result() + with self.subTest("too many parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [1e2] * 100)]).result() + with self.subTest("no classical bits"): + with self.assertRaises(ValueError): + _ = sampler.run([qc3]).result() + with self.subTest("with control flow"): + with self.assertRaises(QiskitError): + _ = sampler.run([qc4]).result() + + def test_run_empty_parameter(self): + """Test for empty parameter""" + n = 5 + qc = QuantumCircuit(n, n - 1) + qc.measure(range(n - 1), range(n - 1)) + sampler = Sampler(seed=self._seed) + with self.subTest("one circuit"): + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.c, np.array({0: self._shots})) + + with self.subTest("two circuits"): + result = sampler.run([qc, qc], shots=self._shots).result() + self.assertEqual(len(result), 2) + for i in range(2): + self._assert_allclose(result[i].data.c, np.array({0: self._shots})) + + def test_run_numpy_params(self): + """Test for numpy array as parameter values""" + qc = RealAmplitudes(num_qubits=2, reps=2) + qc.measure_all() + k = 5 + params_array = np.linspace(0, 1, k * qc.num_parameters).reshape((k, qc.num_parameters)) + params_list = params_array.tolist() + sampler = Sampler(seed=self._seed) + target = sampler.run([(qc, params_list)], shots=self._shots).result() + + with self.subTest("ndarray"): + sampler = Sampler(seed=self._seed) + result = sampler.run([(qc, params_array)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target[0].data.meas) + + with self.subTest("split a list"): + sampler = Sampler(seed=self._seed) + result = sampler.run( + [(qc, params) for params in params_list], shots=self._shots + ).result() + self.assertEqual(len(result), k) + for i in range(k): + self._assert_allclose( + result[i].data.meas, np.array(target[0].data.meas.get_int_counts(i)) + ) + + def test_run_with_shots_option(self): + """test with shots option.""" + bell, _, _ = self._cases[1] + shots = 100 + + with self.subTest("run arg"): + sampler = Sampler(seed=self._seed) + result = sampler.run([bell], shots=shots).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + self.assertIn("shots", result[0].metadata) + self.assertEqual(result[0].metadata["shots"], shots) + + with self.subTest("default shots"): + default_shots = 512 + sampler = Sampler(seed=self._seed) + result = sampler.run([bell]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, default_shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots) + self.assertIn("shots", result[0].metadata) + self.assertEqual(result[0].metadata["shots"], default_shots) + + with self.subTest("pub-like"): + sampler = Sampler(seed=self._seed) + result = sampler.run([(bell, None, shots)]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + self.assertIn("shots", result[0].metadata) + self.assertEqual(result[0].metadata["shots"], shots) + + with self.subTest("pub"): + sampler = Sampler(seed=self._seed) + result = sampler.run([SamplerPub(bell, shots=shots)]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + self.assertIn("shots", result[0].metadata) + self.assertEqual(result[0].metadata["shots"], shots) + + with self.subTest("multiple pubs"): + sampler = Sampler(seed=self._seed) + shots1 = 100 + shots2 = 200 + result = sampler.run( + [ + SamplerPub(bell, shots=shots1), + SamplerPub(bell, shots=shots2), + ], + shots=self._shots, + ).result() + self.assertEqual(len(result), 2) + self.assertEqual(result[0].data.meas.num_shots, shots1) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots1) + self.assertIn("shots", result[0].metadata) + self.assertEqual(result[0].metadata["shots"], shots1) + + self.assertEqual(result[1].data.meas.num_shots, shots2) + self.assertEqual(sum(result[1].data.meas.get_counts().values()), shots2) + self.assertIn("shots", result[1].metadata) + self.assertEqual(result[1].metadata["shots"], shots2) + + def test_run_shots_result_size(self): + """test with shots option to validate the result size""" + n = 10 + qc = QuantumCircuit(n) + qc.h(range(n)) + qc.measure_all() + sampler = Sampler(seed=self._seed) + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + self.assertLessEqual(result[0].data.meas.num_shots, self._shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), self._shots) + + def test_primitive_job_status_done(self): + """test primitive job's status""" + bell, _, _ = self._cases[1] + sampler = Sampler(seed=self._seed) + job = sampler.run([bell], shots=self._shots) + _ = job.result() + self.assertEqual(job.status(), JobStatus.DONE) + + def test_seed(self): + """Test for seed options""" + with self.subTest("empty"): + sampler = Sampler() + self.assertIsNone(sampler.seed) + with self.subTest("set int"): + sampler = Sampler(seed=self._seed) + self.assertEqual(sampler.seed, self._seed) + with self.subTest("set generator"): + sampler = Sampler(seed=np.random.default_rng(self._seed)) + self.assertIsInstance(sampler.seed, np.random.Generator) + + def test_circuit_with_unitary(self): + """Test for circuit with unitary gate.""" + + with self.subTest("identity"): + gate = UnitaryGate(np.eye(2)) + + circuit = QuantumCircuit(1) + circuit.append(gate, [0]) + circuit.measure_all() + + sampler = Sampler(seed=self._seed) + result = sampler.run([circuit], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, np.array({0: self._shots})) + + with self.subTest("X"): + gate = UnitaryGate([[0, 1], [1, 0]]) + + circuit = QuantumCircuit(1) + circuit.append(gate, [0]) + circuit.measure_all() + + sampler = Sampler(seed=self._seed) + result = sampler.run([circuit], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, np.array({1: self._shots})) + + def test_circuit_with_multiple_cregs(self): + """Test for circuit with multiple classical registers.""" + cases = [] + + # case 1 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) + target = {"a": {0: 5000, 1: 5000}, "b": {0: 5000, 2: 5000}, "c": {0: 5000, 6: 5000}} + cases.append(("use all cregs", qc, target)) + + # case 2 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(5, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) + target = { + "a": {0: 5000, 1: 5000}, + "b": {0: 2500, 2: 2500, 24: 2500, 26: 2500}, + "c": {0: 10000}, + } + cases.append(("use only a and b", qc, target)) + + # case 3 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure(1, 5) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} + cases.append(("use only c", qc, target)) + + # case 4 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2], [5, 5, 5]) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} + cases.append(("use only c multiple qubits", qc, target)) + + # case 5 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 10000}} + cases.append(("no measure", qc, target)) + + for title, qc, target in cases: + with self.subTest(title): + sampler = Sampler(seed=self._seed) + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + data = result[0].data + self.assertEqual(len(astuple(data)), 3) + for creg in qc.cregs: + self.assertTrue(hasattr(data, creg.name)) + self._assert_allclose(getattr(data, creg.name), np.array(target[creg.name])) + + +if __name__ == "__main__": + unittest.main()