From 87092c237f126b1789f070c65dce563e65807d9f Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Mon, 25 Mar 2019 09:51:00 -0300 Subject: [PATCH 1/3] Update after launch_testing features becoming legacy. Signed-off-by: Michel Hidalgo --- test/test_dynamic_bridge.py.in | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_dynamic_bridge.py.in b/test/test_dynamic_bridge.py.in index c31db2a3..d3ad2a45 100644 --- a/test/test_dynamic_bridge.py.in +++ b/test/test_dynamic_bridge.py.in @@ -18,9 +18,9 @@ import re from launch import LaunchDescription from launch import LaunchService from launch.actions import ExecuteProcess -from launch_testing import LaunchTestService -from launch_testing.output import create_output_lines_filter -from launch_testing.output import create_output_regex_test +from launch_testing.legacy import LaunchTestService +from launch_testing.legacy.output import create_output_lines_filter +from launch_testing.legacy.output import create_output_regex_test from ros2run.api import get_executable_path From 1dfff37583cbb21db81a29cd8f08e33879427255 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Mon, 22 Apr 2019 15:52:32 -0300 Subject: [PATCH 2/3] Rename launch based tests in preparation for a refactor. Signed-off-by: Michel Hidalgo --- ...test_services_across_dynamic_bridge.py.in} | 0 test/test_topics_across_dynamic_bridge.py.in | 195 ++++++++++++++++++ 2 files changed, 195 insertions(+) rename test/{test_dynamic_bridge.py.in => test_services_across_dynamic_bridge.py.in} (100%) create mode 100644 test/test_topics_across_dynamic_bridge.py.in diff --git a/test/test_dynamic_bridge.py.in b/test/test_services_across_dynamic_bridge.py.in similarity index 100% rename from test/test_dynamic_bridge.py.in rename to test/test_services_across_dynamic_bridge.py.in diff --git a/test/test_topics_across_dynamic_bridge.py.in b/test/test_topics_across_dynamic_bridge.py.in new file mode 100644 index 00000000..d3ad2a45 --- /dev/null +++ b/test/test_topics_across_dynamic_bridge.py.in @@ -0,0 +1,195 @@ +# Copyright 2016 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re + +from launch import LaunchDescription +from launch import LaunchService +from launch.actions import ExecuteProcess +from launch_testing.legacy import LaunchTestService +from launch_testing.legacy.output import create_output_lines_filter +from launch_testing.legacy.output import create_output_regex_test + + +from ros2run.api import get_executable_path + + +TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@' +TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@' +TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker'] +TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener'] +TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@' +TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@' +TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@' +TEST_BRIDGE_ROS2_TALKER = get_executable_path( + package_name='demo_nodes_cpp', executable_name='talker') +TEST_BRIDGE_ROS2_LISTENER = get_executable_path( + package_name='demo_nodes_cpp', executable_name='listener') +TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@' +TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@' +TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@' + + +def get_default_launch_test_description(name): + launch_description = LaunchDescription() + launch_test = LaunchTestService() + + # ROS 1 core + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], + name=name + '__roscore', + ), exit_allowed=True + ) + + # dynamic bridge + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], + name=name + '__dynamic_bridge', + ) + ) + + return launch_description, launch_test + + +def test_dynamic_bridge_msg_1to2(): + name = 'test_dynamic_bridge_msg_1to2' + + launch_description, launch_test = get_default_launch_test_description(name) + + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, + name=name + '__ros1talker', + ) + ) + + action = launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS2_LISTENER], + name=name + '__ros2listener', + output='screen' + ), exit_allowed=True + ) + + launch_test.add_output_test( + launch_description, action, + output_test=create_output_regex_test( + expected_patterns=[re.compile(b'I heard.+')] + ), + output_filter=create_output_lines_filter( + filtered_rmw_implementation=TEST_BRIDGE_RMW, + ) + ) + + launch(launch_test, launch_description) + + +def test_dynamic_bridge_msg_2to1(): + name = 'test_dynamic_bridge_msg_2to1' + + launch_description, launch_test = get_default_launch_test_description(name) + + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS2_TALKER], + name=name + '__ros2talker', + ) + ) + + env = dict(os.environ) + env['PYTHONUNBUFFERED'] = '1' + action = launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER, + name=name + '__ros1listener', + env=env, + output='screen' + ), exit_allowed=True + ) + + launch_test.add_output_test( + launch_description, action, + output_test=create_output_regex_test( + expected_patterns=[re.compile(b'I heard.+')] + ), + output_filter=create_output_lines_filter( + filtered_rmw_implementation=TEST_BRIDGE_RMW, + ), + side_effect='shutdown', + ) + + launch(launch_test, launch_description) + + +def test_dynamic_bridge_srv_1to2(): + name = 'test_dynamic_bridge_srv_1to2' + + launch_description, launch_test = get_default_launch_test_description(name) + + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], + name=name + '__ros1server', + ) + ) + + launch_test.add_test_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS2_CLIENT], + name=name + '__ros2client', + ) + ) + + launch(launch_test, launch_description) + + +def test_dynamic_bridge_srv_2to1(): + name = 'test_dynamic_bridge_srv_2to1' + + launch_description, launch_test = get_default_launch_test_description(name) + + launch_test.add_fixture_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS2_SERVER], + name=name + '__ros2server', + ) + ) + + launch_test.add_test_action( + launch_description, ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT], + name=name + '__ros1client', + ) + ) + + launch(launch_test, launch_description) + + +def launch(launch_test, launch_description): + launch_service = LaunchService() + launch_service.include_launch_description(launch_description) + rc = launch_test.run(launch_service) + + assert rc == 0, \ + "The launch file failed with exit code '%s'" % str(rc) + + +if __name__ == '__main__': + test_dynamic_bridge_msg_1to2() + test_dynamic_bridge_msg_2to1() + test_dynamic_bridge_srv_1to2() + test_dynamic_bridge_srv_2to1() From 1c843f0c7e8bdae328a1f995d49197568e05f4a8 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Mon, 22 Apr 2019 17:07:39 -0300 Subject: [PATCH 3/3] Migrate tests to new launch_testing API. Signed-off-by: Michel Hidalgo --- CMakeLists.txt | 30 ++- package.xml | 3 +- .../test_services_across_dynamic_bridge.py.in | 190 ++++-------------- test/test_topics_across_dynamic_bridge.py.in | 185 +++++------------ 4 files changed, 114 insertions(+), 294 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ab2176a6..0885ed95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -220,22 +220,38 @@ endif() macro(targets) configure_file( - test/test_dynamic_bridge.py.in - test_dynamic_bridge${target_suffix}.py.genexp + test/test_topics_across_dynamic_bridge.py.in + test_topics_across_dynamic_bridge${target_suffix}.py.genexp @ONLY ) file(GENERATE - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$.py" - INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}.py.genexp" + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$.py" + INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}.py.genexp" ) - ament_add_pytest_test(test_dynamic_bridge${target_suffix} - "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$.py" + add_launch_test( + "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$.py" + TARGET test_topics_across_dynamic_bridge${target_suffix} + ENV RMW_IMPLEMENTAION=${rmw_implementaion} + TIMEOUT 60) + + configure_file( + test/test_services_across_dynamic_bridge.py.in + test_services_across_dynamic_bridge${target_suffix}.py.genexp + @ONLY + ) + file(GENERATE + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$.py" + INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}.py.genexp" + ) + add_launch_test( + "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$.py" + TARGET test_services_across_dynamic_bridge${target_suffix} ENV RMW_IMPLEMENTAION=${rmw_implementaion} TIMEOUT 60) endmacro() if(TEST_ROS1_BRIDGE) - find_package(ament_cmake_pytest REQUIRED) + find_package(launch_testing_ament_cmake REQUIRED) add_executable(test_ros1_client "test/test_ros1_client.cpp") ament_target_dependencies(test_ros1_client "ros1_roscpp") diff --git a/package.xml b/package.xml index 2cfcc8bc..77868780 100644 --- a/package.xml +++ b/package.xml @@ -29,13 +29,14 @@ rcutils std_msgs - ament_cmake_pytest ament_lint_auto ament_lint_common demo_nodes_cpp diagnostic_msgs launch launch_testing + launch_testing_ament_cmake + launch_testing_ros ros2run rosidl_interface_packages diff --git a/test/test_services_across_dynamic_bridge.py.in b/test/test_services_across_dynamic_bridge.py.in index d3ad2a45..91b0f1c7 100644 --- a/test/test_services_across_dynamic_bridge.py.in +++ b/test/test_services_across_dynamic_bridge.py.in @@ -12,184 +12,76 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import re +import unittest from launch import LaunchDescription -from launch import LaunchService from launch.actions import ExecuteProcess -from launch_testing.legacy import LaunchTestService -from launch_testing.legacy.output import create_output_lines_filter -from launch_testing.legacy.output import create_output_regex_test +from launch.actions import OpaqueFunction - -from ros2run.api import get_executable_path +import launch_testing TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@' TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@' -TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker'] -TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener'] TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@' TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@' TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@' -TEST_BRIDGE_ROS2_TALKER = get_executable_path( - package_name='demo_nodes_cpp', executable_name='talker') -TEST_BRIDGE_ROS2_LISTENER = get_executable_path( - package_name='demo_nodes_cpp', executable_name='listener') TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@' TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@' -TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@' -def get_default_launch_test_description(name): +@launch_testing.parametrize('test_name,server_cmd,client_cmd', [ + ('ros1_server_ros2_client_across_dynamic_bridge', + [TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], + [TEST_BRIDGE_ROS2_CLIENT]), + ('ros2_server_ros1_client_across_dynamic_bridge', + [TEST_BRIDGE_ROS2_SERVER], + [TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT]), +]) +def generate_test_description(test_name, server_cmd, client_cmd, ready_fn): launch_description = LaunchDescription() - launch_test = LaunchTestService() # ROS 1 core - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], - name=name + '__roscore', - ), exit_allowed=True - ) + launch_description.add_action(ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], + name=test_name + '__roscore', + )) # dynamic bridge - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], - name=name + '__dynamic_bridge', - ) + rosbridge_process = ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], + name=test_name + '__dynamic_bridge', ) + launch_description.add_action(rosbridge_process) - return launch_description, launch_test - - -def test_dynamic_bridge_msg_1to2(): - name = 'test_dynamic_bridge_msg_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, - name=name + '__ros1talker', - ) + server_process = ExecuteProcess( + cmd=server_cmd, name=test_name + '__server', ) + launch_description.add_action(server_process) - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_LISTENER], - name=name + '__ros2listener', - output='screen' - ), exit_allowed=True + client_process = ExecuteProcess( + cmd=client_cmd, name=test_name + '__client', ) + launch_description.add_action(client_process) - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ) + launch_description.add_action( + OpaqueFunction(function=lambda context: ready_fn()) ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_msg_2to1(): - name = 'test_dynamic_bridge_msg_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_TALKER], - name=name + '__ros2talker', - ) - ) - - env = dict(os.environ) - env['PYTHONUNBUFFERED'] = '1' - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER, - name=name + '__ros1listener', - env=env, - output='screen' - ), exit_allowed=True - ) - - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ), - side_effect='shutdown', - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_1to2(): - name = 'test_dynamic_bridge_srv_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], - name=name + '__ros1server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_CLIENT], - name=name + '__ros2client', - ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_2to1(): - name = 'test_dynamic_bridge_srv_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_SERVER], - name=name + '__ros2server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT], - name=name + '__ros1client', - ) - ) - - launch(launch_test, launch_description) + return launch_description, locals() -def launch(launch_test, launch_description): - launch_service = LaunchService() - launch_service.include_launch_description(launch_description) - rc = launch_test.run(launch_service) +class TestServicesAcrossDynamicBridge(unittest.TestCase): + def test_client_process_terminates_after_a_finite_amount_of_time(self, client_process): + """Test that the client executables terminates after a finite amount of time.""" + self.proc_info.assertWaitForShutdown(process=client_process, timeout=30) - assert rc == 0, \ - "The launch file failed with exit code '%s'" % str(rc) +@launch_testing.post_shutdown_test() +class TestServicesAcrossDynamicBridgeAfterShutdown(unittest.TestCase): -if __name__ == '__main__': - test_dynamic_bridge_msg_1to2() - test_dynamic_bridge_msg_2to1() - test_dynamic_bridge_srv_1to2() - test_dynamic_bridge_srv_2to1() + def test_processes_finished_gracefully(self, proc_info, rosbridge_process, + server_process, client_process): + """Test that both executables finished gracefully.""" + launch_testing.asserts.assertExitCodes(proc_info, process=rosbridge_process) + launch_testing.asserts.assertExitCodes(proc_info, process=server_process) + launch_testing.asserts.assertExitCodes(proc_info, process=client_process) diff --git a/test/test_topics_across_dynamic_bridge.py.in b/test/test_topics_across_dynamic_bridge.py.in index d3ad2a45..57954bf6 100644 --- a/test/test_topics_across_dynamic_bridge.py.in +++ b/test/test_topics_across_dynamic_bridge.py.in @@ -15,13 +15,14 @@ import os import re +import unittest + from launch import LaunchDescription -from launch import LaunchService from launch.actions import ExecuteProcess -from launch_testing.legacy import LaunchTestService -from launch_testing.legacy.output import create_output_lines_filter -from launch_testing.legacy.output import create_output_regex_test +from launch.actions import OpaqueFunction +import launch_testing +import launch_testing_ros from ros2run.api import get_executable_path @@ -30,166 +31,76 @@ TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@' TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@' TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker'] TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener'] -TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@' -TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@' TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@' TEST_BRIDGE_ROS2_TALKER = get_executable_path( package_name='demo_nodes_cpp', executable_name='talker') TEST_BRIDGE_ROS2_LISTENER = get_executable_path( package_name='demo_nodes_cpp', executable_name='listener') -TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@' -TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@' TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@' -def get_default_launch_test_description(name): +@launch_testing.parametrize('test_name,talker_cmd,listener_cmd', [ + ('ros1_talker_ros2_listener_across_dynamic_bridge', + [TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, + [TEST_BRIDGE_ROS2_LISTENER]), + ('ros2_talker_ros1_listener_across_dynamic_bridge', + [TEST_BRIDGE_ROS2_TALKER], + [TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER), +]) +def generate_test_description(test_name, talker_cmd, listener_cmd, ready_fn): launch_description = LaunchDescription() - launch_test = LaunchTestService() # ROS 1 core - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], - name=name + '__roscore', - ), exit_allowed=True - ) + launch_description.add_action(ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE], + name=test_name + '__roscore', + )) # dynamic bridge - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], - name=name + '__dynamic_bridge', - ) - ) - - return launch_description, launch_test - - -def test_dynamic_bridge_msg_1to2(): - name = 'test_dynamic_bridge_msg_1to2' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER, - name=name + '__ros1talker', - ) + rosbridge_process = ExecuteProcess( + cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE], + name=test_name + '__dynamic_bridge', ) + launch_description.add_action(rosbridge_process) - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_LISTENER], - name=name + '__ros2listener', - output='screen' - ), exit_allowed=True - ) - - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_msg_2to1(): - name = 'test_dynamic_bridge_msg_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_TALKER], - name=name + '__ros2talker', - ) + talker_process = ExecuteProcess( + cmd=talker_cmd, name=test_name + '__talker', ) + launch_description.add_action(talker_process) env = dict(os.environ) env['PYTHONUNBUFFERED'] = '1' - action = launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER, - name=name + '__ros1listener', - env=env, - output='screen' - ), exit_allowed=True + listener_process = ExecuteProcess( + cmd=listener_cmd, name=test_name + '__listener', env=env ) + launch_description.add_action(listener_process) - launch_test.add_output_test( - launch_description, action, - output_test=create_output_regex_test( - expected_patterns=[re.compile(b'I heard.+')] - ), - output_filter=create_output_lines_filter( - filtered_rmw_implementation=TEST_BRIDGE_RMW, - ), - side_effect='shutdown', + launch_description.add_action( + OpaqueFunction(function=lambda context: ready_fn()) ) + return launch_description, locals() - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_1to2(): - name = 'test_dynamic_bridge_srv_1to2' - launch_description, launch_test = get_default_launch_test_description(name) +class TestTopicsAcrossDynamicBridge(unittest.TestCase): - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER], - name=name + '__ros1server', + def test_listener_output(self, proc_output, listener_process): + output_filter = launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=TEST_BRIDGE_RMW ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_CLIENT], - name=name + '__ros2client', + proc_output.assertWaitFor( + expected_output=[re.compile('I heard.+')], + process=listener_process, + output_filter=output_filter, + timeout=10 ) - ) - - launch(launch_test, launch_description) - - -def test_dynamic_bridge_srv_2to1(): - name = 'test_dynamic_bridge_srv_2to1' - - launch_description, launch_test = get_default_launch_test_description(name) - - launch_test.add_fixture_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS2_SERVER], - name=name + '__ros2server', - ) - ) - - launch_test.add_test_action( - launch_description, ExecuteProcess( - cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT], - name=name + '__ros1client', - ) - ) - - launch(launch_test, launch_description) - - -def launch(launch_test, launch_description): - launch_service = LaunchService() - launch_service.include_launch_description(launch_description) - rc = launch_test.run(launch_service) - assert rc == 0, \ - "The launch file failed with exit code '%s'" % str(rc) +@launch_testing.post_shutdown_test() +class TestTopicsAcrossDynamicBridgeAfterShutdown(unittest.TestCase): -if __name__ == '__main__': - test_dynamic_bridge_msg_1to2() - test_dynamic_bridge_msg_2to1() - test_dynamic_bridge_srv_1to2() - test_dynamic_bridge_srv_2to1() + def test_processes_finished_gracefully(self, proc_info, rosbridge_process, + talker_process, listener_process): + """Test that both executables finished gracefully.""" + launch_testing.asserts.assertExitCodes(proc_info, process=rosbridge_process) + launch_testing.asserts.assertExitCodes(proc_info, process=talker_process) + launch_testing.asserts.assertExitCodes(proc_info, process=listener_process)