Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Depthai Viewer v0.0.1 (WIP) #5

Merged
merged 18 commits into from
May 18, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix mypy lint (TODO: Proper typing, especially in the comms from back…
… to store to ws) other types are pretty solid
zrezke committed May 18, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit ed8217e1bb7a3e413541cff28ed4df24a80503f1
2 changes: 1 addition & 1 deletion .mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
files = rerun_py/rerun_sdk/rerun, rerun_py/tests, examples/python
files = rerun_py/rerun_sdk/depthai_viewer, rerun_py/tests, examples/python
exclude = examples/python/objectron/proto|examples/python/ros
namespace_packages = True
show_error_codes = True
4 changes: 0 additions & 4 deletions crates/re_viewer/src/ui/auto_layout.rs
Original file line number Diff line number Diff line change
@@ -394,7 +394,6 @@ pub(crate) fn default_tree_from_space_views(
// - Addition should try to layout like currently 3d, 2d views. New views just appear in the top left corner i guess.
let mut tree = egui_dock::Tree::new(Vec::new());

let mut is_color_stream_present = false;
let spaces = space_views
.iter()
.filter(|(space_view_id, _space_view)| visible.contains(space_view_id))
@@ -407,9 +406,6 @@ pub(crate) fn default_tree_from_space_views(
)
})
.map(|(space_view_id, space_view)| {
if space_view.space_path == EntityPath::from("color") {
is_color_stream_present = true;
}
let aspect_ratio = match space_view.category {
ViewCategory::Spatial => {
let state_spatial = &space_view.view_state.state_spatial;
1 change: 1 addition & 0 deletions rerun_py/requirements-lint.txt
Original file line number Diff line number Diff line change
@@ -7,3 +7,4 @@ pyupgrade==2.37.3
ruff==0.0.251
types-Deprecated==1.2.9
types-requests==2.28.10
types-setuptools==67.7.0.2
47 changes: 26 additions & 21 deletions rerun_py/rerun_sdk/depthai_viewer/_backend/config_api.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
from multiprocessing import Queue
from queue import Empty as QueueEmptyException
from signal import SIGINT, signal
from typing import Dict, Tuple
from typing import Dict, Tuple, Any

import depthai as dai
import websockets
@@ -13,24 +13,29 @@
from depthai_viewer._backend.store import Action
from depthai_viewer._backend.topic import Topic

from enum import Enum

signal(SIGINT, lambda *args, **kwargs: exit(0))

# Definitions for linting
# send actions to back
dispatch_action_queue: Queue = None
dispatch_action_queue: Queue[Any]

# bool indicating action success
result_queue: Queue = None
send_message_queue: Queue = None
result_queue: Queue[Any]

# Send messages from backend to frontend without the frontend sending a message first
send_message_queue: Queue[Any]


def dispatch_action(action: Action, **kwargs) -> Tuple[bool, Dict[str, any]]:
def dispatch_action(action: Action, **kwargs) -> Tuple[bool, Dict[str, Any]]: # type: ignore[no-untyped-def]
"""
Dispatches an action that will be executed by store.py.
Returns: (success: bool, result: Dict[str, any]).
Returns: (success: bool, result: Dict[str, Any]).
"""
dispatch_action_queue.put((action, kwargs))
return result_queue.get()
return result_queue.get() # type: ignore[no-any-return]


class MessageType:
@@ -41,7 +46,7 @@ class MessageType:
ERROR = "Error" # Error message


class ErrorAction:
class ErrorAction(Enum):
NONE = None
FULL_RESET = "FullReset"

@@ -51,22 +56,22 @@ def error(message: str, action: ErrorAction) -> str:
return json.dumps({"type": MessageType.ERROR, "data": {"action": action, "message": message}})


async def ws_api(websocket: WebSocketServerProtocol):
async def ws_api(websocket: WebSocketServerProtocol) -> None:
while True:
message = None
raw_message = None
try:
message = await asyncio.wait_for(websocket.recv(), 1)
raw_message = await asyncio.wait_for(websocket.recv(), 1)
except asyncio.TimeoutError:
pass
except websockets.exceptions.ConnectionClosed:
success, message = dispatch_action(Action.RESET)
success, _ = dispatch_action(Action.RESET) # type: ignore[assignment]
if success:
return
raise Exception("Couldn't reset backend after websocket disconnect!")

if message:
if raw_message:
try:
message = json.loads(message)
message: Dict[str, Any] = json.loads(raw_message)
except json.JSONDecodeError:
print("Failed to parse message: ", message)
continue
@@ -80,7 +85,7 @@ async def ws_api(websocket: WebSocketServerProtocol):
subscriptions = [Topic.create(topic_name) for topic_name in data.get(MessageType.SUBSCRIPTIONS, [])]
dispatch_action(Action.SET_SUBSCRIPTIONS, subscriptions=subscriptions)
print("Subscriptions: ", subscriptions)
active_subscriptions = [topic.name for topic in dispatch_action(Action.GET_SUBSCRIPTIONS) if topic]
active_subscriptions = [topic.name for topic in dispatch_action(Action.GET_SUBSCRIPTIONS) if topic] # type: ignore[attr-defined]
await websocket.send(json.dumps({"type": MessageType.SUBSCRIPTIONS, "data": active_subscriptions}))
elif message_type == MessageType.PIPELINE:
data = message.get("data", {})
@@ -98,13 +103,13 @@ async def ws_api(websocket: WebSocketServerProtocol):
await websocket.send(error("Failed to set runtime config", ErrorAction.FULL_RESET))
continue
if success:
active_config: PipelineConfiguration = dispatch_action(Action.GET_PIPELINE)
active_config: PipelineConfiguration = dispatch_action(Action.GET_PIPELINE) # type: ignore[assignment]
print("Active config: ", active_config)
await websocket.send(
json.dumps(
{
"type": MessageType.PIPELINE,
"data": (active_config.to_json(), False) if active_config else None,
"data": (active_config.to_json(), False) if active_config != None else None,
}
)
)
@@ -115,7 +120,7 @@ async def ws_api(websocket: WebSocketServerProtocol):
json.dumps(
{
"type": MessageType.DEVICES,
"data": [d.getMxId() for d in dai.Device.getAllAvailableDevices()],
"data": [d.getMxId() for d in dai.Device.getAllAvailableDevices()], # type: ignore[call-arg]
}
)
)
@@ -149,12 +154,12 @@ async def ws_api(websocket: WebSocketServerProtocol):
await websocket.send(send_message)


async def main():
async with websockets.serve(ws_api, "localhost", 9001):
async def main() -> None:
async with websockets.serve(ws_api, "localhost", 9001): # type: ignore[attr-defined]
await asyncio.Future() # run forever


def start_api(_dispatch_action_queue: Queue, _result_queue: Queue, _send_message_queue: Queue):
def start_api(_dispatch_action_queue: Queue[Any], _result_queue: Queue[Any], _send_message_queue: Queue[Any]) -> None:
"""
Starts the websocket API.
53 changes: 28 additions & 25 deletions rerun_py/rerun_sdk/depthai_viewer/_backend/device_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Dict, Optional, Any

import depthai as dai
from depthai_sdk import Previews as QueueNames
@@ -24,7 +24,7 @@ class Config:
dai.CameraBoardSocket: lambda v: v.name,
}

def __init__(self, **v):
def __init__(self, **v) -> None: # type: ignore[no-untyped-def]
if v.get("resolution"):
v["resolution"] = getattr(dai.ColorCameraProperties.SensorResolution, v["resolution"])
if v.get("board_socket"):
@@ -33,14 +33,15 @@ def __init__(self, **v):

@property
# Make this select the queue based on ui, also probably not just one queue
def out_queue_name(self) -> str | None:
prefix = QueueNames.color.name
def out_queue_name(self) -> Optional[str]:
prefix: str = QueueNames.color.name
if self.out_preview:
return prefix + "_preview"
if self.xout_still:
return prefix + "_still"
if self.xout_video:
return prefix + "_video"
return None


class MonoCameraConfiguration(BaseModel):
@@ -60,7 +61,7 @@ class Config:
dai.CameraBoardSocket: lambda v: v.name,
}

def __init__(self, **v):
def __init__(self, **v) -> None: # type: ignore[no-untyped-def]
if v.get("resolution"):
v["resolution"] = getattr(dai.MonoCameraProperties.SensorResolution, v["resolution"])
if v.get("board_socket"):
@@ -72,11 +73,11 @@ def out_queue_name(self) -> str:
return "left" if self.board_socket == dai.CameraBoardSocket.LEFT else "right"

@classmethod
def create_left(cls, **kwargs):
def create_left(cls, **kwargs) -> "MonoCameraConfiguration": # type: ignore[no-untyped-def]
return cls(board_socket="LEFT", **kwargs)

@classmethod
def create_right(cls, **kwargs):
def create_right(cls, **kwargs) -> "MonoCameraConfiguration": # type: ignore[no-untyped-def]
return cls(board_socket="RIGHT", **kwargs)


@@ -85,7 +86,7 @@ def create_right(cls, **kwargs):


class DepthConfiguration(BaseModel):
median: Optional[dai.StereoDepthProperties.MedianFilter] = dai.StereoDepthProperties.MedianFilter.KERNEL_7x7
median: Optional[dai.MedianFilter] = dai.MedianFilter.KERNEL_7x7
lr_check: Optional[bool] = True
lrc_threshold: int = 5 # 0..10
extended_disparity: Optional[bool] = False
@@ -98,15 +99,15 @@ class DepthConfiguration(BaseModel):
class Config:
arbitrary_types_allowed = True

def __init__(self, **v):
def __init__(self, **v) -> None: # type: ignore[no-untyped-def]
if v.get("median"):
v["median"] = getattr(dai.StereoDepthProperties.MedianFilter, v["median"])
v["median"] = getattr(dai.MedianFilter, v["median"])
if v.get("align"):
v["align"] = getattr(dai.CameraBoardSocket, v["align"])

return super().__init__(**v)

def to_runtime_controls(self) -> Dict:
def to_runtime_controls(self) -> Dict[str, Any]:
return {
"algorithm_control": {
"align": "RECTIFIED_LEFT"
@@ -121,11 +122,13 @@ def to_runtime_controls(self) -> Dict:
},
"postprocessing": {
"median": {
dai.StereoDepthConfig.MedianFilter.MEDIAN_OFF: 0,
dai.StereoDepthConfig.MedianFilter.KERNEL_3x3: 3,
dai.StereoDepthConfig.MedianFilter.KERNEL_5x5: 5,
dai.StereoDepthConfig.MedianFilter.KERNEL_7x7: 7,
}[self.median],
dai.MedianFilter.MEDIAN_OFF: 0,
dai.MedianFilter.KERNEL_3x3: 3,
dai.MedianFilter.KERNEL_5x5: 5,
dai.MedianFilter.KERNEL_7x7: 7,
}[self.median]
if self.median
else 0,
"bilateral_sigma": self.sigma,
},
"cost_matching": {
@@ -135,12 +138,12 @@ def to_runtime_controls(self) -> Dict:

@property
def out_queue_name(self) -> str:
return QueueNames.depthRaw.name
return str(QueueNames.depthRaw.name)


class AiModelConfiguration(BaseModel):
display_name: str
path: str
display_name: str = "Yolo V8"
path: str = "yolov8n_coco_640x352"


class ImuConfiguration(BaseModel):
@@ -152,15 +155,15 @@ class PipelineConfiguration(BaseModel):
color_camera: ColorCameraConfiguration = ColorCameraConfiguration()
left_camera: MonoCameraConfiguration = MonoCameraConfiguration.create_left()
right_camera: MonoCameraConfiguration = MonoCameraConfiguration.create_right()
depth: DepthConfiguration | None
ai_model: AiModelConfiguration | None
depth: Optional[DepthConfiguration] = DepthConfiguration()
ai_model: Optional[AiModelConfiguration] = AiModelConfiguration()
imu: ImuConfiguration = ImuConfiguration()

def to_json(self):
def to_json(self) -> Dict[str, Any]:
as_dict = self.dict()
return self._fix_depthai_types(as_dict)

def _fix_depthai_types(self, as_dict: dict):
def _fix_depthai_types(self, as_dict: Dict[str, Any]) -> Dict[str, Any]:
"""ATM Config.json_encoders doesn't work, so we manually fix convert the depthai types to strings here."""
if as_dict.get("color_camera"):
as_dict["color_camera"] = self._fix_camera(as_dict["color_camera"])
@@ -172,14 +175,14 @@ def _fix_depthai_types(self, as_dict: dict):
as_dict["depth"] = self._fix_depth(as_dict["depth"])
return as_dict

def _fix_depth(self, as_dict: dict):
def _fix_depth(self, as_dict: Dict[str, Any]) -> Dict[str, Any]:
if as_dict.get("align"):
as_dict["align"] = as_dict["align"].name
if as_dict.get("median"):
as_dict["median"] = as_dict["median"].name
return as_dict

def _fix_camera(self, as_dict: dict):
def _fix_camera(self, as_dict: Dict[str, Any]) -> Dict[str, Any]:
if as_dict.get("resolution"):
as_dict["resolution"] = as_dict["resolution"].name
if as_dict.get("board_socket"):
91 changes: 50 additions & 41 deletions rerun_py/rerun_sdk/depthai_viewer/_backend/main.py
Original file line number Diff line number Diff line change
@@ -3,11 +3,12 @@
import time
from queue import Empty as QueueEmptyException
from queue import Queue
from typing import Dict, Tuple
from typing import Dict, Tuple, Any, Optional, Union

import depthai as dai
import depthai_sdk
import numpy as np
from numpy.typing import NDArray
import pkg_resources
from depthai_sdk import OakCamera
from depthai_sdk.components import CameraComponent, NNComponent, StereoComponent
@@ -43,8 +44,8 @@

class SelectedDevice:
id: str
intrinsic_matrix: Dict[Tuple[int, int], np.ndarray] = {}
calibration_data: dai.CalibrationHandler = None
intrinsic_matrix: Dict[Tuple[int, int], NDArray[np.float32]] = {}
calibration_data: Optional[dai.CalibrationHandler] = None
use_encoding: bool = False
_time_of_last_xlink_update: int = 0

@@ -62,14 +63,16 @@ def __init__(self, device_id: str):
self.oak_cam = OakCamera(self.id)
print("Oak cam: ", self.oak_cam)

def get_intrinsic_matrix(self, width: int, height: int) -> np.ndarray:
if self.intrinsic_matrix.get((width, height)) is np.ndarray:
return self.intrinsic_matrix.get((width, height))
M_right = self.calibration_data.getCameraIntrinsics(dai.CameraBoardSocket.RIGHT, dai.Size2f(width, height))
def get_intrinsic_matrix(self, width: int, height: int) -> NDArray[np.float32]:
if self.intrinsic_matrix.get((width, height)) != None:
return self.intrinsic_matrix.get((width, height)) # type: ignore[return-value]
if self.calibration_data is None:
raise Exception("Missing calibration data!")
M_right = self.calibration_data.getCameraIntrinsics(dai.CameraBoardSocket.RIGHT, dai.Size2f(width, height)) # type: ignore[union-attr]
self.intrinsic_matrix[(width, height)] = np.array(M_right).reshape(3, 3)
return self.intrinsic_matrix[(width, height)]

def get_device_properties(self) -> Dict:
def get_device_properties(self) -> Dict[str, Any]:
dai_props = self.oak_cam.device.getConnectedCameraFeatures()
device_properties = {
"id": self.id,
@@ -85,41 +88,43 @@ def get_device_properties(self) -> Dict:
resolutions_key = "supported_right_mono_resolutions"
for config in cam.configs:
wh = (config.width, config.height)
if wh not in device_properties[resolutions_key]:
device_properties[resolutions_key].append((config.width, config.height))
if wh not in device_properties[resolutions_key]: # type: ignore[comparison-overlap]
device_properties[resolutions_key].append((config.width, config.height)) # type: ignore[attr-defined]
device_properties["supported_color_resolutions"] = list(
map(
lambda x: color_wh_to_enum[x].name,
sorted(device_properties["supported_color_resolutions"], key=lambda x: x[0] * x[1]),
lambda x: color_wh_to_enum[x].name, # type: ignore[index]
sorted(device_properties["supported_color_resolutions"], key=lambda x: x[0] * x[1]), # type: ignore[operator]
)
)
device_properties["supported_left_mono_resolutions"] = list(
map(
lambda x: color_wh_to_enum[x].name,
sorted(device_properties["supported_left_mono_resolutions"], key=lambda x: x[0] * x[1]),
lambda x: color_wh_to_enum[x].name, # type: ignore[index]
sorted(device_properties["supported_left_mono_resolutions"], key=lambda x: x[0] * x[1]), # type: ignore[operator]
)
)
device_properties["supported_right_mono_resolutions"] = list(
map(
lambda x: color_wh_to_enum[x].name,
sorted(device_properties["supported_right_mono_resolutions"], key=lambda x: x[0] * x[1]),
lambda x: color_wh_to_enum[x].name, # type: ignore[index]
sorted(device_properties["supported_right_mono_resolutions"], key=lambda x: x[0] * x[1]), # type: ignore[operator]
)
)
return device_properties

def update_pipeline(
self, config: PipelineConfiguration, runtime_only: bool, callbacks: "SdkCallbacks"
) -> Tuple[bool, str]:
) -> Tuple[bool, Dict[str, str]]:
if self.oak_cam.running():
if runtime_only:
return True, self._stereo.control.send_controls(config.depth.to_runtime_controls())
if config.depth is not None:
return True, self._stereo.control.send_controls(config.depth.to_runtime_controls())
return False, {"message": "Depth is not enabled, can't send runtime controls!"}
print("Cam running, closing...")
self.oak_cam.device.close()
self.oak_cam = None
# Check if the device is available, timeout after 10 seconds
timeout_start = time.time()
while time.time() - timeout_start < 10:
available_devices = [device.getMxId() for device in dai.Device.getAllAvailableDevices()]
available_devices = [device.getMxId() for device in dai.Device.getAllAvailableDevices()] # type: ignore[call-arg]
if self.id in available_devices:
break
try:
@@ -135,21 +140,21 @@ def update_pipeline(
print("Connected device is PoE: Using encoding...")
else:
print("Connected device is USB: Not using encoding...")
if config.color_camera:
if config.color_camera is not None:
print("Creating color camera")
self._color = self.oak_cam.create_camera(
"color", config.color_camera.resolution, config.color_camera.fps, name="color", encode=self.use_encoding
)
if config.color_camera.xout_video:
self.oak_cam.callback(self._color, callbacks.on_color_frame, enable_visualizer=self.use_encoding)
if config.left_camera:
if config.left_camera is not None:
print("Creating left camera")
self._left = self.oak_cam.create_camera(
"left", config.left_camera.resolution, config.left_camera.fps, name="left", encode=self.use_encoding
)
if config.left_camera.xout:
self.oak_cam.callback(self._left, callbacks.on_left_frame, enable_visualizer=self.use_encoding)
if config.right_camera:
if config.right_camera is not None:
print("Creating right camera")
self._right = self.oak_cam.create_camera(
"right", config.right_camera.resolution, config.right_camera.fps, name="right", encode=self.use_encoding
@@ -183,7 +188,7 @@ def update_pipeline(
# self._pc = self.oak_cam.create_pointcloud(stereo=self._stereo, colorize=self._color)
# self.oak_cam.callback(self._pc, callbacks.on_pointcloud)

if config.imu:
if config.imu is not None:
print("Creating IMU")
imu = self.oak_cam.create_imu()
sensors = [
@@ -228,7 +233,7 @@ def update_pipeline(
self.intrinsic_matrix = {}
return running, {"message": "Pipeline started" if running else "Couldn't start pipeline"}

def update(self):
def update(self) -> None:
self.oak_cam.poll()
if time.time_ns() - self._time_of_last_xlink_update >= 16e6:
self._time_of_last_xlink_update = time.time_ns()
@@ -238,12 +243,12 @@ def update(self):


class DepthaiViewerBack:
_device: SelectedDevice = None
_device: Optional[SelectedDevice]

# Queues for communicating with the API process
action_queue: Queue
result_queue: Queue
send_message_queue: Queue
action_queue: Queue[Any]
result_queue: Queue[Any]
send_message_queue: Queue[Any]

# Sdk callbacks for handling data from the device and sending it to the frontend
sdk_callbacks: SdkCallbacks
@@ -266,12 +271,12 @@ def __init__(self, compression: bool = False) -> None:
self.sdk_callbacks = SdkCallbacks(self.store)
self.run()

def set_device(self, device: SelectedDevice | None):
def set_device(self, device: Optional[SelectedDevice] = None) -> None:
self._device = device
if device:
self.sdk_callbacks.set_camera_intrinsics_getter(device.get_intrinsic_matrix)

def on_reset(self) -> Tuple[bool, str]:
def on_reset(self) -> Tuple[bool, Dict[str, str]]:
print("Resetting...")
if self._device:
print("Closing device...")
@@ -282,7 +287,7 @@ def on_reset(self) -> Tuple[bool, str]:
print("Done")
return True, {"message": "Reset successful"}

def select_device(self, device_id: str) -> Tuple[bool, str]:
def select_device(self, device_id: str) -> Tuple[bool, Dict[str, Union[str, Any]]]:
print("Selecting device: ", device_id)
if self._device:
self.on_reset()
@@ -297,8 +302,10 @@ def select_device(self, device_id: str) -> Tuple[bool, str]:
"device_properties": {},
}
try:
device_properties = self._device.get_device_properties()
return True, {"message:": "Device selected successfully", "device_properties": device_properties}
if self._device is not None:
device_properties = self._device.get_device_properties()
return True, {"message:": "Device selected successfully", "device_properties": device_properties}
return False, {"message": "CCouldn't select device", "device_properties": {}}
except RuntimeError as e:
print("Failed to get device properties:", e)
self.on_reset()
@@ -308,19 +315,21 @@ def select_device(self, device_id: str) -> Tuple[bool, str]:
exit(-1)
# return False, {"message": "Failed to get device properties", "device_properties": {}}

def update_pipeline(self, runtime_only: bool) -> bool:
def update_pipeline(self, runtime_only: bool) -> Tuple[bool, Dict[str, str]]:
if not self._device:
print("No device selected, can't update pipeline!")
return False, {"message": "No device selected, can't update pipeline!"}
print("Updating pipeline...")
started, message = self._device.update_pipeline(
self.store.pipeline_config, runtime_only, callbacks=self.sdk_callbacks
)
if not started:
self.set_device(None)
return started, {"message": message}
started, message = False, {"message": "Couldn't start pipeline"}
if self.store.pipeline_config is not None:
started, message = self._device.update_pipeline(
self.store.pipeline_config, runtime_only, callbacks=self.sdk_callbacks
)
if not started:
self.set_device(None)
return started, message

def run(self):
def run(self) -> None:
"""Handles ws messages and polls OakCam."""
while True:
try:
41 changes: 21 additions & 20 deletions rerun_py/rerun_sdk/depthai_viewer/_backend/sdk_callbacks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Callable, Dict, List, Tuple, Union
from typing import Callable, Dict, List, Tuple, Optional

import cv2
import depthai as dai
import numpy as np
from numpy.typing import NDArray
from ahrs.filters import Mahony
from depthai_sdk.classes.packets import (
DepthPacket,
@@ -39,7 +40,7 @@ class EntityPath:
class SdkCallbacks:
store: Store
ahrs: Mahony
_get_camera_intrinsics: Callable[[int, int], np.ndarray]
_get_camera_intrinsics: Callable[[int, int], NDArray[np.float32]]

def __init__(self, store: Store):
viewer.init("Depthai Viewer")
@@ -48,10 +49,10 @@ def __init__(self, store: Store):
self.ahrs = Mahony(frequency=100)
self.ahrs.Q = np.array([1, 0, 0, 0], dtype=np.float64)

def set_camera_intrinsics_getter(self, camera_intrinsics_getter: Callable[[int, int], np.ndarray]):
def set_camera_intrinsics_getter(self, camera_intrinsics_getter: Callable[[int, int], NDArray[np.float32]]) -> None:
self._get_camera_intrinsics = camera_intrinsics_getter

def on_imu(self, packet: IMUPacket):
def on_imu(self, packet: IMUPacket) -> None:
for data in packet.data:
gyro: dai.IMUReportGyroscope = data.gyroscope
accel: dai.IMUReportAccelerometer = data.acceleroMeter
@@ -64,7 +65,7 @@ def on_imu(self, packet: IMUPacket):
return
viewer.log_imu([accel.z, accel.x, accel.y], [gyro.z, gyro.x, gyro.y], self.ahrs.Q, [mag.x, mag.y, mag.z])

def on_color_frame(self, frame: FramePacket):
def on_color_frame(self, frame: FramePacket) -> None:
# Always log pinhole cam and pose (TODO(filip): move somewhere else or not)
if Topic.ColorImage not in self.store.subscriptions:
return
@@ -75,7 +76,7 @@ def on_color_frame(self, frame: FramePacket):
)
viewer.log_image(EntityPath.RGB_CAMERA_IMAGE, cv2.cvtColor(frame.frame, cv2.COLOR_BGR2RGB))

def on_left_frame(self, frame: FramePacket):
def on_left_frame(self, frame: FramePacket) -> None:
if Topic.LeftMono not in self.store.subscriptions:
return
h, w = frame.frame.shape
@@ -85,7 +86,7 @@ def on_left_frame(self, frame: FramePacket):
)
viewer.log_image(EntityPath.LEFT_CAMERA_IMAGE, frame.frame)

def on_right_frame(self, frame: FramePacket):
def on_right_frame(self, frame: FramePacket) -> None:
if Topic.RightMono not in self.store.subscriptions:
return
h, w = frame.frame.shape
@@ -95,47 +96,47 @@ def on_right_frame(self, frame: FramePacket):
)
viewer.log_image(EntityPath.RIGHT_CAMERA_IMAGE, frame.frame)

def on_stereo_frame(self, frame: DepthPacket):
def on_stereo_frame(self, frame: DepthPacket) -> None:
if Topic.DepthImage not in self.store.subscriptions:
return
depth_frame = frame.frame
path = EntityPath.RGB_PINHOLE_CAMERA + "/Depth"
depth = self.store.pipeline_config.depth
if not depth:
if not self.store.pipeline_config or not self.store.pipeline_config.depth:
# Essentially impossible to get here
return
depth = self.store.pipeline_config.depth
if depth.align == dai.CameraBoardSocket.LEFT:
path = EntityPath.LEFT_PINHOLE_CAMERA + "/Depth"
elif depth.align == dai.CameraBoardSocket.RIGHT:
path = EntityPath.RIGHT_PINHOLE_CAMERA + "/Depth"
viewer.log_depth_image(path, depth_frame, meter=1e3)

def on_detections(self, packet: DetectionPacket):
def on_detections(self, packet: DetectionPacket) -> None:
rects, colors, labels = self._detections_to_rects_colors_labels(packet)
viewer.log_rects(EntityPath.DETECTIONS, rects, rect_format=RectFormat.XYXY, colors=colors, labels=labels)

def _detections_to_rects_colors_labels(
self, packet: DetectionPacket, labels_dict: Union[Dict, None] = None
) -> Tuple[List, List, List]:
self, packet: DetectionPacket, omz_labels: Optional[List[str]] = None
) -> Tuple[List[List[int]], List[List[int]], List[str]]:
rects = []
colors = []
labels = []
for detection in packet.detections:
rects.append(self._rect_from_detection(detection))
colors.append([0, 255, 0])
label = detection.label
label: str = detection.label
# Open model zoo models output label index
if labels_dict is not None and isinstance(label, int):
label += labels_dict[label]
if omz_labels is not None and isinstance(label, int):
label += omz_labels[label]
label += ", " + str(int(detection.img_detection.confidence * 100)) + "%"
labels.append(label)
return rects, colors, labels

def on_yolo_packet(self, packet: DetectionPacket):
def on_yolo_packet(self, packet: DetectionPacket) -> None:
rects, colors, labels = self._detections_to_rects_colors_labels(packet)
viewer.log_rects(EntityPath.DETECTIONS, rects=rects, colors=colors, labels=labels, rect_format=RectFormat.XYXY)

def on_age_gender_packet(self, packet: TwoStagePacket):
def on_age_gender_packet(self, packet: TwoStagePacket) -> None:
for det, rec in zip(packet.detections, packet.nnData):
age = int(float(np.squeeze(np.array(rec.getLayerFp16("age_conv3")))) * 100)
gender = np.squeeze(np.array(rec.getLayerFp16("prob")))
@@ -151,12 +152,12 @@ def on_age_gender_packet(self, packet: TwoStagePacket):
label=label,
)

def _rect_from_detection(self, detection: _Detection):
def _rect_from_detection(self, detection: _Detection) -> List[int]:
return [
*detection.bottom_right,
*detection.top_left,
]

def on_mobilenet_ssd_packet(self, packet: DetectionPacket):
def on_mobilenet_ssd_packet(self, packet: DetectionPacket) -> None:
rects, colors, labels = self._detections_to_rects_colors_labels(packet, classification_labels.MOBILENET_LABELS)
viewer.log_rects(EntityPath.DETECTIONS, rects=rects, colors=colors, labels=labels, rect_format=RectFormat.XYXY)
20 changes: 10 additions & 10 deletions rerun_py/rerun_sdk/depthai_viewer/_backend/store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import Callable, List, Tuple
from typing import Callable, List, Tuple, Optional, Dict

from depthai_viewer._backend.device_configuration import PipelineConfiguration
from depthai_viewer._backend.topic import Topic
@@ -16,19 +16,19 @@ class Action(Enum):


class Store:
pipeline_config: PipelineConfiguration = PipelineConfiguration()
pipeline_config: Optional[PipelineConfiguration] = PipelineConfiguration()
subscriptions: List[Topic] = []
on_update_pipeline: Callable[[bool], Tuple[bool, str]] = None
on_select_device: Callable[[str], Tuple[bool, str]] = None
on_reset: Callable[[], Tuple[bool, str]] = None
on_update_pipeline: Optional[Callable[[bool], Tuple[bool, Dict[str, str]]]] = None
on_select_device: Optional[Callable[[str], Tuple[bool, Dict[str, str]]]] = None
on_reset: Optional[Callable[[], Tuple[bool, Dict[str, str]]]] = None

def handle_action(self, action: Action, **kwargs) -> Tuple[bool, str]:
def handle_action(self, action: Action, **kwargs) -> Tuple[bool, Dict[str, str]]: # type: ignore[no-untyped-def]
if action == Action.UPDATE_PIPELINE:
if kwargs.get("pipeline_config", None):
if self.on_update_pipeline:
old_pipeline_config = self.pipeline_config
self.pipeline_config = kwargs.get("pipeline_config")
success, message = self.on_update_pipeline(kwargs.get("runtime_only"))
success, message = self.on_update_pipeline(kwargs.get("runtime_only")) # type: ignore[arg-type]
if success:
return success, message
self.pipeline_config = old_pipeline_config
@@ -40,14 +40,14 @@ def handle_action(self, action: Action, **kwargs) -> Tuple[bool, str]:
if self.on_select_device:
return self.on_select_device(device_id)
elif action == Action.GET_SUBSCRIPTIONS:
return self.subscriptions
return self.subscriptions # type: ignore[return-value]
elif action == Action.SET_SUBSCRIPTIONS:
self.subscriptions = kwargs.get("subscriptions", [])
elif action == Action.GET_PIPELINE:
return self.pipeline_config
return self.pipeline_config # type: ignore[return-value]
elif action == Action.RESET:
if self.on_reset:
self.pipeline_config = None
self.subscriptions = []
return self.on_reset()
return False, f"Action: {action} didn't succeed!"
return False, {"message": f"Action: {action} didn't succeed!"}