Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limited shm frame count #12346

Merged
merged 8 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions frigate/object_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ def on(self, event_type: str, callback: Callable[[dict], None]):
def update(self, frame_time, current_detections, motion_boxes, regions):
# get the new frame
frame_id = f"{self.name}{frame_time}"

current_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
Expand Down Expand Up @@ -693,7 +694,7 @@ def update(self, frame_time, current_detections, motion_boxes, regions):
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_time)

if thumb_update:
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
Expand Down Expand Up @@ -850,12 +851,16 @@ def update(self, frame_time, current_detections, motion_boxes, regions):

with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.current_frame_time = frame_time
self.motion_boxes = motion_boxes
self.regions = regions
self._current_frame = current_frame
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)

if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = current_frame

if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)

self.previous_frame_id = frame_id


Expand Down
13 changes: 5 additions & 8 deletions frigate/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def receiveSignal(signalNumber, frame):
signal.signal(signal.SIGINT, receiveSignal)

frame_manager = SharedMemoryFrameManager()
previous_frames = {}

# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
Expand Down Expand Up @@ -97,6 +96,9 @@ def receiveSignal(signalNumber, frame):

frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)

if frame is None:
continue

# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
Expand Down Expand Up @@ -124,12 +126,7 @@ def receiveSignal(signalNumber, frame):
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)

# delete frames after they have been used for output
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")

previous_frames[camera] = frame_time
frame_manager.close(frame_id)

move_preview_frames("clips")

Expand All @@ -149,7 +146,7 @@ def receiveSignal(signalNumber, frame):

frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id)
frame_manager.close(frame_id)

detection_subscriber.stop()

Expand Down
26 changes: 15 additions & 11 deletions frigate/util/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,27 +687,31 @@ def delete(self, name):

class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store = {}
self.shm_store: dict[str, shared_memory.SharedMemory] = {}

def create(self, name, size) -> AnyStr:
def create(self, name: str, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf

def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
def get(self, name: str, shape) -> Optional[np.ndarray]:
try:
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
except FileNotFoundError:
logger.error(f"Failed to get {name} from SHM")
return None

def close(self, name):
def close(self, name: str):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]

def delete(self, name):
def delete(self, name: str):
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()
Expand Down
39 changes: 26 additions & 13 deletions frigate/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def start_or_restart_ffmpeg(

def capture_frames(
ffmpeg_process,
camera_name,
config: CameraConfig,
frame_shape,
frame_manager: FrameManager,
frame_queue,
Expand All @@ -108,24 +108,36 @@ def capture_frames(
frame_rate.start()
skipped_eps = EventsPerSecond()
skipped_eps.start()

shm_count = max(10, config.detect.fps * 2)
shm_frames: list[str] = []

while True:
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()

current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{camera_name}{current_frame.value}"
frame_name = f"{config.name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)

# update frame cache and cleanup existing frames
shm_frames.append(frame_name)

if len(shm_frames) > shm_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception:
frame_manager.delete(frame_name)

# shutdown has been initiated
if stop_event.is_set():
break
logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.")
logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")

if ffmpeg_process.poll() is not None:
logger.error(
f"{camera_name}: ffmpeg process is not running. exiting capture thread..."
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
)
frame_manager.delete(frame_name)
break
Expand All @@ -137,12 +149,13 @@ def capture_frames(
try:
# add to the queue
frame_queue.put(current_frame.value, False)
# close the frame
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
frame_manager.delete(frame_name)

# clear out frames
for frame in shm_frames:
frame_manager.delete(frame)


class CameraWatchdog(threading.Thread):
Expand Down Expand Up @@ -282,7 +295,7 @@ def start_ffmpeg_detect(self):
)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture(
self.camera_name,
self.config,
self.ffmpeg_detect_process,
self.frame_shape,
self.frame_queue,
Expand Down Expand Up @@ -321,7 +334,7 @@ def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int:
class CameraCapture(threading.Thread):
def __init__(
self,
camera_name,
config: CameraConfig,
ffmpeg_process,
frame_shape,
frame_queue,
Expand All @@ -330,8 +343,8 @@ def __init__(
stop_event,
):
threading.Thread.__init__(self)
self.name = f"capture:{camera_name}"
self.camera_name = camera_name
self.name = f"capture:{config.name}"
self.config = config
self.frame_shape = frame_shape
self.frame_queue = frame_queue
self.fps = fps
Expand All @@ -345,7 +358,7 @@ def __init__(
def run(self):
capture_frames(
self.ffmpeg_process,
self.camera_name,
self.config,
self.frame_shape,
self.frame_manager,
self.frame_queue,
Expand Down