-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added wifi and collision monitor packages as well as new collision av…
…oidance appraoch called Reciprocal Velocity Object
- Loading branch information
1 parent
ae037f3
commit 3679bf2
Showing
17 changed files
with
6,070 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
#!/usr/bin/env python | ||
import math | ||
import numpy as np | ||
import random | ||
|
||
import rospy | ||
from actionlib import SimpleActionClient | ||
from std_msgs.msg import String | ||
from nav_msgs.srv import GetPlan, GetPlanRequest | ||
from geometry_msgs.msg import PoseWithCovarianceStamped, Twist | ||
from move_base_msgs.msg import MoveBaseGoal, MoveBaseAction | ||
from enum import Enum | ||
from actionlib_msgs.msg import GoalStatusArray | ||
|
||
|
||
class RobotState(Enum): | ||
NORMAL = 1 | ||
AVOIDANCE = 2 | ||
RETURN_TO_PATH = 3 | ||
|
||
|
||
class BWIbot: | ||
def __init__(self): | ||
self.opponent_location = np.zeros(2) # (x, y) | ||
self.robot_pose = None | ||
self.other_robot_pose = None | ||
|
||
self.robot_vel = Twist() | ||
self.other_robot_prev_pose = None | ||
self.other_robot_vel = Twist() | ||
self.other_robot_last_time = rospy.Time.now() | ||
|
||
# Define move_base | ||
self.move_base = SimpleActionClient("/move_base", MoveBaseAction) | ||
connected = self.move_base.wait_for_server(timeout=rospy.Duration(60.0)) | ||
if not connected: | ||
raise TimeoutError("MoveBase does not respond! Please retry after reboot segway base.") | ||
self.goal = MoveBaseGoal() | ||
self.goal.target_pose.header.frame_id = "level_mux_map" | ||
|
||
# Define ROS services | ||
self.make_plan_srv = rospy.ServiceProxy("/move_base/NavfnROS/make_plan", GetPlan) | ||
|
||
# Define subscribers | ||
self.__sub_opponent_amcl = rospy.Subscriber("/comms/amcl_pose", PoseWithCovarianceStamped, self.amcl_cb) | ||
self.__sub_wifi_condition = rospy.Subscriber("/iswifion", String, self.wifi_cb) | ||
self.__sub_robot_amcl = rospy.Subscriber("/amcl_pose", PoseWithCovarianceStamped, self.robot_amcl_cb) | ||
self.__sub_move_base_status = rospy.Subscriber('/move_base/status', GoalStatusArray, self.move_base_status_cb) | ||
|
||
self.__sub_robot_cmd_vel = rospy.Subscriber('/cmd_vel', Twist, self.robot_vel_callback) | ||
self.vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) | ||
|
||
# right lane added params | ||
self.detection_range = 8.0 # in meters | ||
self.original_goal = None | ||
self.robot_state = RobotState.NORMAL | ||
|
||
# wifi stuff | ||
self.is_wifi_on = False | ||
|
||
self.robot_moving = False | ||
self.was_stopped = False | ||
|
||
self.in_collision = False | ||
self.goal_cancelled = False | ||
self.is_rec_vel_used = False | ||
|
||
def robot_vel_callback(self, msg): | ||
self.robot_vel = msg | ||
|
||
def amcl_cb(self, msg): | ||
# self.opponent_location[0] = amcl.pose.pose.position.x | ||
# self.opponent_location[1] = amcl.pose.pose.position.y | ||
# self.opponent_robot_pose = amcl.pose | ||
|
||
self.other_robot_prev_pose = self.other_robot_pose | ||
self.other_robot_pose = msg.pose.pose | ||
|
||
if self.other_robot_prev_pose: | ||
current_time = rospy.Time.now() | ||
dt = (current_time - self.other_robot_last_time).to_sec() | ||
|
||
self.other_robot_last_time = current_time | ||
|
||
dx = self.other_robot_pose.position.x - self.other_robot_prev_pose.position.x | ||
dy = self.other_robot_pose.position.y - self.other_robot_prev_pose.position.y | ||
|
||
self.other_robot_vel.linear.x = dx / dt | ||
self.other_robot_vel.linear.y = dy / dt | ||
|
||
def robot_amcl_cb(self, msg): | ||
self.robot_pose = msg.pose.pose | ||
|
||
def wifi_cb(self, wifi_msg): | ||
if wifi_msg.data in [str(True), str(False)]: | ||
self.is_wifi_on = wifi_msg.data == str(True) | ||
|
||
if not self.is_wifi_on and self.robot_moving: | ||
rospy.loginfo("WiFi dropped, canceling the goal") | ||
self.move_base.cancel_goal() | ||
self.was_stopped = True | ||
rospy.sleep(3.0) | ||
|
||
if self.is_wifi_on and not self.robot_moving and self.was_stopped: | ||
self.was_stopped = False | ||
|
||
def move_base_status_cb(self, msg): | ||
self.robot_moving = any(goal_status.status == 1 for goal_status in msg.status_list) | ||
|
||
def send_goal_and_wait(self, x, y, yaw): | ||
self.goal.target_pose.header.stamp = rospy.Time.now() | ||
self.goal.target_pose.pose.position.x = x | ||
self.goal.target_pose.pose.position.y = y | ||
self.goal.target_pose.pose.orientation.z = math.sin(yaw / 2.) | ||
self.goal.target_pose.pose.orientation.w = math.cos(yaw / 2.) | ||
|
||
start_time = rospy.Time.now() | ||
self.move_base.send_goal( | ||
goal=self.goal, | ||
feedback_cb=self.feedback_cb, | ||
) | ||
self.move_base.wait_for_result() | ||
ttd = (rospy.Time.now() - start_time).to_sec() | ||
|
||
return ttd | ||
|
||
def feedback_cb(self, feedback): | ||
# curr_pose = feedback.base_position.pose | ||
|
||
# plan = self.get_plan_to_goal(curr_pose) | ||
|
||
# try: | ||
# idx = np.argwhere(np.linalg.norm(plan - self.opponent_location, axis=1) < 0.5)[0][0] | ||
# dist = np.linalg.norm(plan[1:idx + 1] - plan[0:idx], axis=1).sum() | ||
# if dist < self.detection_range and self.robot_state == RobotState.NORMAL: | ||
# self.original_goal = self.goal | ||
# rospy.loginfo("Canceling the goal for avoidance") | ||
# self.move_base.cancel_goal() | ||
# self.robot_state = RobotState.AVOIDANCE_RIGHT | ||
# rospy.sleep(5.0) | ||
|
||
# except IndexError: | ||
# pass | ||
self.avoid_collision() | ||
|
||
def get_plan_to_goal(self, curr_pose): | ||
req = GetPlanRequest() | ||
req.start.header.frame_id = "level_mux_map" | ||
req.goal.header.frame_id = "level_mux_map" | ||
req.start.pose = curr_pose | ||
req.goal.pose = self.goal.target_pose.pose | ||
|
||
rospy.wait_for_service("/move_base/NavfnROS/make_plan") | ||
try: | ||
plan_msg = self.make_plan_srv(req) | ||
plan = np.array( | ||
[[p.pose.position.x, p.pose.position.y] for p in plan_msg.plan.poses] | ||
) | ||
return plan | ||
except rospy.ServiceException: | ||
rospy.logerr("Make plan service call failed.") | ||
|
||
def detect_collision(self): | ||
if not self.robot_pose or not self.other_robot_pose: | ||
return False | ||
|
||
distance = ((self.robot_pose.position.x - self.other_robot_pose.position.x) ** 2 + | ||
(self.robot_pose.position.y - self.other_robot_pose.position.y) ** 2) ** 0.5 | ||
|
||
future_distance = ((self.robot_pose.position.x + self.robot_vel.linear.x - self.other_robot_pose.position.x - self.other_robot_vel.linear.x) ** 2 + | ||
(self.robot_pose.position.y + self.robot_vel.linear.y - self.other_robot_pose.position.y - self.other_robot_vel.linear.y) ** 2) ** 0.5 | ||
|
||
return distance < 8 and future_distance < 8 # Distance threshold for collision | ||
|
||
def avoid_collision(self): | ||
if self.detect_collision(): | ||
if not self.goal_cancelled: | ||
self.move_base.cancel_goal() | ||
self.goal_cancelled = True | ||
self.prev_goal = self.goal | ||
print("Canceling the goal!") | ||
|
||
self.in_collision = True | ||
# self.robot_state = RobotState.AVOIDANCE | ||
rospy.loginfo("Collision detected. Adjusting velocity.") | ||
|
||
adjusted_vel = Twist() | ||
|
||
angle = math.atan2(self.other_robot_pose.position.y - self.robot_pose.position.y, | ||
self.other_robot_pose.position.x - self.robot_pose.position.x) | ||
|
||
avoidance_factor = 0.1 | ||
adjusted_vel.linear.x = self.robot_vel.linear.x + avoidance_factor * math.cos(angle + math.pi / 2) | ||
adjusted_vel.linear.y = self.robot_vel.linear.y + avoidance_factor * math.sin(angle + math.pi / 2) | ||
|
||
self.is_rec_vel_used = True | ||
self.vel_pub.publish(adjusted_vel) | ||
else: | ||
self.in_collision = False | ||
self.goal_cancelled = False | ||
# self.vel_pub.publish(self.robot_vel) | ||
|
||
|
||
# def return_to_path(self): | ||
# rospy.loginfo("Returning to original path") | ||
# self.robot_state = RobotState.NORMAL | ||
# if self.original_goal: | ||
# o_goal_x = self.original_goal.target_pose.pose.position.x | ||
# o_goal_y = self.original_goal.target_pose.pose.position.y | ||
# o_goal_w = self.original_goal.target_pose.pose.orientation.w | ||
# self.send_goal_and_wait(o_goal_x, o_goal_y, o_goal_w) | ||
|
||
#Hallway Simulation | ||
DOOR_LIST = dict( | ||
d2_124a=[6.57340180371, -3.8912084219, 0.686708693059], | ||
d2_124b=[6.57021787292, 12.5941815672, 0.745292404596] | ||
) | ||
|
||
if __name__ == '__main__': | ||
rospy.init_node("visit_door_list_ahg_multiple") | ||
|
||
robot = BWIbot() | ||
|
||
try: | ||
prev_door = None | ||
while not rospy.is_shutdown(): | ||
if robot.is_wifi_on and not robot.robot_moving: | ||
# if robot.robot_state == RobotState.NORMAL: | ||
next_door = random.choice(list(DOOR_LIST.keys())) | ||
if prev_door == next_door: | ||
continue | ||
|
||
prev_door = next_door | ||
x, y, yaw = DOOR_LIST[next_door] | ||
rospy.loginfo("Heading to {}".format(next_door)) | ||
if robot.is_rec_vel_used: | ||
robot.send_goal_and_wait(robot.prev_goal.target_pose.pose.position.x, robot.prev_goal.target_pose.pose.position.y, robot.prev_goal.target_pose.pose.orientation.w) | ||
robot.is_rec_vel_used = False | ||
else: | ||
robot.send_goal_and_wait(x, y, yaw) | ||
rospy.sleep(5.0) | ||
finally: | ||
# wrapup | ||
# robot.rec.bag.close() | ||
pass |
Oops, something went wrong.