From ad6d6c2d49275e547ebf362c17fcd53360675821 Mon Sep 17 00:00:00 2001 From: Antoine Martin Date: Thu, 15 Dec 2022 00:04:17 +0700 Subject: [PATCH] #3706 gstreamer decoder hard-coded for vp8 for now - just a PoC --- xpra/codecs/gstreamer/codec_common.py | 132 +++++++++++++++++++++++++ xpra/codecs/gstreamer/decoder.py | 111 +++++++++++++++++++++ xpra/codecs/gstreamer/encoder.py | 136 +++++--------------------- xpra/codecs/loader.py | 9 +- xpra/codecs/video_helper.py | 13 ++- 5 files changed, 279 insertions(+), 122 deletions(-) create mode 100644 xpra/codecs/gstreamer/codec_common.py create mode 100644 xpra/codecs/gstreamer/decoder.py diff --git a/xpra/codecs/gstreamer/codec_common.py b/xpra/codecs/gstreamer/codec_common.py new file mode 100644 index 0000000000..40493c2fec --- /dev/null +++ b/xpra/codecs/gstreamer/codec_common.py @@ -0,0 +1,132 @@ +# This file is part of Xpra. +# Copyright (C) 2014-2022 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +from queue import Queue, Empty + +from xpra.util import typedict +from xpra.gst_common import import_gst +from xpra.gst_pipeline import Pipeline, GST_FLOW_OK +from xpra.log import Logger + +Gst = import_gst() +log = Logger("encoder", "gstreamer") + + +def get_version(): + return (5, 0) + +def get_type(): + return "gstreamer" + +def get_info(): + return {"version" : get_version()} + +def init_module(): + log("gstreamer.init_module()") + +def cleanup_module(): + log("gstreamer.cleanup_module()") + + +class VideoPipeline(Pipeline): + __generic_signals__ = Pipeline.__generic_signals__.copy() + """ + Dispatch video encoding or decoding to a gstreamer pipeline + """ + def init_context(self, encoding, width, height, colorspace, options=None): + options = typedict(options or {}) + self.encoding = encoding + self.width = width + self.height = height + self.colorspace = colorspace + self.frames = 0 + self.frame_queue = Queue() + self.pipeline_str = "" + self.create_pipeline(options) + self.src = self.pipeline.get_by_name("src") + self.src.set_property("format", Gst.Format.TIME) + #self.src.set_caps(Gst.Caps.from_string(CAPS)) + self.sink = self.pipeline.get_by_name("sink") + self.sink.connect("new-sample", self.on_new_sample) + self.sink.connect("new-preroll", self.on_new_preroll) + self.start() + + def create_pipeline(self, options): + raise NotImplementedError() + + def on_message(self, bus, message): + if message.type == Gst.MessageType.NEED_CONTEXT and self.pipeline_str.find("vaapi")>=0: + log("vaapi is requesting a context") + return GST_FLOW_OK + return super().on_message(bus, message) + + def on_new_preroll(self, _appsink): + log("new-preroll") + return GST_FLOW_OK + + def process_buffer(self, buf): + r = self.src.emit("push-buffer", buf) + if r!=GST_FLOW_OK: + log.error("Error: unable to push image buffer") + return None + try: + r = self.frame_queue.get(timeout=2 if self.frames==0 else 0.5) + self.frames += 1 + return r + except Empty: + log.error("Error: frame queue timeout") + return None + + + def get_info(self) -> dict: + info = get_info() + if self.colorspace is None: + return info + info.update({ + "frames" : self.frames, + "width" : self.width, + "height" : self.height, + "encoding" : self.encoding, + "colorspace": self.colorspace, + "version" : get_version(), + }) + return info + + def __repr__(self): + if self.colorspace is None: + return "gstreamer(uninitialized)" + return f"gstreamer({self.colorspace} - {self.width}x{self.height})" + + def is_ready(self): + return self.colorspace is not None + + def is_closed(self): + return self.colorspace is None + + + def get_encoding(self): + return self.encoding + + def get_width(self): + return self.width + + def get_height(self): + return self.height + + def get_type(self): + return "gstreamer" + + def clean(self): + super().stop() + self.width = 0 + self.height = 0 + self.colorspace = None + self.encoding = "" + self.dst_formats = [] + self.frames = 0 + + + def do_emit_info(self): + pass diff --git a/xpra/codecs/gstreamer/decoder.py b/xpra/codecs/gstreamer/decoder.py new file mode 100644 index 0000000000..2e886b1d8f --- /dev/null +++ b/xpra/codecs/gstreamer/decoder.py @@ -0,0 +1,111 @@ +# This file is part of Xpra. +# Copyright (C) 2014-2022 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +from xpra.gst_common import STREAM_TYPE, GST_FORMAT_BYTES, make_buffer +from xpra.gst_pipeline import GST_FLOW_OK +from xpra.codecs.gstreamer.codec_common import ( + VideoPipeline, + get_version, get_type, get_info, + init_module, cleanup_module, + ) +from xpra.util import roundup +from xpra.log import Logger +from gi.repository import GObject +from xpra.codecs.image_wrapper import ImageWrapper + +log = Logger("decoder", "gstreamer") + +assert get_version and get_type and init_module and cleanup_module + + +def get_encodings(): + return ("vp8", ) + +def get_input_colorspaces(encoding): + assert encoding in get_encodings() + return ("YUV420P", ) + #return ("YUV420P", "BGRX", ) + +def get_output_colorspace(encoding, input_colorspace): + assert encoding in get_encodings() + assert input_colorspace in get_input_colorspaces(encoding) + return "YUV420P" + + +class Decoder(VideoPipeline): + __gsignals__ = VideoPipeline.__generic_signals__.copy() + """ + Dispatch video decoding to a gstreamer pipeline + """ + def create_pipeline(self, options): + if self.encoding not in get_encodings(): + raise ValueError(f"invalid encoding {self.encoding!r}") + self.dst_formats = options.strtupleget("dst-formats") + gst_rgb_format = "I420" + #STREAM_CAPS = f"caps=video/x-vp8,width={self.width},height={self.height}" + STREAM_CAPS = f"video/x-vp8,width={self.width},height={self.height}" + IMAGE_CAPS = f"video/x-raw,width={self.width},height={self.height},format=(string){gst_rgb_format}" + elements = [ + #"do-timestamp=1", + f"appsrc name=src emit-signals=1 block=0 is-live=1 do-timestamp=1 stream-type={STREAM_TYPE} format={GST_FORMAT_BYTES} caps={STREAM_CAPS}", + "vp8dec", + #avdec_vp8 + #"h264parse", + #"avdec_h264", + #video/x-h264,stream-format=avc,alignment=au + "videoconvert", + #mp4mux + f"appsink name=sink emit-signals=true max-buffers=10 drop=true sync=false async=false qos=false caps={IMAGE_CAPS}", + ] + if not self.setup_pipeline_and_bus(elements): + raise RuntimeError("failed to setup gstreamer pipeline") + + def get_colorspace(self): + return self.colorspace + + def on_new_sample(self, _bus): + sample = self.sink.emit("pull-sample") + buf = sample.get_buffer() + size = buf.get_size() + log("on_new_sample size=%s", size) + if size: + mem = memoryview(buf.extract_dup(0, size)) + #I420 gstreamer definition: + Ystride = roundup(self.width, 4) + Ysize = Ystride*roundup(self.height, 2) + UVstride = roundup(roundup(self.width, 2)//2, 4) + UVsize = UVstride*roundup(self.height, 2)//2 + Y = mem[:Ysize] + U = mem[Ysize:Ysize+UVsize] + V = mem[Ysize+UVsize:Ysize+2*UVsize] + strides = (Ystride, UVstride, UVstride) + image = ImageWrapper(0, 0, self.width, self.height, (Y, U, V), "YUV420P", 24, strides, 3, ImageWrapper.PLANAR_3) + self.frame_queue.put(image) + return GST_FLOW_OK + + + def decompress_image(self, data, options=None): + log(f"decompress_image(..) state={self.state} data size={len(data)}") + if self.state in ("stopped", "error"): + log(f"pipeline is in {self.state} state, dropping buffer") + return None + buf = make_buffer(data) + #duration = normv(0) + #if duration>0: + # buf.duration = duration + #buf.size = size + #buf.timestamp = timestamp + #buf.offset = offset + #buf.offset_end = offset_end + return self.process_buffer(buf) + +GObject.type_register(Decoder) + + +def selftest(full=False): + log("gstreamer decoder selftest: %s", get_info()) + from xpra.codecs.codec_checks import testdecoder + from xpra.codecs.gstreamer import decoder + assert testdecoder(decoder, full) diff --git a/xpra/codecs/gstreamer/encoder.py b/xpra/codecs/gstreamer/encoder.py index 1b7afd850c..f989c36d14 100755 --- a/xpra/codecs/gstreamer/encoder.py +++ b/xpra/codecs/gstreamer/encoder.py @@ -4,15 +4,20 @@ # later version. See the file COPYING for details. import os -from queue import Queue, Empty +from queue import Empty -from xpra.util import typedict, parse_simple_dict +from xpra.util import parse_simple_dict from xpra.codecs.codec_constants import video_spec from xpra.gst_common import ( import_gst, normv, STREAM_TYPE, BUFFER_FORMAT, ) -from xpra.gst_pipeline import Pipeline, GST_FLOW_OK +from xpra.gst_pipeline import GST_FLOW_OK +from xpra.codecs.gstreamer.codec_common import ( + VideoPipeline, + get_version, get_type, get_info, + init_module, cleanup_module, + ) from xpra.log import Logger from gi.repository import GObject from xpra.codecs.image_wrapper import ImageWrapper @@ -20,6 +25,7 @@ Gst = import_gst() log = Logger("encoder", "gstreamer") +assert get_version and init_module and cleanup_module #ENCODER_PLUGIN = "vaapih264enc" #ENCODER_PLUGIN = "x264enc" ENCODER_PLUGIN = os.environ.get("XPRA_GSTREAMER_ENCODER_PLUGIN", "vaapih264enc") @@ -46,15 +52,6 @@ } -def get_version(): - return (5, 0) - -def get_type(): - return "gstreamer" - -def get_info(): - return {"version" : get_version()} - def get_encodings(): return ("h264", ) @@ -68,12 +65,6 @@ def get_output_colorspaces(encoding, input_colorspace): assert input_colorspace in get_input_colorspaces(encoding) return ("YUV420P", ) -def init_module(): - log("gstreamer.init_module()") - -def cleanup_module(): - log("gstreamer.cleanup_module()") - def get_spec(encoding, colorspace): assert encoding in get_encodings(), "invalid encoding: %s (must be one of %s" % (encoding, get_encodings()) @@ -89,24 +80,16 @@ def get_spec(encoding, colorspace): width_mask=0xFFFE, height_mask=0xFFFE, max_w=4096, max_h=4096) -class Encoder(Pipeline): - __gsignals__ = Pipeline.__generic_signals__.copy() +class Encoder(VideoPipeline): + __gsignals__ = VideoPipeline.__generic_signals__.copy() """ Dispatch video encoding to a gstreamer pipeline """ - def init_context(self, encoding, width, height, src_format, options=None): - options = typedict(options or {}) - if encoding not in get_encodings(): - raise ValueError(f"invalid encoding {encoding!r}") - self.encoding = encoding - self.width = width - self.height = height - self.src_format = src_format + def create_pipeline(self, options): + if self.encoding not in get_encodings(): + raise ValueError(f"invalid encoding {self.encoding!r}") self.dst_formats = options.strtupleget("dst-formats") - self.frames = 0 - self.pipeline_str = "" - self.frame_queue = Queue() - if src_format in ( + if self.colorspace in ( "NV12", "RGBA", "BGRA", "ARGB", "ABGR", "RGB", "BGR", @@ -115,7 +98,7 @@ def init_context(self, encoding, width, height, src_format, options=None): "BGRP", "RGBP", ): #identical name: - gst_rgb_format = src_format + gst_rgb_format = self.colorspace else: #translate to gstreamer name: gst_rgb_format = { @@ -126,7 +109,7 @@ def init_context(self, encoding, width, height, src_format, options=None): "XBGR" : "xBGR", "YUV400" : "GRAY8", #"RGB8P" - }[src_format] + }[self.colorspace] CAPS = f"video/x-raw,width={self.width},height={self.height},format=(string){gst_rgb_format},framerate=60/1,interlace=progressive" #parse encoder plugin string: parts = ENCODER_PLUGIN.split(" ", 1) @@ -148,79 +131,16 @@ def init_context(self, encoding, width, height, src_format, options=None): ] if not self.setup_pipeline_and_bus(elements): raise RuntimeError("failed to setup gstreamer pipeline") - self.src = self.pipeline.get_by_name("src") - self.src.set_property("format", Gst.Format.TIME) - #self.src.set_caps(Gst.Caps.from_string(CAPS)) - self.sink = self.pipeline.get_by_name("sink") - self.sink.connect("new-sample", self.on_new_sample) - self.sink.connect("new-preroll", self.on_new_preroll) - self.start() - def on_message(self, bus, message): - if message.type == Gst.MessageType.NEED_CONTEXT and ENCODER_PLUGIN.startswith("vaapi"): - log("vaapi is requesting a context") - return GST_FLOW_OK - return super().on_message(bus, message) - - def on_new_preroll(self, _appsink): - log("new-preroll") - return GST_FLOW_OK + def get_src_format(self): + return self.colorspace def get_info(self) -> dict: - info = get_info() - if self.src_format is None: - return info - info.update({ - "frames" : self.frames, - "width" : self.width, - "height" : self.height, - "encoding" : self.encoding, - "src_format": self.src_format, - "dst_formats" : self.dst_formats, - "version" : get_version(), - }) + info = super().get_info() + if self.dst_formats: + info["dst_formats"] = self.dst_formats return info - def __repr__(self): - if self.src_format is None: - return "gstreamer(uninitialized)" - return f"gstreamer({self.src_format} - {self.width}x{self.height})" - - def is_ready(self): - return self.src_format is not None - - def is_closed(self): - return self.src_format is None - - def get_encoding(self): - return self.encoding - - def get_width(self): - return self.width - - def get_height(self): - return self.height - - def get_type(self): - return "gstreamer" - - def get_src_format(self): - return self.src_format - - def clean(self): - super().stop() - self.width = 0 - self.height = 0 - self.src_format = None - self.encoding = "" - self.src_format = "" - self.dst_formats = [] - self.frames = 0 - - - def do_emit_info(self): - pass - def on_new_sample(self, _bus): sample = self.sink.emit("pull-sample") @@ -267,17 +187,7 @@ def compress_image(self, image, options=None): #buf.timestamp = timestamp #buf.offset = offset #buf.offset_end = offset_end - r = self.src.emit("push-buffer", buf) - if r!=GST_FLOW_OK: - log.error("Error: unable to push image buffer") - return None - try: - r = self.frame_queue.get(timeout=0.5) - self.frames += 1 - return r - except Empty: - log.error("Error: frame queue timeout") - return None + return self.process_buffer(buf) GObject.type_register(Encoder) diff --git a/xpra/codecs/loader.py b/xpra/codecs/loader.py index b0789513d8..9b05f1d304 100755 --- a/xpra/codecs/loader.py +++ b/xpra/codecs/loader.py @@ -16,7 +16,7 @@ #these codecs may well not load because we #do not require the libraries to be installed -NOWARN = ["nvenc", "nvdec", "enc_nvjpeg", "dec_nvjpeg", "nvfbc", "enc_x265", "enc_ffmpeg", "enc_vpl", "enc_gstreamer"] +NOWARN = ["nvenc", "nvdec", "enc_nvjpeg", "dec_nvjpeg", "nvfbc", "enc_x265", "enc_ffmpeg", "enc_vpl", "enc_gstreamer", "dec_gstreamer"] SELFTEST = envbool("XPRA_CODEC_SELFTEST", True) FULL_SELFTEST = envbool("XPRA_CODEC_FULL_SELFTEST", False) @@ -38,7 +38,7 @@ def filt(*values): CSC_CODECS = filt("csc_swscale", "csc_cython", "csc_libyuv") ENCODER_CODECS = filt("enc_rgb", "enc_pillow", "enc_spng", "enc_webp", "enc_jpeg", "enc_nvjpeg", "enc_avif") ENCODER_VIDEO_CODECS = filt("enc_vpx", "enc_x264", "enc_x265", "nvenc", "enc_ffmpeg", "enc_vpl", "enc_gstreamer") -DECODER_CODECS = filt("dec_pillow", "dec_spng", "dec_webp", "dec_jpeg", "dec_nvjpeg", "dec_avif") +DECODER_CODECS = filt("dec_pillow", "dec_spng", "dec_webp", "dec_jpeg", "dec_nvjpeg", "dec_avif", "dec_gstreamer") DECODER_VIDEO_CODECS = filt("dec_vpx", "dec_avcodec2", "nvdec") SOURCES = filt("v4l2", "evdi", "drm", "nvfbc") @@ -207,6 +207,7 @@ def xpra_codec_import(name, description, top_module, class_module, classname): "dec_vpx" : ("vpx decoder", "vpx", "decoder", "Decoder"), "dec_avcodec2" : ("avcodec2 decoder", "ffmpeg", "decoder", "Decoder"), "nvdec" : ("nvdec decoder", "nvidia.nvdec", "decoder", "Decoder"), + "dec_gstreamer" : ("gstreamer decoder", "gstreamer", "decoder", "Decoder"), #sources: "v4l2" : ("v4l2 source", "v4l2", "pusher", "Pusher"), "evdi" : ("evdi source", "evdi", "capture", "EvdiDevice"), @@ -390,7 +391,7 @@ def main(args): out = Logger("encoding") out.enable_debug() enable_color(format_string=NOPREFIX_FORMAT) - out("modules found:") + out.info("modules found:") #print("codec_status=%s" % codecs) for name in sorted(list_codecs): mod = codecs.get(name, "") @@ -427,7 +428,7 @@ def main(args): elif name in codec_errors: out.error(f"* {name.ljust(20)} : {codec_errors[name]}") out("") - out("codecs versions:") + out.info("codecs versions:") def forcever(v): return pver(v, numsep=".", strsep=".").lstrip("v") print_nested_dict(codec_versions, vformat=forcever, print_fn=out) diff --git a/xpra/codecs/video_helper.py b/xpra/codecs/video_helper.py index ddb9279d48..31cecd06e9 100755 --- a/xpra/codecs/video_helper.py +++ b/xpra/codecs/video_helper.py @@ -29,7 +29,8 @@ "jpeg" : "jpeg", "webp" : "webp", "nvjpeg" : "nvidia.nvjpeg", - "gstreamer" : "gstreamer", + "dec_gstreamer" : "gstreamer.decoder", + "enc_gstreamer" : "gstreamer.encoder", } def has_codec_module(module_name): @@ -52,14 +53,14 @@ def try_import_modules(*codec_names): #all the codecs we know about: #try to import the module that contains them (cheap check): -ALL_VIDEO_ENCODER_OPTIONS = try_import_modules("x264", "vpx", "x265", "nvenc", "ffmpeg", "nvjpeg", "jpeg", "webp", "gstreamer") +ALL_VIDEO_ENCODER_OPTIONS = try_import_modules("x264", "vpx", "x265", "nvenc", "ffmpeg", "nvjpeg", "jpeg", "webp", "enc_gstreamer", "dec_gstreamer") HARDWARE_ENCODER_OPTIONS = try_import_modules("nvenc", "nvjpeg") ALL_CSC_MODULE_OPTIONS = try_import_modules("swscale", "cython", "libyuv") NO_GFX_CSC_OPTIONS = [] ALL_VIDEO_DECODER_OPTIONS = try_import_modules("avcodec2", "vpx") -PREFERRED_ENCODER_ORDER = ("nvenc", "nvjpeg", "x264", "vpx", "jpeg", "webp", "x265", "gstreamer") -PREFERRED_DECODER_ORDER = ("avcodec2", "vpx") +PREFERRED_ENCODER_ORDER = ("nvenc", "nvjpeg", "x264", "vpx", "jpeg", "webp", "x265", "enc_gstreamer") +PREFERRED_DECODER_ORDER = ("avcodec2", "vpx", "dec_gstreamer") log("video_helper: ALL_VIDEO_ENCODER_OPTIONS=%s", ALL_VIDEO_ENCODER_OPTIONS) log("video_helper: ALL_CSC_MODULE_OPTIONS=%s", ALL_CSC_MODULE_OPTIONS) log("video_helper: NO_GFX_CSC_OPTIONS=%s", NO_GFX_CSC_OPTIONS) @@ -75,6 +76,8 @@ def get_encoder_module_name(x): return "enc_"+x #ie: "enc_x264" def get_decoder_module_name(x): + if x.find("dec")>=0: + return x #ie: "nvenc" or "enc_vpx" return "dec_"+x #ie: "dec_vpx" def get_csc_module_name(x): @@ -394,7 +397,7 @@ def get_server_full_csc_modes(self, *client_supported_csc_modes): (taking into account the decoder's actual output colorspace for each encoding) """ log("get_server_full_csc_modes(%s) decoder encodings=%s", - client_supported_csc_modes, self._video_decoder_specs.keys()) + client_supported_csc_modes, csv(self._video_decoder_specs.keys())) full_csc_modes = {} for encoding, encoding_specs in self._video_decoder_specs.items(): assert encoding_specs is not None