diff --git a/docs/api/gym_jiminy/common/blocks/body_orientation_observer.rst b/docs/api/gym_jiminy/common/blocks/body_orientation_observer.rst new file mode 100644 index 0000000000..83b15bccc8 --- /dev/null +++ b/docs/api/gym_jiminy/common/blocks/body_orientation_observer.rst @@ -0,0 +1,7 @@ +Body Orientation State Observation +================================== + +.. automodule:: gym_jiminy.common.blocks.body_orientation_observer + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/api/gym_jiminy/common/blocks/index.rst b/docs/api/gym_jiminy/common/blocks/index.rst index 503021a085..d825f980dc 100644 --- a/docs/api/gym_jiminy/common/blocks/index.rst +++ b/docs/api/gym_jiminy/common/blocks/index.rst @@ -7,4 +7,6 @@ Blocks proportional_derivative_controller motor_safety_limit mahony_filter + body_orientation_observer deformation_estimator + quantity_observer diff --git a/docs/api/gym_jiminy/common/blocks/quantity_observer.rst b/docs/api/gym_jiminy/common/blocks/quantity_observer.rst new file mode 100644 index 0000000000..a29ba2dc0d --- /dev/null +++ b/docs/api/gym_jiminy/common/blocks/quantity_observer.rst @@ -0,0 +1,7 @@ +Generic Quantity Observer +========================= + +.. automodule:: gym_jiminy.common.blocks.quantity_observer + :members: + :undoc-members: + :show-inheritance: diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/__init__.py b/python/gym_jiminy/common/gym_jiminy/common/blocks/__init__.py index e023858705..12a7fff8c0 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/blocks/__init__.py +++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/__init__.py @@ -1,17 +1,19 @@ # pylint: disable=missing-module-docstring -from .quantity_observer import QuantityObserver -from .mahony_filter import MahonyFilter from .motor_safety_limit import MotorSafetyLimit from .proportional_derivative_controller import PDController, PDAdapter +from .quantity_observer import QuantityObserver +from .mahony_filter import MahonyFilter +from .body_orientation_observer import BodyObserver from .deformation_estimator import DeformationEstimator __all__ = [ - 'QuantityObserver', - 'MahonyFilter', 'MotorSafetyLimit', 'PDController', 'PDAdapter', + 'QuantityObserver', + 'MahonyFilter', + 'BodyObserver', 'DeformationEstimator' ] diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/body_orientation_observer.py b/python/gym_jiminy/common/gym_jiminy/common/blocks/body_orientation_observer.py new file mode 100644 index 0000000000..ee9785710e --- /dev/null +++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/body_orientation_observer.py @@ -0,0 +1,174 @@ +"""Implementation of a stateless body orientation state estimator block +compatible with gym_jiminy reinforcement learning pipeline environment design. +""" +from collections import OrderedDict +from collections.abc import Mapping +from typing import List, Dict, Sequence, Union + +import numpy as np +import gymnasium as gym + +from jiminy_py.core import ImuSensor # pylint: disable=no-name-in-module + +from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv +from ..wrappers.observation_layout import NestedData +from ..utils import (DataNested, + quat_to_rpy, + matrices_to_quat, + quat_multiply, + quat_apply, + remove_twist_from_quat) + + +class BodyObserver(BaseObserverBlock[ + Dict[str, np.ndarray], np.ndarray, BaseObs, BaseAct]): + """Compute the orientation and angular velocity in local frame of the + parent body associated with all the IMU sensors of the robot. + """ + def __init__(self, + name: str, + env: InterfaceJiminyEnv[BaseObs, BaseAct], + *, + nested_imu_quat_key: NestedData = ( + "features", "mahony_filter", "quat"), + nested_imu_omega_key: NestedData = ( + "features", "mahony_filter", "omega",), + ignore_twist: bool = False, + compute_rpy: bool = True, + update_ratio: int = 1) -> None: + """ + :param name: Name of the block. + :param env: Environment to connect with. + :param nested_imu_quat_key: Nested key from environment observation + mapping to the IMU quaternion estimates. + Their ordering must be consistent with the + true IMU sensors of the robot. + :param nested_imu_omega_key: Nested key from environment observation + mapping to the IMU angular velocity in + local frame estimates. Their ordering must + be consistent with the true IMU sensors of + the robot. + :param ignore_twist: Whether to ignore the twist of the IMU quaternion + estimate. + :param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles + representation for the 3D orientation of the IMU, + in addition to the quaternion representation. + Optional: True by default. + :param update_ratio: Ratio between the update period of the observer + and the one of the subsequent observer. -1 to + match the simulation timestep of the environment. + Optional: `1` by default. + """ + # Backup some of the user-argument(s) + self.ignore_twist = ignore_twist + self.compute_rpy = compute_rpy + + # Define observed / estimated IMU data proxies for fast access + obs_imu_data_list: List[np.ndarray] = [] + for nested_imu_key in (nested_imu_quat_key, nested_imu_omega_key): + obs_imu_data: DataNested = env.observation + for key in nested_imu_key: + if isinstance(key, str): + assert isinstance(obs_imu_data, Mapping) + obs_imu_data = obs_imu_data[key] + elif isinstance(key, int): + assert isinstance(obs_imu_data, Sequence) + obs_imu_data = obs_imu_data[key] + else: + assert isinstance(obs_imu_data, np.ndarray) + slices: List[Union[int, slice]] = [] + for start_end in key: + if isinstance(start_end, int): + slices.append(start_end) + elif not start_end: + slices.append(slice(None,)) + else: + slices.append(slice(*start_end)) + obs_imu_data = obs_imu_data[tuple(slices)] + assert isinstance(obs_imu_data, np.ndarray) + obs_imu_data_list.append(obs_imu_data) + self._obs_imu_quats, self._obs_imu_omegas = obs_imu_data_list + + # Extract the relative IMU frame placement wrt its parent body + imu_rel_rot_mats: List[np.ndarray] = [] + for sensor in env.robot.sensors[ImuSensor.type]: + frame = env.robot.pinocchio_model.frames[sensor.frame_index] + imu_rel_rot_mats.append(frame.placement.rotation) + self._imu_rel_quats = matrices_to_quat(tuple(imu_rel_rot_mats)) + + # Initialize the observer + super().__init__(name, env, update_ratio) + + # Define proxies for the body orientations and angular velocities + self._quat = self.observation["quat"] + if self.compute_rpy: + self._rpy = self.observation["rpy"] + else: + self._rpy = np.array([]) + self._omega = self.observation["omega"] + + def _initialize_observation_space(self) -> None: + num_imu_sensors = len(self.env.robot.sensors[ImuSensor.type]) + observation_space: Dict[str, gym.Space] = OrderedDict() + observation_space["quat"] = gym.spaces.Box( + low=np.full((4, num_imu_sensors), -1.0 - 1e-9), + high=np.full((4, num_imu_sensors), 1.0 + 1e-9), + dtype=np.float64) + if self.compute_rpy: + high = np.array([np.pi, np.pi/2, np.pi]) + 1e-9 + observation_space["rpy"] = gym.spaces.Box( + low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1), + high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1), + dtype=np.float64) + observation_space["omega"] = gym.spaces.Box( + low=float('-inf'), + high=float('inf'), + shape=(3, num_imu_sensors), + dtype=np.float64) + self.observation_space = gym.spaces.Dict(observation_space) + + def _setup(self) -> None: + # Call base implementation + super()._setup() + + # Fix initialization of the observation to be valid quaternions + self._quat[-1] = 1.0 + + @property + def fieldnames(self) -> Dict[str, List[List[str]]]: + imu_sensors = self.env.robot.sensors[ImuSensor.type] + fieldnames: Dict[str, List[List[str]]] = {} + fieldnames["quat"] = [ + [".".join((sensor.name, f"Quat{e}")) for sensor in imu_sensors] + for e in ("X", "Y", "Z", "W")] + if self.compute_rpy: + fieldnames["rpy"] = [ + [".".join((sensor.name, e)) for sensor in imu_sensors] + for e in ("Roll", "Pitch", "Yaw")] + fieldnames["omega"] = [ + [".".join((sensor.name, e)) for sensor in imu_sensors] + for e in ("X", "Y", "Z")] + return fieldnames + + def refresh_observation(self, measurement: BaseObs) -> None: + # Compute the parent body orientations + quat_multiply(self._obs_imu_quats, + self._imu_rel_quats, + out=self._quat, + is_right_conjugate=True) + + # Remove twist if requested + if self.ignore_twist: + remove_twist_from_quat(self._quat) + + # Compute the parent body angular velocities in local frame. + # Note that batched "quaternion apply" is faster than sequential + # "rotation matrix apply" in practice, both when using `np.einsum` or + # single-threaded numba jitted implementation. + quat_apply(self._imu_rel_quats, + self._obs_imu_omegas, + out=self._omega) + + # Compute the RPY representation if requested + if self.compute_rpy: + quat_to_rpy(self._quat, self._rpy) diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/deformation_estimator.py b/python/gym_jiminy/common/gym_jiminy/common/blocks/deformation_estimator.py index f82d2e7ef0..abb4babf6c 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/blocks/deformation_estimator.py +++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/deformation_estimator.py @@ -3,7 +3,7 @@ """ from collections import OrderedDict from collections.abc import Mapping -from typing import List, Dict, Sequence, Tuple, Optional +from typing import List, Dict, Sequence, Tuple, Union, Optional import numpy as np import numba as nb @@ -16,6 +16,7 @@ import pinocchio as pin from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv +from ..wrappers.observation_layout import NestedData from ..utils import (DataNested, quat_to_rpy, matrices_to_quat, @@ -508,7 +509,7 @@ def __init__(self, imu_frame_names: Sequence[str], flex_frame_names: Sequence[str], ignore_twist: bool = True, - nested_imu_key: Sequence[str] = ( + nested_imu_key: NestedData = ( "features", "mahony_filter", "quat"), compute_rpy: bool = True, update_ratio: int = 1) -> None: @@ -649,11 +650,26 @@ def __init__(self, self._is_flex_flipped.append(np.array(is_flipped)) self._is_chain_orphan.append((is_parent_orphan, is_child_orphan)) - # Define IMU and encoder measurement proxy for fast access + # Define IMU observation proxy for fast access obs_imu_quats: DataNested = env.observation for key in nested_imu_key: - assert isinstance(obs_imu_quats, Mapping) - obs_imu_quats = obs_imu_quats[key] + if isinstance(key, str): + assert isinstance(obs_imu_quats, Mapping) + obs_imu_quats = obs_imu_quats[key] + elif isinstance(key, int): + assert isinstance(obs_imu_quats, Sequence) + obs_imu_quats = obs_imu_quats[key] + else: + assert isinstance(obs_imu_quats, np.ndarray) + slices: List[Union[int, slice]] = [] + for start_end in key: + if isinstance(start_end, int): + slices.append(start_end) + elif not start_end: + slices.append(slice(None,)) + else: + slices.append(slice(*start_end)) + obs_imu_quats = obs_imu_quats[tuple(slices)] assert isinstance(obs_imu_quats, np.ndarray) self._obs_imu_quats = obs_imu_quats diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/mahony_filter.py b/python/gym_jiminy/common/gym_jiminy/common/blocks/mahony_filter.py index fde3a55ef1..6f9f77d963 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/blocks/mahony_filter.py +++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/mahony_filter.py @@ -162,15 +162,16 @@ class MahonyFilter(BaseObserverBlock[ .. note:: The feature space of this observer is a dictionary storing quaternion - estimates under key 'quat', and optionally, their corresponding - Yaw-Pitch-Roll Euler angles representations under key 'key' if - `compute_rpy=True`. Both leaves are 2D array of shape (N, M), where - N is the number of components of the representation while M is the - number of IMUs of the robot. More specifically, `N=4` for quaternions - (Qx, Qy, Qz, Qw) and 'N=3' for Euler angles (Roll-Pitch-Yaw). Indeed, - the Yaw angle of the Yaw-Pitch-Roll Euler angles representations is - systematically included in the feature space, even if its value is - meaningless, i.e. `ignore_twist=True`. + estimates under key 'quat', optionally, their corresponding + Yaw-Pitch-Roll Euler angles representations under key 'rpy' if + `compute_rpy=True`, and finally, the angular velocity in local frame + estimates under key 'omega'. Both leaves are 2D array of shape (N, M), + where N is the number of components of the representation while M is + the number of IMUs of the robot. Specifically, `N=4` for quaternions + (Qx, Qy, Qz, Qw), 'N=3' for both the Euler angles (Roll-Pitch-Yaw) and + the angular velocity. The Yaw angle of the Yaw-Pitch-Roll Euler angles + representations is systematically included in the feature space, even + if its value is meaningless, i.e. `ignore_twist=True`. .. warning:: This filter works best for 'observe_dt' smaller or equal to 5ms. Its @@ -186,7 +187,7 @@ def __init__(self, exact_init: bool = True, kp: Union[np.ndarray, float] = 1.0, ki: Union[np.ndarray, float] = 0.1, - compute_rpy: bool = True, + compute_rpy: bool = False, update_ratio: int = 1) -> None: """ :param name: Name of the block. @@ -205,14 +206,14 @@ def __init__(self, is not recommended because the robot is often free-falling at init, which is not realistic anyway. Optional: `True` by default. - :param mahony_kp: Proportional gain used for gyro-accel sensor fusion. - Set it to 0.0 to use only the gyroscope. In such a - case, the orientation estimate would be exact if the - sensor is bias- and noise-free, and the update period - matches the simulation integrator timestep. - Optional: `1.0` by default. - :param mahony_ki: Integral gain used for gyro bias estimate. - Optional: `0.1` by default. + :param kp: Proportional gain used for gyro-accel sensor fusion. Set it + to 0.0 to use only the gyroscope. In such a case, the + orientation estimate would be exact if the sensor is bias- + and noise-free, and the update period matches the + simulation integrator timestep. + Optional: `1.0` by default. + :param ki: Integral gain used for gyro bias estimate. + Optional: `0.1` by default. :param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles representation for the 3D orientation of the IMU, in addition to the quaternion representation. @@ -279,9 +280,6 @@ def __init__(self, if self._update_twist: self._state["twist"] = self._twist - # Store the estimate angular velocity to avoid redundant computations - self._omega = np.zeros((3, num_imu_sensors)) - # Initialize the observer super().__init__(name, env, update_ratio) @@ -291,6 +289,7 @@ def __init__(self, self._rpy = self.observation["rpy"] else: self._rpy = np.array([]) + self._omega = self.observation["omega"] def _initialize_state_space(self) -> None: """Configure the internal state space of the observer. @@ -340,6 +339,11 @@ def _initialize_observation_space(self) -> None: low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1), high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1), dtype=np.float64) + observation_space["omega"] = gym.spaces.Box( + low=float('-inf'), + high=float('inf'), + shape=(3, num_imu_sensors), + dtype=np.float64) self.observation_space = gym.spaces.Dict(observation_space) def _setup(self) -> None: @@ -390,8 +394,11 @@ def fieldnames(self) -> Dict[str, List[List[str]]]: imu_sensors = self.env.robot.sensors[ImuSensor.type] fieldnames: Dict[str, List[List[str]]] = {} fieldnames["quat"] = [ - [f"{sensor.name}.Quat{e}" for sensor in imu_sensors] - for e in ("x", "y", "z", "w")] + [".".join((sensor.name, f"Quat{e}")) for sensor in imu_sensors] + for e in ("X", "Y", "Z", "W")] + fieldnames["omega"] = [ + [".".join((sensor.name, e)) for sensor in imu_sensors] + for e in ("X", "Y", "Z")] if self.compute_rpy: fieldnames["rpy"] = [ [".".join((sensor.name, e)) for sensor in imu_sensors] @@ -425,7 +432,7 @@ def refresh_observation(self, measurement: BaseObs) -> None: rot = robot.pinocchio_data.oMf[sensor.frame_index].rotation imu_rots.append(rot) - # Convert it to quaternions + # Convert the rotation matrices of the IMUs to quaternions matrices_to_quat(tuple(imu_rots), self._quat) # Keep track of tilt if necessary @@ -433,6 +440,11 @@ def refresh_observation(self, measurement: BaseObs) -> None: self._twist[:] = np.arctan2(self._quat[2], self._quat[3]) self._is_initialized = True + + # Compute the RPY representation if requested + if self.compute_rpy: + quat_to_rpy(self._quat, self._rpy) + return # Run an iteration of the filter, computing the next state estimate