From 90e1a044cedcb817e4980e730cb08851dd3eb34d Mon Sep 17 00:00:00 2001 From: Antoine Martin Date: Thu, 22 Feb 2018 06:15:33 +0000 Subject: [PATCH] move ping and info packet handling to a mixin git-svn-id: https://xpra.org/svn/Xpra/trunk@18529 3bb7dfac-3a0b-4e04-842a-767bc560f471 --- src/xpra/client/mixins/network_state.py | 185 ++++++++++++++++++++++++ src/xpra/client/ui_client_base.py | 159 +++----------------- 2 files changed, 203 insertions(+), 141 deletions(-) create mode 100644 src/xpra/client/mixins/network_state.py diff --git a/src/xpra/client/mixins/network_state.py b/src/xpra/client/mixins/network_state.py new file mode 100644 index 0000000000..73663cc68a --- /dev/null +++ b/src/xpra/client/mixins/network_state.py @@ -0,0 +1,185 @@ +# This file is part of Xpra. +# Copyright (C) 2010-2018 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import os +import re +from collections import deque + +from xpra.log import Logger +log = Logger("network") +bandwidthlog = Logger("bandwidth") + +from xpra.os_util import monotonic_time, POSIX +from xpra.util import envint, csv +from xpra.exit_codes import EXIT_TIMEOUT + + +FAKE_BROKEN_CONNECTION = envint("XPRA_FAKE_BROKEN_CONNECTION") +PING_TIMEOUT = envint("XPRA_PING_TIMEOUT", 60) +#LOG_INFO_RESPONSE = ("^window.*position", "^window.*size$") +LOG_INFO_RESPONSE = os.environ.get("XPRA_LOG_INFO_RESPONSE", "") + + +""" +Mixin for adding server / network state monitoring functions: +- ping and echo +- info request and response +""" +class NetworkState(object): + + def __init__(self): + self.start_time = monotonic_time() + self.server_start_time = -1 + + #setting: + self.pings = False + + #bandwidth + self.server_bandwidth_limit_change = False + self.server_bandwidth_limit = 0 + self.server_session_name = None + + #info requests + self.server_last_info = None + self.info_request_pending = False + + #network state: + self.server_ping_latency = deque(maxlen=1000) + self.server_load = None + self.client_ping_latency = deque(maxlen=1000) + self._server_ok = True + self.last_ping_echoed_time = 0 + + + def init(self, opts): + self.pings = opts.pings + + + def cleanup(self): + pass + + + def get_caps(self): + return {"info-namespace" : True} + + def parse_server_capabilities(self): + c = self.server_capabilities + self.server_start_time = c.intget("start_time", -1) + self.server_bandwidth_limit_change = c.boolget("network.bandwidth-limit-change") + self.server_bandwidth_limit = c.intget("network.bandwidth-limit") + bandwidthlog("server_bandwidth_limit_change=%s, server_bandwidth_limit=%s", self.server_bandwidth_limit_change, self.server_bandwidth_limit) + + def process_ui_capabilities(self): + self.send_deflate_level() + self.send_ping() + if self.pings>0: + self.timeout_add(1000*self.pings, self.send_ping) + + + ###################################################################### + # info: + def _process_info_response(self, packet): + self.info_request_pending = False + self.server_last_info = packet[1] + log("info-response: %s", self.server_last_info) + if LOG_INFO_RESPONSE: + items = LOG_INFO_RESPONSE.split(",") + logres = [re.compile(v) for v in items] + log.info("info-response debug for %s:", csv(["'%s'" % x for x in items])) + for k in sorted(self.server_last_info.keys()): + if any(lr.match(k) for lr in logres): + log.info(" %s=%s", k, self.server_last_info[k]) + + def send_info_request(self, *categories): + if not self.info_request_pending: + self.info_request_pending = True + self.send("info-request", [self.uuid], tuple(self._id_to_window.keys()), categories) + + + ###################################################################### + # network and status: + def server_ok(self): + return self._server_ok + + def check_server_echo(self, ping_sent_time): + if self._protocol is None: + #no longer connected! + return False + last = self._server_ok + if FAKE_BROKEN_CONNECTION>0: + self._server_ok = (int(monotonic_time()) % FAKE_BROKEN_CONNECTION) <= (FAKE_BROKEN_CONNECTION//2) + else: + self._server_ok = self.last_ping_echoed_time>=ping_sent_time + log("check_server_echo(%s) last=%s, server_ok=%s (last_ping_echoed_time=%s)", ping_sent_time, last, self._server_ok, self.last_ping_echoed_time) + self.server_state_change() + return False + + def server_state_change(self): + log("server_state_change() ok=%s", self._server_ok) + + def check_echo_timeout(self, ping_time): + log("check_echo_timeout(%s) last_ping_echoed_time=%s", ping_time, self.last_ping_echoed_time) + if self.last_ping_echoed_time0: + l = [x for _,x in tuple(self.server_ping_latency)] + avg = sum(l) / len(l) + wait = min(5, 1.0+avg*2.0) + log("send_ping() timestamp=%s, average server latency=%.1f, using max wait %.2fs", now_ms, 1000.0*avg, wait) + self.timeout_add(int(1000.0*wait), self.check_server_echo, now_ms) + return True + + def _process_ping_echo(self, packet): + echoedtime, l1, l2, l3, cl = packet[1:6] + self.last_ping_echoed_time = echoedtime + self.check_server_echo(0) + server_ping_latency = monotonic_time()-echoedtime/1000.0 + self.server_ping_latency.append((monotonic_time(), server_ping_latency)) + self.server_load = l1, l2, l3 + if cl>=0: + self.client_ping_latency.append((monotonic_time(), cl/1000.0)) + log("ping echo server load=%s, measured client latency=%sms", self.server_load, cl) + + def _process_ping(self, packet): + echotime = packet[1] + l1,l2,l3 = 0,0,0 + if POSIX: + try: + (fl1, fl2, fl3) = os.getloadavg() + l1,l2,l3 = int(fl1*1000), int(fl2*1000), int(fl3*1000) + except (OSError, AttributeError): + pass + sl = -1 + if len(self.server_ping_latency)>0: + _, sl = self.server_ping_latency[-1] + self.send("ping_echo", echotime, l1, l2, l3, int(1000.0*sl)) + + + ###################################################################### + # network level packet compression: + def set_deflate_level(self, level): + self.compression_level = level + self.send_deflate_level() + + def send_deflate_level(self): + self._protocol.set_compression_level(self.compression_level) + self.send("set_deflate", self.compression_level) + + + ###################################################################### + # packets: + def init_authenticated_packet_handlers(self): + self.set_packet_handlers(self._packet_handlers, { + "ping": self._process_ping, + "ping_echo": self._process_ping_echo, + "info-response": self._process_info_response, + }) diff --git a/src/xpra/client/ui_client_base.py b/src/xpra/client/ui_client_base.py index 93d821ed99..547dc0354a 100644 --- a/src/xpra/client/ui_client_base.py +++ b/src/xpra/client/ui_client_base.py @@ -6,9 +6,7 @@ # later version. See the file COPYING for details. import os -import re import sys -from collections import deque from xpra.log import Logger log = Logger("client") @@ -23,7 +21,6 @@ from xpra.gtk_common.gobject_util import no_arg_signal from xpra.client.client_base import XpraClientBase -from xpra.exit_codes import EXIT_TIMEOUT from xpra.client.keyboard_helper import KeyboardHelper from xpra.platform import set_name from xpra.platform.features import MMAP_SUPPORTED @@ -36,8 +33,8 @@ from xpra.scripts.config import parse_bool_or_int from xpra.net import compression, packet_encoding from xpra.child_reaper import reaper_cleanup -from xpra.os_util import platform_name, bytestostr, monotonic_time, strtobytes, POSIX, BITS -from xpra.util import nonl, std, envint, envbool, typedict, updict, csv, make_instance, CLIENT_EXIT, XPRA_APP_ID +from xpra.os_util import platform_name, bytestostr, strtobytes, BITS +from xpra.util import nonl, std, envint, envbool, typedict, updict, make_instance, CLIENT_EXIT, XPRA_APP_ID from xpra.version_util import get_version_info_full, get_platform_info #client mixins: from xpra.client.mixins.webcam_forwarder import WebcamForwarder @@ -49,18 +46,12 @@ from xpra.client.mixins.mmap_client import MmapClient from xpra.client.mixins.remote_logging import RemoteLogging from xpra.client.mixins.display_client import DisplayClient +from xpra.client.mixins.network_state import NetworkState -FAKE_BROKEN_CONNECTION = envint("XPRA_FAKE_BROKEN_CONNECTION") -PING_TIMEOUT = envint("XPRA_PING_TIMEOUT", 60) - B_FRAMES = envbool("XPRA_B_FRAMES", True) PAINT_FLUSH = envbool("XPRA_PAINT_FLUSH", True) -#LOG_INFO_RESPONSE = ("^window.*position", "^window.*size$") -LOG_INFO_RESPONSE = os.environ.get("XPRA_LOG_INFO_RESPONSE", "") - - MAX_SOFT_EXPIRED = envint("XPRA_MAX_SOFT_EXPIRED", 5) SEND_TIMESTAMPS = envbool("XPRA_SEND_TIMESTAMPS", False) @@ -71,7 +62,7 @@ Utility superclass for client classes which have a UI. See gtk_client_base and its subclasses. """ -class UIXpraClient(XpraClientBase, DisplayClient, WindowClient, WebcamForwarder, AudioClient, ClipboardClient, NotificationClient, RPCClient, MmapClient, RemoteLogging): +class UIXpraClient(XpraClientBase, DisplayClient, WindowClient, WebcamForwarder, AudioClient, ClipboardClient, NotificationClient, RPCClient, MmapClient, RemoteLogging, NetworkState): #NOTE: these signals aren't registered because this class #does not extend GObject. __gsignals__ = { @@ -97,40 +88,25 @@ def __init__(self): RPCClient.__init__(self) MmapClient.__init__(self) RemoteLogging.__init__(self) + NetworkState.__init__(self) try: pinfo = get_platform_info() osinfo = "%s" % platform_name(sys.platform, pinfo.get("linux_distribution") or pinfo.get("sysrelease", "")) log.info(" running on %s", osinfo) except: log("platform name error:", exc_info=True) - self.start_time = monotonic_time() self._ui_events = 0 self.title = "" self.session_name = u"" - #statistics and server info: - self.server_start_time = -1 self.server_platform = "" - - self.server_bandwidth_limit_change = False - self.server_bandwidth_limit = 0 self.server_session_name = None - self.server_last_info = None - self.info_request_pending = False - self.allowed_encodings = [] self.core_encodings = None self.encoding = None - #network state: - self.server_ping_latency = deque(maxlen=1000) - self.server_load = None - self.client_ping_latency = deque(maxlen=1000) - self._server_ok = True - self.last_ping_echoed_time = 0 - #features: self.opengl_enabled = False self.opengl_props = {} @@ -142,7 +118,6 @@ def __init__(self): self.server_encodings_with_lossless_mode = () self.server_auto_video_encoding = False self.readonly = False - self.pings = False self.xsettings_enabled = False self.server_start_new_commands = False @@ -189,17 +164,14 @@ def init(self, opts): RPCClient.init(self, opts) MmapClient.init(self, opts) RemoteLogging.init(self, opts) + NetworkState.init(self, opts) self.allowed_encodings = opts.encodings self.encoding = opts.encoding self.video_scaling = parse_bool_or_int("video-scaling", opts.video_scaling) self.title = opts.title self.session_name = bytestostr(opts.session_name) - self.xsettings_enabled = opts.xsettings - self.readonly = opts.readonly - self.pings = opts.pings - self.client_supports_sharing = opts.sharing is True self.client_lock = opts.lock is True @@ -265,7 +237,7 @@ def quit(self, exit_code=0): def cleanup(self): log("UIXpraClient.cleanup()") - for x in (XpraClientBase, DisplayClient, WindowClient, WebcamForwarder, AudioClient, ClipboardClient, NotificationClient, RPCClient, MmapClient, RemoteLogging): + for x in (XpraClientBase, DisplayClient, WindowClient, WebcamForwarder, AudioClient, ClipboardClient, NotificationClient, RPCClient, MmapClient, RemoteLogging, NetworkState): x.cleanup(self) for x in (self.keyboard_helper, self.tray, self.menu_helper, self.client_extras, getVideoHelper()): if x is None: @@ -349,7 +321,7 @@ def make_hello(self): for x in ( #generic feature flags: "notify-startup-complete", "wants_events", - "setting-change", "info-namespace", + "setting-change", #legacy (not needed in 1.0 - can be dropped soon): "generic-rgb-encodings", ): @@ -372,6 +344,7 @@ def make_hello(self): #messy unprefixed: caps.update(WindowClient.get_caps(self)) caps.update(DisplayClient.get_caps(self)) + caps.update(NetworkState.get_caps(self)) caps.update(self.get_keyboard_caps()) caps.update(self.get_desktop_caps()) #nicely prefixed: @@ -426,6 +399,7 @@ def parse_server_capabilities(self): return False RemoteLogging.parse_server_capabilities(self) DisplayClient.parse_server_capabilities(self) + NetworkState.parse_server_capabilities(self) c = self.server_capabilities self.server_session_name = strtobytes(c.rawget("session_name", b"")).decode("utf-8") set_name("Xpra", self.session_name or self.server_session_name or "Xpra") @@ -449,12 +423,8 @@ def parse_server_capabilities(self): self.server_encodings_with_quality = c.strlistget("encodings.with_quality", ("jpeg", "webp", "h264")) self.server_encodings_with_lossless_mode = c.strlistget("encodings.with_lossless_mode", ()) self.server_auto_video_encoding = c.boolget("auto-video-encoding") - self.server_start_time = c.intget("start_time", -1) self.server_platform = c.strget("platform") - self.server_bandwidth_limit_change = c.boolget("network.bandwidth-limit-change") - self.server_bandwidth_limit = c.intget("network.bandwidth-limit") - bandwidthlog("server_bandwidth_limit_change=%s, server_bandwidth_limit=%s", self.server_bandwidth_limit_change, self.server_bandwidth_limit) e = c.strget("encoding") if e: if self.encoding and e!=self.encoding: @@ -493,26 +463,19 @@ def process_ui_capabilities(self): RPCClient.parse_capabilities(self) ClipboardClient.parse_capabilities(self) NotificationClient.parse_server_capabilities(self) - #figure out the maximum actual desktop size and use it to - #calculate the maximum size of a packet (a full screen update packet) - self.send_deflate_level() + NetworkState.process_ui_capabilities(self) + #keyboard: c = self.server_capabilities if self.keyboard_helper: modifier_keycodes = c.dictget("modifier_keycodes") if modifier_keycodes: self.keyboard_helper.set_modifier_mappings(modifier_keycodes) - self.key_repeat_delay, self.key_repeat_interval = c.intpair("key_repeat", (-1,-1)) self.handshake_complete() - + self.connect("keyboard-sync-toggled", self.send_keyboard_sync_enabled_status) #FIXME: merge this with parse? ClipboardClient.process_ui_capabilities(self) - self.connect("keyboard-sync-toggled", self.send_keyboard_sync_enabled_status) - self.send_ping() - if self.pings>0: - self.timeout_add(1000*self.pings, self.send_ping) - def _process_startup_complete(self, packet): log("all the existing windows and system trays have been received: %s items", len(self._id_to_window)) @@ -541,26 +504,6 @@ def after_handshake(self, cb, *args): self._on_handshake.append((cb, args)) - ###################################################################### - # info: - def _process_info_response(self, packet): - self.info_request_pending = False - self.server_last_info = packet[1] - log("info-response: %s", self.server_last_info) - if LOG_INFO_RESPONSE: - items = LOG_INFO_RESPONSE.split(",") - logres = [re.compile(v) for v in items] - log.info("info-response debug for %s:", csv(["'%s'" % x for x in items])) - for k in sorted(self.server_last_info.keys()): - if any(lr.match(k) for lr in logres): - log.info(" %s=%s", k, self.server_last_info[k]) - - def send_info_request(self, *categories): - if not self.info_request_pending: - self.info_request_pending = True - self.send("info-request", [self.uuid], tuple(self._id_to_window.keys()), categories) - - ###################################################################### # server messages: def _process_server_event(self, packet): @@ -870,14 +813,6 @@ def send_force_ungrab(self, wid): def send_keyboard_sync_enabled_status(self, *_args): self.send("set-keyboard-sync-enabled", self.keyboard_sync) - def set_deflate_level(self, level): - self.compression_level = level - self.send_deflate_level() - - def send_deflate_level(self): - self._protocol.set_compression_level(self.compression_level) - self.send("set_deflate", self.compression_level) - ###################################################################### # keyboard: @@ -1012,20 +947,8 @@ def get_tray_title(self): ###################################################################### # network and status: - def server_ok(self): - return self._server_ok - - def check_server_echo(self, ping_sent_time): - if self._protocol is None: - #no longer connected! - return False - last = self._server_ok - if FAKE_BROKEN_CONNECTION>0: - self._server_ok = (int(monotonic_time()) % FAKE_BROKEN_CONNECTION) <= (FAKE_BROKEN_CONNECTION//2) - else: - self._server_ok = self.last_ping_echoed_time>=ping_sent_time - log("check_server_echo(%s) last=%s, server_ok=%s (last_ping_echoed_time=%s)", ping_sent_time, last, self._server_ok, self.last_ping_echoed_time) - if last!=self._server_ok and not self._server_ok: + def server_state_change(self): + if not self._server_ok: log.info("server is not responding, drawing spinners over the windows") def timer_redraw(): if self._protocol is None: @@ -1038,7 +961,6 @@ def timer_redraw(): return not ok #repaint again until ok self.idle_add(self.redraw_spinners) self.timeout_add(250, timer_redraw) - return False def redraw_spinners(self): #draws spinner on top of the window, or not (plain repaint) @@ -1049,50 +971,6 @@ def redraw_spinners(self): if not w.is_tray(): w.spinner(ok) - def check_echo_timeout(self, ping_time): - netlog("check_echo_timeout(%s) last_ping_echoed_time=%s", ping_time, self.last_ping_echoed_time) - if self.last_ping_echoed_time0: - l = [x for _,x in tuple(self.server_ping_latency)] - avg = sum(l) / len(l) - wait = min(5, 1.0+avg*2.0) - netlog("send_ping() timestamp=%s, average server latency=%.1f, using max wait %.2fs", now_ms, 1000.0*avg, wait) - self.timeout_add(int(1000.0*wait), self.check_server_echo, now_ms) - return True - - def _process_ping_echo(self, packet): - echoedtime, l1, l2, l3, cl = packet[1:6] - self.last_ping_echoed_time = echoedtime - self.check_server_echo(0) - server_ping_latency = monotonic_time()-echoedtime/1000.0 - self.server_ping_latency.append((monotonic_time(), server_ping_latency)) - self.server_load = l1, l2, l3 - if cl>=0: - self.client_ping_latency.append((monotonic_time(), cl/1000.0)) - netlog("ping echo server load=%s, measured client latency=%sms", self.server_load, cl) - - def _process_ping(self, packet): - echotime = packet[1] - l1,l2,l3 = 0,0,0 - if POSIX: - try: - (fl1, fl2, fl3) = os.getloadavg() - l1,l2,l3 = int(fl1*1000), int(fl2*1000), int(fl3*1000) - except (OSError, AttributeError): - pass - sl = -1 - if len(self.server_ping_latency)>0: - _, sl = self.server_ping_latency[-1] - self.send("ping_echo", echotime, l1, l2, l3, int(1000.0*sl)) - ###################################################################### # packets: @@ -1106,16 +984,15 @@ def init_authenticated_packet_handlers(self): RPCClient.init_authenticated_packet_handlers(self) ClipboardClient.init_authenticated_packet_handlers(self) NotificationClient.init_authenticated_packet_handlers(self) + NetworkState.init_authenticated_packet_handlers(self) + #run from the UI thread: self.set_packet_handlers(self._ui_packet_handlers, { "startup-complete": self._process_startup_complete, "setting-change": self._process_setting_change, "control" : self._process_control, }) - #these handlers can run directly from the network thread: + #run directly from the network thread: self.set_packet_handlers(self._packet_handlers, { - "ping": self._process_ping, - "ping_echo": self._process_ping_echo, - "info-response": self._process_info_response, "server-event": self._process_server_event, })