diff --git a/qiskit/circuit/controlflow/builder.py b/qiskit/circuit/controlflow/builder.py index 349ec0db0a42..95abf5446b81 100644 --- a/qiskit/circuit/controlflow/builder.py +++ b/qiskit/circuit/controlflow/builder.py @@ -17,12 +17,14 @@ # having a far more complete builder of all circuits, with more classical control and creation, in # the future. +from __future__ import annotations import abc import itertools import typing -from typing import Callable, Collection, Iterable, List, FrozenSet, Tuple, Union, Optional +from typing import Collection, Iterable, List, FrozenSet, Tuple, Union, Optional +from qiskit.circuit.classical import expr from qiskit.circuit.classicalregister import Clbit, ClassicalRegister from qiskit.circuit.exceptions import CircuitError from qiskit.circuit.instruction import Instruction @@ -36,6 +38,120 @@ import qiskit +class CircuitScopeInterface(abc.ABC): + """An interface that circuits and builder blocks explicitly fulfil, which contains the primitive + methods of circuit construction and object validation. + + This allows core circuit methods to be applied to the currently open builder scope, and allows + the builders to hook into all places where circuit resources might be used. This allows the + builders to track the resources being used, without getting in the way of + :class:`.QuantumCircuit` doing its own thing. + + This is a Qiskit-internal interface.""" + + __slots__ = () + + @abc.abstractmethod + def append(self, instruction: CircuitInstruction) -> CircuitInstruction: + """Low-level 'append' primitive; this may assume that the qubits, clbits and operation are + all valid for the circuit. + + Abstraction of :meth:`.QuantumCircuit._append` (the low-level one, not the high-level). + + Args: + instruction: the resource-validated instruction context object. + + Returns: + the instruction context object actually appended. This is not required to be the same + as the object given (but typically will be). + """ + + @abc.abstractmethod + def resolve_classical_resource( + self, specifier: Clbit | ClassicalRegister | int + ) -> Clbit | ClassicalRegister: + """Resolve a single bit-like classical-resource specifier. + + A resource refers to either a classical bit or a register, where integers index into the + classical bits of the greater circuit. + + This is called whenever a classical bit or register is being used outside the standard + :class:`.Clbit` usage of instructions in :meth:`append`, such as in a legacy two-tuple + condition. + + Args: + specifier: the classical resource specifier. + + Returns: + the resolved resource. This cannot be an integer any more; an integer input is resolved + into a classical bit. + + Raises: + CircuitError: if the resource cannot be used by the scope, such as an out-of-range index + or a :class:`.Clbit` that isn't actually in the circuit. + """ + + @abc.abstractmethod + def add_uninitialized_var(self, var: expr.Var): + """Add an uninitialized variable to the circuit scope. + + The general circuit context is responsible for ensuring the variable is initialized. These + uninitialized variables are guaranteed to be standalone. + + Args: + var: the variable to add, if valid. + + Raises: + CircuitError: if the variable cannot be added, such as because it invalidly shadows or + redefines an existing name. + """ + + @abc.abstractmethod + def remove_var(self, var: expr.Var): + """Remove a variable from the locals of this scope. + + This is only called in the case that an exception occurred while initializing the variable, + and is not exposed to users. + + Args: + var: the variable to remove. It can be assumed that this was already the subject of an + :meth:`add_uninitialized_var` call. + """ + + @abc.abstractmethod + def use_var(self, var: expr.Var): + """Called for every standalone classical runtime variable being used by some circuit + instruction. + + The given variable is guaranteed to be a stand-alone variable; bit-like resource-wrapping + variables will have been filtered out and their resources given to + :meth:`resolve_classical_resource`. + + Args: + var: the variable to validate. + + Returns: + the same variable. + + Raises: + CircuitError: if the variable is not valid for this scope. + """ + + @abc.abstractmethod + def get_var(self, name: str) -> Optional[expr.Var]: + """Get the variable (if any) in scope with the given name. + + This should call up to the parent scope if in a control-flow builder scope, in case the + variable exists in an outer scope. + + Args: + name: the name of the symbol to lookup. + + Returns: + the variable if it is found, otherwise ``None``. + """ + + class InstructionResources(typing.NamedTuple): """The quantum and classical resources used within a particular instruction. @@ -169,7 +285,7 @@ def repeat(self, n): raise CircuitError("Cannot repeat a placeholder instruction.") -class ControlFlowBuilderBlock: +class ControlFlowBuilderBlock(CircuitScopeInterface): """A lightweight scoped block for holding instructions within a control-flow builder context. This class is designed only to be used by :obj:`.QuantumCircuit` as an internal context for @@ -204,9 +320,11 @@ class ControlFlowBuilderBlock: "clbits", "registers", "_allow_jumps", - "_resource_requester", + "_parent", "_built", "_forbidden_message", + "_vars_local", + "_vars_capture", ) def __init__( @@ -214,8 +332,8 @@ def __init__( qubits: Iterable[Qubit], clbits: Iterable[Clbit], *, + parent: CircuitScopeInterface, registers: Iterable[Register] = (), - resource_requester: Callable, allow_jumps: bool = True, forbidden_message: Optional[str] = None, ): @@ -237,13 +355,7 @@ def __init__( uses *exactly* the same set of resources. We cannot verify this from within the builder interface (and it is too expensive to do when the ``for`` op is made), so we fail safe, and require the user to use the more verbose, internal form. - resource_requester: A callback function that takes in some classical resource specifier, - and returns a concrete classical resource, if this scope is allowed to access that - resource. In almost all cases, this should be a resolver from the - :obj:`.QuantumCircuit` that this scope is contained in. See - :meth:`.QuantumCircuit._resolve_classical_resource` for the normal expected input - here, and the documentation of :obj:`.InstructionSet`, which uses this same - callback. + parent: The scope interface of the containing scope. forbidden_message: If a string is given here, a :exc:`.CircuitError` will be raised on any attempts to append instructions to the scope with this message. This is used by pseudo scopes where the state machine of the builder scopes has changed into a @@ -254,8 +366,10 @@ def __init__( self.qubits = set(qubits) self.clbits = set(clbits) self.registers = set(registers) + self._vars_local = {} + self._vars_capture = {} self._allow_jumps = allow_jumps - self._resource_requester = resource_requester + self._parent = parent self._built = False self._forbidden_message = forbidden_message @@ -274,8 +388,6 @@ def allow_jumps(self): return self._allow_jumps def append(self, instruction: CircuitInstruction) -> CircuitInstruction: - """Add an instruction into the scope, keeping track of the qubits and clbits that have been - used in total.""" if self._forbidden_message is not None: raise CircuitError(self._forbidden_message) @@ -296,33 +408,60 @@ def append(self, instruction: CircuitInstruction) -> CircuitInstruction: self.clbits.update(instruction.clbits) return instruction - def request_classical_resource(self, specifier): - """Resolve a single classical resource specifier into a concrete resource, raising an error - if the specifier is invalid, and track it as now being used in scope. - - Args: - specifier (Union[Clbit, ClassicalRegister, int]): a specifier of a classical resource - present in this circuit. An ``int`` will be resolved into a :obj:`.Clbit` using the - same conventions that measurement operations on this circuit use. - - Returns: - Union[Clbit, ClassicalRegister]: the requested resource, resolved into a concrete - instance of :obj:`.Clbit` or :obj:`.ClassicalRegister`. - - Raises: - CircuitError: if the resource is not present in this circuit, or if the integer index - passed is out-of-bounds. - """ + def resolve_classical_resource(self, specifier): if self._built: raise CircuitError("Cannot add resources after the scope has been built.") # Allow the inner resolve to propagate exceptions. - resource = self._resource_requester(specifier) + resource = self._parent.resolve_classical_resource(specifier) if isinstance(resource, Clbit): self.add_bits((resource,)) else: self.add_register(resource) return resource + def add_uninitialized_var(self, var: expr.Var): + if self._built: + raise CircuitError("Cannot add resources after the scope has been built.") + # We can shadow a name if it was declared in an outer scope, but only if we haven't already + # captured it ourselves yet. + if (previous := self._vars_local.get(var.name)) is not None: + if previous == var: + raise CircuitError(f"'{var}' is already present in the scope") + raise CircuitError(f"cannot add '{var}' as its name shadows the existing '{previous}'") + if var.name in self._vars_capture: + raise CircuitError(f"cannot add '{var}' as its name shadows the existing '{previous}'") + self._vars_local[var.name] = var + + def remove_var(self, var: expr.Var): + if self._built: + raise RuntimeError("exception handler 'remove_var' called after scope built") + self._vars_local.pop(var.name) + + def get_var(self, name: str): + if (out := self._vars_local.get(name)) is not None: + return out + return self._parent.get_var(name) + + def use_var(self, var: expr.Var): + if (local := self._vars_local.get(var.name)) is not None: + if local == var: + return + raise CircuitError(f"cannot use '{var}' which is shadowed by the local '{local}'") + if self._vars_capture.get(var.name) == var: + return + if self._parent.get_var(var.name) != var: + raise CircuitError(f"cannot close over '{var}', which is not in scope") + self._parent.use_var(var) + self._vars_capture[var.name] = var + + def iter_local_vars(self): + """Iterator over the variables currently declared in this scope.""" + return self._vars_local.values() + + def iter_captured_vars(self): + """Iterator over the variables currently captured in this scope.""" + return self._vars_capture.values() + def peek(self) -> CircuitInstruction: """Get the value of the most recent instruction tuple in this scope.""" if not self.instructions: @@ -418,7 +557,16 @@ def build( # We start off by only giving the QuantumCircuit the qubits we _know_ it will need, and add # more later as needed. - out = QuantumCircuit(list(self.qubits), list(self.clbits), *self.registers) + out = QuantumCircuit( + list(self.qubits), + list(self.clbits), + *self.registers, + captures=self._vars_capture.values(), + ) + for var in self._vars_local.values(): + # The requisite `Store` instruction to initialise the variable will have been appended + # into the instructions. + out.add_uninitialized_var(var) for instruction in self.instructions: if isinstance(instruction.operation, InstructionPlaceholder): @@ -483,6 +631,9 @@ def copy(self) -> "ControlFlowBuilderBlock": out.qubits = self.qubits.copy() out.clbits = self.clbits.copy() out.registers = self.registers.copy() + out._vars_local = self._vars_local.copy() + out._vars_capture = self._vars_capture.copy() + out._parent = self._parent out._allow_jumps = self._allow_jumps out._forbidden_message = self._forbidden_message return out diff --git a/qiskit/circuit/quantumcircuit.py b/qiskit/circuit/quantumcircuit.py index b6cc184c4e10..8e786e7df702 100644 --- a/qiskit/circuit/quantumcircuit.py +++ b/qiskit/circuit/quantumcircuit.py @@ -49,6 +49,7 @@ from . import _classical_resource_map from ._utils import sort_parameters from .controlflow import ControlFlowOp +from .controlflow.builder import CircuitScopeInterface, ControlFlowBuilderBlock from .controlflow.break_loop import BreakLoopOp, BreakLoopPlaceholder from .controlflow.continue_loop import ContinueLoopOp, ContinueLoopPlaceholder from .controlflow.for_loop import ForLoopOp, ForLoopContext @@ -258,6 +259,10 @@ def __init__( self.name = name self._increment_instances() + # An explicit implementation of the circuit scope builder interface used to dispatch appends + # and the like to the relevant control-flow scope. + self._builder_api = _OuterCircuitScopeInterface(self) + # Data contains a list of instructions and their contexts, # in the order they were applied. self._data: list[CircuitInstruction] = [] @@ -928,6 +933,10 @@ def compose( # has to be strictly larger. This allows composing final measurements onto unitary circuits. if isinstance(other, QuantumCircuit): if not self.clbits and other.clbits: + if dest._control_flow_scopes: + raise CircuitError( + "cannot implicitly add clbits while within a control-flow scope" + ) dest.add_bits(other.clbits) for reg in other.cregs: dest.add_register(reg) @@ -1004,9 +1013,9 @@ def compose( # adjust new instrs before original ones and update all parameters mapped_instrs += dest.data dest.clear() - append = dest._control_flow_scopes[-1].append if dest._control_flow_scopes else dest._append + api = dest._current_scope() for instr in mapped_instrs: - append(instr) + api.append(instr) for gate, cals in other.calibrations.items(): dest._calibrations[gate].update(cals) @@ -1166,6 +1175,9 @@ def iter_vars(self) -> typing.Iterable[expr.Var]: This method will iterate over all variables in scope. For more fine-grained iterators, see :meth:`iter_declared_vars`, :meth:`iter_input_vars` and :meth:`iter_captured_vars`.""" + if self._control_flow_scopes: + builder = self._control_flow_scopes[-1] + return itertools.chain(builder.iter_captured_vars(), builder.iter_local_vars()) return itertools.chain( self._vars_input.values(), self._vars_capture.values(), self._vars_local.values() ) @@ -1174,18 +1186,24 @@ def iter_declared_vars(self) -> typing.Iterable[expr.Var]: """Get an iterable over all runtime classical variables that are declared with automatic storage duration in this scope. This excludes input variables (see :meth:`iter_input_vars`) and captured variables (see :meth:`iter_captured_vars`).""" + if self._control_flow_scopes: + return self._control_flow_scopes[-1].iter_local_vars() return self._vars_local.values() def iter_input_vars(self) -> typing.Iterable[expr.Var]: """Get an iterable over all runtime classical variables that are declared as inputs to this circuit scope. This excludes locally declared variables (see :meth:`iter_declared_vars`) and captured variables (see :meth:`iter_captured_vars`).""" + if self._control_flow_scopes: + return () return self._vars_input.values() def iter_captured_vars(self) -> typing.Iterable[expr.Var]: """Get an iterable over all runtime classical variables that are captured by this circuit scope from a containing scope. This excludes input variables (see :meth:`iter_input_vars`) and locally declared variables (see :meth:`iter_declared_vars`).""" + if self._control_flow_scopes: + return self._control_flow_scopes[-1].iter_captured_vars() return self._vars_capture.values() def __and__(self, rhs: "QuantumCircuit") -> "QuantumCircuit": @@ -1260,61 +1278,6 @@ def cbit_argument_conversion(self, clbit_representation: ClbitSpecifier) -> list clbit_representation, self.clbits, self._clbit_indices, Clbit ) - def _resolve_classical_resource(self, specifier): - """Resolve a single classical resource specifier into a concrete resource, raising an error - if the specifier is invalid. - - This is slightly different to :meth:`.cbit_argument_conversion`, because it should not - unwrap :obj:`.ClassicalRegister` instances into lists, and in general it should not allow - iterables or broadcasting. It is expected to be used as a callback for things like - :meth:`.InstructionSet.c_if` to check the validity of their arguments. - - Args: - specifier (Union[Clbit, ClassicalRegister, int]): a specifier of a classical resource - present in this circuit. An ``int`` will be resolved into a :obj:`.Clbit` using the - same conventions as measurement operations on this circuit use. - - Returns: - Union[Clbit, ClassicalRegister]: the resolved resource. - - Raises: - CircuitError: if the resource is not present in this circuit, or if the integer index - passed is out-of-bounds. - """ - if isinstance(specifier, Clbit): - if specifier not in self._clbit_indices: - raise CircuitError(f"Clbit {specifier} is not present in this circuit.") - return specifier - if isinstance(specifier, ClassicalRegister): - # This is linear complexity for something that should be constant, but QuantumCircuit - # does not currently keep a hashmap of registers, and requires non-trivial changes to - # how it exposes its registers publically before such a map can be safely stored so it - # doesn't miss updates. (Jake, 2021-11-10). - if specifier not in self.cregs: - raise CircuitError(f"Register {specifier} is not present in this circuit.") - return specifier - if isinstance(specifier, int): - try: - return self._clbits[specifier] - except IndexError: - raise CircuitError(f"Classical bit index {specifier} is out-of-range.") from None - raise CircuitError(f"Unknown classical resource specifier: '{specifier}'.") - - def _validate_expr(self, node: expr.Expr) -> expr.Expr: - for var in expr.iter_vars(node): - if var.standalone: - if not self.has_var(var): - raise CircuitError(f"Variable '{var}' is not present in this circuit.") - elif isinstance(var.var, Clbit): - if var.var not in self._clbit_indices: - raise CircuitError(f"Clbit {var.var} is not present in this circuit.") - elif isinstance(var.var, ClassicalRegister): - if var.var not in self.cregs: - raise CircuitError(f"Register {var.var} is not present in this circuit.") - else: - raise RuntimeError(f"unhandled Var inner type in '{var}'") - return node - def append( self, instruction: Operation | CircuitInstruction, @@ -1369,13 +1332,15 @@ def append( "Object to append must be an Operation or have a to_instruction() method." ) + api = self._current_scope() + # Make copy of parameterized gate instances if params := getattr(operation, "params", ()): is_parameter = False for param in params: is_parameter = is_parameter or isinstance(param, Parameter) if isinstance(param, expr.Expr): - self._validate_expr(param) + param = _validate_expr(api, param) if is_parameter: operation = copy.deepcopy(operation) if isinstance(operation, ControlFlowOp): @@ -1396,28 +1361,18 @@ def append( expanded_qargs = [self.qbit_argument_conversion(qarg) for qarg in qargs or []] expanded_cargs = [self.cbit_argument_conversion(carg) for carg in cargs or []] - if self._control_flow_scopes: - appender = self._control_flow_scopes[-1].append - requester = self._control_flow_scopes[-1].request_classical_resource - else: - appender = self._append - requester = self._resolve_classical_resource - instructions = InstructionSet(resource_requester=requester) - if isinstance(operation, Instruction): - for qarg, carg in operation.broadcast_arguments(expanded_qargs, expanded_cargs): - self._check_dups(qarg) - instruction = CircuitInstruction(operation, qarg, carg) - appender(instruction) - instructions.add(instruction) - else: - # For Operations that are non-Instructions, we use the Instruction's default method - for qarg, carg in Instruction.broadcast_arguments( - operation, expanded_qargs, expanded_cargs - ): - self._check_dups(qarg) - instruction = CircuitInstruction(operation, qarg, carg) - appender(instruction) - instructions.add(instruction) + instructions = InstructionSet(resource_requester=api.resolve_classical_resource) + # For Operations that are non-Instructions, we use the Instruction's default method + broadcast_iter = ( + operation.broadcast_arguments(expanded_qargs, expanded_cargs) + if isinstance(operation, Instruction) + else Instruction.broadcast_arguments(operation, expanded_qargs, expanded_cargs) + ) + for qarg, carg in broadcast_iter: + self._check_dups(qarg) + instruction = CircuitInstruction(operation, qarg, carg) + api.append(instruction) + instructions.add(instruction) return instructions # Preferred new style. @@ -1549,19 +1504,14 @@ def get_var(self, name: str, default: typing.Any = ...): assert qc.get_var("my_var", None) is my_var assert qc.get_var("unknown_variable", None) is None """ - - if (out := self._vars_local.get(name)) is not None: - return out - if (out := self._vars_capture.get(name)) is not None: - return out - if (out := self._vars_input.get(name)) is not None: + if (out := self._current_scope().get_var(name)) is not None: return out if default is Ellipsis: raise KeyError(f"no variable named '{name}' is present") return default def has_var(self, name_or_var: str | expr.Var, /) -> bool: - """Check whether a variable is defined in this scope. + """Check whether a variable is accessible in this scope. Args: name_or_var: the variable, or name of a variable to check. If this is a @@ -1569,7 +1519,7 @@ def has_var(self, name_or_var: str | expr.Var, /) -> bool: function to return ``True``. Returns: - whether a matching variable is present. + whether a matching variable is accessible. See also: :meth:`QuantumCircuit.get_var`: retrieve a named variable from a circuit. @@ -1671,15 +1621,24 @@ def add_var(self, name_or_var: str | expr.Var, /, initial: typing.Any) -> expr.V """ # Validate the initialiser first to catch cases where the variable to be declared is being # used in the initialiser. - initial = self._validate_expr(expr.lift(initial)) - var = self._prepare_new_var(name_or_var, initial.type) - # Store is responsible for ensuring the type safety of the initialisation. We build this - # before actually modifying any of our own state, so we don't get into an inconsistent state - # if an exception is raised later. - store = Store(var, initial) - - self._vars_local[var.name] = var - self._append(CircuitInstruction(store, (), ())) + api = self._current_scope() + initial = _validate_expr(api, expr.lift(initial)) + if isinstance(name_or_var, str): + var = expr.Var.new(name_or_var, initial.type) + elif not name_or_var.standalone: + raise CircuitError( + "cannot add variables that wrap `Clbit` or `ClassicalRegister` instances." + ) + else: + var = name_or_var + api.add_uninitialized_var(var) + try: + # Store is responsible for ensuring the type safety of the initialisation. + store = Store(var, initial) + except CircuitError: + api.remove_var(var) + raise + api.append(CircuitInstruction(store, (), ())) return var def add_uninitialized_var(self, var: expr.Var, /): @@ -1709,8 +1668,11 @@ def add_uninitialized_var(self, var: expr.Var, /): # that _some_ sort of handling of uninitialised variables is taken into account in our # structures, so that doesn't become a huge edge case, even though we make no assertions # about the _meaning_ if such an expression was run on hardware. - var = self._prepare_new_var(var, None) - self._vars_local[var.name] = var + if self._control_flow_scopes: + raise CircuitError("cannot add an uninitialized variable in a control-flow scope") + if not var.standalone: + raise CircuitError("cannot add a variable wrapping a bit or register to a circuit") + self._builder_api.add_uninitialized_var(var) def add_capture(self, var: expr.Var): """Add a variable to the circuit that it should capture from a scope it will be contained @@ -1732,6 +1694,11 @@ def add_capture(self, var: expr.Var): Raises: CircuitError: if the variable cannot be created due to shadowing an existing variable. """ + if self._control_flow_scopes: + # Allow manual capturing. Not sure why it'd be useful, but there's a clear expected + # behaviour here. + self._control_flow_scopes[-1].use_var(var) + return if self._vars_input: raise CircuitError( "circuits with input variables cannot be enclosed, so cannot be closures" @@ -1764,6 +1731,8 @@ def add_input( # pylint: disable=missing-raises-doc Raises: CircuitError: if the variable cannot be created due to shadowing an existing variable. """ + if self._control_flow_scopes: + raise CircuitError("cannot add an input variable in a control-flow scope") if self._vars_capture: raise CircuitError("circuits to be enclosed with captures cannot have input variables") if isinstance(name_or_var, expr.Var) and type_ is not None: @@ -2517,6 +2486,7 @@ def copy_empty_like(self, name: str | None = None) -> "QuantumCircuit": # copy registers correctly, in copy.copy they are only copied via reference cpy.qregs = self.qregs.copy() cpy.cregs = self.cregs.copy() + cpy._builder_api = _OuterCircuitScopeInterface(cpy) cpy._qubits = self._qubits.copy() cpy._ancillas = self._ancillas.copy() cpy._clbits = self._clbits.copy() @@ -3330,7 +3300,9 @@ def delay( else: qubits.append(qarg) - instructions = InstructionSet(resource_requester=self._resolve_classical_resource) + instructions = InstructionSet( + resource_requester=self._current_scope().resolve_classical_resource + ) for q in qubits: inst: tuple[ Instruction, Sequence[QubitSpecifier] | None, Sequence[ClbitSpecifier] | None @@ -5220,6 +5192,11 @@ def snapshot(self, label, snapshot_type="statevector", qubits=None, params=None) return self.append(snap, qubits) + def _current_scope(self) -> CircuitScopeInterface: + if self._control_flow_scopes: + return self._control_flow_scopes[-1] + return self._builder_api + def _push_scope( self, qubits: Iterable[Qubit] = (), @@ -5240,28 +5217,18 @@ def _push_scope( forbidden_message: If given, all attempts to add instructions to this scope will raise a :exc:`.CircuitError` with this message. """ - # pylint: disable=cyclic-import - from qiskit.circuit.controlflow.builder import ControlFlowBuilderBlock - - # Chain resource requests so things like registers added to inner scopes via conditions are - # requested in the outer scope as well. - if self._control_flow_scopes: - resource_requester = self._control_flow_scopes[-1].request_classical_resource - else: - resource_requester = self._resolve_classical_resource - self._control_flow_scopes.append( ControlFlowBuilderBlock( qubits, clbits, - resource_requester=resource_requester, + parent=self._current_scope(), registers=registers, allow_jumps=allow_jumps, forbidden_message=forbidden_message, ) ) - def _pop_scope(self) -> "qiskit.circuit.controlflow.builder.ControlFlowBuilderBlock": + def _pop_scope(self) -> ControlFlowBuilderBlock: """Finish a scope used in the control-flow builder interface, and return it to the caller. This should only be done by the control-flow context managers, since they naturally @@ -5389,10 +5356,11 @@ def while_loop(self, condition, body=None, qubits=None, clbits=None, *, label=No Raises: CircuitError: if an incorrect calling convention is used. """ + api = self._current_scope() if isinstance(condition, expr.Expr): - condition = self._validate_expr(condition) + condition = _validate_expr(api, condition) else: - condition = (self._resolve_classical_resource(condition[0]), condition[1]) + condition = (api.resolve_classical_resource(condition[0]), condition[1]) if body is None: if qubits is not None or clbits is not None: @@ -5591,10 +5559,11 @@ def if_test( Returns: A handle to the instruction created. """ + api = self._current_scope() if isinstance(condition, expr.Expr): - condition = self._validate_expr(condition) + condition = _validate_expr(api, condition) else: - condition = (self._resolve_classical_resource(condition[0]), condition[1]) + condition = (api.resolve_classical_resource(condition[0]), condition[1]) if true_body is None: if qubits is not None or clbits is not None: @@ -5657,10 +5626,11 @@ def if_else( Returns: A handle to the instruction created. """ + api = self._current_scope() if isinstance(condition, expr.Expr): - condition = self._validate_expr(condition) + condition = _validate_expr(api, condition) else: - condition = (self._resolve_classical_resource(condition[0]), condition[1]) + condition = (api.resolve_classical_resource(condition[0]), condition[1]) return self.append(IfElseOp(condition, true_body, false_body, label), qubits, clbits) @@ -5740,10 +5710,11 @@ def switch(self, target, cases=None, qubits=None, clbits=None, *, label=None): CircuitError: if an incorrect calling convention is used. """ + api = self._current_scope() if isinstance(target, expr.Expr): - target = self._validate_expr(target) + target = _validate_expr(api, target) else: - target = self._resolve_classical_resource(target) + target = api.resolve_classical_resource(target) if cases is None: if qubits is not None or clbits is not None: raise CircuitError( @@ -5964,6 +5935,74 @@ def qubit_stop_time(self, *qubits: Union[Qubit, int]) -> float: QuantumCircuit.isometry = QuantumCircuit.iso +class _OuterCircuitScopeInterface(CircuitScopeInterface): + # This is an explicit interface-fulfilling object friend of QuantumCircuit that acts as its + # implementation of the control-flow builder scope methods. + + __slots__ = ("circuit",) + + def __init__(self, circuit: QuantumCircuit): + self.circuit = circuit + + def append(self, instruction): + # QuantumCircuit._append is semi-public, so we just call back to it. + return self.circuit._append(instruction) + + def resolve_classical_resource(self, specifier): + # This is slightly different to cbit_argument_conversion, because it should not + # unwrap :obj:`.ClassicalRegister` instances into lists, and in general it should not allow + # iterables or broadcasting. It is expected to be used as a callback for things like + # :meth:`.InstructionSet.c_if` to check the validity of their arguments. + if isinstance(specifier, Clbit): + if specifier not in self.circuit._clbit_indices: + raise CircuitError(f"Clbit {specifier} is not present in this circuit.") + return specifier + if isinstance(specifier, ClassicalRegister): + # This is linear complexity for something that should be constant, but QuantumCircuit + # does not currently keep a hashmap of registers, and requires non-trivial changes to + # how it exposes its registers publically before such a map can be safely stored so it + # doesn't miss updates. (Jake, 2021-11-10). + if specifier not in self.circuit.cregs: + raise CircuitError(f"Register {specifier} is not present in this circuit.") + return specifier + if isinstance(specifier, int): + try: + return self.circuit._clbits[specifier] + except IndexError: + raise CircuitError(f"Classical bit index {specifier} is out-of-range.") from None + raise CircuitError(f"Unknown classical resource specifier: '{specifier}'.") + + def add_uninitialized_var(self, var): + var = self.circuit._prepare_new_var(var, None) + self.circuit._vars_local[var.name] = var + + def remove_var(self, var): + self.circuit._vars_local.pop(var.name) + + def get_var(self, name): + if (out := self.circuit._vars_local.get(name)) is not None: + return out + if (out := self.circuit._vars_capture.get(name)) is not None: + return out + return self.circuit._vars_input.get(name) + + def use_var(self, var): + if self.get_var(var.name) != var: + raise CircuitError(f"'{var}' is not present in this circuit") + + +def _validate_expr(api: CircuitScopeInterface, node: expr.Expr) -> expr.Expr: + # This takes the `api` object as an argument rather than being a circuit method and inferring it + # because we may want to call this several times, and we almost invariably already need the + # interface implementation for something else anyway. + for var in set(expr.iter_vars(node)): + if var.standalone: + api.use_var(var) + else: + api.resolve_classical_resource(var.var) + return node + + class _ParameterBindsDict: __slots__ = ("mapping", "allowed_keys") diff --git a/test/python/circuit/test_control_flow_builders.py b/test/python/circuit/test_control_flow_builders.py index 35db299c2fd6..0dbb7de8008a 100644 --- a/test/python/circuit/test_control_flow_builders.py +++ b/test/python/circuit/test_control_flow_builders.py @@ -10,6 +10,8 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. +# pylint: disable=missing-function-docstring + """Test operations on the builder interfaces for control flow in dynamic QuantumCircuits.""" import copy @@ -25,10 +27,10 @@ QuantumCircuit, QuantumRegister, Qubit, + Store, ) from qiskit.circuit.classical import expr, types from qiskit.circuit.controlflow import ForLoopOp, IfElseOp, WhileLoopOp, SwitchCaseOp, CASE_DEFAULT -from qiskit.circuit.controlflow.builder import ControlFlowBuilderBlock from qiskit.circuit.controlflow.if_else import IfElsePlaceholder from qiskit.circuit.exceptions import CircuitError from qiskit.test import QiskitTestCase @@ -2940,6 +2942,251 @@ def test_inplace_compose_within_builder(self): self.assertEqual(canonicalize_control_flow(outer), canonicalize_control_flow(expected)) + def test_can_capture_input(self): + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + base = QuantumCircuit(inputs=[a, b]) + with base.for_loop(range(3)): + base.store(a, expr.lift(True)) + self.assertEqual(set(base.data[-1].operation.blocks[0].iter_captured_vars()), {a}) + + def test_can_capture_declared(self): + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + base = QuantumCircuit(declarations=[(a, expr.lift(False)), (b, expr.lift(True))]) + with base.if_test(expr.lift(False)): + base.store(a, expr.lift(True)) + self.assertEqual(set(base.data[-1].operation.blocks[0].iter_captured_vars()), {a}) + + def test_can_capture_capture(self): + # It's a bit wild to be manually building an outer circuit that's intended to be a subblock, + # but be using the control-flow builder interface internally, but eh, it should work. + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + base = QuantumCircuit(captures=[a, b]) + with base.while_loop(expr.lift(False)): + base.store(a, expr.lift(True)) + self.assertEqual(set(base.data[-1].operation.blocks[0].iter_captured_vars()), {a}) + + def test_can_capture_from_nested(self): + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + c = expr.Var.new("c", types.Bool()) + base = QuantumCircuit(inputs=[a, b]) + with base.switch(expr.lift(False)) as case, case(case.DEFAULT): + base.add_var(c, expr.lift(False)) + with base.if_test(expr.lift(False)): + base.store(a, c) + outer_block = base.data[-1].operation.blocks[0] + inner_block = outer_block.data[-1].operation.blocks[0] + self.assertEqual(set(inner_block.iter_captured_vars()), {a, c}) + + # The containing block should have captured it as well, despite not using it explicitly. + self.assertEqual(set(outer_block.iter_captured_vars()), {a}) + self.assertEqual(set(outer_block.iter_declared_vars()), {c}) + + def test_can_manually_capture(self): + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + base = QuantumCircuit(inputs=[a, b]) + with base.while_loop(expr.lift(False)): + # Why do this? Who knows, but it clearly has a well-defined meaning. + base.add_capture(a) + self.assertEqual(set(base.data[-1].operation.blocks[0].iter_captured_vars()), {a}) + + def test_later_blocks_do_not_inherit_captures(self): + """Neither 'if' nor 'switch' should have later blocks inherit the captures from the earlier + blocks, and the earlier blocks shouldn't be affected by later ones.""" + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + c = expr.Var.new("c", types.Bool()) + + base = QuantumCircuit(inputs=[a, b, c]) + with base.if_test(expr.lift(False)) as else_: + base.store(a, expr.lift(False)) + with else_: + base.store(b, expr.lift(False)) + blocks = base.data[-1].operation.blocks + self.assertEqual(set(blocks[0].iter_captured_vars()), {a}) + self.assertEqual(set(blocks[1].iter_captured_vars()), {b}) + + base = QuantumCircuit(inputs=[a, b, c]) + with base.switch(expr.lift(False)) as case: + with case(0): + base.store(a, expr.lift(False)) + with case(case.DEFAULT): + base.store(b, expr.lift(False)) + blocks = base.data[-1].operation.blocks + self.assertEqual(set(blocks[0].iter_captured_vars()), {a}) + self.assertEqual(set(blocks[1].iter_captured_vars()), {b}) + + def test_blocks_have_independent_declarations(self): + """The blocks of if and switch should be separate scopes for declarations.""" + b1 = expr.Var.new("b", types.Bool()) + b2 = expr.Var.new("b", types.Bool()) + self.assertNotEqual(b1, b2) + + base = QuantumCircuit() + with base.if_test(expr.lift(False)) as else_: + base.add_var(b1, expr.lift(False)) + with else_: + base.add_var(b2, expr.lift(False)) + blocks = base.data[-1].operation.blocks + self.assertEqual(set(blocks[0].iter_declared_vars()), {b1}) + self.assertEqual(set(blocks[1].iter_declared_vars()), {b2}) + + base = QuantumCircuit() + with base.switch(expr.lift(False)) as case: + with case(0): + base.add_var(b1, expr.lift(False)) + with case(case.DEFAULT): + base.add_var(b2, expr.lift(False)) + blocks = base.data[-1].operation.blocks + self.assertEqual(set(blocks[0].iter_declared_vars()), {b1}) + self.assertEqual(set(blocks[1].iter_declared_vars()), {b2}) + + def test_can_shadow_outer_name(self): + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + base = QuantumCircuit(inputs=[outer]) + with base.if_test(expr.lift(False)): + base.add_var(inner, expr.lift(True)) + block = base.data[-1].operation.blocks[0] + self.assertEqual(set(block.iter_declared_vars()), {inner}) + self.assertEqual(set(block.iter_captured_vars()), set()) + + def test_iterators_run_over_scope(self): + a = expr.Var.new("a", types.Bool()) + b = expr.Var.new("b", types.Bool()) + c = expr.Var.new("c", types.Bool()) + d = expr.Var.new("d", types.Bool()) + + base = QuantumCircuit(inputs=[a, b, c]) + self.assertEqual(set(base.iter_input_vars()), {a, b, c}) + self.assertEqual(set(base.iter_declared_vars()), set()) + self.assertEqual(set(base.iter_captured_vars()), set()) + + with base.switch(expr.lift(3)) as case: + with case(0): + # Nothing here. + self.assertEqual(set(base.iter_vars()), set()) + self.assertEqual(set(base.iter_input_vars()), set()) + self.assertEqual(set(base.iter_declared_vars()), set()) + self.assertEqual(set(base.iter_captured_vars()), set()) + + # Capture a variable. + base.store(a, expr.lift(False)) + self.assertEqual(set(base.iter_captured_vars()), {a}) + + # Declare a variable. + base.add_var(d, expr.lift(False)) + self.assertEqual(set(base.iter_declared_vars()), {d}) + self.assertEqual(set(base.iter_vars()), {a, d}) + + with case(1): + # We should have reset. + self.assertEqual(set(base.iter_vars()), set()) + self.assertEqual(set(base.iter_input_vars()), set()) + self.assertEqual(set(base.iter_declared_vars()), set()) + self.assertEqual(set(base.iter_captured_vars()), set()) + + # Capture a variable. + base.store(b, expr.lift(False)) + self.assertEqual(set(base.iter_captured_vars()), {b}) + + # Capture some more in another scope. + with base.while_loop(expr.lift(False)): + self.assertEqual(set(base.iter_vars()), set()) + base.store(c, expr.lift(False)) + self.assertEqual(set(base.iter_captured_vars()), {c}) + + self.assertEqual(set(base.iter_captured_vars()), {b, c}) + self.assertEqual(set(base.iter_vars()), {b, c}) + # And back to the outer scope. + self.assertEqual(set(base.iter_input_vars()), {a, b, c}) + self.assertEqual(set(base.iter_declared_vars()), set()) + self.assertEqual(set(base.iter_captured_vars()), set()) + + def test_get_var_respects_scope(self): + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + base = QuantumCircuit(inputs=[outer]) + self.assertEqual(base.get_var("a"), outer) + with base.if_test(expr.lift(False)) as else_: + # Before we've done anything, getting the variable should get the outer one. + self.assertEqual(base.get_var("a"), outer) + + # If we shadow it, we should get the shadowed one after. + base.add_var(inner, expr.lift(False)) + self.assertEqual(base.get_var("a"), inner) + with else_: + # In a new scope, we should see the outer one again. + self.assertEqual(base.get_var("a"), outer) + # ... until we shadow it. + base.add_var(inner, expr.lift(False)) + self.assertEqual(base.get_var("a"), inner) + self.assertEqual(base.get_var("a"), outer) + + def test_has_var_respects_scope(self): + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + base = QuantumCircuit(inputs=[outer]) + self.assertEqual(base.get_var("a"), outer) + with base.if_test(expr.lift(False)) as else_: + self.assertFalse(base.has_var("b")) + + # Before we've done anything, we should see the outer one. + self.assertTrue(base.has_var("a")) + self.assertTrue(base.has_var(outer)) + self.assertFalse(base.has_var(inner)) + + # If we shadow it, we should see the shadowed one after. + base.add_var(inner, expr.lift(False)) + self.assertTrue(base.has_var("a")) + self.assertFalse(base.has_var(outer)) + self.assertTrue(base.has_var(inner)) + with else_: + # In a new scope, we should see the outer one again. + self.assertTrue(base.has_var("a")) + self.assertTrue(base.has_var(outer)) + self.assertFalse(base.has_var(inner)) + + # ... until we shadow it. + base.add_var(inner, expr.lift(False)) + self.assertTrue(base.has_var("a")) + self.assertFalse(base.has_var(outer)) + self.assertTrue(base.has_var(inner)) + + self.assertTrue(base.has_var("a")) + self.assertTrue(base.has_var(outer)) + self.assertFalse(base.has_var(inner)) + + def test_store_to_clbit_captures_bit(self): + base = QuantumCircuit(1, 2) + with base.if_test(expr.lift(False)): + base.store(expr.lift(base.clbits[0]), expr.lift(True)) + + expected = QuantumCircuit(1, 2) + body = QuantumCircuit([expected.clbits[0]]) + body.store(expr.lift(expected.clbits[0]), expr.lift(True)) + expected.if_test(expr.lift(False), body, [], [0]) + + self.assertEqual(base, expected) + + def test_store_to_register_captures_register(self): + cr1 = ClassicalRegister(2, "cr1") + cr2 = ClassicalRegister(2, "cr2") + base = QuantumCircuit(cr1, cr2) + with base.if_test(expr.lift(False)): + base.store(expr.lift(cr1), expr.lift(3)) + + body = QuantumCircuit(cr1) + body.store(expr.lift(cr1), expr.lift(3)) + expected = QuantumCircuit(cr1, cr2) + expected.if_test(expr.lift(False), body, [], cr1[:]) + + self.assertEqual(base, expected) + @ddt.ddt class TestControlFlowBuildersFailurePaths(QiskitTestCase): @@ -3447,23 +3694,6 @@ def test_non_context_manager_calling_states_reject_missing_resources(self, resou ): test.switch(test.clbits[0], [(False, body)], qubits=qubits, clbits=clbits) - @ddt.data(None, [Clbit()], 0) - def test_builder_block_add_bits_reject_bad_bits(self, bit): - """Test that :obj:`.ControlFlowBuilderBlock` raises if something is given that is an - incorrect type. - - This isn't intended to be something users do at all; the builder block is an internal - construct only, but this keeps coverage checking happy.""" - - def dummy_requester(resource): - raise CircuitError - - builder_block = ControlFlowBuilderBlock( - qubits=(), clbits=(), resource_requester=dummy_requester - ) - with self.assertRaisesRegex(TypeError, r"Can only add qubits or classical bits.*"): - builder_block.add_bits([bit]) - def test_compose_front_inplace_invalid_within_builder(self): """Test that `QuantumCircuit.compose` raises a sensible error when called within a control-flow builder block.""" @@ -3488,3 +3718,124 @@ def test_compose_new_invalid_within_builder(self): with outer.if_test((outer.clbits[0], 1)): with self.assertRaisesRegex(CircuitError, r"Cannot emit a new composed circuit.*"): outer.compose(inner, inplace=False) + + def test_cannot_capture_variable_not_in_scope(self): + a = expr.Var.new("a", types.Bool()) + + base = QuantumCircuit(1, 1) + with base.if_test((0, True)) as else_, self.assertRaisesRegex(CircuitError, "not in scope"): + base.store(a, expr.lift(False)) + with else_, self.assertRaisesRegex(CircuitError, "not in scope"): + base.store(a, expr.lift(False)) + + base.add_input(a) + with base.while_loop((0, True)), self.assertRaisesRegex(CircuitError, "not in scope"): + base.store(expr.Var.new("a", types.Bool()), expr.lift(False)) + + with base.for_loop(range(3)): + with base.switch(base.clbits[0]) as case, case(0): + with self.assertRaisesRegex(CircuitError, "not in scope"): + base.store(expr.Var.new("a", types.Bool()), expr.lift(False)) + + def test_cannot_add_existing_variable(self): + a = expr.Var.new("a", types.Bool()) + base = QuantumCircuit() + with base.if_test(expr.lift(False)) as else_: + base.add_var(a, expr.lift(False)) + with self.assertRaisesRegex(CircuitError, "already present"): + base.add_var(a, expr.lift(False)) + with else_: + base.add_var(a, expr.lift(False)) + with self.assertRaisesRegex(CircuitError, "already present"): + base.add_var(a, expr.lift(False)) + + def test_cannot_shadow_in_same_scope(self): + a = expr.Var.new("a", types.Bool()) + base = QuantumCircuit() + with base.switch(expr.lift(3)) as case: + with case(0): + base.add_var(a, expr.lift(False)) + with self.assertRaisesRegex(CircuitError, "its name shadows"): + base.add_var(a.name, expr.lift(False)) + with case(case.DEFAULT): + base.add_var(a, expr.lift(False)) + with self.assertRaisesRegex(CircuitError, "its name shadows"): + base.add_var(a.name, expr.lift(False)) + + def test_cannot_shadow_captured_variable(self): + """It shouldn't be possible to shadow a variable that has already been captured into the + block.""" + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + + base = QuantumCircuit(inputs=[outer]) + with base.while_loop(expr.lift(True)): + # Capture the outer. + base.store(outer, expr.lift(True)) + # Attempt to shadow it. + with self.assertRaisesRegex(CircuitError, "its name shadows"): + base.add_var(inner, expr.lift(False)) + + def test_cannot_use_outer_variable_after_shadow(self): + """If we've shadowed a variable, the outer one shouldn't be visible to us for use.""" + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + + base = QuantumCircuit(inputs=[outer]) + with base.for_loop(range(3)): + # Shadow the outer. + base.add_var(inner, expr.lift(False)) + with self.assertRaisesRegex(CircuitError, "cannot use.*shadowed"): + base.store(outer, expr.lift(True)) + + def test_cannot_use_beyond_outer_shadow(self): + outer = expr.Var.new("a", types.Bool()) + inner = expr.Var.new("a", types.Bool()) + base = QuantumCircuit(inputs=[outer]) + with base.while_loop(expr.lift(True)): + # Shadow 'outer' + base.add_var(inner, expr.lift(True)) + with base.switch(expr.lift(3)) as case, case(0): + with self.assertRaisesRegex(CircuitError, "not in scope"): + # Attempt to access the shadowed variable. + base.store(outer, expr.lift(False)) + + def test_exception_during_initialisation_does_not_add_variable(self): + uint_var = expr.Var.new("a", types.Uint(16)) + bool_expr = expr.Value(False, types.Bool()) + with self.assertRaises(CircuitError): + Store(uint_var, bool_expr) + base = QuantumCircuit() + with base.while_loop(expr.lift(False)): + # Should succeed. + b = base.add_var("b", expr.lift(False)) + try: + base.add_var(uint_var, bool_expr) + except CircuitError: + pass + # Should succeed. + c = base.add_var("c", expr.lift(False)) + local_vars = set(base.iter_vars()) + self.assertEqual(local_vars, {b, c}) + + def test_cannot_use_old_var_not_in_circuit(self): + base = QuantumCircuit() + with base.if_test(expr.lift(False)) as else_: + with self.assertRaisesRegex(CircuitError, "not present"): + base.store(expr.lift(Clbit()), expr.lift(False)) + with else_: + with self.assertRaisesRegex(CircuitError, "not present"): + with base.if_test(expr.equal(ClassicalRegister(2, "c"), 3)): + pass + + def test_cannot_add_input_in_scope(self): + base = QuantumCircuit() + with base.for_loop(range(3)): + with self.assertRaisesRegex(CircuitError, "cannot add an input variable"): + base.add_input("a", types.Bool()) + + def test_cannot_add_uninitialized_in_scope(self): + base = QuantumCircuit() + with base.for_loop(range(3)): + with self.assertRaisesRegex(CircuitError, "cannot add an uninitialized variable"): + base.add_uninitialized_var(expr.Var.new("a", types.Bool()))