From b33077acf3875cd78389c81ae9bc72793fa8c8cb Mon Sep 17 00:00:00 2001 From: Luis Camero Date: Thu, 27 Apr 2023 16:23:41 -0400 Subject: [PATCH 1/4] Updated Port to ensure always stores --- clearpath_config/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clearpath_config/common.py b/clearpath_config/common.py index d7536b1..d0e4a4f 100644 --- a/clearpath_config/common.py +++ b/clearpath_config/common.py @@ -141,7 +141,7 @@ def assert_valid(ip: str) -> None: class Port: def __init__(self, port: int) -> None: self.assert_valid(port) - self.port = port + self.port = int(port) def __str__(self) -> str: return str(self.port) From 2cb0e2f3430a6c9266200a0d151382dad577cead Mon Sep 17 00:00:00 2001 From: Luis Camero Date: Thu, 27 Apr 2023 16:23:58 -0400 Subject: [PATCH 2/4] Added lidar3d --- clearpath_config/sensors/lidars_3d.py | 227 ++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 clearpath_config/sensors/lidars_3d.py diff --git a/clearpath_config/sensors/lidars_3d.py b/clearpath_config/sensors/lidars_3d.py new file mode 100644 index 0000000..a0419a1 --- /dev/null +++ b/clearpath_config/sensors/lidars_3d.py @@ -0,0 +1,227 @@ +from clearpath_config.common import IP, Port +from clearpath_config.sensors.base import BaseSensor, Accessory, List + + +class BaseLidar3D(BaseSensor): + """ + Base 3D Lidar Class + - contains all common 3d lidar parameters: + - frame_id: to publish PointCloud data + - ip_address: to connect to lidar + - ip_port: to connect to lidar + """ + SENSOR_TYPE = "lidar3d" + SENSOR_MODEL = "base" + TOPIC = "points" + + FRAME_ID = "laser" + IP_ADDRESS = "192.168.131.25" + IP_PORT = "2368" + + class ROS_PARAMETER_KEYS: + FRAME_ID = "frame_id" + IP_ADDRESS = "ip_address" + IP_PORT = "ip_port" + + def __init__( + self, + idx: int = None, + name: str = None, + topic: str = TOPIC, + frame_id: str = FRAME_ID, + ip: str = IP_ADDRESS, + port: int = IP_PORT, + urdf_enabled: bool = BaseSensor.URDF_ENABLED, + launch_enabled: bool = BaseSensor.LAUNCH_ENABLED, + ros_parameters: str = BaseSensor.ROS_PARAMETERS, + parent: str = Accessory.PARENT, + xyz: List[float] = Accessory.XYZ, + rpy: List[float] = Accessory.RPY + ) -> None: + # Frame ID + self.frame_id: str = self.FRAME_ID + self.set_frame_id(frame_id) + # IP Address + self.ip: IP = IP(self.IP_ADDRESS) + self.set_ip(ip) + # IP Port + self.port: Port = Port(self.IP_PORT) + self.set_port(port) + super().__init__( + idx, + name, + topic, + urdf_enabled, + launch_enabled, + ros_parameters, + parent, + xyz, + rpy + ) + # ROS Parameter Keys + pairs = { + # Frame ID + BaseLidar3D.ROS_PARAMETER_KEYS.FRAME_ID: ( + BaseSensor.ROSParameter( + key=BaseLidar3D.ROS_PARAMETER_KEYS.FRAME_ID, + get=lambda obj: obj.get_frame_id(), + set=lambda obj, val: obj.set_frame_id(val) + ) + ), + # IP Address + BaseLidar3D.ROS_PARAMETER_KEYS.IP_ADDRESS: ( + BaseSensor.ROSParameter( + key=BaseLidar3D.ROS_PARAMETER_KEYS.IP_ADDRESS, + get=lambda obj: obj.get_ip(), + set=lambda obj, val: obj.set_ip(val))), + # IP Port + BaseLidar3D.ROS_PARAMETER_KEYS.IP_PORT: ( + BaseSensor.ROSParameter( + key=BaseLidar3D.ROS_PARAMETER_KEYS.IP_PORT, + get=lambda obj: obj.get_port(), + set=lambda obj, val: obj.set_port(val))), + } + self.ros_parameter_pairs.update(pairs) + self.set_ros_parameters(ros_parameters) + + @classmethod + def get_frame_id_from_idx(cls, idx: int) -> str: + return "%s_%s" % ( + cls.get_name_from_idx(idx), + cls.FRAME_ID + ) + + @classmethod + def get_ip_from_idx(cls, idx: int) -> str: + ip = cls.IP_ADDRESS.split('.') + network_id = ip[0:3] + host_id = int(ip[-1]) + idx + return '.'.join(network_id) + '.' + str(host_id) + + def set_idx(self, idx: int) -> None: + # Set Base: Name and Topic + super().set_idx(idx) + # Set Frame ID + self.set_frame_id(self.get_frame_id_from_idx(idx)) + # Set IP + self.set_ip(self.get_ip_from_idx(idx)) + + def get_frame_id(self) -> str: + return self.frame_id + + def set_frame_id(self, link: str) -> None: + Accessory.assert_valid_link(link) + self.frame_id = link + + def get_ip(self) -> str: + return str(self.ip) + + def set_ip(self, ip: str) -> None: + self.ip = IP(ip) + + def get_port(self) -> int: + return int(self.port) + + def set_port(self, port: int) -> None: + self.port = Port(port) + + +class VelodyneLidar(BaseLidar3D): + """ + Velodyne Lidar Class + - extra ros_parameters: + - device_type: model of the lidar: + - '64E' + - '64E_S3' + - '32E' + - '32C' + - 'VLP16' + """ + SENSOR_MODEL = "velodyne_lidar" + + FRAME_ID = "laser" + IP_PORT = 2368 + + HDL_32E = "32E" + HDL_64E = "64E" + HDL_64E_S2 = "64E_S2" + HDL_64E_S3 = "64E_S3" + VLP_16 = "VLP16" + VLP_32C = "32C" + DEVICE_TYPE = VLP_16 + DEVICE_TYPES = [ + HDL_32E, + HDL_64E, + HDL_64E_S2, + HDL_64E_S3, + VLP_16, + VLP_32C + ] + + class ROS_PARAMETER_KEYS: + DEVICE_TYPE = "model" + + def __init__( + self, + idx: int = None, + name: str = None, + topic: str = BaseLidar3D.TOPIC, + frame_id: str = FRAME_ID, + ip: str = BaseLidar3D.IP_ADDRESS, + port: int = IP_PORT, + device_type: str = DEVICE_TYPE, + urdf_enabled: bool = BaseSensor.URDF_ENABLED, + launch_enabled: bool = BaseSensor.LAUNCH_ENABLED, + ros_parameters: str = BaseSensor.ROS_PARAMETERS, + parent: str = Accessory.PARENT, + xyz: List[float] = Accessory.XYZ, + rpy: List[float] = Accessory.RPY + ) -> None: + # Device Type: + self.device_type: str = self.DEVICE_TYPE + self.set_device_type(device_type) + super().__init__( + idx, + name, + topic, + frame_id, + ip, + port, + urdf_enabled, + launch_enabled, + ros_parameters, + parent, + xyz, + rpy + ) + # ROS Parameter Keys + self.ros_parameter_pairs[ + BaseLidar3D.ROS_PARAMETER_KEYS.FRAME_ID].key = "frame_id" + self.ros_parameter_pairs[ + BaseLidar3D.ROS_PARAMETER_KEYS.IP_ADDRESS].key = "device_ip" + self.ros_parameter_pairs[ + BaseLidar3D.ROS_PARAMETER_KEYS.IP_PORT].key = "port" + pairs = { + # Device Type + self.ROS_PARAMETER_KEYS.DEVICE_TYPE: ( + BaseSensor.ROSParameter( + key=self.ROS_PARAMETER_KEYS.DEVICE_TYPE, + get=lambda obj: obj.get_device_type(), + set=lambda obj, val: obj.set_device_type(val) + ) + ), + } + self.ros_parameter_pairs.update(pairs) + self.set_ros_parameters(ros_parameters) + + def get_device_type(self) -> str: + return self.device_type + + def set_device_type(self, device_type: str) -> None: + assert device_type in self.DEVICE_TYPES, ( + "Device type '%s' is not one of '%s'" % ( + device_type, + self.DEVICE_TYPES + ) + ) + self.device_type = device_type From f3a39f376c679637c0401b9aa4313b44efe9fa48 Mon Sep 17 00:00:00 2001 From: Luis Camero Date: Thu, 27 Apr 2023 16:24:25 -0400 Subject: [PATCH 3/4] Parse Lidar3D --- clearpath_config/parser.py | 86 ++++++++++++--- clearpath_config/sensors/sensors.py | 163 +++++++++++++++++++++++++--- 2 files changed, 215 insertions(+), 34 deletions(-) diff --git a/clearpath_config/parser.py b/clearpath_config/parser.py index c6511b3..09de9d3 100644 --- a/clearpath_config/parser.py +++ b/clearpath_config/parser.py @@ -29,9 +29,12 @@ BaseCamera, FlirBlackfly, IntelRealsense, - # Lidars + # Lidar2D Lidar2D, BaseLidar2D, + # Lidar3D + Lidar3D, + BaseLidar3D, # IMU InertialMeasurementUnit, BaseIMU, @@ -575,23 +578,6 @@ def __new__(cls, config: dict) -> BaseLidar2D: ) -class IMUParser(BaseConfigParser): - MODEL = "model" - - def __new__(cls, config: dict) -> BaseIMU: - base = BaseSensorParser(config) - model = cls.get_required_val(IMUParser.MODEL, config) - imu = InertialMeasurementUnit(model) - # Set Base Parameters - imu.set_parent(base.get_parent()) - imu.set_xyz(base.get_xyz()) - imu.set_rpy(base.get_rpy()) - imu.set_urdf_enabled(base.get_urdf_enabled()) - imu.set_launch_enabled(base.get_launch_enabled()) - imu.set_ros_parameters(base.get_ros_parameters()) - return imu - - class Lidar2DParser(BaseConfigParser): MODEL = "model" @@ -614,6 +600,66 @@ def __new__(cls, config: dict) -> BaseLidar2D: return lidar2d +class BaseLidar3DParser(BaseConfigParser): + # Keys + IP = "ip" + PORT = "port" + + def __new__(cls, config: dict) -> BaseLidar3D: + sensor = BaseSensorParser(config) + ip = cls.get_optional_val( + BaseLidar3DParser.IP, config, BaseLidar3D.IP_ADDRESS) + port = cls.get_optional_val( + BaseLidar3DParser.PORT, config, BaseLidar3D.IP_PORT) + return BaseLidar3D( + parent=sensor.get_parent(), + xyz=sensor.get_xyz(), + rpy=sensor.get_rpy(), + urdf_enabled=sensor.get_urdf_enabled(), + launch_enabled=sensor.get_launch_enabled(), + ros_parameters=sensor.get_ros_parameters(), + ip=ip, + port=port, + ) + + +class Lidar3DParser(BaseConfigParser): + MODEL = "model" + + def __new__(cls, config: dict) -> BaseLidar3D: + base = BaseLidar3DParser(config) + model = cls.get_required_val(Lidar3DParser.MODEL, config) + lidar3d = Lidar3D(model) + # Set Base Parameters + lidar3d.set_parent(base.get_parent()) + lidar3d.set_xyz(base.get_xyz()) + lidar3d.set_rpy(base.get_rpy()) + lidar3d.set_urdf_enabled(base.get_urdf_enabled()) + lidar3d.set_launch_enabled(base.get_launch_enabled()) + lidar3d.set_frame_id(base.get_frame_id()) + lidar3d.set_ip(base.get_ip()) + lidar3d.set_port(base.get_port()) + lidar3d.set_ros_parameters(base.get_ros_parameters()) + return lidar3d + + +class IMUParser(BaseConfigParser): + MODEL = "model" + + def __new__(cls, config: dict) -> BaseIMU: + base = BaseSensorParser(config) + model = cls.get_required_val(IMUParser.MODEL, config) + imu = InertialMeasurementUnit(model) + # Set Base Parameters + imu.set_parent(base.get_parent()) + imu.set_xyz(base.get_xyz()) + imu.set_rpy(base.get_rpy()) + imu.set_urdf_enabled(base.get_urdf_enabled()) + imu.set_launch_enabled(base.get_launch_enabled()) + imu.set_ros_parameters(base.get_ros_parameters()) + return imu + + class BaseCameraParser(BaseConfigParser): FPS = "fps" SERIAL = "serial" @@ -759,6 +805,8 @@ def __new__(cls, model: str, config: dict) -> BaseSensor: Sensor.assert_type(model) if model == Sensor.LIDAR2D: return Lidar2DParser(config) + elif model == Sensor.LIDAR3D: + return Lidar3DParser(config) elif model == Sensor.CAMERA: return CameraParser(config) elif model == Sensor.IMU: @@ -777,6 +825,8 @@ def __new__(cls, config: dict) -> SensorConfig: return snrconfig # Lidar2D snrconfig.set_all_lidar_2d(cls.get_sensors(sensors, Sensor.LIDAR2D)) + # Lidar3D + snrconfig.set_all_lidar_3d(cls.get_sensors(sensors, Sensor.LIDAR3D)) # Camera snrconfig.set_all_camera(cls.get_sensors(sensors, Sensor.CAMERA)) # IMU diff --git a/clearpath_config/sensors/sensors.py b/clearpath_config/sensors/sensors.py index c979b6f..ce368a4 100644 --- a/clearpath_config/sensors/sensors.py +++ b/clearpath_config/sensors/sensors.py @@ -10,6 +10,10 @@ HokuyoUST10, SickLMS1XX, ) +from clearpath_config.sensors.lidars_3d import ( + BaseLidar3D, + VelodyneLidar, +) from clearpath_config.sensors.imu import ( BaseIMU, Microstrain, @@ -25,16 +29,16 @@ class InertialMeasurementUnit(): @classmethod def assert_model(cls, model: str) -> None: - assert model in InertialMeasurementUnit.MODEL, ( + assert model in cls.MODEL, ( "Model '%s' must be one of: '%s'" % ( model, - InertialMeasurementUnit.MODEL.keys() + cls.MODEL.keys() ) ) def __new__(cls, model: str) -> BaseIMU: cls.assert_model(model) - return InertialMeasurementUnit.MODEL[model]() + return cls.MODEL[model]() class Camera(): @@ -48,16 +52,16 @@ class Camera(): @classmethod def assert_model(cls, model: str) -> None: - assert model in Camera.MODEL, ( + assert model in cls.MODEL, ( "Model '%s' must be one of: '%s'" % ( model, - Camera.MODEL.keys() + cls.MODEL.keys() ) ) def __new__(cls, model: str) -> BaseCamera: cls.assert_model(model) - return Camera.MODEL[model]() + return cls.MODEL[model]() class Lidar2D(): @@ -71,46 +75,70 @@ class Lidar2D(): @classmethod def assert_model(cls, model: str) -> None: - assert model in Lidar2D.MODEL, ( + assert model in cls.MODEL, ( "Model '%s' must be one of: '%s'" % ( model, - Lidar2D.MODEL.keys() + cls.MODEL.keys() ) ) def __new__(cls, model: str) -> BaseLidar2D: cls.assert_model(model) - return Lidar2D.MODEL[model]() + return cls.MODEL[model]() + + +class Lidar3D(): + VELODYNE_LIDAR = VelodyneLidar.SENSOR_MODEL + + MODEL = { + VELODYNE_LIDAR: VelodyneLidar + } + + @classmethod + def assert_model(cls, model: str) -> None: + assert model in cls.MODEL, ( + "Model '%s' must be one of: '%s'" % ( + model, + cls.MODEL.keys() + ) + ) + + def __new__(cls, model: str) -> BaseLidar3D: + cls.assert_model(model) + return cls.MODEL[model]() class Sensor(): CAMERA = BaseCamera.SENSOR_TYPE LIDAR2D = BaseLidar2D.SENSOR_TYPE + LIDAR3D = BaseLidar3D.SENSOR_TYPE IMU = BaseIMU.SENSOR_TYPE TYPE = { CAMERA: Camera, LIDAR2D: Lidar2D, + LIDAR3D: Lidar3D, IMU: InertialMeasurementUnit, } @classmethod def assert_type(cls, _type: str) -> None: - assert _type in Sensor.TYPE, ( + assert _type in cls.TYPE, ( "Sensor type '%s' must be one of: '%s'" % ( _type, - Sensor.TYPE.keys() + cls.TYPE.keys() ) ) - def __new__(cls, _type: str, _model: str) -> BaseLidar2D: + def __new__(cls, _type: str, _model: str) -> BaseSensor: cls.assert_sensor_type(_type) - return Sensor.TYPE[_type](_model) + return cls.TYPE[_type](_model) # Sensor Config class SensorConfig: LIDAR2D_INDEX = 0 + LIDAR3D_INDEX = 0 CAMERA_INDEX = 0 IMU_INDEX = 0 @@ -119,6 +147,10 @@ def __init__(self) -> None: self.__lidars_2d = OrderedListConfig[BaseLidar2D]( SensorConfig.LIDAR2D_INDEX ) + # 3D Lidars + self.__lidars_3d = OrderedListConfig[BaseLidar3D]( + SensorConfig.LIDAR3D_INDEX + ) # Cameras self.__cameras = OrderedListConfig[BaseCamera]( SensorConfig.CAMERA_INDEX @@ -133,6 +165,8 @@ def get_all_sensors(self) -> List[BaseSensor]: sensors = [] # Lidar2D sensors.extend(self.get_all_lidar_2d()) + # Lidar3D + sensors.extend(self.get_all_lidar_3d()) # Cameras sensors.extend(self.get_all_cameras()) # IMU @@ -281,6 +315,103 @@ def set_lidar_2d(self, lidar_2d: BaseLidar2D) -> None: def set_all_lidar_2d(self, all_lidar_2d: List[BaseLidar2D]) -> None: self.__lidars_2d.set_all(all_lidar_2d) + # Lidar3D: Add Lidar3D by Object or Common Lidar3D Parameters + def add_lidar3d( + self, + # By Object + lidar3d: BaseLidar3D = None, + # By Model and Parameters + model: str = None, + frame_id: str = BaseLidar3D.FRAME_ID, + ip: str = BaseLidar3D.IP_ADDRESS, + port: int = BaseLidar3D.IP_PORT, + urdf_enabled: bool = BaseSensor.URDF_ENABLED, + launch_enabled: bool = BaseSensor.LAUNCH_ENABLED, + parent: str = Accessory.PARENT, + xyz: List[float] = Accessory.XYZ, + rpy: List[float] = Accessory.RPY + ) -> None: + assert lidar3d or model, ( + "Lidar3D object or model must be passed." + ) + if not lidar3d and model: + lidar3d = Lidar3D(model) + lidar3d.set_frame_id(frame_id) + lidar3d.set_ip(ip) + lidar3d.set_port(port) + lidar3d.set_urdf_enabled(urdf_enabled) + lidar3d.set_launch_enabled(launch_enabled) + lidar3d.set_parent(parent) + lidar3d.set_xyz(xyz) + lidar3d.set_rpy(rpy) + self.__lidars_3d.add(lidar3d) + + # Lidar3D: Add Velodyne + def add_velodyne( + self, + # By Object + velodyne: VelodyneLidar = None, + # By Parameters + frame_id: str = VelodyneLidar.FRAME_ID, + ip: str = VelodyneLidar.IP_ADDRESS, + port: int = VelodyneLidar.IP_PORT, + device_type: str = VelodyneLidar.DEVICE_TYPE, + urdf_enabled: bool = VelodyneLidar.URDF_ENABLED, + launch_enabled: bool = VelodyneLidar.LAUNCH_ENABLED, + parent: str = Accessory.PARENT, + xyz: List[float] = Accessory.XYZ, + rpy: List[float] = Accessory.RPY + ) -> None: + if velodyne is None: + velodyne = VelodyneLidar( + frame_id=frame_id, + ip=ip, + port=port, + device_type=device_type, + urdf_enabled=urdf_enabled, + launch_enabled=launch_enabled, + parent=parent, + xyz=xyz, + rpy=rpy + ) + assert isinstance(velodyne, VelodyneLidar), ( + "Lidar3D object must be of type VelodyneLidar" + ) + self.__lidars_3d.add(velodyne) + + # Lidar3D: Remove Lidar3D by passing object or index + def remove_lidar_3d(self, lidar_3d: BaseLidar3D | int) -> None: + self.__lidars_3d.remove(lidar_3d) + + # Lidar3D: Get Single Object + def get_lidar_3d(self, idx: int) -> BaseLidar3D: + return self.__lidars_3d.get(idx) + + # Lidar3D: Get All Objects + def get_all_lidar_3d(self) -> List[BaseLidar3D]: + return self.__lidars_3d.get_all() + + # Lidar3D: Get All Objects of a Specified Model + def get_all_lidar_3d_by_model(self, model: str) -> List[BaseLidar3D]: + Lidar3D.assert_model(model) + all_model_lidar_3d = [] + for lidar_3d in self.get_all_lidar_3d(): + if lidar_3d.SENSOR_MODEL == model: + all_model_lidar_3d.append(lidar_3d) + return all_model_lidar_3d + + # Lidar3D: Get All Objects of Model UST10 + def get_all_velodyne(self) -> List[VelodyneLidar]: + return self.get_all_lidar_3d_by_model(Lidar3D.VELODYNE_LIDAR) + + # Lidar3D: Set Lidar3D Object + def set_lidar_3d(self, lidar_3d: BaseLidar3D) -> None: + self.__lidars_3d.set(lidar_3d) + + # Lidar3D: Set All Lidar3D Objects + def set_all_lidar_3d(self, all_lidar_3d: List[BaseLidar3D]) -> None: + self.__lidars_3d.set_all(all_lidar_3d) + # Camera: Add Camera def add_camera( self, @@ -415,7 +546,7 @@ def set_all_camera(self, cameras: List[BaseCamera]) -> None: self.__cameras.set_all(cameras) # Camera: Get All Objects of a Specified Model - def get_all_cameras_by_model(self, model: str) -> List[BaseLidar2D]: + def get_all_cameras_by_model(self, model: str) -> List[BaseCamera]: Camera.assert_model(model) all_model_camera = [] for camera in self.get_all_cameras(): @@ -423,11 +554,11 @@ def get_all_cameras_by_model(self, model: str) -> List[BaseLidar2D]: all_model_camera.append(camera) return all_model_camera - # Lidar2D: Get All Objects of Model UST10 + # Camera: Get All Objects of Model UST10 def get_all_realsense(self) -> List[IntelRealsense]: return self.get_all_cameras_by_model(Camera.INTEL_REALSENSE) - # Lidar2D: Get All Objects of Model LMS1XX + # Camera: Get All Objects of Model LMS1XX def get_all_blackfly(self) -> List[FlirBlackfly]: return self.get_all_cameras_by_model(Camera.FLIR_BLACKFLY) From 464d2bc26026c744e3edde512ba0b4f3084b3078 Mon Sep 17 00:00:00 2001 From: Luis Camero Date: Thu, 27 Apr 2023 16:24:44 -0400 Subject: [PATCH 4/4] Added Lidar3D to sample config --- clearpath_config/sample/a200_config.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/clearpath_config/sample/a200_config.yaml b/clearpath_config/sample/a200_config.yaml index 9f43465..7a9a310 100644 --- a/clearpath_config/sample/a200_config.yaml +++ b/clearpath_config/sample/a200_config.yaml @@ -103,6 +103,17 @@ sensors: pointcloud: enable: true anything: true + lidar3d: + - model: velodyne_lidar + urdf_enabled: true + launch_enabled: true + parent: base_link + xyz: [0.0, 0.0, 0.0] + rpy: [0.0, 0.0, 0.0] + ros_parameters: + device_ip: 192.168.131.20 + port: 2368 + model: VLP16 lidar2d: - model: sick_lms1xx urdf_enabled: true