Skip to content

Commit

Permalink
Merge pull request #1022 from wkentaro/jsk-tools-sanity-lib
Browse files Browse the repository at this point in the history
[jsk_tools] Check msg type is same as published one
  • Loading branch information
garaemon committed Jul 3, 2015
2 parents 4450846 + 6691ecd commit 67efb08
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
1 change: 1 addition & 0 deletions jsk_tools/src/jsk_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import shellblock_directive
import video_directive
import sanity_lib


18 changes: 14 additions & 4 deletions jsk_tools/src/jsk_tools/sanity_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ def colored(string, color):
class TopicPublishedChecker():
is_topic_published = False
is_topic_published_lock = Lock()
def __init__(self, topic_name, timeout = 5, echo = False):
def __init__(self, topic_name, timeout=5, echo=False, data_class=None):
self.topic_name = topic_name
self.timeout = timeout
self.launched_time = rospy.Time.now()
self.first_time_callback = True
self.echo = echo
print " Checking %s" % (topic_name)
msg_class, _, _ = rostopic.get_topic_class(topic_name, blocking=True)
if (data_class is not None) and (msg_class is not data_class):
raise rospy.ROSException('Topic msg type is different.')
self.sub = rospy.Subscriber(topic_name, msg_class, self.callback)
def callback(self, msg):
with self.is_topic_published_lock:
Expand Down Expand Up @@ -85,17 +87,25 @@ def check(self):
finally:
self.sub.unregister()

def checkTopicIsPublished(topic_name, class_name,
def checkTopicIsPublished(topic_name, class_name = None,
ok_message = "",
error_message = "",
timeout = 1,
other_topics = [],
echo = False):
"""
@type class_name: type
@property class_name:
ROS message data class.
if not None, it checks if msg type is same as published one.
"""
checkers = []
checkers.append(TopicPublishedChecker(topic_name, timeout, echo))
checkers.append(TopicPublishedChecker(topic_name, timeout, echo,
data_class=class_name))
if other_topics:
for (tpc_name, cls) in other_topics:
checkers.append(TopicPublishedChecker(tpc_name, timeout, echo))
checkers.append(TopicPublishedChecker(tpc_name, timeout, echo,
data_class=class_name))
all_success = True
for checker in checkers:
if not checker.check():
Expand Down

0 comments on commit 67efb08

Please sign in to comment.