Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Aug 14, 2024
1 parent ddbc78d commit 0111a6e
Showing 1 changed file with 69 additions and 62 deletions.
131 changes: 69 additions & 62 deletions src/qibo/transpiler/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,26 @@ def __init__(
initial_layout: dict,
circuit: Optional[Circuit] = None,
blocks: Optional[CircuitBlocks] = None,
temp: Optional[bool] = False, #2# for temporary circuit
temp: Optional[bool] = False, # 2# for temporary circuit
):
self.initial_layout = dict(sorted(initial_layout.items()))

#1# bidirectional mapping
#1# self._p2l: physical qubit number i -> logical qubit number _p2l[i]
#1# self._l2p: logical qubit number i -> physical qubit number _l2p[i]
self._l2p, self._p2l = [0] * len(self.initial_layout), [0] * len(self.initial_layout)
# 1# bidirectional mapping
# 1# self._p2l: physical qubit number i -> logical qubit number _p2l[i]
# 1# self._l2p: logical qubit number i -> physical qubit number _l2p[i]
self._l2p, self._p2l = [0] * len(self.initial_layout), [0] * len(
self.initial_layout
)
for mapping in self.initial_layout.items():
physical_qubit, logical_qubit = int(mapping[0][1:]), mapping[1]
self._l2p[logical_qubit] = physical_qubit
self._p2l[physical_qubit] = logical_qubit

self._temporary = temp
if self._temporary: #2# if temporary circuit, no need to store the blocks
if self._temporary: # 2# if temporary circuit, no need to store the blocks
return

self._nqubits = circuit.nqubits #1# number of qubits
self._nqubits = circuit.nqubits # 1# number of qubits
if circuit is None:
raise_error(ValueError, "Circuit must be provided.")

Expand All @@ -205,7 +207,7 @@ def __init__(
self._routed_blocks = CircuitBlocks(Circuit(circuit.nqubits))
self._swaps = 0

#1# previous: set_circuit_logical
# 1# previous: set_circuit_logical
def set_p2l(self, p2l_map: list):
"""Sets the current physical to logical qubit mapping.
Expand All @@ -214,8 +216,8 @@ def set_p2l(self, p2l_map: list):
Args:
p2l_map (list): physical to logical mapping.
"""
#1# update bidirectional mapping
#4# use shallow copy
# 1# update bidirectional mapping
# 4# use shallow copy
self._p2l = p2l_map.copy()
self._l2p = [0] * len(self._p2l)
for i, l in enumerate(self._p2l):
Expand All @@ -234,9 +236,7 @@ def execute_block(self, block: Block):
Args:
block (:class:`qibo.transpiler.blocks.Block`): block to be removed.
"""
self._routed_blocks.add_block(
block.on_qubits(self.get_physical_qubits(block))
)
self._routed_blocks.add_block(block.on_qubits(self.get_physical_qubits(block)))
self.circuit_blocks.remove_block(block)

def routed_circuit(self, circuit_kwargs: Optional[dict] = None):
Expand All @@ -253,11 +253,8 @@ def routed_circuit(self, circuit_kwargs: Optional[dict] = None):
def final_layout(self):
"""Returns the final physical-logical qubits mapping."""

#1# return {"q0": lq_num0, "q1": lq_num1, ...}
unsorted_dict = {
"q" + str(i): self._p2l[i]
for i in range(self._nqubits)
}
# 1# return {"q0": lq_num0, "q1": lq_num1, ...}
unsorted_dict = {"q" + str(i): self._p2l[i] for i in range(self._nqubits)}

return dict(sorted(unsorted_dict.items()))

Expand All @@ -273,14 +270,14 @@ def update(self, swap_l: tuple):

swap_p = self.logical_pair_to_physical(swap_l)

#2# add the real SWAP gate, not a temporary circuit
# 2# add the real SWAP gate, not a temporary circuit
if not self._temporary:
self._routed_blocks.add_block(
Block(qubits=swap_p, gates=[gates.SWAP(*swap_p)])
)
self._swaps += 1

#1# update the bidirectional mapping
# 1# update the bidirectional mapping
p1, p2 = swap_p
l1, l2 = swap_l
self._p2l[p1], self._p2l[p2] = l2, l1
Expand All @@ -294,7 +291,7 @@ def undo(self):
self._routed_blocks.remove_block(last_swap_block)
self._swaps -= 1

#1# update the bidirectional mapping
# 1# update the bidirectional mapping
p1, p2 = swap_p
l1, l2 = swap_l
self._p2l[p1], self._p2l[p2] = l2, l1
Expand All @@ -314,7 +311,7 @@ def get_physical_qubits(self, block: Union[int, Block]):

return tuple(self._l2p[q] for q in block.qubits)

#1# logical_to_physical -> logical_pair_to_physical
# 1# logical_to_physical -> logical_pair_to_physical
def logical_pair_to_physical(self, logical_qubits: tuple):
"""Returns the physical qubits associated to the logical qubit pair.
Expand All @@ -324,10 +321,11 @@ def logical_pair_to_physical(self, logical_qubits: tuple):
Returns:
tuple: physical qubit numbers associated to the logical qubit pair.
"""
#1# return physical qubit numbers corresponding to the logical qubit pair
# 1# return physical qubit numbers corresponding to the logical qubit pair
return self._l2p[logical_qubits[0]], self._l2p[logical_qubits[1]]

#1# circuit_to_logical(), circuit_to_physical() removed
# 1# circuit_to_logical(), circuit_to_physical() removed


class ShortestPaths(Router):
"""A class to perform initial qubit mapping and connectivity matching.
Expand Down Expand Up @@ -382,9 +380,12 @@ def __call__(self, circuit: Circuit, initial_layout: dict):
routed_circuit=routed_circuit
)

#1# final layout is reverted to the original labeling
# 1# final layout is reverted to the original labeling
final_layout = self.circuit.final_layout()
final_layout_restored = {"q" + str(self.node_mapping_inv[int(k[1:])]): v for k, v in final_layout.items()}
final_layout_restored = {
"q" + str(self.node_mapping_inv[int(k[1:])]): v
for k, v in final_layout.items()
}
return routed_circuit, final_layout_restored

def _find_new_mapping(self):
Expand Down Expand Up @@ -434,15 +435,14 @@ def _add_swaps(candidate: tuple, circuitmap: CircuitMap):
"""
path = candidate[0]
meeting_point = candidate[1]
forward = path[0 : meeting_point + 1] #1# physical qubits
forward = path[0 : meeting_point + 1] # 1# physical qubits
backward = list(reversed(path[meeting_point + 1 :]))
#1# apply logical swaps
# 1# apply logical swaps
for f in forward[1:]:
circuitmap.update((circuitmap._p2l[f], circuitmap._p2l[forward[0]]))
for b in backward[1:]:
circuitmap.update((circuitmap._p2l[b], circuitmap._p2l[backward[0]]))


def _compute_cost(self, candidate: tuple):
"""Greedy algorithm that decides which path to take and how qubits should be walked.
Expand All @@ -454,14 +454,14 @@ def _compute_cost(self, candidate: tuple):
Returns:
(list, int): best path to move qubits and qubit meeting point in the path.
"""
#2# CircuitMap might be used
# 2# CircuitMap might be used
temporary_circuit = CircuitMap(
initial_layout=self.circuit.initial_layout,
circuit=Circuit(len(self.circuit.initial_layout)),
blocks=deepcopy(self.circuit.circuit_blocks),
)

#1# use set_p2l
# 1# use set_p2l
temporary_circuit.set_p2l(self.circuit._p2l)
self._add_swaps(candidate, temporary_circuit)
temporary_dag = deepcopy(self._dag)
Expand All @@ -476,7 +476,7 @@ def _compute_cost(self, candidate: tuple):
all_executed = True
for block in temporary_front_layer:
if (
#3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
# 3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
temporary_circuit.get_physical_qubits(block)
in self.connectivity.edges
or not temporary_circuit.circuit_blocks.search_by_index(
Expand Down Expand Up @@ -504,7 +504,7 @@ def _check_execution(self):
executable_blocks = []
for block in self._front_layer:
if (
#3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
# 3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
self.circuit.get_physical_qubits(block) in self.connectivity.edges
or not self.circuit.circuit_blocks.search_by_index(block).entangled
):
Expand Down Expand Up @@ -554,7 +554,7 @@ def _preprocessing(self, circuit: Circuit, initial_layout: dict):
initial_layout (dict): initial physical-to-logical qubit mapping.
"""

#1# To simplify routing, some data is relabeled before routing begins.
# 1# To simplify routing, some data is relabeled before routing begins.
node_mapping, new_initial_layout = {}, {}
for i, node in enumerate(self.connectivity.nodes):
node_mapping[node] = i
Expand Down Expand Up @@ -596,8 +596,9 @@ def _append_final_measurements(self, routed_circuit: Circuit):
for measurement in self._final_measurements:
original_qubits = measurement.qubits
routed_qubits = list(
#1# use l2p to get physical qubit numbers
self.circuit._l2p[qubit] for qubit in original_qubits
# 1# use l2p to get physical qubit numbers
self.circuit._l2p[qubit]
for qubit in original_qubits
)
routed_circuit.add(
measurement.on_qubits(dict(zip(original_qubits, routed_qubits)))
Expand Down Expand Up @@ -643,7 +644,7 @@ def __init__(
seed: Optional[int] = None,
):
self.connectivity = connectivity
#1# map to revert the final layout to the original labeling
# 1# map to revert the final layout to the original labeling
self.node_mapping_inv = None
self.lookahead = lookahead
self.decay = decay_lookahead
Expand Down Expand Up @@ -698,9 +699,12 @@ def __call__(self, circuit: Circuit, initial_layout: dict):
routed_circuit=routed_circuit
)

#1# final layout is reverted to the original labeling
# 1# final layout is reverted to the original labeling
final_layout = self.circuit.final_layout()
final_layout_restored = {"q" + str(self.node_mapping_inv[int(k[1:])]): v for k, v in final_layout.items()}
final_layout_restored = {
"q" + str(self.node_mapping_inv[int(k[1:])]): v
for k, v in final_layout.items()
}
return routed_circuit, final_layout_restored

@property
Expand All @@ -724,8 +728,8 @@ def _preprocessing(self, circuit: Circuit, initial_layout: dict):
initial_layout (dict): initial physical-to-logical qubit mapping.
"""

#1# To simplify routing, some data is relabeled before routing begins.
#1# physical qubit is reassigned to a range from 0 to len(self.connectivity.nodes) - 1.
# 1# To simplify routing, some data is relabeled before routing begins.
# 1# physical qubit is reassigned to a range from 0 to len(self.connectivity.nodes) - 1.
node_mapping, new_initial_layout = {}, {}
for i, node in enumerate(self.connectivity.nodes):
node_mapping[node] = i
Expand Down Expand Up @@ -769,8 +773,9 @@ def _append_final_measurements(self, routed_circuit: Circuit):
for measurement in self._final_measurements:
original_qubits = measurement.qubits
routed_qubits = list(
#1# use l2p to get physical qubit numbers
self.circuit._l2p[qubit] for qubit in original_qubits
# 1# use l2p to get physical qubit numbers
self.circuit._l2p[qubit]
for qubit in original_qubits
)
routed_circuit.add(
measurement.on_qubits(dict(zip(original_qubits, routed_qubits)))
Expand Down Expand Up @@ -799,8 +804,8 @@ def _get_dag_layer(self, n_layer, qubits=False):
Otherwise, return the block numbers.
"""

#3# depend on the 'qubits' flag, return the block number or target qubits
#3# return target qubits -> to avoid using get_physical_qubits(block_num)
# 3# depend on the 'qubits' flag, return the block number or target qubits
# 3# return target qubits -> to avoid using get_physical_qubits(block_num)
if qubits:
layer_qubits = []
nodes = self._dag.nodes(data=True)
Expand All @@ -816,7 +821,7 @@ def _find_new_mapping(self):
"""Find the new best mapping by adding one swap."""
candidates_evaluation = {}

#4# use shallow copy
# 4# use shallow copy
self._memory_map.append(self.circuit._p2l.copy())
for candidate in self._swap_candidates():
candidates_evaluation[candidate] = self._compute_cost(candidate)
Expand All @@ -835,31 +840,31 @@ def _find_new_mapping(self):
def _compute_cost(self, candidate: int):
"""Compute the cost associated to a possible SWAP candidate."""

#2# use CircuitMap for temporary circuit to save time
#2# no gates, no block decomposition, no Circuit object
#2# just logical-physical mapping
# 2# use CircuitMap for temporary circuit to save time
# 2# no gates, no block decomposition, no Circuit object
# 2# just logical-physical mapping
temporary_circuit = CircuitMap(
initial_layout=self.circuit.initial_layout,
temp=True,
)

#1# use set_p2l
# 1# use set_p2l
temporary_circuit.set_p2l(self.circuit._p2l)
temporary_circuit.update(candidate)

#1# use p2l to check if the mapping is already in the memory
# 1# use p2l to check if the mapping is already in the memory
if temporary_circuit._p2l in self._memory_map:
return float("inf")

tot_distance = 0.0
weight = 1.0
for layer in range(self.lookahead + 1):
#3# return gates' target qubit pairs in the layer
#3# to avoid using get_physical_qubits(block_num)
# 3# return gates' target qubit pairs in the layer
# 3# to avoid using get_physical_qubits(block_num)
layer_gates = self._get_dag_layer(layer, qubits=True)
avg_layer_distance = 0.0
for lq_pair in layer_gates:
#3# logical qubit pairs to node numbers (physical qubit pairs) in the connectivity graph
# 3# logical qubit pairs to node numbers (physical qubit pairs) in the connectivity graph
qubits = temporary_circuit.logical_pair_to_physical(lq_pair)
avg_layer_distance += (
max(self._delta_register[i] for i in qubits)
Expand All @@ -881,7 +886,7 @@ def _swap_candidates(self):
(list): list of candidates.
"""
candidates = []
#3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
# 3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
for block in self._front_layer:
for qubit in self.circuit.get_physical_qubits(block):
for connected in self.connectivity.neighbors(qubit):
Expand All @@ -907,7 +912,7 @@ def _check_execution(self):
executable_blocks = []
for block in self._front_layer:
if (
#3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
# 3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
self.circuit.get_physical_qubits(block) in self.connectivity.edges
or not self.circuit.circuit_blocks.search_by_index(block).entangled
):
Expand Down Expand Up @@ -950,8 +955,8 @@ def _shortest_path_routing(self):
shortest_path_qubits = None

for block in self._front_layer:
#3# return node numbers (physical qubits) in the connectivity graph
#3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
# 3# return node numbers (physical qubits) in the connectivity graph
# 3# might be changed to use _get_dag_layer(qubits=True) to avoid using get_physical_qubits(block_num)
q1, q2 = self.circuit.get_physical_qubits(block)
distance = self._dist_matrix[q1, q2]

Expand All @@ -964,11 +969,12 @@ def _shortest_path_routing(self):
)

# move q1
#1# qubit moving algorithm is changed
# 1# qubit moving algorithm is changed
q1 = self.circuit._p2l[shortest_path[0]]
for q2 in shortest_path[1:-1]:
self.circuit.update((q1, self.circuit._p2l[q2]))


def _create_dag(gates_qubits_pairs: list):
"""Helper method for :meth:`qibo.transpiler.router.Sabre`.
Expand All @@ -984,7 +990,7 @@ def _create_dag(gates_qubits_pairs: list):
dag = nx.DiGraph()
dag.add_nodes_from(range(len(gates_qubits_pairs)))

#3# additionally store target qubits of the gates
# 3# additionally store target qubits of the gates
for i in range(len(gates_qubits_pairs)):
dag.nodes[i]["qubits"] = gates_qubits_pairs[i]

Expand All @@ -1003,6 +1009,7 @@ def _create_dag(gates_qubits_pairs: list):

return _remove_redundant_connections(dag)


def _remove_redundant_connections(dag: nx.DiGraph):
"""Helper method for :func:`qibo.transpiler.router._create_dag`.
Expand All @@ -1015,9 +1022,9 @@ def _remove_redundant_connections(dag: nx.DiGraph):
(:class:`networkx.DiGraph`): reduced dag.
"""
new_dag = nx.DiGraph()
#3# add nodes with attributes
# 3# add nodes with attributes
new_dag.add_nodes_from(dag.nodes(data=True))
transitive_reduction = nx.transitive_reduction(dag)
new_dag.add_edges_from(transitive_reduction.edges)

return new_dag
return new_dag

0 comments on commit 0111a6e

Please sign in to comment.