Skip to content

Commit

Permalink
Integrate mobile robots by interfacing with airo-tulip (#147)
Browse files Browse the repository at this point in the history
Adds mobile robot support with airo-tulip v0.1.0.

---------

Co-authored-by: bitscuity <[email protected]>
  • Loading branch information
m-decoster and bitscuity authored Oct 21, 2024
1 parent 5cfb763 commit 86da798
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project uses a [CalVer](https://calver.org/) versioning scheme with monthly
- `Zed2i.ULTRA_DEPTH_MODE` to enable the ultra depth setting for the Zed2i cameras
- `OpenCVVideoCapture` implementation of `RGBCamera` for working with arbitrary cameras
- `MultiprocessRGBRerunLogger` and `MultiprocessRGBDRerunLogger` now allow you to pass an `entity_path` value which determines where the RGB and depth images will be logged
- `MobileRobot` and `KELORobile` interface and subclass added, to control mobile robots via the `airo-tulip` package

### Changed
- `coco-to-yolo` conversion now creates a single polygon of all disconnected parts of the mask instead of simply taking the first polygon of the list.
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Repositories that follow the same style as `airo-mono` packages, but are not par
| 🛒 [`airo-models`](https://github.com/airo-ugent/airo-models) | Collection of robot and object models and URDFs |
| 🐉 [`airo-drake`](https://github.com/airo-ugent/airo-drake) | Integration with Drake |
| 🧭 [`airo-planner`](https://github.com/airo-ugent/airo-planner) | Motion planning interfaces |
| 🚗 [`airo-tulip`](https://github.com/airo-ugent/airo-tulip) | Driver for the KELO mobile robot platform |

### Usage & Philosophy 📖
We believe in *keep simple things simple*. Starting a new project should\* be as simple as:
Expand Down
Empty file.
Empty file.
133 changes: 133 additions & 0 deletions airo-robots/airo_robots/drives/hardware/kelo_robile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import math
import time
from threading import Thread

import numpy as np
from airo_robots.awaitable_action import AwaitableAction
from airo_robots.drives.mobile_robot import CompliantLevel, MobileRobot
from airo_tulip.api.client import KELORobile as KELORobileClient # type: ignore
from airo_tulip.hardware.platform_driver import PlatformDriverType # type: ignore
from airo_typing import Vector3DType


class KELORobile(MobileRobot):
"""KELO Robile platform implementation of the MobileRobot interface.
The KELO Robile platform consists of several drives with castor wheels which are connected via EtherCAT and can
be controlled individually. The airo-tulip API, which is used here, provides a higher level interface which
controls the entire platform."""

def __init__(self, robot_ip: str, robot_port: int = 49789):
"""Connect to the KELO robot.
The KELO robot should already be running the airo-tulip server. If this is not the case, any messages
sent from the client may be queued and executed once the server is started, resulting in unexpected
and/or sudden movements.
Args:
robot_ip: IP address of the KELO CPU brick.
robot_port: Port to connect on (default: 49789)."""

self._kelo_robile = KELORobileClient(robot_ip, robot_port)

# Position control.
self._control_loop_done = True

def align_drives(self, x: float, y: float, a: float, timeout: float = 1.0) -> AwaitableAction:
"""Align all drives for driving in a direction given by the linear and angular velocities.
Beware that sending any other velocity commands before the awaitable is done may cause unexpected behaviour,
as this affects the "aligned" condition.
Args:
x: The x velocity (linear, m/s).
y: The y velocity (linear, m/s)
a: The angular velocity (rad/s).
timeout: The awaitable will finish after this time at the latest.
Returns:
An AwaitableAction which will check if the drives are aligned, or if the timeout has expired."""
self._kelo_robile.align_drives(x, y, a, timeout=timeout)

action_sent_time = time.time_ns()
return AwaitableAction(
lambda: self._kelo_robile.are_drives_aligned() or time.time_ns() - action_sent_time > timeout * 1e9,
default_timeout=2 * timeout,
default_sleep_resolution=0.002,
)

def set_platform_velocity_target(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction:
self._kelo_robile.set_platform_velocity_target(x, y, a, timeout=timeout)

action_sent_time = time.time_ns()
return AwaitableAction(
lambda: time.time_ns() - action_sent_time > timeout * 1e9,
default_timeout=2 * timeout,
default_sleep_resolution=0.002,
)

def _move_platform_to_pose_control_loop(
self, target_pose: Vector3DType, action_start_time: float, action_timeout_time: float, timeout: float
) -> None:
stop = False
while not stop:
current_pose = self._kelo_robile.get_odometry()
delta_pose = target_pose - current_pose

vel_vec_angle = np.arctan2(delta_pose[1], delta_pose[0]) - current_pose[2]
vel_vec_norm = min(np.linalg.norm(delta_pose[:2]), 0.5)
vel_x = vel_vec_norm * np.cos(vel_vec_angle)
vel_y = vel_vec_norm * np.sin(vel_vec_angle)

delta_angle = np.arctan2(np.sin(delta_pose[2]), np.cos(delta_pose[2]))
vel_a = max(min(delta_angle, math.pi / 4), -math.pi / 4)

command_timeout = (action_timeout_time - time.time_ns()) * 1e-9
if command_timeout >= 0.0:
self._kelo_robile.set_platform_velocity_target(vel_x, vel_y, vel_a, timeout=command_timeout)

at_target_pose = bool(np.linalg.norm(delta_pose) < 0.01)
stop = at_target_pose or time.time_ns() - action_start_time > timeout * 1e9

self._kelo_robile.set_platform_velocity_target(0.0, 0.0, 0.0)
self._control_loop_done = True

def move_platform_to_pose(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction:
target_pose = np.array([x, y, a])
action_start_time = time.time_ns()
action_timeout_time = action_start_time + timeout * 1e9

self._control_loop_done = False # Will be set to True by the below thread once it's finished.
thread = Thread(
target=self._move_platform_to_pose_control_loop,
args=(target_pose, action_start_time, action_timeout_time, timeout),
)
thread.start()

return AwaitableAction(
lambda: self._control_loop_done,
default_timeout=2 * timeout,
default_sleep_resolution=0.002,
)

def enable_compliant_mode(
self, enabled: bool, compliant_level: CompliantLevel = CompliantLevel.COMPLIANT_WEAK
) -> None:
if enabled:
if compliant_level == CompliantLevel.COMPLIANT_WEAK:
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_WEAK)
elif compliant_level == CompliantLevel.COMPLIANT_MODERATE:
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_MODERATE)
else:
self._kelo_robile.set_driver_type(PlatformDriverType.COMPLIANT_STRONG)
else:
self._kelo_robile.set_driver_type(PlatformDriverType.VELOCITY)

def get_odometry(self) -> Vector3DType:
return self._kelo_robile.get_odometry()

def get_velocity(self) -> Vector3DType:
return self._kelo_robile.get_velocity()

def reset_odometry(self) -> None:
self._kelo_robile.reset_odometry()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
For information on how to set up the KELO Robile, please refer to the documentation of [airo-tulip](https://pypi.org/project/airo-tulip/).
31 changes: 31 additions & 0 deletions airo-robots/airo_robots/drives/hardware/manual_robot_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""code for manual testing of mobile robot base class implementations.
"""
from airo_robots.drives.hardware.kelo_robile import KELORobile
from airo_robots.drives.mobile_robot import MobileRobot


def manually_test_robot_implementation(robot: MobileRobot) -> None:
if isinstance(robot, KELORobile):
input("robot will rotate drives")
robot.align_drives(0.1, 0.0, 0.0, 1.0).wait()

input("robot will now move forward 10cm")
robot.set_platform_velocity_target(0.1, 0.0, 0.0, 1.0).wait()

input("robot will now move left 10cm")
robot.set_platform_velocity_target(0.0, 0.1, 0.0, 1.0).wait()

input("robot will now make two short rotations")
robot.set_platform_velocity_target(0.0, 0.0, 0.1, 1.0).wait()
robot.set_platform_velocity_target(0.0, 0.0, -0.1, 1.0).wait()

input("robot will now return to original position")
robot.set_platform_velocity_target(-0.1, -0.1, 0.0, 1.0).wait()

input("robot will now move to 10 cm along +X relative from its starting position with position control")
robot.move_platform_to_pose(0.1, 0.0, 0.0, 10.0).wait()


if __name__ == "__main__":
mobi = KELORobile("10.10.129.21")
manually_test_robot_implementation(mobi)
82 changes: 82 additions & 0 deletions airo-robots/airo_robots/drives/mobile_robot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from abc import ABC, abstractmethod
from enum import Enum

from airo_robots.awaitable_action import AwaitableAction
from airo_typing import Vector3DType


class CompliantLevel(Enum):
"""The level of compliance expected from the mobile robot.
Values may not correspond to identical behaviour on different mobile platforms, but are merely an indication.
A value of weak means a very compliant robot, whereas a value of strong means a slightly compliant robot."""

COMPLIANT_WEAK = 1
COMPLIANT_MODERATE = 2
COMPLIANT_STRONG = 3


class MobileRobot(ABC):
"""
Base class for a mobile robot.
Mobile robots typically allow to set a target velocity for the entire platform and apply torques to their wheels
to achieve the desired velocity.
All values are in metric units:
- Linear velocities are expressed in meters/second
- Angular velocities are expressed in radians/second
"""

@abstractmethod
def set_platform_velocity_target(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction:
"""Set the desired platform velocity.
Args:
x: Linear velocity along the X axis.
y: Linear velocity along the Y axis.
a: Angular velocity.
timeout: After this time, the platform will automatically stop.
Returns:
An awaitable action."""

@abstractmethod
def move_platform_to_pose(self, x: float, y: float, a: float, timeout: float) -> AwaitableAction:
"""Move the platform to the given pose, without guarantees about the followed path.
Args:
x: Position along the startup pose's X axis.
y: Position along the startup pose's Y axis.
a: Orientation around the startup pose's Z axis.
timeout: After this time, the platform will automatically stop.
Returns:
An awaitable action."""

@abstractmethod
def enable_compliant_mode(self, enabled: bool, compliant_level: CompliantLevel) -> None:
"""Enable compliant mode on the robot.
Args:
enabled: If true, will enable compliant mode. Else, will disable compliant mode.
compliant_level: The level of compliance to be expected from the robot. Ignored if `enabled` is `False`."""

@abstractmethod
def get_odometry(self) -> Vector3DType:
"""Get the estimated robot pose as a 3D vector comprising the `x`, `y`, and `theta` values relative to the
robot's starting pose.
Returns: A 3D vector with a value for the `x` position, `y` position, and `theta` angle of the robot."""

@abstractmethod
def reset_odometry(self) -> None:
"""Reset the robot's odometry to `(0, 0, 0)`."""

@abstractmethod
def get_velocity(self) -> Vector3DType:
"""Get the estimated robot's velocity as a 3D vector comprising the `x`, `y` and `theta` values relative to the
robot's starting pose.
Returns:
A 3D vector with a value for the `x` velocity, `y` velocity, and `theta` angular velocity of the robot."""
1 change: 1 addition & 0 deletions airo-robots/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"click",
"airo-typing",
"airo-spatial-algebra",
"airo-tulip==0.1.0",
],
packages=setuptools.find_packages(),
package_data={"airo_robots": ["py.typed"]},
Expand Down

0 comments on commit 86da798

Please sign in to comment.