From 3d9ccdae5c51c9857fca7c85ea145c0a212cb855 Mon Sep 17 00:00:00 2001 From: Peter Hutterer Date: Wed, 30 Mar 2022 15:15:06 +1000 Subject: [PATCH] Add a dbusmock-based Python test suite For complex portals like the InputCapture portal, the current test suite is difficult to use and adapt. A much simpler approach is to use dbusmock to provide the impl.portal emulation, use Python to poke at the actual DBus API of the portal and have xdg-desktop-portal sit in between to negotiate the two. This patch adds a test suite that provides enough infrastructure to add tests for other portals in a similar manner - luckily the impl side is relatively trivial to implement in dbusmock. --- tests/__init__.py | 239 +++++++++++++ tests/portals/test.portal | 2 +- tests/templates/__init__.py | 7 + tests/templates/inputcapture.py | 312 ++++++++++++++++ tests/test_inputcapture.py | 607 ++++++++++++++++++++++++++++++++ 5 files changed, 1166 insertions(+), 1 deletion(-) create mode 100644 tests/__init__.py create mode 100644 tests/templates/__init__.py create mode 100644 tests/templates/inputcapture.py create mode 100644 tests/test_inputcapture.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..93bb6d5fb --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +# +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# This file is formatted with Python Black +# + +# Shared setup for portal tests. To test a portal, subclass TestPortal with +# your portal's name (e.g. TestInputCapture). This will auto-fill your portal +# name into some of the functions. +# +# Make sure you have a dbusmock template for the impl.portal of your portal in +# tests/templates. See the dbusmock documentaiton for details on that. +# +# Environment variables: +# XDP_DBUS_MONITOR: if set, starts dbus_monitor on the custom bus, useful +# for debugging +# XDP_UNINSTALLED: run from $PWD (with an emulation of the glib test behavior) +# LIBEXECDIR: run xdg-desktop-portal from that dir, required if +# XDP_UNINSTALLEDis unset +# + +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +from itertools import count +from typing import Any, Dict, Tuple +from pathlib import Path + +import dbus +import dbusmock +import fcntl +import logging +import os +import subprocess +import time + +DBusGMainLoop(set_as_default=True) + +_counter = count() + +ASV = Dict[str, Any] + +logger = logging.getLogger("tests") + + +class Request: + """ + Helper class for executing methods that use Requests. + """ + def __init__(self, bus: dbus.Bus, interface: dbus.Interface): + self.interface = interface + self._handle_token = f"request{next(_counter)}" + self.response = -1 + self.results = None + + def sanitize(name): + return name.lstrip(':').replace('.', '_') + + sender_token = sanitize(bus.get_unique_name()) + self.handle = f"/org/freedesktop/portal/desktop/request/{sender_token}/{self._handle_token}" + proxy = bus.get_object( + "org.freedesktop.portal.Desktop", self.handle + ) + self.request_interface = dbus.Interface(proxy, "org.freedesktop.portal.Request") + + def cb_response(response, results): + self.response = response + self.results = results + + self.request_interface.connect_to_signal("Response", cb_response) + + @property + def handle_token(self) -> dbus.String: + """ + Returns the dbus-ready handle_token, ready to be put into the options + """ + return dbus.String(self._handle_token, variant_level=1) + + def call(self, methodname, **kwargs) -> Tuple[int, ASV]: + """ + Synchronously call method ``methodname`` on the interface given in the + request's constructor. The kwargs must be specified in the order the + DBus method takes them but the handle_token is automatically filled + in. + """ + try: + options = kwargs["options"] + except KeyError: + options = dbus.Dictionary({}, signature="sv") + + if "handle_token" not in options: + options["handle_token"] = self.handle_token + + method = getattr(self.interface, methodname) + assert method + handle = method(*list(kwargs.values())) + assert handle.endswith(self._handle_token) + + # Wait 300ms for the Response signal + def timeout(): + loop.quit() + + loop = GLib.MainLoop() + GLib.timeout_add(300, timeout) + loop.run() + + return self.response, self.results + + +class TestPortal(dbusmock.DBusTestCase): + @classmethod + def setUpClass(cls): + logging.basicConfig( + format="%(levelname).1s|%(name)s: %(message)s", level=logging.DEBUG + ) + + cls.start_session_bus() + cls.dbus_con = cls.get_dbus(system_bus=False) + assert cls.dbus_con + + def setUp(self): + self.p_mock = None + self.xdp = None + self._portal_interfaces = {} + + def start_impl_portal(self, params=None, portal=None): + """ + Start the impl.portal for the given portal name. If missing, + the portal name is derived from the class name of the test, e.g. + TestFoo will start impl.portal.Foo. + """ + if portal is None: + portal = type(self).__name__.removeprefix("Test") + self.p_mock, self.obj_portal = self.spawn_server_template( + template=f"tests/templates/{portal.lower()}.py", + parameters=params, + stdout=subprocess.PIPE, + ) + flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL) + fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) + self.mock_interface = dbus.Interface(self.obj_portal, dbusmock.MOCK_IFACE) + + self.start_dbus_monitor() + + def start_xdp(self): + """ + Start the xdg-desktop-portal process + """ + + # This roughly resembles test-portals.c and glib's test behavior + if os.getenv("XDP_UNINSTALLED", None): + builddir = Path(os.getenv("G_TEST_BUILDDIR") or "tests") / ".." + else: + builddir = os.getenv("LIBEXECDIR") + if not builddir: + raise NotImplementedError("LIBEXECDIR is not set") + + distdir = os.getenv("G_TEST_SRCDIR") or "tests" + portal_dir = Path(distdir) / "portals" + + argv = [Path(builddir) / "xdg-desktop-portal"] + env = os.environ.copy() + env["G_DEBUG"] = "fatal-criticals" + env["XDG_DESKTOP_PORTAL_DIR"] = portal_dir + + xdp = subprocess.Popen(argv, env=env) + + timeout = 500 + while timeout > 0: + if self.dbus_con.name_has_owner("org.freedesktop.portal.Desktop"): + break + timeout -= 1 + time.sleep(0.1) + + assert timeout > 0, "Timeout while waiting for xdg-desktop-portal to claim the bus" + + self.xdp = xdp + + def start_dbus_monitor(self): + self.dbus_monitor = None + if not os.getenv("XDP_DBUS_MONITOR"): + return + + argv = ['dbus-monitor', '--session'] + self.dbus_monitor = subprocess.Popen(argv); + + def tearDown(self): + if self.dbus_monitor: + self.dbus_monitor.terminate() + self.dbus_monitor.wait() + + if self.xdp: + self.xdp.terminate() + self.xdp.wait() + + if self.p_mock: + if self.p_mock.stdout: + out = (self.p_mock.stdout.read() or b"").decode("utf-8") + if out: + print(out) + self.p_mock.stdout.close() + self.p_mock.terminate() + self.p_mock.wait() + + def get_xdp_dbus_object(self) -> dbus.proxies.ProxyObject: + """ + Return the object that is the org.freedesktop.portal.Desktop proxy + """ + try: + return self._xdp_dbus_object + except AttributeError: + obj = self.dbus_con.get_object( + "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop" + ) + # Useful for debugging: + # print(obj.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")) + assert obj + self._xdp_dbus_object = obj + return self._xdp_dbus_object + + def get_dbus_interface(self, name=None) -> dbus.Interface: + """ + Return the interface with the given name. + + For portals, it's enough to specify the portal name (e.g. "InputCapture"). + If no name is provided, guess from the test class name. + """ + if name is None: + name = type(self).__name__.removeprefix("Test") + if "." not in name: + name = f"org.freedesktop.portal.{name}" + + try: + intf = self._portal_interfaces[name] + except KeyError: + intf = dbus.Interface(self.get_xdp_dbus_object(), name) + assert intf + self._portal_interfaces[name] = intf + return intf diff --git a/tests/portals/test.portal b/tests/portals/test.portal index e43f90e3d..2a3b69521 100644 --- a/tests/portals/test.portal +++ b/tests/portals/test.portal @@ -1,4 +1,4 @@ [portal] DBusName=org.freedesktop.impl.portal.Test -Interfaces=org.freedesktop.impl.portal.Account;org.freedesktop.impl.portal.Email;org.freedesktop.impl.portal.FileChooser;org.freedesktop.impl.portal.Screenshot;org.freedesktop.impl.portal.Lockdown;org.freedesktop.impl.portal.Print;org.freedesktop.impl.portal.Access;org.freedesktop.impl.portal.Inhibit;org.freedesktop.impl.portal.AppChooser;org.freedesktop.impl.portal.Wallpaper;org.freedesktop.impl.portal.Background;org.freedesktop.impl.portal.Notification;org.freedesktop.impl.portal.Settings; +Interfaces=org.freedesktop.impl.portal.Account;org.freedesktop.impl.portal.Email;org.freedesktop.impl.portal.FileChooser;org.freedesktop.impl.portal.Screenshot;org.freedesktop.impl.portal.Lockdown;org.freedesktop.impl.portal.Print;org.freedesktop.impl.portal.Access;org.freedesktop.impl.portal.Inhibit;org.freedesktop.impl.portal.AppChooser;org.freedesktop.impl.portal.Wallpaper;org.freedesktop.impl.portal.Background;org.freedesktop.impl.portal.Notification;org.freedesktop.impl.portal.Settings;org.freedesktop.impl.portal.InputCapture; UseIn=test diff --git a/tests/templates/__init__.py b/tests/templates/__init__.py new file mode 100644 index 000000000..42044c389 --- /dev/null +++ b/tests/templates/__init__.py @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# This file is formatted with Python Black + +import logging + +logging.basicConfig(format="%(levelname).1s|%(name)s: %(message)s", level=logging.DEBUG) diff --git a/tests/templates/inputcapture.py b/tests/templates/inputcapture.py new file mode 100644 index 000000000..c335fa30b --- /dev/null +++ b/tests/templates/inputcapture.py @@ -0,0 +1,312 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# This file is formatted with Python Black + +from collections import namedtuple +from itertools import count +from gi.repository import GLib + +import tests.templates # for logger setup +import dbus.service +import logging +import socket + +BUS_NAME = "org.freedesktop.impl.portal.Test" +MAIN_OBJ = "/org/freedesktop/portal/desktop" +SYSTEM_BUS = False +MAIN_IFACE = "org.freedesktop.impl.portal.InputCapture" +VERSION = 1 + +logger = logging.getLogger(f"templates.{__name__}") +logger.setLevel(logging.DEBUG) + +serials = count() + +Response = namedtuple("Response", ["response", "results"]) +Barrier = namedtuple("Barrier", ["id", "position"]) + + +def load(mock, parameters=None): + logger.debug(f"Loading parameters: {parameters}") + # Delay before Request.response + mock.delay: int = parameters.get("delay", 0) + + mock.supported_capabilities = parameters.get("supported_capabilities", 0xF) + # The actual ones we reply with in the CreateSession request + mock.capabilities = parameters.get("capabilities", None) + + mock.default_zone = parameters.get("default-zone", [(1920, 1080, 0, 0)]) + mock.current_zones = mock.default_zone + mock.current_zone_serial = next(serials) + + mock.disable_delay = parameters.get("disable-delay", 0) + mock.activated_delay = parameters.get("activated-delay", 0) + mock.deactivated_delay = parameters.get("deactivated-delay", 0) + + mock.AddProperties( + MAIN_IFACE, + dbus.Dictionary( + { + "version": dbus.UInt32(parameters.get("version", VERSION)), + "SupportedCapabilities": dbus.UInt32(mock.supported_capabilities), + } + ), + ) + + mock.active_session_handles = [] + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oossa{sv}", + out_signature="ua{sv}", +) +def CreateSession(self, handle, session_handle, app_id, parent_window, options): + try: + logger.debug(f"CreateSession({parent_window}, {options})") + + assert "capabilities" in options + + # Filter to the subset of supported capabilities + if self.capabilities is None: + capabilities = options["capabilities"] + else: + capabilities = self.capabilities + + capabilities &= self.supported_capabilities + response = Response(0, {}) + + response.results["capabilities"] = dbus.UInt32(capabilities) + self.active_session_handles.append(session_handle) + + logger.debug(f"CreateSession with response {response}") + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oosa{sv}", + out_signature="ua{sv}", +) +def GetZones(self, handle, session_handle, app_id, options): + try: + logger.debug(f"GetZones({session_handle}, {options})") + + assert session_handle in self.active_session_handles + + response = Response(0, {}) + response.results["zones"] = self.default_zone + response.results["serial"] = dbus.UInt32( + self.current_zone_serial, variant_level=1 + ) + logger.debug(f"GetZones with response {response}") + + if response.response == 0: + self.current_zones = response.results["zones"] + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oosa{sv}aa{sv}u", + out_signature="ua{sv}", +) +def SetPointerBarriers(self, handle, session_handle, app_id, options, barriers, serial): + try: + logger.debug( + f"SetPointerBarriers({session_handle}, {options}, {barriers}, {serial})" + ) + + assert session_handle in self.active_session_handles + assert serial == self.current_zone_serial + + self.current_barriers = [] + + failed_barriers = [] + + # Barrier sanity checks: + for b in barriers: + id = b["barrier_id"] + x1, y1, x2, y2 = b["position"] + if (x1 != x2 and y1 != y2) or (x1 == x2 and y1 == y2): + logger.debug(f"Barrier {id} is not horizontal or vertical") + failed_barriers.append(id) + continue + + for z in self.current_zones: + w, h, x, y = z + if x1 < x or x1 > x + w: + continue + if y1 < y or y1 > y + h: + continue + + # x1/y1 fit into our current zone + if x2 < x or x2 > x + w or y2 < y or y2 > y + h: + logger.debug(f"Barrier {id} spans multiple zones") + elif x1 == x2 and (x1 != x and x1 != x + w): + logger.debug(f"Barrier {id} is not on vertical edge") + elif y1 == y2 and (y1 != y and y1 != y + h): + logger.debug(f"Barrier {id} is not on horizontal edge") + else: + self.current_barriers.append(Barrier(id=id, position=b["position"])) + break + + failed_barriers.append(id) + break + else: + logger.debug(f"Barrier {id} does not fit into any zone") + failed_barriers.append(id) + continue + + response = Response(0, {}) + response.results["failed_barriers"] = dbus.Array( + [dbus.UInt32(f) for f in failed_barriers], + signature="u", + variant_level=1, + ) + + logger.debug(f"SetPointerBarriers with response {response}") + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oosa{sv}", + out_signature="ua{sv}", +) +def Enable(self, handle, session_handle, app_id, options): + try: + logger.debug(f"Enable({session_handle}, {options})") + + assert session_handle in self.active_session_handles + + response = Response(0, {}) + + logger.debug(f"Enable with response {response}") + + # for use in the signals + serial = next(serials) + barrier = self.current_barriers[0] + pos = (barrier.position[0] + 10, barrier.position[1] + 20) + + if self.disable_delay > 0: + + def disable(): + logger.debug("emitting Disabled") + self.EmitSignal("", "Disabled", "oa{sv}", [session_handle, {}]) + + GLib.timeout_add(self.disable_delay, disable) + + if self.activated_delay > 0: + + def activated(): + logger.debug("emitting Activated") + options = { + "serial": dbus.UInt32(serial, variant_level=1), + "barrier_id": dbus.UInt32(barrier.id, variant_level=1), + "cursor_position": dbus.Struct( + pos, signature="ii", variant_level=1 + ), + } + self.EmitSignal("", "Activated", "oa{sv}", [session_handle, options]) + + GLib.timeout_add(self.activated_delay, activated) + + if self.deactivated_delay > 0: + + def deactivated(): + logger.debug("emitting Deactivated") + options = { + "serial": dbus.UInt32(serial, variant_level=1), + "cursor_position": dbus.Struct( + pos, signature="ii", variant_level=1 + ), + } + self.EmitSignal("", "Deactivated", "oa{sv}", [session_handle, options]) + + GLib.timeout_add(self.deactivated_delay, deactivated) + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oosa{sv}", + out_signature="ua{sv}", +) +def Disable(self, handle, session_handle, app_id, options): + try: + logger.debug(f"Disable({session_handle}, {options})") + + assert session_handle in self.active_session_handles + + response = Response(0, {}) + + logger.debug(f"Disable with response {response}") + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="oosa{sv}", + out_signature="ua{sv}", +) +def Release(self, handle, session_handle, app_id, options): + try: + logger.debug(f"Release({session_handle}, {options})") + + assert session_handle in self.active_session_handles + + response = Response(0, {}) + + logger.debug(f"Release with response {response}") + + return response.response, response.results + except Exception as e: + logger.critical(e) + return (2, {}) + + +@dbus.service.method( + MAIN_IFACE, + in_signature="osa{sv}", + out_signature="h", +) +def ConnectToEIS(self, session_handle, app_id, options): + try: + logger.debug(f"ConnectToEIS({session_handle}, {options})") + + assert session_handle in self.active_session_handles + + sockets = socket.socketpair() + self.eis_socket = sockets[0] + + assert self.eis_socket.send(b"HELLO") == 5 + + fd = sockets[1] + + logger.debug(f"ConnectToEis with fd {fd.fileno()}") + + return dbus.types.UnixFd(fd) + except Exception as e: + logger.critical(e) + return -1 diff --git a/tests/test_inputcapture.py b/tests/test_inputcapture.py new file mode 100644 index 000000000..4fc742a15 --- /dev/null +++ b/tests/test_inputcapture.py @@ -0,0 +1,607 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# This file is formatted with Python Black + +from typing import Any, Dict, List, Tuple +from gi.repository import GLib + +from itertools import count + +import dbus +import socket + +from tests import Request, TestPortal + +counter = count() + + +class TestInputCapture(TestPortal): + def create_session(self, capabilities=0xf): + """ + Call CreateSession for the given capabilities and return the + (response, results) tuple. + """ + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + + capabilities = dbus.UInt32(capabilities, variant_level=1) + session_handle_token = dbus.String(f"session{next(counter)}", variant_level=1) + + options = dbus.Dictionary( + { + "capabilities": capabilities, + "session_handle_token": session_handle_token, + }, + signature="sv", + ) + + response, results = request.call( + "CreateSession", parent_window="", options=options + ) + assert response == 0 + assert "session_handle" in results + assert "capabilities" in results + caps = results["capabilities"] + # Returned capabilities must be a subset of the requested ones + assert caps & ~capabilities == 0 + + self.current_session_handle = results["session_handle"] + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[3] == "" # parent window + assert args[4]["capabilities"] == capabilities + + return response, results + + def get_zones(self): + """ + Call GetZones and return the (response, results) tuple. + """ + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + options = {} + response, results = request.call( + "GetZones", session_handle=self.current_session_handle, options=options + ) + assert response == 0 + assert "zones" in results + assert "serial" in results + + self.current_zone_serial = results["serial"] + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("GetZones") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == request.handle + assert args[1] == self.current_session_handle + + return response, results + + def set_pointer_barriers(self, barriers): + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + options = {} + response, results = request.call( + "SetPointerBarriers", + session_handle=self.current_session_handle, + options=options, + barriers=barriers, + serial=self.current_zone_serial, + ) + assert response == 0 + assert "failed_barriers" in results + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("SetPointerBarriers") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == request.handle + assert args[1] == self.current_session_handle + assert args[4] == barriers + assert args[5] == self.current_zone_serial + + return response, results + + def connect_to_eis(self): + inputcapture_intf = self.get_dbus_interface() + fd = inputcapture_intf.ConnectToEIS(self.current_session_handle, dbus.Dictionary({}, signature="sv")) + + # Our dbusmock template sends HELLO + eis_socket = socket.fromfd(fd.take(), socket.AF_UNIX, socket.SOCK_STREAM) + hello = eis_socket.recv(10) + assert hello == b"HELLO" + + method_calls = self.mock_interface.GetMethodCalls("ConnectToEIS") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == self.current_session_handle + + return eis_socket + + def enable(self): + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + options = {} + response, results = request.call( + "Enable", + session_handle=self.current_session_handle, + options=options, + ) + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Enable") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == request.handle + assert args[1] == self.current_session_handle + + return response, results + + def disable(self): + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + options = {} + response, results = request.call( + "Disable", + session_handle=self.current_session_handle, + options=options, + ) + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Disable") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == request.handle + assert args[1] == self.current_session_handle + + return response, results + + def release(self, cursor_position=None): + inputcapture_intf = self.get_dbus_interface() + request = Request(self.dbus_con, inputcapture_intf) + options = {} + if cursor_position: + options["cursor_position"] = dbus.Struct(list(cursor_position), signature="ii", variant_level=1) + response, results = request.call( + "Release", + session_handle=self.current_session_handle, + options=options, + ) + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Release") + assert len(method_calls) > 0 + _, args = method_calls[-1] + assert args[0] == request.handle + assert args[1] == self.current_session_handle + if cursor_position: + assert "cursor_position" in args[3] + pos = args[3]["cursor_position"] + assert pos == cursor_position + + return response, results + + def test_version(self): + self.start_impl_portal() + self.start_xdp() + + properties_intf = self.get_dbus_interface("org.freedesktop.DBus.Properties") + version = properties_intf.Get("org.freedesktop.portal.InputCapture", "version") + EXPECTED_VERSION = 1 + assert version == EXPECTED_VERSION + + def test_supported_capabilities(self): + params = { + "supported_capabilities": 0b1110, # POINTER_ABS, KEYBOARD, TOUCH + } + self.start_impl_portal(params) + self.start_xdp() + + properties_intf = self.get_dbus_interface("org.freedesktop.DBus.Properties") + caps = properties_intf.Get( + "org.freedesktop.portal.InputCapture", "SupportedCapabilities" + ) + assert caps == 0b1110 + + def test_create_session(self): + self.start_impl_portal() + self.start_xdp() + + self.create_session(capabilities=0b1) # RELATIVE_POINTER + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + assert args[3] == "" # parent window + assert args[4]["capabilities"] == 0b1 + + def test_create_session_limited_caps(self): + params = { + "capabilities": 0b110, # POINTER_ABS, KEYBOARD + "supported_capabilities": 0b1110, # POINTER_ABS, KEYBOARD, TOUCH + } + self.start_impl_portal(params) + self.start_xdp() + + # Request more caps than are supported + response, results = self.create_session(capabilities=0b1111) + caps = results["capabilities"] + # Returned capabilities must the ones we set up in the params + assert caps == 0b110 + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + assert args[3] == "" # parent window + assert args[4]["capabilities"] == 0b1111 + + + def test_get_zones(self): + zones = [(1024, 768, 0, 0), (640, 480, 1024, 0)] + + params = { + "default-zone": dbus.Array( + [dbus.Struct(z, signature="uuii") for z in zones], + signature="(uuii)", + variant_level=1, + ), + } + self.start_impl_portal(params) + self.start_xdp() + + response, results = self.create_session() + response, results = self.get_zones() + for z1, z2 in zip(results["zones"], zones): + assert z1 == z2 + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) == 1 + method_calls = self.mock_interface.GetMethodCalls("GetZones") + assert len(method_calls) == 1 + + def test_set_pointer_barriers(self): + zones = [(1024, 768, 0, 0), (640, 480, 1024, 0)] + + params = { + "default-zone": dbus.Array( + [dbus.Struct(z, signature="uuii") for z in zones], + signature="(uuii)", + variant_level=1, + ), + } + self.start_impl_portal(params) + self.start_xdp() + + response, results = self.create_session() + response, results = self.get_zones() + + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 0, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(11, variant_level=1), + "position": dbus.Struct( + [0, 0, 1024, 0], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(12, variant_level=1), + "position": dbus.Struct( + [1024, 0, 1024, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(13, variant_level=1), + "position": dbus.Struct( + [0, 768, 1024, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(14, variant_level=1), + "position": dbus.Struct( + [100, 768, 500, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(15, variant_level=1), + "position": dbus.Struct( + [1024, 0, 1024, 480], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(16, variant_level=1), + "position": dbus.Struct( + [1024 + 640, 0, 1024 + 640, 480], signature="iiii", variant_level=1 + ), + }, + # invalid ones + { + "barrier_id": dbus.UInt32(20, variant_level=1), + "position": dbus.Struct( + [0, 1, 3, 4], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(21, variant_level=1), + "position": dbus.Struct( + [0, 1, 1024, 1], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(22, variant_level=1), + "position": dbus.Struct( + [1, 0, 1, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(23, variant_level=1), + "position": dbus.Struct( + [1023, 0, 1023, 768], signature="iiii", variant_level=1 + ), + }, + { + "barrier_id": dbus.UInt32(24, variant_level=1), + "position": dbus.Struct( + [0, 0, 1050, 0], signature="iiii", variant_level=1 + ), + }, + ] + response, results = self.set_pointer_barriers(barriers=barriers) + failed_barriers = results["failed_barriers"] + assert all([id >= 20 for id in failed_barriers]) + + for id in [b["barrier_id"] for b in barriers if b["barrier_id"] >= 20]: + assert id in failed_barriers + + # Check the impl portal was called with the right args + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) == 1 + method_calls = self.mock_interface.GetMethodCalls("GetZones") + assert len(method_calls) == 1 + method_calls = self.mock_interface.GetMethodCalls("SetPointerBarriers") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + assert args[4] == barriers + assert args[5] == self.current_zone_serial + + def test_connect_to_eis(self): + self.start_impl_portal() + self.start_xdp() + + self.create_session() + self.get_zones() + + # The default zone is 1920x1080 + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 1920, 0], signature="iiii", variant_level=1 + ), + }, + ] + self.set_pointer_barriers(barriers) + + self.connect_to_eis() + + def test_enable_disable(self): + self.start_impl_portal() + self.start_xdp() + + self.create_session() + self.create_session() + self.get_zones() + + # The default zone is 1920x1080 + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 1920, 0], signature="iiii", variant_level=1 + ), + }, + ] + self.set_pointer_barriers(barriers) + self.connect_to_eis() + + # Disable before enable should be a noop + response, results = self.disable() + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Disable") + assert len(method_calls) == 1 + + response, results = self.enable() + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Enable") + assert len(method_calls) == 1 + + response, results = self.disable() + assert response == 0 + method_calls = self.mock_interface.GetMethodCalls("Disable") + assert len(method_calls) == 2 + + def test_disable_signal(self): + params = { + "disable-delay": 200, + } + self.start_impl_portal(params) + self.start_xdp() + + self.create_session() + self.get_zones() + # The default zone is 1920x1080 + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 1920, 0], signature="iiii", variant_level=1 + ), + }, + ] + self.set_pointer_barriers(barriers) + self.connect_to_eis() + + disabled_signal_received = False + + def cb_disabled(session_handle, options): + nonlocal disabled_signal_received + disabled_signal_received = True + assert session_handle == session_handle + + inputcapture_intf = self.get_dbus_interface() + inputcapture_intf.connect_to_signal("Disabled", cb_disabled) + + self.enable() + + mainloop = GLib.MainLoop() + GLib.timeout_add(500, mainloop.quit) + mainloop.run() + + assert disabled_signal_received + + def test_activated_signal(self): + params = { + "activated-delay": 200, + "deactivated-delay": 300, + } + self.start_impl_portal(params) + self.start_xdp() + + self.create_session() + self.get_zones() + # The default zone is 1920x1080 + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 1920, 0], signature="iiii", variant_level=1 + ), + }, + ] + self.set_pointer_barriers(barriers) + self.connect_to_eis() + + disabled_signal_received = False + activated_signal_received = False + deactivated_signal_received = False + + def cb_disabled(session_handle, options): + nonlocal disabled_signal_received + disabled_signal_received = True + + def cb_activated(session_handle, options): + nonlocal activated_signal_received + activated_signal_received = True + assert session_handle == session_handle + assert "serial" in options + assert "barrier_id" in options + assert options["barrier_id"] == 10 # template uses first barrier + assert "cursor_position" in options + assert options["cursor_position"] == (10, 20) # template uses x+10, y+20 of first barrier + + def cb_deactivated(session_handle, options): + nonlocal deactivated_signal_received + deactivated_signal_received = True + assert session_handle == session_handle + assert "serial" in options + assert "cursor_position" in options + assert options["cursor_position"] == (10, 20) # template uses x+10, y+20 of first barrier + + inputcapture_intf = self.get_dbus_interface() + inputcapture_intf.connect_to_signal("Activated", cb_activated) + inputcapture_intf.connect_to_signal("Deactivated", cb_deactivated) + inputcapture_intf.connect_to_signal("Disabled", cb_disabled) + + self.enable() + + mainloop = GLib.MainLoop() + GLib.timeout_add(500, mainloop.quit) + mainloop.run() + + assert activated_signal_received + assert deactivated_signal_received + assert not disabled_signal_received + + # Disabling should not trigger the signal + self.disable() + + mainloop = GLib.MainLoop() + GLib.timeout_add(500, mainloop.quit) + mainloop.run() + + assert not disabled_signal_received + + def test_release(self): + params = { + "activated-delay": 200, + "deactivated-delay": 1000, + "disabled-delay": 1200, + } + self.start_impl_portal(params) + self.start_xdp() + + self.create_session() + self.get_zones() + # The default zone is 1920x1080 + barriers = [ + { + "barrier_id": dbus.UInt32(10, variant_level=1), + "position": dbus.Struct( + [0, 0, 1920, 0], signature="iiii", variant_level=1 + ), + }, + ] + self.set_pointer_barriers(barriers) + self.connect_to_eis() + + disabled_signal_received = False + activated_signal_received = False + deactivated_signal_received = False + + def cb_disabled(session_handle, options): + nonlocal disabled_signal_received + disabled_signal_received = True + + def cb_activated(session_handle, options): + nonlocal activated_signal_received + activated_signal_received = True + + def cb_deactivated(session_handle, options): + nonlocal deactivated_signal_received + deactivated_signal_received = True + + inputcapture_intf = self.get_dbus_interface() + inputcapture_intf.connect_to_signal("Disabled", cb_activated) + inputcapture_intf.connect_to_signal("Activated", cb_activated) + inputcapture_intf.connect_to_signal("Deactivated", cb_deactivated) + + self.enable() + + mainloop = GLib.MainLoop() + GLib.timeout_add(300, mainloop.quit) + mainloop.run() + + assert activated_signal_received + assert not deactivated_signal_received + assert not disabled_signal_received + + self.release(cursor_position=(10, 50)) + + # XDP should filter any signals the implementation may + # send after Release(). + + mainloop = GLib.MainLoop() + GLib.timeout_add(1000, mainloop.quit) + mainloop.run() + + # Release() implies deactivated + assert not deactivated_signal_received + assert not disabled_signal_received