Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Vendor Qiskit's pubsub implementation #799

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion qiskit_ibm_provider/ibm_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
)

from qiskit.qobj.utils import MeasLevel, MeasReturnType
from qiskit.tools.events.pubsub import Publisher
from qiskit.transpiler.passmanager import PassManager
from qiskit.transpiler.target import Target

Expand All @@ -60,6 +59,7 @@
)
from .utils import validate_job_tags, are_circuits_dynamic
from .utils.options import QASM2Options, QASM3Options
from .utils.pubsub import Publisher
from .utils.converters import local_to_utc
from .utils.json_decoder import (
defaults_from_server_data,
Expand Down
2 changes: 1 addition & 1 deletion qiskit_ibm_provider/jupyter/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from IPython.core.magic import line_magic, Magics, magics_class
from IPython.display import display, Javascript
from qiskit.exceptions import QiskitError
from qiskit.tools.events.pubsub import Subscriber

from qiskit_ibm_provider.job.exceptions import IBMJobApiError
from qiskit_ibm_provider.job.ibm_job import IBMJob
from qiskit_ibm_provider.utils.pubsub import Subscriber
from .backend_update import update_backend_info
from .backend_widget import make_backend_widget
from .job_widgets import make_clear_button, make_labels, create_job_widget
Expand Down
5 changes: 5 additions & 0 deletions qiskit_ibm_provider/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
to_python_identifier
validate_job_tags

Publisher/subscriber model
==========================

.. automodule:: qiskit_ibm_provider.utils.pubsub
"""

from .converters import (
Expand All @@ -45,3 +49,4 @@
)
from .utils import to_python_identifier, validate_job_tags, are_circuits_dynamic
from .json import RuntimeEncoder, RuntimeDecoder
from . import pubsub
182 changes: 182 additions & 0 deletions qiskit_ibm_provider/utils/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Message broker for the Publisher / Subscriber mechanism
"""

from __future__ import annotations

import typing

from qiskit.exceptions import QiskitError

try:
from qiskit.tools.events.pubsub import _Broker as _QiskitBroker
except ImportError:
_QiskitBroker = None

_Callback = typing.Callable[..., None]


class _Broker:
"""The event/message broker. It's a singleton.

In order to keep consistency across all the components, it would be great to
have a specific format for new events, documenting their usage.
It's the responsibility of the component emitting an event to document it's usage in
the component docstring.

Event format::

"<namespace>.<component>.<action>"

Examples:

* "ibm.job.start"
"""

_instance: _Broker | None = None
_subscribers: dict[str, list[_Subscription]] = {}

@staticmethod
def __new__(cls: type[_Broker]) -> _Broker:
if _Broker._instance is None:
# Backwards compatibility for Qiskit pre-1.0; if the Qiskit-internal broker
# singleton exists then we use that instead of defining a new one, so that
# the event streams will be unified even if someone is still using the
# Qiskit entry points to subscribe.
#
# This dynamic switch assumes that the interface of this vendored `Broker`
# code remains identical to the Qiskit 0.45 version.
_Broker._instance = object.__new__(_QiskitBroker or cls)
return _Broker._instance

class _Subscription:
def __init__(self, event: str, callback: _Callback):
self.event: str = event
self.callback: _Callback = callback

def __eq__(self, other: object) -> bool:
"""Overrides the default implementation"""
if isinstance(other, self.__class__):
return self.event == other.event and id(self.callback) == id(
other.callback
) # Allow 1:N subscribers
return False

def subscribe(self, event: str, callback: _Callback) -> bool:
"""Subscribes to an event, so when it's emitted all the callbacks subscribed,
will be executed. We are not allowing double registration.

Args:
event (string): The event to subscribed in the form of:
"terra.<component>.<method>.<action>"
callback (callable): The callback that will be executed when an event is
emitted.
"""
if not callable(callback):
raise QiskitError("Callback is not a callable!")

if event not in self._subscribers:
self._subscribers[event] = []

new_subscription = self._Subscription(event, callback)
if new_subscription in self._subscribers[event]:
# We are not allowing double subscription
return False

self._subscribers[event].append(new_subscription)
return True

def dispatch(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Emits an event if there are any subscribers.

Args:
event (String): The event to be emitted
args: Arguments linked with the event
kwargs: Named arguments linked with the event
"""
# No event, no subscribers.
if event not in self._subscribers:
return

for subscriber in self._subscribers[event]:
subscriber.callback(*args, **kwargs)

def unsubscribe(self, event: str, callback: _Callback) -> bool:
"""Unsubscribe the specific callback to the event.

Args
event (String): The event to unsubscribe
callback (callable): The callback that won't be executed anymore

Returns
True: if we have successfully unsubscribed to the event
False: if there's no callback previously registered
"""

try:
self._subscribers[event].remove(self._Subscription(event, callback))
except KeyError:
return False

return True

def clear(self) -> None:
"""Unsubscribe everything, leaving the Broker without subscribers/events."""
self._subscribers.clear()


class Publisher:
"""Represents a "publisher".

Every component (class) can become a :class:`Publisher` and send events by
inheriting this class. Functions can call this class like::

Publisher().publish("event", args, ... )
"""

def __init__(self) -> None:
self._broker: _Broker = _Broker()

def publish(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Triggers an event, and associates some data to it, so if there are any
subscribers, their callback will be called synchronously."""
return self._broker.dispatch(event, *args, **kwargs)


class Subscriber:
"""Represents a "subscriber".

Every component (class) can become a :class:`Subscriber` and subscribe to events,
that will call callback functions when they are emitted.
"""

def __init__(self) -> None:
self._broker: _Broker = _Broker()

def subscribe(self, event: str, callback: _Callback) -> bool:
"""Subscribes to an event, associating a callback function to that event, so
when the event occurs, the callback will be called.

This is a blocking call, so try to keep callbacks as lightweight as possible."""
return self._broker.subscribe(event, callback)

def unsubscribe(self, event: str, callback: _Callback) -> bool:
"""Unsubscribe a pair event-callback, so the callback will not be called anymore
when the event occurs."""
return self._broker.unsubscribe(event, callback)

def clear(self) -> None:
"""Unsubscribe everything"""
self._broker.clear()
6 changes: 6 additions & 0 deletions releasenotes/notes/pubsub-3d4353e50d687425.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
upgrade:
- |
The IBM Provider is now the central broker for IBM-related publisher/subscriber events emitted
during job submission. If you manage a component that subscribes to events from the IBM
Provider, you should use :class:`~.pubsub.Subscriber` from this respository.
Loading