From 6a732f4b32033edd910d66593e11751f0e91aead Mon Sep 17 00:00:00 2001
From: gadial
Date: Fri, 12 May 2023 00:02:17 +0300
Subject: [PATCH] Experiment data fixes (#1092)

### Summary

This PR handles several issues related to `ExperimentData`:

1. Fixing a bug in `_add_job_data`.
2. Adding multi-upload capability to `ExperimentData.save()`.
3. Different `provider` handling to enable better data loading.
4. Setting `start_datetime` and `end_datetime`, which previously were not set at all, and `creation_datetime` and `updated_datetime`, which previously were set only after loading the experiment from the server.

### Details and comments

1. Currently `_add_job_data` adds the result of a job without explicitly supplying its `job_id`. While this was fine in the old `qiskit-ibmq-provider`, in the new `qiskit-ibm-provider` the job id contained in the `Result` object can differ from the job id of the actual job. Since `ExperimentData` keeps the original job id, every submitted job ends up with two different ids: one for a seemingly unfinished job, and one for a job that was seemingly never initiated. This PR addresses the issue by using the original job id whenever possible.
2. `ExperimentData.save()` currently uploads analysis results and figures one by one, which is inefficient and already affects other projects. This is handled in `qiskit-ibm-experiment`, whose API was extended to allow uploading multiple analysis results and figures at once; this PR enables that API in `ExperimentData` (see the usage sketch after this list).
3. `ExperimentData.load()` currently takes the `experiment_id` and an `IBMExperimentService` object. This has two drawbacks: first, `IBMExperimentService` should be as transparent to users as possible; second, `IBMExperimentService` handles the result DB data, while the job data stored by `ExperimentData` is handled by the `IBMProvider`. This is fixed by allowing the provider to be passed as a parameter to `load()`, since the service can be obtained from the provider. This change also fixes #1093.
4. `start_datetime` and `end_datetime` were set neither by `ExperimentData` nor by the database itself. This PR makes the experiment data set `start_datetime` to its creation time (unless another value is passed on creation); currently `BaseExperiment` creates the experiment data right before the experiment begins. This PR also makes every job update `end_datetime` once it terminates. In addition, calls to `save()` now update the values of `creation_datetime` and `updated_datetime` (which are set by the server). All times are stored in UTC; the getters return them in local time, and the setters convert from local time to UTC.
5. `ExperimentData.save()` did not raise an error when no database service was available. It now raises an error if `suppress_errors` is `False`.
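For reference, here is a minimal sketch of the workflow these changes enable. It is an illustration only: the backend name, the choice of a `T1` experiment, and the parameter values are hypothetical, not part of this PR.

```python
import numpy as np

from qiskit_ibm_provider import IBMProvider
from qiskit_experiments.framework import ExperimentData
from qiskit_experiments.library import T1

provider = IBMProvider()
backend = provider.get_backend("ibmq_lima")  # hypothetical backend name

# start_datetime is set when the experiment data is created;
# end_datetime is updated each time a job terminates successfully.
exp = T1(physical_qubits=[0], delays=np.arange(1e-6, 100e-6, 10e-6))
exp_data = exp.run(backend)
exp_data.block_for_results()

# save() now uploads analysis results and figures in bulk, and raises
# ExperimentDataSaveFailed when suppress_errors is False and saving fails.
exp_data.save(suppress_errors=False, max_workers=10, save_figures=True)

# load() can now take a provider instead of a service; the service is
# obtained from the provider, which is also used to retrieve job data.
loaded = ExperimentData.load(exp_data.experiment_id, provider=provider)
print(loaded.creation_datetime, loaded.updated_datetime)  # local timezone
```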
---------

Co-authored-by: Yael Ben-Haim
---
 .../database_service/exceptions.py            |   6 +
 .../framework/experiment_data.py              | 296 ++++++++++++++----
 ...xperiment_data_fixes-f69c3569a8ba1342.yaml |  30 ++
 requirements.txt                              |   2 +-
 .../test_db_experiment_data.py                |  13 +-
 5 files changed, 273 insertions(+), 74 deletions(-)
 create mode 100644 releasenotes/notes/0.5/experiment_data_fixes-f69c3569a8ba1342.yaml

diff --git a/qiskit_experiments/database_service/exceptions.py b/qiskit_experiments/database_service/exceptions.py
index c8884ea01d..23209c1cc9 100644
--- a/qiskit_experiments/database_service/exceptions.py
+++ b/qiskit_experiments/database_service/exceptions.py
@@ -31,3 +31,9 @@ class ExperimentEntryExists(ExperimentDataError):
     """Errors raised when an experiment entry already exists."""
 
     pass
+
+
+class ExperimentDataSaveFailed(ExperimentDataError):
+    """Errors raised when an experiment save fails."""
+
+    pass
diff --git a/qiskit_experiments/framework/experiment_data.py b/qiskit_experiments/framework/experiment_data.py
index 5a79d2d98f..cbda58eade 100644
--- a/qiskit_experiments/framework/experiment_data.py
+++ b/qiskit_experiments/framework/experiment_data.py
@@ -17,7 +17,7 @@
 import logging
 import dataclasses
 from typing import Dict, Optional, List, Union, Any, Callable, Tuple, TYPE_CHECKING
-from datetime import datetime
+from datetime import datetime, timezone
 from concurrent import futures
 from threading import Event
 from functools import wraps
@@ -31,6 +31,7 @@
 import sys
 import traceback
 import numpy as np
+from dateutil import tz
 from matplotlib import pyplot
 from matplotlib.figure import Figure as MatplotlibFigure
 from qiskit.result import Result
@@ -53,6 +54,7 @@
     ExperimentDataError,
     ExperimentEntryNotFound,
     ExperimentEntryExists,
+    ExperimentDataSaveFailed,
 )
 
 if TYPE_CHECKING:
@@ -78,6 +80,47 @@ def _wrapped(self, *args, **kwargs):
     return _wrapped
 
 
+def utc_to_local(utc_dt: datetime) -> datetime:
+    """Convert input UTC timestamp to local timezone.
+
+    Args:
+        utc_dt: Input UTC timestamp.
+
+    Returns:
+        A ``datetime`` with the local timezone.
+    """
+    if utc_dt is None:
+        return None
+    local_dt = utc_dt.astimezone(tz.tzlocal())
+    return local_dt
+
+
+def local_to_utc(local_dt: datetime) -> datetime:
+    """Convert input local timezone timestamp to UTC timezone.
+
+    Args:
+        local_dt: Input local timestamp.
+
+    Returns:
+        A ``datetime`` with the UTC timezone.
+    """
+    if local_dt is None:
+        return None
+    utc_dt = local_dt.astimezone(tz.UTC)
+    return utc_dt
+
+
+def parse_utc_datetime(dt_str: str) -> datetime:
+    """Parses UTC datetime from a string"""
+    if dt_str is None:
+        return None
+
+    db_datetime_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+    dt_utc = datetime.strptime(dt_str, db_datetime_format)
+    dt_utc = dt_utc.replace(tzinfo=timezone.utc)
+    return dt_utc
+
+
 class FigureData:
     """Wrapper class for figures and figure metadata. The raw figure can be
     accessed with the ``figure`` attribute."""
@@ -174,11 +217,13 @@ def __init__(
         experiment: Optional["BaseExperiment"] = None,
         backend: Optional[Backend] = None,
         service: Optional[IBMExperimentService] = None,
+        provider: Optional[Provider] = None,
         parent_id: Optional[str] = None,
         job_ids: Optional[List[str]] = None,
         child_data: Optional[List[ExperimentData]] = None,
         verbose: Optional[bool] = True,
         db_data: Optional[ExperimentDataclass] = None,
+        start_datetime: Optional[datetime] = None,
         **kwargs,
     ):
         """Initialize experiment data.
@@ -188,6 +233,8 @@
             backend: Backend the experiment runs on. This overrides the
                 backend in the experiment object.
             service: The service that stores the experiment results to the database
+            provider: The provider used for the experiments
+                (can be used to automatically obtain the service)
             parent_id: ID of the parent experiment data
                 in the setting of a composite experiment
             job_ids: IDs of jobs submitted for the experiment.
@@ -195,12 +242,23 @@
             verbose: Whether to print messages.
             db_data: A prepared ExperimentDataclass of the experiment info.
                 This overrides other db parameters.
+            start_datetime: The time when the experiment was started.
+                If ``None``, defaults to the current time.
+
+        Additional info:
+            In order to save the experiment data to the cloud service, the class
+            needs access to the experiment service provider. It can be obtained
+            via three different methods, given here by priority:
+            1. Passing it directly via the ``service`` parameter.
+            2. Implicitly obtaining it from the ``provider`` parameter.
+            3. Implicitly obtaining it from the ``backend`` parameter, using that backend's provider.
         """
         if experiment is not None:
             backend = backend or experiment.backend
             experiment_type = experiment.experiment_type
         else:
-            experiment_type = None
+            # Don't use None since the resultDB won't accept that
+            experiment_type = ""
 
         if job_ids is None:
             job_ids = []
@@ -230,7 +288,10 @@
             )
         else:
             self._db_data = db_data
-
+        if self.start_datetime is None:
+            if start_datetime is None:
+                start_datetime = datetime.now()
+            self.start_datetime = start_datetime
         for key, value in kwargs.items():
             if hasattr(self._db_data, key):
                 setattr(self._db_data, key, value)
@@ -241,8 +302,13 @@
         self._backend = None
         if backend is not None:
             self._set_backend(backend, recursive=False)
+        self.provider = provider
+        if provider is None and backend is not None:
+            self.provider = backend.provider
         self._service = service
-        if self._service is None and self.backend is not None:
+        if self._service is None and self.provider is not None:
+            self._service = self.get_service_from_provider(self.provider)
+        if self._service is None and self.provider is None and self.backend is not None:
             self._service = self.get_service_from_backend(self.backend)
         self._auto_save = False
         self._created_in_db = False
@@ -325,44 +391,54 @@ def metadata(self) -> Dict:
         return self._db_data.metadata
 
     @property
-    def creation_datetime(self) -> "datetime":
+    def creation_datetime(self) -> datetime:
         """Return the creation datetime of this experiment data.
 
         Returns:
-            The creation datetime of this experiment data.
+            The creation datetime of this experiment data in the local timezone.
 
         """
-        return self._db_data.creation_datetime
+        return utc_to_local(self._db_data.creation_datetime)
 
     @property
-    def start_datetime(self) -> "datetime":
+    def start_datetime(self) -> datetime:
         """Return the start datetime of this experiment data.
 
         Returns:
             The start datetime of this experiment data.
 
         """
-        return self._db_data.start_datetime
+        return utc_to_local(self._db_data.start_datetime)
+
+    @start_datetime.setter
+    def start_datetime(self, new_start_datetime: datetime) -> None:
+        self._db_data.start_datetime = local_to_utc(new_start_datetime)
 
     @property
-    def updated_datetime(self) -> "datetime":
+    def updated_datetime(self) -> datetime:
         """Return the update datetime of this experiment data.
 
         Returns:
             The update datetime of this experiment data.
""" - return self._db_data.updated_datetime + return utc_to_local(self._db_data.updated_datetime) @property - def end_datetime(self) -> "datetime": + def end_datetime(self) -> datetime: """Return the end datetime of this experiment data. + The end datetime is the time the latest job data was + added without errors; this can change as more jobs finish. Returns: The end datetime of this experiment data. """ - return self._db_data.end_datetime + return utc_to_local(self._db_data.end_datetime) + + @end_datetime.setter + def end_datetime(self, new_end_datetime: datetime) -> None: + self._db_data.end_datetime = local_to_utc(new_end_datetime) @property def hub(self) -> str: @@ -394,17 +470,6 @@ def project(self) -> str: """ return self._db_data.project - @property - def _provider(self) -> Optional[Provider]: - """Return the provider. - - Returns: - Provider used for the experiment, or ``None`` if unknown. - """ - if self._backend is None: - return None - return self._backend.provider() - @property def experiment_id(self) -> str: """Return experiment ID @@ -567,7 +632,10 @@ def _set_hgp_from_provider(self, provider): project = creds.project # qiskit-ibm-provider style if hasattr(provider, "_hgps"): - hub, group, project = list(self.backend.provider._hgps.keys())[0].split("/") + for hgp_string, hgp in self.backend.provider._hgps.items(): + if self.backend.name in hgp.backends: + hub, group, project = hgp_string.split("/") + break self._db_data.hub = self._db_data.hub or hub self._db_data.group = self._db_data.group or group self._db_data.project = self._db_data.project or project @@ -606,6 +674,24 @@ def service(self, service: IBMExperimentService) -> None: """ self._set_service(service) + @property + def provider(self) -> Optional[Provider]: + """Return the backend provider. + + Returns: + Provider that is used to obtain backends and job data. + """ + return self._provider + + @provider.setter + def provider(self, provider: Provider) -> None: + """Set the provider to be used for obtaining job data + + Args: + provider: Provider to be used. + """ + self._provider = provider + @property def auto_save(self) -> bool: """Return current auto-save option. @@ -790,8 +876,10 @@ def _add_job_data( jid = job.job_id() try: job_result = job.result() - self._add_result_data(job_result) + self._add_result_data(job_result, jid) LOG.debug("Job data added [Job ID: %s]", jid) + # sets the endtime to be the time the last successful job was added + self.end_datetime = datetime.now() return jid, True except Exception as ex: # pylint: disable=broad-except # Handle cancelled jobs @@ -911,20 +999,24 @@ def _run_analysis_callback( LOG.warning(error_msg) return callback_id, False - def _add_result_data(self, result: Result) -> None: + def _add_result_data(self, result: Result, job_id: Optional[str] = None) -> None: """Add data from a Result object Args: result: Result object containing data to be added. + job_id: The id of the job the result came from. If `None`, the + job id in `result` is used. 
""" - if result.job_id not in self._jobs: - self._jobs[result.job_id] = None - self.job_ids.append(result.job_id) + if job_id is None: + job_id = result.job_id + if job_id not in self._jobs: + self._jobs[job_id] = None + self.job_ids.append(job_id) with self._result_data.lock: # Lock data while adding all result data for i, _ in enumerate(result.results): data = result.data(i) - data["job_id"] = result.job_id + data["job_id"] = job_id if "counts" in data: # Format to Counts object rather than hex dict data["counts"] = result.get_counts(i) @@ -939,22 +1031,28 @@ def _add_result_data(self, result: Result) -> None: def _retrieve_data(self): """Retrieve job data if missing experiment data.""" - if self._result_data or not self._backend: + # Get job results if missing in experiment data. + if self.provider is None: return - # Get job results if missing experiment data. retrieved_jobs = {} - for jid, job in self._jobs.items(): - if job is None: - try: - LOG.debug("Retrieving job from backend %s [Job ID: %s]", self._backend, jid) - job = self._backend.retrieve_job(jid) - retrieved_jobs[jid] = job - except Exception: # pylint: disable=broad-except - LOG.warning( - "Unable to retrieve data from job on backend %s [Job ID: %s]", - self._backend, - jid, - ) + jobs_to_retrieve = [] # the list of all jobs to retrieve from the server + + # first find which jobs are listed in the `job_ids` field of the experiment data + if self.job_ids is not None: + for jid in self.job_ids: + if jid not in self._jobs or self._jobs[jid] is None: + jobs_to_retrieve.append(jid) + + for jid in jobs_to_retrieve: + try: + LOG.debug("Retrieving job [Job ID: %s]", jid) + job = self.provider.retrieve_job(jid) + retrieved_jobs[jid] = job + except Exception: # pylint: disable=broad-except + LOG.warning( + "Unable to retrieve data from job [Job ID: %s]", + jid, + ) # Add retrieved job objects to stored jobs and extract data for jid, job in retrieved_jobs.items(): self._jobs[jid] = job @@ -1349,9 +1447,15 @@ def _save_experiment_metadata(self, suppress_errors: bool = True) -> None: metadata = self._db_data.metadata self._db_data.metadata = {} - self.service.create_or_update_experiment( + result = self.service.create_or_update_experiment( self._db_data, json_encoder=self._json_encoder, create=not self._created_in_db ) + if isinstance(result, dict): + created_datetime = result.get("created_at", None) + updated_datetime = result.get("updated_at", None) + self._db_data.creation_datetime = parse_utc_datetime(created_datetime) + self._db_data.updated_datetime = parse_utc_datetime(updated_datetime) + self._created_in_db = True if handle_metadata_separately: @@ -1371,12 +1475,21 @@ def _metadata_too_large(self): # currently the entire POST JSON request body is limited by default to 100kb return sys.getsizeof(self.metadata) > 10000 - def save(self, suppress_errors: bool = True) -> None: + def save( + self, suppress_errors: bool = True, max_workers: int = 100, save_figures: bool = True + ) -> None: """Save the experiment data to a database service. Args: suppress_errors: should the method catch exceptions (true) or pass them on, potentially aborting the experiemnt (false) + max_workers: Maximum number of concurrent worker threads + save_figures: Whether to save figures in the database or not + + Raises: + ExperimentDataSaveFailed: If no experiment database service + was found, or the experiment service failed to save + .. note:: This saves the experiment metadata, all analysis results, and all figures. 
             Depending on the number of figures and analysis results this
@@ -1392,33 +1505,59 @@
                 "An experiment service is available, for example, "
                 "when using an IBM Quantum backend."
             )
-            return
+            if suppress_errors:
+                return
+            else:
+                raise ExperimentDataSaveFailed("No service found")
 
         self._save_experiment_metadata(suppress_errors=suppress_errors)
        if not self._created_in_db:
             LOG.warning("Could not save experiment metadata to DB, aborting experiment save")
             return
 
+        analysis_results_to_create = []
         for result in self._analysis_results.values():
-            result.save(suppress_errors=suppress_errors)
+            analysis_results_to_create.append(result._db_data)
+        try:
+            self.service.create_analysis_results(
+                data=analysis_results_to_create,
+                blocking=True,
+                json_encoder=self._json_encoder,
+                max_workers=max_workers,
+            )
+            for result in self._analysis_results.values():
+                result._created_in_db = True
+        except Exception as ex:  # pylint: disable=broad-except
+            # Don't automatically fail the experiment just because its data cannot be saved.
+            LOG.error("Unable to save the experiment data: %s", traceback.format_exc())
+            if not suppress_errors:
+                raise ExperimentDataSaveFailed(
+                    f"Analysis result save failed\nError Message:\n{str(ex)}"
+                ) from ex
 
         for result in self._deleted_analysis_results.copy():
             with service_exception_to_warning():
                 self._service.delete_analysis_result(result_id=result)
             self._deleted_analysis_results.remove(result)
 
-        with self._figures.lock:
-            for name, figure in self._figures.items():
-                if figure is None:
-                    continue
-                # currently only the figure and its name are stored in the database
-                if isinstance(figure, FigureData):
-                    figure = figure.figure
-                    LOG.debug("Figure metadata is currently not saved to the database")
-                if isinstance(figure, pyplot.Figure):
-                    figure = plot_to_svg_bytes(figure)
-                self._service.create_or_update_figure(
-                    experiment_id=self.experiment_id, figure=figure, figure_name=name
+        if save_figures:
+            with self._figures.lock:
+                figures_to_create = []
+                for name, figure in self._figures.items():
+                    if figure is None:
+                        continue
+                    # currently only the figure and its name are stored in the database
+                    if isinstance(figure, FigureData):
+                        figure = figure.figure
+                        LOG.debug("Figure metadata is currently not saved to the database")
+                    if isinstance(figure, pyplot.Figure):
+                        figure = plot_to_svg_bytes(figure)
+                    figures_to_create.append((figure, name))
+                self.service.create_figures(
+                    experiment_id=self.experiment_id,
+                    figure_list=figures_to_create,
+                    blocking=True,
+                    max_workers=max_workers,
                 )
 
         for name in self._deleted_figures.copy():
@@ -1435,7 +1574,9 @@
             for data in self._child_data.values():
                 original_verbose = data.verbose
                 data.verbose = False
-                data.save()
+                data.save(
+                    suppress_errors=suppress_errors, max_workers=max_workers, save_figures=save_figures
+                )
                 data.verbose = original_verbose
 
     def jobs(self) -> List[Job]:
@@ -1866,21 +2007,36 @@ def child_data(
             raise QiskitError(f"Invalid index type {type(index)}.")
 
     @classmethod
-    def load(cls, experiment_id: str, service: IBMExperimentService) -> "ExperimentData":
+    def load(
+        cls,
+        experiment_id: str,
+        service: Optional[IBMExperimentService] = None,
+        provider: Optional[Provider] = None,
+    ) -> "ExperimentData":
         """Load a saved experiment data from a database service.
 
         Args:
             experiment_id: Experiment ID.
             service: the database service.
+            provider: an IBMProvider required for loading job data and
+                can be used to initialize the service.
 
         Returns:
             The loaded experiment data.
+        Raises:
+            ExperimentDataError: If neither a service nor a provider was given.
         """
+        if service is None:
+            if provider is None:
+                raise ExperimentDataError(
+                    "Loading an experiment requires a valid IBM provider or experiment service"
+                )
+            service = cls.get_service_from_provider(provider)
         data = service.experiment(experiment_id, json_decoder=cls._json_decoder)
         if service.experiment_has_file(experiment_id, cls._metadata_filename):
             metadata = service.file_download(experiment_id, cls._metadata_filename)
             data.metadata.update(metadata)
-        expdata = cls(service=service, db_data=data)
+        expdata = cls(service=service, db_data=data, provider=provider)
 
         # Retrieve data and analysis results
         # Maybe this isn't necessary but the repr of the class should
@@ -1892,7 +2048,9 @@
         expdata._created_in_db = True
 
         child_data_ids = expdata.metadata.pop("child_data_ids", [])
-        child_data = [ExperimentData.load(child_id, service) for child_id in child_data_ids]
+        child_data = [
+            ExperimentData.load(child_id, service, provider) for child_id in child_data_ids
+        ]
         expdata._set_child_data(child_data)
 
         return expdata
@@ -2147,10 +2305,14 @@ def __getstate__(self):
 
     @staticmethod
     def get_service_from_backend(backend):
-        """Initializes the server from the backend data"""
+        """Initializes the service from the backend data"""
+        return ExperimentData.get_service_from_provider(backend.provider)
+
+    @staticmethod
+    def get_service_from_provider(provider):
+        """Initializes the service from the provider data"""
         db_url = "https://auth.quantum-computing.ibm.com/api"
         try:
-            provider = backend._provider
             # qiskit-ibmq-provider style
             if hasattr(provider, "credentials"):
                 token = provider.credentials.token
diff --git a/releasenotes/notes/0.5/experiment_data_fixes-f69c3569a8ba1342.yaml b/releasenotes/notes/0.5/experiment_data_fixes-f69c3569a8ba1342.yaml
new file mode 100644
index 0000000000..8d876e9ef6
--- /dev/null
+++ b/releasenotes/notes/0.5/experiment_data_fixes-f69c3569a8ba1342.yaml
@@ -0,0 +1,30 @@
+---
+features:
+  - |
+    :meth:`.ExperimentData.save` now uses the multithreading capability
+    of the experiment service to enable faster saving times.
+  - |
+    :class:`.ExperimentData` now supports the new method
+    :meth:`.ExperimentData.get_service_from_provider`, enabling the automatic
+    setting of the experiment database service by passing the provider.
+  - |
+    The `start_datetime` property of :class:`.ExperimentData` is now being
+    set to the time the experiment data was created.
+  - |
+    The `end_datetime` property of :class:`.ExperimentData` is now being
+    set to the latest time a successful job terminated.
+  - |
+    The `creation_datetime` and `updated_datetime` properties of :class:`.ExperimentData`
+    are now being read from the server when saving the experiment.
+  - |
+    All the datetime properties are stored in UTC and converted to local time by the getters.
+  - |
+    :meth:`.ExperimentData.save` can now raise exceptions when saving
+    fails, unless the `suppress_errors` flag is set (on by default).
+
+
+fixes:
+  - |
+    Fixed a bug in :meth:`.ExperimentData._add_job_data` that caused job-id
+    related test failures.
+
diff --git a/requirements.txt b/requirements.txt
index dd50667a73..f3a3c141e0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
 numpy>=1.17
 scipy>=1.4
 qiskit-terra>=0.24
-qiskit-ibm-experiment>=0.2.5
+qiskit-ibm-experiment>=0.3.1
 qiskit_dynamics>=0.3.0
 matplotlib>=3.4
 uncertainties
diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py
index 0c804061e3..ec7ee55a4b 100644
--- a/test/database_service/test_db_experiment_data.py
+++ b/test/database_service/test_db_experiment_data.py
@@ -36,6 +36,7 @@
 from qiskit_experiments.framework import ExperimentData
 from qiskit_experiments.framework import AnalysisResult
 from qiskit_experiments.framework import BackendData
+from qiskit_experiments.framework.experiment_data import local_to_utc
 from qiskit_experiments.database_service.exceptions import (
     ExperimentDataError,
     ExperimentEntryNotFound,
@@ -493,8 +494,8 @@ def test_save(self):
         exp_data.service = service
         exp_data.save()
         service.create_or_update_experiment.assert_called_once()
-        service.create_or_update_figure.assert_called_once()
-        analysis_result.save.assert_called_once()
+        service.create_figures.assert_called_once()
+        service.create_analysis_results.assert_called_once()
 
     def test_save_delete(self):
         """Test saving all deletion."""
@@ -1066,16 +1067,16 @@ def test_getters(self):
         data = ExperimentData()
         test_time = datetime.now()
         data._db_data.creation_datetime = test_time
-        self.assertEqual(data.creation_datetime, test_time)
+        self.assertEqual(data.creation_datetime, local_to_utc(test_time))
         test_time = test_time + timedelta(hours=1)
         data._db_data.start_datetime = test_time
-        self.assertEqual(data.start_datetime, test_time)
+        self.assertEqual(data.start_datetime, local_to_utc(test_time))
         test_time = test_time + timedelta(hours=1)
         data._db_data.end_datetime = test_time
-        self.assertEqual(data.end_datetime, test_time)
+        self.assertEqual(data.end_datetime, local_to_utc(test_time))
         test_time = test_time + timedelta(hours=1)
         data._db_data.updated_datetime = test_time
-        self.assertEqual(data.updated_datetime, test_time)
+        self.assertEqual(data.updated_datetime, local_to_utc(test_time))
 
         data._db_data.hub = "hub_name"
         data._db_data.group = "group_name"