diff --git a/parsec/core/gui/app.py b/parsec/core/gui/app.py index c7e2e294abe..2b3c9d78968 100644 --- a/parsec/core/gui/app.py +++ b/parsec/core/gui/app.py @@ -2,11 +2,12 @@ import sys import signal -from queue import Queue from typing import Optional from contextlib import contextmanager from enum import Enum + import trio +import qtrio from structlog import get_logger from PyQt5.QtCore import QTimer, Qt from PyQt5.QtWidgets import QApplication @@ -28,7 +29,7 @@ from parsec.core.gui.new_version import CheckNewVersion from parsec.core.gui.systray import systray_available, Systray from parsec.core.gui.main_window import MainWindow - from parsec.core.gui.trio_thread import ThreadSafeQtSignal, run_trio_thread + from parsec.core.gui.trio_thread import ThreadSafeQtSignal, run_trio_job_scheduler except ImportError as exc: raise ModuleNotFoundError( """PyQt forms haven't been generated. @@ -50,7 +51,7 @@ def _before_quit(): IPCServerStartupOutcome = Enum("IPCServerStartupOutcome", "STARTED ALREADY_RUNNING ERROR") -async def _run_ipc_server(config, main_window, start_arg, result_queue): +async def _run_ipc_server(config, main_window, start_arg, task_status=trio.TASK_STATUS_IGNORED): try: new_instance_needed_qt = ThreadSafeQtSignal(main_window, "new_instance_needed", object) foreground_needed_qt = ThreadSafeQtSignal(main_window, "foreground_needed") @@ -69,7 +70,7 @@ async def _cmd_handler(cmd): config.ipc_socket_file, win32_mutex_name=config.ipc_win32_mutex_name, ): - result_queue.put_nowait(IPCServerStartupOutcome.STARTED) + task_status.started(IPCServerStartupOutcome.STARTED) await trio.sleep_forever() except IPCServerAlreadyRunning: @@ -87,12 +88,12 @@ async def _cmd_handler(cmd): continue # We have successfuly noticed the other running application - result_queue.put_nowait(IPCServerStartupOutcome.ALREADY_RUNNING) + task_status.started(IPCServerStartupOutcome.ALREADY_RUNNING) break - except Exception: - result_queue.put_nowait(IPCServerStartupOutcome.ERROR) - # Let the exception bubble up so QtToTrioJob logged it as an unexpected error + except Exception as exc: + task_status.started(IPCServerStartupOutcome.ERROR) + logger.exection(exc) raise @@ -140,16 +141,18 @@ def run_gui(config: CoreConfig, start_arg: Optional[str] = None, diagnose: bool QApplication.setHighDpiScaleFactorRoundingPolicy( Qt.HighDpiScaleFactorRoundingPolicy.PassThrough ) + return qtrio.run(_run_gui, config, start_arg, diagnose) - app = ParsecApp() +async def _run_gui(config: CoreConfig, start_arg: str = None, diagnose: bool = False): + app = ParsecApp() app.load_stylesheet() app.load_font() lang_key = lang.switch_language(config) event_bus = EventBus() - with run_trio_thread() as jobs_ctx: + async with run_trio_job_scheduler() as jobs_ctx: win = MainWindow( jobs_ctx=jobs_ctx, event_bus=event_bus, @@ -157,27 +160,9 @@ def run_gui(config: CoreConfig, start_arg: Optional[str] = None, diagnose: bool minimize_on_close=config.gui_tray_enabled and systray_available(), ) - result_queue = Queue(maxsize=1) - - class ThreadSafeNoQtSignal(ThreadSafeQtSignal): - def __init__(self): - self.qobj = None - self.signal_name = "" - self.args_types = () - - def emit(self, *args): - pass - - jobs_ctx.submit_job( - ThreadSafeNoQtSignal(), - ThreadSafeNoQtSignal(), - _run_ipc_server, - config, - win, - start_arg, - result_queue, - ) - if result_queue.get() == IPCServerStartupOutcome.ALREADY_RUNNING: + result = await jobs_ctx.nursery.start(_run_ipc_server, config, win, start_arg) + + if result == IPCServerStartupOutcome.ALREADY_RUNNING: # Another instance of Parsec already started, nothing more to do return @@ -226,7 +211,7 @@ def kill_window(*args): if diagnose: with fail_on_first_exception(kill_window): - return app.exec_() + await trio.sleep_forever() else: with log_pyqt_exceptions(): - return app.exec_() + await trio.sleep_forever() diff --git a/parsec/core/gui/instance_widget.py b/parsec/core/gui/instance_widget.py index 74946d64512..d925f4e481d 100644 --- a/parsec/core/gui/instance_widget.py +++ b/parsec/core/gui/instance_widget.py @@ -19,7 +19,11 @@ MountpointWinfspNotAvailable, ) -from parsec.core.gui.trio_thread import QtToTrioJobScheduler, ThreadSafeQtSignal +from parsec.core.gui.trio_thread import ( + QtToTrioJobScheduler, + ThreadSafeQtSignal, + run_trio_job_scheduler, +) from parsec.core.gui.parsec_application import ParsecApp from parsec.core.gui.custom_dialogs import show_error from parsec.core.gui.lang import translate as _ @@ -37,10 +41,9 @@ async def _do_run_core(config, device, qt_on_ready): async with logged_core_factory(config=config, device=device, event_bus=None) as core: # Create our own job scheduler allows us to cancel all pending # jobs depending on us when we logout - core_jobs_ctx = QtToTrioJobScheduler() - async with trio.open_service_nursery() as nursery: - await nursery.start(core_jobs_ctx._start) + async with run_trio_job_scheduler() as core_jobs_ctx: qt_on_ready.emit(core, core_jobs_ctx) + await trio.sleep_forever() class InstanceWidget(QWidget): diff --git a/parsec/core/gui/trio_thread.py b/parsec/core/gui/trio_thread.py index fbf17019877..b1652f35a81 100644 --- a/parsec/core/gui/trio_thread.py +++ b/parsec/core/gui/trio_thread.py @@ -1,14 +1,14 @@ # Parsec Cloud (https://parsec.cloud) Copyright (c) AGPLv3 2016-2021 Scille SAS import threading -from contextlib import contextmanager +from contextlib import asynccontextmanager from inspect import iscoroutinefunction, signature import trio from structlog import get_logger from parsec.core.fs import FSError from parsec.core.mountpoint import MountpointError -from parsec.utils import trio_run, split_multi_error +from parsec.utils import split_multi_error from PyQt5.QtCore import pyqtBoundSignal, Q_ARG, QMetaObject, Qt @@ -41,13 +41,15 @@ def emit(self, *args): class QtToTrioJob: - def __init__(self, trio_token, fn, args, kwargs, qt_on_success, qt_on_error): - self._trio_token = trio_token + def __init__(self, fn, args, kwargs, qt_on_success, qt_on_error): + # Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal` + assert not [x for x in args if isinstance(x, pyqtBoundSignal)] + assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)] assert isinstance(qt_on_success, ThreadSafeQtSignal) assert qt_on_success.args_types in ((), (QtToTrioJob,)) - self._qt_on_success = qt_on_success assert isinstance(qt_on_error, ThreadSafeQtSignal) assert qt_on_error.args_types in ((), (QtToTrioJob,)) + self._qt_on_success = qt_on_success self._qt_on_error = qt_on_error self._fn = fn self._args = args @@ -141,115 +143,25 @@ def _set_done(self): signal = self._qt_on_success if self.is_ok() else self._qt_on_error signal.emit(self) if signal.args_types else signal.emit() - def cancel_and_join(self): - assert self.cancel_scope - try: - trio.from_thread.run_sync(self.cancel_scope.cancel, trio_token=self._trio_token) - except trio.RunFinishedError: - pass - self._done.wait() + def cancel(self): + self.cancel_scope.cancel() class QtToTrioJobScheduler: - def __init__(self): - self._trio_token = None - self._cancel_scope = None - self.started = threading.Event() - self._stopped = trio.Event() - - async def _start(self, *, task_status=trio.TASK_STATUS_IGNORED): - assert not self.started.is_set() - self._trio_token = trio.lowlevel.current_trio_token() - self._send_job_channel, recv_job_channel = trio.open_memory_channel(1) - try: - async with trio.open_service_nursery() as nursery, recv_job_channel: - self._cancel_scope = nursery.cancel_scope - self.started.set() - task_status.started() - while True: - job = await recv_job_channel.receive() - assert job.status is None - await nursery.start(job._run_fn) - - finally: - self._stopped.set() - - async def _stop(self): - self._cancel_scope.cancel() - await self._send_job_channel.aclose() - await self._stopped.wait() - - def stop(self): - try: - trio.from_thread.run(self._stop, trio_token=self._trio_token) - except trio.RunFinishedError: - pass - - def _run_job(self, job, *args, sync=False): - try: - if sync: - return trio.from_thread.run_sync(job, *args, trio_token=self._trio_token) - else: - return trio.from_thread.run(job, *args, trio_token=self._trio_token) - - except trio.BrokenResourceError: - logger.info(f"The submitted job `{job}` won't run as the scheduler is stopped") - raise JobSchedulerNotAvailable("The job scheduler is stopped") - - except trio.RunFinishedError: - logger.info(f"The submitted job `{job}` won't run as the trio loop is not running") - raise JobSchedulerNotAvailable("The trio loop is not running") + def __init__(self, nursery): + self.nursery = nursery def submit_job(self, qt_on_success, qt_on_error, fn, *args, **kwargs): - # Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal` - assert not [x for x in args if isinstance(x, pyqtBoundSignal)] - assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)] - job = QtToTrioJob(self._trio_token, fn, args, kwargs, qt_on_success, qt_on_error) - - async def _submit_job(): - # While inside this async function we are blocking the Qt thread - # hence we just wait for the job to start (to avoid concurrent - # crash if the job is cancelled) - await self._send_job_channel.send(job) - await job._started.wait() - - try: - self._run_job(_submit_job) - except JobSchedulerNotAvailable as exc: - job.set_cancelled(exc) - - return job - - # This method is only here for legacy purposes. - # It shouldn't NOT be used as running an async job synchronously - # might block the Qt loop for too long and cause the application - # to freeze. TODO: remove it later - - def run(self, afn, *args): - return self._run_job(afn, *args) - - # In contrast to the `run` method, it is acceptable to block - # the Qt loop while waiting for a synchronous job to finish - # as it shouldn't take too long (it might simply wait for - # a few scheduled trio task steps to finish). However, - # it shouln't be used too aggressively as it might still slow - # down the application. + assert not self.nursery._closed + job = QtToTrioJob(fn, args, kwargs, qt_on_success, qt_on_error) + self.nursery.start_soon(job._run_fn) def run_sync(self, fn, *args): - return self._run_job(fn, *args, sync=True) - - -@contextmanager -def run_trio_thread(): - job_scheduler = QtToTrioJobScheduler() - thread = threading.Thread(target=trio_run, args=[job_scheduler._start]) - thread.setName("TrioLoop") - thread.start() - job_scheduler.started.wait() + assert not self.nursery._closed + return fn(*args) - try: - yield job_scheduler - finally: - job_scheduler.stop() - thread.join() +@asynccontextmanager +async def run_trio_job_scheduler(): + async with trio.open_service_nursery() as nursery: + yield QtToTrioJobScheduler(nursery) diff --git a/setup.py b/setup.py index 300eedba45b..ba903934155 100644 --- a/setup.py +++ b/setup.py @@ -315,7 +315,7 @@ def run(self): ] -PYQT_DEPS = ["PyQt5==5.15.2", "pyqt5-sip==12.8.1"] +PYQT_DEPS = ["PyQt5==5.15.2", "pyqt5-sip==12.8.1", "qtrio==0.4.2"] BABEL_DEP = "Babel==2.6.0" WHEEL_DEP = "wheel==0.34.2" DOCUTILS_DEP = "docutils==0.15"