Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROS Parameters #10

Merged
merged 4 commits into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 88 additions & 19 deletions clearpath_config/parser.py
Original file line number Diff line number Diff line change
@@ -37,9 +37,27 @@


class BaseConfigParser:
@staticmethod
def is_nested_key(key: str) -> bool:
return "." in key

@staticmethod
def get_nested_keys(key: str) -> list:
return key.split(".")

@staticmethod
def check_key_exists(key: str, config: dict) -> bool:
return key in config
if BaseConfigParser.is_nested_key(key):
keys = BaseConfigParser.get_nested_keys(key)
key = keys.pop(0)
if key in config:
return False
else:
return BaseConfigParser.check_key_exists(
".".join(keys), config
)
else:
return key in config

@staticmethod
def assert_key_exists(key: str, config: dict) -> None:
@@ -50,12 +68,28 @@ def assert_key_exists(key: str, config: dict) -> None:
@staticmethod
def get_required_val(key: str, config: dict):
BaseConfigParser.assert_key_exists(key, config)
if BaseConfigParser.is_nested_key(key):
keys = BaseConfigParser.get_nested_keys(key)
key = keys.pop(0)
return BaseConfigParser.get_required_val(
key=".".join(keys),
config=config[key]
)
return config[key]

@staticmethod
def get_optional_val(key: str, config: dict, default=None):
if BaseConfigParser.check_key_exists(key, config):
return config[key]
if BaseConfigParser.is_nested_key(key):
keys = BaseConfigParser.get_nested_keys(key)
key = keys.pop(0)
return BaseConfigParser.get_optional_val(
key=".".join(keys),
config=config[key],
default=default
)
else:
return config[key]
else:
return default

@@ -476,6 +510,7 @@ class BaseSensorParser(BaseConfigParser):
# Keys
URDF_ENABLED = "urdf_enabled"
LAUNCH_ENABLED = "launch_enabled"
ROS_PARAMETERS = "ros_parameters"

def __new__(cls, config: dict) -> BaseSensor:
parent = cls.get_optional_val(
@@ -488,11 +523,14 @@ def __new__(cls, config: dict) -> BaseSensor:
BaseSensorParser.URDF_ENABLED, config, BaseSensor.URDF_ENABLED)
launch_enabled = cls.get_optional_val(
BaseSensorParser.LAUNCH_ENABLED, config, BaseSensor.LAUNCH_ENABLED)
ros_parameters = cls.get_optional_val(
BaseSensorParser.ROS_PARAMETERS, config, BaseSensor.ROS_PARAMETERS)
return BaseSensor(
parent=parent,
xyz=xyz, rpy=rpy,
urdf_enabled=urdf_enabled,
launch_enabled=launch_enabled
launch_enabled=launch_enabled,
ros_parameters=ros_parameters
)


@@ -519,6 +557,7 @@ def __new__(cls, config: dict) -> BaseLidar2D:
rpy=sensor.get_rpy(),
urdf_enabled=sensor.get_urdf_enabled(),
launch_enabled=sensor.get_launch_enabled(),
ros_parameters=sensor.get_ros_parameters(),
ip=ip,
port=port,
min_angle=min_angle,
@@ -543,6 +582,7 @@ def __new__(cls, config: dict) -> BaseLidar2D:
lidar2d.set_port(base.get_port())
lidar2d.set_min_angle(base.get_min_angle())
lidar2d.set_max_angle(base.get_max_angle())
lidar2d.set_ros_parameters(base.get_ros_parameters())
return lidar2d


@@ -562,6 +602,7 @@ def __new__(cls, config: dict) -> BaseCamera:
rpy=sensor.get_rpy(),
urdf_enabled=sensor.get_urdf_enabled(),
launch_enabled=sensor.get_launch_enabled(),
ros_parameters=sensor.get_ros_parameters(),
fps=fps,
serial=serial
)
@@ -575,25 +616,23 @@ class CameraParser(BaseConfigParser):
ENCODING = "encoding"

# Realsense Parameters
WIDTH = "width"
HEIGHT = "height"
COLOR_ENABLE = "color_enabled"
COLOR_FPS = "color_fps"
COLOR_WIDTH = "color_width"
COLOR_HEIGHT = "color_height"
DEPTH_ENABLED = "depth_enabled"
DEPTH_FPS = "depth_fps"
DEPTH_WIDTH = "depth_width"
DEPTH_HEIGHT = "depth_height"
POINTCLOUD_ENABLED = "pointcloud_enabled"

# ROS Parameters
ROS_PARAMETERS = "ros_parameters"

def __new__(cls, config: dict) -> BaseLidar2D:
base = BaseCameraParser(config)
model = cls.get_required_val(CameraParser.MODEL, config)
camera = Camera(model)
# Set Base Parameters
camera.set_parent(base.get_parent())
camera.set_xyz(base.get_xyz())
camera.set_rpy(base.get_rpy())
camera.set_urdf_enabled(base.get_urdf_enabled())
camera.set_launch_enabled(base.get_launch_enabled())
camera.set_fps(base.get_fps())
camera.set_serial(base.get_serial())
# Set Specific Parameters
if model == Camera.FLIR_BLACKFLY:
camera.set_connection_type(
@@ -611,18 +650,32 @@ def __new__(cls, config: dict) -> BaseLidar2D:
)
)
elif model == Camera.INTEL_REALSENSE:
camera.set_width(
camera.set_color_enabled(
cls.get_optional_val(
CameraParser.WIDTH,
CameraParser.COLOR_ENABLE,
config,
IntelRealsense.WIDTH
IntelRealsense.COLOR_ENABLED
)
)
camera.set_height(
camera.set_color_fps(
cls.get_optional_val(
CameraParser.HEIGHT,
CameraParser.COLOR_FPS,
config,
IntelRealsense.HEIGHT
IntelRealsense.COLOR_FPS
)
)
camera.set_color_width(
cls.get_optional_val(
CameraParser.COLOR_WIDTH,
config,
IntelRealsense.COLOR_WIDTH
)
)
camera.set_color_height(
cls.get_optional_val(
CameraParser.COLOR_HEIGHT,
config,
IntelRealsense.COLOR_HEIGHT
)
)
camera.set_depth_enabled(
@@ -653,6 +706,22 @@ def __new__(cls, config: dict) -> BaseLidar2D:
IntelRealsense.DEPTH_HEIGHT
)
)
camera.set_pointcloud_enabled(
cls.get_optional_val(
CameraParser.POINTCLOUD_ENABLED,
config,
IntelRealsense.POINTCLOUD_ENABLED
)
)
# Set Base Parameters
camera.set_parent(base.get_parent())
camera.set_xyz(base.get_xyz())
camera.set_rpy(base.get_rpy())
camera.set_urdf_enabled(base.get_urdf_enabled())
camera.set_launch_enabled(base.get_launch_enabled())
camera.set_fps(base.get_fps())
camera.set_serial(base.get_serial())
camera.set_ros_parameters(base.get_ros_parameters())
return camera


50 changes: 30 additions & 20 deletions clearpath_config/sample/a200_config.yaml
Original file line number Diff line number Diff line change
@@ -41,6 +41,9 @@ mounts:
angle: 0.0
xyz: [0.0, 0.0, 0.0]
rpy: [0.0, 0.0, 0.0]
flir_ptu:
- {}
- {}
riser:
- rows: 8
columns: 7
@@ -74,45 +77,52 @@ sensors:
- model: flir_blackfly
urdf_enabled: true
launch_enabled: true
connection_type: USB3
encoding: BayerRG8
fps: 30
serial: 0
parent: base_link
xyz: [0.0, 0.0, 0.0]
rpy: [0.0, 0.0, 0.0]
ros_parameters:
connection_type: USB3
encoding: BayerRG8
fps: 30
serial: 0
- model: intel_realsense
urdf_enabled: true
launch_enabled: true
fps: 30
serial: 0
width: 640
height: 480
depth_enabled: true
depth_fps: 30
depth_width: 640
depth_height: 480
parent: base_link
xyz: [0.0, 0.0, 0.0]
rpy: [0.0, 0.0, 0.0]
ros_parameters:
serial_no: 12345
device_type: d435
enable_color: true
rgb_camera:
profile: "640,480,30"
enable_depth: true
depth_module:
profile: "640,480,30"
pointcloud:
enable: true
anything: true
lidar2d:
- model: sick_lms1xx
urdf_enabled: true
launch_enabled: true
ip: 192.168.131.20
port: 6000
min_angle: -3.1415
max_angle: 3.1415
parent: base_link
xyz: [0.0, 0.0, 0.0]
rpy: [0.0, 0.0, 0.0]
ros_parameters:
hostname: 192.168.131.20
port: 6000
min_ang: -3.1415
max_ang: 3.1415
- model: hokuyo_ust10
urdf_enabled: true
launch_enabled: true
ip: 192.168.131.21
port: 6000
min_angle: -3.1415
max_angle: 3.1415
parent: base_link
xyz: [0.0, 0.0, 0.0]
rpy: [0.0, 0.0, 0.0]
ros_parameters:
ip_address: 192.168.131.21
ip_port: 6000
angle_max: -3.1415
angle_min: 3.1415
64 changes: 63 additions & 1 deletion clearpath_config/sensors/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from clearpath_config.common import Accessory, IndexedAccessory
from typing import List
from typing import List, Callable
import copy


class BaseSensor(IndexedAccessory):
@@ -13,6 +14,18 @@ class BaseSensor(IndexedAccessory):
TOPIC = "base"
URDF_ENABLED = True
LAUNCH_ENABLED = True
ROS_PARAMETERS = {}

class ROSParameter:
def __init__(
self,
key: str,
get: Callable,
set: Callable
) -> None:
self.key = key
self.get = get
self.set = set

def __init__(
self,
@@ -21,6 +34,7 @@ def __init__(
topic: str = TOPIC,
urdf_enabled: bool = URDF_ENABLED,
launch_enabled: bool = LAUNCH_ENABLED,
ros_parameters: str = ROS_PARAMETERS,
parent: str = Accessory.PARENT,
xyz: List[float] = Accessory.XYZ,
rpy: List[float] = Accessory.RPY,
@@ -38,6 +52,11 @@ def __init__(
# - enables the sensor launch in the generated launch
self.launch_enabled = True
self.enable_launch if launch_enabled else self.disable_launch()
# ROS Parameters
# - dictionary with parameters for launch file
self.ros_parameters = {}
self.ros_parameter_pairs = {}
self.set_ros_parameters(ros_parameters)
super().__init__(idx, name, parent, xyz, rpy)

@classmethod
@@ -101,3 +120,46 @@ def set_launch_enabled(self, enabled: bool) -> None:

def get_launch_enabled(self) -> bool:
return self.launch_enabled

@staticmethod
def __get_key_recursive(keys: list, config: dict) -> any:
key = keys.pop(0)
if key in config:
if keys:
return BaseSensor.__get_key_recursive(
keys,
config[key]
)
else:
return config[key]
else:
return None

def set_ros_parameters(self, ros_parameters: dict) -> None:
assert isinstance(ros_parameters, dict), (
"ROS paramaters must be a dictionary")
for key, val in ros_parameters.items():
for _, param in self.ros_parameter_pairs.items():
if "." in param.key:
val = BaseSensor.__get_key_recursive(
param.key.split("."),
ros_parameters
)
if val is not None:
param.set(self, val)
else:
if key == param.key:
param.set(self, val)
self.ros_parameters = ros_parameters

def get_ros_parameters(self) -> dict:
for _, param in self.ros_parameter_pairs.items():
if "." in param.key:
keys = param.key.split(".")
params = {keys.pop(): param.get(self)}
for i in range(len(keys) - 1, -1, -1):
params = {keys[i]: params}
self.ros_parameters.update(params)
else:
self.ros_parameters[param.key] = param.get(self)
return self.ros_parameters
Loading