Skip to content

Commit

Permalink
#835: av-sync feature
Browse files Browse the repository at this point in the history
* command line option: "av-sync" defaults to True
* sound-control interface (used by both "xpra control" and the client to update the delay)
* export info via "xpra info"
* encode via a queue for delaying things if the "av-sync" option is set (and it is set for video regions)

git-svn-id: https://xpra.org/svn/Xpra/trunk@9372 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed May 15, 2015
1 parent 17bd631 commit d638401
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 14 deletions.
24 changes: 24 additions & 0 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
iconlog = Logger("client", "icon")
screenlog = Logger("client", "screen")
mouselog = Logger("mouse")
avsynclog = Logger("av-sync")


from xpra import __version__ as XPRA_VERSION
from xpra.gtk_common.gobject_util import no_arg_signal
Expand Down Expand Up @@ -58,6 +60,8 @@
PING_TIMEOUT = int(os.environ.get("XPRA_PING_TIMEOUT", "60"))
UNGRAB_KEY = os.environ.get("XPRA_UNGRAB_KEY", "Escape")

AV_SYNC_DELTA = int(os.environ.get("XPRA_AV_SYNC_DELTA", "0"))


PYTHON3 = sys.version_info[0] == 3
WIN32 = sys.platform.startswith("win")
Expand Down Expand Up @@ -149,6 +153,7 @@ def __init__(self):
except Exception as e:
soundlog("sound support unavailable: %s", e)
has_gst = False
self.av_sync = False
#sound state:
self.on_sink_ready = None
self.sound_sink = None
Expand All @@ -164,6 +169,7 @@ def __init__(self):
self.server_sound_encoders = []
self.server_sound_receive = False
self.server_sound_send = False
self.queue_used_sent = None

#dbus:
self.dbus_counter = AtomicInteger()
Expand Down Expand Up @@ -277,6 +283,7 @@ def init(self, opts):
assert has_gst
self.microphone_codecs = get_sound_codecs(False, False)
self.microphone_allowed = len(self.microphone_codecs)>0
self.av_sync = opts.av_sync

self.readonly = opts.readonly
self.windows_enabled = opts.windows
Expand Down Expand Up @@ -1016,6 +1023,9 @@ def make_hello(self):
"generic-rgb-encodings" : True,
"encodings" : self.get_encodings(),
"encodings.core" : self.get_core_encodings(),
"av-sync" : self.av_sync,
#start at 0 and rely on sound-control packets to set the correct value:
"av-sync.delay.default" : 0,
})
if self.dpi>0:
#command line (or config file) override supplied:
Expand Down Expand Up @@ -1346,6 +1356,8 @@ def parse_server_capabilities(self):
log("server has randr: %s", self.server_randr)
self.server_sound_sequence = c.boolget("sound_sequence")
self.server_sound_eos_sequence = c.boolget("sound.eos-sequence")
self.server_av_sync = c.boolget("av-sync.enabled")
avsynclog("av-sync: server=%s, client=%s", self.server_av_sync, self.av_sync)
self.server_info_request = c.boolget("info-request")
e = c.strget("encoding")
if e:
Expand Down Expand Up @@ -1807,6 +1819,18 @@ def _process_sound_data(self, packet):
#(some packets (ie: sos, eos) only contain metadata)
if len(data)>0:
ss.add_data(data, metadata)
if self.av_sync and self.server_av_sync:
info = ss.get_info()
queue_used = info.get("queue.used")
if not queue_used:
avsynclog("server sound sync: info=%s", info)
if queue_used and (self.queue_used_sent is None or abs(self.queue_used_sent-queue_used)>=80):
avsynclog("server sound sync: sending updated queue.used=%i (was %s)", queue_used, (self.queue_used_sent or "unset"))
self.queue_used_sent = queue_used
v = queue_used + AV_SYNC_DELTA
if AV_SYNC_DELTA:
avsynclog(" adjusted value=%i with sync delta=%i", v, AV_SYNC_DELTA)
self.send("sound-control", "sync", v)


def send_notify_enabled(self):
Expand Down
3 changes: 2 additions & 1 deletion src/xpra/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def set_global_logging_handler(h):
"window", "icon", "info", "launcher", "mdns", "cursor",
"mmap", "network", "protocol", "crypto", "encoder", "stats",
"notify", "xsettings", "grab", "xshm", "workspace",
"sound", "printing", "file", "events",
"sound", "av-sync",
"printing", "file", "events",
"opengl",
"osx", "win32",
"paint", "platform", "import",
Expand Down
2 changes: 2 additions & 0 deletions src/xpra/scripts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ def read_xpra_defaults():
"swap-keys" : bool,
"start-new-commands": bool,
"remote-logging" : bool,
"av-sync" : bool,
#arrays of strings:
"encodings" : list,
"video-encoders" : list,
Expand Down Expand Up @@ -419,6 +420,7 @@ def get_defaults():
"exit-with-client" : False,
"start-new-commands": False,
"remote-logging" : sys.platform.startswith("win") or sys.platform.startswith("darwin"),
"av-sync" : True,
"exit-ssh" : True,
"opengl" : OPENGL_DEFAULT,
"mdns" : False,
Expand Down
5 changes: 5 additions & 0 deletions src/xpra/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ def legacy_bool_parse(optionname, newoptionname=None):
group.add_option("--sound-source", action="store",
dest="sound_source", default=defaults.sound_source,
help="Specifies which sound system to use to capture the sound stream (use 'help' for options)")
legacy_bool_parse("av-sync")
group.add_option("--av-sync", action="store",
dest="av_sync", default=defaults.av_sync,
help="Try to synchronize sound and video. Default: %s." % enabled_str(defaults.av_sync))
else:
hidden_options["av-sync"] = False
hidden_options["speaker"] = False
hidden_options["speaker_codec"] = []
hidden_options["microphone"] = False
Expand Down
5 changes: 4 additions & 1 deletion src/xpra/server/server_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def init(self, opts):
self.send_pings = opts.pings
self.file_transfer = opts.file_transfer
self.lpadmin = opts.lpadmin
self.av_sync = opts.av_sync
#server-side printer handling is only for posix via pycups for now:
if os.name=="posix" and opts.printing:
try:
Expand Down Expand Up @@ -765,7 +766,7 @@ def get_window_id(wid):
self.idle_timeout, self.idle_timeout_cb, self.idle_grace_timeout_cb, self._socket_dir,
self.get_transient_for, self.get_focus, self.get_cursor_data,
get_window_id,
self.supports_mmap,
self.supports_mmap, self.av_sync,
self.core_encodings, self.encodings, self.default_encoding, self.scaling_control,
self.sound_source_plugin,
self.supports_speaker, self.supports_microphone,
Expand Down Expand Up @@ -911,6 +912,7 @@ def get_server_features(self):
"sound_sequence", "notify-startup-complete", "suspend-resume",
"encoding.generic", "encoding.strict_control",
"sound.server_driven",
"av-sync",
"command_request",
"event_request", "server-events",
"sound.eos-sequence")
Expand All @@ -934,6 +936,7 @@ def make_hello(self, source):
"printer.attributes" : ("printer-info", "device-uri"),
"start-new-commands" : self.start_new_commands,
"exit-with-children" : self.exit_with_children,
"av-sync.enabled" : self.av_sync,
})
for x in self.get_server_features():
capabilities[x] = True
Expand Down
26 changes: 24 additions & 2 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
filelog = Logger("file")
timeoutlog = Logger("timeout")
proxylog = Logger("proxy")
avsynclog = Logger("av-sync")


from xpra.server import ClientException
Expand Down Expand Up @@ -50,6 +51,7 @@
GRACE_PERCENT = int(os.environ.get("XPRA_GRACE_PERCENT", "90"))
except:
GRACE_PERCENT = 90
AV_SYNC_DELTA = int(os.environ.get("XPRA_AV_SYNC_DELTA", "0"))


def make_window_metadata(window, propname, get_transient_for=None, get_window_id=None):
Expand Down Expand Up @@ -155,7 +157,7 @@ def __init__(self, protocol, disconnect_cb, idle_add, timeout_add, source_remove
idle_timeout, idle_timeout_cb, idle_grace_timeout_cb, socket_dir,
get_transient_for, get_focus, get_cursor_data_cb,
get_window_id,
supports_mmap,
supports_mmap, av_sync,
core_encodings, encodings, default_encoding, scaling_control,
sound_source_plugin,
supports_speaker, supports_microphone,
Expand All @@ -166,7 +168,7 @@ def __init__(self, protocol, disconnect_cb, idle_add, timeout_add, source_remove
idle_timeout, idle_timeout_cb, idle_grace_timeout_cb, socket_dir,
get_transient_for, get_focus,
get_window_id,
supports_mmap,
supports_mmap, av_sync,
core_encodings, encodings, default_encoding, scaling_control,
sound_source_plugin,
supports_speaker, supports_microphone,
Expand Down Expand Up @@ -209,6 +211,8 @@ def __init__(self, protocol, disconnect_cb, idle_add, timeout_add, source_remove
self.sound_source_sequence = 0
self.sound_source = None
self.sound_sink = None
self.av_sync = av_sync
self.av_sync_delay = 0

self.server_core_encodings = core_encodings
self.server_encodings = encodings
Expand Down Expand Up @@ -606,8 +610,11 @@ def parse_batch_int(value, varname):
self.sound_receive = c.boolget("sound.receive")
self.sound_send = c.boolget("sound.send")
self.server_driven = c.boolget("sound.server_driven")
av_sync = c.boolget("av-sync")
self.set_av_sync_delay(int(self.av_sync and av_sync) * c.intget("av-sync.delay.default", 150))
soundlog("pulseaudio id=%s, server=%s, sound decoders=%s, sound encoders=%s, receive=%s, send=%s",
self.pulseaudio_id, self.pulseaudio_server, self.sound_decoders, self.sound_encoders, self.sound_receive, self.sound_send)
avsynclog("av-sync: server=%s, client=%s, delay=%s", self.av_sync, av_sync, self.av_sync_delay)

log("cursors=%s, bell=%s, notifications=%s", self.send_cursors, self.send_bell, self.send_notifications)
log("client uuid %s", self.uuid)
Expand Down Expand Up @@ -889,6 +896,10 @@ def fadeout():
elif action=="new-sequence":
self.sound_source_sequence = int(args[0])
return "new sequence is %s" % self.sound_source_sequence
elif action=="sync":
assert self.av_sync, "av-sync is not enabled"
self.set_av_sync_delay(args[0])
return "av-sync delay set to %ims" % self.av_sync_delay
#elif action=="quality":
# assert self.sound_source
# quality = args[0]
Expand Down Expand Up @@ -941,6 +952,15 @@ def sink_clean():
return
self.sound_sink.add_data(data, metadata)

def set_av_sync_delay(self, v):
#update all window sources with the given delay
assert self.av_sync, "av-sync is not enabled"
self.av_sync_delay = min(1000, max(0, int(v) + AV_SYNC_DELTA))
avsynclog("av-sync set to %ims (from value=%s and delta=%s)", self.av_sync_delay, v, AV_SYNC_DELTA)
for ws in self.window_sources.values():
ws.av_sync_delay = v


def set_screen_sizes(self, screen_sizes):
self.screen_sizes = screen_sizes or []
log("client screen sizes: %s", screen_sizes)
Expand Down Expand Up @@ -1167,6 +1187,7 @@ def up(prefix, d):
up("encoding", self.default_encoding_options)
up("encoding", self.encoding_options)
up("connection", self.protocol.get_info())
up("av-sync", {"delay" :self.av_sync_delay})
info.update(self.get_sound_info())
info.update(self.get_features_info())
info.update(self.get_screen_info())
Expand Down Expand Up @@ -1650,6 +1671,7 @@ def make_window_source(self, wid, window):
ws = WindowVideoSource(self.queue_size, self.call_in_encode_thread, self.queue_packet, self.compressed_wrapper,
self.statistics,
wid, window, batch_config, self.auto_refresh_delay,
self.av_sync, self.av_sync_delay,
self.video_helper,
self.server_core_encodings, self.server_encodings,
self.encoding, self.encodings, self.core_encodings, self.encoding_options, self.rgb_formats,
Expand Down
80 changes: 72 additions & 8 deletions src/xpra/server/window_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
scalinglog = Logger("scaling")
iconlog = Logger("icon")
deltalog = Logger("delta")
avsynclog = Logger("av-sync")


AUTO_REFRESH_THRESHOLD = int(os.environ.get("XPRA_AUTO_REFRESH_THRESHOLD", 100))
Expand Down Expand Up @@ -72,6 +73,7 @@ def staticinit(idle_add, timeout_add, source_remove):
def __init__(self, queue_size, call_in_encode_thread, queue_packet, compressed_wrapper,
statistics,
wid, window, batch_config, auto_refresh_delay,
av_sync, av_sync_delay,
video_helper,
server_core_encodings, server_encodings,
encoding, encodings, core_encodings, encoding_options, rgb_formats,
Expand All @@ -90,6 +92,9 @@ def __init__(self, queue_size, call_in_encode_thread, queue_packet, compressed_w
self.wid = wid
self.global_statistics = statistics #shared/global statistics from ServerSource
self.statistics = WindowPerformanceStatistics()
self.av_sync = av_sync
self.av_sync_delay = av_sync_delay
self.encode_queue = []

self.server_core_encodings = server_core_encodings
self.server_encodings = server_encodings
Expand Down Expand Up @@ -662,7 +667,7 @@ def cancel_damage(self):
damage requests for a window.
Damage methods will check this value via 'is_cancelled(sequence)'.
"""
log("cancel_damage() wid=%s, dropping delayed region %s and all sequences up to %s", self.wid, self._damage_delayed, self._sequence)
log("cancel_damage() wid=%s, dropping delayed region %s, %s queued encodes, and all sequences up to %s", self.wid, self._damage_delayed, len(self.encode_queue), self._sequence)
#for those in flight, being processed in separate threads, drop by sequence:
self._damage_cancelled = self._sequence
self.cancel_expire_timer()
Expand All @@ -671,6 +676,11 @@ def cancel_damage(self):
self.cancel_timeout_timer()
#if a region was delayed, we can just drop it now:
self.refresh_regions = []
eq = self.encode_queue
if eq:
self.encode_queue = []
for item in eq:
self.free_image_wrapper(item[6])
self._damage_delayed = None
self._damage_delayed_expired = False
self.delta_pixel_data = [None for _ in range(self.delta_buckets)]
Expand Down Expand Up @@ -1139,6 +1149,7 @@ def free_image_wrapper(self, image):
""" when not running in the UI thread,
call this method to free an image wrapper safely
"""
log("free_image_wrapper(%s) thread_safe=%s", image, image.is_thread_safe())
if image.is_thread_safe():
image.free()
else:
Expand All @@ -1150,8 +1161,11 @@ def process_damage_region(self, damage_time, window, x, y, w, h, coding, options
Called by 'damage' or 'send_delayed_regions' to process a damage region.
Actual damage region processing:
we extract the rgb data from the pixmap and place it on the damage queue,
so the damage thread will call make_data_packet_cb which does the actual compression
we extract the rgb data from the pixmap and:
* if doing av-sync, we place the data on the encode queue with a timer,
when the timer fires, we queue the work for the damage thread
* without av-sync, we just queue the work immediately
The damage thread will call make_data_packet_cb which does the actual compression.
This runs in the UI thread.
"""
if w==0 or h==0:
Expand All @@ -1176,15 +1190,65 @@ def process_damage_region(self, damage_time, window, x, y, w, h, coding, options
return

now = time.time()
log("process_damage_regions: wid=%s, adding %s pixel data to queue, elapsed time: %.1f ms, request time: %.1f ms",
self.wid, coding, 1000*(now-damage_time), 1000*(now-rgb_request_time))
self.statistics.encoding_pending[sequence] = (damage_time, w, h)
self.call_in_encode_thread(self.make_data_packet_cb, window, damage_time, now, self.wid, image, coding, sequence, options)
log("process_damage_regions: wid=%i, adding pixel data to encode queue (%ix%i - %s), elapsed time: %.1f ms, request time: %.1f ms",
self.wid, w, h, coding, 1000*(now-damage_time), 1000*(now-rgb_request_time))
item = (window, damage_time, w, h, now, self.wid, image, coding, sequence, options)
av_delay = self.av_sync_delay*int(options.get("av-sync", False))
if av_delay==0:
self.make_data_packet_cb(*item)
else:
#schedule encode via queue:
self.encode_queue.append(item)
self.timeout_add(av_delay, self.call_in_encode_thread, self.encode_from_queue)

def encode_from_queue(self):
#note: we use a queue here to ensure we preserve the order
#(so we encode frames in the same order they were grabbed)
avsynclog("encode_from_queue: %s items", len(self.encode_queue))
if not self.encode_queue:
return #nothing to encode, must have been picked off already
#find the first item which is due
#in seconds, same as time.time():
av_delay = 200/1000.0
now = time.time()
still_due = []
pop = None
try:
for index,item in enumerate(self.encode_queue):
ts = item[4]
due = ts + av_delay
if due<now and pop is None:
#found an item which is due
pop = index
avsynclog("encode_from_queue: processing item %s/%s (overdue by %ims)", index+1, len(self.encode_queue), int(1000*(now-due)))
self.make_data_packet_cb(*item)
else:
#we only process only one item per call
#and just keep track of extra ones:
still_due.append(due)
except Exception as e:
avsynclog.error("error processing encode queue: %s", e, exc_info=True)
if pop is not None:
self.encode_queue.pop(pop)
return
#README: encode_from_queue is scheduled to run every time we add an item
#to the encode_queue, but since the av_delay can change it is possible
#for us to not pop() any items from the list sometimes, and therefore we must ensure
#we run this method again later when the items are actually due,
#so we need to calculate when that is:
if len(still_due)==0:
avsynclog("encode_from_queue: nothing due")
return
first_due = int(max(0, min(still_due)-time.time())*1000)
avsynclog("encode_from_queue: first due in %ims, due list=%s", first_due, still_due)
self.timeout_add(first_due, self.call_in_encode_thread, self.encode_from_queue)

def make_data_packet_cb(self, window, damage_time, process_damage_time, wid, image, coding, sequence, options):

def make_data_packet_cb(self, window, w, h, damage_time, process_damage_time, wid, image, coding, sequence, options):
""" This function is called from the damage data thread!
Extra care must be taken to prevent access to X11 functions on window.
"""
self.statistics.encoding_pending[sequence] = (damage_time, w, h)
try:
packet = self.make_data_packet(damage_time, process_damage_time, wid, image, coding, sequence, options)
finally:
Expand Down
Loading

0 comments on commit d638401

Please sign in to comment.