-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
sabre_swap.py
371 lines (300 loc) · 15.4 KB
/
sabre_swap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2020.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Routing via SWAP insertion using the SABRE method from Li et al."""
import logging
from collections import defaultdict
from copy import copy, deepcopy
import numpy as np
from qiskit.circuit.library.standard_gates import SwapGate
from qiskit.circuit.quantumregister import Qubit
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.transpiler.exceptions import TranspilerError
from qiskit.transpiler.layout import Layout
from qiskit.dagcircuit import DAGOpNode
logger = logging.getLogger(__name__)
EXTENDED_SET_SIZE = 20 # Size of lookahead window. TODO: set dynamically to len(current_layout)
EXTENDED_SET_WEIGHT = 0.5 # Weight of lookahead window compared to front_layer.
DECAY_RATE = 0.001 # Decay coefficient for penalizing serial swaps.
DECAY_RESET_INTERVAL = 5 # How often to reset all decay rates to 1.
class SabreSwap(TransformationPass):
r"""Map input circuit onto a backend topology via insertion of SWAPs.
Implementation of the SWAP-based heuristic search from the SABRE qubit
mapping paper [1] (Algorithm 1). The heuristic aims to minimize the number
of lossy SWAPs inserted and the depth of the circuit.
This algorithm starts from an initial layout of virtual qubits onto physical
qubits, and iterates over the circuit DAG until all gates are exhausted,
inserting SWAPs along the way. It only considers 2-qubit gates as only those
are germane for the mapping problem (it is assumed that 3+ qubit gates are
already decomposed).
In each iteration, it will first check if there are any gates in the
``front_layer`` that can be directly applied. If so, it will apply them and
remove them from ``front_layer``, and replenish that layer with new gates
if possible. Otherwise, it will try to search for SWAPs, insert the SWAPs,
and update the mapping.
The search for SWAPs is restricted, in the sense that we only consider
physical qubits in the neighborhood of those qubits involved in
``front_layer``. These give rise to a ``swap_candidate_list`` which is
scored according to some heuristic cost function. The best SWAP is
implemented and ``current_layout`` updated.
**References:**
[1] Li, Gushu, Yufei Ding, and Yuan Xie. "Tackling the qubit mapping problem
for NISQ-era quantum devices." ASPLOS 2019.
`arXiv:1809.02573 <https://arxiv.org/pdf/1809.02573.pdf>`_
"""
def __init__(self, coupling_map, heuristic="basic", seed=None, fake_run=False):
r"""SabreSwap initializer.
Args:
coupling_map (CouplingMap): CouplingMap of the target backend.
heuristic (str): The type of heuristic to use when deciding best
swap strategy ('basic' or 'lookahead' or 'decay').
seed (int): random seed used to tie-break among candidate swaps.
fake_run (bool): if true, it only pretend to do routing, i.e., no
swap is effectively added.
Additional Information:
The search space of possible SWAPs on physical qubits is explored
by assigning a score to the layout that would result from each SWAP.
The goodness of a layout is evaluated based on how viable it makes
the remaining virtual gates that must be applied. A few heuristic
cost functions are supported
- 'basic':
The sum of distances for corresponding physical qubits of
interacting virtual qubits in the front_layer.
.. math::
H_{basic} = \sum_{gate \in F} D[\pi(gate.q_1)][\pi(gate.q2)]
- 'lookahead':
This is the sum of two costs: first is the same as the basic cost.
Second is the basic cost but now evaluated for the
extended set as well (i.e. :math:`|E|` number of upcoming successors to gates in
front_layer F). This is weighted by some amount EXTENDED_SET_WEIGHT (W) to
signify that upcoming gates are less important that the front_layer.
.. math::
H_{decay}=\frac{1}{\left|{F}\right|}\sum_{gate \in F} D[\pi(gate.q_1)][\pi(gate.q2)]
+ W*\frac{1}{\left|{E}\right|} \sum_{gate \in E} D[\pi(gate.q_1)][\pi(gate.q2)]
- 'decay':
This is the same as 'lookahead', but the whole cost is multiplied by a
decay factor. This increases the cost if the SWAP that generated the
trial layout was recently used (i.e. it penalizes increase in depth).
.. math::
H_{decay} = max(decay(SWAP.q_1), decay(SWAP.q_2)) {
\frac{1}{\left|{F}\right|} \sum_{gate \in F} D[\pi(gate.q_1)][\pi(gate.q2)]\\
+ W *\frac{1}{\left|{E}\right|} \sum_{gate \in E} D[\pi(gate.q_1)][\pi(gate.q2)]
}
"""
super().__init__()
# Assume bidirectional couplings, fixing gate direction is easy later.
if coupling_map.is_symmetric:
self.coupling_map = coupling_map
else:
self.coupling_map = deepcopy(coupling_map)
self.coupling_map.make_symmetric()
self.heuristic = heuristic
self.seed = seed
self.fake_run = fake_run
self.applied_predecessors = None
self.qubits_decay = None
self._bit_indices = None
def run(self, dag):
"""Run the SabreSwap pass on `dag`.
Args:
dag (DAGCircuit): the directed acyclic graph to be mapped.
Returns:
DAGCircuit: A dag mapped to be compatible with the coupling_map.
Raises:
TranspilerError: if the coupling map or the layout are not
compatible with the DAG
"""
if len(dag.qregs) != 1 or dag.qregs.get("q", None) is None:
raise TranspilerError("Sabre swap runs on physical circuits only.")
if len(dag.qubits) > self.coupling_map.size():
raise TranspilerError("More virtual qubits exist than physical.")
rng = np.random.default_rng(self.seed)
# Preserve input DAG's name, regs, wire_map, etc. but replace the graph.
mapped_dag = None
if not self.fake_run:
mapped_dag = dag._copy_circuit_metadata()
canonical_register = dag.qregs["q"]
current_layout = Layout.generate_trivial_layout(canonical_register)
self._bit_indices = {bit: idx for idx, bit in enumerate(canonical_register)}
# A decay factor for each qubit used to heuristically penalize recently
# used qubits (to encourage parallelism).
self.qubits_decay = {qubit: 1 for qubit in dag.qubits}
# Start algorithm from the front layer and iterate until all gates done.
num_search_steps = 0
front_layer = dag.front_layer()
self.applied_predecessors = defaultdict(int)
for _, input_node in dag.input_map.items():
for successor in self._successors(input_node, dag):
self.applied_predecessors[successor] += 1
while front_layer:
execute_gate_list = []
# Remove as many immediately applicable gates as possible
for node in front_layer:
if len(node.qargs) == 2:
v0, v1 = node.qargs
if self.coupling_map.graph.has_edge(current_layout[v0], current_layout[v1]):
execute_gate_list.append(node)
else: # Single-qubit gates as well as barriers are free
execute_gate_list.append(node)
if execute_gate_list:
for node in execute_gate_list:
self._apply_gate(mapped_dag, node, current_layout, canonical_register)
front_layer.remove(node)
for successor in self._successors(node, dag):
self.applied_predecessors[successor] += 1
if self._is_resolved(successor):
front_layer.append(successor)
if node.qargs:
self._reset_qubits_decay()
# Diagnostics
logger.debug(
"free! %s",
[
(n.name if isinstance(n, DAGOpNode) else None, n.qargs)
for n in execute_gate_list
],
)
logger.debug(
"front_layer: %s",
[(n.name if isinstance(n, DAGOpNode) else None, n.qargs) for n in front_layer],
)
continue
# After all free gates are exhausted, heuristically find
# the best swap and insert it. When two or more swaps tie
# for best score, pick one randomly.
extended_set = self._obtain_extended_set(dag, front_layer)
swap_candidates = self._obtain_swaps(front_layer, current_layout)
swap_scores = dict.fromkeys(swap_candidates, 0)
for swap_qubits in swap_scores:
trial_layout = current_layout.copy()
trial_layout.swap(*swap_qubits)
score = self._score_heuristic(
self.heuristic, front_layer, extended_set, trial_layout, swap_qubits
)
swap_scores[swap_qubits] = score
min_score = min(swap_scores.values())
best_swaps = [k for k, v in swap_scores.items() if v == min_score]
best_swaps.sort(key=lambda x: (self._bit_indices[x[0]], self._bit_indices[x[1]]))
best_swap = rng.choice(best_swaps)
swap_node = DAGOpNode(op=SwapGate(), qargs=best_swap)
self._apply_gate(mapped_dag, swap_node, current_layout, canonical_register)
current_layout.swap(*best_swap)
num_search_steps += 1
if num_search_steps % DECAY_RESET_INTERVAL == 0:
self._reset_qubits_decay()
else:
self.qubits_decay[best_swap[0]] += DECAY_RATE
self.qubits_decay[best_swap[1]] += DECAY_RATE
# Diagnostics
logger.debug("SWAP Selection...")
logger.debug("extended_set: %s", [(n.name, n.qargs) for n in extended_set])
logger.debug("swap scores: %s", swap_scores)
logger.debug("best swap: %s", best_swap)
logger.debug("qubits decay: %s", self.qubits_decay)
self.property_set["final_layout"] = current_layout
if not self.fake_run:
return mapped_dag
return dag
def _apply_gate(self, mapped_dag, node, current_layout, canonical_register):
if self.fake_run:
return
new_node = _transform_gate_for_layout(node, current_layout, canonical_register)
mapped_dag.apply_operation_back(new_node.op, new_node.qargs, new_node.cargs)
def _reset_qubits_decay(self):
"""Reset all qubit decay factors to 1 upon request (to forget about
past penalizations).
"""
self.qubits_decay = {k: 1 for k in self.qubits_decay.keys()}
def _successors(self, node, dag):
for _, successor, edge_data in dag.edges(node):
if not isinstance(successor, DAGOpNode):
continue
if isinstance(edge_data, Qubit):
yield successor
def _is_resolved(self, node):
"""Return True if all of a node's predecessors in dag are applied."""
return self.applied_predecessors[node] == len(node.qargs)
def _obtain_extended_set(self, dag, front_layer):
"""Populate extended_set by looking ahead a fixed number of gates.
For each existing element add a successor until reaching limit.
"""
extended_set = list()
incremented = list()
tmp_front_layer = front_layer
done = False
while tmp_front_layer and not done:
new_tmp_front_layer = list()
for node in tmp_front_layer:
for successor in self._successors(node, dag):
incremented.append(successor)
self.applied_predecessors[successor] += 1
if self._is_resolved(successor):
new_tmp_front_layer.append(successor)
if len(successor.qargs) == 2:
extended_set.append(successor)
if len(extended_set) >= EXTENDED_SET_SIZE:
done = True
break
tmp_front_layer = new_tmp_front_layer
for node in incremented:
self.applied_predecessors[node] -= 1
return extended_set
def _obtain_swaps(self, front_layer, current_layout):
"""Return a set of candidate swaps that affect qubits in front_layer.
For each virtual qubit in front_layer, find its current location
on hardware and the physical qubits in that neighborhood. Every SWAP
on virtual qubits that corresponds to one of those physical couplings
is a candidate SWAP.
Candidate swaps are sorted so SWAP(i,j) and SWAP(j,i) are not duplicated.
"""
candidate_swaps = set()
for node in front_layer:
for virtual in node.qargs:
physical = current_layout[virtual]
for neighbor in self.coupling_map.neighbors(physical):
virtual_neighbor = current_layout[neighbor]
swap = sorted([virtual, virtual_neighbor], key=lambda q: self._bit_indices[q])
candidate_swaps.add(tuple(swap))
return candidate_swaps
def _compute_cost(self, layer, layout):
cost = 0
for node in layer:
cost += self.coupling_map.distance(layout[node.qargs[0]], layout[node.qargs[1]])
return cost
def _score_heuristic(self, heuristic, front_layer, extended_set, layout, swap_qubits=None):
"""Return a heuristic score for a trial layout.
Assuming a trial layout has resulted from a SWAP, we now assign a cost
to it. The goodness of a layout is evaluated based on how viable it makes
the remaining virtual gates that must be applied.
"""
first_cost = self._compute_cost(front_layer, layout)
if heuristic == "basic":
return first_cost
first_cost /= len(front_layer)
second_cost = 0
if extended_set:
second_cost = self._compute_cost(extended_set, layout) / len(extended_set)
total_cost = first_cost + EXTENDED_SET_WEIGHT * second_cost
if heuristic == "lookahead":
return total_cost
if heuristic == "decay":
return (
max(self.qubits_decay[swap_qubits[0]], self.qubits_decay[swap_qubits[1]])
* total_cost
)
raise TranspilerError("Heuristic %s not recognized." % heuristic)
def _transform_gate_for_layout(op_node, layout, device_qreg):
"""Return node implementing a virtual op on given layout."""
mapped_op_node = copy(op_node)
premap_qargs = op_node.qargs
mapped_qargs = map(lambda x: device_qreg[layout[x]], premap_qargs)
mapped_op_node.qargs = list(mapped_qargs)
return mapped_op_node