Skip to content

Commit

Permalink
#1577 revisited: Fix dynamic windowing for Topic Statistics (#1695)
Browse files Browse the repository at this point in the history
* Add failing tests for topic statistics frequency for rospy and roscpp

* Fix TopicStatistics dynamic windowing to adjust evaluation frequency in the right direction

* test_roscpp: fixed topic_statistic_frequency

* test_roscpp/topic_statistic_frequency: cleanup
  • Loading branch information
cwecht authored and dirk-thomas committed Aug 12, 2019
1 parent ab67be5 commit 8d9af41
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 14 deletions.
6 changes: 4 additions & 2 deletions clients/roscpp/include/ros/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ class ROSCPP_DECL StatisticsLogger

private:

// these are hard constrains
// Range of window length, in seconds
int max_window;
int min_window;

// these are soft constrains
// Range of acceptable messages in window.
// Window size will be adjusted if number of observed is
// outside this range.
int max_elements;
int min_elements;

Expand Down
7 changes: 4 additions & 3 deletions clients/roscpp/src/libros/statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ void StatisticsLogger::callback(const boost::shared_ptr<M_string>& connection_he
}

// should publish new statistics?
if (stats.last_publish + ros::Duration(1 / pub_frequency_) < received_time)
double pub_period = 1.0 / pub_frequency_;
if (stats.last_publish + ros::Duration(pub_period) < received_time)
{
ros::Time window_start = stats.last_publish;
stats.last_publish = received_time;
Expand Down Expand Up @@ -236,11 +237,11 @@ void StatisticsLogger::callback(const boost::shared_ptr<M_string>& connection_he
pub_.publish(msg);

// dynamic window resizing
if (stats.arrival_time_list.size() > static_cast<size_t>(max_elements) && pub_frequency_ * 2 <= max_window)
if (stats.arrival_time_list.size() > static_cast<size_t>(max_elements) && pub_period / 2.0 >= min_window)
{
pub_frequency_ *= 2;
}
if (stats.arrival_time_list.size() < static_cast<size_t>(min_elements) && pub_frequency_ / 2 >= min_window)
if (stats.arrival_time_list.size() < static_cast<size_t>(min_elements) && pub_period * 2.0 <= max_window)
{
pub_frequency_ /= 2;
}
Expand Down
17 changes: 9 additions & 8 deletions clients/rospy/src/rospy/impl/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ def read_parameters(self):
Fetch window parameters from parameter server
"""

# Range of window length, in seconds
self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)

# Range of acceptable messages in window.
# Window size will be adjusted if number of observed is
# outside this range.
self.max_window = rospy.get_param("/statistics_window_max_size", 64)
self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)

# Range of window length, in seconds
self.min_window = rospy.get_param("/statistics_window_min_size", 4)
self.max_window = rospy.get_param("/statistics_window_max_size", 64)

def callback(self, msg, publisher, stat_bytes):
"""
Expand Down Expand Up @@ -208,9 +208,10 @@ def sendStatistics(self, subscriber_statistics_logger):
self.pub.publish(msg)

# adjust window, if message count is not appropriate.
if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and self.pub_frequency.to_sec() * 2 <= subscriber_statistics_logger.max_window:
pub_period = 1.0 / self.pub_frequency.to_sec()
if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and pub_period / 2 >= subscriber_statistics_logger.min_window:
self.pub_frequency *= 2
if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and self.pub_frequency.to_sec() / 2 >= subscriber_statistics_logger.min_window:
if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and pub_period * 2 <= subscriber_statistics_logger.max_window:
self.pub_frequency /= 2

# clear collected stats, start new window.
Expand Down Expand Up @@ -257,7 +258,7 @@ def callback(self, subscriber_statistics_logger, msg, stat_bytes):
self.last_seq_ = msg.header.seq

# send out statistics with a certain frequency
if self.last_pub_time + self.pub_frequency < arrival_time:
if self.last_pub_time + rospy.Duration(1.0 / self.pub_frequency.to_sec()) < arrival_time:
self.last_pub_time = arrival_time
self.sendStatistics(subscriber_statistics_logger)

Expand Down
1 change: 1 addition & 0 deletions test/test_roscpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ add_rostest(launch/ns_node_remapping.xml)
add_rostest(launch/search_param.xml)

add_rostest(launch/stamped_topic_statistics_with_empty_timestamp.xml)
add_rostest(launch/topic_statistic_frequency.xml DEPENDENCIES ${PROJECT_NAME}-publisher_rate ${PROJECT_NAME}-subscriber ${PROJECT_NAME}-topic_statistic_frequency)

# Test spinners
add_rostest(launch/spinners.xml)
39 changes: 39 additions & 0 deletions test/test_roscpp/test/launch/topic_statistic_frequency.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<!-- basic smoke test for TopicStatistics -->
<launch>
<param name="/enable_statistics" value="true" />
<!-- default 10 would take 5s to warm up for very slow talker -->
<param name="/statistics_window_min_elements" value="4" />

<!-- under 1Hz important, since checking window starts there -->
<node name="very_slow_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="0.8">
<remap from="data" to="very_slow_chatter" />
</node>
<node name="very_slow_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="very_slow_chatter" />
</node>

<!-- publishing within fairly normal range of frequencies -->
<node name="slow_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="18">
<remap from="data" to="slow_chatter" />
</node>
<node name="slow_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="slow_chatter" />
</node>

<node name="fast_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="53">
<remap from="data" to="fast_chatter" />
</node>
<node name="fast_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="fast_chatter" />
</node>

<!-- fast outlier (for most ros systems) -->
<node name="very_fast_talker" pkg="test_roscpp" type="test_roscpp-publisher_rate" required="true" args="171">
<remap from="data" to="/very_fast_chatter" />
</node>
<node name="very_fast_listener" pkg="test_roscpp" type="test_roscpp-subscriber" required="true">
<remap from="data" to="/very_fast_chatter" />
</node>

<test test-name="roscpp_topic_statistics" pkg="test_roscpp" type="test_roscpp-topic_statistic_frequency" />
</launch>
11 changes: 10 additions & 1 deletion test/test_roscpp/test/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,20 @@ add_dependencies(${PROJECT_NAME}-string_msg_expect ${std_msgs_EXPORTED_TARGETS})
add_executable(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp EXCLUDE_FROM_ALL stamped_topic_statistics_empty_timestamp.cpp)
target_link_libraries(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# The publisher and subscriber are compiled but not registered as a unit test
add_executable(${PROJECT_NAME}-topic_statistic_frequency EXCLUDE_FROM_ALL topic_statistic_frequency.cpp)
target_link_libraries(${PROJECT_NAME}-topic_statistic_frequency ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# The publishers and subscriber are compiled but not registered as a unit test
# since the test execution requires a network connection which drops packages.
# Call scripts/test_udp_with_dropped_packets.sh to run the test.
add_executable(${PROJECT_NAME}-publisher EXCLUDE_FROM_ALL publisher.cpp)
target_link_libraries(${PROJECT_NAME}-publisher ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-publisher ${std_msgs_EXPORTED_TARGETS})

add_executable(${PROJECT_NAME}-publisher_rate EXCLUDE_FROM_ALL publisher_rate.cpp)
target_link_libraries(${PROJECT_NAME}-publisher_rate ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-publisher_rate ${std_msgs_EXPORTED_TARGETS})

add_executable(${PROJECT_NAME}-subscriber EXCLUDE_FROM_ALL subscriber.cpp)
target_link_libraries(${PROJECT_NAME}-subscriber ${catkin_LIBRARIES})
add_dependencies(${PROJECT_NAME}-subscriber ${std_msgs_EXPORTED_TARGETS})
Expand Down Expand Up @@ -281,6 +288,7 @@ if(TARGET tests)
${PROJECT_NAME}-publisher
${PROJECT_NAME}-subscriber
${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp
${PROJECT_NAME}-topic_statistic_frequency
)
endif()

Expand Down Expand Up @@ -346,3 +354,4 @@ add_dependencies(${PROJECT_NAME}-left_right ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-string_msg_expect ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-stamped_topic_statistics_empty_timestamp
${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-topic_statistic_frequency ${${PROJECT_NAME}_EXPORTED_TARGETS})
38 changes: 38 additions & 0 deletions test/test_roscpp/test/src/publisher_rate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Publish big data chunks
// Author: Max Schwarz <[email protected]>

#include <ros/publisher.h>
#include <ros/init.h>
#include <ros/node_handle.h>

#include <std_msgs/Int8MultiArray.h>

int main(int argc, char** argv)
{
ros::init(argc, argv, "publisher");
ros::NodeHandle n;

const size_t NUM_BYTES = 8;
std_msgs::Int8MultiArray data;
data.data.reserve(NUM_BYTES);

assert(argc > 1);
float frequency = atof(argv[1]);

ros::Publisher pub = n.advertise<std_msgs::Int8MultiArray>("data", 1);
ros::Rate rate(frequency);

size_t start = 0;
while(ros::ok())
{
data.data.clear();
for(size_t i = 0; i < NUM_BYTES; ++i)
{
data.data.push_back(start + i);
}
pub.publish(data);
rate.sleep();
start++;
}
return 0;
}
65 changes: 65 additions & 0 deletions test/test_roscpp/test/src/topic_statistic_frequency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <map>
#include <string>

#include <ros/ros.h>
#include <gtest/gtest.h>
#include <rosgraph_msgs/TopicStatistics.h>
#include <boost/thread.hpp>
#include <std_msgs/Int8MultiArray.h>

class Aggregator {
public:
std::map<std::string, ros::Duration> topic_period_mean_map_;

void cb(const rosgraph_msgs::TopicStatistics& msg) {
topic_period_mean_map_[msg.topic] = msg.period_mean;
}

bool frequencyAcceptable(const std::string& topic, float expected) {
float errorMargin = 0.1;
float foundFreq = 1.f / topic_period_mean_map_[topic].toSec();
return std::fabs(foundFreq - expected) / expected <= errorMargin;
}
};

void assertEventuallyHasTopic(const Aggregator& agg, const std::string& topic) {
ros::Duration timeout(5.0);
auto start = ros::Time::now();
while (ros::Time::now() - start < timeout && !agg.topic_period_mean_map_.count(topic)) {
ros::Duration(0.5).sleep();
}
ASSERT_EQ(agg.topic_period_mean_map_.count(topic), 1u);
}

TEST(TopicStatisticFrequency, statisticFrequency)
{
ros::NodeHandle nh;
Aggregator agg;
ros::Subscriber stat_sub = nh.subscribe("/statistics", 1, &Aggregator::cb, &agg);

ros::AsyncSpinner spinner(4);
spinner.start();

ros::Duration(5.0).sleep();

assertEventuallyHasTopic(agg, "/very_fast_chatter");
assertEventuallyHasTopic(agg, "/fast_chatter");
assertEventuallyHasTopic(agg, "/slow_chatter");
assertEventuallyHasTopic(agg, "/very_slow_chatter");

ros::shutdown();

ASSERT_TRUE(agg.frequencyAcceptable("/very_fast_chatter", 171));
ASSERT_TRUE(agg.frequencyAcceptable("/fast_chatter", 53));
ASSERT_TRUE(agg.frequencyAcceptable("/slow_chatter", 18));
ASSERT_TRUE(agg.frequencyAcceptable("/very_slow_chatter", 0.8));
}


int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
ros::init(argc, argv, "topic_statistic_frequency");
ros::NodeHandle nh;
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions test/test_rospy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ if(CATKIN_ENABLE_TESTING)
add_rostest(test/rostest/on_shutdown.test)
add_rostest(test/rostest/sub_to_multiple_pubs.test)
add_rostest(test/rostest/latch_unsubscribe.test)
add_rostest(test/rostest/statistics.test)
endif()
58 changes: 58 additions & 0 deletions test/test_rospy/nodes/freq_talker
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

'''
Copy of talker demo for stats testing purposes. Publishes String to 'chatter'
'''

import rospy
from std_msgs.msg import String

NAME = 'talker'


def talker():
pub = rospy.Publisher("chatter", String, queue_size=1)
rospy.init_node(NAME, anonymous=True)
freq = rospy.get_param("~frequency", 10)
rate = rospy.Rate(freq)

count = 0
while not rospy.is_shutdown():
pub.publish(String("hello world {}".format(count)))
count += 1
rate.sleep()


if __name__ == '__main__':
talker()
42 changes: 42 additions & 0 deletions test/test_rospy/test/rostest/statistics.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<!-- basic smoke test for TopicStatistics -->
<launch>
<param name="/enable_statistics" value="true" />
<!-- default 10 would take 5s to warm up for very slow talker -->
<param name="/statistics_window_min_elements" value="5" />

<!-- under 1Hz important, since checking window starts there -->
<node name="very_slow_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="0.5" />
<remap from="chatter" to="very_slow_chatter" />
</node>
<node name="very_slow_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="very_slow_chatter" />
</node>
<!-- publishing within fairly normal range of frequencies -->
<node name="slow_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="8" />
<remap from="chatter" to="slow_chatter" />
</node>
<node name="slow_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="slow_chatter" />
</node>

<node name="fast_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="53" />
<remap from="chatter" to="fast_chatter" />
</node>
<node name="fast_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="fast_chatter" />
</node>

<!-- fast outlier (for most ros systems) -->
<node name="very_fast_talker" pkg="test_rospy" type="freq_talker" required="true">
<param name="frequency" value="150" />
<remap from="chatter" to="very_fast_chatter" />
</node>
<node name="very_fast_listener" pkg="test_rospy" type="listener.py" required="true">
<remap from="chatter" to="very_fast_chatter" />
</node>

<test test-name="rospy_topic_statistics" pkg="test_rospy" type="test_topic_statistics.py" />
</launch>
Loading

0 comments on commit 8d9af41

Please sign in to comment.