From 6437a4f6bc9309969e1b4647e98fa4708034289a Mon Sep 17 00:00:00 2001 From: Andrew Lytle Date: Tue, 5 Dec 2023 11:11:00 -0600 Subject: [PATCH] Reformat with black --- src/dense_ev/estimator_from_aer.py | 177 ++++++++++++++++++----------- src/dense_ev/rmatrix.py | 2 + src/dense_ev/test_estimator.py | 37 +++--- 3 files changed, 134 insertions(+), 82 deletions(-) diff --git a/src/dense_ev/estimator_from_aer.py b/src/dense_ev/estimator_from_aer.py index 0da3ca4..20f0848 100644 --- a/src/dense_ev/estimator_from_aer.py +++ b/src/dense_ev/estimator_from_aer.py @@ -46,6 +46,7 @@ stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) + class Estimator(BaseEstimator): """ Aer implmentation of Estimator. @@ -100,12 +101,14 @@ def __init__( If approximation is True, this parameter is ignored and assumed to be False. """ super().__init__(options=run_options) - #if abelian_grouping == "DENSE": + # if abelian_grouping == "DENSE": # m = 3 # self.PO = PauliOrganizer(m) backend_options = {} if backend_options is None else backend_options method = ( - "density_matrix" if approximation and "noise_model" in backend_options else "automatic" + "density_matrix" + if approximation and "noise_model" in backend_options + else "automatic" ) self._backend = AerSimulator(method=method) self._backend.set_options(**backend_options) @@ -164,9 +167,11 @@ def _run( observable_indices.append(index) else: observable_indices.append(len(self._observables)) - self._observable_ids[_observable_key(observable)] = len(self._observables) + self._observable_ids[_observable_key(observable)] = len( + self._observables + ) self._observables.append(observable) - #logger.debug(f"{self._circuits[0].num_qubits = }") + # logger.debug(f"{self._circuits[0].num_qubits = }") if self._abelian_grouping == "DENSE": self._m = self._circuits[0].num_qubits self.PO = PauliOrganizer(self._m) @@ -195,7 +200,9 @@ def _compute(self, circuits, observables, parameter_values, run_options): # Create expectation value experiments. if key in self._cache: # Use a cache experiments_dict, obs_maps = self._cache[key] - exp_map = self._pre_process_params(circuits, observables, parameter_values, obs_maps) + exp_map = self._pre_process_params( + circuits, observables, parameter_values, obs_maps + ) experiments, parameter_binds = self._flatten(experiments_dict, exp_map) post_processings = self._create_post_processing( circuits, observables, parameter_values, obs_maps, exp_map @@ -207,17 +214,21 @@ def _compute(self, circuits, observables, parameter_values, run_options): for circ_ind, obs_ind in zip(circuits, observables): circ_obs_map[circ_ind].append(obs_ind) experiments_dict = {} - obs_maps = {} # circ_ind => obs_ind => term_ind (Original Pauli) => basis_ind + obs_maps = ( + {} + ) # circ_ind => obs_ind => term_ind (Original Pauli) => basis_ind # Group and create measurement circuit for circ_ind, obs_indices in circ_obs_map.items(): pauli_list = sum( [self._observables[obs_ind].paulis for obs_ind in obs_indices] ).unique() - if self._abelian_grouping == 'DENSE': - pauli_lists = [PauliList(family.to_string()) for family in self.PO.f] + if self._abelian_grouping == "DENSE": + pauli_lists = [ + PauliList(family.to_string()) for family in self.PO.f + ] # ATL This won't work for unpopulated families.. need to group only those # which are present in operator decompositions. - elif self._abelian_grouping == 'GC': + elif self._abelian_grouping == "GC": pauli_lists = pauli_list.group_commuting() elif self._abelian_grouping: pauli_lists = pauli_list.group_commuting(qubit_wise=True) @@ -235,39 +246,48 @@ def _compute(self, circuits, observables, parameter_values, run_options): obs_maps[circ_ind] = obs_map logger.debug(f"{obs_maps = }") logger.debug(f"{circ_ind = }") - + bases = [_paulis2basis(pauli_list) for pauli_list in pauli_lists] logger.debug(f"{bases = }") - if len(bases) == 1 and not bases[0].x.any() and not bases[0].z.any(): # identity + if ( + len(bases) == 1 and not bases[0].x.any() and not bases[0].z.any() + ): # identity break - meas_circuits = [self._create_meas_circuit(basis, circ_ind) for basis in bases] + meas_circuits = [ + self._create_meas_circuit(basis, circ_ind) for basis in bases + ] logger.debug(f"{meas_circuits = }") logger.debug(f"{meas_circuits[0].metadata = }") if self._abelian_grouping == "DENSE": fams = self.PO.f - _ids = [self.PO.get_fam_index(str(pauli_list[0])) for pauli_list in pauli_lists] - res =[] + _ids = [ + self.PO.get_fam_index(str(pauli_list[0])) + for pauli_list in pauli_lists + ] + res = [] for _id in _ids: - #m = 3 + # m = 3 _q = QuantumCircuit(self._m) fams[_id].apply_to_circuit(_q) _q.measure_all() - _q.metadata = {'basis': Pauli('Y'*self._m)} #e.g. need to set this correctly.. + _q.metadata = { + "basis": Pauli("Y" * self._m) + } # e.g. need to set this correctly.. # not sure what role it plays in practice.. res.append(_q.copy()) meas_circuits = res logger.debug(f"{meas_circuits = }") - ''' + """ print(f"{_ids = }") meas_circuits = [QuantumCircuit(2) for _ in _ids] meas_circuits = [fams[_id].apply_to_circuit(circuit) for _id, circuit in zip(_ids, meas_circuits)] print(f"{fams =}") print(f"{meas_circuits = }") - ''' + """ logger.debug(f"{meas_circuits[0] = }") circuit = ( @@ -278,7 +298,9 @@ def _compute(self, circuits, observables, parameter_values, run_options): experiments_dict[circ_ind] = self._combine_circs(circuit, meas_circuits) self._cache[key] = experiments_dict, obs_maps - exp_map = self._pre_process_params(circuits, observables, parameter_values, obs_maps) + exp_map = self._pre_process_params( + circuits, observables, parameter_values, obs_maps + ) # Flatten experiments, parameter_binds = self._flatten(experiments_dict, exp_map) @@ -309,8 +331,12 @@ def _compute(self, circuits, observables, parameter_values, run_options): return EstimatorResult(np.real_if_close(expectation_values), list(metadata)) def _pre_process_params(self, circuits, observables, parameter_values, obs_maps): - exp_map = defaultdict(dict) # circ_ind => basis_ind => (parameter, parameter_values) - for circ_ind, obs_ind, param_val in zip(circuits, observables, parameter_values): + exp_map = defaultdict( + dict + ) # circ_ind => basis_ind => (parameter, parameter_values) + for circ_ind, obs_ind, param_val in zip( + circuits, observables, parameter_values + ): self._validate_parameter_length(param_val, circ_ind) parameter = self._parameters[circ_ind] for basis_ind in obs_maps[circ_ind][obs_ind]: @@ -373,7 +399,9 @@ def _combine_circs(circuit: QuantumCircuit, meas_circuits: list[QuantumCircuit]) return circs @staticmethod - def _calculate_result_index(circ_ind, obs_ind, term_ind, param_val, obs_maps, exp_map) -> int: + def _calculate_result_index( + circ_ind, obs_ind, term_ind, param_val, obs_maps, exp_map + ) -> int: basis_ind = obs_maps[circ_ind][obs_ind][term_ind] result_index = 0 @@ -383,18 +411,24 @@ def _calculate_result_index(circ_ind, obs_ind, term_ind, param_val, obs_maps, ex result_index += param_vals.index(param_val) return result_index result_index += len(param_vals) - raise AerError("Bug. Please report from issue: https://github.com/Qiskit/qiskit-aer/issues") + raise AerError( + "Bug. Please report from issue: https://github.com/Qiskit/qiskit-aer/issues" + ) def _create_post_processing( self, circuits, observables, parameter_values, obs_maps, exp_map ) -> list[_PostProcessing]: post_processings = [] - for circ_ind, obs_ind, param_val in zip(circuits, observables, parameter_values): + for circ_ind, obs_ind, param_val in zip( + circuits, observables, parameter_values + ): result_indices: list[int | None] = [] paulis = [] coeffs = [] observable = self._observables[obs_ind] - for term_ind, (pauli, coeff) in enumerate(zip(observable.paulis, observable.coeffs)): + for term_ind, (pauli, coeff) in enumerate( + zip(observable.paulis, observable.coeffs) + ): # Identity if not pauli.x.any() and not pauli.z.any(): result_indices.append(None) @@ -405,12 +439,12 @@ def _create_post_processing( result_index = self._calculate_result_index( circ_ind, obs_ind, term_ind, param_val, obs_maps, exp_map ) - if self._abelian_grouping == "DENSE": + if self._abelian_grouping == "DENSE": logger.debug(f"{pauli = }") - #fams = self.PO.f - #_id = self.PO.get_fam_index(pauli.to_label()) - #sign = fams[_id].signs - #coeff = sign*coeff + # fams = self.PO.f + # _id = self.PO.get_fam_index(pauli.to_label()) + # sign = fams[_id].signs + # coeff = sign*coeff if result_index in result_indices: i = result_indices.index(result_index) paulis[i] += pauli @@ -421,12 +455,12 @@ def _create_post_processing( coeffs.append([coeff]) logger.debug(f"{paulis = }") logger.debug(f"{coeffs = }") - + signs_ = None if self._abelian_grouping == "DENSE": signs_ = [] for _i, (pauli_list, coeff_list) in enumerate(zip(paulis, coeffs)): - #print(f"{pauli_list = }") + # print(f"{pauli_list = }") fams = self.PO.f _id = self.PO.get_fam_index(pauli_list[0].to_label()) signs = fams[_id].signs @@ -434,33 +468,34 @@ def _create_post_processing( # Reorder the signs to match Paulis in pauli_list strings = fams[_id].to_string() logger.debug(f"{strings = }") - pauli_dict = {st:si for (st,si) in zip(strings, signs)} + pauli_dict = {st: si for (st, si) in zip(strings, signs)} signs = [pauli_dict[st.to_label()] for st in pauli_list] logger.debug("reordered signs:", signs) - #try: + # try: if len(signs) != len(coeff_list): - signs = [1]*len(coeff_list) + signs = [1] * len(coeff_list) logger.debug("WARNING") - #raise ValueError - #coeff_list = np.array(signs)*np.array(coeff_list) - #coeffs[_i] = coeff_list - #except ValueError: # Need to handle identity string better.. - #signs = None + # raise ValueError + # coeff_list = np.array(signs)*np.array(coeff_list) + # coeffs[_i] = coeff_list + # except ValueError: # Need to handle identity string better.. + # signs = None signs_.append(signs) - #print(f"test {paulis = }") - #print(f"test {coeffs = }") - post_processings.append(_PostProcessing(result_indices, paulis, coeffs, - signs=signs_)) + # print(f"test {paulis = }") + # print(f"test {coeffs = }") + post_processings.append( + _PostProcessing(result_indices, paulis, coeffs, signs=signs_) + ) return post_processings - + def count_Ys(self, pauli_string): # Move this out of class? count = 0 for char in pauli_string: - if char == 'Y': + if char == "Y": count += 1 logger.debug(f"{count = }") return count - + def _compute_with_approximation( self, circuits, observables, parameter_values, run_options, seed ): @@ -522,7 +557,8 @@ def _compute_with_approximation( # Post processing (calculate expectation values) if shots is None: expectation_values = [ - result.data(i)["expectation_value"] for i in experiment_manager.experiment_indices + result.data(i)["expectation_value"] + for i in experiment_manager.experiment_indices ] metadata = [ {"simulator_metadata": result.results[i].metadata} @@ -608,7 +644,7 @@ def __init__( result_indices: list[int], paulis: list[PauliList], coeffs: list[list[float]], - signs = None + signs=None, ): self._result_indices = result_indices self._paulis = paulis @@ -616,7 +652,7 @@ def __init__( logger.debug(f"{self._coeffs = }") if signs: self._signs = [np.array(s) for s in signs] - self._coeffs = [s*c for s, c in zip(self._signs, self._coeffs)] + self._coeffs = [s * c for s, c in zip(self._signs, self._coeffs)] else: self._signs = None @@ -633,13 +669,17 @@ def run(self, results: list[ExperimentResult]) -> tuple[float, dict]: combined_expval = 0.0 combined_var = 0.0 simulator_metadata = [] - for c_i, paulis, coeffs in zip(self._result_indices, self._paulis, self._coeffs): + for c_i, paulis, coeffs in zip( + self._result_indices, self._paulis, self._coeffs + ): logger.debug(f"{c_i = }") logger.debug(f"{paulis = }") logger.debug(f"{coeffs = }") if c_i is None: # Observable is identity - expvals, variances = np.array([1], dtype=complex), np.array([0], dtype=complex) + expvals, variances = np.array([1], dtype=complex), np.array( + [0], dtype=complex + ) shots = 0 else: result = results[c_i] @@ -652,10 +692,11 @@ def run(self, results: list[ExperimentResult]) -> tuple[float, dict]: measured_paulis = PauliList.from_symplectic( paulis.z[:, indices], paulis.x[:, indices], 0 ) - #measured_paulis = paulis # ATL my edit, not sure diff with above line.. + # measured_paulis = paulis # ATL my edit, not sure diff with above line.. logger.debug(f"{measured_paulis = }") - expvals, variances = _pauli_expval_with_variance(count, measured_paulis, - self._signs) + expvals, variances = _pauli_expval_with_variance( + count, measured_paulis, self._signs + ) simulator_metadata.append(result.metadata) combined_expval += np.dot(expvals, coeffs) logger.debug(f"{combined_expval = }") @@ -676,17 +717,19 @@ def _update_metadata(circuit: QuantumCircuit, metadata: dict) -> QuantumCircuit: return circuit -def _pauli_expval_with_variance(counts: dict, paulis: PauliList, signs=None) -> tuple[np.ndarray, np.ndarray]: +def _pauli_expval_with_variance( + counts: dict, paulis: PauliList, signs=None +) -> tuple[np.ndarray, np.ndarray]: # Diag indices size = len(paulis) - #if not signs: + # if not signs: # signs = [1]*size logger.debug(f"_pauli_expval_with_variance {paulis = }") if signs: diag_inds = _paulis2inds_GC(paulis) else: diag_inds = _paulis2inds(paulis) - #diag_inds = [3,2,1] + # diag_inds = [3,2,1] expvals = np.zeros(size, dtype=float) denom = 0 # Total shots for counts dict for hex_outcome, freq in counts.items(): @@ -696,10 +739,10 @@ def _pauli_expval_with_variance(counts: dict, paulis: PauliList, signs=None) -> denom += freq for k in range(size): coeff = (-1) ** _parity(diag_inds[k] & outcome) - # if grouping == "DENSE" and (count_Ys(paulis[k]) % 2 == 1): - # coeff *= -1 + # if grouping == "DENSE" and (count_Ys(paulis[k]) % 2 == 1): + # coeff *= -1 logger.debug(f"{coeff = }") - expvals[k] += freq * coeff #*signs[k] + expvals[k] += freq * coeff # *signs[k] # Divide by total shots expvals /= denom @@ -710,7 +753,9 @@ def _pauli_expval_with_variance(counts: dict, paulis: PauliList, signs=None) -> def _paulis2inds(paulis: PauliList) -> list[int]: nonid = paulis.z | paulis.x - packed_vals = np.packbits(nonid, axis=1, bitorder="little").astype( # pylint:disable=no-member + packed_vals = np.packbits( + nonid, axis=1, bitorder="little" + ).astype( # pylint:disable=no-member object ) power_uint8 = 1 << (8 * np.arange(packed_vals.shape[1], dtype=object)) @@ -718,21 +763,23 @@ def _paulis2inds(paulis: PauliList) -> list[int]: logger.debug(f"_paulis2inds returns {inds.tolist() = }") return inds.tolist() + def _paulis2inds_GC(paulis: PauliList) -> list[int]: if (~paulis.z).all(): - logger.debug('xfam') + logger.debug("xfam") paulis.z = paulis.x logger.debug(paulis.z[0]) - logger.debug(2**(paulis.z[0])) + logger.debug(2 ** (paulis.z[0])) res = [] for zdat in paulis.z: m = len(zdat) _d = zdat - _r = [(2**i)*_d[i] for i in range(m)] + _r = [(2**i) * _d[i] for i in range(m)] logger.debug(f"{_r = }") res.append(np.sum(_r)) return res + def _parity(integer: int) -> int: """Return the parity of an integer""" return bin(integer).count("1") % 2 diff --git a/src/dense_ev/rmatrix.py b/src/dense_ev/rmatrix.py index 03b97b3..548e464 100644 --- a/src/dense_ev/rmatrix.py +++ b/src/dense_ev/rmatrix.py @@ -104,6 +104,7 @@ def array_to_Op(Hmat): # print(type(H_op)) return H_op + def array_to_SparsePauliOp(Hmat, cut=0): N = Hmat.shape[0] m = log(N, 2) @@ -120,6 +121,7 @@ def array_to_SparsePauliOp(Hmat, cut=0): coeffs.append(coeff) return SparsePauliOp(primitives, coeffs=coeffs) + def get_groups(m): """Specification of Pauli string families suitable for use in Qiskit. diff --git a/src/dense_ev/test_estimator.py b/src/dense_ev/test_estimator.py index ff09c18..5e65c44 100644 --- a/src/dense_ev/test_estimator.py +++ b/src/dense_ev/test_estimator.py @@ -11,11 +11,13 @@ from qiskit.circuit.random import random_circuit from qiskit.quantum_info import SparsePauliOp, PauliList from qiskit import QuantumCircuit -#from qiskit_aer.primitives import Estimator + +# from qiskit_aer.primitives import Estimator from dense_ev.estimator_from_aer import Estimator from dense_ev.rmatrix import random_H, array_to_SparsePauliOp + def test_random(m): "Check different grouping methods with random Hamiltonians." N = 2**m @@ -24,44 +26,45 @@ def test_random(m): Hmat = random_H(N) H = array_to_SparsePauliOp(Hmat) - #state = QuantumCircuit(2) - #state.x(0) - #state.h(0) + # state = QuantumCircuit(2) + # state.x(0) + # state.h(0) state = random_circuit(m, 8, measure=True, seed=seed) # Get exact result. nshots = None approx = True grouping = False # This flag doesn't matter here. - run_options = {"shots": nshots, 'seed': seed} - estimator = Estimator(run_options=run_options, - abelian_grouping=grouping, - approximation=approx) + run_options = {"shots": nshots, "seed": seed} + estimator = Estimator( + run_options=run_options, abelian_grouping=grouping, approximation=approx + ) result_exact = estimator.run(state, H, shots=nshots).result().values # Abelian result. nshots = 200000 approx = False grouping = True - run_options = {"shots": nshots, 'seed': seed} - estimator = Estimator(run_options=run_options, - abelian_grouping=grouping, - approximation=approx) + run_options = {"shots": nshots, "seed": seed} + estimator = Estimator( + run_options=run_options, abelian_grouping=grouping, approximation=approx + ) result_abelian = estimator.run(state, H, shots=nshots).result().values - + # Dense result. nshots = 200000 approx = False grouping = "DENSE" - run_options = {"shots": nshots, 'seed': seed} - estimator = Estimator(run_options=run_options, - abelian_grouping=grouping, - approximation=approx) + run_options = {"shots": nshots, "seed": seed} + estimator = Estimator( + run_options=run_options, abelian_grouping=grouping, approximation=approx + ) result_dense = estimator.run(state, H, shots=nshots).result().values print(f"{result_exact = }") print(f"{result_abelian = }") print(f"{result_dense = }") + if __name__ == "__main__": test_random(3)