From cf7f833716849a9e6ef75f114d348474839fc059 Mon Sep 17 00:00:00 2001 From: Jake Lishman Date: Fri, 30 Sep 2022 21:16:20 +0100 Subject: [PATCH 1/2] Update GateDirection and checker for control flow These are largely straightforwards, but since each pass has essentially two separate versions, it became simpler to refactor them out when doing the recursion, rather than using internal closures. As part of the refactor, we also swap to using `Target.instruction_supported` to check for validity - this method was added after the passes were originally made target-aware. --- .../passes/utils/check_gate_direction.py | 62 ++++-- .../transpiler/passes/utils/gate_direction.py | 188 +++++++++--------- ...ate-direction-target-a9f0acd0cf30ed66.yaml | 6 + .../transpiler/test_check_gate_direction.py | 99 ++++++++- test/python/transpiler/test_gate_direction.py | 145 +++++++++++++- 5 files changed, 380 insertions(+), 120 deletions(-) create mode 100644 releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml diff --git a/qiskit/transpiler/passes/utils/check_gate_direction.py b/qiskit/transpiler/passes/utils/check_gate_direction.py index a9d339788744..bf9615dd5233 100644 --- a/qiskit/transpiler/passes/utils/check_gate_direction.py +++ b/qiskit/transpiler/passes/utils/check_gate_direction.py @@ -12,7 +12,8 @@ """Check if the gates follow the right direction with respect to the coupling map.""" -from qiskit.transpiler.layout import Layout +from qiskit.circuit import ControlFlowOp +from qiskit.converters import circuit_to_dag from qiskit.transpiler.basepasses import AnalysisPass @@ -33,6 +34,40 @@ def __init__(self, coupling_map, target=None): self.coupling_map = coupling_map self.target = target + def _coupling_map_visit(self, dag, wire_map, edges=None): + if edges is None: + edges = self.coupling_map.get_edges() + for node in dag.two_qubit_ops(): + if isinstance(node.op, ControlFlowOp): + continue + if (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) not in edges: + return False + for node in dag.op_nodes(ControlFlowOp): + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._coupling_map_visit(circuit_to_dag(block), inner_wire_map, edges): + return False + return True + + def _target_visit(self, dag, wire_map): + for node in dag.two_qubit_ops(): + if isinstance(node.op, ControlFlowOp): + continue + if not self.target.instruction_supported( + node.op.name, (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + ): + return False + for node in dag.op_nodes(ControlFlowOp): + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._target_visit(circuit_to_dag(block), inner_wire_map): + return False + return True + def run(self, dag): """Run the CheckGateDirection pass on `dag`. @@ -42,22 +77,9 @@ def run(self, dag): Args: dag (DAGCircuit): DAG to check. """ - self.property_set["is_direction_mapped"] = True - edges = self.coupling_map.get_edges() - trivial_layout = Layout.generate_trivial_layout(*dag.qregs.values()) - if self.target is None: - for gate in dag.two_qubit_ops(): - physical_q0 = trivial_layout[gate.qargs[0]] - physical_q1 = trivial_layout[gate.qargs[1]] - - if (physical_q0, physical_q1) not in edges: - self.property_set["is_direction_mapped"] = False - return - else: - for gate in dag.two_qubit_ops(): - physical_q0 = trivial_layout[gate.qargs[0]] - physical_q1 = trivial_layout[gate.qargs[1]] - - if (physical_q0, physical_q1) not in self.target[gate.op.name]: - self.property_set["is_direction_mapped"] = False - return + wire_map = {bit: i for i, bit in enumerate(dag.qubits)} + self.property_set["is_direction_mapped"] = ( + self._coupling_map_visit(dag, wire_map) + if self.target is None + else self._target_visit(dag, wire_map) + ) diff --git a/qiskit/transpiler/passes/utils/gate_direction.py b/qiskit/transpiler/passes/utils/gate_direction.py index 3c331441d83e..43550d86d290 100644 --- a/qiskit/transpiler/passes/utils/gate_direction.py +++ b/qiskit/transpiler/passes/utils/gate_direction.py @@ -14,11 +14,11 @@ from math import pi -from qiskit.transpiler.layout import Layout from qiskit.transpiler.basepasses import TransformationPass from qiskit.transpiler.exceptions import TranspilerError -from qiskit.circuit import QuantumRegister +from qiskit.converters import dag_to_circuit, circuit_to_dag +from qiskit.circuit import QuantumRegister, ControlFlowOp from qiskit.dagcircuit import DAGCircuit from qiskit.circuit.library.standard_gates import RYGate, HGate, CXGate, CZGate, ECRGate, RZXGate @@ -83,6 +83,8 @@ def __init__(self, coupling_map, target=None): self._cz_dag.add_qreg(qr) self._cz_dag.apply_operation_back(CZGate(), [qr[1], qr[0]], []) + self._static_replacements = {"cx": self._cx_dag, "cz": self._cz_dag, "ecr": self._ecr_dag} + @staticmethod def _rzx_dag(parameter): _rzx_dag = DAGCircuit() @@ -95,6 +97,90 @@ def _rzx_dag(parameter): _rzx_dag.apply_operation_back(HGate(), [qr[1]], []) return _rzx_dag + def _run_coupling_map(self, dag, wire_map, edges=None): + if edges is None: + edges = set(self.coupling_map.get_edges()) + if not edges: + return dag + for node in dag.two_qubit_ops(): + if isinstance(node.op, ControlFlowOp): + continue + qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + if qargs not in edges and (qargs[1], qargs[0]) not in edges: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + ) + if qargs not in edges: + replacement = self._static_replacements.get(node.name) + if replacement is not None: + dag.substitute_node_with_dag(node, replacement) + elif node.name == "rzx": + dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) + else: + raise TranspilerError( + f"Flipping of gate direction is only supported " + f"for {list(self._static_replacements)} at this time, not '{node.name}'." + ) + for node in dag.op_nodes(ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_coupling_map( + circuit_to_dag(block), + {inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits)}, + edges, + ) + ) + for block in node.op.blocks + ) + return dag + + def _run_target(self, dag, wire_map): + for node in dag.two_qubit_ops(): + if isinstance(node.op, ControlFlowOp): + continue + qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + swapped = (qargs[1], qargs[0]) + if node.name in self._static_replacements: + if self.target.instruction_supported(node.name, qargs): + continue + if self.target.instruction_supported(node.name, swapped): + dag.substitute_node_with_dag(node, self._static_replacements[node.name]) + else: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + f" for {node.name}" + ) + elif node.name == "rzx": + if self.target.instruction_supported( + qargs=qargs, operation_class=RZXGate, parameters=node.op.params + ): + continue + if self.target.instruction_supported( + qargs=swapped, operation_class=RZXGate, parameters=node.op.params + ): + dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) + else: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + f" for {node.name}" + ) + else: + raise TranspilerError( + f"Flipping of gate direction is only supported " + f"for {list(self._static_replacements)} at this time, not '{node.name}'." + ) + for node in dag.op_nodes(ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_target( + circuit_to_dag(block), + {inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits)}, + ) + ) + for block in node.op.blocks + ) + return dag + def run(self, dag): """Run the GateDirection pass on `dag`. @@ -111,104 +197,12 @@ def run(self, dag): TranspilerError: If the circuit cannot be mapped just by flipping the cx nodes. """ - trivial_layout = Layout.generate_trivial_layout(*dag.qregs.values()) - layout_map = trivial_layout.get_virtual_bits() + layout_map = {bit: i for i, bit in enumerate(dag.qubits)} if len(dag.qregs) > 1: raise TranspilerError( "GateDirection expects a single qreg input DAG," "but input DAG had qregs: {}.".format(dag.qregs) ) if self.target is None: - cmap_edges = set(self.coupling_map.get_edges()) - if not cmap_edges: - return dag - - self.coupling_map.compute_distance_matrix() - - dist_matrix = self.coupling_map.distance_matrix - - for node in dag.two_qubit_ops(): - control = node.qargs[0] - target = node.qargs[1] - - physical_q0 = layout_map[control] - physical_q1 = layout_map[target] - - if dist_matrix[physical_q0, physical_q1] != 1: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s" % (physical_q0, physical_q1) - ) - - if (physical_q0, physical_q1) not in cmap_edges: - if node.name == "cx": - dag.substitute_node_with_dag(node, self._cx_dag) - elif node.name == "cz": - dag.substitute_node_with_dag(node, self._cz_dag) - elif node.name == "ecr": - dag.substitute_node_with_dag(node, self._ecr_dag) - elif node.name == "rzx": - dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) - else: - raise TranspilerError( - f"Flipping of gate direction is only supported " - f"for CX, ECR, and RZX at this time, not {node.name}." - ) - else: - # TODO: Work with the gate instances and only use names as look up keys. - # This will require iterating over the target names to build a mapping - # of names to gates that implement CXGate, ECRGate, RZXGate (including - # fixed angle variants) - for node in dag.two_qubit_ops(): - control = node.qargs[0] - target = node.qargs[1] - - physical_q0 = layout_map[control] - physical_q1 = layout_map[target] - - if node.name == "cx": - if (physical_q0, physical_q1) in self.target["cx"]: - continue - if (physical_q1, physical_q0) in self.target["cx"]: - dag.substitute_node_with_dag(node, self._cx_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for cx" % (physical_q0, physical_q1) - ) - elif node.name == "cz": - if (physical_q0, physical_q1) in self.target["cz"]: - continue - if (physical_q1, physical_q0) in self.target["cz"]: - dag.substitute_node_with_dag(node, self._cz_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for cz" % (physical_q0, physical_q1) - ) - elif node.name == "ecr": - if (physical_q0, physical_q1) in self.target["ecr"]: - continue - if (physical_q1, physical_q0) in self.target["ecr"]: - dag.substitute_node_with_dag(node, self._ecr_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for ecr" % (physical_q0, physical_q1) - ) - elif node.name == "rzx": - if (physical_q0, physical_q1) in self.target["rzx"]: - continue - if (physical_q1, physical_q0) in self.target["rzx"]: - dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for rzx" % (physical_q0, physical_q1) - ) - else: - raise TranspilerError( - f"Flipping of gate direction is only supported " - f"for CX, ECR, and RZX at this time, not {node.name}." - ) - return dag + return self._run_coupling_map(dag, layout_map) + return self._run_target(dag, layout_map) diff --git a/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml b/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml new file mode 100644 index 000000000000..a0a56ff0d85a --- /dev/null +++ b/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + The :class:`.GateDirection` transpiler pass will now respect the available + values for gate parameters when handling parametrised gates with a + :class:`.Target`. diff --git a/test/python/transpiler/test_check_gate_direction.py b/test/python/transpiler/test_check_gate_direction.py index f6cf038fe70b..48131f9705d4 100644 --- a/test/python/transpiler/test_check_gate_direction.py +++ b/test/python/transpiler/test_check_gate_direction.py @@ -14,13 +14,17 @@ import unittest +import ddt + from qiskit import QuantumRegister, QuantumCircuit +from qiskit.circuit.library import CXGate, CZGate, ECRGate from qiskit.transpiler.passes import CheckGateDirection -from qiskit.transpiler import CouplingMap +from qiskit.transpiler import CouplingMap, Target from qiskit.converters import circuit_to_dag from qiskit.test import QiskitTestCase +@ddt.ddt class TestCheckGateDirection(QiskitTestCase): """Tests the CheckGateDirection pass""" @@ -196,6 +200,99 @@ def test_ecr_gate(self): self.assertFalse(pass_.property_set["is_direction_mapped"]) + @ddt.data(CXGate(), CZGate(), ECRGate()) + def test_target_static(self, gate): + """Test that static 2q gates are detected correctly both if available and not available.""" + circuit = QuantumCircuit(2) + circuit.append(gate, [0, 1], []) + + matching = Target(num_qubits=2) + matching.add_instruction(gate, {(0, 1): None}) + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + + swapped = Target(num_qubits=2) + swapped.add_instruction(gate, {(1, 0): None}) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + def test_coupling_map_control_flow(self): + """Test recursing into control-flow operations with a coupling map.""" + matching = CouplingMap.from_line(5, bidirectional=True) + swapped = CouplingMap.from_line(5, bidirectional=False) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + circuit.cx(1, 0) + + pass_ = CheckGateDirection(matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.cz(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.ecr(4, 3) + + pass_ = CheckGateDirection(matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + def test_target_control_flow(self): + """Test recursing into control-flow operations with a coupling map.""" + swapped = Target(num_qubits=5) + for gate in (CXGate(), CZGate(), ECRGate()): + swapped.add_instruction(gate, {qargs: None for qargs in zip(range(4), range(1, 5))}) + + matching = Target(num_qubits=5) + for gate in (CXGate(), CZGate(), ECRGate()): + matching.add_instruction(gate, {None: None}) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + circuit.cx(1, 0) + + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.cz(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.ecr(4, 3) + + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + if __name__ == "__main__": unittest.main() diff --git a/test/python/transpiler/test_gate_direction.py b/test/python/transpiler/test_gate_direction.py index 59fa04a517a2..6fd3edd6033a 100644 --- a/test/python/transpiler/test_gate_direction.py +++ b/test/python/transpiler/test_gate_direction.py @@ -11,18 +11,23 @@ # that they have been altered from the originals. """Test the CX Direction pass""" + import unittest from math import pi +import ddt + from qiskit import ClassicalRegister, QuantumRegister, QuantumCircuit +from qiskit.circuit import Parameter +from qiskit.circuit.library import CXGate, CZGate, ECRGate, RZXGate from qiskit.compiler import transpile -from qiskit.transpiler import TranspilerError -from qiskit.transpiler import CouplingMap +from qiskit.transpiler import TranspilerError, CouplingMap, Target from qiskit.transpiler.passes import GateDirection from qiskit.converters import circuit_to_dag from qiskit.test import QiskitTestCase +@ddt.ddt class TestGateDirection(QiskitTestCase): """Tests the GateDirection pass.""" @@ -261,6 +266,142 @@ def test_regression_gh_8387(self): optimization_level=2, ) + @ddt.data(CXGate(), CZGate(), ECRGate()) + def test_target_static(self, gate): + """Test that static 2q gates are swapped correctly both if available and not available.""" + circuit = QuantumCircuit(2) + circuit.append(gate, [0, 1], []) + + matching = Target(num_qubits=2) + matching.add_instruction(gate, {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(gate, {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_any(self): + """Test that a parametrised 2q gate is replaced correctly both if available and not + available.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(Parameter("a")), {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(Parameter("a")), {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_exact(self): + """Test that a parametrised 2q gate is detected correctly both if available and not + available.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(1.5), {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(1.5), {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_mismatch(self): + """Test that the pass raises if a gate is not supported due to a parameter mismatch.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(2.5), {(0, 1): None}) + pass_ = GateDirection(None, target=matching) + with self.assertRaises(TranspilerError): + pass_(circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(2.5), {(1, 0): None}) + pass_ = GateDirection(None, target=swapped) + with self.assertRaises(TranspilerError): + pass_(circuit) + + def test_coupling_map_control_flow(self): + """Test that gates are replaced within nested control-flow blocks.""" + circuit = QuantumCircuit(4, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((1, 2)): + circuit.cx(1, 0) + circuit.cx(0, 1) + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.ecr(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.rzx(2.3, 2, 1) + + expected = QuantumCircuit(4, 1) + expected.h(0) + expected.measure(0, 0) + with expected.for_loop((1, 2)): + expected.h([0, 1]) + expected.cx(0, 1) + expected.h([0, 1]) + expected.cx(0, 1) + with expected.if_test((circuit.clbits[0], True)) as else_: + expected.ry(pi / 2, 2) + expected.ry(-pi / 2, 3) + expected.ecr(2, 3) + expected.h([2, 3]) + with else_: + with expected.while_loop((circuit.clbits[0], True)): + expected.h([1, 2]) + expected.rzx(2.3, 1, 2) + expected.h([1, 2]) + + coupling = CouplingMap.from_line(4, bidirectional=False) + pass_ = GateDirection(coupling) + self.assertEqual(pass_(circuit), expected) + + def test_target_control_flow(self): + """Test that gates are replaced within nested control-flow blocks.""" + circuit = QuantumCircuit(4, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((1, 2)): + circuit.cx(1, 0) + circuit.cx(0, 1) + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.ecr(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.rzx(2.3, 2, 1) + + expected = QuantumCircuit(4, 1) + expected.h(0) + expected.measure(0, 0) + with expected.for_loop((1, 2)): + expected.h([0, 1]) + expected.cx(0, 1) + expected.h([0, 1]) + expected.cx(0, 1) + with expected.if_test((circuit.clbits[0], True)) as else_: + expected.ry(pi / 2, 2) + expected.ry(-pi / 2, 3) + expected.ecr(2, 3) + expected.h([2, 3]) + with else_: + with expected.while_loop((circuit.clbits[0], True)): + expected.h([1, 2]) + expected.rzx(2.3, 1, 2) + expected.h([1, 2]) + + target = Target(num_qubits=4) + target.add_instruction(CXGate(), {(0, 1): None}) + target.add_instruction(ECRGate(), {(2, 3): None}) + target.add_instruction(RZXGate(Parameter("a")), {(1, 2): None}) + pass_ = GateDirection(None, target) + self.assertEqual(pass_(circuit), expected) + if __name__ == "__main__": unittest.main() From eac245e4822a4cf0a4630e7599fccddad8f489b2 Mon Sep 17 00:00:00 2001 From: Jake Lishman Date: Fri, 30 Sep 2022 22:41:58 +0100 Subject: [PATCH 2/2] Avoid duplicate iteration through all nodes Co-authored-by: Matthew Treinish --- .../passes/utils/check_gate_direction.py | 41 +++++++------- .../transpiler/passes/utils/gate_direction.py | 56 +++++++++++-------- 2 files changed, 54 insertions(+), 43 deletions(-) diff --git a/qiskit/transpiler/passes/utils/check_gate_direction.py b/qiskit/transpiler/passes/utils/check_gate_direction.py index bf9615dd5233..7a32800ab882 100644 --- a/qiskit/transpiler/passes/utils/check_gate_direction.py +++ b/qiskit/transpiler/passes/utils/check_gate_direction.py @@ -37,35 +37,36 @@ def __init__(self, coupling_map, target=None): def _coupling_map_visit(self, dag, wire_map, edges=None): if edges is None: edges = self.coupling_map.get_edges() - for node in dag.two_qubit_ops(): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): if isinstance(node.op, ControlFlowOp): - continue - if (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) not in edges: + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._coupling_map_visit(circuit_to_dag(block), inner_wire_map, edges): + return False + elif ( + len(node.qargs) == 2 + and (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) not in edges + ): return False - for node in dag.op_nodes(ControlFlowOp): - for block in node.op.blocks: - inner_wire_map = { - inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) - } - if not self._coupling_map_visit(circuit_to_dag(block), inner_wire_map, edges): - return False return True def _target_visit(self, dag, wire_map): - for node in dag.two_qubit_ops(): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): if isinstance(node.op, ControlFlowOp): - continue - if not self.target.instruction_supported( + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._target_visit(circuit_to_dag(block), inner_wire_map): + return False + elif len(node.qargs) == 2 and not self.target.instruction_supported( node.op.name, (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) ): return False - for node in dag.op_nodes(ControlFlowOp): - for block in node.op.blocks: - inner_wire_map = { - inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) - } - if not self._target_visit(circuit_to_dag(block), inner_wire_map): - return False return True def run(self, dag): diff --git a/qiskit/transpiler/passes/utils/gate_direction.py b/qiskit/transpiler/passes/utils/gate_direction.py index 43550d86d290..350b0584fa85 100644 --- a/qiskit/transpiler/passes/utils/gate_direction.py +++ b/qiskit/transpiler/passes/utils/gate_direction.py @@ -102,8 +102,24 @@ def _run_coupling_map(self, dag, wire_map, edges=None): edges = set(self.coupling_map.get_edges()) if not edges: return dag - for node in dag.two_qubit_ops(): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): if isinstance(node.op, ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_coupling_map( + circuit_to_dag(block), + { + inner: wire_map[outer] + for outer, inner in zip(node.qargs, block.qubits) + }, + edges, + ) + ) + for block in node.op.blocks + ) + continue + if len(node.qargs) != 2: continue qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) if qargs not in edges and (qargs[1], qargs[0]) not in edges: @@ -121,22 +137,26 @@ def _run_coupling_map(self, dag, wire_map, edges=None): f"Flipping of gate direction is only supported " f"for {list(self._static_replacements)} at this time, not '{node.name}'." ) - for node in dag.op_nodes(ControlFlowOp): - node.op = node.op.replace_blocks( - dag_to_circuit( - self._run_coupling_map( - circuit_to_dag(block), - {inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits)}, - edges, - ) - ) - for block in node.op.blocks - ) return dag def _run_target(self, dag, wire_map): - for node in dag.two_qubit_ops(): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): if isinstance(node.op, ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_target( + circuit_to_dag(block), + { + inner: wire_map[outer] + for outer, inner in zip(node.qargs, block.qubits) + }, + ) + ) + for block in node.op.blocks + ) + continue + if len(node.qargs) != 2: continue qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) swapped = (qargs[1], qargs[0]) @@ -169,16 +189,6 @@ def _run_target(self, dag, wire_map): f"Flipping of gate direction is only supported " f"for {list(self._static_replacements)} at this time, not '{node.name}'." ) - for node in dag.op_nodes(ControlFlowOp): - node.op = node.op.replace_blocks( - dag_to_circuit( - self._run_target( - circuit_to_dag(block), - {inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits)}, - ) - ) - for block in node.op.blocks - ) return dag def run(self, dag):