Skip to content

Commit

Permalink
Attempt a migration to qtrio
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel committed Jun 23, 2021
1 parent 2d2dfe2 commit a997080
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 146 deletions.
51 changes: 18 additions & 33 deletions parsec/core/gui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import sys
import signal
from queue import Queue
from typing import Optional
from contextlib import contextmanager
from enum import Enum

import trio
import qtrio
from structlog import get_logger
from PyQt5.QtCore import QTimer, Qt
from PyQt5.QtWidgets import QApplication
Expand All @@ -28,7 +29,7 @@
from parsec.core.gui.new_version import CheckNewVersion
from parsec.core.gui.systray import systray_available, Systray
from parsec.core.gui.main_window import MainWindow
from parsec.core.gui.trio_thread import ThreadSafeQtSignal, run_trio_thread
from parsec.core.gui.trio_thread import ThreadSafeQtSignal, run_trio_job_scheduler
except ImportError as exc:
raise ModuleNotFoundError(
"""PyQt forms haven't been generated.
Expand All @@ -50,7 +51,7 @@ def _before_quit():
IPCServerStartupOutcome = Enum("IPCServerStartupOutcome", "STARTED ALREADY_RUNNING ERROR")


async def _run_ipc_server(config, main_window, start_arg, result_queue):
async def _run_ipc_server(config, main_window, start_arg, task_status=trio.TASK_STATUS_IGNORED):
try:
new_instance_needed_qt = ThreadSafeQtSignal(main_window, "new_instance_needed", object)
foreground_needed_qt = ThreadSafeQtSignal(main_window, "foreground_needed")
Expand All @@ -69,7 +70,7 @@ async def _cmd_handler(cmd):
config.ipc_socket_file,
win32_mutex_name=config.ipc_win32_mutex_name,
):
result_queue.put_nowait(IPCServerStartupOutcome.STARTED)
task_status.started(IPCServerStartupOutcome.STARTED)
await trio.sleep_forever()

except IPCServerAlreadyRunning:
Expand All @@ -87,12 +88,12 @@ async def _cmd_handler(cmd):
continue

# We have successfuly noticed the other running application
result_queue.put_nowait(IPCServerStartupOutcome.ALREADY_RUNNING)
task_status.started(IPCServerStartupOutcome.ALREADY_RUNNING)
break

except Exception:
result_queue.put_nowait(IPCServerStartupOutcome.ERROR)
# Let the exception bubble up so QtToTrioJob logged it as an unexpected error
except Exception as exc:
task_status.started(IPCServerStartupOutcome.ERROR)
logger.exection(exc)
raise


Expand Down Expand Up @@ -140,44 +141,28 @@ def run_gui(config: CoreConfig, start_arg: Optional[str] = None, diagnose: bool
QApplication.setHighDpiScaleFactorRoundingPolicy(
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)
return qtrio.run(_run_gui, config, start_arg, diagnose)

app = ParsecApp()

async def _run_gui(config: CoreConfig, start_arg: str = None, diagnose: bool = False):
app = ParsecApp()
app.load_stylesheet()
app.load_font()

lang_key = lang.switch_language(config)

event_bus = EventBus()
with run_trio_thread() as jobs_ctx:
async with run_trio_job_scheduler() as jobs_ctx:
win = MainWindow(
jobs_ctx=jobs_ctx,
event_bus=event_bus,
config=config,
minimize_on_close=config.gui_tray_enabled and systray_available(),
)

result_queue = Queue(maxsize=1)

class ThreadSafeNoQtSignal(ThreadSafeQtSignal):
def __init__(self):
self.qobj = None
self.signal_name = ""
self.args_types = ()

def emit(self, *args):
pass

jobs_ctx.submit_job(
ThreadSafeNoQtSignal(),
ThreadSafeNoQtSignal(),
_run_ipc_server,
config,
win,
start_arg,
result_queue,
)
if result_queue.get() == IPCServerStartupOutcome.ALREADY_RUNNING:
result = await jobs_ctx.nursery.start(_run_ipc_server, config, win, start_arg)

if result == IPCServerStartupOutcome.ALREADY_RUNNING:
# Another instance of Parsec already started, nothing more to do
return

Expand Down Expand Up @@ -226,7 +211,7 @@ def kill_window(*args):

if diagnose:
with fail_on_first_exception(kill_window):
return app.exec_()
await trio.sleep_forever()
else:
with log_pyqt_exceptions():
return app.exec_()
await trio.sleep_forever()
11 changes: 7 additions & 4 deletions parsec/core/gui/instance_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
MountpointWinfspNotAvailable,
)

from parsec.core.gui.trio_thread import QtToTrioJobScheduler, ThreadSafeQtSignal
from parsec.core.gui.trio_thread import (
QtToTrioJobScheduler,
ThreadSafeQtSignal,
run_trio_job_scheduler,
)
from parsec.core.gui.parsec_application import ParsecApp
from parsec.core.gui.custom_dialogs import show_error
from parsec.core.gui.lang import translate as _
Expand All @@ -37,10 +41,9 @@ async def _do_run_core(config, device, qt_on_ready):
async with logged_core_factory(config=config, device=device, event_bus=None) as core:
# Create our own job scheduler allows us to cancel all pending
# jobs depending on us when we logout
core_jobs_ctx = QtToTrioJobScheduler()
async with trio.open_service_nursery() as nursery:
await nursery.start(core_jobs_ctx._start)
async with run_trio_job_scheduler() as core_jobs_ctx:
qt_on_ready.emit(core, core_jobs_ctx)
await trio.sleep_forever()


class InstanceWidget(QWidget):
Expand Down
128 changes: 20 additions & 108 deletions parsec/core/gui/trio_thread.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Parsec Cloud (https://parsec.cloud) Copyright (c) AGPLv3 2016-2021 Scille SAS

import threading
from contextlib import contextmanager
from contextlib import asynccontextmanager
from inspect import iscoroutinefunction, signature

import trio
from structlog import get_logger
from parsec.core.fs import FSError
from parsec.core.mountpoint import MountpointError
from parsec.utils import trio_run, split_multi_error
from parsec.utils import split_multi_error
from PyQt5.QtCore import pyqtBoundSignal, Q_ARG, QMetaObject, Qt


Expand Down Expand Up @@ -41,13 +41,15 @@ def emit(self, *args):


class QtToTrioJob:
def __init__(self, trio_token, fn, args, kwargs, qt_on_success, qt_on_error):
self._trio_token = trio_token
def __init__(self, fn, args, kwargs, qt_on_success, qt_on_error):
# Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal`
assert not [x for x in args if isinstance(x, pyqtBoundSignal)]
assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)]
assert isinstance(qt_on_success, ThreadSafeQtSignal)
assert qt_on_success.args_types in ((), (QtToTrioJob,))
self._qt_on_success = qt_on_success
assert isinstance(qt_on_error, ThreadSafeQtSignal)
assert qt_on_error.args_types in ((), (QtToTrioJob,))
self._qt_on_success = qt_on_success
self._qt_on_error = qt_on_error
self._fn = fn
self._args = args
Expand Down Expand Up @@ -141,115 +143,25 @@ def _set_done(self):
signal = self._qt_on_success if self.is_ok() else self._qt_on_error
signal.emit(self) if signal.args_types else signal.emit()

def cancel_and_join(self):
assert self.cancel_scope
try:
trio.from_thread.run_sync(self.cancel_scope.cancel, trio_token=self._trio_token)
except trio.RunFinishedError:
pass
self._done.wait()
def cancel(self):
self.cancel_scope.cancel()


class QtToTrioJobScheduler:
def __init__(self):
self._trio_token = None
self._cancel_scope = None
self.started = threading.Event()
self._stopped = trio.Event()

async def _start(self, *, task_status=trio.TASK_STATUS_IGNORED):
assert not self.started.is_set()
self._trio_token = trio.lowlevel.current_trio_token()
self._send_job_channel, recv_job_channel = trio.open_memory_channel(1)
try:
async with trio.open_service_nursery() as nursery, recv_job_channel:
self._cancel_scope = nursery.cancel_scope
self.started.set()
task_status.started()
while True:
job = await recv_job_channel.receive()
assert job.status is None
await nursery.start(job._run_fn)

finally:
self._stopped.set()

async def _stop(self):
self._cancel_scope.cancel()
await self._send_job_channel.aclose()
await self._stopped.wait()

def stop(self):
try:
trio.from_thread.run(self._stop, trio_token=self._trio_token)
except trio.RunFinishedError:
pass

def _run_job(self, job, *args, sync=False):
try:
if sync:
return trio.from_thread.run_sync(job, *args, trio_token=self._trio_token)
else:
return trio.from_thread.run(job, *args, trio_token=self._trio_token)

except trio.BrokenResourceError:
logger.info(f"The submitted job `{job}` won't run as the scheduler is stopped")
raise JobSchedulerNotAvailable("The job scheduler is stopped")

except trio.RunFinishedError:
logger.info(f"The submitted job `{job}` won't run as the trio loop is not running")
raise JobSchedulerNotAvailable("The trio loop is not running")
def __init__(self, nursery):
self.nursery = nursery

def submit_job(self, qt_on_success, qt_on_error, fn, *args, **kwargs):
# Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal`
assert not [x for x in args if isinstance(x, pyqtBoundSignal)]
assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)]
job = QtToTrioJob(self._trio_token, fn, args, kwargs, qt_on_success, qt_on_error)

async def _submit_job():
# While inside this async function we are blocking the Qt thread
# hence we just wait for the job to start (to avoid concurrent
# crash if the job is cancelled)
await self._send_job_channel.send(job)
await job._started.wait()

try:
self._run_job(_submit_job)
except JobSchedulerNotAvailable as exc:
job.set_cancelled(exc)

return job

# This method is only here for legacy purposes.
# It shouldn't NOT be used as running an async job synchronously
# might block the Qt loop for too long and cause the application
# to freeze. TODO: remove it later

def run(self, afn, *args):
return self._run_job(afn, *args)

# In contrast to the `run` method, it is acceptable to block
# the Qt loop while waiting for a synchronous job to finish
# as it shouldn't take too long (it might simply wait for
# a few scheduled trio task steps to finish). However,
# it shouln't be used too aggressively as it might still slow
# down the application.
assert not self.nursery._closed
job = QtToTrioJob(fn, args, kwargs, qt_on_success, qt_on_error)
self.nursery.start_soon(job._run_fn)

def run_sync(self, fn, *args):
return self._run_job(fn, *args, sync=True)


@contextmanager
def run_trio_thread():
job_scheduler = QtToTrioJobScheduler()
thread = threading.Thread(target=trio_run, args=[job_scheduler._start])
thread.setName("TrioLoop")
thread.start()
job_scheduler.started.wait()
assert not self.nursery._closed
return fn(*args)

try:
yield job_scheduler

finally:
job_scheduler.stop()
thread.join()
@asynccontextmanager
async def run_trio_job_scheduler():
async with trio.open_service_nursery() as nursery:
yield QtToTrioJobScheduler(nursery)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def run(self):
]


PYQT_DEPS = ["PyQt5==5.15.2", "pyqt5-sip==12.8.1"]
PYQT_DEPS = ["PyQt5==5.15.2", "pyqt5-sip==12.8.1", "qtrio==0.4.2"]
BABEL_DEP = "Babel==2.6.0"
WHEEL_DEP = "wheel==0.34.2"
DOCUTILS_DEP = "docutils==0.15"
Expand Down

0 comments on commit a997080

Please sign in to comment.