Skip to content

Commit

Permalink
#3706 gstreamer decoder
Browse files Browse the repository at this point in the history
hard-coded for vp8 for now - just a PoC
  • Loading branch information
totaam committed Dec 14, 2022
1 parent d9a6b2e commit ad6d6c2
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 122 deletions.
132 changes: 132 additions & 0 deletions xpra/codecs/gstreamer/codec_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# This file is part of Xpra.
# Copyright (C) 2014-2022 Antoine Martin <[email protected]>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

from queue import Queue, Empty

from xpra.util import typedict
from xpra.gst_common import import_gst
from xpra.gst_pipeline import Pipeline, GST_FLOW_OK
from xpra.log import Logger

Gst = import_gst()
log = Logger("encoder", "gstreamer")


def get_version():
return (5, 0)

def get_type():
return "gstreamer"

def get_info():
return {"version" : get_version()}

def init_module():
log("gstreamer.init_module()")

def cleanup_module():
log("gstreamer.cleanup_module()")


class VideoPipeline(Pipeline):
__generic_signals__ = Pipeline.__generic_signals__.copy()
"""
Dispatch video encoding or decoding to a gstreamer pipeline
"""
def init_context(self, encoding, width, height, colorspace, options=None):
options = typedict(options or {})
self.encoding = encoding
self.width = width
self.height = height
self.colorspace = colorspace
self.frames = 0
self.frame_queue = Queue()
self.pipeline_str = ""
self.create_pipeline(options)
self.src = self.pipeline.get_by_name("src")
self.src.set_property("format", Gst.Format.TIME)
#self.src.set_caps(Gst.Caps.from_string(CAPS))
self.sink = self.pipeline.get_by_name("sink")
self.sink.connect("new-sample", self.on_new_sample)
self.sink.connect("new-preroll", self.on_new_preroll)
self.start()

def create_pipeline(self, options):
raise NotImplementedError()

def on_message(self, bus, message):
if message.type == Gst.MessageType.NEED_CONTEXT and self.pipeline_str.find("vaapi")>=0:
log("vaapi is requesting a context")
return GST_FLOW_OK
return super().on_message(bus, message)

def on_new_preroll(self, _appsink):
log("new-preroll")
return GST_FLOW_OK

def process_buffer(self, buf):
r = self.src.emit("push-buffer", buf)
if r!=GST_FLOW_OK:
log.error("Error: unable to push image buffer")
return None
try:
r = self.frame_queue.get(timeout=2 if self.frames==0 else 0.5)
self.frames += 1
return r
except Empty:
log.error("Error: frame queue timeout")
return None


def get_info(self) -> dict:
info = get_info()
if self.colorspace is None:
return info
info.update({
"frames" : self.frames,
"width" : self.width,
"height" : self.height,
"encoding" : self.encoding,
"colorspace": self.colorspace,
"version" : get_version(),
})
return info

def __repr__(self):
if self.colorspace is None:
return "gstreamer(uninitialized)"
return f"gstreamer({self.colorspace} - {self.width}x{self.height})"

def is_ready(self):
return self.colorspace is not None

def is_closed(self):
return self.colorspace is None


def get_encoding(self):
return self.encoding

def get_width(self):
return self.width

def get_height(self):
return self.height

def get_type(self):
return "gstreamer"

def clean(self):
super().stop()
self.width = 0
self.height = 0
self.colorspace = None
self.encoding = ""
self.dst_formats = []
self.frames = 0


def do_emit_info(self):
pass
111 changes: 111 additions & 0 deletions xpra/codecs/gstreamer/decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# This file is part of Xpra.
# Copyright (C) 2014-2022 Antoine Martin <[email protected]>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

from xpra.gst_common import STREAM_TYPE, GST_FORMAT_BYTES, make_buffer
from xpra.gst_pipeline import GST_FLOW_OK
from xpra.codecs.gstreamer.codec_common import (
VideoPipeline,
get_version, get_type, get_info,
init_module, cleanup_module,
)
from xpra.util import roundup
from xpra.log import Logger
from gi.repository import GObject
from xpra.codecs.image_wrapper import ImageWrapper

log = Logger("decoder", "gstreamer")

assert get_version and get_type and init_module and cleanup_module


def get_encodings():
return ("vp8", )

def get_input_colorspaces(encoding):
assert encoding in get_encodings()
return ("YUV420P", )
#return ("YUV420P", "BGRX", )

def get_output_colorspace(encoding, input_colorspace):
assert encoding in get_encodings()
assert input_colorspace in get_input_colorspaces(encoding)
return "YUV420P"


class Decoder(VideoPipeline):
__gsignals__ = VideoPipeline.__generic_signals__.copy()
"""
Dispatch video decoding to a gstreamer pipeline
"""
def create_pipeline(self, options):
if self.encoding not in get_encodings():
raise ValueError(f"invalid encoding {self.encoding!r}")
self.dst_formats = options.strtupleget("dst-formats")
gst_rgb_format = "I420"
#STREAM_CAPS = f"caps=video/x-vp8,width={self.width},height={self.height}"
STREAM_CAPS = f"video/x-vp8,width={self.width},height={self.height}"
IMAGE_CAPS = f"video/x-raw,width={self.width},height={self.height},format=(string){gst_rgb_format}"
elements = [
#"do-timestamp=1",
f"appsrc name=src emit-signals=1 block=0 is-live=1 do-timestamp=1 stream-type={STREAM_TYPE} format={GST_FORMAT_BYTES} caps={STREAM_CAPS}",
"vp8dec",
#avdec_vp8
#"h264parse",
#"avdec_h264",
#video/x-h264,stream-format=avc,alignment=au
"videoconvert",
#mp4mux
f"appsink name=sink emit-signals=true max-buffers=10 drop=true sync=false async=false qos=false caps={IMAGE_CAPS}",
]
if not self.setup_pipeline_and_bus(elements):
raise RuntimeError("failed to setup gstreamer pipeline")

def get_colorspace(self):
return self.colorspace

def on_new_sample(self, _bus):
sample = self.sink.emit("pull-sample")
buf = sample.get_buffer()
size = buf.get_size()
log("on_new_sample size=%s", size)
if size:
mem = memoryview(buf.extract_dup(0, size))
#I420 gstreamer definition:
Ystride = roundup(self.width, 4)
Ysize = Ystride*roundup(self.height, 2)
UVstride = roundup(roundup(self.width, 2)//2, 4)
UVsize = UVstride*roundup(self.height, 2)//2
Y = mem[:Ysize]
U = mem[Ysize:Ysize+UVsize]
V = mem[Ysize+UVsize:Ysize+2*UVsize]
strides = (Ystride, UVstride, UVstride)
image = ImageWrapper(0, 0, self.width, self.height, (Y, U, V), "YUV420P", 24, strides, 3, ImageWrapper.PLANAR_3)
self.frame_queue.put(image)
return GST_FLOW_OK


def decompress_image(self, data, options=None):
log(f"decompress_image(..) state={self.state} data size={len(data)}")
if self.state in ("stopped", "error"):
log(f"pipeline is in {self.state} state, dropping buffer")
return None
buf = make_buffer(data)
#duration = normv(0)
#if duration>0:
# buf.duration = duration
#buf.size = size
#buf.timestamp = timestamp
#buf.offset = offset
#buf.offset_end = offset_end
return self.process_buffer(buf)

GObject.type_register(Decoder)


def selftest(full=False):
log("gstreamer decoder selftest: %s", get_info())
from xpra.codecs.codec_checks import testdecoder
from xpra.codecs.gstreamer import decoder
assert testdecoder(decoder, full)
Loading

0 comments on commit ad6d6c2

Please sign in to comment.