From 935e46122be244eb02841c5b507ca2ca09b072fc Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Thu, 25 Aug 2022 15:27:59 +0200 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=92=A5=20=F0=9F=8E=A8=20Restructure?= =?UTF-8?q?=20Task=20classes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/tasks/Eq_Tasks.py | 101 ++++++++++++++ python/stack_of_tasks/tasks/Ieq_Tasks.py | 21 +++ python/stack_of_tasks/tasks/Task.py | 93 +++++++++---- python/stack_of_tasks/tasks/Tasks.py | 160 ----------------------- python/stack_of_tasks/utils.py | 8 ++ 5 files changed, 194 insertions(+), 189 deletions(-) create mode 100644 python/stack_of_tasks/tasks/Eq_Tasks.py create mode 100644 python/stack_of_tasks/tasks/Ieq_Tasks.py delete mode 100644 python/stack_of_tasks/tasks/Tasks.py diff --git a/python/stack_of_tasks/tasks/Eq_Tasks.py b/python/stack_of_tasks/tasks/Eq_Tasks.py new file mode 100644 index 0000000..7b48c80 --- /dev/null +++ b/python/stack_of_tasks/tasks/Eq_Tasks.py @@ -0,0 +1,101 @@ +import numpy as np +from tf.transformations import rotation_from_matrix + +from stack_of_tasks.tasks.Task import EqTask, TaskSoftnessType +from stack_of_tasks.utils import skew + + +class PositionTask(EqTask): + name = "Position" + task_size: int = 3 + + def _compute(self, J: np.ndarray, current: np.ndarray, target: np.ndarray): + self.A = J[:3] + self.bound = target[:3, 3] - current[:3, 3] + + +class OrientationTask(EqTask): + name = "Orientation" + task_size: int = 3 + + def _compute(self, J, current, target): + self.A = J[3:] + + delta = np.identity(4) + delta[0:3, 0:3] = current[0:3, 0:3].T.dot(target[0:3, 0:3]) + angle, axis, _ = rotation_from_matrix(delta) + + self.bound = current[0:3, 0:3].dot(angle * axis) + + +class DistanceTask(EqTask): + """Keep distance to target position, not considering (approach) direction""" + + name = "Distance" + task_size: int = 1 + + def _compute(self, J, current, target, distance): + delta = current[0:3, 3] - target[0:3, 3] + self.A = np.array([delta.T.dot(J[:3])]) + self.bound = np.array([-(np.linalg.norm(delta) - distance)]) + + +class PlaneTask(EqTask): + name = "Plane" + task_size: int = 1 + + def _compute(self, J, current, target): + """Move eef within plane given by normal vector and distance to origin""" + + normal = target[0:3, 2] + dist = normal.dot(target[0:3, 3]) + + self.A = np.array([normal.T.dot(J[:3])]) + self.bound = np.array([-(normal.dot(current[0:3, 3]) - dist)]) + + +class ParallelTask(EqTask): + name = "Parallel" + task_size: int = 3 + + def _compute(self, J, current, target, robot_axis, target_axis): + """Align axis in eef frame to be parallel to reference axis in base frame""" + + # transform axis from eef frame to base frame + axis = current[0:3, 0:3].dot(robot_axis) + ref = target[0:3, 0:3].dot(target_axis) + self.A = (skew(ref).dot(skew(axis))).dot(J[3:]) + self.bound = np.cross(ref, axis) + + +class LineTask(EqTask): + name = "Line" + task_size: int = 3 + + def _compute(self, J, current, line_point): + normal = line_point[0:3, 2] + sw = skew(normal) + + d = current[0:3, 3] - line_point[0:3, 3] + + self.A = sw.dot(J[:3]) + self.bound = -sw.dot(d) + + +class JointPos(EqTask): + name = "Joint Position" + + def __init__( + self, wheight: float, softnessType: TaskSoftnessType, number_of_joints: int + ) -> None: + super().__init__(wheight, softnessType) + self.task_size = number_of_joints + + def _compute(self, current_jount_pose, desired_joint_pose): + self.A = np.identity(current_jount_pose.size) + self.bound = current_jount_pose + desired_joint_pose + + +if __name__ == "__main__": + p = PositionTask(1, TaskSoftnessType.hard) + print(p.args) diff --git a/python/stack_of_tasks/tasks/Ieq_Tasks.py b/python/stack_of_tasks/tasks/Ieq_Tasks.py new file mode 100644 index 0000000..7ab89cb --- /dev/null +++ b/python/stack_of_tasks/tasks/Ieq_Tasks.py @@ -0,0 +1,21 @@ +import numpy as np + +from stack_of_tasks.utils import skew + +from .Task import IeqTask + + +class ConeTask(IeqTask): + name = "Cone" + + def _compute(self, J, current, target, threshold, target_axis, robot_axis): + """Align axis in eef frame to lie in cone spanned by reference axis and opening angle acos(threshold)""" + + threshold = np.cos(threshold) + # transform axis from eef frame to base frame + axis = current[0:3, 0:3].dot(robot_axis) + reference = target[0:3, 0:3].dot(target_axis) + + self.A = np.array([reference.T.dot(skew(axis)).dot(J[3:])]) + self.upper_bound = np.array([reference.T.dot(axis) - threshold]) + self.lower_bound = np.array([-10e20]) diff --git a/python/stack_of_tasks/tasks/Task.py b/python/stack_of_tasks/tasks/Task.py index a5134d8..5d03e22 100644 --- a/python/stack_of_tasks/tasks/Task.py +++ b/python/stack_of_tasks/tasks/Task.py @@ -1,49 +1,84 @@ -import typing from abc import ABC, abstractmethod +from enum import Enum +from inspect import signature +from typing import Any, NoReturn, Optional, Tuple import numpy as np +from numpy.typing import ArrayLike -class TaskDesc: - def __init__(self, A, upper, lower, name, is_hard=True) -> None: +class TaskSoftnessType(Enum): + hard = 1 + linear = 2 + quadratic: 3 - self.name = name - self.is_hard = is_hard - self.A = A - self.upper = upper - self.lower = lower - @property - def size(self): - return self.A.shape[0] +class Task(ABC): + name: str + task_size: int - def unpack(self): - """returns A, upper, lower bounds""" - return self.A, self.upper, self.lower + def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: + super().__init__() + self.args = frozenset( + [ + (p.name, None if p.annotation is p.empty else p.annotation) + for p in signature(self._compute).parameters.values() + ] + ) + self.argmap = dict([(i, i) for i in self.args]) -class EQTaskDesc(TaskDesc): - def __init__(self, A, bound, name=None, is_hard=False) -> None: - super().__init__(A, bound, bound, name, is_hard) + self.weight = wheight + self.softnessType = softnessType + self.A: ArrayLike = np.zeros((0, self.task_size)) -class Task(ABC): - name: str - args: typing.List + self.residual = None + self.violation = None + self.importance = None - def __init__(self, scale: float = 1.0, hard_task: bool = False) -> None: - self._scale = scale - self._hard = hard_task + def set_argument_mapping(self, argument: str, mapping_value: str) -> Optional[NoReturn]: + if argument not in self.args: + raise LookupError( + f"No such argument '{argument}' in task {self.__class__.name}.\n Possible mappings are: {self.args}" + ) - self.argmap = {a: a for a in self.args} + self.argmap[argument] = mapping_value def _map_args(self, data: dict): - return tuple(map(data.get, self.argmap.values())) + return {k: data.get(v) for k, v in self.argmap.items()} @abstractmethod - def compute(self, data): + def _compute(self, *args, **kwargs): pass - @staticmethod - def skew(w): - return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]]) + @abstractmethod + def compute(self, data) -> Any: + mapped = self._map_args(data) + self._compute(**mapped) + + +class EqTask(Task): + def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: + super().__init__(wheight, softnessType) + self.bound = np.zeros((1, 0)) + + @abstractmethod + def _compute(self, *args, **kwargs): + pass + + def compute(self, data) -> Tuple[ArrayLike, ArrayLike]: + super().compute(data) + return self.A, self.bound + + +class IeqTask(Task): + def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: + super().__init__(wheight, softnessType) + + self.lower_bound = np.zeros((1, 0)) + self.upper_bound = np.zeros((1, 0)) + + def compute(self, data) -> Tuple[ArrayLike, ArrayLike, ArrayLike]: + super().compute(data) + return self.A, self.lower_bound, self.upper_bound diff --git a/python/stack_of_tasks/tasks/Tasks.py b/python/stack_of_tasks/tasks/Tasks.py deleted file mode 100644 index f8f4587..0000000 --- a/python/stack_of_tasks/tasks/Tasks.py +++ /dev/null @@ -1,160 +0,0 @@ -import typing -from concurrent.futures import thread - -import numpy as np -from tf import transformations as tf - -from .Task import EQTaskDesc, Task, TaskDesc - - -class PositionTask(Task): - name = "Pos" - args = ["J", "T_c", "T_t"] - - def compute(self, data): - J, T_c, T_t = self._map_args(data) - A = J[:3] - b = self._scale * (T_t[:3, 3] - T_c[:3, 3]) - return EQTaskDesc(A, b, self.name, self._hard) - - -class OrientationTask(Task): - name = "Ori" - args = ["J", "T_c", "T_t"] - - def compute(self, data): - J, T_c, T_t = self._map_args(data) - - A = J[3:] - delta = np.identity(4) - delta[0:3, 0:3] = T_c[0:3, 0:3].T.dot(T_t[0:3, 0:3]) - angle, axis, _ = tf.rotation_from_matrix(delta) - b = self._scale * (T_c[0:3, 0:3].dot(angle * axis)) - - return EQTaskDesc(A, b, self.name) - - -class DistanceTask(Task): - """Keep distance to target position, not considering (approach) direction""" - - name = "Dist" - args = ["J", "T_c", "T_t", "dist"] - - def compute(self, data): - J, T_c, T_t, dist = self._map_args(data) - - delta = T_c[0:3, 3] - T_t[0:3, 3] - A = np.array([delta.T.dot(J[:3])]) - b = np.array([-self._scale * (np.linalg.norm(delta) - dist)]) - return EQTaskDesc(A, b, self.name) - - -class ConstantSpeed(Task): - name = "ConstantSpeed" - args = ["J", "T_c", "T_t", "normal", "speed"] - - def compute(self, data): - J, T_c, T_t, normal, speed = self._map_args(data) - - axis = T_c[0:3, 0:3].dot(normal) - A = J[:3] - - r = T_t[0:3, 3] - T_c[0:3, 3] - b = np.cross(axis, r) - b = speed * b / np.linalg.norm(b) - - return EQTaskDesc(A, b, self.name) - - -class ParallelTask(Task): - name = "Parallel" - - def __init__(self, tgt_axis, robot_axis, scale=1.0, hard_task=False) -> None: - super().__init__(scale, hard_task) - self.ta = tgt_axis - self.ra = robot_axis - - def compute(self, J, T_c, T_t): - """Align axis in eef frame to be parallel to reference axis in base frame""" - axis = T_c[0:3, 0:3].dot(self.ra) # transform axis from eef frame to base frame - ref = T_t[0:3, 0:3].dot(self.ta) - A = (self.skew(ref).dot(self.skew(axis))).dot(J[3:]) - b = self._scale * np.cross(ref, axis) - return EQTaskDesc(A, b, self.name) - - -class PlaneTask(Task): - name = "Plane" - args = ["J", "T_c", "T_t"] - - def compute(self, data): - """Move eef within plane given by normal vector and distance to origin""" - J, T_c, T_t = self._map_args(data) - - normal = T_t[0:3, 2] - dist = normal.dot(T_t[0:3, 3]) - - A = np.array([normal.T.dot(J[:3])]) - b = np.array([-self._scale * (normal.dot(T_c[0:3, 3]) - dist)]) - return EQTaskDesc(A, b, self.name) - - -class LineTask(Task): - name = "Line" - - def compute(self, J, T_c, T_t): - normal = T_t[0:3, 2] - sw = self.skew(normal) - - d = T_c[0:3, 3] - T_t[0:3, 3] - - A = sw.dot(J[:3]) - b = -self._scale * sw.dot(d) - return EQTaskDesc(A, b, self.name) - - -class ConeTask(Task): - name = "Cone" - args = ["J", "T_c", "T_t", "angle", "target_axis", "robot_axis"] - - def compute(self, data): - """Align axis in eef frame to lie in cone spanned by reference axis and opening angle acos(threshold)""" - J, T_c, T_t, threshold, ta, ra = self._map_args(data) - - threshold = np.cos(threshold) - axis = T_c[0:3, 0:3].dot(ra) # transform axis from eef frame to base frame - reference = T_t[0:3, 0:3].dot(ta) - - A = np.array([reference.T.dot(self.skew(axis)).dot(J[3:])]) - b = np.array([self._scale * (reference.T.dot(axis) - threshold)]) - - return TaskDesc(A, b, np.array([-10e20]), self.name) - - -class JointPos(Task): - name = "Joint Position" - args = ["current_jp", "desired_joint_pose"] - - def compute(self, data): - (current_jp, desired_joint_pose) = self._map_args(data) - - A = np.identity(current_jp.size) - b = -self._scale * (desired_joint_pose - current_jp) - return EQTaskDesc(A, b, self.name) - - -class PreventJointBounds(Task): - name = "Prevent Joints Bound" - args = ["joint_position", "min", "max"] - - def compute(self, data): - (current_jp, mins, maxs) = self._map_args(data) - current_jp = current_jp[:, 0] - - A = np.identity(current_jp.size) - b = -self._scale * ( - np.log(current_jp - mins) / (maxs - current_jp) - - np.log(np.max(maxs - current_jp, 0)) / (current_jp - mins) - ) - - return EQTaskDesc(A, b, self.name) diff --git a/python/stack_of_tasks/utils.py b/python/stack_of_tasks/utils.py index a1349ce..1017d42 100644 --- a/python/stack_of_tasks/utils.py +++ b/python/stack_of_tasks/utils.py @@ -1,9 +1,17 @@ +import numpy as np + + class Callback(list): def __call__(self, *args) -> None: for listener in self: listener(*args) +@staticmethod +def skew(w): + return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]]) + + def prettyprintMatrix(inmatrix): matrix = [] maxL = 0 From 6479aae157e8d3b984b7666414be1cc483db8347 Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Thu, 1 Sep 2022 22:52:08 +0200 Subject: [PATCH 2/7] =?UTF-8?q?=F0=9F=90=9B=20Fix=20Tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/tasks/Eq_Tasks.py | 8 ++++---- python/stack_of_tasks/tasks/Task.py | 14 ++++++++++---- python/stack_of_tasks/tasks/TaskHierachy.py | 8 +++----- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/python/stack_of_tasks/tasks/Eq_Tasks.py b/python/stack_of_tasks/tasks/Eq_Tasks.py index 7b48c80..c2c734e 100644 --- a/python/stack_of_tasks/tasks/Eq_Tasks.py +++ b/python/stack_of_tasks/tasks/Eq_Tasks.py @@ -88,12 +88,12 @@ class JointPos(EqTask): def __init__( self, wheight: float, softnessType: TaskSoftnessType, number_of_joints: int ) -> None: - super().__init__(wheight, softnessType) self.task_size = number_of_joints + super().__init__(wheight, softnessType) - def _compute(self, current_jount_pose, desired_joint_pose): - self.A = np.identity(current_jount_pose.size) - self.bound = current_jount_pose + desired_joint_pose + def _compute(self, current_joint_pose, desired_joint_pose): + self.A = np.identity(current_joint_pose.size) + self.bound = current_joint_pose - desired_joint_pose if __name__ == "__main__": diff --git a/python/stack_of_tasks/tasks/Task.py b/python/stack_of_tasks/tasks/Task.py index 5d03e22..5dad143 100644 --- a/python/stack_of_tasks/tasks/Task.py +++ b/python/stack_of_tasks/tasks/Task.py @@ -1,3 +1,4 @@ +import typing from abc import ABC, abstractmethod from enum import Enum from inspect import signature @@ -10,7 +11,7 @@ class TaskSoftnessType(Enum): hard = 1 linear = 2 - quadratic: 3 + quadratic = 3 class Task(ABC): @@ -26,7 +27,7 @@ def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: for p in signature(self._compute).parameters.values() ] ) - self.argmap = dict([(i, i) for i in self.args]) + self.argmap = dict([(i, i) for i, _ in self.args]) self.weight = wheight self.softnessType = softnessType @@ -38,9 +39,10 @@ def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: self.importance = None def set_argument_mapping(self, argument: str, mapping_value: str) -> Optional[NoReturn]: - if argument not in self.args: + if argument not in map(lambda x: x[0], self.args): raise LookupError( - f"No such argument '{argument}' in task {self.__class__.name}.\n Possible mappings are: {self.args}" + f"""No such argument '{argument}' in task {self.__class__.name}. + Possible mappings are: {self.args}""" ) self.argmap[argument] = mapping_value @@ -82,3 +84,7 @@ def __init__(self, wheight: float, softnessType: TaskSoftnessType) -> None: def compute(self, data) -> Tuple[ArrayLike, ArrayLike, ArrayLike]: super().compute(data) return self.A, self.lower_bound, self.upper_bound + + +TaskType = typing.Union[EqTask, IeqTask] +TaskHierachyType = typing.List[typing.List[TaskType]] diff --git a/python/stack_of_tasks/tasks/TaskHierachy.py b/python/stack_of_tasks/tasks/TaskHierachy.py index c6521bd..5a97949 100644 --- a/python/stack_of_tasks/tasks/TaskHierachy.py +++ b/python/stack_of_tasks/tasks/TaskHierachy.py @@ -32,8 +32,6 @@ def remove_task(self, task): pass def compute(self, data): - r = [] - - for h in self.hierarchy: - r.append(list(map(lambda x: x.compute(data), h))) - return r + for level in self.hierarchy: + for task in level: + task.compute(data) From da45fdc973d9b54ed934c064026eca8831d461d2 Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Thu, 1 Sep 2022 23:23:33 +0200 Subject: [PATCH 3/7] =?UTF-8?q?=F0=9F=8E=A8=20=20Combine=20matrix=20for=20?= =?UTF-8?q?linear/hard=20and=20quadratic=20tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/controller.py | 35 +- python/stack_of_tasks/solver/HQPSolver.py | 605 +++++++++++++--------- 2 files changed, 369 insertions(+), 271 deletions(-) diff --git a/python/stack_of_tasks/controller.py b/python/stack_of_tasks/controller.py index 2506715..9acd200 100755 --- a/python/stack_of_tasks/controller.py +++ b/python/stack_of_tasks/controller.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 -import random - import numpy as np import rospy from interactive_markers.interactive_marker_server import InteractiveMarkerServer from sensor_msgs.msg import JointState -from std_msgs.msg import Header from tf import transformations as tf from stack_of_tasks.marker.interactive_marker import IAMarker -from stack_of_tasks.robot_model import RobotModel, JointType +from stack_of_tasks.robot_model import RobotModel from stack_of_tasks.solver.AbstactSolver import Solver from stack_of_tasks.solver.HQPSolver import HQPSolver from stack_of_tasks.solver.InverseJacobianSolver import InverseJacobianSolver @@ -138,9 +135,11 @@ def hierarchic_control(self, targets): lb = np.maximum(-0.01, (self.mins * 0.95 - self._joint_position) / self.rate) ub = np.minimum(0.01, (self.maxs * 0.95 - self._joint_position) / self.rate) - tasks = self.task_hierarchy.compute(targets) + self.task_hierarchy.compute(targets) - dq, tcr = self.solver.solve(tasks, lb, ub, warmstart=self.last_dq) + dq, tcr = self.solver.solve( + self.task_hierarchy.hierarchy, lb, ub, warmstart=self.last_dq + ) self.last_dq = dq @@ -174,8 +173,10 @@ def delete_marker(self, name): if __name__ == "__main__": from stack_of_tasks.marker.markers import SixDOFMarker - from stack_of_tasks.tasks.Tasks import ConeTask, OrientationTask, PositionTask - from stack_of_tasks.plot.plot_publisher import PlotPublisher + from stack_of_tasks.tasks.Eq_Tasks import JointPos, OrientationTask, PositionTask + from stack_of_tasks.tasks.Task import TaskSoftnessType + + # from stack_of_tasks.tasks.Ieq_Tasks import ConeTask np.set_printoptions(precision=3, suppress=True, linewidth=100, floatmode="fixed") @@ -190,6 +191,7 @@ def set_target(name, data): c = Controller(solver_class=HQPSolver, rho=0.1) c.T_callback.append(lambda T: set_target("T", T)) c.J_callback.append(lambda J: set_target("J", J)) + c.reset() mc = MarkerControl() @@ -198,20 +200,23 @@ def set_target(name, data): mc.add_marker(marker, marker.name) # setup tasks - pos = PositionTask(1.0) - pos.argmap["T_c"] = "T" - pos.argmap["T_t"] = marker.name + pos = PositionTask(1, TaskSoftnessType.linear) + pos.set_argument_mapping("current", "T") + pos.set_argument_mapping("target", marker.name) # cone = ConeTask((0, 0, 1), (0, 0, 1), 0.1) # cone.argmap["T_t"] = "Position" # cone.argmap["angle"] = "Cone_angle" - ori = OrientationTask(1.0) - ori.argmap["T_c"] = "T" - ori.argmap["T_t"] = marker.name + ori = OrientationTask( + 1, + TaskSoftnessType.linear, + ) + ori.set_argument_mapping("current", "T") + ori.set_argument_mapping("target", marker.name) c.task_hierarchy.add_task_lower(pos) - c.task_hierarchy.add_task_same(ori) + c.task_hierarchy.add_task_lower(ori) # pp = PlotPublisher() # pp.add_plot("q", [f"q/{joint.name}" for joint in c.robot.active_joints]) diff --git a/python/stack_of_tasks/solver/HQPSolver.py b/python/stack_of_tasks/solver/HQPSolver.py index c27eff8..bd29461 100755 --- a/python/stack_of_tasks/solver/HQPSolver.py +++ b/python/stack_of_tasks/solver/HQPSolver.py @@ -1,299 +1,392 @@ #!/usr/bin/env python + import numpy as np import osqp +from numpy.typing import NDArray from scipy import sparse +from stack_of_tasks.tasks.Task import ( + EqTask, + TaskHierachyType, + TaskSoftnessType, + TaskType, +) + from .AbstactSolver import Solver -from ..tasks.Task import EQTaskDesc -class HQPComponentSlacks: - """ - Iterator to form QP problems for the stack of tasks. - This variant considers soft constraints via componentwise slack variables, - i.e. by lb - slack ≤ J dq ≤ ub + slack. - For large errors, this results in strong deviations from straight-line solution. - - The constraints of all previous tasks are considered via equality constraints: - J' dq = J' dq'. - - Thus, we can use fixed size constraint and optimization matrices A and H, which are incrementally filled. - The maximal number of slack variables for optimization is the number of soft DoFs of a task at a given level. - The constraint matrix has the following shape: - ⎧1 - dq ⎨ ⋱ - ⎩ 1 - ⎧ 1 - max slack variables ⎨ ⋱ - ⎩ 0 - higher-level tasks { J' 0 - cur. task ub { J -1 - cur. task lb { J +1 +class HQPMatrix: """ - - @staticmethod - def get_matrix_specs(task_stack): - max_rows = 0 - row_sum = 0 - max_slacks = 0 - - for tasks_same_level in task_stack: - level_rows = 0 - for t in tasks_same_level: - level_rows += t.size - max_slacks = max(max_slacks, level_rows) - max_rows = max(max_rows, row_sum + 2 * level_rows) - row_sum += level_rows - - return max_rows, max_slacks - - def __init__(self, N, tasks, lower, upper, rho) -> None: - super().__init__() - - max_rows, self.max_slacks = self.get_matrix_specs(tasks) - self.tasks = tasks - - self.N = N # number of joints - self.max_vars = self.N + self.max_slacks # variable order: dq, slack - - # (quadratic) objective matrix (Hessian): minimize rho*|dq|² + |slack|² - self.H = np.identity(self.max_vars) - self.H[:N, :N] *= rho - # self.H[N:, N:] *= 50 - - self.q = np.zeros(self.max_vars) # (linear) objective vector - - self.A = np.zeros( - (self.max_vars + max_rows, self.max_vars) - ) # constraint matrix for all tasks - self.lb = np.zeros(self.max_vars + max_rows) # lower bound vector for all tasks - self.ub = np.zeros(self.max_vars + max_rows) # upper " - - self.A[: self.N, : self.N] = np.identity( - self.N - ) # joint constraint matrix is identity - self.lb[: self.N] = lower # lower joint bounds - self.lb[self.N : self.max_vars] = 0 # lower slack bounds - - self.ub[: self.N] = upper # upper joint bounds - self.ub[self.N : self.max_vars] = np.PINF # upper slack bounds - - def __iter__(self): # initialize iterator - self._index = -1 - self.start_row = self.max_vars - return self - - def __next__(self): # forward iterator: return QP problem of next hierarchy level - self._index += 1 - if self._index == len(self.tasks): - raise StopIteration - - # configure upper bound constraints: -∞ ≤ J dq - slack ≤ ub - used_vars = self.N # number of optimization variables: dq and slacks - used_rows = self.start_row - for task in self.tasks[self._index]: - rows = task.size - self.A[used_rows : used_rows + rows, : self.N] = task.A - self.A[used_rows : used_rows + rows, used_vars : used_vars + rows] = -np.eye(rows) - self.lb[used_rows : used_rows + rows] = np.NINF - self.ub[used_rows : used_rows + rows] = task.upper - - used_vars += rows - used_rows += rows - - # configure lower bound constraints: lb ≤ J dq + slack ≤ +∞ - used_vars = self.N - for task in self.tasks[self._index]: - rows = task.size - self.A[used_rows : used_rows + rows, : self.N] = task.A - self.A[used_rows : used_rows + rows, used_vars : used_vars + rows] = np.eye(rows) - self.lb[used_rows : used_rows + rows] = task.lower - self.ub[used_rows : used_rows + rows] = np.PINF - - used_vars += rows - used_rows += rows - - self.A[self.N : used_vars, self.N : used_vars] = np.eye( - used_vars - self.N - ) # slack constraints - unused = self.max_vars - used_vars # unused slack variables - self.A[used_vars : self.max_vars, used_vars : self.max_vars] = np.zeros( - (unused, unused) + A lb ub + ⎧1 0 1 for lin. task + max slack variables ⎨ ⋱ -inf inf for quad. task + ⎩ 0 0 1 + ⎧ 1 lb ub + dq ⎨ ⋱ + ⎩ 1 lb ub + higher-level tasks { 0 J' J' dq' J' dq' + hard eq. task { 0 J ξ ξ J dq = ξ + soft eq. task { ξ J ξ ξ s ξ + J dq = ξ + hard ineq. task { 0 J lb ub lb ≤ J dq ≤ ub + quad task ub { +1 J lb inf lb ≤ s + J dq + quad task lb { -1 J -inf ubs -s + J dq ≤ ub + (soft ineq. task { 1 J lb ub lb ≤ 1 s + J dq ≤ ub)""" + + def __init__(self, number_of_vars, rho=0.01) -> None: + self.sot: TaskHierachyType = [] + + self.N = number_of_vars # number of problem variables + self.rho = rho + + # [m]aximal number of slack variables needed + self.m_slack = 0 + # [m]aximal number of rows occupied by tasks without slack variables + self.m_hard_r = 0 + # [m]aximal number of rows occupied by tasks with slack variables in one level + self.m_wslack_r = 0 + # [m]aximal number of rows occupied by nullspace + # constraints of tasks in previous levels + self.m_nspace_r = 0 + + # starting rows for hard, slack and nullspace in A + self._hs = self.N + self.m_slack + self._ss = self._hs + self.m_hard_r + self._ns = self._ss + self.m_wslack_r + + # [u]sed rows by hard tasks + self.u_hard_r = 0 + # [u]sed rows by tasks with slack + self.u_wslack_r = 0 + # [u]sed rows by nullspace constraint + self.u_nspace_r = 0 + + self.J = np.identity(self.N) # J matrix + self.p = np.zeros((self.N)) # p vector + + # internal A matrix, lower and upper bound with space for all constraints + self._A = np.zeros((self.N, self.N)) + self._lower = np.zeros((self.N,)) + self._upper = np.zeros((self.N,)) + + # stubs for r/w views on _A, _lower and _upper + self.slack_diag: NDArray # diagonal entries of slacks in A + + self._A_hard: NDArray # hard tasks in A + self._A_nullspace: NDArray # nullspace tasks in A + self._A_wslack: NDArray # tasks with slack in A + self._A_slacks: NDArray # slack part of tasks with slacks + # same as above but for lower and upper bounds + self._l_dq: NDArray + self._u_dq: NDArray + self._l_slacks: NDArray + self._u_slacks: NDArray + self._l_hard: NDArray + self._u_hard: NDArray + self._l_slacked: NDArray + self._u_slacked: NDArray + self._l_nullspace: NDArray + self._u_nullspace: NDArray + + # iterator index + self._level_index = 0 + + def set_tasks(self, sot: TaskHierachyType): + recalc = self._check_stack_different(sot) + self.sot = sot + if recalc: + self._recalculate_dimensions() + + def _recalculate_dimensions(self): + self._calc_matrix_size() + self._set_matrices() + self._define_views() + + def _check_stack_different(self, new_stack: TaskHierachyType): + if len(new_stack) == len(self.sot): + for nl, ol in zip(new_stack, self.sot): + if len(nl) == len(ol): + for tn, to in zip(nl, ol): + if not (type(tn) is type(to) and tn.softnessType is to.softnessType): + return True + return False + return True + + def _set_matrices(self): + self.J = np.identity(self.N + self.m_slack) + self.J[self.m_slack :, self.m_slack :] = self.rho + self.p = np.zeros((self.N + self.m_slack,)) + + self._A = np.zeros( + ( + self.m_slack + self.N + self.m_hard_r + self.m_wslack_r + self.m_nspace_r, + self.m_slack + self.N, + ) ) - return ( - self.H[:used_vars, :used_vars], - self.q[:used_vars], - self.A[:used_rows, :used_vars], - self.lb[:used_rows], - self.ub[:used_rows], + self._lower = np.zeros( + (self.m_slack + self.N + self.m_hard_r + self.m_wslack_r + self.m_nspace_r,) ) - def nullspace_constraint(self, x): - # Replace upper-bound part of the current task-level's constraints with nullspace constraint - # J' dq = fixed, i.e. don't become worse than current, higher-priority solution - dq = x[: self.N] - used_rows = 0 - for task in self.tasks[self._index]: - rows = task.size - fixed = task.A.dot(dq) - assert np.allclose( - self.A[self.start_row : self.start_row + rows, : self.N], task.A - ) - - self.A[ - self.start_row : self.start_row + rows, self.N : - ] = 0 # clear slacks from constraint rows - self.lb[self.start_row : self.start_row + rows] = fixed - self.ub[self.start_row : self.start_row + rows] = fixed - - used_rows += rows - self.start_row += rows # increase start position for next iteration - - self.A[ - self.start_row : self.start_row + used_rows, self.N : - ] = 0 # clear slacks from all remaining (lb) rows - - return dq, x[self.N :] + self._upper = np.zeros( + (self.m_slack + self.N + self.m_hard_r + self.m_wslack_r + self.m_nspace_r,) + ) + # set diagonal entries of A for dq to 1 + self._A[ + self.m_slack : self.m_slack + self.N, self.m_slack : self.m_slack + self.N + ] = np.identity(self.N) -class HQPLinearSlacks: - """ - Iterator to form QP problems for the stack of tasks. - This variant considers soft equality constraints via linear scale variables λ, - i.e. J dq = λ * ξ, maximizing λ subject to 0 ≤ λ ≤ 1. - - As we need to formulate a minimization problem, we actually minimize s = 1-λ. - - The constraints of all previous tasks are considered via equality constraints: - J' dq = J' dq'. - - Thus, we can use fixed size constraint and optimization matrices A and H, which are incrementally filled. - The maximal number of slack variables for optimization is the number of soft DoFs of a task at a given level. - The constraint matrix has the following structure: - A lb ub - ⎧1 0 1 - max slack variables ⎨ ⋱ - ⎩ 1 0 1 - ⎧ 1 lb ub - dq ⎨ ⋱ - ⎩ 1 lb ub - higher-level tasks { 0 J' J' dq' - hard eq. task { 0 J ξ ξ J dq = ξ - soft eq. task { ξ J ξ ξ s ξ + J dq = ξ - hard ineq. task { 0 J lb ub lb ≤ J dq ≤ ub - (soft ineq. task { 1 J lb ub lb ≤ 1 s + J dq ≤ ub) - """ + def _define_views(self): - def __init__(self, N, tasks, lower, upper, rho) -> None: - super().__init__() - self.tasks = tasks - self.max_slacks = np.amax( - [np.sum([not task.is_hard for task in level]) for level in tasks] + self.slack_diag = np.lib.stride_tricks.as_strided( + self._A, + shape=(self.m_slack,), + strides=(sum(self._A.strides),), ) - max_constraints = np.sum([task.size for level in tasks for task in level]) - - self.N = N # number of joints - self.max_vars = self.N + self.max_slacks # variable order: slack, dq - # objective to minimize: rho*|dq|² + |slack|² - self.H = np.identity(self.max_vars) # Hessian matrix - np.fill_diagonal(self.H[-N:, -N:], rho) - self.q = np.zeros(self.max_vars) # gradient vector + self._A_hard = self._A[self._hs : self._hs + self.m_hard_r, self.m_slack :] + self._A_nullspace = self._A[self._ns : self._ns + self.m_nspace_r, self.m_slack :] + self._A_wslack = self._A[self._ss : self._ss + self.m_wslack_r, self.m_slack :] + self._A_slacks = self._A[self._ss : self._ss + self.m_wslack_r, : self.m_slack] + + self._l_dq = self._lower[self.m_slack : self.m_slack + self.N] + self._u_dq = self._upper[self.m_slack : self.m_slack + self.N] + + self._l_slacks = self._lower[ + : self.m_slack, + ] + + self._u_slacks = self._upper[ + : self.m_slack, + ] + + self._l_hard = self._lower[ + self._hs : self._hs + self.m_hard_r, + ] + + self._u_hard = self._upper[ + self._hs : self._hs + self.m_hard_r, + ] + + self._l_slacked = self._lower[ + self._ss : self._ss + self.m_wslack_r, + ] + + self._u_slacked = self._upper[ + self._ss : self._ss + self.m_wslack_r, + ] + self._l_nullspace = self._lower[ + self._ns : self._ns + self.m_nspace_r, + ] + + self._u_nullspace = self._upper[ + self._ns : self._ns + self.m_nspace_r, + ] + + def _calc_matrix_size(self): + for level in self.sot: + slacks = 0 + rows = 0 + for task in level: + if task.softnessType is TaskSoftnessType.hard: + self.m_hard_r += 1 + elif task.softnessType is TaskSoftnessType.linear: + slacks += 1 + rows += task.task_size + self.m_nspace_r += task.task_size + elif task.softnessType is TaskSoftnessType.quadratic: + slacks += task.task_size + rows += task.task_size * 2 + self.m_nspace_r += task.task_size + self.m_wslack_r = max(self.m_wslack_r, rows) + self.m_slack = max(self.m_slack, slacks) + self._hs = self.N + self.m_slack + self._ss = self._hs + self.m_hard_r + self._ns = self._ss + self.m_wslack_r + + def reset(self): + if self.m_hard_r > 0: + self._A_hard[:] = 0 + if self.m_nspace_r > 0: + self._A_nullspace[:] = 0 + if self.m_wslack_r > 0: + self._A_wslack[:] = 0 + if self.m_slack > 0: + self._A_slacks[:] = 0 + self.slack_diag[:] = 0 + + self.u_hard_r = 0 + self.u_wslack_r = 0 + self.u_nspace_r = 0 + + self._level_index = 0 - # allocate constraint matrix and lb/ub vectors once - self.A = np.zeros((self.max_vars + max_constraints, self.max_vars)) - self.lb = np.zeros(self.max_vars + max_constraints) - self.ub = np.zeros(self.max_vars + max_constraints) + return self - # (fixed) direct variable constraints (slack and dq) - np.fill_diagonal(self.A[: self.max_vars, : self.max_vars], 1.0) - self.lb[self.max_slacks : self.max_vars] = lower # lower joint bounds - self.ub[self.max_slacks : self.max_vars] = upper # upper joint bounds + def set_dq_bounds(self, lower, upper): + self._l_dq[:] = lower + self._u_dq[:] = upper - self.lb[: self.max_slacks] = 0.0 # lower slack bounds - self.ub[: self.max_slacks] = 1.0 # upper slack bounds + def __next__(self): + if self._level_index >= len(self.sot): + return False - def __iter__(self): # initialize iterator - self._index = -1 - self.start_row = self.max_vars - return self + self._A_wslack[:] = 0 + self._A_slacks[:] = 0 + self.slack_diag[:] = 0 + self._l_slacks[:] = 0 + self._u_slacks[:] = 0 - def __next__(self): # forward iterator: return QP problem of next hierarchy level - self._index += 1 - if self._index == len(self.tasks): - raise StopIteration + self.u_wslack_r = 0 - # configure upper bound constraints: -∞ ≤ J dq - slack ≤ ub used_slacks = 0 - used_rows = self.start_row - for task in self.tasks[self._index]: - rows = slice(used_rows, used_rows + task.size) - self.A[rows, : self.N] = 0.0 - self.A[rows, -self.N :] = task.A - if isinstance(task, EQTaskDesc): - if task.is_hard: - self.lb[rows] = self.ub[rows] = task.upper - else: - used_slacks += 1 - self.A[rows, self.max_slacks - used_slacks] = task.upper - self.lb[rows] = self.ub[rows] = task.upper - else: # inequality task - self.lb[rows] = task.lower - self.ub[rows] = task.upper - - used_rows += task.size - - vars = slice(self.max_slacks - used_slacks, None) - rows = slice(self.max_slacks - used_slacks, used_rows) - return ( - self.H[vars, vars], - self.q[vars], - self.A[rows, vars], - self.lb[rows], - self.ub[rows], - ) - - def nullspace_constraint(self, x): - # Replace current task-level's constraints with nullspace constraint - # J' dq = fixed, i.e. don't become worse than current, higher-priority solution - level = self.tasks[self._index] - used_rows = np.sum([task.size for task in level]) - used_slacks = np.sum([task.is_hard for task in level]) - dq = x[-self.N :] - rows = slice(self.start_row, self.start_row + used_rows) - self.lb[rows] = self.ub[rows] = self.A[rows, -self.N :].dot(dq) # fix task delta - self.A[rows, : self.max_slacks] = 0 # zero slack contribution - - return dq, x[self.max_slacks - used_slacks : self.max_slacks] + for task in self.sot[self._level_index]: + task: TaskType + if task.softnessType is TaskSoftnessType.hard: + self._A_hard[self.u_hard_r : self.u_hard_r + task.task_size, :] = task.A + ( + self._l_hard[self.u_hard_r : self.u_hard_r + task.task_size], + self._u_hard[self.u_hard_r : self.u_hard_r + task.task_size], + ) = ( + (task.bound, task.bound) + if isinstance(task, EqTask) + else (task.lower_bound, task.upper_bound) + ) + self.u_hard_r += task.task_size + elif task.softnessType is TaskSoftnessType.linear and isinstance(task, EqTask): + s = self.u_wslack_r + e = s + task.task_size + + self.slack_diag[used_slacks] = 1 + self._l_slacks[used_slacks] = 0 + self._u_slacks[used_slacks] = 1 + + self._A_wslack[s:e, :] = task.A + self._A_slacks[s:e, used_slacks] = task.bound + self._l_slacked[s:e] = task.bound + self._u_slacked[s:e] = task.bound + + used_slacks += 1 + self.u_wslack_r += task.task_size + + elif task.softnessType is TaskSoftnessType.quadratic: + s = self.u_wslack_r + e = s + task.task_size + + self.slack_diag[used_slacks : used_slacks + task.task_size] = 1 + self._l_slacks[used_slacks : used_slacks + task.task_size] = np.NINF + self._u_slacks[used_slacks : used_slacks + task.task_size] = np.PINF + + self._A_wslack[s:e, :] = task.A + self._l_slacked[s:e] = ( + task.bound if isinstance(task, EqTask) else task.lower_bound + ) + self._u_slacked[s:e] = np.PINF + self._A_slacks[s:e][used_slacks : used_slacks + task.task_size] = np.identity( + task.task_size + ) + + s += task.task_size + e += task.task_size + + self._A_wslack[s:e, :] = task.A + self._A_slacks[s:e][ + used_slacks : used_slacks + task.task_size + ] = -np.identity(task.task_size) + self._l_slacked[s:e] = np.NINF + self._u_slacked[s:e] = ( + task.bound if isinstance(task, EqTask) else task.upper_bound + ) + + used_slacks += task.task_size + self.u_wslack_r += task.task_size * 2 + + self._level_index += 1 + return True + + def add_nullspace_constraint(self, solution_vector): + dq, slacks = solution_vector[self.m_slack :], solution_vector[: self.m_slack] + + for task in self.sot[self._level_index - 1]: + s = self.u_nspace_r + e = s + task.task_size + b = task.A.dot(dq) + + self._A_nullspace[s:e] = task.A + self._l_nullspace[s:e] = b + self._u_nullspace[s:e] = b + + self.u_nspace_r += task.task_size + + return dq, slacks + + @property + def A(self): + return self._A[ + np.r_[ + 0 : self.N + self.m_slack, + self._hs : self._hs + self.u_hard_r, + self._ss : self._ss + self.u_wslack_r, + self._ns : self._ns + self.u_nspace_r, + ], + :, + ] + + @property + def lower(self): + return self._lower[ + np.r_[ + : self.N + self.m_slack, + self._hs : self._hs + self.u_hard_r, + self._ss : self._ss + self.u_wslack_r, + self._ns : self._ns + self.u_nspace_r, + ], + ] + + @property + def upper(self): + return self._upper[ + np.r_[ + : self.N + self.m_slack, + self._hs : self._hs + self.u_hard_r, + self._ss : self._ss + self.u_wslack_r, + self._ns : self._ns + self.u_nspace_r, + ], + ] class HQPSolver(Solver): - def __init__(self, number_of_joints, solver=None, **options) -> None: + def __init__(self, number_of_joints, **options) -> None: super().__init__(number_of_joints) self.options = options - if solver == "components": - self.HQP = HQPComponentSlacks - else: - self.HQP = HQPLinearSlacks - def solve(self, tasks, lower_dq, upper_dq, **options): + self.hqp_mats = HQPMatrix(self.N, **options) + + def solve(self, stack_of_tasks, lower_dq, upper_dq, **options): dq = options.get("warmstart") task_residuals = [] - desc = self.HQP(self.N, tasks, lower_dq, upper_dq, **self.options) - for qp in desc: # iterate over QPs corresponding to hierarchy levels - # print("Level: {level}".format(level=desc._index)) - # print(np.vstack(qp[0])) # H - # print(np.hstack([qp[2], qp[3][:,np.newaxis], qp[4][:,np.newaxis]])) # A, lb, ub - sol = self._solve_qp(*qp, dq) + self.hqp_mats.reset() + self.hqp_mats.set_tasks(stack_of_tasks) + self.hqp_mats.set_dq_bounds(lower_dq, upper_dq) + + while next(self.hqp_mats): # iterate over QPs corresponding to hierarchy levels + sol = self._solve_qp( + self.hqp_mats.J, + self.hqp_mats.p, + self.hqp_mats.A, + self.hqp_mats.lower, + self.hqp_mats.upper, + dq, + ) if sol.info.status_val < 0: print(sol.info.status) # return None, None # TODO handling of failed task! else: - dq, slack = desc.nullspace_constraint(sol.x) + + dq, slack = self.hqp_mats.add_nullspace_constraint(sol.x) task_residuals.append(slack) return dq, task_residuals From b7f08aa105f60bfd7f0a7c2c2f29ac573709a5bc Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Fri, 2 Sep 2022 18:21:14 +0200 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=90=9B=20Fix=20wrong=20indices?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/solver/HQPSolver.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/stack_of_tasks/solver/HQPSolver.py b/python/stack_of_tasks/solver/HQPSolver.py index bd29461..c49a132 100755 --- a/python/stack_of_tasks/solver/HQPSolver.py +++ b/python/stack_of_tasks/solver/HQPSolver.py @@ -283,7 +283,8 @@ def __next__(self): task.bound if isinstance(task, EqTask) else task.lower_bound ) self._u_slacked[s:e] = np.PINF - self._A_slacks[s:e][used_slacks : used_slacks + task.task_size] = np.identity( + + self._A_slacks[s:e, used_slacks : used_slacks + task.task_size] = np.identity( task.task_size ) @@ -291,8 +292,8 @@ def __next__(self): e += task.task_size self._A_wslack[s:e, :] = task.A - self._A_slacks[s:e][ - used_slacks : used_slacks + task.task_size + self._A_slacks[ + s:e, used_slacks : used_slacks + task.task_size ] = -np.identity(task.task_size) self._l_slacked[s:e] = np.NINF self._u_slacked[s:e] = ( From 7af5b0845e23b5be55685468b7d5fe9104b400cc Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Fri, 2 Sep 2022 18:22:15 +0200 Subject: [PATCH 5/7] =?UTF-8?q?=F0=9F=90=9B=20Fix=20=20inverse=20solver=20?= =?UTF-8?q?to=20work=20with=20new=20taskclass?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/solver/InverseJacobianSolver.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/stack_of_tasks/solver/InverseJacobianSolver.py b/python/stack_of_tasks/solver/InverseJacobianSolver.py index cca87f3..1879b35 100644 --- a/python/stack_of_tasks/solver/InverseJacobianSolver.py +++ b/python/stack_of_tasks/solver/InverseJacobianSolver.py @@ -1,5 +1,9 @@ +from typing import List + import numpy as np +from stack_of_tasks.tasks.Task import EqTask + from .AbstactSolver import Solver @@ -12,7 +16,7 @@ def __init__(self, number_of_joints, **options) -> None: def _invert_smooth_clip(self, s): return s / (self.threshold**2) if s < self.threshold else 1.0 / s - def solve(self, stack_of_tasks, lower_dq, upper_dq, **options): + def solve(self, stack_of_tasks: List[List[EqTask]], lower_dq, upper_dq, **options): N = np.identity(self.N) # nullspace projector of previous tasks JA = np.zeros((0, self.N)) # accumulated Jacobians qdot = np.zeros(self.N) @@ -22,7 +26,7 @@ def solve(self, stack_of_tasks, lower_dq, upper_dq, **options): # combine tasks of this level into one J = np.concatenate([task.A for task in task_level], axis=0) - e = np.concatenate([task.upper for task in task_level], axis=0) + e = np.concatenate([task.bound for task in task_level], axis=0) U, S, Vt = np.linalg.svd(J.dot(N)) # * self.joint_weights[None, :] From a68bbb04b4328ae32b70167fceb16ce9d0803cbe Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Fri, 2 Sep 2022 18:23:56 +0200 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=8E=A8=20Remove=20specific=20controll?= =?UTF-8?q?er=20callbacks,=20add=20simpler=20ones?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/stack_of_tasks/controller.py | 43 +++++---------- python/stack_of_tasks/multi_solver.py | 78 ++++++++++++++++++--------- 2 files changed, 66 insertions(+), 55 deletions(-) diff --git a/python/stack_of_tasks/controller.py b/python/stack_of_tasks/controller.py index 9acd200..fc626b6 100755 --- a/python/stack_of_tasks/controller.py +++ b/python/stack_of_tasks/controller.py @@ -32,11 +32,8 @@ def __init__( ): self.rate = rate - self.joint_state_callback = Callback() - self.T_callback = Callback() - self.J_callback = Callback() - - self.delta_q_callback = Callback() + self.joint_callback = Callback() + self.control_step_callback = Callback() self.robot = RobotModel(param=ns_prefix + "robot_description") @@ -45,8 +42,8 @@ def __init__( self.N = len(self.robot.active_joints) # number of (active) joints - self._T = np.zeros((4, 4)) - self._J = np.zeros((6, self.N)) + self.T = np.zeros((4, 4)) + self.J = np.zeros((6, self.N)) self._joint_position = np.zeros(self.N) self.task_hierarchy = TaskHierarchy() @@ -80,29 +77,11 @@ def _send_joint(joint_values): joint_msg.position = joint_values joint_pub.publish(joint_msg) - self.joint_state_callback.append(_send_joint) + self.joint_callback.append(lambda: _send_joint(self._joint_position)) self.last_dq = None self.reset() # trigger initialization callbacks - @property - def T(self): - return self._T - - @T.setter - def T(self, val): - self._T = val - self.T_callback(self._T) - - @property - def J(self): - return self._J - - @J.setter - def J(self, val): - self._J = val - self.J_callback(self._J) - @property def joint_position(self): return self._joint_position @@ -110,13 +89,13 @@ def joint_position(self): @joint_position.setter def joint_position(self, values): self._joint_position = values - self.joint_state_callback(values) T, J = self.robot.fk(self.target_link, values) T = T.dot(self.target_offset) self.T = T self.J = J + self.joint_callback() def reset(self, randomness=0): # center = 0.5 * (self.maxs + self.mins) @@ -126,7 +105,6 @@ def reset(self, randomness=0): def actuate(self, q_delta): self.joint_position += q_delta.ravel() - self.delta_q_callback(q_delta) def check_end(self): pass @@ -147,6 +125,7 @@ def hierarchic_control(self, targets): self.actuate(dq) # return tcr[0] < 1e-12 or all(i < 1e-8 for i in tcr) + self.control_step_callback() return False @@ -189,10 +168,12 @@ def set_target(name, data): targets = {} c = Controller(solver_class=HQPSolver, rho=0.1) - c.T_callback.append(lambda T: set_target("T", T)) - c.J_callback.append(lambda J: set_target("J", J)) + + c.control_step_callback.append(lambda: set_target("T", c.T)) + c.control_step_callback.append(lambda: set_target("J", c.J)) c.reset() + c.control_step_callback() mc = MarkerControl() mc.marker_data_callback.append(set_target) @@ -210,7 +191,7 @@ def set_target(name, data): ori = OrientationTask( 1, - TaskSoftnessType.linear, + TaskSoftnessType.quadratic, ) ori.set_argument_mapping("current", "T") ori.set_argument_mapping("target", marker.name) diff --git a/python/stack_of_tasks/multi_solver.py b/python/stack_of_tasks/multi_solver.py index b72b6ce..aded4df 100755 --- a/python/stack_of_tasks/multi_solver.py +++ b/python/stack_of_tasks/multi_solver.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 +import numpy as np +import rospy +from tf import transformations as tf + from stack_of_tasks.controller import Controller from stack_of_tasks.plot.plot_publisher import PlotPublisher from stack_of_tasks.solver.HQPSolver import HQPSolver from stack_of_tasks.solver.InverseJacobianSolver import InverseJacobianSolver -from stack_of_tasks.tasks.Tasks import ConeTask, OrientationTask, PositionTask -import numpy as np -from tf import transformations as tf -import rospy +from stack_of_tasks.tasks.Eq_Tasks import OrientationTask, PositionTask +from stack_of_tasks.tasks.Task import TaskSoftnessType class Main: @@ -19,9 +21,42 @@ def __init__(self): self.plot = PlotPublisher() opt = {"rho": 1.0} - self.add_controller(HQPSolver, "new", **opt) - self.add_controller(HQPSolver, "old", solver="components", **opt) - self.add_controller(InverseJacobianSolver, "jac", **opt) + c1 = self.add_controller(HQPSolver, "new", **opt) + + p1 = PositionTask(1, TaskSoftnessType.linear) + p1.argmap["current"] = "T_new" + p1.argmap["J"] = "J_new" + + o1 = OrientationTask(1, TaskSoftnessType.linear) + o1.argmap["current"] = "T_new" + o1.argmap["J"] = "J_new" + + c1.task_hierarchy.add_task_lower(p1) + c1.task_hierarchy.add_task_same(o1) + + c2 = self.add_controller(HQPSolver, "old", **opt) + p2 = PositionTask(1, TaskSoftnessType.quadratic) + p2.argmap["current"] = "T_old" + p2.argmap["J"] = "J_old" + + o2 = OrientationTask(1, TaskSoftnessType.quadratic) + o2.argmap["current"] = "T_old" + o2.argmap["J"] = "J_old" + + c2.task_hierarchy.add_task_lower(p2) + c2.task_hierarchy.add_task_same(o2) + + c3 = self.add_controller(InverseJacobianSolver, "jac", **opt) + p3 = PositionTask(1, TaskSoftnessType.quadratic) + p3.argmap["current"] = "T_jac" + p3.argmap["J"] = "J_jac" + + o3 = OrientationTask(1, TaskSoftnessType.quadratic) + o3.argmap["current"] = "T_jac" + o3.argmap["J"] = "J_jac" + + c3.task_hierarchy.add_task_lower(p3) + c3.task_hierarchy.add_task_same(o3) def set_target(self, name, data): self.targets[name] = data @@ -31,29 +66,24 @@ def add_controller(self, solverclass, name, **options): solverclass, target_link=f"{name}_joint8", ns_prefix=f"{name}/", **options ) self.controller[name] = controller - controller.T_callback.append(lambda T: self.set_target(f"T_{name}", T)) - controller.J_callback.append(lambda J: self.set_target(f"J_{name}", J)) - - pos = PositionTask(1.0) - pos.argmap["T_c"] = f"T_{name}" - pos.argmap["J"] = f"J_{name}" - - ori = OrientationTask(1.0) - ori.argmap["T_c"] = f"T_{name}" - ori.argmap["J"] = f"J_{name}" - controller.task_hierarchy.add_task_lower(pos) - controller.task_hierarchy.add_task_same(ori) + controller.joint_callback.append(lambda: self.set_target(f"T_{name}", controller.T)) + controller.joint_callback.append(lambda: self.set_target(f"J_{name}", controller.J)) self.plot.add_plot( f"{name}/dq", [f"{name}/dq/{joint.name}" for joint in controller.robot.active_joints], ) - controller.delta_q_callback.append(lambda dq: self.plot.plot(f"{name}/dq", dq)) - controller.delta_q_callback.append(lambda dq: self.set_target(f"{name}_dq", dq)) + controller.control_step_callback.append( + lambda: self.plot.plot(f"{name}/dq", controller.last_dq) + ) + controller.control_step_callback.append( + lambda: self.set_target(f"{name}_dq", controller.last_dq) + ) controller.reset() + return controller def run(self): rate = rospy.Rate(50) @@ -66,7 +96,7 @@ def run(self): t = [t_1, t_2] i = 0 - self.targets["T_t"] = t[i] + self.targets["target"] = t[i] while not rospy.is_shutdown(): for c in self.controller.values(): @@ -75,13 +105,13 @@ def run(self): all_close = True for n in self.controller.keys(): all_close &= np.allclose( - self.targets[f"T_{n}"], self.targets["T_t"], atol=1e-4 + self.targets[f"T_{n}"], self.targets["target"], atol=1e-4 ) all_close &= np.allclose(self.targets[f"{n}_dq"], 0, atol=1e-10) if all_close: i = (i + 1) % 2 - self.targets["T_t"] = t[i] + self.targets["target"] = t[i] print("Next target", i) rospy.sleep(1) From 1a77755ece5a4356e51b4d567e72b555241d958d Mon Sep 17 00:00:00 2001 From: Felix Kolesch Date: Fri, 2 Sep 2022 18:29:31 +0200 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=A7=B1=20Update=20tool=20settings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .pre-commit-config.yaml | 1 + pyproject.toml | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1229310..dd7eb2f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,6 +33,7 @@ repos: rev: 22.6.0 hooks: - id: black + - id: isort - repo: https://github.com/PyCQA/pylint rev: v2.14.5 diff --git a/pyproject.toml b/pyproject.toml index fd9741e..fd7cc02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.black] line-length = 94 -[tool.pylint] +[tool.pylint.'MESSAGES CONTROL'] max-line-length = 94 disable = [ "C0103", # (invalid-name) @@ -9,3 +9,6 @@ disable = [ "C0115", # (missing-class-docstring) "C0116", # (missing-function-docstring) ] + +[tool.isort] +profile = "black"