Skip to content

Commit

Permalink
Migrate tests to new launch_testing API.
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
hidmic committed Apr 23, 2019
1 parent 1dfff37 commit 1c843f0
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 294 deletions.
30 changes: 23 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,38 @@ endif()

macro(targets)
configure_file(
test/test_dynamic_bridge.py.in
test_dynamic_bridge${target_suffix}.py.genexp
test/test_topics_across_dynamic_bridge.py.in
test_topics_across_dynamic_bridge${target_suffix}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}.py.genexp"
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}.py.genexp"
)
ament_add_pytest_test(test_dynamic_bridge${target_suffix}
"${CMAKE_CURRENT_BINARY_DIR}/test_dynamic_bridge${target_suffix}_$<CONFIG>.py"
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
TARGET test_topics_across_dynamic_bridge${target_suffix}
ENV RMW_IMPLEMENTAION=${rmw_implementaion}
TIMEOUT 60)

configure_file(
test/test_services_across_dynamic_bridge.py.in
test_services_across_dynamic_bridge${target_suffix}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}.py.genexp"
)
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
TARGET test_services_across_dynamic_bridge${target_suffix}
ENV RMW_IMPLEMENTAION=${rmw_implementaion}
TIMEOUT 60)
endmacro()

if(TEST_ROS1_BRIDGE)
find_package(ament_cmake_pytest REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)

add_executable(test_ros1_client "test/test_ros1_client.cpp")
ament_target_dependencies(test_ros1_client "ros1_roscpp")
Expand Down
3 changes: 2 additions & 1 deletion package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
<exec_depend>rcutils</exec_depend>
<exec_depend>std_msgs</exec_depend>

<test_depend>ament_cmake_pytest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>demo_nodes_cpp</test_depend>
<test_depend>diagnostic_msgs</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>ros2run</test_depend>

<group_depend>rosidl_interface_packages</group_depend>
Expand Down
190 changes: 41 additions & 149 deletions test/test_services_across_dynamic_bridge.py.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,184 +12,76 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
import unittest

from launch import LaunchDescription
from launch import LaunchService
from launch.actions import ExecuteProcess
from launch_testing.legacy import LaunchTestService
from launch_testing.legacy.output import create_output_lines_filter
from launch_testing.legacy.output import create_output_regex_test
from launch.actions import OpaqueFunction


from ros2run.api import get_executable_path
import launch_testing


TEST_BRIDGE_ROS1_ENV = '@TEST_BRIDGE_ROS1_ENV@'
TEST_BRIDGE_ROSCORE = '@TEST_BRIDGE_ROSCORE@'
TEST_BRIDGE_ROS1_TALKER = ['rosrun', 'roscpp_tutorials', 'talker']
TEST_BRIDGE_ROS1_LISTENER = ['rosrun', 'rospy_tutorials', 'listener']
TEST_BRIDGE_ROS1_CLIENT = '@TEST_BRIDGE_ROS1_CLIENT@'
TEST_BRIDGE_ROS1_SERVER = '@TEST_BRIDGE_ROS1_SERVER@'
TEST_BRIDGE_DYNAMIC_BRIDGE = '@TEST_BRIDGE_DYNAMIC_BRIDGE@'
TEST_BRIDGE_ROS2_TALKER = get_executable_path(
package_name='demo_nodes_cpp', executable_name='talker')
TEST_BRIDGE_ROS2_LISTENER = get_executable_path(
package_name='demo_nodes_cpp', executable_name='listener')
TEST_BRIDGE_ROS2_CLIENT = '@TEST_BRIDGE_ROS2_CLIENT@'
TEST_BRIDGE_ROS2_SERVER = '@TEST_BRIDGE_ROS2_SERVER@'
TEST_BRIDGE_RMW = '@TEST_BRIDGE_RMW@'


def get_default_launch_test_description(name):
@launch_testing.parametrize('test_name,server_cmd,client_cmd', [
('ros1_server_ros2_client_across_dynamic_bridge',
[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER],
[TEST_BRIDGE_ROS2_CLIENT]),
('ros2_server_ros1_client_across_dynamic_bridge',
[TEST_BRIDGE_ROS2_SERVER],
[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT]),
])
def generate_test_description(test_name, server_cmd, client_cmd, ready_fn):
launch_description = LaunchDescription()
launch_test = LaunchTestService()

# ROS 1 core
launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE],
name=name + '__roscore',
), exit_allowed=True
)
launch_description.add_action(ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROSCORE],
name=test_name + '__roscore',
))

# dynamic bridge
launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE],
name=name + '__dynamic_bridge',
)
rosbridge_process = ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_DYNAMIC_BRIDGE],
name=test_name + '__dynamic_bridge',
)
launch_description.add_action(rosbridge_process)

return launch_description, launch_test


def test_dynamic_bridge_msg_1to2():
name = 'test_dynamic_bridge_msg_1to2'

launch_description, launch_test = get_default_launch_test_description(name)

launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_TALKER,
name=name + '__ros1talker',
)
server_process = ExecuteProcess(
cmd=server_cmd, name=test_name + '__server',
)
launch_description.add_action(server_process)

action = launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS2_LISTENER],
name=name + '__ros2listener',
output='screen'
), exit_allowed=True
client_process = ExecuteProcess(
cmd=client_cmd, name=test_name + '__client',
)
launch_description.add_action(client_process)

launch_test.add_output_test(
launch_description, action,
output_test=create_output_regex_test(
expected_patterns=[re.compile(b'I heard.+')]
),
output_filter=create_output_lines_filter(
filtered_rmw_implementation=TEST_BRIDGE_RMW,
)
launch_description.add_action(
OpaqueFunction(function=lambda context: ready_fn())
)

launch(launch_test, launch_description)


def test_dynamic_bridge_msg_2to1():
name = 'test_dynamic_bridge_msg_2to1'

launch_description, launch_test = get_default_launch_test_description(name)

launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS2_TALKER],
name=name + '__ros2talker',
)
)

env = dict(os.environ)
env['PYTHONUNBUFFERED'] = '1'
action = launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV] + TEST_BRIDGE_ROS1_LISTENER,
name=name + '__ros1listener',
env=env,
output='screen'
), exit_allowed=True
)

launch_test.add_output_test(
launch_description, action,
output_test=create_output_regex_test(
expected_patterns=[re.compile(b'I heard.+')]
),
output_filter=create_output_lines_filter(
filtered_rmw_implementation=TEST_BRIDGE_RMW,
),
side_effect='shutdown',
)

launch(launch_test, launch_description)


def test_dynamic_bridge_srv_1to2():
name = 'test_dynamic_bridge_srv_1to2'

launch_description, launch_test = get_default_launch_test_description(name)

launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_SERVER],
name=name + '__ros1server',
)
)

launch_test.add_test_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS2_CLIENT],
name=name + '__ros2client',
)
)

launch(launch_test, launch_description)


def test_dynamic_bridge_srv_2to1():
name = 'test_dynamic_bridge_srv_2to1'

launch_description, launch_test = get_default_launch_test_description(name)

launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS2_SERVER],
name=name + '__ros2server',
)
)

launch_test.add_test_action(
launch_description, ExecuteProcess(
cmd=[TEST_BRIDGE_ROS1_ENV, TEST_BRIDGE_ROS1_CLIENT],
name=name + '__ros1client',
)
)

launch(launch_test, launch_description)
return launch_description, locals()


def launch(launch_test, launch_description):
launch_service = LaunchService()
launch_service.include_launch_description(launch_description)
rc = launch_test.run(launch_service)
class TestServicesAcrossDynamicBridge(unittest.TestCase):
def test_client_process_terminates_after_a_finite_amount_of_time(self, client_process):
"""Test that the client executables terminates after a finite amount of time."""
self.proc_info.assertWaitForShutdown(process=client_process, timeout=30)

assert rc == 0, \
"The launch file failed with exit code '%s'" % str(rc)

@launch_testing.post_shutdown_test()
class TestServicesAcrossDynamicBridgeAfterShutdown(unittest.TestCase):

if __name__ == '__main__':
test_dynamic_bridge_msg_1to2()
test_dynamic_bridge_msg_2to1()
test_dynamic_bridge_srv_1to2()
test_dynamic_bridge_srv_2to1()
def test_processes_finished_gracefully(self, proc_info, rosbridge_process,
server_process, client_process):
"""Test that both executables finished gracefully."""
launch_testing.asserts.assertExitCodes(proc_info, process=rosbridge_process)
launch_testing.asserts.assertExitCodes(proc_info, process=server_process)
launch_testing.asserts.assertExitCodes(proc_info, process=client_process)
Loading

0 comments on commit 1c843f0

Please sign in to comment.