Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task classes and plotting callbacks #9

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ repos:
rev: 22.6.0
hooks:
- id: black
- id: isort

- repo: https://github.com/PyCQA/pylint
rev: v2.14.5
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
[tool.black]
line-length = 94

[tool.pylint]
[tool.pylint.'MESSAGES CONTROL']
max-line-length = 94
disable = [
"C0103", # (invalid-name)
"C0114", # (missing-module-docstring)
"C0115", # (missing-class-docstring)
"C0116", # (missing-function-docstring)
]

[tool.isort]
profile = "black"
76 changes: 31 additions & 45 deletions python/stack_of_tasks/controller.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#!/usr/bin/env python3

import random

import numpy as np
import rospy
from interactive_markers.interactive_marker_server import InteractiveMarkerServer
from sensor_msgs.msg import JointState
from std_msgs.msg import Header
from tf import transformations as tf

from stack_of_tasks.marker.interactive_marker import IAMarker
from stack_of_tasks.robot_model import RobotModel, JointType
from stack_of_tasks.robot_model import RobotModel
from stack_of_tasks.solver.AbstactSolver import Solver
from stack_of_tasks.solver.HQPSolver import HQPSolver
from stack_of_tasks.solver.InverseJacobianSolver import InverseJacobianSolver
Expand All @@ -35,11 +32,8 @@ def __init__(
):
self.rate = rate

self.joint_state_callback = Callback()
self.T_callback = Callback()
self.J_callback = Callback()

self.delta_q_callback = Callback()
self.joint_callback = Callback()
self.control_step_callback = Callback()

self.robot = RobotModel(param=ns_prefix + "robot_description")

Expand All @@ -48,8 +42,8 @@ def __init__(

self.N = len(self.robot.active_joints) # number of (active) joints

self._T = np.zeros((4, 4))
self._J = np.zeros((6, self.N))
self.T = np.zeros((4, 4))
self.J = np.zeros((6, self.N))
self._joint_position = np.zeros(self.N)

self.task_hierarchy = TaskHierarchy()
Expand Down Expand Up @@ -83,43 +77,25 @@ def _send_joint(joint_values):
joint_msg.position = joint_values
joint_pub.publish(joint_msg)

self.joint_state_callback.append(_send_joint)
self.joint_callback.append(lambda: _send_joint(self._joint_position))

self.last_dq = None
self.reset() # trigger initialization callbacks

@property
def T(self):
return self._T

@T.setter
def T(self, val):
self._T = val
self.T_callback(self._T)

@property
def J(self):
return self._J

@J.setter
def J(self, val):
self._J = val
self.J_callback(self._J)

@property
def joint_position(self):
return self._joint_position

@joint_position.setter
def joint_position(self, values):
self._joint_position = values
self.joint_state_callback(values)

T, J = self.robot.fk(self.target_link, values)
T = T.dot(self.target_offset)

self.T = T
self.J = J
self.joint_callback()

def reset(self, randomness=0):
# center = 0.5 * (self.maxs + self.mins)
Expand All @@ -129,7 +105,6 @@ def reset(self, randomness=0):

def actuate(self, q_delta):
self.joint_position += q_delta.ravel()
self.delta_q_callback(q_delta)

def check_end(self):
pass
Expand All @@ -138,16 +113,19 @@ def hierarchic_control(self, targets):
lb = np.maximum(-0.01, (self.mins * 0.95 - self._joint_position) / self.rate)
ub = np.minimum(0.01, (self.maxs * 0.95 - self._joint_position) / self.rate)

tasks = self.task_hierarchy.compute(targets)
self.task_hierarchy.compute(targets)

dq, tcr = self.solver.solve(tasks, lb, ub, warmstart=self.last_dq)
dq, tcr = self.solver.solve(
self.task_hierarchy.hierarchy, lb, ub, warmstart=self.last_dq
)

self.last_dq = dq

if dq is not None:
self.actuate(dq)
# return tcr[0] < 1e-12 or all(i < 1e-8 for i in tcr)

self.control_step_callback()
return False


Expand All @@ -174,8 +152,10 @@ def delete_marker(self, name):

if __name__ == "__main__":
from stack_of_tasks.marker.markers import SixDOFMarker
from stack_of_tasks.tasks.Tasks import ConeTask, OrientationTask, PositionTask
from stack_of_tasks.plot.plot_publisher import PlotPublisher
from stack_of_tasks.tasks.Eq_Tasks import JointPos, OrientationTask, PositionTask
from stack_of_tasks.tasks.Task import TaskSoftnessType

# from stack_of_tasks.tasks.Ieq_Tasks import ConeTask

np.set_printoptions(precision=3, suppress=True, linewidth=100, floatmode="fixed")

Expand All @@ -188,30 +168,36 @@ def set_target(name, data):
targets = {}

c = Controller(solver_class=HQPSolver, rho=0.1)
c.T_callback.append(lambda T: set_target("T", T))
c.J_callback.append(lambda J: set_target("J", J))

c.control_step_callback.append(lambda: set_target("T", c.T))
c.control_step_callback.append(lambda: set_target("J", c.J))

c.reset()
c.control_step_callback()

mc = MarkerControl()
mc.marker_data_callback.append(set_target)
marker = SixDOFMarker(name="pose", scale=0.1, pose=targets["T"])
mc.add_marker(marker, marker.name)

# setup tasks
pos = PositionTask(1.0)
pos.argmap["T_c"] = "T"
pos.argmap["T_t"] = marker.name
pos = PositionTask(1, TaskSoftnessType.linear)
pos.set_argument_mapping("current", "T")
pos.set_argument_mapping("target", marker.name)

# cone = ConeTask((0, 0, 1), (0, 0, 1), 0.1)
# cone.argmap["T_t"] = "Position"
# cone.argmap["angle"] = "Cone_angle"

ori = OrientationTask(1.0)
ori.argmap["T_c"] = "T"
ori.argmap["T_t"] = marker.name
ori = OrientationTask(
1,
TaskSoftnessType.quadratic,
)
ori.set_argument_mapping("current", "T")
ori.set_argument_mapping("target", marker.name)

c.task_hierarchy.add_task_lower(pos)
c.task_hierarchy.add_task_same(ori)
c.task_hierarchy.add_task_lower(ori)

# pp = PlotPublisher()
# pp.add_plot("q", [f"q/{joint.name}" for joint in c.robot.active_joints])
Expand Down
78 changes: 54 additions & 24 deletions python/stack_of_tasks/multi_solver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python3

import numpy as np
import rospy
from tf import transformations as tf

from stack_of_tasks.controller import Controller
from stack_of_tasks.plot.plot_publisher import PlotPublisher
from stack_of_tasks.solver.HQPSolver import HQPSolver
from stack_of_tasks.solver.InverseJacobianSolver import InverseJacobianSolver
from stack_of_tasks.tasks.Tasks import ConeTask, OrientationTask, PositionTask
import numpy as np
from tf import transformations as tf
import rospy
from stack_of_tasks.tasks.Eq_Tasks import OrientationTask, PositionTask
from stack_of_tasks.tasks.Task import TaskSoftnessType


class Main:
Expand All @@ -19,9 +21,42 @@ def __init__(self):
self.plot = PlotPublisher()

opt = {"rho": 1.0}
self.add_controller(HQPSolver, "new", **opt)
self.add_controller(HQPSolver, "old", solver="components", **opt)
self.add_controller(InverseJacobianSolver, "jac", **opt)
c1 = self.add_controller(HQPSolver, "new", **opt)

p1 = PositionTask(1, TaskSoftnessType.linear)
p1.argmap["current"] = "T_new"
p1.argmap["J"] = "J_new"

o1 = OrientationTask(1, TaskSoftnessType.linear)
o1.argmap["current"] = "T_new"
o1.argmap["J"] = "J_new"

c1.task_hierarchy.add_task_lower(p1)
c1.task_hierarchy.add_task_same(o1)

c2 = self.add_controller(HQPSolver, "old", **opt)
p2 = PositionTask(1, TaskSoftnessType.quadratic)
p2.argmap["current"] = "T_old"
p2.argmap["J"] = "J_old"

o2 = OrientationTask(1, TaskSoftnessType.quadratic)
o2.argmap["current"] = "T_old"
o2.argmap["J"] = "J_old"

c2.task_hierarchy.add_task_lower(p2)
c2.task_hierarchy.add_task_same(o2)

c3 = self.add_controller(InverseJacobianSolver, "jac", **opt)
p3 = PositionTask(1, TaskSoftnessType.quadratic)
p3.argmap["current"] = "T_jac"
p3.argmap["J"] = "J_jac"

o3 = OrientationTask(1, TaskSoftnessType.quadratic)
o3.argmap["current"] = "T_jac"
o3.argmap["J"] = "J_jac"

c3.task_hierarchy.add_task_lower(p3)
c3.task_hierarchy.add_task_same(o3)

def set_target(self, name, data):
self.targets[name] = data
Expand All @@ -31,29 +66,24 @@ def add_controller(self, solverclass, name, **options):
solverclass, target_link=f"{name}_joint8", ns_prefix=f"{name}/", **options
)
self.controller[name] = controller
controller.T_callback.append(lambda T: self.set_target(f"T_{name}", T))
controller.J_callback.append(lambda J: self.set_target(f"J_{name}", J))

pos = PositionTask(1.0)
pos.argmap["T_c"] = f"T_{name}"
pos.argmap["J"] = f"J_{name}"

ori = OrientationTask(1.0)
ori.argmap["T_c"] = f"T_{name}"
ori.argmap["J"] = f"J_{name}"

controller.task_hierarchy.add_task_lower(pos)
controller.task_hierarchy.add_task_same(ori)
controller.joint_callback.append(lambda: self.set_target(f"T_{name}", controller.T))
controller.joint_callback.append(lambda: self.set_target(f"J_{name}", controller.J))

self.plot.add_plot(
f"{name}/dq",
[f"{name}/dq/{joint.name}" for joint in controller.robot.active_joints],
)

controller.delta_q_callback.append(lambda dq: self.plot.plot(f"{name}/dq", dq))
controller.delta_q_callback.append(lambda dq: self.set_target(f"{name}_dq", dq))
controller.control_step_callback.append(
lambda: self.plot.plot(f"{name}/dq", controller.last_dq)
)
controller.control_step_callback.append(
lambda: self.set_target(f"{name}_dq", controller.last_dq)
)

controller.reset()
return controller

def run(self):
rate = rospy.Rate(50)
Expand All @@ -66,7 +96,7 @@ def run(self):

t = [t_1, t_2]
i = 0
self.targets["T_t"] = t[i]
self.targets["target"] = t[i]

while not rospy.is_shutdown():
for c in self.controller.values():
Expand All @@ -75,13 +105,13 @@ def run(self):
all_close = True
for n in self.controller.keys():
all_close &= np.allclose(
self.targets[f"T_{n}"], self.targets["T_t"], atol=1e-4
self.targets[f"T_{n}"], self.targets["target"], atol=1e-4
)
all_close &= np.allclose(self.targets[f"{n}_dq"], 0, atol=1e-10)

if all_close:
i = (i + 1) % 2
self.targets["T_t"] = t[i]
self.targets["target"] = t[i]
print("Next target", i)
rospy.sleep(1)

Expand Down
Loading