Skip to content

Commit

Permalink
* refactor av-sync code into window video source where it belongs
Browse files Browse the repository at this point in the history
* when encoding as video, request the image just once and use a sub-image for the edges (fixes performance with shadow servers)

git-svn-id: https://xpra.org/svn/Xpra/trunk@12950 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Jul 3, 2016
1 parent a97bd35 commit 4da9143
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 96 deletions.
84 changes: 4 additions & 80 deletions src/xpra/server/window/window_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,7 @@ def process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=
This runs in the UI thread.
"""
assert self.ui_thread == threading.current_thread()
assert coding is not None
if w==0 or h==0:
return
if not self.window.is_managed():
Expand All @@ -1384,7 +1385,6 @@ def process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=
log("get_window_pixmap: dropping damage request with sequence=%s", sequence)
return

assert coding is not None
rgb_request_time = time.time()
image = self.window.get_image(x, y, w, h, logger=log)
if image is None:
Expand All @@ -1397,85 +1397,9 @@ def process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=

now = time.time()
item = (w, h, damage_time, now, image, coding, sequence, options, flush)
if self.must_freeze(coding, options):
newstride = image.get_width()*4
if not image.restride(newstride):
avsynclog("Warning: failed to freeze image pixels for:")
avsynclog(" %s", image)
av_delay = self.get_frame_encode_delay(options)
log("process_damage_region: wid=%i, adding pixel data to encode queue (%ix%i - %s), elapsed time: %.1f ms, request time: %.1f ms, frame delay=%ims",
self.wid, w, h, coding, 1000*(now-damage_time), 1000*(now-rgb_request_time), av_delay)
if av_delay<0:
self.call_in_encode_thread(True, self.make_data_packet_cb, *item)
else:
self.encode_queue.append(item)
self.timeout_add(av_delay, self.call_in_encode_thread, True, self.encode_from_queue)

def must_freeze(self, coding, options):
return options.get("av-sync", False)

def get_frame_encode_delay(self, options):
if options.get("av-sync", False):
return -1
l = len(self.encode_queue)
if l>=self.encode_queue_max_size:
#we must free some space!
return 0
return self.av_sync_delay

def encode_from_queue(self):
#note: we use a queue here to ensure we preserve the order
#(so we encode frames in the same order they were grabbed)
eq = self.encode_queue
avsynclog("encode_from_queue: %s items", len(eq))
if not eq:
return #nothing to encode, must have been picked off already
self.update_av_sync_delay()
#find the first item which is due
#in seconds, same as time.time():
av_delay = self.av_sync_delay/1000.0
if len(self.encode_queue)>=self.encode_queue_max_size:
av_delay = 0 #we must free some space!
now = time.time()
still_due = []
pop = None
index = 0
item = None
try:
for index,item in enumerate(eq):
#item = (w, h, damage_time, now, image, coding, sequence, options, flush)
sequence = item[6]
if self.is_cancelled(sequence):
self.free_image_wrapper(item[4])
continue
ts = item[3]
due = ts + av_delay
if due<now and pop is None:
#found an item which is due
pop = index
avsynclog("encode_from_queue: processing item %s/%s (overdue by %ims)", index+1, len(self.encode_queue), int(1000*(now-due)))
self.make_data_packet_cb(*item)
else:
#we only process only one item per call
#and just keep track of extra ones:
still_due.append(due)
except Exception:
avsynclog.error("error processing encode queue at index %i", index)
avsynclog.error("item=%s", item, exc_info=True)
if pop is not None:
eq.pop(pop)
return
#README: encode_from_queue is scheduled to run every time we add an item
#to the encode_queue, but since the av_delay can change it is possible
#for us to not pop() any items from the list sometimes, and therefore we must ensure
#we run this method again later when the items are actually due,
#so we need to calculate when that is:
if len(still_due)==0:
avsynclog("encode_from_queue: nothing due")
return
first_due = int(max(0, min(still_due)-time.time())*1000)
avsynclog("encode_from_queue: first due in %ims, due list=%s", first_due, still_due)
self.timeout_add(first_due, self.call_in_encode_thread, True, self.encode_from_queue)
self.call_in_encode_thread(True, self.make_data_packet_cb, *item)
log("process_damage_region: wid=%i, adding pixel data to encode queue (%4ix%-4i - %5s), elapsed time: %.1f ms, request time: %.1f ms",
self.wid, w, h, coding, 1000*(now-damage_time), 1000*(now-rgb_request_time))


def make_data_packet_cb(self, w, h, damage_time, process_damage_time, image, coding, sequence, options, flush):
Expand Down
151 changes: 135 additions & 16 deletions src/xpra/server/window/window_video_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import time
import operator
import threading

from xpra.net.compression import Compressed, LargeStructure
from xpra.codecs.codec_constants import get_subsampling_divs, \
Expand Down Expand Up @@ -46,6 +47,7 @@ def envint(name, d):
SCALING_HARDCODED = parse_scaling_value(os.environ.get("XPRA_SCALING_HARDCODED", ""))

VIDEO_SUBREGION = envint("XPRA_VIDEO_SUBREGION", 1)==1
FORCE_AV_DELAY = envint("XPRA_FORCE_AV_DELAY", 0)
B_FRAMES = envint("XPRA_B_FRAMES", 1)==1
VIDEO_SKIP_EDGE = envint("XPRA_VIDEO_SKIP_EDGE", 0)==1
SCROLL_ENCODING = envint("XPRA_SCROLL_ENCODING", 0)==1
Expand Down Expand Up @@ -661,28 +663,145 @@ def send_nonvideo(regions=regions, encoding=coding, exclude_region=None, get_bes
sublog("send_delayed_regions: delaying non video regions %s some more by %ims", regions, delay)
self.expire_timer = self.timeout_add(int(delay), self.expire_delayed_region, delay)

def must_encode_full_frame(self, encoding):
return self.full_frames_only or (encoding in self.video_encodings) or not self.non_video_encodings

def process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=None):

def process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=0):
"""
Called by 'damage' or 'send_delayed_regions' to process a damage region.
Actual damage region processing:
we extract the rgb data from the pixmap and:
* if doing av-sync, we place the data on the encode queue with a timer,
when the timer fires, we queue the work for the damage thread
* without av-sync, we just queue the work immediately
The damage thread will call make_data_packet_cb which does the actual compression.
This runs in the UI thread.
"""
assert self.ui_thread == threading.current_thread()
assert coding is not None
if w==0 or h==0:
return
if not self.window.is_managed():
log("the window %s is not composited!?", self.window)
return
sequence = self._sequence
if self.is_cancelled(sequence):
log("get_window_pixmap: dropping damage request with sequence=%s", sequence)
return

rgb_request_time = time.time()
image = self.window.get_image(x, y, w, h, logger=log)
if image is None:
log("get_window_pixmap: no pixel data for window %s, wid=%s", self.window, self.wid)
return
if self.is_cancelled(sequence):
image.free()
return
self.pixel_format = image.get_pixel_format()

must_freeze = options.get("av-sync", False) or coding in self.video_encodings
if must_freeze:
newstride = image.get_width()*4
if not image.restride(newstride):
avsynclog("Warning: failed to freeze image pixels for:")
avsynclog(" %s", image)
av_delay = self.get_frame_encode_delay(options)
def call_encode(ew, eh, eimage, encoding, eflush):
self._sequence += 1
sequence = self._sequence
if self.is_cancelled(sequence):
log("get_window_pixmap: dropping damage request with sequence=%s", sequence)
return
now = time.time()
log("process_damage_region: wid=%i, adding pixel data to encode queue (%4ix%-4i - %5s), elapsed time: %.1f ms, request time: %.1f ms, frame delay=%ims",
self.wid, ew, eh, encoding, 1000*(now-damage_time), 1000*(now-rgb_request_time), av_delay)
item = (ew, eh, damage_time, now, eimage, encoding, sequence, options, eflush)
if av_delay<0:
self.call_in_encode_thread(True, self.make_data_packet_cb, *item)
else:
self.encode_queue.append(item)
self.timeout_add(av_delay, self.call_in_encode_thread, True, self.encode_from_queue)
#now figure out if we need to send edges separately:
if coding in self.video_encodings:
if coding in self.video_encodings and self.edge_encoding and not VIDEO_SKIP_EDGE:
dw = w - (w & self.width_mask)
dh = h - (h & self.height_mask)
else:
dw, dh = 0, 0
if self.edge_encoding and not VIDEO_SKIP_EDGE:
if dw>0:
WindowSource.process_damage_region(self, damage_time, x+w-dw, y, dw, h, self.edge_encoding, options, flush=1+int(dh>0))
sub = image.get_sub_image(w-dw, 0, dw, h)
call_encode(dw, h, sub, self.edge_encoding, flush+1+int(dh>0))
w = w & self.width_mask
if dh>0:
WindowSource.process_damage_region(self, damage_time, x, y+h-dh, w-dw, dh, self.edge_encoding, options, flush=1)
#use the unmasked dimensions to prevent us restriding for nothing:
WindowSource.process_damage_region(self, damage_time, x, y, w, h, coding, options, flush=flush)


def must_freeze(self, coding, options):
return WindowSource.must_freeze(self, coding, options) or coding in self.video_encodings

def must_encode_full_frame(self, encoding):
return self.full_frames_only or (encoding in self.video_encodings) or not self.non_video_encodings
sub = image.get_sub_image(0, h-dh, w, dh)
call_encode(dw, h, sub, self.edge_encoding, flush+1)
h = h & self.height_mask
#the main area:
call_encode(w, h, image, coding, flush)

def get_frame_encode_delay(self, options):
if FORCE_AV_DELAY>0:
return FORCE_AV_DELAY
if options.get("av-sync", False):
return -1
l = len(self.encode_queue)
if l>=self.encode_queue_max_size:
#we must free some space!
return 0
return self.av_sync_delay

def encode_from_queue(self):
#note: we use a queue here to ensure we preserve the order
#(so we encode frames in the same order they were grabbed)
eq = self.encode_queue
avsynclog("encode_from_queue: %s items", len(eq))
if not eq:
return #nothing to encode, must have been picked off already
self.update_av_sync_delay()
#find the first item which is due
#in seconds, same as time.time():
av_delay = self.av_sync_delay/1000.0
if len(self.encode_queue)>=self.encode_queue_max_size:
av_delay = 0 #we must free some space!
now = time.time()
still_due = []
pop = None
index = 0
item = None
try:
for index,item in enumerate(eq):
#item = (w, h, damage_time, now, image, coding, sequence, options, flush)
sequence = item[6]
if self.is_cancelled(sequence):
self.free_image_wrapper(item[4])
continue
ts = item[3]
due = ts + av_delay
if due<now and pop is None:
#found an item which is due
pop = index
avsynclog("encode_from_queue: processing item %s/%s (overdue by %ims)", index+1, len(self.encode_queue), int(1000*(now-due)))
self.make_data_packet_cb(*item)
else:
#we only process only one item per call
#and just keep track of extra ones:
still_due.append(due)
except Exception:
avsynclog.error("error processing encode queue at index %i", index)
avsynclog.error("item=%s", item, exc_info=True)
if pop is not None:
eq.pop(pop)
return
#README: encode_from_queue is scheduled to run every time we add an item
#to the encode_queue, but since the av_delay can change it is possible
#for us to not pop() any items from the list sometimes, and therefore we must ensure
#we run this method again later when the items are actually due,
#so we need to calculate when that is:
if len(still_due)==0:
avsynclog("encode_from_queue: nothing due")
return
first_due = int(max(0, min(still_due)-time.time())*1000)
avsynclog("encode_from_queue: first due in %ims, due list=%s", first_due, still_due)
self.timeout_add(first_due, self.call_in_encode_thread, True, self.encode_from_queue)


def update_encoding_options(self, force_reload=False):
Expand Down

0 comments on commit 4da9143

Please sign in to comment.