From 75e4f1cc59d6fb0ecb808154e82fadc5d43b7366 Mon Sep 17 00:00:00 2001 From: Jake Lishman <jake.lishman@ibm.com> Date: Mon, 22 Jan 2024 14:58:28 +0000 Subject: [PATCH] Vendor Qiskit's `pubsub` implementation Qiskit 1.0 is removing this, so applications that want to continue using it (within themselves; Qiskit will no longer be a central broker) should arrange for a centralised location for their published events. This vendors the code into the IBM Provider, from which Runtime can then re-import. For backwards compatibility, the IBM Provider's `_Broker` singleton will resolve to the Qiskit one, if a version of Qiskit that supplies it is installed. This means that the package remains compatible with code that still accesses the `Subscriber` via Qiskit (e.g. an un-upgraded IBM Runtime). --- qiskit_ibm_provider/ibm_backend.py | 2 +- .../jupyter/dashboard/dashboard.py | 2 +- qiskit_ibm_provider/utils/__init__.py | 5 + qiskit_ibm_provider/utils/pubsub.py | 182 ++++++++++++++++++ .../notes/pubsub-3d4353e50d687425.yaml | 6 + 5 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 qiskit_ibm_provider/utils/pubsub.py create mode 100644 releasenotes/notes/pubsub-3d4353e50d687425.yaml diff --git a/qiskit_ibm_provider/ibm_backend.py b/qiskit_ibm_provider/ibm_backend.py index 25f5af367..124bb2d98 100644 --- a/qiskit_ibm_provider/ibm_backend.py +++ b/qiskit_ibm_provider/ibm_backend.py @@ -38,7 +38,6 @@ ) from qiskit.qobj.utils import MeasLevel, MeasReturnType -from qiskit.tools.events.pubsub import Publisher from qiskit.transpiler.passmanager import PassManager from qiskit.transpiler.target import Target @@ -60,6 +59,7 @@ ) from .utils import validate_job_tags, are_circuits_dynamic from .utils.options import QASM2Options, QASM3Options +from .utils.pubsub import Publisher from .utils.converters import local_to_utc from .utils.json_decoder import ( defaults_from_server_data, diff --git a/qiskit_ibm_provider/jupyter/dashboard/dashboard.py b/qiskit_ibm_provider/jupyter/dashboard/dashboard.py index d1071a060..a9c35092b 100644 --- a/qiskit_ibm_provider/jupyter/dashboard/dashboard.py +++ b/qiskit_ibm_provider/jupyter/dashboard/dashboard.py @@ -19,10 +19,10 @@ from IPython.core.magic import line_magic, Magics, magics_class from IPython.display import display, Javascript from qiskit.exceptions import QiskitError -from qiskit.tools.events.pubsub import Subscriber from qiskit_ibm_provider.job.exceptions import IBMJobApiError from qiskit_ibm_provider.job.ibm_job import IBMJob +from qiskit_ibm_provider.utils.pubsub import Subscriber from .backend_update import update_backend_info from .backend_widget import make_backend_widget from .job_widgets import make_clear_button, make_labels, create_job_widget diff --git a/qiskit_ibm_provider/utils/__init__.py b/qiskit_ibm_provider/utils/__init__.py index ceda58e67..c04964664 100644 --- a/qiskit_ibm_provider/utils/__init__.py +++ b/qiskit_ibm_provider/utils/__init__.py @@ -35,6 +35,10 @@ to_python_identifier validate_job_tags +Publisher/subscriber model +========================== + +.. automodule:: qiskit_ibm_provider.utils.pubsub """ from .converters import ( @@ -45,3 +49,4 @@ ) from .utils import to_python_identifier, validate_job_tags, are_circuits_dynamic from .json import RuntimeEncoder, RuntimeDecoder +from . import pubsub diff --git a/qiskit_ibm_provider/utils/pubsub.py b/qiskit_ibm_provider/utils/pubsub.py new file mode 100644 index 000000000..e05fd211c --- /dev/null +++ b/qiskit_ibm_provider/utils/pubsub.py @@ -0,0 +1,182 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2017, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +Message broker for the Publisher / Subscriber mechanism +""" + +from __future__ import annotations + +import typing + +from qiskit.exceptions import QiskitError + +try: + from qiskit.tools.events.pubsub import _Broker as _QiskitBroker +except ImportError: + _QiskitBroker = None + +_Callback = typing.Callable[..., None] + + +class _Broker: + """The event/message broker. It's a singleton. + + In order to keep consistency across all the components, it would be great to + have a specific format for new events, documenting their usage. + It's the responsibility of the component emitting an event to document it's usage in + the component docstring. + + Event format:: + + "<namespace>.<component>.<action>" + + Examples: + + * "ibm.job.start" + """ + + _instance: _Broker | None = None + _subscribers: dict[str, list[_Subscription]] = {} + + @staticmethod + def __new__(cls: type[_Broker]) -> _Broker: + if _Broker._instance is None: + # Backwards compatibility for Qiskit pre-1.0; if the Qiskit-internal broker + # singleton exists then we use that instead of defining a new one, so that + # the event streams will be unified even if someone is still using the + # Qiskit entry points to subscribe. + # + # This dynamic switch assumes that the interface of this vendored `Broker` + # code remains identical to the Qiskit 0.45 version. + _Broker._instance = object.__new__(_QiskitBroker or cls) + return _Broker._instance + + class _Subscription: + def __init__(self, event: str, callback: _Callback): + self.event: str = event + self.callback: _Callback = callback + + def __eq__(self, other: object) -> bool: + """Overrides the default implementation""" + if isinstance(other, self.__class__): + return self.event == other.event and id(self.callback) == id( + other.callback + ) # Allow 1:N subscribers + return False + + def subscribe(self, event: str, callback: _Callback) -> bool: + """Subscribes to an event, so when it's emitted all the callbacks subscribed, + will be executed. We are not allowing double registration. + + Args: + event (string): The event to subscribed in the form of: + "terra.<component>.<method>.<action>" + callback (callable): The callback that will be executed when an event is + emitted. + """ + if not callable(callback): + raise QiskitError("Callback is not a callable!") + + if event not in self._subscribers: + self._subscribers[event] = [] + + new_subscription = self._Subscription(event, callback) + if new_subscription in self._subscribers[event]: + # We are not allowing double subscription + return False + + self._subscribers[event].append(new_subscription) + return True + + def dispatch(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None: + """Emits an event if there are any subscribers. + + Args: + event (String): The event to be emitted + args: Arguments linked with the event + kwargs: Named arguments linked with the event + """ + # No event, no subscribers. + if event not in self._subscribers: + return + + for subscriber in self._subscribers[event]: + subscriber.callback(*args, **kwargs) + + def unsubscribe(self, event: str, callback: _Callback) -> bool: + """Unsubscribe the specific callback to the event. + + Args + event (String): The event to unsubscribe + callback (callable): The callback that won't be executed anymore + + Returns + True: if we have successfully unsubscribed to the event + False: if there's no callback previously registered + """ + + try: + self._subscribers[event].remove(self._Subscription(event, callback)) + except KeyError: + return False + + return True + + def clear(self) -> None: + """Unsubscribe everything, leaving the Broker without subscribers/events.""" + self._subscribers.clear() + + +class Publisher: + """Represents a "publisher". + + Every component (class) can become a :class:`Publisher` and send events by + inheriting this class. Functions can call this class like:: + + Publisher().publish("event", args, ... ) + """ + + def __init__(self) -> None: + self._broker: _Broker = _Broker() + + def publish(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None: + """Triggers an event, and associates some data to it, so if there are any + subscribers, their callback will be called synchronously.""" + return self._broker.dispatch(event, *args, **kwargs) + + +class Subscriber: + """Represents a "subscriber". + + Every component (class) can become a :class:`Subscriber` and subscribe to events, + that will call callback functions when they are emitted. + """ + + def __init__(self) -> None: + self._broker: _Broker = _Broker() + + def subscribe(self, event: str, callback: _Callback) -> bool: + """Subscribes to an event, associating a callback function to that event, so + when the event occurs, the callback will be called. + + This is a blocking call, so try to keep callbacks as lightweight as possible.""" + return self._broker.subscribe(event, callback) + + def unsubscribe(self, event: str, callback: _Callback) -> bool: + """Unsubscribe a pair event-callback, so the callback will not be called anymore + when the event occurs.""" + return self._broker.unsubscribe(event, callback) + + def clear(self) -> None: + """Unsubscribe everything""" + self._broker.clear() diff --git a/releasenotes/notes/pubsub-3d4353e50d687425.yaml b/releasenotes/notes/pubsub-3d4353e50d687425.yaml new file mode 100644 index 000000000..094669b29 --- /dev/null +++ b/releasenotes/notes/pubsub-3d4353e50d687425.yaml @@ -0,0 +1,6 @@ +--- +upgrade: + - | + The IBM Provider is now the central broker for IBM-related publisher/subscriber events emitted + during job submission. If you manage a component that subscribes to events from the IBM + Provider, you should use :class:`~.pubsub.Subscriber` from this respository.