From 1c843f0c7e8bdae328a1f995d49197568e05f4a8 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Mon, 22 Apr 2019 17:07:39 -0300 Subject: [PATCH] Migrate tests to new launch_testing API. Signed-off-by: Michel Hidalgo --- CMakeLists.txt | 30 ++- package.xml | 3 +- .../test_services_across_dynamic_bridge.py.in | 190 ++++-------------- test/test_topics_across_dynamic_bridge.py.in | 185 +++++------------ 4 files changed, 114 insertions(+), 294 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ab2176a6..0885ed95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -220,22 +220,38 @@ endif() macro(targets) configure_file( - test/test_dynamic_bridge.py.in - test_dynamic_bridge${target_suffix}.py.genexp + test/test_topics_across_dynamic_bridge.py.in + test_topics_across_dynamic_bridge${target_suffix}.py.genexp @ONLY ) file(GENERATE - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$.py" - INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}.py.genexp" + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$.py" + INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}.py.genexp" ) - ament_add_pytest_test(test_dynamic_bridge${target_suffix} - "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$.py" + add_launch_test( + "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$.py" + TARGET test_topics_across_dynamic_bridge${target_suffix} + ENV RMW_IMPLEMENTAION=${rmw_implementaion} + TIMEOUT 60) + + configure_file( + test/test_services_across_dynamic_bridge.py.in + test_services_across_dynamic_bridge${target_suffix}.py.genexp + @ONLY + ) + file(GENERATE + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$.py" + INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}.py.genexp" + ) + add_launch_test( + "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$.py" + TARGET test_services_across_dynamic_bridge${target_suffix} ENV RMW_IMPLEMENTAION=${rmw_implementaion} TIMEOUT 60) endmacro() if(TEST_ROS1_BRIDGE) - find_package(ament_cmake_pytest REQUIRED) + find_package(launch_testing_ament_cmake REQUIRED) add_executable(test_ros1_client "test/test_ros1_client.cpp") ament_target_dependencies(test_ros1_client "ros1_roscpp") diff --git a/package.xml b/package.xml index 2cfcc8bc..77868780 100644 --- a/package.xml +++ b/package.xml @@ -29,13 +29,14 @@ rcutils std_msgs - ament_cmake_pytest ament_lint_auto ament_lint_common demo_nodes_cpp diagnostic_msgs launch launch_testing + launch_testing_ament_cmake + launch_testing_ros ros2run rosidl_interface_packages diff --git a/test/test_services_across_dynamic_bridge.py.in b/test/test_services_across_dynamic_bridge.py.in index d3ad2a45..91b0f1c7 100644 --- a/test/test_services_across_dynamic_bridge.py.in +++ b/test/test_services_across_dynamic_bridge.py.in @@ -12,184 +12,76 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import re +import unittest from launch import LaunchDescription -from launch import LaunchService from launch.actions import ExecuteProcess -from launch_testing.legacy import LaunchTestService -from launch_testing.legacy.output import create_output_lines_filter -from launch_testing.legacy.output import create_output_regex_test +from launch.actions import OpaqueFunction - -from ros2run.api import get_executable_path +import launch_testing TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@' TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@' -TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker'] -TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener'] TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@' TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@' TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@' -TEST_BRIDGE_ROS2_TALKER = get_executable_path( - package_name='demo_nodes_cpp', executable_name='talker') -TEST_BRIDGE_ROS2_LISTENER = get_executable_path( - package_name='demo_nodes_cpp', executable_name='listener') TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@' TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@' -TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@' -def get_default_launch_test_description(name): +@launch_testing.parametrize('test_name,server_cmd,client_cmd', [ + ('ros1_server_ros2_client_across_dynamic_bridge', + [TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], + [TEST_BRIDGE_ROS2_CLIENT]), + ('ros2_server_ros1_client_across_dynamic_bridge', + [TEST_BRIDGE_ROS2_SERVER], + [TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT]), +]) +def generate_test_description(test_name, server_cmd, client_cmd, ready_fn): launch_description = LaunchDescription() - launch_test = LaunchTestService() # ROS 1 core - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], - name=name + '__roscore', - ), exit_allowed=True - ) + launch_description.add_action(ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], + name=test_name + '__roscore', + )) # dynamic bridge - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], - name=name + '__dynamic_bridge', - ) + rosbridge_process = ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], + name=test_name + '__dynamic_bridge', ) + launch_description.add_action(rosbridge_process) - return launch_description, launch_test - - -def test_dynamic_bridge_msg_1to2(): - name = 'test_dynamic_bridge_msg_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, - name=name + '__ros1talker', - ) + server_process = ExecuteProcess( + cmd=server_cmd, name=test_name + '__server', ) + launch_description.add_action(server_process) - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_LISTENER], - name=name + '__ros2listener', - output='screen' - ), exit_allowed=True + client_process = ExecuteProcess( + cmd=client_cmd, name=test_name + '__client', ) + launch_description.add_action(client_process) - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ) + launch_description.add_action( + OpaqueFunction(function=lambda context: ready_fn()) ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_msg_2to1(): - name = 'test_dynamic_bridge_msg_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_TALKER], - name=name + '__ros2talker', - ) - ) - - env = dict(os.environ) - env['PYTHONUNBUFFERED'] = '1' - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER, - name=name + '__ros1listener', - env=env, - output='screen' - ), exit_allowed=True - ) - - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ), - side_effect='shutdown', - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_1to2(): - name = 'test_dynamic_bridge_srv_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], - name=name + '__ros1server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_CLIENT], - name=name + '__ros2client', - ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_2to1(): - name = 'test_dynamic_bridge_srv_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_SERVER], - name=name + '__ros2server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT], - name=name + '__ros1client', - ) - ) - - launch(launch_test, launch_description) + return launch_description, locals() -def launch(launch_test, launch_description): - launch_service = LaunchService() - launch_service.include_launch_description(launch_description) - rc = launch_test.run(launch_service) +class TestServicesAcrossDynamicBridge(unittest.TestCase): + def test_client_process_terminates_after_a_finite_amount_of_time(self, client_process): + """Test that the client executables terminates after a finite amount of time.""" + self.proc_info.assertWaitForShutdown(process=client_process, timeout=30) - assert rc == 0, \ - "The launch file failed with exit code '%s'" % str(rc) +@launch_testing.post_shutdown_test() +class TestServicesAcrossDynamicBridgeAfterShutdown(unittest.TestCase): -if __name__ == '__main__': - test_dynamic_bridge_msg_1to2() - test_dynamic_bridge_msg_2to1() - test_dynamic_bridge_srv_1to2() - test_dynamic_bridge_srv_2to1() + def test_processes_finished_gracefully(self, proc_info, rosbridge_process, + server_process, client_process): + """Test that both executables finished gracefully.""" + launch_testing.asserts.assertExitCodes(proc_info, process=rosbridge_process) + launch_testing.asserts.assertExitCodes(proc_info, process=server_process) + launch_testing.asserts.assertExitCodes(proc_info, process=client_process) diff --git a/test/test_topics_across_dynamic_bridge.py.in b/test/test_topics_across_dynamic_bridge.py.in index d3ad2a45..57954bf6 100644 --- a/test/test_topics_across_dynamic_bridge.py.in +++ b/test/test_topics_across_dynamic_bridge.py.in @@ -15,13 +15,14 @@ import os import re +import unittest + from launch import LaunchDescription -from launch import LaunchService from launch.actions import ExecuteProcess -from launch_testing.legacy import LaunchTestService -from launch_testing.legacy.output import create_output_lines_filter -from launch_testing.legacy.output import create_output_regex_test +from launch.actions import OpaqueFunction +import launch_testing +import launch_testing_ros from ros2run.api import get_executable_path @@ -30,166 +31,76 @@ TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@' TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@' TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker'] TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener'] -TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@' -TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@' TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@' TEST_BRIDGE_ROS2_TALKER = get_executable_path( package_name='demo_nodes_cpp', executable_name='talker') TEST_BRIDGE_ROS2_LISTENER = get_executable_path( package_name='demo_nodes_cpp', executable_name='listener') -TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@' -TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@' TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@' -def get_default_launch_test_description(name): +@launch_testing.parametrize('test_name,talker_cmd,listener_cmd', [ + ('ros1_talker_ros2_listener_across_dynamic_bridge', + [TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, + [TEST_BRIDGE_ROS2_LISTENER]), + ('ros2_talker_ros1_listener_across_dynamic_bridge', + [TEST_BRIDGE_ROS2_TALKER], + [TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER), +]) +def generate_test_description(test_name, talker_cmd, listener_cmd, ready_fn): launch_description = LaunchDescription() - launch_test = LaunchTestService() # ROS 1 core - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], - name=name + '__roscore', - ), exit_allowed=True - ) + launch_description.add_action(ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], + name=test_name + '__roscore', + )) # dynamic bridge - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], - name=name + '__dynamic_bridge', - ) - ) - - return launch_description, launch_test - - -def test_dynamic_bridge_msg_1to2(): - name = 'test_dynamic_bridge_msg_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, - name=name + '__ros1talker', - ) + rosbridge_process = ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], + name=test_name + '__dynamic_bridge', ) + launch_description.add_action(rosbridge_process) - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_LISTENER], - name=name + '__ros2listener', - output='screen' - ), exit_allowed=True - ) - - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_msg_2to1(): - name = 'test_dynamic_bridge_msg_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_TALKER], - name=name + '__ros2talker', - ) + talker_process = ExecuteProcess( + cmd=talker_cmd, name=test_name + '__talker', ) + launch_description.add_action(talker_process) env = dict(os.environ) env['PYTHONUNBUFFERED'] = '1' - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER, - name=name + '__ros1listener', - env=env, - output='screen' - ), exit_allowed=True + listener_process = ExecuteProcess( + cmd=listener_cmd, name=test_name + '__listener', env=env ) + launch_description.add_action(listener_process) - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ), - side_effect='shutdown', + launch_description.add_action( + OpaqueFunction(function=lambda context: ready_fn()) ) + return launch_description, locals() - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_1to2(): - name = 'test_dynamic_bridge_srv_1to2' - launch_description, launch_test = get_default_launch_test_description(name) +class TestTopicsAcrossDynamicBridge(unittest.TestCase): - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], - name=name + '__ros1server', + def test_listener_output(self, proc_output, listener_process): + output_filter = launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=TEST_BRIDGE_RMW ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_CLIENT], - name=name + '__ros2client', + proc_output.assertWaitFor( + expected_output=[re.compile('I heard.+')], + process=listener_process, + output_filter=output_filter, + timeout=10 ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_2to1(): - name = 'test_dynamic_bridge_srv_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_SERVER], - name=name + '__ros2server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT], - name=name + '__ros1client', - ) - ) - - launch(launch_test, launch_description) - - -def launch(launch_test, launch_description): - launch_service = LaunchService() - launch_service.include_launch_description(launch_description) - rc = launch_test.run(launch_service) - assert rc == 0, \ - "The launch file failed with exit code '%s'" % str(rc) +@launch_testing.post_shutdown_test() +class TestTopicsAcrossDynamicBridgeAfterShutdown(unittest.TestCase): -if __name__ == '__main__': - test_dynamic_bridge_msg_1to2() - test_dynamic_bridge_msg_2to1() - test_dynamic_bridge_srv_1to2() - test_dynamic_bridge_srv_2to1() + def test_processes_finished_gracefully(self, proc_info, rosbridge_process, + talker_process, listener_process): + """Test that both executables finished gracefully.""" + launch_testing.asserts.assertExitCodes(proc_info, process=rosbridge_process) + launch_testing.asserts.assertExitCodes(proc_info, process=talker_process) + launch_testing.asserts.assertExitCodes(proc_info, process=listener_process)