[GStreamer][WebRTC] Incoming video track handling improvements
https://bugs.webkit.org/show_bug.cgi?id=276419

Reviewed by Xabier Rodriguez-Calvar.

The h265.html test now consistently fails due to bug #269285. Before this patch it was consistently
timing out because of a caps negotiation issue triggered when the MediaStreamTrack was disabled: the
framerate was not set on the black video frames, which triggered variable-framerate code paths and
messed up downstream elements.
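
A minimal sketch of the caps change (illustrative values, not the exact WebKit code): GStreamer caps
carry the framerate as a fraction, and black frames generated for a disabled track were previously
pushed without one, which downstream elements treated as variable framerate.

    // Hypothetical example: fixed-framerate caps for generated black frames.
    // 30.0 stands in for m_track.settings().frameRate().
    int num, den;
    gst_util_double_to_fraction(30.0, &num, &den);
    GstCaps* caps = gst_caps_new_simple("video/x-raw",
        "format", G_TYPE_STRING, "I420",
        "width", G_TYPE_INT, 320, "height", G_TYPE_INT, 240,
        "framerate", GST_TYPE_FRACTION, num, den, nullptr);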

After fixing the framerate issue, another bug surfaced: the buffers coming from the RealtimeIncoming
source had no video meta information, so glupload failed to handle frames and raised errors. By using
fakevideosink instead of appsink in the WebRTC pipeline for incoming video tracks that are decoded,
the tee element performs no allocation-query shenanigans and the decoder correctly attaches video
metas to buffers.
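
A simplified sketch of the resulting query handling (names are illustrative, not the exact WebKit
code): for decoded incoming video the sink-pad probe now lets the allocation query through, so
fakevideosink answers it and the decoder attaches GstVideoMeta to its output buffers.

    // Hypothetical probe installed on the per-client sink pad.
    static GstPadProbeReturn onSinkQuery(GstPad*, GstPadProbeInfo* info, gpointer)
    {
        auto* query = GST_QUERY_CAST(info->data);
        if (GST_QUERY_TYPE(query) == GST_QUERY_ALLOCATION)
            return GST_PAD_PROBE_OK; // Let fakevideosink advertise video meta support.
        // Other queries keep being forwarded to the matching client appsrc (elided).
        return GST_PAD_PROBE_OK;
    }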

* LayoutTests/platform/glib/TestExpectations:
* Source/WebCore/platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp:
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.cpp:
(WebCore::RealtimeIncomingSourceGStreamer::configureAppSink):
(WebCore::RealtimeIncomingSourceGStreamer::configureFakeVideoSink):
(WebCore::RealtimeIncomingSourceGStreamer::handleDownstreamEvent):
(WebCore::RealtimeIncomingSourceGStreamer::registerClient):
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.h:

Canonical link: https://commits.webkit.org/280855@main
philn committed Jul 11, 2024
1 parent 9b632a4 commit aed5721
Showing 4 changed files with 130 additions and 58 deletions.
2 changes: 1 addition & 1 deletion LayoutTests/platform/glib/TestExpectations
@@ -2110,7 +2110,7 @@ webkit.org/b/187064 imported/w3c/web-platform-tests/webrtc/RTCPeerConnection-get

webrtc/video-av1.html [ Skip ]

-webkit.org/b/269285 webrtc/h265.html [ Pass Timeout Failure ]
+webkit.org/b/269285 webrtc/h265.html [ Failure ]

# Too slow with filtering implemented in WebKit. Should be done directly by GstWebRTC.
webrtc/datachannel/filter-ice-candidate.html [ Skip ]
Source/WebCore/platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp
@@ -349,13 +349,13 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
trackEnded(m_track);
}

-void pushSample(GRefPtr<GstSample>&& sample, const char* logMessage)
+void pushSample(GRefPtr<GstSample>&& sample, const ASCIILiteral logMessage)
{
ASSERT(m_src);
if (!m_src || !m_isObserving)
return;

-GST_TRACE_OBJECT(m_src.get(), "%s", logMessage);
+GST_TRACE_OBJECT(m_src.get(), "%s", logMessage.characters());

bool drop = m_enoughData;
auto* buffer = gst_sample_get_buffer(sample.get());
@@ -475,7 +475,7 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
}

if (m_track.enabled()) {
pushSample(WTFMove(sample), "Pushing video frame from enabled track");
pushSample(WTFMove(sample), "Pushing video frame from enabled track"_s);
return;
}

@@ -490,7 +490,7 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
const auto& data = static_cast<const GStreamerAudioData&>(audioData);
if (m_track.enabled()) {
GRefPtr<GstSample> sample = data.getSample();
pushSample(WTFMove(sample), "Pushing audio sample from enabled track");
pushSample(WTFMove(sample), "Pushing audio sample from enabled track"_s);
return;
}

@@ -544,14 +544,17 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
auto width = m_lastKnownSize.width() ? m_lastKnownSize.width() : 320;
auto height = m_lastKnownSize.height() ? m_lastKnownSize.height() : 240;

+int frameRateNumerator, frameRateDenominator;
+gst_util_double_to_fraction(m_track.settings().frameRate(), &frameRateNumerator, &frameRateDenominator);

if (!m_blackFrameCaps)
-m_blackFrameCaps = adoptGRef(gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", "width", G_TYPE_INT, width, "height", G_TYPE_INT, height, nullptr));
+m_blackFrameCaps = adoptGRef(gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", "width", G_TYPE_INT, width, "height", G_TYPE_INT, height, "framerate", GST_TYPE_FRACTION, frameRateNumerator, frameRateDenominator, nullptr));
else {
auto* structure = gst_caps_get_structure(m_blackFrameCaps.get(), 0);
int currentWidth, currentHeight;
gst_structure_get(structure, "width", G_TYPE_INT, &currentWidth, "height", G_TYPE_INT, &currentHeight, nullptr);
if (currentWidth != width || currentHeight != height)
-m_blackFrameCaps = adoptGRef(gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", "width", G_TYPE_INT, width, "height", G_TYPE_INT, height, nullptr));
+m_blackFrameCaps = adoptGRef(gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", "width", G_TYPE_INT, width, "height", G_TYPE_INT, height, "framerate", GST_TYPE_FRACTION, frameRateNumerator, frameRateDenominator, nullptr));
}

GstVideoInfo info;
@@ -566,10 +569,11 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
memset(data.data(), 0, yOffset);
memset(data.data() + yOffset, 128, data.size() - yOffset);
}
-gst_buffer_add_video_meta_full(buffer.get(), GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_FORMAT_I420, width, height, 3, info.offset, info.stride);
+gst_buffer_add_video_meta_full(buffer.get(), GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT(&info), GST_VIDEO_INFO_WIDTH(&info),
+GST_VIDEO_INFO_HEIGHT(&info), GST_VIDEO_INFO_N_PLANES(&info), info.offset, info.stride);
GST_BUFFER_DTS(buffer.get()) = GST_BUFFER_PTS(buffer.get()) = gst_element_get_current_running_time(m_parent);
auto sample = adoptGRef(gst_sample_new(buffer.get(), m_blackFrameCaps.get(), nullptr, nullptr));
pushSample(WTFMove(sample), "Pushing black video frame");
pushSample(WTFMove(sample), "Pushing black video frame"_s);
}

void pushSilentSample()
@@ -590,7 +594,7 @@ class InternalSource final : public MediaStreamTrackPrivateObserver,
webkitGstAudioFormatFillSilence(info.finfo, map.data(), map.size());
}
auto sample = adoptGRef(gst_sample_new(buffer.get(), m_silentSampleCaps.get(), nullptr, nullptr));
pushSample(WTFMove(sample), "Pushing audio silence from disabled track");
pushSample(WTFMove(sample), "Pushing audio silence from disabled track"_s);
}

void createGstStream()
Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.cpp
@@ -76,25 +76,8 @@ bool RealtimeIncomingSourceGStreamer::hasClient(const GRefPtr<GstElement>& appsr
return false;
}

-std::optional<int> RealtimeIncomingSourceGStreamer::registerClient(GRefPtr<GstElement>&& appsrc)
+void RealtimeIncomingSourceGStreamer::configureAppSink(GstElement* sink)
{
-if (!m_tee)
-return std::nullopt;
-
-Locker lock { m_clientLock };
-static Atomic<int> counter = 1;
-auto clientId = counter.exchangeAdd(1);
-
-auto* queue = gst_element_factory_make("queue", makeString("queue-"_s, clientId).ascii().data());
-auto* sink = makeGStreamerElement("appsink", makeString("sink-"_s, clientId).ascii().data());
-g_object_set(sink, "enable-last-sample", FALSE, nullptr);
-
-if (!m_clientQuark)
-m_clientQuark = g_quark_from_static_string("client-id");
-g_object_set_qdata(G_OBJECT(sink), m_clientQuark, GINT_TO_POINTER(clientId));
-GST_DEBUG_OBJECT(m_bin.get(), "Client %" GST_PTR_FORMAT " with id %d associated to new sink %" GST_PTR_FORMAT, appsrc.get(), clientId, sink);
-m_clients.add(clientId, WTFMove(appsrc));
-
static GstAppSinkCallbacks callbacks = {
nullptr, // eos
[](GstAppSink* sink, gpointer userData) -> GstFlowReturn {
@@ -140,34 +123,7 @@ std::optional<int> RealtimeIncomingSourceGStreamer::registerClient(GRefPtr<GstEl
return false;

auto event = adoptGRef(GST_EVENT_CAST(gst_app_sink_pull_object(sink)));
-switch (GST_EVENT_TYPE(event.get())) {
-case GST_EVENT_STREAM_START:
-case GST_EVENT_CAPS:
-case GST_EVENT_SEGMENT:
-case GST_EVENT_STREAM_COLLECTION:
-return false;
-case GST_EVENT_LATENCY: {
-GstClockTime minLatency, maxLatency;
-if (gst_base_sink_query_latency(GST_BASE_SINK(sink), nullptr, nullptr, &minLatency, &maxLatency)) {
-if (int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink), strongSource->m_clientQuark))) {
-GST_DEBUG_OBJECT(sink, "Setting client latency to min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, GST_TIME_ARGS(minLatency), GST_TIME_ARGS(maxLatency));
-auto appsrc = strongSource->m_clients.get(clientId);
-g_object_set(appsrc, "min-latency", minLatency, "max-latency", maxLatency, nullptr);
-}
-}
-return false;
-}
-default:
-break;
-}
-
-if (int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink), strongSource->m_clientQuark))) {
-GST_DEBUG_OBJECT(sink, "Forwarding event %" GST_PTR_FORMAT " to client", event.get());
-auto appsrc = strongSource->m_clients.get(clientId);
-auto pad = adoptGRef(gst_element_get_static_pad(appsrc, "src"));
-gst_pad_push_event(pad.get(), event.leakRef());
-}
-
+strongSource->handleDownstreamEvent(GST_ELEMENT_CAST(sink), WTFMove(event));
return false;
},
#if GST_CHECK_VERSION(1, 24, 0)
@@ -179,12 +135,120 @@ std::optional<int> RealtimeIncomingSourceGStreamer::registerClient(GRefPtr<GstEl
gst_app_sink_set_callbacks(GST_APP_SINK(sink), &callbacks, new ThreadSafeWeakPtr { *this }, [](void* data) {
delete static_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(data);
});
+}
+
+void RealtimeIncomingSourceGStreamer::configureFakeVideoSink(GstElement* sink)
+{
+g_object_set(sink, "signal-handoffs", TRUE, nullptr);
+
+auto handoffCallback = G_CALLBACK(+[](GstElement*, GstBuffer* buffer, GstPad* pad, gpointer userData) {
+auto source = reinterpret_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(userData);
+auto strongSource = source->get();
+if (!strongSource)
+return;
+
+auto caps = adoptGRef(gst_pad_get_current_caps(pad));
+auto sample = adoptGRef(gst_sample_new(buffer, caps.get(), nullptr, nullptr));
+// dispatchSample might trigger RealtimeMediaSource::notifySettingsDidChangeObservers()
+// which expects to run in the main thread.
+callOnMainThread([source = ThreadSafeWeakPtr { *strongSource.get() }, sample = WTFMove(sample)]() mutable {
+auto strongSource = source.get();
+if (!strongSource)
+return;
+
+strongSource->dispatchSample(WTFMove(sample));
+});
+});
+g_signal_connect_data(sink, "preroll-handoff", handoffCallback, new ThreadSafeWeakPtr { *this },
+[](void* data, GClosure*) {
+delete static_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(data);
+}, static_cast<GConnectFlags>(0));
+g_signal_connect_data(sink, "handoff", handoffCallback, new ThreadSafeWeakPtr { *this },
+[](void* data, GClosure*) {
+delete static_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(data);
+}, static_cast<GConnectFlags>(0));
+
+auto pad = adoptGRef(gst_element_get_static_pad(sink, "sink"));
+gst_pad_add_probe(pad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(+[](GstPad* pad, GstPadProbeInfo* info, gpointer userData) -> GstPadProbeReturn {
+auto source = reinterpret_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(userData);
+auto strongSource = source->get();
+if (!strongSource)
+return GST_PAD_PROBE_OK;
+GRefPtr event = GST_PAD_PROBE_INFO_EVENT(info);
+auto sink = adoptGRef(gst_pad_get_parent_element(pad));
+strongSource->handleDownstreamEvent(sink.get(), WTFMove(event));
+return GST_PAD_PROBE_OK;
+}), new ThreadSafeWeakPtr { *this }, [](void* data) {
+delete static_cast<ThreadSafeWeakPtr<RealtimeIncomingSourceGStreamer>*>(data);
+});
+}
+
+void RealtimeIncomingSourceGStreamer::handleDownstreamEvent(GstElement* sink, GRefPtr<GstEvent>&& event)
+{
+switch (GST_EVENT_TYPE(event.get())) {
+case GST_EVENT_STREAM_START:
+case GST_EVENT_CAPS:
+case GST_EVENT_SEGMENT:
+case GST_EVENT_STREAM_COLLECTION:
+return;
+case GST_EVENT_LATENCY: {
+GstClockTime minLatency, maxLatency;
+if (gst_base_sink_query_latency(GST_BASE_SINK(sink), nullptr, nullptr, &minLatency, &maxLatency)) {
+if (int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink), m_clientQuark))) {
+GST_DEBUG_OBJECT(sink, "Setting client latency to min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, GST_TIME_ARGS(minLatency), GST_TIME_ARGS(maxLatency));
+auto appsrc = m_clients.get(clientId);
+g_object_set(appsrc, "min-latency", minLatency, "max-latency", maxLatency, nullptr);
+}
+}
+return;
+}
+default:
+break;
+}
+
+if (int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink), m_clientQuark))) {
+GST_DEBUG_OBJECT(sink, "Forwarding event %" GST_PTR_FORMAT " to client", event.get());
+auto appsrc = m_clients.get(clientId);
+auto pad = adoptGRef(gst_element_get_static_pad(appsrc, "src"));
+gst_pad_push_event(pad.get(), event.leakRef());
+}
+}
+
+std::optional<int> RealtimeIncomingSourceGStreamer::registerClient(GRefPtr<GstElement>&& appsrc)
+{
+if (!m_tee)
+return std::nullopt;
+
+Locker lock { m_clientLock };
+static Atomic<int> counter = 1;
+auto clientId = counter.exchangeAdd(1);
+
+auto* queue = gst_element_factory_make("queue", makeString("queue-"_s, clientId).ascii().data());
+
+ASCIILiteral sinkName = "appsink"_s;
+if (isIncomingVideoSource() && m_isUpstreamDecoding)
+sinkName = "fakevideosink"_s;
+auto* sink = makeGStreamerElement(sinkName.characters(), makeString("sink-"_s, clientId).ascii().data());
+g_object_set(sink, "enable-last-sample", FALSE, nullptr);
+
+if (!m_clientQuark)
+m_clientQuark = g_quark_from_static_string("client-id");
+g_object_set_qdata(G_OBJECT(sink), m_clientQuark, GINT_TO_POINTER(clientId));
+GST_DEBUG_OBJECT(m_bin.get(), "Client %" GST_PTR_FORMAT " with id %d associated to new sink %" GST_PTR_FORMAT, appsrc.get(), clientId, sink);
+m_clients.add(clientId, WTFMove(appsrc));
+
+if (GST_IS_APP_SINK(sink))
+configureAppSink(sink);
+else
+configureFakeVideoSink(sink);

auto sinkPad = adoptGRef(gst_element_get_static_pad(sink, "sink"));
gst_pad_add_probe(sinkPad.get(), GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(+[](GstPad* pad, GstPadProbeInfo* info, RealtimeIncomingSourceGStreamer* self) -> GstPadProbeReturn {
auto query = GST_QUERY_CAST(info->data);
-if (self->isIncomingVideoSource() && self->m_isUpstreamDecoding && GST_QUERY_TYPE(query) == GST_QUERY_ALLOCATION)
-gst_query_add_allocation_meta(query, GST_VIDEO_META_API_TYPE, NULL);
+if (self->isIncomingVideoSource() && self->m_isUpstreamDecoding && GST_QUERY_TYPE(query) == GST_QUERY_ALLOCATION) {
+// Let fakevideosink handle the allocation query.
+return GST_PAD_PROBE_OK;
+}

auto sink = adoptGRef(gst_pad_get_parent_element(pad));
int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink.get()), self->m_clientQuark));
Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.h
@@ -59,6 +59,10 @@ class RealtimeIncomingSourceGStreamer : public RealtimeMediaSource, public Threa
void stopProducingData() final;
const RealtimeMediaSourceCapabilities& capabilities() final;

+void configureAppSink(GstElement*);
+void configureFakeVideoSink(GstElement*);
+void handleDownstreamEvent(GstElement*, GRefPtr<GstEvent>&&);
+
virtual void dispatchSample(GRefPtr<GstSample>&&) { }

void unregisterClientLocked(int);
