Skip to content

Commit

Permalink
Revert "Limited shm frame count (#12346)"
Browse files Browse the repository at this point in the history
This reverts commit 34812b7.
  • Loading branch information
hawkeye217 authored Jul 9, 2024
1 parent 34812b7 commit 7282b15
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 56 deletions.
15 changes: 5 additions & 10 deletions frigate/object_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ def on(self, event_type: str, callback: Callable[[dict], None]):
def update(self, frame_time, current_detections, motion_boxes, regions):
# get the new frame
frame_id = f"{self.name}{frame_time}"

current_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
Expand Down Expand Up @@ -694,7 +693,7 @@ def update(self, frame_time, current_detections, motion_boxes, regions):
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_time)

if thumb_update and current_frame is not None:
if thumb_update:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
Expand Down Expand Up @@ -851,16 +850,12 @@ def update(self, frame_time, current_detections, motion_boxes, regions):

with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.current_frame_time = frame_time
self.motion_boxes = motion_boxes
self.regions = regions

if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = current_frame

if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)

self._current_frame = current_frame
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_id


Expand Down
13 changes: 8 additions & 5 deletions frigate/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def receiveSignal(signalNumber, frame):
signal.signal(signal.SIGINT, receiveSignal)

frame_manager = SharedMemoryFrameManager()
previous_frames = {}

# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
Expand Down Expand Up @@ -96,9 +97,6 @@ def receiveSignal(signalNumber, frame):

frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)

if frame is None:
continue

# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
Expand Down Expand Up @@ -126,7 +124,12 @@ def receiveSignal(signalNumber, frame):
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
frame_manager.close(frame_id)

# delete frames after they have been used for output
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")

previous_frames[camera] = frame_time

move_preview_frames("clips")

Expand All @@ -146,7 +149,7 @@ def receiveSignal(signalNumber, frame):

frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.close(frame_id)
frame_manager.delete(frame_id)

detection_subscriber.stop()

Expand Down
26 changes: 11 additions & 15 deletions frigate/util/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,31 +687,27 @@ def delete(self, name):

class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store: dict[str, shared_memory.SharedMemory] = {}
self.shm_store = {}

def create(self, name: str, size) -> AnyStr:
def create(self, name, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf

def get(self, name: str, shape) -> Optional[np.ndarray]:
try:
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
except FileNotFoundError:
logger.error(f"Failed to get {name} from SHM")
return None
def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)

def close(self, name: str):
def close(self, name):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]

def delete(self, name: str):
def delete(self, name):
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()
Expand Down
39 changes: 13 additions & 26 deletions frigate/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def start_or_restart_ffmpeg(

def capture_frames(
ffmpeg_process,
config: CameraConfig,
camera_name,
frame_shape,
frame_manager: FrameManager,
frame_queue,
Expand All @@ -108,36 +108,24 @@ def capture_frames(
frame_rate.start()
skipped_eps = EventsPerSecond()
skipped_eps.start()

shm_count = max(10, config.detect.fps * 2)
shm_frames: list[str] = []

while True:
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()

current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{config.name}{current_frame.value}"
frame_name = f"{camera_name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)

# update frame cache and cleanup existing frames
shm_frames.append(frame_name)

if len(shm_frames) > shm_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception:
frame_manager.delete(frame_name)

# shutdown has been initiated
if stop_event.is_set():
break
logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")
logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.")

if ffmpeg_process.poll() is not None:
logger.error(
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
f"{camera_name}: ffmpeg process is not running. exiting capture thread..."
)
frame_manager.delete(frame_name)
break
Expand All @@ -149,13 +137,12 @@ def capture_frames(
try:
# add to the queue
frame_queue.put(current_frame.value, False)
# close the frame
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()

# clear out frames
for frame in shm_frames:
frame_manager.delete(frame)
frame_manager.delete(frame_name)


class CameraWatchdog(threading.Thread):
Expand Down Expand Up @@ -295,7 +282,7 @@ def start_ffmpeg_detect(self):
)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture(
self.config,
self.camera_name,
self.ffmpeg_detect_process,
self.frame_shape,
self.frame_queue,
Expand Down Expand Up @@ -334,7 +321,7 @@ def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int:
class CameraCapture(threading.Thread):
def __init__(
self,
config: CameraConfig,
camera_name,
ffmpeg_process,
frame_shape,
frame_queue,
Expand All @@ -343,8 +330,8 @@ def __init__(
stop_event,
):
threading.Thread.__init__(self)
self.name = f"capture:{config.name}"
self.config = config
self.name = f"capture:{camera_name}"
self.camera_name = camera_name
self.frame_shape = frame_shape
self.frame_queue = frame_queue
self.fps = fps
Expand All @@ -358,7 +345,7 @@ def __init__(
def run(self):
capture_frames(
self.ffmpeg_process,
self.config,
self.camera_name,
self.frame_shape,
self.frame_manager,
self.frame_queue,
Expand Down

0 comments on commit 7282b15

Please sign in to comment.