Skip to content

Commit

Permalink
Added dynamic X keyboard event monitoring to qvm_start_daemon.py
Browse files Browse the repository at this point in the history
Update keyboard_layout property whenever guivm's layout changes, instead of
only at the start.

requires QubesOS/qubes-core-admin#350

references QubesOS/qubes-issues#1396
references QubesOS/qubes-issues#4294
  • Loading branch information
marmarta committed Jul 15, 2020
1 parent ae39c75 commit 1446a6d
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 47 deletions.
149 changes: 104 additions & 45 deletions qubesadmin/tools/qvm_start_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.vm
from . import xcffibhelpers

have_events = False
try:
Expand Down Expand Up @@ -73,6 +74,106 @@
""")


class KeyboardLayout:
"""Class to store and parse X Keyboard layout data"""
# pylint: disable=too-few-public-methods
def __init__(self, binary_string):
split_string = binary_string.split(b'\0')
self.languages = split_string[2].decode().split(',')
self.variants = split_string[3].decode().split(',')
self.options = split_string[4].decode()

def get_property(self, layout_num):
"""Return the selected keyboard layout as formatted for keyboard_layout
property."""
return '+'.join([self.languages[layout_num],
self.variants[layout_num],
self.options])


class XWatcher:
"""Watch and react for X events related to the keyboard layout changes."""
def __init__(self, conn, app):
self.app = app
self.current_vm = self.app.domains[self.app.local_name]

self.conn = conn
self.ext = self.initialize_extension()

# get root window
self.setup = self.conn.get_setup()
self.root = self.setup.roots[0].root

# atoms (strings) of events we need to watch
# keyboard layout was switched
self.atom_xklavier = self.conn.core.InternAtom(
False, len("XKLAVIER_ALLOW_SECONDARY"),
"XKLAVIER_ALLOW_SECONDARY").reply().atom
# keyboard layout was changed
self.atom_xkb_rules = self.conn.core.InternAtom(
False, len("_XKB_RULES_NAMES"),
"_XKB_RULES_NAMES").reply().atom

self.conn.core.ChangeWindowAttributesChecked(
self.root, xcffib.xproto.CW.EventMask,
[xcffib.xproto.EventMask.PropertyChange])
self.conn.flush()

# initialize state
self.keyboard_layout = KeyboardLayout(self.get_keyboard_layout())
self.selected_layout = self.get_selected_layout()

def initialize_extension(self):
"""Initialize XKB extension (not supported by xcffib by default"""
ext = self.conn(xcffibhelpers.key)
ext.UseExtension()
return ext

def get_keyboard_layout(self):
"""Check what is current keyboard layout definition"""
property_cookie = self.conn.core.GetProperty(
False, # delete
self.root, # window
self.atom_xkb_rules,
xcffib.xproto.Atom.STRING,
0, 1000
)
prop_reply = property_cookie.reply()
return prop_reply.value.buf()

def get_selected_layout(self):
"""Check which keyboard layout is currently selected"""
state_reply = self.ext.GetState().reply()
return state_reply.lockedGroup[0]

def update_keyboard_layout(self):
"""Update current vm's keyboard_layout property"""
new_property = self.keyboard_layout.get_property(
self.selected_layout)

current_property = self.current_vm.keyboard_layout

if new_property != current_property:
self.current_vm.keyboard_layout = new_property

def event_reader(self, callback):
"""Poll for X events related to keyboard layout"""
try:
for event in iter(self.conn.poll_for_event, None):
if isinstance(event, xcffib.xproto.PropertyNotifyEvent):
if event.atom == self.atom_xklavier:
self.selected_layout = self.get_selected_layout()
elif event.atom == self.atom_xkb_rules:
self.keyboard_layout = KeyboardLayout(
self.get_keyboard_layout())
else:
continue

self.update_keyboard_layout()
except xcffib.ConnectionException:
callback()


def get_monitor_layout():
"""Get list of monitors and their size/position"""
outputs = []
Expand Down Expand Up @@ -114,31 +215,6 @@ def get_monitor_layout():
return outputs


def set_keyboard_layout(vm):
"""Set layout configuration into features for Gui admin extension"""
try:
# Examples of 'xprop -root _XKB_RULES_NAMES' output values:
# "evdev", "pc105", "fr", "oss", ""
# "evdev", "pc105", "pl,us", ",", "grp:win_switch,compose:caps"

# We use the first layout provided
xkb_re = r'_XKB_RULES_NAMES\(STRING\) = ' \
r'\"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\"\n'
xkb_rules_names = subprocess.check_output(
['xprop', '-root', '_XKB_RULES_NAMES']).decode()
xkb_parsed = re.match(xkb_re, xkb_rules_names)
if xkb_parsed:
xkb_layout = [x.split(',')[0] for x in xkb_parsed.groups()[2:4]]
# We keep all options
xkb_layout.append(xkb_parsed.group(5))
keyboard_layout = '+'.join(xkb_layout)
vm.features['keyboard-layout'] = keyboard_layout
else:
vm.log.warning('Failed to parse layout for %s', vm)
except subprocess.CalledProcessError as e:
vm.log.warning('Failed to set layout for %s: %s', vm, str(e))


class DAEMONLauncher:
"""Launch GUI/AUDIO daemon for VMs"""

Expand Down Expand Up @@ -461,17 +537,6 @@ def register_events(self, events):
events.add_handler('connection-established',
self.on_connection_established)


def x_reader(conn, callback):
"""Try reading something from X connection to check if it's still alive.
In case it isn't, call *callback*.
"""
try:
conn.poll_for_event()
except xcffib.ConnectionException:
callback()


if 'XDG_RUNTIME_DIR' in os.environ:
pidfile_path = os.path.join(os.environ['XDG_RUNTIME_DIR'],
'qvm-start-daemon.pid')
Expand All @@ -492,9 +557,6 @@ def x_reader(conn, callback):
parser.add_argument('--notify-monitor-layout', action='store_true',
help='Notify running instance in --watch mode'
' about changed monitor layout')
parser.add_argument('--set-keyboard-layout', action='store_true',
help='Set keyboard layout values into GuiVM features.'
'This option is implied by --watch')
# Add it for the help only
parser.add_argument('--force', action='store_true', default=False,
help='Force running daemon without enabled services'
Expand All @@ -515,11 +577,6 @@ def main(args=None):
parser.error('--watch option must be used with --all')
if args.watch and args.notify_monitor_layout:
parser.error('--watch cannot be used with --notify-monitor-layout')
if args.watch and 'guivm-gui-agent' in enabled_services:
args.set_keyboard_layout = True
if args.set_keyboard_layout or os.path.exists('/etc/qubes-release'):
guivm = args.app.domains.get_blind(args.app.local_name)
set_keyboard_layout(guivm)
launcher = DAEMONLauncher(args.app)
if args.watch:
if not have_events:
Expand All @@ -541,8 +598,10 @@ def main(args=None):
launcher.send_monitor_layout_all)

conn = xcffib.connect()
x_watcher = XWatcher(conn, args.app)
x_fd = conn.get_file_descriptor()
loop.add_reader(x_fd, x_reader, conn, events_listener.cancel)
loop.add_reader(x_fd, x_watcher.event_reader,
events_listener.cancel)

try:
loop.run_until_complete(events_listener)
Expand Down
128 changes: 128 additions & 0 deletions qubesadmin/tools/xcffibhelpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2020 Marek Marczykowski-Górecki
# <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
"""
This is a set of helper classes, designed to facilitate importing an X extension
that's not supported by default by xcffib.
"""
import io
import struct
import xcffib



class XkbUseExtensionReply(xcffib.Reply):
"""Helper class to parse XkbUseExtensionReply
Contains hardcoded values based on X11/XKBproto.h"""
# pylint: disable=too-few-public-methods
def __init__(self, unpacker):
if isinstance(unpacker, xcffib.Protobj):
unpacker = xcffib.MemoryUnpacker(unpacker.pack())
xcffib.Reply.__init__(self, unpacker)
base = unpacker.offset
self.major_version, self.minor_version = unpacker.unpack(
"xx2x4xHH4x4x4x4x")
self.bufsize = unpacker.offset - base


class XkbUseExtensionCookie(xcffib.Cookie):
"""Helper class for use in loading Xkb extension"""
reply_type = XkbUseExtensionReply


class XkbGetStateReply(xcffib.Reply):
"""Helper class to parse XkbGetState; copy&paste from X11/XKBproto.h"""
# pylint: disable=too-few-public-methods
_typedef = """
BYTE type;
BYTE deviceID;
CARD16 sequenceNumber B16;
CARD32 length B32;
CARD8 mods;
CARD8 baseMods;
CARD8 latchedMods;
CARD8 lockedMods;
CARD8 group;
CARD8 lockedGroup;
INT16 baseGroup B16;
INT16 latchedGroup B16;
CARD8 compatState;
CARD8 grabMods;
CARD8 compatGrabMods;
CARD8 lookupMods;
CARD8 compatLookupMods;
CARD8 pad1;
CARD16 ptrBtnState B16;
CARD16 pad2 B16;
CARD32 pad3 B32;"""
_type_mapping = {
"BYTE": "B",
"CARD16": "H",
"CARD8": "B",
"CARD32": "I",
"INT16": "h",
}

def __init__(self, unpacker):
if isinstance(unpacker, xcffib.Protobj):
unpacker = xcffib.MemoryUnpacker(unpacker.pack())
xcffib.Reply.__init__(self, unpacker)
base = unpacker.offset

# dynamic parse of copy&pasted struct content, for easy re-usability
for line in self._typedef.splitlines():
line = line.strip()
line = line.rstrip(';')
if not line:
continue
typename, name = line.split()[:2] # ignore optional third part
setattr(self, name, unpacker.unpack(self._type_mapping[typename]))

self.bufsize = unpacker.offset - base


class XkbGetStateCookie(xcffib.Cookie):
"""Helper class for use in parsing Xkb GetState"""
reply_type = XkbGetStateReply


class XkbExtension(xcffib.Extension):
"""Helper class to load and use Xkb xcffib extension; needed
because there is not XKB support in xcffib."""
# pylint: disable=invalid-name,missing-function-docstring
def UseExtension(self, is_checked=True):
buf = io.BytesIO()
buf.write(struct.pack("=xx2xHH", 1, 0))
return self.send_request(0, buf, XkbGetStateCookie,
is_checked=is_checked)

def GetState(self, deviceSpec=0x100, is_checked=True):
buf = io.BytesIO()
buf.write(struct.pack("=xx2xHxx", deviceSpec))
return self.send_request(4, buf, XkbGetStateCookie,
is_checked=is_checked)


key = xcffib.ExtensionKey("XKEYBOARD")
# this is a lie: there are events and errors types
_events = {}
_errors = {}

# pylint: disable=protected-access
xcffib._add_ext(key, XkbExtension, _events, _errors)
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ def get_console_scripts():
if sys.version_info[0:2] >= (3, 4):
for filename in os.listdir('./qubesadmin/tools'):
basename, ext = os.path.splitext(os.path.basename(filename))
if basename in ['__init__', 'dochelpers'] or ext != '.py':
if basename in ['__init__', 'dochelpers', 'xcffibhelpers']\
or ext != '.py':
continue
yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(basename)
yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(
basename)

# create simple scripts that run much faster than "console entry points"
class CustomInstall(setuptools.command.install.install):
Expand Down

0 comments on commit 1446a6d

Please sign in to comment.