From af5d0bd6b5eb537dbc480cac1e3eb71c2594101a Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Tue, 9 Jul 2024 06:44:53 -0600 Subject: [PATCH] Limited shm frame count (#12346) * Only keep 2x detect fps frames in SHM * Don't delete previous shm frames in output * Catch case where images do not exist * Ensure files are closed * Clear out all frames when shutting down * Correct the number of frames saved * Simplify empty shm error handling * Improve frame safety --- frigate/object_processing.py | 15 +++++++++----- frigate/output/output.py | 13 +++++------- frigate/util/image.py | 26 ++++++++++++++---------- frigate/video.py | 39 ++++++++++++++++++++++++------------ 4 files changed, 56 insertions(+), 37 deletions(-) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index dcf6014fcc..512ef5f633 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -659,6 +659,7 @@ def on(self, event_type: str, callback: Callable[[dict], None]): def update(self, frame_time, current_detections, motion_boxes, regions): # get the new frame frame_id = f"{self.name}{frame_time}" + current_frame = self.frame_manager.get( frame_id, self.camera_config.frame_shape_yuv ) @@ -693,7 +694,7 @@ def update(self, frame_time, current_detections, motion_boxes, regions): for c in self.callbacks["autotrack"]: c(self.name, updated_obj, frame_time) - if thumb_update: + if thumb_update and current_frame is not None: # ensure this frame is stored in the cache if ( updated_obj.thumbnail_data["frame_time"] == frame_time @@ -850,12 +851,16 @@ def update(self, frame_time, current_detections, motion_boxes, regions): with self.current_frame_lock: self.tracked_objects = tracked_objects - self.current_frame_time = frame_time self.motion_boxes = motion_boxes self.regions = regions - self._current_frame = current_frame - if self.previous_frame_id is not None: - self.frame_manager.close(self.previous_frame_id) + + if current_frame is not None: + self.current_frame_time = frame_time + self._current_frame = current_frame + + if self.previous_frame_id is not None: + self.frame_manager.close(self.previous_frame_id) + self.previous_frame_id = frame_id diff --git a/frigate/output/output.py b/frigate/output/output.py index e0e7d0cac0..283c245252 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -45,7 +45,6 @@ def receiveSignal(signalNumber, frame): signal.signal(signal.SIGINT, receiveSignal) frame_manager = SharedMemoryFrameManager() - previous_frames = {} # start a websocket server on 8082 WebSocketWSGIHandler.http_version = "1.1" @@ -97,6 +96,9 @@ def receiveSignal(signalNumber, frame): frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + if frame is None: + continue + # send camera frame to ffmpeg process if websockets are connected if any( ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager @@ -124,12 +126,7 @@ def receiveSignal(signalNumber, frame): preview_recorders[camera].write_data( current_tracked_objects, motion_boxes, frame_time, frame ) - - # delete frames after they have been used for output - if camera in previous_frames: - frame_manager.delete(f"{camera}{previous_frames[camera]}") - - previous_frames[camera] = frame_time + frame_manager.close(frame_id) move_preview_frames("clips") @@ -149,7 +146,7 @@ def receiveSignal(signalNumber, frame): frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.delete(frame_id) + frame_manager.close(frame_id) detection_subscriber.stop() diff --git a/frigate/util/image.py b/frigate/util/image.py index 3962d9600e..a3619193fb 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -687,27 +687,31 @@ def delete(self, name): class SharedMemoryFrameManager(FrameManager): def __init__(self): - self.shm_store = {} + self.shm_store: dict[str, shared_memory.SharedMemory] = {} - def create(self, name, size) -> AnyStr: + def create(self, name: str, size) -> AnyStr: shm = shared_memory.SharedMemory(name=name, create=True, size=size) self.shm_store[name] = shm return shm.buf - def get(self, name, shape): - if name in self.shm_store: - shm = self.shm_store[name] - else: - shm = shared_memory.SharedMemory(name=name) - self.shm_store[name] = shm - return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) + def get(self, name: str, shape) -> Optional[np.ndarray]: + try: + if name in self.shm_store: + shm = self.shm_store[name] + else: + shm = shared_memory.SharedMemory(name=name) + self.shm_store[name] = shm + return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) + except FileNotFoundError: + logger.error(f"Failed to get {name} from SHM") + return None - def close(self, name): + def close(self, name: str): if name in self.shm_store: self.shm_store[name].close() del self.shm_store[name] - def delete(self, name): + def delete(self, name: str): if name in self.shm_store: self.shm_store[name].close() self.shm_store[name].unlink() diff --git a/frigate/video.py b/frigate/video.py index 1c74575dc3..3397de6e4f 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -94,7 +94,7 @@ def start_or_restart_ffmpeg( def capture_frames( ffmpeg_process, - camera_name, + config: CameraConfig, frame_shape, frame_manager: FrameManager, frame_queue, @@ -108,24 +108,36 @@ def capture_frames( frame_rate.start() skipped_eps = EventsPerSecond() skipped_eps.start() + + shm_count = max(10, config.detect.fps * 2) + shm_frames: list[str] = [] + while True: fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() - current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{camera_name}{current_frame.value}" + frame_name = f"{config.name}{current_frame.value}" frame_buffer = frame_manager.create(frame_name, frame_size) try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) + + # update frame cache and cleanup existing frames + shm_frames.append(frame_name) + + if len(shm_frames) > shm_count: + expired_frame_name = shm_frames.pop(0) + frame_manager.delete(expired_frame_name) except Exception: + frame_manager.delete(frame_name) + # shutdown has been initiated if stop_event.is_set(): break - logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.") + logger.error(f"{config.name}: Unable to read frames from ffmpeg process.") if ffmpeg_process.poll() is not None: logger.error( - f"{camera_name}: ffmpeg process is not running. exiting capture thread..." + f"{config.name}: ffmpeg process is not running. exiting capture thread..." ) frame_manager.delete(frame_name) break @@ -137,12 +149,13 @@ def capture_frames( try: # add to the queue frame_queue.put(current_frame.value, False) - # close the frame - frame_manager.close(frame_name) except queue.Full: # if the queue is full, skip this frame skipped_eps.update() - frame_manager.delete(frame_name) + + # clear out frames + for frame in shm_frames: + frame_manager.delete(frame) class CameraWatchdog(threading.Thread): @@ -282,7 +295,7 @@ def start_ffmpeg_detect(self): ) self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid self.capture_thread = CameraCapture( - self.camera_name, + self.config, self.ffmpeg_detect_process, self.frame_shape, self.frame_queue, @@ -321,7 +334,7 @@ def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: class CameraCapture(threading.Thread): def __init__( self, - camera_name, + config: CameraConfig, ffmpeg_process, frame_shape, frame_queue, @@ -330,8 +343,8 @@ def __init__( stop_event, ): threading.Thread.__init__(self) - self.name = f"capture:{camera_name}" - self.camera_name = camera_name + self.name = f"capture:{config.name}" + self.config = config self.frame_shape = frame_shape self.frame_queue = frame_queue self.fps = fps @@ -345,7 +358,7 @@ def __init__( def run(self): capture_frames( self.ffmpeg_process, - self.camera_name, + self.config, self.frame_shape, self.frame_manager, self.frame_queue,