From 9a78a1336bd73d3f4b20f4c89e7c6f1e8260b0ea Mon Sep 17 00:00:00 2001 From: totaam Date: Sun, 28 May 2023 17:41:05 +0700 Subject: [PATCH] more mypy inspired type hints and tweaks --- fs/share/config/mypy/config | 25 +- xpra/child_reaper.py | 2 +- xpra/client/mixins/audio.py | 57 +-- xpra/clipboard/clipboard_core.py | 485 +++++++++++---------- xpra/clipboard/clipboard_timeout_helper.py | 26 +- xpra/codecs/gstreamer/capture.py | 4 +- xpra/codecs/loader.py | 2 +- xpra/gst_pipeline.py | 2 +- xpra/net/compression.py | 45 +- xpra/net/net_util.py | 16 +- xpra/scripts/config.py | 18 +- xpra/server/mixins/child_command_server.py | 82 ++-- xpra/server/mixins/stub_server_mixin.py | 4 +- xpra/x11/gtk_x11/clipboard.py | 4 +- xpra/x11/gtk_x11/tray.py | 2 + xpra/x11/server.py | 6 +- xpra/x11/server_keyboard_config.py | 2 +- 17 files changed, 401 insertions(+), 381 deletions(-) diff --git a/fs/share/config/mypy/config b/fs/share/config/mypy/config index d5b1e78b00..aa51be682f 100644 --- a/fs/share/config/mypy/config +++ b/fs/share/config/mypy/config @@ -10,6 +10,9 @@ ignore_missing_imports = True [mypy-psutil.*] ignore_missing_imports = True +[mypy-cpuinfo.*] +ignore_missing_imports = True + [mypy-certifi.*] ignore_missing_imports = True @@ -43,6 +46,9 @@ ignore_missing_imports = True [mypy-avahi.*] ignore_missing_imports = True +[mypy-_ssl.*] +ignore_missing_imports = True + [mypy-aioquic.*] ignore_missing_imports = True @@ -64,7 +70,7 @@ ignore_missing_imports = True [mypy-xdg.*] ignore_missing_imports = True -[mypy-xpra.gtk_common.gdk_bindings.*] +[mypy-xpra.gtk_common.gtk3.gdk_bindings.*] ignore_missing_imports = True [mypy-xpra.x11.gtk3.gdk_bindings.*] @@ -85,6 +91,9 @@ ignore_missing_imports = True [mypy-dbus.*] ignore_missing_imports = True +[mypy-xpra.buffers.membuf.*] +ignore_missing_imports = True + [mypy-xpra.buffers.cyxor.*] ignore_missing_imports = True @@ -142,22 +151,20 @@ ignore_missing_imports = True [mypy-comtypes.*] ignore_missing_imports = True -[mypy-ctypes.windll] +[mypy-ctypes.*] ignore_missing_imports = True +disable_error_code = attr-defined -[mypy-ctypes.WinDLL] -ignore_missing_imports = True - -[mypy-ctypes.WINFUNCTYPE] +[mypy-xpra.platform.darwin.*] ignore_missing_imports = True -[mypy-ctypes.GetLastError] +[mypy-objc.*] ignore_missing_imports = True -[mypy-xpra.platform.darwin.*] +[mypy-PyObjCTools.*] ignore_missing_imports = True -[mypy-objc.*] +[mypy-Foundation.*] ignore_missing_imports = True [mypy-Quartz.*] diff --git a/xpra/child_reaper.py b/xpra/child_reaper.py index 2317822bfb..3b09a8caef 100644 --- a/xpra/child_reaper.py +++ b/xpra/child_reaper.py @@ -241,7 +241,7 @@ def getChildReaper() -> ChildReaper: def reaper_cleanup() -> None: s = singleton if s is not None: - singleton.cleanup() + s.cleanup() #keep it around, #so we don't try to reinitialize it from the wrong thread #(signal requires the main thread) diff --git a/xpra/client/mixins/audio.py b/xpra/client/mixins/audio.py index ca06581545..52279f2a90 100644 --- a/xpra/client/mixins/audio.py +++ b/xpra/client/mixins/audio.py @@ -3,7 +3,7 @@ # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. -from typing import Dict, Any +from typing import Dict, Any, Callable, Tuple, Optional from gi.repository import GLib # @UnresolvedImport from xpra.platform.paths import get_icon_filename @@ -33,31 +33,31 @@ class AudioClient(StubClientMixin): def __init__(self): super().__init__() self.audio_source_plugin = None - self.speaker_allowed = False - self.speaker_enabled = False + self.speaker_allowed : bool = False + self.speaker_enabled : bool = False self.speaker_codecs = [] - self.microphone_allowed = False - self.microphone_enabled = False + self.microphone_allowed : bool = False + self.microphone_enabled : bool = False self.microphone_codecs = [] self.microphone_device = None - self.av_sync = False - self.av_sync_delta = AV_SYNC_DELTA + self.av_sync : bool = False + self.av_sync_delta : int = AV_SYNC_DELTA #audio state: - self.on_sink_ready = None + self.on_sink_ready : Optional[Callable] = None self.audio_sink = None - self.audio_sink_sequence = 0 - self.server_audio_eos_sequence = False + self.audio_sink_sequence : int = 0 + self.server_audio_eos_sequence : bool = False self.audio_source = None - self.audio_source_sequence = 0 - self.audio_in_bytecount = 0 - self.audio_out_bytecount = 0 - self.server_av_sync = False + self.audio_source_sequence : int = 0 + self.audio_in_bytecount : int = 0 + self.audio_out_bytecount : int = 0 + self.server_av_sync : bool = False self.server_pulseaudio_id = None self.server_pulseaudio_server = None self.server_audio_decoders = [] self.server_audio_encoders = [] - self.server_audio_receive = False - self.server_audio_send = False + self.server_audio_receive : int = False + self.server_audio_send : int = False self.queue_used_sent = None #duplicated from ServerInfo mixin: self._remote_machine_id = None @@ -73,8 +73,9 @@ def init(self, opts) -> None: if self.microphone_allowed and len(mic)==2: self.microphone_device = mic[1] self.audio_source_plugin = opts.audio_source - def audio_option_or_all(*_args): + def nooptions(*_args): return [] + audio_option_fn : Callable = nooptions if self.speaker_allowed or self.microphone_allowed: try: from xpra.audio import common @@ -88,6 +89,7 @@ def audio_option_or_all(*_args): else: try: from xpra.audio.common import audio_option_or_all + audio_option_fn = audio_option_or_all from xpra.audio.wrapper import query_audio self.audio_properties = query_audio() assert self.audio_properties, "query did not return any data" @@ -104,8 +106,8 @@ def audio_option_or_all(*_args): self.microphone_allowed = False encoders = self.audio_properties.strtupleget("encoders") decoders = self.audio_properties.strtupleget("decoders") - self.speaker_codecs = audio_option_or_all("speaker-codec", opts.speaker_codec, decoders) - self.microphone_codecs = audio_option_or_all("microphone-codec", opts.microphone_codec, encoders) + self.speaker_codecs = audio_option_fn("speaker-codec", opts.speaker_codec, decoders) + self.microphone_codecs = audio_option_fn("microphone-codec", opts.microphone_codec, encoders) if not self.speaker_codecs: self.speaker_allowed = False if not self.microphone_codecs: @@ -181,7 +183,8 @@ def get_audio_capabilities(self) -> Dict[str,Any]: "send" : self.microphone_allowed, "receive" : self.speaker_allowed, } - sp = self.audio_properties + #make mypy happy about the type: convert typedict to dict with string keys + sp : Dict[str,Any] = dict((str(k), v) for k,v in self.audio_properties.items()) if FULL_INFO<2: #only expose these specific keys: sp = dict((k,v) for k,v in sp.items() if k in ( @@ -258,7 +261,7 @@ def get_matching_codecs(self, local_codecs, server_codecs): log("get_matching_codecs(%s, %s)=%s", local_codecs, server_codecs, matching_codecs) return matching_codecs - def may_notify_audio(self, summary, body) -> None: + def may_notify_audio(self, summary:str, body:str) -> None: #overridden in UI client subclass pass @@ -355,7 +358,7 @@ def audio_source_state_changed(*_args): log.estr(e) return False - def new_stream(self, audio_source, codec): + def new_stream(self, audio_source, codec:str): log("new_stream(%s)", codec) if self.audio_source!=audio_source: log("dropping new-stream signal (current source=%s, signal source=%s)", self.audio_source, audio_source) @@ -422,7 +425,7 @@ def sink_ready(*args): self.emit("speaker-changed") log("start_receiving_audio() done, speaker_enabled=%s", enabled) - def stop_receiving_audio(self, tell_server=True) -> None: + def stop_receiving_audio(self, tell_server:bool=True) -> None: """ ask the server to stop sending audio, toggle flag so we ignore further packets and emit client signal """ log("stop_receiving_audio(%s) audio sink=%s", tell_server, self.audio_sink) ss = self.audio_sink @@ -440,7 +443,7 @@ def stop_receiving_audio(self, tell_server=True) -> None: ss.cleanup() log("stop_receiving_audio(%s) done", tell_server) - def audio_sink_state_changed(self, audio_sink, state) -> None: + def audio_sink_state_changed(self, audio_sink, state:str) -> None: if audio_sink!=self.audio_sink: log("audio_sink_state_changed(%s, %s) not the current sink, ignoring it", audio_sink, state) return @@ -449,7 +452,7 @@ def audio_sink_state_changed(self, audio_sink, state) -> None: if not self.on_sink_ready(): self.on_sink_ready = None self.emit("speaker-changed") - def audio_sink_bitrate_changed(self, audio_sink, bitrate) -> None: + def audio_sink_bitrate_changed(self, audio_sink, bitrate:int) -> None: if audio_sink!=self.audio_sink: log("audio_sink_bitrate_changed(%s, %s) not the current sink, ignoring it", audio_sink, bitrate) return @@ -495,7 +498,7 @@ def audio_sink_exit(self, audio_sink, *args) -> None: ss.codec = "" self.stop_receiving_audio() - def start_audio_sink(self, codec) -> bool: + def start_audio_sink(self, codec:str) -> bool: log("start_audio_sink(%s)", codec) assert self.audio_sink is None, "audio sink already exists!" try: @@ -544,7 +547,7 @@ def send_audio_sync(self, v) -> None: ###################################################################### #packet handlers - def _process_sound_data(self, packet) -> None: + def _process_sound_data(self, packet:Tuple) -> None: codec, data, metadata = packet[1:4] codec = bytestostr(codec) metadata = typedict(metadata) diff --git a/xpra/clipboard/clipboard_core.py b/xpra/clipboard/clipboard_core.py index 0ffedb07a0..12a9ed9519 100644 --- a/xpra/clipboard/clipboard_core.py +++ b/xpra/clipboard/clipboard_core.py @@ -9,7 +9,7 @@ import re from time import monotonic from io import BytesIO -from typing import Tuple, List, Dict, Callable, Optional, Any +from typing import Tuple, List, Dict, Callable, Optional, Any, Iterable from gi.repository import GLib # @UnresolvedImport from xpra.net.compression import Compressible @@ -82,7 +82,7 @@ def must_discard_extra(target:str) -> bool: return any(x for x in DISCARD_EXTRA_TARGETS if x.match(target)) -def _filter_targets(targets): +def _filter_targets(targets : Iterable[str]) -> Tuple[str, ...]: targets_strs = tuple(bytestostr(x) for x in targets) f = tuple(target for target in targets_strs if not must_discard(target)) log("_filter_targets(%s)=%s", csv(targets_strs), f) @@ -90,10 +90,225 @@ def _filter_targets(targets): #CARD32 can actually be 64-bits... CARD32_SIZE : int = sizeof_long*8 -def get_format_size(dformat): +def get_format_size(dformat:int) -> int: return max(8, {32 : CARD32_SIZE}.get(dformat, dformat)) +class ClipboardProxyCore: + def __init__(self, selection): + self._selection : str = selection + self._enabled : bool = False + self._have_token : bool = False + #enabled later during setup + self._can_send : bool = False + self._can_receive : bool = False + #clients that need a new token for every owner-change: (ie: win32 and osx) + #(forces the client to request new contents - prevents stale clipboard data) + self._greedy_client : bool = False + self._want_targets : bool = False + #semaphore to block the sending of the token when we change the owner ourselves: + self._block_owner_change : int = 0 + self._last_emit_token : float = 0 + self._emit_token_timer : int = 0 + #counters for info: + self._selection_request_events : int = 0 + self._selection_get_events : int = 0 + self._selection_clear_events : int = 0 + self._sent_token_events : int = 0 + self._got_token_events : int = 0 + self._get_contents_events : int = 0 + self._request_contents_events : int = 0 + self._last_targets = () + self.preferred_targets = [] + + def set_direction(self, can_send : bool, can_receive : bool) -> None: + self._can_send = can_send + self._can_receive = can_receive + + def set_want_targets(self, want_targets) -> None: + self._want_targets = want_targets + + + def get_info(self) -> Dict[str,Any]: + info : Dict[str,Any] = { + "have_token" : self._have_token, + "enabled" : self._enabled, + "greedy_client" : self._greedy_client, + "preferred-targets" : self.preferred_targets, + "blocked_owner_change" : self._block_owner_change, + "last-targets" : self._last_targets, + "event" : { + "selection_request" : self._selection_request_events, + "selection_get" : self._selection_get_events, + "selection_clear" : self._selection_clear_events, + "got_token" : self._got_token_events, + "sent_token" : self._sent_token_events, + "get_contents" : self._get_contents_events, + "request_contents" : self._request_contents_events, + }, + } + return info + + def cleanup(self) -> None: + self._enabled = False + self.cancel_emit_token() + self.cancel_unblock() + + def is_enabled(self) -> bool: + return self._enabled + + def set_enabled(self, enabled : bool) -> None: + log("%s.set_enabled(%s)", self, enabled) + self._enabled = enabled + + def set_greedy_client(self, greedy : bool) -> None: + log("%s.set_greedy_client(%s)", self, greedy) + self._greedy_client = greedy + + def set_preferred_targets(self, preferred_targets) -> None: + self.preferred_targets = preferred_targets + + + def __repr__(self): + return "ClipboardProxyCore(%s)" % self._selection + + def do_owner_changed(self) -> None: + #an application on our side owns the clipboard selection + #(they are ready to provide something via the clipboard) + log("clipboard: %s owner_changed, enabled=%s, "+ + "can-send=%s, can-receive=%s, have_token=%s, greedy_client=%s, block_owner_change=%s", + bytestostr(self._selection), self._enabled, self._can_send, self._can_receive, + self._have_token, self._greedy_client, self._block_owner_change) + if not self._enabled or self._block_owner_change: + return + if self._have_token or ((self._greedy_client or self._want_targets) and self._can_send): + self.schedule_emit_token() + + def schedule_emit_token(self, min_delay:int=0) -> None: + if min_delay==0 and (self._have_token or (not self._want_targets and not self._greedy_client) or DELAY_SEND_TOKEN<0): + #token ownership will change or told not to wait + GLib.idle_add(self.emit_token) + elif not self._emit_token_timer: + #we already had sent the token, + #or sending it is expensive, so wait a bit: + self.do_schedule_emit_token(min_delay) + + def do_schedule_emit_token(self, min_delay:int=0) -> None: + now = monotonic() + elapsed = int((now-self._last_emit_token)*1000) + delay = max(min_delay, DELAY_SEND_TOKEN-elapsed) + log("do_schedule_emit_token(%i) selection=%s, elapsed=%i (max=%i), delay=%i", + min_delay, self._selection, elapsed, DELAY_SEND_TOKEN, delay) + if delay<=0: + #enough time has passed + self.emit_token() + else: + self._emit_token_timer = GLib.timeout_add(delay, self.emit_token) + + def emit_token(self) -> None: + self._emit_token_timer = 0 + boc = self._block_owner_change + if not boc: + self._block_owner_change = GLib.idle_add(self.remove_block) + self._have_token = False + self._last_emit_token = monotonic() + self.do_emit_token() + self._sent_token_events += 1 + + def do_emit_token(self): + #self.emit("send-clipboard-token") + pass + + def cancel_emit_token(self) -> None: + ett = self._emit_token_timer + if ett: + self._emit_token_timer = 0 + GLib.source_remove(ett) + + def cancel_unblock(self) -> None: + boc = self._block_owner_change + if boc: + self._block_owner_change = 0 + GLib.source_remove(boc) + + #def do_selection_request_event(self, event): + # pass + + #def do_selection_get(self, selection_data, info, time): + # pass + + #def do_selection_clear_event(self, event): + # pass + + def remove_block(self, *_args) -> None: + log("remove_block: %s", self._selection) + self._block_owner_change = 0 + + def claim(self) -> None: + """ + Subclasses may want to take ownership of the clipboard selection. + The X11 clipboard does. + """ + + # This function is called by the xpra core when the peer has requested the + # contents of this clipboard: + def get_contents(self, target:str, got_contents:Callable) -> None: + pass + + + def filter_data(self, dtype:str="" , dformat:int=0, data=None, trusted:bool=False, output_dtype=None): + log("filter_data(%s, %s, %i %s, %s, %s)", + dtype, dformat, len(data), type(data), trusted, output_dtype) + if not data: + return data + IMAGE_OVERLAY = os.environ.get("XPRA_CLIPBOARD_IMAGE_OVERLAY", None) + if IMAGE_OVERLAY and not os.path.exists(IMAGE_OVERLAY): + IMAGE_OVERLAY = None + IMAGE_STAMP = envbool("XPRA_CLIPBOARD_IMAGE_STAMP", False) + SANITIZE_IMAGES = envbool("XPRA_SANITIZE_IMAGES", True) + if dtype in ("image/png", "image/jpeg", "image/tiff") and ( + (output_dtype is not None and dtype!=output_dtype) or + IMAGE_STAMP or + IMAGE_OVERLAY or + (SANITIZE_IMAGES and not trusted) + ): + # pylint: disable=import-outside-toplevel + from xpra.codecs.pillow.decoder import open_only + img_type = dtype.split("/")[-1] + img = open_only(data, (img_type, )) + has_alpha = img.mode=="RGBA" + if not has_alpha and IMAGE_OVERLAY: + img = img.convert("RGBA") + w, h = img.size + if IMAGE_OVERLAY: + from PIL import Image #@UnresolvedImport + overlay = Image.open(IMAGE_OVERLAY) + if overlay.mode!="RGBA": + log.warn("Warning: cannot use overlay image '%s'", IMAGE_OVERLAY) + log.warn(" invalid mode '%s'", overlay.mode) + else: + log("adding clipboard image overlay to %s", dtype) + overlay_resized = overlay.resize((w, h), Image.ANTIALIAS) + composite = Image.alpha_composite(img, overlay_resized) + if not has_alpha and img.mode=="RGBA": + composite = composite.convert("RGB") + img = composite + if IMAGE_STAMP: + log("adding clipboard image stamp to %s", dtype) + from datetime import datetime + from PIL import ImageDraw + img_draw = ImageDraw.Draw(img) + w, h = img.size + img_draw.text((10, max(0, h//2-16)), 'via Xpra, %s' % datetime.now().isoformat(), fill='black') + #now save it: + img_type = (output_dtype or dtype).split("/")[-1] + buf = BytesIO() + img.save(buf, img_type.upper()) #ie: "PNG" + data = buf.getvalue() + buf.close() + return data + + class ClipboardProtocolHelperCore: def __init__(self, send_packet_cb, progress_cb=None, **kwargs): d = typedict(kwargs) @@ -123,7 +338,14 @@ def __init__(self, send_packet_cb, progress_cb=None, **kwargs): self.init_proxies(d.strtupleget("clipboards.local", CLIPBOARDS)) self.remote_clipboards = d.strtupleget("clipboards.remote", CLIPBOARDS) - def init_translation(self, kwargs): + def init_proxies(self, selections : Iterable[str]) -> None: + self._clipboard_proxies : Dict[str, ClipboardProxyCore] = {} + for selection in selections: + proxy = self.make_proxy(selection) + self._clipboard_proxies[selection] = proxy + log("%s.init_proxies : %s", self, self._clipboard_proxies) + + def init_translation(self, kwargs) -> None: def getselection(name) -> Optional[str]: v = kwargs.get(f"clipboard.{name}") #ie: clipboard.remote env_value = os.environ.get(f"XPRA_TRANSLATEDCLIPBOARD_{name.upper()}_SELECTION") @@ -229,23 +451,16 @@ def init_packet_handlers(self): "clipboard-enable-selections" : self._process_clipboard_enable_selections, } - def make_proxy(self, selection): + def make_proxy(self, selection:str) -> ClipboardProxyCore: raise NotImplementedError() - def init_proxies(self, selections) -> None: - self._clipboard_proxies = {} - for selection in selections: - proxy = self.make_proxy(selection) - self._clipboard_proxies[selection] = proxy - log("%s.init_proxies : %s", self, self._clipboard_proxies) - def init_proxies_claim(self) -> None: for proxy in self._clipboard_proxies.values(): proxy.claim() # Used by the client during startup: - def send_tokens(self, selections=()) -> None: + def send_tokens(self, selections:Iterable[str]=()) -> None: log("send_tokens(%s)", selections) for selection in selections: proxy = self._clipboard_proxies.get(selection) @@ -258,7 +473,7 @@ def send_all_tokens(self) -> None: self.send_tokens(tuple(self._clipboard_proxies.keys())) - def _process_clipboard_token(self, packet) -> None: + def _process_clipboard_token(self, packet:Tuple) -> None: selection = bytestostr(packet[1]) name = self.remote_to_local(selection) proxy = self._clipboard_proxies.get(name) @@ -301,13 +516,13 @@ def _process_clipboard_token(self, packet) -> None: synchronous_client = len(packet)>=11 and bool(packet[10]) proxy.got_token(targets, target_data, claim, synchronous_client) - def local_targets(self, remote_targets): + def local_targets(self, remote_targets:Iterable[str]): return _filter_targets(remote_targets) - def remote_targets(self, local_targets): + def remote_targets(self, local_targets:Iterable[str]): return _filter_targets(local_targets) - def _munge_raw_selection_to_wire(self, target, dtype, dformat:int, data) -> Tuple[Any,Any]: + def _munge_raw_selection_to_wire(self, target:str, dtype:str, dformat:int, data) -> Tuple[Any,Any]: log("_munge_raw_selection_to_wire%s", (target, dtype, dformat, repr_ellipsized(bytestostr(data)))) # Some types just cannot be marshalled: if dtype in ("WINDOW", "PIXMAP", "BITMAP", "DRAWABLE", @@ -327,7 +542,7 @@ def _munge_raw_selection_to_wire(self, target, dtype, dformat:int, data) -> Tupl log.error(" dtype=%s, dformat=%s, data=%s (%s)", dtype, dformat, repr_ellipsized(str(data)), type(data)) raise - def _do_munge_raw_selection_to_wire(self, target, dtype, dformat:int, data) -> Tuple[Any,Any]: + def _do_munge_raw_selection_to_wire(self, target:str, dtype:str, dformat:int, data) -> Tuple[Any,Any]: """ this method is overridden in xclipboard to parse X11 atoms """ # Other types need special handling, and all types need to be # converting into an endian-neutral format: @@ -355,7 +570,7 @@ def _do_munge_raw_selection_to_wire(self, target, dtype, dformat:int, data) -> T log.error(f"Error: unhandled format {dformat} for clipboard data type {dtype}") return None, None - def _munge_wire_selection_to_raw(self, encoding:str, dtype, dformat:int, data) -> bytes: + def _munge_wire_selection_to_raw(self, encoding:str, dtype:str, dformat:int, data) -> bytes: log("wire selection to raw, encoding=%s, type=%s, format=%s, len(data)=%s", encoding, dtype, dformat, len(data or b"")) if self.max_clipboard_receive_size > 0: @@ -385,7 +600,7 @@ def _munge_wire_selection_to_raw(self, encoding:str, dtype, dformat:int, data) - return struct.pack(fstr, *data) raise ValueError("unhanled encoding: %s" % ((encoding, dtype, dformat),)) - def _process_clipboard_request(self, packet): + def _process_clipboard_request(self, packet:Tuple) -> None: request_id, selection, target = packet[1:4] selection = bytestostr(selection) target = bytestostr(target) @@ -419,7 +634,7 @@ def got_contents(dtype, dformat, data): self.proxy_got_contents(request_id, selection, target, dtype, dformat, data) proxy.get_contents(target, got_contents) - def proxy_got_contents(self, request_id, selection, target, dtype, dformat, data): + def proxy_got_contents(self, request_id:int, selection:str, target:str, dtype:str, dformat:int, data) -> None: def no_contents(): self.send("clipboard-contents-none", request_id, selection) dtype = bytestostr(dtype) @@ -451,7 +666,7 @@ def no_contents(): dtype, dformat, wire_encoding, wire_data, truncated] self.send(*packet) - def _may_compress(self, dtype, dformat, wire_data): + def _may_compress(self, dtype:str, dformat:int, wire_data): if len(wire_data)>self.max_clipboard_packet_size: log.warn("Warning: clipboard contents are too big and have not been sent") log.warn(" %s compressed bytes dropped (maximum is %s)", len(wire_data), self.max_clipboard_packet_size) @@ -466,7 +681,7 @@ def _may_compress(self, dtype, dformat, wire_data): return Compressible(f"clipboard: {dtype} / {dformat}", wire_data) return wire_data - def _process_clipboard_contents(self, packet:Tuple): + def _process_clipboard_contents(self, packet:Tuple) -> None: request_id, selection, dtype, dformat, wire_encoding, wire_data = packet[1:7] selection = bytestostr(selection) wire_encoding = bytestostr(wire_encoding) @@ -476,14 +691,16 @@ def _process_clipboard_contents(self, packet:Tuple): if log.is_debug_enabled(): r = ellipsizer log("clipboard wire -> raw: %s -> %s", (dtype, dformat, wire_encoding, r(wire_data)), r(raw_data)) + assert isinstance(request_id, int) and isinstance(dformat, int) self._clipboard_got_contents(request_id, dtype, dformat, raw_data) - def _process_clipboard_contents_none(self, packet:Tuple): + def _process_clipboard_contents_none(self, packet:Tuple) -> None: log("process clipboard contents none") request_id = packet[1] - self._clipboard_got_contents(request_id, None, None, None) + assert isinstance(request_id, int) + self._clipboard_got_contents(request_id, "", 0, None) - def _clipboard_got_contents(self, request_id:int, dtype, dformat, data): + def _clipboard_got_contents(self, request_id:int, dtype:str, dformat:int, data) -> None: raise NotImplementedError() @@ -510,219 +727,3 @@ def process_clipboard_packet(self, packet:Tuple) -> None: handler(packet) else: log.warn(f"Warning: no clipboard packet handler for {packet_type!r}") - - - -class ClipboardProxyCore: - def __init__(self, selection): - self._selection : str = selection - self._enabled : bool = False - self._have_token : bool = False - #enabled later during setup - self._can_send : bool = False - self._can_receive : bool = False - #clients that need a new token for every owner-change: (ie: win32 and osx) - #(forces the client to request new contents - prevents stale clipboard data) - self._greedy_client : bool = False - self._want_targets : bool = False - #semaphore to block the sending of the token when we change the owner ourselves: - self._block_owner_change : int = 0 - self._last_emit_token : float = 0 - self._emit_token_timer : int = 0 - #counters for info: - self._selection_request_events : int = 0 - self._selection_get_events : int = 0 - self._selection_clear_events : int = 0 - self._sent_token_events : int = 0 - self._got_token_events : int = 0 - self._get_contents_events : int = 0 - self._request_contents_events : int = 0 - self._last_targets = () - self.preferred_targets = [] - - def set_direction(self, can_send : bool, can_receive : bool) -> None: - self._can_send = can_send - self._can_receive = can_receive - - def set_want_targets(self, want_targets) -> None: - self._want_targets = want_targets - - - def get_info(self) -> Dict[str,Any]: - info : Dict[str,Any] = { - "have_token" : self._have_token, - "enabled" : self._enabled, - "greedy_client" : self._greedy_client, - "preferred-targets" : self.preferred_targets, - "blocked_owner_change" : self._block_owner_change, - "last-targets" : self._last_targets, - "event" : { - "selection_request" : self._selection_request_events, - "selection_get" : self._selection_get_events, - "selection_clear" : self._selection_clear_events, - "got_token" : self._got_token_events, - "sent_token" : self._sent_token_events, - "get_contents" : self._get_contents_events, - "request_contents" : self._request_contents_events, - }, - } - return info - - def cleanup(self) -> None: - self._enabled = False - self.cancel_emit_token() - self.cancel_unblock() - - def is_enabled(self) -> bool: - return self._enabled - - def set_enabled(self, enabled : bool) -> None: - log("%s.set_enabled(%s)", self, enabled) - self._enabled = enabled - - def set_greedy_client(self, greedy : bool) -> None: - log("%s.set_greedy_client(%s)", self, greedy) - self._greedy_client = greedy - - def set_preferred_targets(self, preferred_targets) -> None: - self.preferred_targets = preferred_targets - - - def __repr__(self): - return "ClipboardProxyCore(%s)" % self._selection - - def do_owner_changed(self) -> None: - #an application on our side owns the clipboard selection - #(they are ready to provide something via the clipboard) - log("clipboard: %s owner_changed, enabled=%s, "+ - "can-send=%s, can-receive=%s, have_token=%s, greedy_client=%s, block_owner_change=%s", - bytestostr(self._selection), self._enabled, self._can_send, self._can_receive, - self._have_token, self._greedy_client, self._block_owner_change) - if not self._enabled or self._block_owner_change: - return - if self._have_token or ((self._greedy_client or self._want_targets) and self._can_send): - self.schedule_emit_token() - - def schedule_emit_token(self, min_delay:int=0) -> None: - if min_delay==0 and (self._have_token or (not self._want_targets and not self._greedy_client) or DELAY_SEND_TOKEN<0): - #token ownership will change or told not to wait - GLib.idle_add(self.emit_token) - elif not self._emit_token_timer: - #we already had sent the token, - #or sending it is expensive, so wait a bit: - self.do_schedule_emit_token(min_delay) - - def do_schedule_emit_token(self, min_delay:int=0) -> None: - now = monotonic() - elapsed = int((now-self._last_emit_token)*1000) - delay = max(min_delay, DELAY_SEND_TOKEN-elapsed) - log("do_schedule_emit_token(%i) selection=%s, elapsed=%i (max=%i), delay=%i", - min_delay, self._selection, elapsed, DELAY_SEND_TOKEN, delay) - if delay<=0: - #enough time has passed - self.emit_token() - else: - self._emit_token_timer = GLib.timeout_add(delay, self.emit_token) - - def emit_token(self) -> None: - self._emit_token_timer = 0 - boc = self._block_owner_change - if not boc: - self._block_owner_change = GLib.idle_add(self.remove_block) - self._have_token = False - self._last_emit_token = monotonic() - self.do_emit_token() - self._sent_token_events += 1 - - def do_emit_token(self): - #self.emit("send-clipboard-token") - pass - - def cancel_emit_token(self): - ett = self._emit_token_timer - if ett: - self._emit_token_timer = 0 - GLib.source_remove(ett) - - def cancel_unblock(self): - boc = self._block_owner_change - if boc: - self._block_owner_change = 0 - GLib.source_remove(boc) - - #def do_selection_request_event(self, event): - # pass - - #def do_selection_get(self, selection_data, info, time): - # pass - - #def do_selection_clear_event(self, event): - # pass - - def remove_block(self, *_args) -> None: - log("remove_block: %s", self._selection) - self._block_owner_change = 0 - - def claim(self) -> None: - """ - Subclasses may want to take ownership of the clipboard selection. - The X11 clipboard does. - """ - - # This function is called by the xpra core when the peer has requested the - # contents of this clipboard: - def get_contents(self, target:str, got_contents:Callable) -> None: - pass - - - def filter_data(self, dtype=None, dformat=None, data=None, trusted=False, output_dtype=None): - log("filter_data(%s, %s, %i %s, %s, %s)", - dtype, dformat, len(data), type(data), trusted, output_dtype) - if not data: - return data - IMAGE_OVERLAY = os.environ.get("XPRA_CLIPBOARD_IMAGE_OVERLAY", None) - if IMAGE_OVERLAY and not os.path.exists(IMAGE_OVERLAY): - IMAGE_OVERLAY = None - IMAGE_STAMP = envbool("XPRA_CLIPBOARD_IMAGE_STAMP", False) - SANITIZE_IMAGES = envbool("XPRA_SANITIZE_IMAGES", True) - if dtype in ("image/png", "image/jpeg", "image/tiff") and ( - (output_dtype is not None and dtype!=output_dtype) or - IMAGE_STAMP or - IMAGE_OVERLAY or - (SANITIZE_IMAGES and not trusted) - ): - # pylint: disable=import-outside-toplevel - from xpra.codecs.pillow.decoder import open_only - img_type = dtype.split("/")[-1] - img = open_only(data, (img_type, )) - has_alpha = img.mode=="RGBA" - if not has_alpha and IMAGE_OVERLAY: - img = img.convert("RGBA") - w, h = img.size - if IMAGE_OVERLAY: - from PIL import Image #@UnresolvedImport - overlay = Image.open(IMAGE_OVERLAY) - if overlay.mode!="RGBA": - log.warn("Warning: cannot use overlay image '%s'", IMAGE_OVERLAY) - log.warn(" invalid mode '%s'", overlay.mode) - else: - log("adding clipboard image overlay to %s", dtype) - overlay_resized = overlay.resize((w, h), Image.ANTIALIAS) - composite = Image.alpha_composite(img, overlay_resized) - if not has_alpha and img.mode=="RGBA": - composite = composite.convert("RGB") - img = composite - if IMAGE_STAMP: - log("adding clipboard image stamp to %s", dtype) - from datetime import datetime - from PIL import ImageDraw - img_draw = ImageDraw.Draw(img) - w, h = img.size - img_draw.text((10, max(0, h//2-16)), 'via Xpra, %s' % datetime.now().isoformat(), fill='black') - #now save it: - img_type = (output_dtype or dtype).split("/")[-1] - buf = BytesIO() - img.save(buf, img_type.upper()) #ie: "PNG" - data = buf.getvalue() - buf.close() - return data diff --git a/xpra/clipboard/clipboard_timeout_helper.py b/xpra/clipboard/clipboard_timeout_helper.py index 6f1dc9a1bc..e8024b9c40 100644 --- a/xpra/clipboard/clipboard_timeout_helper.py +++ b/xpra/clipboard/clipboard_timeout_helper.py @@ -1,11 +1,13 @@ # This file is part of Xpra. -# Copyright (C) 2019-2020 Antoine Martin +# Copyright (C) 2019-2023 Antoine Martin # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. +from typing import Dict, Tuple + from gi.repository import GLib # @UnresolvedImport -from xpra.clipboard.clipboard_core import ClipboardProtocolHelperCore +from xpra.clipboard.clipboard_core import ClipboardProtocolHelperCore, ClipboardProxyCore from xpra.util import repr_ellipsized, ellipsizer, envint, engs from xpra.log import Logger from xpra.platform.features import CLIPBOARD_GREEDY @@ -27,26 +29,26 @@ class ClipboardTimeoutHelper(ClipboardProtocolHelperCore): #a clipboard superclass that handles timeouts def __init__(self, send_packet_cb, progress_cb=None, **kwargs): super().__init__(send_packet_cb, progress_cb, **kwargs) - self._clipboard_outstanding_requests = {} + self._clipboard_outstanding_requests : Dict[int, Tuple] = {} - def cleanup(self): + def cleanup(self) -> None: #reply to outstanding requests with "no data": for request_id in tuple(self._clipboard_outstanding_requests.keys()): self._clipboard_got_contents(request_id) self._clipboard_outstanding_requests = {} super().cleanup() - def make_proxy(self, selection): + def make_proxy(self, selection:str): raise NotImplementedError() - def _get_proxy(self, selection): + def _get_proxy(self, selection:str) -> ClipboardProxyCore: proxy = self._clipboard_proxies.get(selection) if not proxy: log.warn("Warning: no clipboard proxy for '%s'", selection) return None return proxy - def set_want_targets_client(self, want_targets): + def set_want_targets_client(self, want_targets:bool) -> None: super().set_want_targets_client(want_targets) #pass it on to the ClipboardProxy instances: for proxy in self._clipboard_proxies.values(): @@ -56,7 +58,7 @@ def set_want_targets_client(self, want_targets): ############################################################################ # network methods for communicating with the remote clipboard: ############################################################################ - def _send_clipboard_token_handler(self, proxy, packet_data=()): + def _send_clipboard_token_handler(self, proxy : ClipboardProxyCore, packet_data=()): if log.is_debug_enabled(): log("_send_clipboard_token_handler(%s, %s)", proxy, repr_ellipsized(packet_data)) remote = self.local_to_remote(proxy._selection) @@ -78,7 +80,7 @@ def _send_clipboard_token_handler(self, proxy, packet_data=()): log("send_clipboard_token_handler %s to %s", proxy._selection, remote) self.send(*packet) - def _send_clipboard_request_handler(self, proxy, selection, target): + def _send_clipboard_request_handler(self, proxy:ClipboardProxyCore, selection:str, target:str): log("send_clipboard_request_handler%s", (proxy, selection, target)) request_id = self._clipboard_request_counter self._clipboard_request_counter += 1 @@ -89,7 +91,7 @@ def _send_clipboard_request_handler(self, proxy, selection, target): self.progress() self.send("clipboard-request", request_id, remote, target) - def timeout_request(self, request_id): + def timeout_request(self, request_id:int) -> None: try: selection, target = self._clipboard_outstanding_requests.pop(request_id)[1:] except KeyError: @@ -103,7 +105,7 @@ def timeout_request(self, request_id): if proxy: proxy.got_contents(target) - def _clipboard_got_contents(self, request_id, dtype=None, dformat=None, data=None): + def _clipboard_got_contents(self, request_id:int, dtype:str="", dformat:int=0, data=None) -> None: try: timer, selection, target = self._clipboard_outstanding_requests.pop(request_id) except KeyError: @@ -121,7 +123,7 @@ def _clipboard_got_contents(self, request_id, dtype=None, dformat=None, data=Non if proxy: proxy.got_contents(target, dtype, dformat, data) - def client_reset(self): + def client_reset(self) -> None: super().client_reset() #timeout all pending requests cor = self._clipboard_outstanding_requests diff --git a/xpra/codecs/gstreamer/capture.py b/xpra/codecs/gstreamer/capture.py index 32553f6c19..347c055fdd 100755 --- a/xpra/codecs/gstreamer/capture.py +++ b/xpra/codecs/gstreamer/capture.py @@ -47,7 +47,7 @@ def __init__(self, element : str="ximagesrc", pixel_format : str="BGRX", self.height : int = height self.frames : int = 0 self.framerate : int = 10 - self.image = Queue(maxsize=1) + self.image : Queue[ImageWrapper] = Queue(maxsize=1) self.create_pipeline(element) assert width>0 and height>0 @@ -152,7 +152,7 @@ def create_pipeline(self, capture_element:str="ximagesrc"): "speed" : 100, "quality" : 100, }) - self.profile = get_profile(options, encoding, csc_mode="YUV444P", default_profile="high" if encoder=="x264enc" else None) + self.profile = get_profile(options, encoding, csc_mode="YUV444P", default_profile="high" if encoder=="x264enc" else "") eopts = get_video_encoder_options(encoder, self.profile, options) vcaps = get_video_encoder_caps(encoder) self.extra_client_info = vcaps.copy() diff --git a/xpra/codecs/loader.py b/xpra/codecs/loader.py index e27a511f81..7bd59fca3c 100755 --- a/xpra/codecs/loader.py +++ b/xpra/codecs/loader.py @@ -312,7 +312,7 @@ def get_rgb_compression_options() -> List[str]: # pylint: disable=import-outside-toplevel from xpra.net import compression compressors = compression.get_enabled_compressors() - compressors = [x for x in compressors if x!="brotli"] + compressors = tuple(x for x in compressors if x!="brotli") RGB_COMP_OPTIONS : List[str] = ["Raw RGB"] if compressors: RGB_COMP_OPTIONS += ["/".join(compressors)] diff --git a/xpra/gst_pipeline.py b/xpra/gst_pipeline.py index 288d446c13..961c924746 100644 --- a/xpra/gst_pipeline.py +++ b/xpra/gst_pipeline.py @@ -176,7 +176,7 @@ def cleanup(self) -> None: return b.remove_signal_watch() self.pipeline = None - self.state = None + self.state = "destroyed" self.info = {} f = self.file if f: diff --git a/xpra/net/compression.py b/xpra/net/compression.py index 6e29334bae..292ca87b7b 100644 --- a/xpra/net/compression.py +++ b/xpra/net/compression.py @@ -1,30 +1,30 @@ #!/usr/bin/env python3 # This file is part of Xpra. -# Copyright (C) 2011-2022 Antoine Martin +# Copyright (C) 2011-2023 Antoine Martin # Copyright (C) 2008, 2009, 2010 Nathaniel Smith # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. from collections import namedtuple -from typing import Any +from typing import Any, Tuple, Dict, Callable from xpra.util import envbool from xpra.common import MIN_COMPRESS_SIZE, MAX_DECOMPRESSED_SIZE #all the compressors we know about, in best compatibility order: -ALL_COMPRESSORS = ("lz4", "zlib", "brotli", "none") +ALL_COMPRESSORS : Tuple[str, ...] = ("lz4", "zlib", "brotli", "none") #order for performance: -PERFORMANCE_ORDER = ("none", "lz4", "zlib", "brotli") +PERFORMANCE_ORDER : Tuple[str, ...] = ("none", "lz4", "zlib", "brotli") #require compression (disallow 'none'): -PERFORMANCE_COMPRESSION = ("lz4", "zlib", "brotli") +PERFORMANCE_COMPRESSION : Tuple[str, ...] = ("lz4", "zlib", "brotli") Compression = namedtuple("Compression", ["name", "version", "compress", "decompress"]) -COMPRESSION = {} +COMPRESSION : Dict[str,Compression] = {} -def init_lz4(): +def init_lz4() -> Compression: #pylint: disable=import-outside-toplevel #pylint: disable=redefined-outer-name from xpra.net.lz4.lz4 import compress, decompress, get_version # @UnresolvedImport @@ -36,7 +36,7 @@ def lz4_decompress(data): return decompress(data, max_size=MAX_DECOMPRESSED_SIZE) return Compression("lz4", get_version(), lz4_compress, lz4_decompress) -def init_brotli(): +def init_brotli() -> Compression: #pylint: disable=import-outside-toplevel #pylint: disable=redefined-outer-name from xpra.net.protocol.header import BROTLI_FLAG @@ -55,7 +55,7 @@ def brotli_compress_shim(packet, level): return level | BROTLI_FLAG, brotli_compress(packet, quality=level) return Compression("brotli", brotli_version, brotli_compress_shim, brotli_decompress) -def init_zlib(): +def init_zlib() -> Compression: #pylint: disable=import-outside-toplevel import zlib from xpra.net.protocol.header import ZLIB_FLAG @@ -69,9 +69,9 @@ def zlib_decompress(data): v = d.decompress(data, MAX_DECOMPRESSED_SIZE) assert not d.unconsumed_tail, "not all data was decompressed" return v - return Compression("zlib", zlib.__version__, zlib_compress, zlib_decompress) # @UndefinedVariable + return Compression("zlib", zlib.__version__, zlib_compress, zlib_decompress) # @UndefinedVariable # type: ignore # noqa -def init_none(): +def init_none() -> Compression: def nocompress(packet, _level): if not isinstance(packet, bytes): packet = bytes(str(packet), 'UTF-8') @@ -81,11 +81,12 @@ def nodecompress(v): return Compression("none", None, nocompress, nodecompress) -def init_compressors(*names): +def init_compressors(*names) -> None: for x in names: + assert x not in ("compressors", "all"), "attempted to recurse!" if not envbool("XPRA_"+x.upper(), True): continue - fn = globals().get(f"init_{x}") + fn : Callable = globals().get(f"init_{x}") try: c = fn() assert c @@ -96,7 +97,7 @@ def init_compressors(*names): logger = Logger("network", "protocol") logger(f"no {x}", exc_info=True) -def init_all(): +def init_all() -> None: init_compressors(*(list(ALL_COMPRESSORS)+["none"])) @@ -116,14 +117,14 @@ def get_compression_caps(full_info : int=1) -> dict[str,Any]: ccaps[""] = True return caps -def get_enabled_compressors(order=ALL_COMPRESSORS): +def get_enabled_compressors(order=ALL_COMPRESSORS) -> Tuple[str,...]: return tuple(x for x in order if x in COMPRESSION) -def get_compressor(name): +def get_compressor(name) -> Compression: c = COMPRESSION.get(name) - if c is None: - raise ValueError(f"{name!r} compression is not supported") - return c.compress + if c is not None: + return c.compress + raise ValueError(f"{name!r} compression is not supported") class Compressed: @@ -169,7 +170,7 @@ def compress(self): raise NotImplementedError(f"compress() function is not defined on {self} ({type(self)})") -def compressed_wrapper(datatype, data, level=5, can_inline=True, **kwargs): +def compressed_wrapper(datatype, data, level=5, can_inline=True, **kwargs) -> Compressed: size = len(data) def no(): return Compressed(f"raw {datatype}", data, can_inline=can_inline) @@ -208,7 +209,7 @@ def get_compression_type(level) -> str: return "zlib" -def decompress(data, level): +def decompress(data:bytes, level:int): from xpra.net.protocol.header import LZ4_FLAG, BROTLI_FLAG if level & LZ4_FLAG: algo = "lz4" @@ -218,7 +219,7 @@ def decompress(data, level): algo = "zlib" return decompress_by_name(data, algo) -def decompress_by_name(data, algo): +def decompress_by_name(data:bytes, algo:str): c = COMPRESSION.get(algo) if c is None: raise InvalidCompressionException(f"{algo} is not available") diff --git a/xpra/net/net_util.py b/xpra/net/net_util.py index 653d301fe6..df467daf69 100755 --- a/xpra/net/net_util.py +++ b/xpra/net/net_util.py @@ -8,7 +8,7 @@ import socket import sys -from typing import Dict, Tuple, Any, Optional +from typing import Dict, Tuple, Any, Optional, Callable from xpra.os_util import WIN32 from xpra.version_util import parse_version @@ -362,6 +362,11 @@ def get_net_config() -> dict: return config +SSL_CONV : Dict[str, Tuple[str,Callable]] = { + "" : ("version-str", str), + "_INFO" : ("version", parse_version), + "_NUMBER" : ("version-number", int), + } def get_ssl_info(show_constants=False) -> Dict[str,Any]: try: import ssl # pylint: disable=import-outside-toplevel @@ -388,15 +393,12 @@ def get_ssl_info(show_constants=False) -> Dict[str,Any]: v = getattr(ssl, k, None) if v is not None: info[name] = v - for k, idef in { - "" : ("version-str", str), - "_INFO" : ("version", parse_version), - "_NUMBER" : ("version-number", int), - }.items(): + for k, idef in SSL_CONV.items(): v = getattr(ssl, f"OPENSSL_VERSION{k}", None) if v is not None: name, conv = idef - info.setdefault("openssl", {})[name] = conv(v) + sslinfo = info.setdefault("openssl", {}) + sslinfo[name] = conv(v) return info diff --git a/xpra/scripts/config.py b/xpra/scripts/config.py index ae552c715e..5791d4b05d 100755 --- a/xpra/scripts/config.py +++ b/xpra/scripts/config.py @@ -82,7 +82,7 @@ def get_xorg_bin() -> Optional[str]: if os.path.exists(p): return p #look for it in $PATH: - for x in os.environ.get("PATH").split(os.pathsep): # pragma: no cover + for x in os.environ.get("PATH", "").split(os.pathsep): # pragma: no cover xorg = os.path.join(x, "Xorg") if os.path.isfile(xorg): return xorg @@ -374,12 +374,12 @@ def read_config(conf_file:str) -> Dict: return d -def conf_files(conf_dir:str, xpra_conf_filename:str=DEFAULT_XPRA_CONF_FILENAME): +def conf_files(conf_dir:str, xpra_conf_filename:str=DEFAULT_XPRA_CONF_FILENAME) -> List[str]: """ Returns all the config file paths found in the config directory ie: ["/etc/xpra/conf.d/15_features.conf", ..., "/etc/xpra/xpra.conf"] """ - d = [] + d : List[str] = [] cdir = os.path.expanduser(conf_dir) if not os.path.exists(cdir) or not os.path.isdir(cdir): debug(f"invalid config directory: {cdir!r}") @@ -1146,9 +1146,9 @@ def get_defaults(): NO_FILE_OPTIONS = ("daemon", ) -TRUE_OPTIONS : Tuple[str, ...] = ("yes", "true", "1", "on", True) -FALSE_OPTIONS : Tuple[str, ...] = ("no", "false", "0", "off", False) -ALL_BOOLEAN_OPTIONS : Tuple[str, ...] = tuple(list(TRUE_OPTIONS)+list(FALSE_OPTIONS)) +TRUE_OPTIONS : Tuple[Any, ...] = ("yes", "true", "1", "on", True) +FALSE_OPTIONS : Tuple[Any, ...] = ("no", "false", "0", "off", False) +ALL_BOOLEAN_OPTIONS : Tuple[Any, ...] = tuple(list(TRUE_OPTIONS)+list(FALSE_OPTIONS)) OFF_OPTIONS : Tuple[str, ...] = ("off", ) def parse_bool(k:str, v, auto=None) -> Optional[bool]: @@ -1442,10 +1442,10 @@ def fixup_compression(options) -> None: #packet compression: from xpra.net import compression cstr = csvstrl(options.compressors) - if cstr=="none": - compressors = [] - elif cstr=="all": + if cstr=="all": compressors = compression.PERFORMANCE_ORDER + elif cstr=="none": + compressors = [] else: compressors = nodupes(cstr) unknown = tuple(x for x in compressors if x and x not in compression.ALL_COMPRESSORS) diff --git a/xpra/server/mixins/child_command_server.py b/xpra/server/mixins/child_command_server.py index 8dea3c1f70..d0cd258280 100644 --- a/xpra/server/mixins/child_command_server.py +++ b/xpra/server/mixins/child_command_server.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # This file is part of Xpra. -# Copyright (C) 2010-2022 Antoine Martin +# Copyright (C) 2010-2023 Antoine Martin # Copyright (C) 2008 Nathaniel Smith # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. @@ -10,9 +10,10 @@ import os.path from time import monotonic from subprocess import Popen +from typing import Dict, List, Callable, Any, Optional, Tuple from xpra.platform.features import COMMAND_SIGNALS -from xpra.child_reaper import getChildReaper, reaper_cleanup +from xpra.child_reaper import getChildReaper, ProcInfo, reaper_cleanup from xpra.os_util import ( bytestostr, restore_script_env, @@ -40,7 +41,7 @@ class ChildCommandServer(StubServerMixin): """ def __init__(self): - self.child_display = None + self.child_display : str = "" self.start_commands = [] self.start_child_commands = [] self.start_after_connect = [] @@ -49,29 +50,29 @@ def __init__(self): self.start_child_on_connect = [] self.start_on_last_client_exit = [] self.start_child_on_last_client_exit = [] - self.exit_with_children = False - self.children_count = 0 - self.start_after_connect_done = False - self.start_new_commands = False + self.exit_with_children : bool = False + self.children_count : int = 0 + self.start_after_connect_done : bool = False + self.start_new_commands : bool = False self.source_env = {} self.start_env = {} self.exec_cwd = None self.exec_wrapper = None - self.terminate_children = False - self.children_started = [] + self.terminate_children : bool = False + self.children_started : List[ProcInfo] = [] self.child_reaper = None - self.reaper_exit = self.reaper_exit_check + self.reaper_exit : Callable = self.reaper_exit_check #does not belong here... if not hasattr(self, "_upgrading"): self._upgrading = None if not hasattr(self, "session_name"): - self.session_name = "" + self.session_name : str = "" self.menu_provider = None #wait for main loop to run #and ensure that we don't run `late_start()` more than once, #even if __init__ is called multiple times: if not getattr(self, "late_start_requested", False): - self.late_start_requested = True + self.late_start_requested : bool = True self.idle_add(self.late_start) def late_start(self): @@ -82,7 +83,7 @@ def do_late_start(): start_thread(do_late_start, "command-late-start", daemon=True) - def init(self, opts): + def init(self, opts) -> None: self.exit_with_children = opts.exit_with_children self.terminate_children = opts.terminate_children self.start_new_commands = opts.start_new_commands @@ -99,21 +100,21 @@ def init(self, opts): if opts.exec_wrapper: self.exec_wrapper = shlex.split(opts.exec_wrapper) self.child_reaper = getChildReaper() - self.source_env = source_env(opts.source_start) + self.source_env : Dict[str, str] = source_env(opts.source_start) self.start_env = parse_env(opts.start_env) if self.start_new_commands: #may already have been initialized by servercore: self.menu_provider = self.menu_provider or get_menu_provider() self.menu_provider.on_reload.append(self.send_updated_menu) - def threaded_setup(self): + def threaded_setup(self) -> None: self.exec_start_commands() def set_reaper_callback(): self.child_reaper.set_quit_callback(self.reaper_exit) self.child_reaper.check() self.idle_add(set_reaper_callback) - def cleanup(self): + def cleanup(self) -> None: if self.terminate_children and self._upgrading!=EXITING_CODE: self.terminate_children_processes() def ignore(): @@ -124,7 +125,7 @@ def ignore(): reaper_cleanup() - def get_server_features(self, _source) -> dict: + def get_server_features(self, _source) -> Dict[str,Any]: return { "start-new-commands" : self.start_new_commands, "exit-with-children" : self.exit_with_children, @@ -141,8 +142,8 @@ def _get_xdg_menu_data(self): return self.menu_provider.get_menu_data() - def get_caps(self, source) -> dict: - caps = {} + def get_caps(self, source) -> Dict[str,Any]: + caps : Dict[str,Any] = {} if not source: return caps #don't assume we have a real ClientConnection object: @@ -155,7 +156,7 @@ def get_caps(self, source) -> dict: return caps - def send_initial_data(self, ss, caps, send_ui, share_count): + def send_initial_data(self, ss, caps:dict, send_ui:bool, share_count:int) -> None: if not getattr(ss, "xdg_menu", False): return if ss.xdg_menu_update: @@ -163,14 +164,14 @@ def send_initial_data(self, ss, caps, send_ui, share_count): #so do it in a throw-away thread: start_thread(self.send_xdg_menu_data, "send-xdg-menu-data", True, (ss,)) - def send_xdg_menu_data(self, ss): + def send_xdg_menu_data(self, ss) -> None: if ss.is_closed(): return xdg_menu = self._get_xdg_menu_data() or {} log(f"{len(xdg_menu)} entries sent in initial data") ss.send_setting_change("xdg-menu", xdg_menu) - def send_updated_menu(self, xdg_menu): + def send_updated_menu(self, xdg_menu) -> None: log("send_updated_menu(%s)", ellipsizer(xdg_menu)) for source in tuple(self._server_sources.values()): #some sources don't have this attribute (ie: RFBSource does not): @@ -178,8 +179,8 @@ def send_updated_menu(self, xdg_menu): source.send_setting_change("xdg-menu", xdg_menu or {}) - def get_info(self, _proto) -> dict: - info = { + def get_info(self, _proto) -> Dict[str,Any]: + info : Dict[str,Any] = { "start" : self.start_commands, "start-late" : self.start_late_commands, "start-child" : self.start_child_commands, @@ -205,11 +206,11 @@ def get_info(self, _proto) -> dict: return {"commands": info} - def last_client_exited(self): + def last_client_exited(self) -> None: self._exec_commands(self.start_on_last_client_exit, self.start_child_on_last_client_exit) - def get_child_env(self): + def get_child_env(self) -> Dict[str,str]: #subclasses may add more items (ie: fakexinerama) env = restore_script_env(super().get_child_env()) env.update(self.source_env) @@ -218,32 +219,32 @@ def get_child_env(self): env["DISPLAY"] = self.child_display return env - def get_full_child_command(self, cmd, use_wrapper=True) -> list: + def get_full_child_command(self, cmd, use_wrapper=True) -> List[str]: #make sure we have it as a list: cmd = super().get_full_child_command(cmd, use_wrapper) if not use_wrapper or not self.exec_wrapper: return cmd return self.exec_wrapper + cmd - def exec_start_late_commands(self): + def exec_start_late_commands(self) -> None: log("exec_start_late_commands() start-late=%s, start_child=%s", self.start_late_commands, self.start_child_late_commands) self._exec_commands(self.start_late_commands, self.start_child_late_commands) - def exec_start_commands(self): + def exec_start_commands(self) -> None: log("exec_start_commands() start=%s, start_child=%s", self.start_commands, self.start_child_commands) self._exec_commands(self.start_commands, self.start_child_commands) - def exec_after_connect_commands(self): + def exec_after_connect_commands(self) -> None: log("exec_after_connect_commands() start=%s, start_child=%s", self.start_after_connect, self.start_child_after_connect) self._exec_commands(self.start_after_connect, self.start_child_after_connect) - def exec_on_connect_commands(self): + def exec_on_connect_commands(self) -> None: log("exec_on_connect_commands() start=%s, start_child=%s", self.start_on_connect, self.start_child_on_connect) self._exec_commands(self.start_on_connect, self.start_child_on_connect) - def _exec_commands(self, start_list, start_child_list): + def _exec_commands(self, start_list, start_child_list) -> None: started = [] if start_list: for x in start_list: @@ -261,7 +262,8 @@ def _exec_commands(self, start_list, start_child_list): if not self.session_name: self.idle_add(self.guess_session_name, procs) - def start_command(self, name, child_cmd, ignore=False, callback=None, use_wrapper=True, shell=False, **kwargs): + def start_command(self, name:str, child_cmd, ignore:bool=False, callback:Optional[Callable]=None, + use_wrapper:bool=True, shell:bool=False, **kwargs): env = self.get_child_env() log("start_command%s exec_wrapper=%s, exec_cwd=%s", (name, child_cmd, ignore, callback, use_wrapper, shell, kwargs), self.exec_wrapper, self.exec_cwd) @@ -290,19 +292,19 @@ def start_command(self, name, child_cmd, ignore=False, callback=None, use_wrappe return None - def add_process(self, process, name, command, ignore=False, callback=None): + def add_process(self, process, name:str, command, ignore:bool=False, callback:Optional[Callable]=None) -> ProcInfo: return self.child_reaper.add_process(process, name, command, ignore, callback=callback) def is_child_alive(self, proc) -> bool: return proc is not None and proc.poll() is None - def reaper_exit_check(self): + def reaper_exit_check(self) -> None: log(f"reaper_exit_check() exit_with_children={self.exit_with_children}") if self.exit_with_children and self.children_count: log.info("all children have exited and --exit-with-children was specified, exiting") self.idle_add(self.clean_quit) - def terminate_children_processes(self): + def terminate_children_processes(self) -> None: cl = tuple(self.children_started) self.children_started = [] log(f"terminate_children_processes() children={cl}") @@ -333,7 +335,7 @@ def terminate_children_processes(self): log(f"still not terminated: {wait_for}") log("done waiting for child commands") - def guess_session_name(self, procs=None): + def guess_session_name(self, procs=None) -> None: if not procs: return from xpra.scripts.server import IBUS_DAEMON_COMMAND # pylint: disable=import-outside-toplevel @@ -366,7 +368,7 @@ def guess_session_name(self, procs=None): self.mdns_update() - def _process_start_command(self, proto, packet): + def _process_start_command(self, proto, packet:Tuple): log(f"start new command: {packet}") if not self.start_new_commands: log.warn("Warning: received start-command request,") @@ -387,7 +389,7 @@ def _process_start_command(self, proto, packet): ss.add_window_filter("window", "pid", "=", proc.pid) log(f"process_start_command: proc={proc}") - def _process_command_signal(self, _proto, packet): + def _process_command_signal(self, _proto, packet:Tuple) -> None: pid = packet[1] signame = bytestostr(packet[2]) if signame not in COMMAND_SIGNALS: @@ -412,7 +414,7 @@ def _process_command_signal(self, _proto, packet): log.estr(e) - def init_packet_handlers(self): + def init_packet_handlers(self) -> None: log("init_packet_handlers() COMMANDS_SIGNALS=%s, start new commands=%s", COMMAND_SIGNALS, self.start_new_commands) if COMMAND_SIGNALS: diff --git a/xpra/server/mixins/stub_server_mixin.py b/xpra/server/mixins/stub_server_mixin.py index 9573829c24..9cb762f23a 100644 --- a/xpra/server/mixins/stub_server_mixin.py +++ b/xpra/server/mixins/stub_server_mixin.py @@ -5,7 +5,7 @@ import os import shlex -from typing import Callable, Any +from typing import Callable, Any, List from xpra.util import typedict from xpra.os_util import WIN32 @@ -134,7 +134,7 @@ def get_child_env(self) -> dict[str,str]: return os.environ.copy() - def get_full_child_command(self, cmd, use_wrapper : bool=True) -> list: + def get_full_child_command(self, cmd, use_wrapper : bool=True) -> List[str]: #make sure we have it as a list: if isinstance(cmd, (list, tuple)): return list(cmd) diff --git a/xpra/x11/gtk_x11/clipboard.py b/xpra/x11/gtk_x11/clipboard.py index f82caf2a11..0f036ee27a 100644 --- a/xpra/x11/gtk_x11/clipboard.py +++ b/xpra/x11/gtk_x11/clipboard.py @@ -346,7 +346,7 @@ def nodata(): self.emit("send-clipboard-request", self._selection, req_target) waiting.append((requestor, target, prop, event.time)) - def set_selection_response(self, requestor, target, prop, dtype, dformat, data, time=0) -> None: + def set_selection_response(self, requestor:int, target:str, prop:str, dtype:str, dformat:int, data, time:int=0) -> None: log("set_selection_response(%s, %s, %s, %s, %s, %r, %i)", requestor, target, prop, dtype, dformat, ellipsizer(data), time) #answer the selection request: @@ -370,7 +370,7 @@ def set_selection_response(self, requestor, target, prop, dtype, dformat, data, log.warn(f" property {prop!r}") log.warn(f" {e}") - def got_contents(self, target, dtype=None, dformat=None, data=None) -> None: + def got_contents(self, target:str, dtype:str="", dformat:int=0, data=None) -> None: #if this is the special target 'TARGETS', cache the result: if target=="TARGETS" and dtype=="ATOM" and dformat==32: self.targets = xatoms_to_strings(data) diff --git a/xpra/x11/gtk_x11/tray.py b/xpra/x11/gtk_x11/tray.py index 1b9cc43c5d..6ca08801aa 100644 --- a/xpra/x11/gtk_x11/tray.py +++ b/xpra/x11/gtk_x11/tray.py @@ -137,6 +137,8 @@ def cleanup(self) -> None: def setup_tray_window(self) -> None: display = Gdk.Display.get_default() root = get_default_root_window() + if root is None: + raise RuntimeError("no root window!") screen = root.get_screen() owner = X11Window.XGetSelectionOwner(SELECTION) log(f"setup tray: current selection owner={owner:x}") diff --git a/xpra/x11/server.py b/xpra/x11/server.py index a1a1e8185e..97c0c95b1e 100644 --- a/xpra/x11/server.py +++ b/xpra/x11/server.py @@ -231,10 +231,10 @@ def update_size_constraints(self, minw, minh, maxw, maxh) -> None: if wm: wm.set_size_constraints(minw, minh, maxw, maxh) elif server_features.windows: - #update the default so Wm will use it + #update the static default so the Wm instance will use it #when we do instantiate it: - from xpra.x11.gtk_x11 import wm - wm.DEFAULT_SIZE_CONSTRAINTS = (0, 0, MAX_WINDOW_SIZE, MAX_WINDOW_SIZE) + from xpra.x11.gtk_x11 import wm as wm_module + wm_module.DEFAULT_SIZE_CONSTRAINTS = (0, 0, MAX_WINDOW_SIZE, MAX_WINDOW_SIZE) def init_packet_handlers(self) -> None: diff --git a/xpra/x11/server_keyboard_config.py b/xpra/x11/server_keyboard_config.py index abd3ea4145..9d63c74f02 100644 --- a/xpra/x11/server_keyboard_config.py +++ b/xpra/x11/server_keyboard_config.py @@ -571,7 +571,7 @@ def toggle_modifier(mod): for entry in entries: if entry.group==group: return entry.keycode, entry.group - return keycode, rgroup + return keycode or 0, rgroup or 0 def get_current_mask(self) -> List: root = get_default_root_window()