From e1dbf82a56b0ea1f317f4a7a9db952fca563ba1e Mon Sep 17 00:00:00 2001 From: Sonia Jin Date: Fri, 24 Sep 2021 10:13:51 -0700 Subject: [PATCH] adds keyboard controls for ros2 bag play Signed-off-by: Sonia Jin --- ros2bag/ros2bag/verb/play.py | 4 + rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + rosbag2_transport/CMakeLists.txt | 8 + .../rosbag2_transport/play_options.hpp | 6 + .../include/rosbag2_transport/player.hpp | 29 ++- rosbag2_transport/package.xml | 1 + .../src/rosbag2_transport/player.cpp | 95 ++++++++-- .../mock_keyboard_handler.hpp | 46 +++++ .../test_keyboard_controls.cpp | 167 ++++++++++++++++++ 9 files changed, 342 insertions(+), 15 deletions(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/mock_keyboard_handler.hpp create mode 100644 rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 3b2b8d772a..a6f7bd4077 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -85,6 +85,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') + parser.add_argument( + '--disable-keyboard-controls', action='store_true', + help='disables keyboard controls for playback') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -120,6 +123,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_remapping_options = topic_remapping play_options.clock_publish_frequency = args.clock play_options.delay = args.delay + play_options.disable_keyboard_controls = args.disable_keyboard_controls player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 6d666393ef..2a7bac6552 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -246,6 +246,7 @@ PYBIND11_MODULE(_transport, m) { "delay", &PlayOptions::getDelay, &PlayOptions::setDelay) + .def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 818daf1048..77fcfe8cf9 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -22,6 +22,7 @@ endif() find_package(ament_cmake REQUIRED) find_package(ament_cmake_ros REQUIRED) +find_package(keyboard_handler REQUIRED) find_package(rcl REQUIRED) find_package(rclcpp REQUIRED) find_package(rcutils REQUIRED) @@ -43,6 +44,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC $ $) ament_target_dependencies(${PROJECT_NAME} + keyboard_handler rcl rclcpp rcutils @@ -75,6 +77,7 @@ ament_export_include_directories(include) ament_export_libraries(${PROJECT_NAME}) ament_export_targets(export_${PROJECT_NAME}) ament_export_dependencies( + keyboard_handler rosbag2_cpp rosbag2_compression rosbag2_interfaces @@ -161,6 +164,11 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_keyboard_controls + test/rosbag2_transport/test_keyboard_controls.cpp + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_record_regex test/rosbag2_transport/test_record_regex.cpp LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 2eb62c8835..f8dc1690d6 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -20,6 +20,7 @@ #include #include +#include "keyboard_handler/keyboard_handler.hpp" #include "rclcpp/duration.hpp" #include "rclcpp/qos.hpp" @@ -48,6 +49,11 @@ struct PlayOptions // Sleep before play. Negative durations invalid. Will delay at the beginning of each loop. rclcpp::Duration delay = rclcpp::Duration(0, 0); + + bool disable_keyboard_controls = false; + // keybindings + KeyboardHandler::KeyCode pause_resume_toggle_key = KeyboardHandler::KeyCode::SPACE; + KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 390a1b61fb..bad64e8836 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -21,6 +21,9 @@ #include #include #include +#include + +#include "keyboard_handler/keyboard_handler.hpp" #include "moodycamel/readerwriterqueue.h" @@ -75,6 +78,15 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + ROSBAG2_TRANSPORT_PUBLIC + Player( + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + ROSBAG2_TRANSPORT_PUBLIC virtual ~Player(); @@ -87,11 +99,11 @@ class Player : public rclcpp::Node // Playback control interface /// Pause the flow of time for playback. ROSBAG2_TRANSPORT_PUBLIC - void pause(); + virtual void pause(); /// Start the flow of time for playback. ROSBAG2_TRANSPORT_PUBLIC - void resume(); + virtual void resume(); /// Pause if time running, resume if paused. ROSBAG2_TRANSPORT_PUBLIC @@ -117,7 +129,7 @@ class Player : public rclcpp::Node /// this method will wait until new element will be pushed to the queue. /// \return true if player in pause mode and successfully played next message, otherwise false. ROSBAG2_TRANSPORT_PUBLIC - bool play_next(); + virtual bool play_next(); /// \brief Advance player to the message with closest timestamp >= time_point. /// \details This is blocking call and it will wait until current message will be published @@ -154,6 +166,13 @@ class Player : public rclcpp::Node std::mutex reader_mutex_; std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); + + void add_key_callback( + KeyboardHandler::KeyCode key, + const std::function & cb, + const std::string & op_name); + void add_keyboard_callbacks(); + rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; moodycamel::ReaderWriterQueue message_queue_; @@ -172,6 +191,10 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; rclcpp::Service::SharedPtr srv_play_next_; + + // defaults + std::shared_ptr keyboard_handler_; + std::vector keyboard_callbacks_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index 697d7fdd97..6c77e8bf45 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -20,6 +20,7 @@ rmw shared_queues_vendor yaml_cpp_vendor + keyboard_handler ament_cmake_gmock ament_index_cpp diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d60e2c9894..0233f90833 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -113,11 +113,26 @@ Player::Player( const rosbag2_transport::PlayOptions & play_options, const std::string & node_name, const rclcpp::NodeOptions & node_options) +: Player(std::move(reader), + // only call KeyboardHandler when using default keyboard handler implementation + std::shared_ptr(new KeyboardHandler()), + storage_options, play_options, + node_name, node_options) +{} + +Player::Player( + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) : rclcpp::Node( node_name, rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), storage_options_(storage_options), - play_options_(play_options) + play_options_(play_options), + keyboard_handler_(keyboard_handler) { { std::lock_guard lk(reader_mutex_); @@ -133,7 +148,7 @@ Player::Player( reader_->close(); } - + // service callbacks srv_pause_ = create_service( "~/pause", [this]( @@ -197,10 +212,18 @@ Player::Player( { response->success = play_next(); }); + // keyboard callbacks + add_keyboard_callbacks(); } Player::~Player() { + // remove callbacks on key_codes to prevent race conditions + // Note: keyboard_handler handles locks between removing & executing callbacks + for (auto cb_handle : keyboard_callbacks_) { + keyboard_handler_->delete_key_press_callback(cb_handle); + } + // closes reader std::lock_guard lk(reader_mutex_); if (reader_) { reader_->close(); @@ -234,14 +257,14 @@ void Player::play() delay = play_options_.delay; } else { RCLCPP_WARN_STREAM( - this->get_logger(), + get_logger(), "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } try { do { if (delay > rclcpp::Duration(0, 0)) { - RCLCPP_INFO_STREAM(this->get_logger(), "Sleep " << delay.nanoseconds() << " ns"); + RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns"); std::chrono::nanoseconds duration(delay.nanoseconds()); std::this_thread::sleep_for(duration); } @@ -267,7 +290,7 @@ void Player::play() reader_->close(); } while (rclcpp::ok() && play_options_.loop); } catch (std::runtime_error & e) { - RCLCPP_ERROR(this->get_logger(), "Failed to play: %s", e.what()); + RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); } std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -277,16 +300,18 @@ void Player::play() void Player::pause() { clock_->pause(); + RCLCPP_INFO_STREAM(get_logger(), "Pausing play."); } void Player::resume() { clock_->resume(); + RCLCPP_INFO_STREAM(get_logger(), "Resuming play."); } void Player::toggle_paused() { - clock_->is_paused() ? clock_->resume() : clock_->pause(); + is_paused() ? resume() : pause(); } bool Player::is_paused() const @@ -309,7 +334,7 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_ rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = message_queue_.peek(); if (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) { RCLCPP_WARN( - this->get_logger(), + get_logger(), "Message queue starved. Messages will be delayed. Consider " "increasing the --read-ahead-queue-size option."); while (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) { @@ -328,9 +353,12 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_ bool Player::play_next() { if (!clock_->is_paused()) { + RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); return false; } + RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); + // Temporary take over playback from play_messages_from_queue() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); skip_message_in_main_play_loop_ = true; @@ -573,9 +601,9 @@ void Player::prepare_publishers() static_cast(RCUTILS_S_TO_NS(1) / play_options_.clock_publish_frequency)); // NOTE: PlayerClock does not own this publisher because rosbag2_cpp // should not own transport-based functionality - clock_publisher_ = this->create_publisher( + clock_publisher_ = create_publisher( "/clock", rclcpp::ClockQoS()); - clock_publish_timer_ = this->create_wall_timer( + clock_publish_timer_ = create_wall_timer( publish_period, [this]() { auto msg = rosgraph_msgs::msg::Clock(); msg.clock = rclcpp::Time(clock_->now()); @@ -600,16 +628,16 @@ void Player::prepare_publishers() auto topic_qos = publisher_qos_for_topic( topic, topic_qos_profile_overrides_, - this->get_logger()); + get_logger()); try { publishers_.insert( std::make_pair( - topic.name, this->create_generic_publisher(topic.name, topic.type, topic_qos))); + topic.name, create_generic_publisher(topic.name, topic.type, topic_qos))); } catch (const std::runtime_error & e) { // using a warning log seems better than adding a new option // to ignore some unknown message type library RCLCPP_WARN( - this->get_logger(), + get_logger(), "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } } @@ -626,4 +654,47 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess return message_published; } +void Player::add_key_callback( + KeyboardHandler::KeyCode key, + const std::function & cb, + const std::string & op_name) +{ + std::string key_str = enum_key_code_to_str(key); + if (key == KeyboardHandler::KeyCode::UNKNOWN) { + RCLCPP_ERROR_STREAM( + get_logger(), + "Invalid key binding " << key_str << " for " << op_name); + throw std::invalid_argument("Invalid key binding."); + } + keyboard_callbacks_.push_back( + keyboard_handler_->add_key_press_callback( + [cb](KeyboardHandler::KeyCode /*key_code*/, + KeyboardHandler::KeyModifiers /*key_modifiers*/) {cb();}, + key)); + // show instructions + RCLCPP_INFO_STREAM( + get_logger(), + "Press " << key_str << " for " << op_name); +} + +void Player::add_keyboard_callbacks() +{ + // skip if disabled + if (play_options_.disable_keyboard_controls) { + return; + } + RCLCPP_INFO_STREAM(get_logger(), "Adding keyboard callbacks."); + // check keybindings + add_key_callback( + play_options_.pause_resume_toggle_key, + [this]() {toggle_paused();}, + "Pause/Resume" + ); + add_key_callback( + play_options_.play_next_key, + [this]() {play_next();}, + "Play Next Message" + ); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/mock_keyboard_handler.hpp b/rosbag2_transport/test/rosbag2_transport/mock_keyboard_handler.hpp new file mode 100644 index 0000000000..4e94a5321c --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/mock_keyboard_handler.hpp @@ -0,0 +1,46 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__MOCK_KEYBOARD_HANDLER_HPP_ +#define ROSBAG2_TRANSPORT__MOCK_KEYBOARD_HANDLER_HPP_ + +#include +#include +#include +#include + +#include "keyboard_handler/keyboard_handler.hpp" + +// press one specific key code one second at a time +class MockKeyboardHandler : public KeyboardHandler +{ +public: + MockKeyboardHandler() + {} + + void press_key(KeyboardHandler::KeyCode key_code) + { + std::lock_guard lk(callbacks_mutex_); + KeyboardHandler::KeyModifiers key_modifiers = KeyboardHandler::KeyModifiers::NONE; + auto range = callbacks_.equal_range(KeyAndModifiers{key_code, key_modifiers}); + for (auto it = range.first; it != range.second; ++it) { + it->second.callback(key_code, key_modifiers); + } + } + + virtual ~MockKeyboardHandler() + {} +}; + +#endif // ROSBAG2_TRANSPORT__MOCK_KEYBOARD_HANDLER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp new file mode 100644 index 0000000000..2c6af5132d --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -0,0 +1,167 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/player.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" +#include "mock_keyboard_handler.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace std::chrono_literals; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +// wrapper player that counts command calls +class CountPlayer : public Player +{ +public: + CountPlayer( + std::unique_ptr reader, + std::shared_ptr keyboard_handler, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::PlayOptions & play_options) + : Player(std::move(reader), keyboard_handler, storage_options, play_options) {} + + void pause() override + { + Player::pause(); + num_paused++; + } + + void resume() override + { + Player::resume(); + num_resumed++; + } + + bool play_next() override + { + bool ret = Player::play_next(); + if (ret) { + num_played_next++; + } + return ret; + } + + int num_paused = 0; + int num_resumed = 0; + int num_played_next = 0; +}; + + +TEST_F(RosBag2PlayTestFixture, invalid_keybindings) +{ + auto primitive_message1 = get_messages_strings()[0]; + primitive_message1->string_value = "Hello World 1"; + + auto primitive_message2 = get_messages_strings()[0]; + primitive_message2->string_value = "Hello World 2"; + + auto message_time_difference = rclcpp::Duration(1, 0); + auto topics_and_types = + std::vector{{"topic1", "test_msgs/Strings", "", ""}}; + std::vector> messages = + {serialize_test_message("topic1", 0, primitive_message1), + serialize_test_message("topic1", 0, primitive_message2)}; + + messages[0]->time_stamp = 100; + messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + + play_options_.play_next_key = KeyboardHandler::KeyCode::UNKNOWN; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topics_and_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + EXPECT_THROW( + std::make_shared( + std::move(reader), storage_options_, play_options_), + std::invalid_argument + ); +} + +TEST_F(RosBag2PlayTestFixture, test_keyboard_controls) +{ + auto primitive_message1 = get_messages_strings()[0]; + primitive_message1->string_value = "Hello World 1"; + + auto primitive_message2 = get_messages_strings()[0]; + primitive_message2->string_value = "Hello World 2"; + + auto primitive_message3 = get_messages_strings()[0]; + primitive_message3->string_value = "Hello World 3"; + + auto message_time_difference = rclcpp::Duration(1, 0); + auto topics_and_types = + std::vector{{"topic1", "test_msgs/Strings", "", ""}}; + std::vector> messages = + {serialize_test_message("topic1", 0, primitive_message1), + serialize_test_message("topic1", 0, primitive_message2), + serialize_test_message("topic1", 0, primitive_message3)}; + + messages[0]->time_stamp = 100; + messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + messages[2]->time_stamp = messages[1]->time_stamp + message_time_difference.nanoseconds(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topics_and_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto keyboard_handler = new MockKeyboardHandler(); + + // play bag in thread + auto player = std::make_shared( + std::move(reader), std::shared_ptr(keyboard_handler), + storage_options_, play_options_); + + // start in pause mode + EXPECT_THAT(player->is_paused(), false); + keyboard_handler->press_key(play_options_.pause_resume_toggle_key); + EXPECT_THAT(player->is_paused(), true); + + // start play thread + std::thread player_thread = std::thread([player]() {player->play();}); + + // play next + keyboard_handler->press_key(play_options_.play_next_key); + EXPECT_THAT(player->is_paused(), true); + // resume + keyboard_handler->press_key(play_options_.pause_resume_toggle_key); + EXPECT_THAT(player->is_paused(), false); + + if (player_thread.joinable()) { + player_thread.join(); + } + + EXPECT_THAT(player->num_paused, 1); + EXPECT_THAT(player->num_resumed, 1); + EXPECT_THAT(player->num_played_next, 1); +}