Skip to content

Commit

Permalink
Migrate to qtrio
Browse files Browse the repository at this point in the history
PR #1771, issue #1558
  • Loading branch information
vxgmichel authored Aug 2, 2021
2 parents 54133d2 + c3302fc commit 78aa989
Show file tree
Hide file tree
Showing 60 changed files with 1,145 additions and 1,621 deletions.
1 change: 1 addition & 0 deletions newsfragments/1771.empty.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Migrate to QTrio (using the trio guest loop mode) in order to have the core and the GUI running in the same thread.
3 changes: 2 additions & 1 deletion parsec/core/core_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ class CoreEvent(Enum):
GUI_CONFIG_CHANGED = "gui.config.changed"
# Mountpoint
MOUNTPOINT_REMOTE_ERROR = "mountpoint.remote_error"
MOUNTPOINT_STARTED = "mountpoint.started"
MOUNTPOINT_STARTING = "mountpoint.starting"
MOUNTPOINT_STARTED = "mountpoint.started"
MOUNTPOINT_STOPPING = "mountpoint.stopping"
MOUNTPOINT_STOPPED = "mountpoint.stopped"
MOUNTPOINT_UNHANDLED_ERROR = "mountpoint.unhandled_error"
# Others
Expand Down
127 changes: 56 additions & 71 deletions parsec/core/gui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import sys
import signal
from queue import Queue
from typing import Optional
from contextlib import contextmanager
from enum import Enum

import trio
import qtrio
from structlog import get_logger
from PyQt5.QtCore import QTimer, Qt
from PyQt5.QtWidgets import QApplication
Expand All @@ -28,7 +28,7 @@
from parsec.core.gui.new_version import CheckNewVersion
from parsec.core.gui.systray import systray_available, Systray
from parsec.core.gui.main_window import MainWindow
from parsec.core.gui.trio_thread import ThreadSafeQtSignal, run_trio_thread
from parsec.core.gui.trio_jobs import run_trio_job_scheduler
except ImportError as exc:
raise ModuleNotFoundError(
"""PyQt forms haven't been generated.
Expand All @@ -47,53 +47,47 @@ def _before_quit():
return _before_quit


IPCServerStartupOutcome = Enum("IPCServerStartupOutcome", "STARTED ALREADY_RUNNING ERROR")
async def _run_ipc_server(config, main_window, start_arg, task_status=trio.TASK_STATUS_IGNORED):
new_instance_needed = main_window.new_instance_needed
foreground_needed = main_window.foreground_needed

async def _cmd_handler(cmd):
if cmd["cmd"] == IPCCommand.FOREGROUND:
foreground_needed.emit()
elif cmd["cmd"] == IPCCommand.NEW_INSTANCE:
new_instance_needed.emit(cmd.get("start_arg"))
return {"status": "ok"}

async def _run_ipc_server(config, main_window, start_arg, result_queue):
try:
new_instance_needed_qt = ThreadSafeQtSignal(main_window, "new_instance_needed", object)
foreground_needed_qt = ThreadSafeQtSignal(main_window, "foreground_needed")
# Loop over attemps at running an IPC server or sending the command to an existing one
while True:

async def _cmd_handler(cmd):
if cmd["cmd"] == IPCCommand.FOREGROUND:
foreground_needed_qt.emit()
elif cmd["cmd"] == IPCCommand.NEW_INSTANCE:
new_instance_needed_qt.emit(cmd.get("start_arg"))
return {"status": "ok"}
# Attempt to run an IPC server if Parsec is not already started
try:
async with run_ipc_server(
_cmd_handler, config.ipc_socket_file, win32_mutex_name=config.ipc_win32_mutex_name
):
task_status.started()
await trio.sleep_forever()

while True:
# Parsec is already started, give it our work then
except IPCServerAlreadyRunning:

# Protect against race conditions, in case the server was shutting down
try:
async with run_ipc_server(
_cmd_handler,
config.ipc_socket_file,
win32_mutex_name=config.ipc_win32_mutex_name,
):
result_queue.put_nowait(IPCServerStartupOutcome.STARTED)
await trio.sleep_forever()

except IPCServerAlreadyRunning:
# Parsec is already started, give it our work then
try:
if start_arg:
await send_to_ipc_server(
config.ipc_socket_file, IPCCommand.NEW_INSTANCE, start_arg=start_arg
)
else:
await send_to_ipc_server(config.ipc_socket_file, IPCCommand.FOREGROUND)

except IPCServerNotRunning:
# IPC server has closed, retry to create our own
continue

# We have successfuly noticed the other running application
result_queue.put_nowait(IPCServerStartupOutcome.ALREADY_RUNNING)
break

except Exception:
result_queue.put_nowait(IPCServerStartupOutcome.ERROR)
# Let the exception bubble up so QtToTrioJob logged it as an unexpected error
raise
if start_arg:
await send_to_ipc_server(
config.ipc_socket_file, IPCCommand.NEW_INSTANCE, start_arg=start_arg
)
else:
await send_to_ipc_server(config.ipc_socket_file, IPCCommand.FOREGROUND)

# IPC server has closed, retry to create our own
except IPCServerNotRunning:
continue

# We have successfuly noticed the other running application
# We can now forward the exception to the caller
raise


@contextmanager
Expand Down Expand Up @@ -141,44 +135,36 @@ def run_gui(config: CoreConfig, start_arg: Optional[str] = None, diagnose: bool
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)

# The parsec app needs to be instanciated before qtrio runs in order
# to be the default QApplication instance
app = ParsecApp()
assert QApplication.instance() is app
return qtrio.run(_run_gui, app, config, start_arg, diagnose)


async def _run_gui(
app: ParsecApp, config: CoreConfig, start_arg: str = None, diagnose: bool = False
):
app.load_stylesheet()
app.load_font()

lang_key = lang.switch_language(config)

event_bus = EventBus()
with run_trio_thread() as jobs_ctx:
async with run_trio_job_scheduler() as jobs_ctx:
win = MainWindow(
jobs_ctx=jobs_ctx,
quit_callback=jobs_ctx.close,
event_bus=event_bus,
config=config,
minimize_on_close=config.gui_tray_enabled and systray_available(),
)

result_queue = Queue(maxsize=1)

class ThreadSafeNoQtSignal(ThreadSafeQtSignal):
def __init__(self):
self.qobj = None
self.signal_name = ""
self.args_types = ()

def emit(self, *args):
pass

jobs_ctx.submit_job(
ThreadSafeNoQtSignal(),
ThreadSafeNoQtSignal(),
_run_ipc_server,
config,
win,
start_arg,
result_queue,
)
if result_queue.get() == IPCServerStartupOutcome.ALREADY_RUNNING:
# Another instance of Parsec already started, nothing more to do
# Attempt to run an IPC server if Parsec is not already started
try:
await jobs_ctx.nursery.start(_run_ipc_server, config, win, start_arg)
# Another instance of Parsec already started, nothing more to do
except IPCServerAlreadyRunning:
return

# If we are here, it's either the IPC server has successfully started
Expand Down Expand Up @@ -214,7 +200,6 @@ def emit(self, *args):

def kill_window(*args):
win.close_app(force=True)
QApplication.quit()

signal.signal(signal.SIGINT, kill_window)

Expand All @@ -234,7 +219,7 @@ def kill_window(*args):

if diagnose:
with fail_on_first_exception(kill_window):
return app.exec_()
await trio.sleep_forever()
else:
with log_pyqt_exceptions():
return app.exec_()
await trio.sleep_forever()
21 changes: 8 additions & 13 deletions parsec/core/gui/central_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
FSWorkspaceNoWriteAccess,
FSWorkspaceInMaintenance,
)
from parsec.core.gui.trio_thread import QtToTrioJobScheduler
from parsec.core.gui.trio_jobs import QtToTrioJobScheduler
from parsec.core.gui.mount_widget import MountWidget
from parsec.core.gui.users_widget import UsersWidget
from parsec.core.gui.devices_widget import DevicesWidget
Expand All @@ -37,7 +37,7 @@
from parsec.core.gui.custom_widgets import Pixmap
from parsec.core.gui.custom_dialogs import show_error
from parsec.core.gui.ui.central_widget import Ui_CentralWidget
from parsec.core.gui.trio_thread import JobResultError, ThreadSafeQtSignal, QtToTrioJob
from parsec.core.gui.trio_jobs import JobResultError, QtToTrioJob


async def _do_get_organization_stats(core: LoggedCore) -> OrganizationStats:
Expand Down Expand Up @@ -78,7 +78,6 @@ class CentralWidget(QWidget, Ui_CentralWidget): # type: ignore[misc]
organization_stats_error = pyqtSignal(QtToTrioJob)

connection_state_changed = pyqtSignal(object, object)
vlobs_updated_qt = pyqtSignal()
logout_requested = pyqtSignal()
new_notification = pyqtSignal(str, str)

Expand Down Expand Up @@ -107,9 +106,8 @@ def __init__(
for e in self.NOTIFICATION_EVENTS:
self.event_bus.connect(e, cast(EventCallback, self.handle_event))

self.event_bus.connect(CoreEvent.FS_ENTRY_SYNCED, self._on_vlobs_updated_trio)
self.event_bus.connect(CoreEvent.BACKEND_REALM_VLOBS_UPDATED, self._on_vlobs_updated_trio)
self.vlobs_updated_qt.connect(self._on_vlobs_updated_qt)
self.event_bus.connect(CoreEvent.FS_ENTRY_SYNCED, self._on_vlobs_updated)
self.event_bus.connect(CoreEvent.BACKEND_REALM_VLOBS_UPDATED, self._on_vlobs_updated)

self.set_user_info()
menu = QMenu()
Expand Down Expand Up @@ -264,16 +262,13 @@ def _load_organization_stats(self, delay: float = 0) -> None:
self.jobs_ctx.submit_throttled_job(
"central_widget.load_organization_stats",
delay,
ThreadSafeQtSignal(self, "organization_stats_success", QtToTrioJob),
ThreadSafeQtSignal(self, "organization_stats_error", QtToTrioJob),
self.organization_stats_success,
self.organization_stats_error,
_do_get_organization_stats,
core=self.core,
)

def _on_vlobs_updated_trio(self, *args: object, **kwargs: object) -> None:
self.vlobs_updated_qt.emit()

def _on_vlobs_updated_qt(self) -> None:
def _on_vlobs_updated(self, *args: object, **kwargs: object) -> None:
self._load_organization_stats(delay=self.REFRESH_ORGANIZATION_STATS_DELAY)

def _on_connection_state_changed(
Expand Down Expand Up @@ -369,7 +364,7 @@ def go_to_file_link(self, addr: BackendOrganizationFileLinkAddr, mount: bool = T
if addr.organization_id != self.core.device.organization_id:
raise GoToFileLinkBadOrganizationIDError
try:
workspace = self.jobs_ctx.run_sync(self.core.user_fs.get_workspace, addr.workspace_id)
workspace = self.core.user_fs.get_workspace(addr.workspace_id)
except FSWorkspaceNotFoundError as exc:
raise GoToFileLinkBadWorkspaceIDError from exc
try:
Expand Down
Loading

0 comments on commit 78aa989

Please sign in to comment.