Skip to content

Commit

Permalink
RTSPCameraAgent: connection robustness (#816)
Browse files Browse the repository at this point in the history
* add reconnect attempt

* fix connect check; remove session set_status
  • Loading branch information
davidvng authored Jan 29, 2025
1 parent a31809c commit 2dc432b
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions socs/agents/rtsp_camera/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ def _dt_convert(timestr):
else:
return False

def _init_stream(self):
"""Connect to camera and initialize video stream."""
if self.fake:
cap = FakeCamera()
else:
cap = cv2.VideoCapture(self.connection)
if not cap.isOpened():
self.log.error(f"Cannot open RTSP stream at {self.connection}")
return None

return cap

@ocs_agent.param("test_mode", default=False, type=bool)
def acq(self, session, params=None):
"""acq(test_mode=False)
Expand Down Expand Up @@ -205,17 +217,13 @@ def acq(self, session, params=None):

frames_per_snapshot = int(self.seconds * self.record_fps)

session.set_status("running")
self.is_streaming = True

# Open camera stream
if self.fake:
cap = FakeCamera()
else:
cap = cv2.VideoCapture(self.connection)
if not cap.isOpened():
self.log.error(f"Cannot open RTSP stream at {self.connection}")
return False, "Could not open RTSP stream"
cap = self._init_stream()
connected = True
if not cap:
connected = False

# Tracking state of whether we are currently recording motion detection
detecting = False
Expand All @@ -225,6 +233,10 @@ def acq(self, session, params=None):

snap_count = 0
while self.is_streaming:
if not connected:
self.log.info("Trying to reconnect.")
cap = self._init_stream()

# Use UTC
timestamp = time.time()
data = dict()
Expand All @@ -237,7 +249,9 @@ def acq(self, session, params=None):
if not success:
msg = "Failed to retrieve snapshot image from stream"
self.log.error(msg)
return False, "Broken stream"
connected = False
continue
connected = True

# Motion detection. We ignore the first few snapshots and also
# any changes that happen while we are already recording.
Expand Down Expand Up @@ -276,6 +290,7 @@ def acq(self, session, params=None):
"address": self.address,
"timestamp": timestamp,
"path": path,
"connected": connected
}

# Update session.data and publish
Expand All @@ -288,6 +303,7 @@ def acq(self, session, params=None):
"data": {
"address": self.address,
"path": path,
"connected": connected
},
}
session.app.publish_to_feed(self.feed_name, message)
Expand All @@ -309,7 +325,6 @@ def _stop_acq(self, session, params=None):
**Task** - Stop task associated with acq process.
"""
if self.is_streaming:
session.set_status("stopping")
self.is_streaming = False
return True, "Stopping Acquisition"
else:
Expand Down Expand Up @@ -338,13 +353,9 @@ def record(self, session, params=None):

# Open camera stream
self.log.info("Recording: opening camera stream")
if self.fake:
cap = FakeCamera()
else:
cap = cv2.VideoCapture(self.connection)
if not cap.isOpened():
self.log.error(f"Cannot open RTSP stream at {self.connection}")
return False, "Cannot connect to camera"
cap = self._init_stream()
if not cap:
return False, "Cannot connect to camera."

# Total number of frames
total_frames = int(self.record_fps * self.record_duration)
Expand Down

0 comments on commit 2dc432b

Please sign in to comment.