From 61a77b8ee242381ac3a2076112b6493953d9dd44 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 15:38:17 -0600 Subject: [PATCH 1/8] Only keep 2x detect fps frames in SHM --- frigate/video.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/frigate/video.py b/frigate/video.py index 1c74575dc3..69a65bfa0c 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -94,7 +94,7 @@ def start_or_restart_ffmpeg( def capture_frames( ffmpeg_process, - camera_name, + config: CameraConfig, frame_shape, frame_manager: FrameManager, frame_queue, @@ -108,24 +108,34 @@ def capture_frames( frame_rate.start() skipped_eps = EventsPerSecond() skipped_eps.start() + + shm_count = max(10, config.detect.fps * 2) + shm_frames: list[str] = [] + while True: fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() - current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{camera_name}{current_frame.value}" + frame_name = f"{config.name}{current_frame.value}" frame_buffer = frame_manager.create(frame_name, frame_size) try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) + + # update frame cache and cleanup existing frames + shm_frames.append(frame_name) + + if len(shm_frames) > shm_count: + expired_frame_name = shm_frames.pop(0) + frame_manager.delete(expired_frame_name) except Exception: # shutdown has been initiated if stop_event.is_set(): break - logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.") + logger.error(f"{config.name}: Unable to read frames from ffmpeg process.") if ffmpeg_process.poll() is not None: logger.error( - f"{camera_name}: ffmpeg process is not running. exiting capture thread..." + f"{config.name}: ffmpeg process is not running. exiting capture thread..." ) frame_manager.delete(frame_name) break @@ -282,7 +292,7 @@ def start_ffmpeg_detect(self): ) self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid self.capture_thread = CameraCapture( - self.camera_name, + self.config, self.ffmpeg_detect_process, self.frame_shape, self.frame_queue, @@ -321,7 +331,7 @@ def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: class CameraCapture(threading.Thread): def __init__( self, - camera_name, + config: CameraConfig, ffmpeg_process, frame_shape, frame_queue, @@ -330,8 +340,8 @@ def __init__( stop_event, ): threading.Thread.__init__(self) - self.name = f"capture:{camera_name}" - self.camera_name = camera_name + self.name = f"capture:{config.name}" + self.config = config self.frame_shape = frame_shape self.frame_queue = frame_queue self.fps = fps @@ -345,7 +355,7 @@ def __init__( def run(self): capture_frames( self.ffmpeg_process, - self.camera_name, + self.config, self.frame_shape, self.frame_manager, self.frame_queue, From de9160ccaa592b75dc7afad3435e58abe9894230 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 15:39:42 -0600 Subject: [PATCH 2/8] Don't delete previous shm frames in output --- frigate/output/output.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/frigate/output/output.py b/frigate/output/output.py index e458d3242e..d0fd430683 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -45,7 +45,6 @@ def receiveSignal(signalNumber, frame): signal.signal(signal.SIGINT, receiveSignal) frame_manager = SharedMemoryFrameManager() - previous_frames = {} # start a websocket server on 8082 WebSocketWSGIHandler.http_version = "1.1" @@ -125,12 +124,6 @@ def receiveSignal(signalNumber, frame): current_tracked_objects, motion_boxes, frame_time, frame ) - # delete frames after they have been used for output - if camera in previous_frames: - frame_manager.delete(f"{camera}{previous_frames[camera]}") - - previous_frames[camera] = frame_time - move_preview_frames("clips") while True: @@ -149,7 +142,7 @@ def receiveSignal(signalNumber, frame): frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.delete(frame_id) + frame_manager.close(frame_id) detection_subscriber.stop() From 42be7efde47bcec5f7336b293ff409152358104d Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 16:30:05 -0600 Subject: [PATCH 3/8] Catch case where images do not exist --- frigate/object_processing.py | 11 ++++++++--- frigate/output/output.py | 6 +++++- frigate/util/image.py | 2 ++ frigate/video.py | 11 ++++++----- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 7ac0b7276c..f9c513d6b8 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -659,9 +659,14 @@ def on(self, event_type: str, callback: Callable[[dict], None]): def update(self, frame_time, current_detections, motion_boxes, regions): # get the new frame frame_id = f"{self.name}{frame_time}" - current_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv - ) + + try: + current_frame = self.frame_manager.get( + frame_id, self.camera_config.frame_shape_yuv + ) + except FileNotFoundError: + logger.error(f"Failed to get {frame_id} from SHM") + return tracked_objects = self.tracked_objects.copy() current_ids = set(current_detections.keys()) diff --git a/frigate/output/output.py b/frigate/output/output.py index d0fd430683..d932d88d3b 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -94,7 +94,11 @@ def receiveSignal(signalNumber, frame): frame_id = f"{camera}{frame_time}" - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + try: + frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + except FileNotFoundError: + logger.error(f"Failed to get {frame_id} from SHM") + continue # send camera frame to ffmpeg process if websockets are connected if any( diff --git a/frigate/util/image.py b/frigate/util/image.py index 3962d9600e..4f5f405158 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -712,6 +712,8 @@ def delete(self, name): self.shm_store[name].close() self.shm_store[name].unlink() del self.shm_store[name] + else: + logger.error(f"Could not delete {name} the store is {self.shm_store}") def create_mask(frame_shape, mask): diff --git a/frigate/video.py b/frigate/video.py index 69a65bfa0c..3ab0608264 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -147,8 +147,6 @@ def capture_frames( try: # add to the queue frame_queue.put(current_frame.value, False) - # close the frame - frame_manager.close(frame_name) except queue.Full: # if the queue is full, skip this frame skipped_eps.update() @@ -572,9 +570,12 @@ def process_frames( current_frame_time.value = frame_time ptz_metrics["ptz_frame_time"].value = frame_time - frame = frame_manager.get( - f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) - ) + try: + frame = frame_manager.get( + f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) + ) + except FileNotFoundError: + frame = None if frame is None: logger.info(f"{camera_name}: frame {frame_time} is not in memory store.") From f24ba6bfc18a419f47cb5d8c4550ee6e2c2cced9 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 17:15:07 -0600 Subject: [PATCH 4/8] Ensure files are closed --- frigate/output/output.py | 1 + frigate/util/image.py | 12 +++++------- frigate/video.py | 5 +++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/frigate/output/output.py b/frigate/output/output.py index d932d88d3b..5904f0b104 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -127,6 +127,7 @@ def receiveSignal(signalNumber, frame): preview_recorders[camera].write_data( current_tracked_objects, motion_boxes, frame_time, frame ) + frame_manager.close(frame_id) move_preview_frames("clips") diff --git a/frigate/util/image.py b/frigate/util/image.py index 4f5f405158..cb8fd18ce0 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -687,14 +687,14 @@ def delete(self, name): class SharedMemoryFrameManager(FrameManager): def __init__(self): - self.shm_store = {} + self.shm_store: dict[str, shared_memory.SharedMemory] = {} - def create(self, name, size) -> AnyStr: + def create(self, name: str, size) -> AnyStr: shm = shared_memory.SharedMemory(name=name, create=True, size=size) self.shm_store[name] = shm return shm.buf - def get(self, name, shape): + def get(self, name: str, shape): if name in self.shm_store: shm = self.shm_store[name] else: @@ -702,18 +702,16 @@ def get(self, name, shape): self.shm_store[name] = shm return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) - def close(self, name): + def close(self, name: str): if name in self.shm_store: self.shm_store[name].close() del self.shm_store[name] - def delete(self, name): + def delete(self, name: str): if name in self.shm_store: self.shm_store[name].close() self.shm_store[name].unlink() del self.shm_store[name] - else: - logger.error(f"Could not delete {name} the store is {self.shm_store}") def create_mask(frame_shape, mask): diff --git a/frigate/video.py b/frigate/video.py index 3ab0608264..dd44109b3a 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -109,7 +109,7 @@ def capture_frames( skipped_eps = EventsPerSecond() skipped_eps.start() - shm_count = max(10, config.detect.fps * 2) + shm_count = 5 shm_frames: list[str] = [] while True: @@ -128,6 +128,8 @@ def capture_frames( expired_frame_name = shm_frames.pop(0) frame_manager.delete(expired_frame_name) except Exception: + frame_manager.delete(frame_name) + # shutdown has been initiated if stop_event.is_set(): break @@ -150,7 +152,6 @@ def capture_frames( except queue.Full: # if the queue is full, skip this frame skipped_eps.update() - frame_manager.delete(frame_name) class CameraWatchdog(threading.Thread): From d1540c70b17892e7d59c85e7b43b865a4a600785 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 17:16:48 -0600 Subject: [PATCH 5/8] Clear out all frames when shutting down --- frigate/video.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/frigate/video.py b/frigate/video.py index dd44109b3a..7446356240 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -153,6 +153,10 @@ def capture_frames( # if the queue is full, skip this frame skipped_eps.update() + # clear out frames + for frame in shm_frames: + frame_manager.delete(frame) + class CameraWatchdog(threading.Thread): def __init__( From c4fbe6df656f0baead17e24ed10d394ed0269932 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 8 Jul 2024 17:18:06 -0600 Subject: [PATCH 6/8] Correct the number of frames saved --- frigate/video.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frigate/video.py b/frigate/video.py index 7446356240..88a2653814 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -109,7 +109,7 @@ def capture_frames( skipped_eps = EventsPerSecond() skipped_eps.start() - shm_count = 5 + shm_count = max(10, config.detect.fps * 2) shm_frames: list[str] = [] while True: From 03e32118ae999c0d0cf1774d6e428eb54cf83991 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Tue, 9 Jul 2024 06:36:15 -0600 Subject: [PATCH 7/8] Simplify empty shm error handling --- frigate/object_processing.py | 17 ++++++++--------- frigate/output/output.py | 7 +++---- frigate/util/image.py | 18 +++++++++++------- frigate/video.py | 9 +++------ 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index f9c513d6b8..aed850fe3b 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -660,13 +660,9 @@ def update(self, frame_time, current_detections, motion_boxes, regions): # get the new frame frame_id = f"{self.name}{frame_time}" - try: - current_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv - ) - except FileNotFoundError: - logger.error(f"Failed to get {frame_id} from SHM") - return + current_frame = self.frame_manager.get( + frame_id, self.camera_config.frame_shape_yuv + ) tracked_objects = self.tracked_objects.copy() current_ids = set(current_detections.keys()) @@ -698,7 +694,7 @@ def update(self, frame_time, current_detections, motion_boxes, regions): for c in self.callbacks["autotrack"]: c(self.name, updated_obj, frame_time) - if thumb_update: + if thumb_update and current_frame is not None: # ensure this frame is stored in the cache if ( updated_obj.thumbnail_data["frame_time"] == frame_time @@ -858,7 +854,10 @@ def update(self, frame_time, current_detections, motion_boxes, regions): self.current_frame_time = frame_time self.motion_boxes = motion_boxes self.regions = regions - self._current_frame = current_frame + + if current_frame is not None: + self._current_frame = current_frame + if self.previous_frame_id is not None: self.frame_manager.close(self.previous_frame_id) self.previous_frame_id = frame_id diff --git a/frigate/output/output.py b/frigate/output/output.py index 5904f0b104..6339e2b9c6 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -94,10 +94,9 @@ def receiveSignal(signalNumber, frame): frame_id = f"{camera}{frame_time}" - try: - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - except FileNotFoundError: - logger.error(f"Failed to get {frame_id} from SHM") + frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + + if frame is None: continue # send camera frame to ffmpeg process if websockets are connected diff --git a/frigate/util/image.py b/frigate/util/image.py index cb8fd18ce0..a3619193fb 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -694,13 +694,17 @@ def create(self, name: str, size) -> AnyStr: self.shm_store[name] = shm return shm.buf - def get(self, name: str, shape): - if name in self.shm_store: - shm = self.shm_store[name] - else: - shm = shared_memory.SharedMemory(name=name) - self.shm_store[name] = shm - return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) + def get(self, name: str, shape) -> Optional[np.ndarray]: + try: + if name in self.shm_store: + shm = self.shm_store[name] + else: + shm = shared_memory.SharedMemory(name=name) + self.shm_store[name] = shm + return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) + except FileNotFoundError: + logger.error(f"Failed to get {name} from SHM") + return None def close(self, name: str): if name in self.shm_store: diff --git a/frigate/video.py b/frigate/video.py index 88a2653814..3397de6e4f 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -575,12 +575,9 @@ def process_frames( current_frame_time.value = frame_time ptz_metrics["ptz_frame_time"].value = frame_time - try: - frame = frame_manager.get( - f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) - ) - except FileNotFoundError: - frame = None + frame = frame_manager.get( + f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) + ) if frame is None: logger.info(f"{camera_name}: frame {frame_time} is not in memory store.") From 2dbec2967e779ce4fbf720b29d21dd450356f4b6 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Tue, 9 Jul 2024 06:37:42 -0600 Subject: [PATCH 8/8] Improve frame safety --- frigate/object_processing.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index aed850fe3b..aed157f466 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -851,15 +851,16 @@ def update(self, frame_time, current_detections, motion_boxes, regions): with self.current_frame_lock: self.tracked_objects = tracked_objects - self.current_frame_time = frame_time self.motion_boxes = motion_boxes self.regions = regions if current_frame is not None: + self.current_frame_time = frame_time self._current_frame = current_frame - if self.previous_frame_id is not None: - self.frame_manager.close(self.previous_frame_id) + if self.previous_frame_id is not None: + self.frame_manager.close(self.previous_frame_id) + self.previous_frame_id = frame_id