Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace opencv with libcamera? #3657

Open
totaam opened this issue Oct 18, 2022 · 2 comments
Open

replace opencv with libcamera? #3657

totaam opened this issue Oct 18, 2022 · 2 comments
Labels
encoding enhancement New feature or request linux

Comments

@totaam
Copy link
Collaborator

totaam commented Oct 18, 2022

libcamera is "a complex camera support library for Linux, Android, and ChromeOS".

It is probably a better fit than opencv, which pulls in humongous dependencies.
Or perhaps we should use gstreamer?

See also: MacOS (#1231), capturing video (#1627), MS Windows (#3336).

@totaam totaam added enhancement New feature or request encoding linux labels Oct 18, 2022
@totaam
Copy link
Collaborator Author

totaam commented Mar 17, 2023

We can do libcamera via gstreamer!

gst-launch-1.0 libcamerasrc ! queue ! videoconvert ! autovideosink

@totaam
Copy link
Collaborator Author

totaam commented Sep 23, 2024

WIP.
For lack of a better place, here's a bit of code that saves a VGA preview of my webcam to file:


import os
import mmap
import time
import selectors

from libcamera import (
    Camera, CameraManager, CameraConfiguration,
    StreamRole, Request, PixelFormat, Size,
    FrameBuffer, FrameBufferAllocator, FrameMetadata,
)

# Shared CameraManager instance: used below to enumerate the cameras,
# to obtain the event file descriptor and to collect the completed requests.
cm = CameraManager.singleton()


class MappedFrameBuffer:
    """
    Provides memoryviews for the FrameBuffer's planes

    Every distinct plane file descriptor is mmapped once (shared, read/write)
    and each plane is then exposed as a memoryview slice of that mapping,
    starting at the plane's offset and spanning the plane's length.

    Can be used as a context manager: the buffer is mapped on entry
    and unmapped on exit.
    """
    def __init__(self, fb: FrameBuffer):
        self.__fb = fb
        # one memoryview per plane, empty until mmap() has been called:
        self.__planes = ()
        # the mmap objects backing those memoryviews:
        self.__maps = ()

    def __enter__(self):
        return self.mmap()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.munmap()

    def mmap(self):
        """
        Map all the planes of the FrameBuffer and return `self`.

        Raises RuntimeError if the buffer is already mapped,
        or if a plane does not fit within its underlying buffer.
        Any mapping created before a failure is closed again.
        """
        if self.__planes:
            raise RuntimeError('MappedFrameBuffer already mmapped')
        fb = self.__fb

        # Collect information about the buffers:
        # for each fd, the buffer length and how many bytes need mapping
        bufinfos = {}
        for plane in fb.planes:
            fd = plane.fd

            if fd not in bufinfos:
                # seeking to the end gives us the size of the buffer
                buflen = os.lseek(fd, 0, os.SEEK_END)
                bufinfos[fd] = {'maplen': 0, 'buflen': buflen}
            else:
                buflen = bufinfos[fd]['buflen']

            if plane.offset > buflen or plane.offset + plane.length > buflen:
                raise RuntimeError(f'plane is out of buffer: buffer length={buflen}, '
                                   f'plane offset={plane.offset}, plane length={plane.length}')

            # the mapping must reach the end of the furthest plane using this fd
            bufinfos[fd]['maplen'] = max(bufinfos[fd]['maplen'], plane.offset + plane.length)

        # mmap the buffers
        maps = []
        try:
            for fd, info in bufinfos.items():
                mapping = mmap.mmap(fd, info['maplen'], mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
                info['map'] = mapping
                maps.append(mapping)
        except Exception:
            # don't leak the mappings that were created before the failure
            for mapping in maps:
                mapping.close()
            raise

        self.__maps = tuple(maps)

        # Create memoryviews for the planes
        planes = []
        for plane in fb.planes:
            whole = memoryview(bufinfos[plane.fd]['map'])
            planes.append(whole[plane.offset:plane.offset + plane.length])

        self.__planes = tuple(planes)
        return self

    def munmap(self) -> None:
        """
        Release the plane memoryviews and close the mappings.

        Raises RuntimeError if the buffer is not currently mapped.
        """
        if not self.__planes:
            raise RuntimeError('MappedFrameBuffer not mmapped')

        # the views must be released before the mmap objects can be closed
        for p in self.__planes:
            p.release()

        for mm in self.__maps:
            mm.close()

        self.__planes = ()
        self.__maps = ()

    @property
    def planes(self) -> tuple[memoryview, ...]:
        """memoryviews for the planes"""
        if not self.__planes:
            raise RuntimeError('MappedFrameBuffer not mmapped')

        return self.__planes

    @property
    def fb(self):
        """the FrameBuffer this object was created for"""
        return self.__fb



# Lookup table: FrameBuffer -> its mapped MappedFrameBuffer.
# Filled by the capture loop below and read by handle_request().
mmap_buffers = {}


def handle_request(req: Request, fmt: PixelFormat, size: Size):
    """
    Process one completed capture request.

    Prints the frame metadata and plane layout, then copies the pixel data
    out of the mapped buffer registered in `mmap_buffers` for this frame buffer.
    When `fmt` is YUYV, the frame is converted to BGRX using xpra's
    `csc_libyuv` codec and saved as `camera.png` in the current directory;
    for any other format the first 100 bytes are printed instead.

    Only single-buffer, single-plane requests are supported (asserted).
    Raises RuntimeError if the frame status is not `Success`.
    """
    print(f"handle_request({req}, {fmt}, {size})")
    buffers = req.buffers
    assert len(buffers) == 1

    _stream, fb = next(iter(buffers.items()))

    meta = fb.metadata
    if meta.status != FrameMetadata.Status.Success:
        raise RuntimeError(f"failed to grab frame: {meta.status}")
    print(f"got buffer {meta.sequence}, timestamp={meta.timestamp}")
    print(f" cookie {fb.cookie}")
    print(f" {len(meta.planes)} planes:")
    for plane in meta.planes:
        print(f"  {plane.bytes_used} bytes")
    for plane in fb.planes:
        print(f"  fd {plane.fd}")
        print(f"  offset={plane.offset}, length={plane.length}")
    assert len(meta.planes) == 1
    assert len(fb.planes) == 1

    data_size = meta.planes[0].bytes_used

    mfb = mmap_buffers[fb]
    assert len(mfb.planes) == 1
    # The plane's memoryview already starts at the plane's offset within the
    # mapping, so the offset must not be applied a second time here:
    data = mfb.planes[0][:data_size].tobytes()
    if fourcc_str(fmt.fourcc) == "YUYV":
        # xpra and PIL are only needed for this branch, so import them lazily
        from xpra.util.objects import typedict
        from xpra.codecs.image import ImageWrapper
        from xpra.codecs.loader import load_codec, log
        log.enable_debug()
        libyuv = load_codec("csc_libyuv")
        print(f"{libyuv}")
        converter = libyuv.Converter()
        w = size.width
        h = size.height
        converter.init_context(w, h, "YUYV", w, h, "BGRX", typedict())
        # presumably w*2 is the rowstride (YUYV uses 2 bytes per pixel) - TODO confirm
        src_image = ImageWrapper(0, 0, w, h, data, "YUYV", 24, w*2)
        dst_image = converter.convert_image(src_image)
        rgb_data = dst_image.get_pixels()
        from PIL import Image
        img = Image.frombuffer("RGBA", (w, h), rgb_data, "raw", "BGRA", dst_image.get_rowstride())
        img.save("camera.png", format="PNG")
    else:
        show = data[:100]
        print(f"{show=}")


def fourcc_str(value=1448695129) -> str:
    """
    Convert an integer fourcc code into its 4 character string.

    The least significant byte becomes the first character,
    only the lowest 32 bits of `value` are used.
    """
    return "".join(chr((value >> shift) & 0xFF) for shift in (0, 8, 16, 24))


def fourcc_int(value="YUYV") -> int:
    """
    Convert a fourcc string into its integer code.

    The first character ends up in the least significant byte:
    each following character is shifted a further 8 bits to the left.
    """
    return sum(ord(char) << (8 * position) for position, char in enumerate(value))


def choose_size(sizes: list[Size], width=640, height=480):
    """
    Pick the size whose pixel count is closest to `width` x `height`.

    Sizes that fit within `width` x `height` are preferred,
    if none of them fit then all the `sizes` are considered.
    When several sizes are equally close, the last one listed is returned.
    """
    print(f"choose_size{sizes}")
    fitting = [entry for entry in sizes if entry.width <= width and entry.height <= height]
    pool = fitting or sizes
    # keyed by the distance to the target pixel count,
    # so a later entry replaces an earlier one at the same distance:
    by_distance = {}
    for entry in pool:
        distance = abs(width * height - entry.width * entry.height)
        by_distance[distance] = entry
    ranked = sorted(by_distance)
    closest = by_distance[ranked[0]]
    print(f"best={closest}")
    return closest


# Demo driver: for every camera, print its controls, properties and the
# stream formats it offers, then configure the first stream (YUYV at
# roughly VGA size when available), capture 5 frames through
# handle_request() and release everything again.
for i, camera in enumerate(cm.cameras):
    print(f"{i} {camera}")
    for control_id, control_info in camera.controls.items():
        print(f"  {control_id.id}  {control_id.name} {control_id.type}")
        print(f"     {control_info.min} - {control_info.max} {control_info.values}")
    cam_id = camera.id
    for control_id, control_value in camera.properties.items():
        print(f" {control_id.name} : {control_value!r}")
    camera.acquire()
    # start each camera with an empty lookup table for handle_request(),
    # this also makes the cleanup below safe if we fail before mapping anything:
    mmap_buffers = {}
    # only stop the camera during cleanup if it was actually started:
    started = False
    try:
        # other options: StillCapture, VideoRecording, Viewfinder
        config = camera.generate_configuration([StreamRole.Raw])
        print("configuration:")
        if config.empty:
            raise RuntimeError("configuration is empty")
        print(f" orientation={config.orientation}")

        # stream index -> {fourcc string: (pixel format, [sizes])}
        stream_formats = {}
        for stream_index in range(config.size):
            stream_config = config.at(stream_index)
            print(f"stream config: {stream_config}")
            print(f"  buffer count={stream_config.buffer_count}")
            print(f"  color space={stream_config.color_space}")
            formats = stream_config.formats
            for fmt in formats.pixel_formats:
                print("   Pixelformat:", fmt, formats.range(fmt))
                pixfmt = fourcc_str(fmt.fourcc)
                print(f"   {pixfmt} - {fmt.modifier}")
                sizes = []
                for size in formats.sizes(fmt):
                    print("    -", size)
                    sizes.append(size)
                stream_formats.setdefault(stream_index, {})[pixfmt] = (fmt, sizes)
            print(f"  frame size={stream_config.frame_size}")
            print(f"  pixel format={stream_config.pixel_format}")
            print(f"  size={stream_config.size}")
            print(f"  stride={stream_config.stride}")

        stream_index = 0
        stream_config = config.at(stream_index)
        print(f"config={dir(config)}")
        print(f"stream_config={dir(stream_config)}")
        # the stream may not have reported any pixel formats at all:
        pixel_formats = stream_formats.get(stream_index, {})
        if "YUYV" in pixel_formats:
            fmt, sizes = pixel_formats["YUYV"]
            stream_config.pixel_format = fmt
            stream_config.size = choose_size(sizes)

        # the configuration must be validated before it is applied to the camera
        if config.validate() == CameraConfiguration.Status.Invalid:
            raise RuntimeError("camera configuration is invalid")
        r = camera.configure(config)
        if r:
            raise RuntimeError(f"failed to configure the camera: {r}")

        allocator = FrameBufferAllocator(camera)
        stream = stream_config.stream
        r = allocator.allocate(stream)
        if r < 0:
            raise RuntimeError("failed to allocate buffers")

        buffers = allocator.buffers(stream)
        print(f"allocated {len(buffers)} buffers")

        requests = []
        # a single buffer / request is enough for this demo:
        for buf in buffers[:1]:
            request = camera.create_request()
            if not request:
                raise RuntimeError("failed to create request")

            request.add_buffer(stream, buf)
            requests.append(request)
            mmap_buf = MappedFrameBuffer(buf)
            mmap_buffers[buf] = mmap_buf.mmap()

        print("starting camera")
        camera.start()
        started = True

        print(f"queueing requests: {requests}")
        for request in requests:
            camera.queue_request(request)

        print("creating selector")
        # the context manager closes the selector once we are done waiting
        with selectors.DefaultSelector() as sel:
            sel.register(cm.event_fd, selectors.EVENT_READ)

            print("waiting for events")
            frames = 0
            while frames < 5:
                events = sel.select()
                if not events:
                    continue

                reqs = cm.get_ready_requests()
                for req in reqs:
                    handle_request(req, stream_config.pixel_format, stream_config.size)
                    frames += 1
                    # recycle the request so that the camera can fill it again
                    req.reuse()
                    camera.queue_request(req)

    finally:
        try:
            if started:
                camera.stop()
            # the camera no longer writes to the buffers: unmap them
            for mapped in mmap_buffers.values():
                mapped.munmap()
        finally:
            # always hand the camera back, even if stopping or unmapping failed
            camera.release()

totaam added a commit that referenced this issue Sep 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
encoding enhancement New feature or request linux
Projects
None yet
Development

No branches or pull requests

1 participant