Gstream #71

Open
wants to merge 10 commits into base: main
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
@@ -51,4 +51,6 @@ jobs:
run: python3 -m coverage run -m pytest test/
- name: Display coverage report
run: python3 -m coverage report --omit=config*

- name: Add packages for gi (pygobject)
run: |
sudo apt install -y python3-gi python3-gi-cairo gir1.2-gtk-3.0 libgirepository1.0-dev build-essential
5 changes: 5 additions & 0 deletions README.md
@@ -14,3 +14,8 @@ python3 -m pip install git+https://github.com/DeGirum/degirum_tools.git
## Release

Release procedure [is described here](https://degirum.atlassian.net/wiki/spaces/SD/pages/1916076041/degirum+tools+Package+Release+Procedure)

## Test Gstream

export PYTHONPATH=$(pwd):$PYTHONPATH
python test/test_gstream.py
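
For a quick interactive check, a GStreamer pipeline string can also be passed straight to open_video_stream() and frames pulled through the video_source() generator. A minimal sketch (not part of the patch), assuming GStreamer/PyGObject are installed and Traffic.mp4 is in the working directory:

from degirum_tools.video_support import open_video_stream, video_source

# Same pipeline as used in test/test_gstream.py
pipeline = (
    "filesrc location=Traffic.mp4 ! decodebin ! videoconvert "
    "! video/x-raw,format=RGB ! appsink name=sink"
)

with open_video_stream(pipeline) as stream:
    for frame in video_source(stream):
        print(frame.shape)  # (height, width, 3) uint8 numpy array
        break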
Binary file added Traffic.mp4
Binary file not shown.
4 changes: 2 additions & 2 deletions degirum_tools/inference_support.py
@@ -18,7 +18,7 @@
open_video_stream,
get_video_stream_properties,
video_source,
open_video_writer,
open_video_writer, VideoCaptureGst
)
from .ui_support import Progress, Display, Timer
from .result_analyzer_base import ResultAnalyzerBase
@@ -276,7 +276,7 @@ def annotate_video(
display = stack.enter_context(Display(win_name))

if isinstance(video_source_id, cv2.VideoCapture):
stream = video_source_id
stream: Union[cv2.VideoCapture, VideoCaptureGst] = video_source_id
else:
stream = stack.enter_context(open_video_stream(video_source_id))

189 changes: 142 additions & 47 deletions degirum_tools/video_support.py
@@ -4,37 +4,130 @@
# Copyright DeGirum Corporation 2024
# All rights reserved
#
# Implements classes and functions to handle video streams for capturing and saving
# Implements classes and functions to handle video streams for capturing & saving
#

import time
import cv2, urllib, numpy as np
import cv2, numpy as np
from contextlib import contextmanager
from functools import cmp_to_key
from pathlib import Path
from . import environment as env
from .ui_support import Progress
from typing import Union, Generator, Optional, Callable, Any
from typing import Union, Generator, Optional, Callable, Any, Tuple
from urllib.parse import urlparse


class VideoCaptureGst:
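    """
    Minimal cv2.VideoCapture-style wrapper (read/get/isOpened/release) around a
    GStreamer pipeline that must end in an appsink element named "sink"
    """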
def __init__(self, pipeline_str):
# Import GStreamer libraries using optional package support
gi = env.import_optional_package("gi")
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

Gst.init(None)
try:
self.pipeline = Gst.parse_launch(pipeline_str)
except GLib.Error as e:
raise Exception(f"Invalid GStreamer pipeline: {pipeline_str}") from e

self.appsink = self.pipeline.get_by_name("sink")
if not self.appsink:
raise Exception(f"Invalid GStreamer pipeline (no appsink): {pipeline_str}")

self.appsink.set_property("emit-signals", True)
self.pipeline.set_state(Gst.State.PLAYING)

# Check if the pipeline transitions to the PLAYING state
state_change_result = self.pipeline.get_state(5 * Gst.SECOND)
if state_change_result[1] != Gst.State.PLAYING:
raise Exception(f"GStreamer pipeline failed to start: {pipeline_str}")

self.running = True

def read(self):
env.import_optional_package("gi")
from gi.repository import Gst
if not self.running:
return False, None
sample = self.appsink.emit("pull-sample")
if not sample:
self.running = False
return False, None

        buf = sample.get_buffer()
        caps = sample.get_caps()
        width = caps.get_structure(0).get_value("width")
        height = caps.get_structure(0).get_value("height")
        success, mapinfo = buf.map(Gst.MapFlags.READ)
        if not success:
            return False, None

        try:
            # Copy the pixels out of the mapped buffer so the frame stays valid
            # (and writable) after the buffer is unmapped
            frame: np.ndarray = np.ndarray(
                (height, width, 3), buffer=mapinfo.data, dtype=np.uint8
            ).copy()
            return True, frame
        finally:
            buf.unmap(mapinfo)

def get(self, prop: int):
env.import_optional_package("gi")
from gi.repository import Gst

pad = self.appsink.get_static_pad("sink")
caps = pad.get_current_caps()
if not caps:
return None

        structure = caps.get_structure(0)
        framerate = structure.get_fraction("framerate")

        # Convert the framerate fraction to a Python float (0.0 if unavailable)
        frame_rate_float = 0.0
        if framerate:
            numerator = framerate.value_numerator
            denominator = framerate.value_denominator
            if denominator:
                frame_rate_float = numerator / denominator

        if prop == cv2.CAP_PROP_FRAME_WIDTH:
            return structure.get_value("width")
        elif prop == cv2.CAP_PROP_FRAME_HEIGHT:
            return structure.get_value("height")
        elif prop == cv2.CAP_PROP_FPS:
            return frame_rate_float
        elif prop == cv2.CAP_PROP_FRAME_COUNT:
            duration = self.pipeline.query_duration(Gst.Format.TIME)[1]
            return (duration / Gst.SECOND) * frame_rate_float
        return None

def isOpened(self):
return self.running

def release(self):
env.import_optional_package("gi")
from gi.repository import Gst
self.pipeline.set_state(Gst.State.NULL)
self.running = False
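
For reference (not part of the patch), a minimal direct-usage sketch of the class above with a synthetic videotestsrc pipeline; in normal use the class is reached through open_video_stream():

cap = VideoCaptureGst(
    "videotestsrc num-buffers=10 ! videoconvert "
    "! video/x-raw,format=RGB ! appsink name=sink"
)
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break  # pull-sample returned None at end of stream
    # frame is a height x width x 3 uint8 numpy array
cap.release()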


@contextmanager
def open_video_stream(
video_source: Union[int, str, Path, None] = None, max_yt_quality: int = 0
) -> Generator[cv2.VideoCapture, None, None]:
"""Open OpenCV video stream from camera with given identifier.

video_source - 0-based index for local cameras
or IP camera URL in the format "rtsp://<user>:<password>@<ip or hostname>",
or local video file path,
or URL to mp4 video file,
or YouTube video URL
max_yt_quality - The maximum video quality for YouTube videos. The units are
in pixels for the height of the video. Will open a video with the highest
resolution less than or equal to max_yt_quality. If 0, open the best quality.

Returns context manager yielding video stream object and closing it on exit
video_source: Union[int, str, Path, None, cv2.VideoCapture, VideoCaptureGst], max_yt_quality: int = 0
) -> Generator[Union[cv2.VideoCapture, "VideoCaptureGst"], None, None]:
"""
Open a video stream, returning a context manager.
Supports OpenCV, YouTube videos, and GStreamer-based streams.

video_source - 0-based index for local cameras,
IP camera URL,
local video file path,
URL to mp4 video file,
YouTube video URL, or
GStreamer pipeline.
max_yt_quality - The maximum video quality for YouTube videos.
The units are in pixels for the height of the video.
Will open a video with the highest resolution <= max_yt_quality.
If 0, open the best quality.

Returns a context manager yielding a video capture object.
"""

if env.get_test_mode() or video_source is None:
video_source = env.get_var(env.var_VideoSource, 0)
if isinstance(video_source, str) and video_source.isnumeric():
@@ -43,20 +136,13 @@ def open_video_stream(
if isinstance(video_source, Path):
video_source = str(video_source)

if isinstance(video_source, str) and urllib.parse.urlparse(
video_source
).hostname in (
"www.youtube.com",
"youtube.com",
"youtu.be",
): # if source is YouTube video
# Handle YouTube video source
if isinstance(video_source, str) and urlparse(video_source).hostname in ("www.youtube.com", "youtube.com", "youtu.be"):
import pafy

if max_yt_quality == 0:
video_source = pafy.new(video_source).getbest(preftype="mp4").url
else:
# Ignore DASH/HLS YouTube videos because we cannot download them trivially w/ OpenCV or ffmpeg.
# Format ids are from pafy backend https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
dash_hls_formats = [
91,
92,
@@ -81,8 +167,7 @@
]

video_qualities = pafy.new(video_source).videostreams
# Sort descending based on vertical pixel count.
video_qualities = sorted(video_qualities, key=cmp_to_key(lambda a, b: b.dimensions[1] - a.dimensions[1])) # type: ignore[attr-defined]
video_qualities = sorted(video_qualities, key=lambda x: x.dimensions[1], reverse=True)

for video in video_qualities:
if video.dimensions[1] <= max_yt_quality and video.extension == "mp4":
@@ -92,7 +177,12 @@
else:
video_source = pafy.new(video_source).getbest(preftype="mp4").url

stream = cv2.VideoCapture(video_source) # type: ignore[arg-type]
    # Decide whether the source string is a GStreamer pipeline or an OpenCV-compatible source
stream: Union[VideoCaptureGst, cv2.VideoCapture]
if isinstance(video_source, str) and ("!" in video_source or "filesrc" in video_source):
stream = VideoCaptureGst(video_source)
else:
stream = cv2.VideoCapture(video_source) # type: ignore[arg-type]
if not stream.isOpened():
raise Exception(f"Error opening '{video_source}' video stream")
else:
@@ -104,25 +194,33 @@
stream.release()


def get_video_stream_properties(
video_source: Union[int, str, Path, None, cv2.VideoCapture]
) -> tuple:
def get_video_stream_properties(video_source: Union[int, str, Path, None, cv2.VideoCapture, VideoCaptureGst]) -> tuple:
"""
Get video stream properties

Args:
video_source - VideoCapture object or argument of open_video_stream() function

Returns:
tuple of (width, height, fps)
"""

def get_props(stream: cv2.VideoCapture) -> tuple:
return (
int(stream.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(stream.get(cv2.CAP_PROP_FRAME_HEIGHT)),
stream.get(cv2.CAP_PROP_FPS),
)
def get_props(stream: Union[cv2.VideoCapture, VideoCaptureGst]) -> Tuple[int, int, float]:
"""
Get properties for cv2.VideoCapture or VideoCaptureGst
"""
if isinstance(stream, cv2.VideoCapture):
return (
int(stream.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(stream.get(cv2.CAP_PROP_FRAME_HEIGHT)),
stream.get(cv2.CAP_PROP_FPS),
)
elif isinstance(stream, VideoCaptureGst):
return (
int(stream.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(stream.get(cv2.CAP_PROP_FRAME_HEIGHT)),
float(stream.get(cv2.CAP_PROP_FPS)),
)
else:
raise ValueError("Unsupported stream type")

if isinstance(video_source, cv2.VideoCapture):
return get_props(video_source)
@@ -132,7 +230,7 @@ def get_props(stream: cv2.VideoCapture) -> tuple:


def video_source(
stream: cv2.VideoCapture, fps: Optional[float] = None
stream: Union[cv2.VideoCapture, VideoCaptureGst], fps: Optional[float] = None
) -> Generator[np.ndarray, None, None]:
"""Generator function, which returns video frames captured from given video stream.
Useful to pass to model batch_predict().
@@ -201,10 +299,8 @@ class VideoWriter:
"""
H264 mp4 video stream writer class
"""

def __init__(self, fname: str, w: int = 0, h: int = 0, fps: float = 30.0):
"""Create, open, and return video stream writer

Args:
fname: filename to save video
w, h: frame width/height (optional, can be zero to deduce on first frame)
@@ -357,5 +453,4 @@ def video2jpegs(
cv2.imwrite(fname, img)
progress.step()
fi += 1

return fi
return fi
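
Taken together, these changes let the existing helpers work with GStreamer sources transparently. A small sketch (same assumptions as above: GStreamer/PyGObject installed, Traffic.mp4 present) of querying properties through get_video_stream_properties(), which routes the pipeline string through open_video_stream() and VideoCaptureGst:

from degirum_tools.video_support import get_video_stream_properties

gst_pipeline = (
    "filesrc location=Traffic.mp4 ! decodebin ! videoconvert "
    "! video/x-raw,format=RGB ! appsink name=sink"
)
w, h, fps = get_video_stream_properties(gst_pipeline)
print(w, h, fps)  # width/height from the negotiated caps, fps from the framerate fraction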
Binary file added sample.mp4
Binary file not shown.
2 changes: 1 addition & 1 deletion setup.py
@@ -50,7 +50,7 @@
"types-Pillow",
"types-PyYAML",
],
"testing": ["pytest", "coverage"],
"testing": ["pytest", "coverage", "pygobject"],
"build": ["build"],
},
include_package_data=True,
@@ -272,7 +272,8 @@ def non_max_suppression(
)
else: # best class only
conf = np.max(cls, axis=1, keepdims=True)
j = np.argmax(cls[:, :], axis=1, keepdims=True)
j = np.argmax(cls[:, :], axis=1) # Remove keepdims
j = np.expand_dims(j, axis=1) # Add the dimension back
x = np.concatenate((box, conf, j, mask), axis=1)

# Filter by class
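
The two-step argmax in this hunk reproduces keepdims=True behavior for NumPy versions that predate it (np.argmax only gained a keepdims argument in NumPy 1.22). A small equivalence check on made-up class scores:

import numpy as np

cls = np.array([[0.1, 0.9], [0.7, 0.3]])  # hypothetical per-box class scores
j = np.expand_dims(np.argmax(cls, axis=1), axis=1)
print(j)  # column vector of best-class indices; equals np.argmax(cls, axis=1, keepdims=True) on NumPy >= 1.22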
35 changes: 35 additions & 0 deletions test/test_gstream.py
@@ -0,0 +1,35 @@
from degirum_tools.video_support import open_video_stream


def test_gstreamer_pipeline():
# Test with a sample GStreamer pipeline
gst_pipeline = "filesrc location=Traffic.mp4 ! decodebin ! videoconvert ! video/x-raw,format=RGB ! appsink name=sink"
try:
with open_video_stream(gst_pipeline) as stream:
assert stream.isOpened(), "Failed to open GStreamer pipeline"
ret, frame = stream.read()
assert ret, "Failed to read a frame from GStreamer pipeline"
assert frame is not None, "Frame is None from GStreamer pipeline"
except ImportError:
print("GStreamer not available, skipping test")


def test_invalid_gstreamer_pipeline():
invalid_pipeline = "invalid_pipeline ! appsink"
try:
with open_video_stream(invalid_pipeline):
pass # This should not execute if the pipeline is invalid
except Exception as e:
# Match specific error messages
assert "Invalid GStreamer pipeline" in str(e) or "failed to start" in str(e), (
f"Unexpected error message: {e}"
)
else:
assert False, "Did not raise error for invalid GStreamer pipeline"


if __name__ == "__main__":
print("Running tests...")
test_gstreamer_pipeline()
test_invalid_gstreamer_pipeline()
print("All tests completed.")