Skip to content

Commit

Permalink
keyboard controls for pause/resume toggle and play-next: (#847)
Browse files Browse the repository at this point in the history
* adds keyboard controls for ros2 bag play

Signed-off-by: Sonia Jin <[email protected]>
Co-authored-by: Emerson Knapp <[email protected]>
  • Loading branch information
lihui815 and Emerson Knapp authored Sep 27, 2021
1 parent 2376b49 commit 6173fe1
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 15 deletions.
4 changes: 4 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-d', '--delay', type=positive_float, default=0.0,
help='Sleep duration before play (each loop), in seconds. Negative durations invalid.')
parser.add_argument(
'--disable-keyboard-controls', action='store_true',
help='disables keyboard controls for playback')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -120,6 +123,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock
play_options.delay = args.delay
play_options.disable_keyboard_controls = args.disable_keyboard_controls

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ PYBIND11_MODULE(_transport, m) {
"delay",
&PlayOptions::getDelay,
&PlayOptions::setDelay)
.def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
8 changes: 8 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ endif()

find_package(ament_cmake REQUIRED)
find_package(ament_cmake_ros REQUIRED)
find_package(keyboard_handler REQUIRED)
find_package(rcl REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
Expand All @@ -43,6 +44,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(${PROJECT_NAME}
keyboard_handler
rcl
rclcpp
rcutils
Expand Down Expand Up @@ -75,6 +77,7 @@ ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME})
ament_export_dependencies(
keyboard_handler
rosbag2_cpp
rosbag2_compression
rosbag2_interfaces
Expand Down Expand Up @@ -161,6 +164,11 @@ function(create_tests_for_rmw_implementation)
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_keyboard_controls
test/rosbag2_transport/test_keyboard_controls.cpp
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_record_regex
test/rosbag2_transport/test_record_regex.cpp
LINK_LIBS rosbag2_transport
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_map>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"
#include "rclcpp/duration.hpp"
#include "rclcpp/qos.hpp"

Expand Down Expand Up @@ -48,6 +49,11 @@ struct PlayOptions

// Sleep before play. Negative durations invalid. Will delay at the beginning of each loop.
rclcpp::Duration delay = rclcpp::Duration(0, 0);

bool disable_keyboard_controls = false;
// keybindings
KeyboardHandler::KeyCode pause_resume_toggle_key = KeyboardHandler::KeyCode::SPACE;
KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT;
};

} // namespace rosbag2_transport
Expand Down
29 changes: 26 additions & 3 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"

#include "moodycamel/readerwriterqueue.h"

Expand Down Expand Up @@ -75,6 +78,15 @@ class Player : public rclcpp::Node
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

ROSBAG2_TRANSPORT_PUBLIC
Player(
std::unique_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<KeyboardHandler> keyboard_handler,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

ROSBAG2_TRANSPORT_PUBLIC
virtual ~Player();

Expand All @@ -87,11 +99,11 @@ class Player : public rclcpp::Node
// Playback control interface
/// Pause the flow of time for playback.
ROSBAG2_TRANSPORT_PUBLIC
void pause();
virtual void pause();

/// Start the flow of time for playback.
ROSBAG2_TRANSPORT_PUBLIC
void resume();
virtual void resume();

/// Pause if time running, resume if paused.
ROSBAG2_TRANSPORT_PUBLIC
Expand All @@ -117,7 +129,7 @@ class Player : public rclcpp::Node
/// this method will wait until new element will be pushed to the queue.
/// \return true if player in pause mode and successfully played next message, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool play_next();
virtual bool play_next();

/// \brief Advance player to the message with closest timestamp >= time_point.
/// \details This is blocking call and it will wait until current message will be published
Expand Down Expand Up @@ -154,6 +166,13 @@ class Player : public rclcpp::Node

std::mutex reader_mutex_;
std::unique_ptr<rosbag2_cpp::Reader> reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_);

void add_key_callback(
KeyboardHandler::KeyCode key,
const std::function<void()> & cb,
const std::string & op_name);
void add_keyboard_callbacks();

rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
Expand All @@ -172,6 +191,10 @@ class Player : public rclcpp::Node
rclcpp::Service<rosbag2_interfaces::srv::GetRate>::SharedPtr srv_get_rate_;
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;

// defaults
std::shared_ptr<KeyboardHandler> keyboard_handler_;
std::vector<KeyboardHandler::callback_handle_t> keyboard_callbacks_;
};

} // namespace rosbag2_transport
Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<depend>rmw</depend>
<depend>shared_queues_vendor</depend>
<depend>yaml_cpp_vendor</depend>
<depend>keyboard_handler</depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>ament_index_cpp</test_depend>
Expand Down
95 changes: 83 additions & 12 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,26 @@ Player::Player(
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: Player(std::move(reader),
// only call KeyboardHandler when using default keyboard handler implementation
std::shared_ptr<KeyboardHandler>(new KeyboardHandler()),
storage_options, play_options,
node_name, node_options)
{}

Player::Player(
std::unique_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<KeyboardHandler> keyboard_handler,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(
node_name,
rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)),
storage_options_(storage_options),
play_options_(play_options)
play_options_(play_options),
keyboard_handler_(keyboard_handler)
{
{
std::lock_guard<std::mutex> lk(reader_mutex_);
Expand All @@ -133,7 +148,7 @@ Player::Player(

reader_->close();
}

// service callbacks
srv_pause_ = create_service<rosbag2_interfaces::srv::Pause>(
"~/pause",
[this](
Expand Down Expand Up @@ -197,10 +212,18 @@ Player::Player(
{
response->success = play_next();
});
// keyboard callbacks
add_keyboard_callbacks();
}

Player::~Player()
{
// remove callbacks on key_codes to prevent race conditions
// Note: keyboard_handler handles locks between removing & executing callbacks
for (auto cb_handle : keyboard_callbacks_) {
keyboard_handler_->delete_key_press_callback(cb_handle);
}
// closes reader
std::lock_guard<std::mutex> lk(reader_mutex_);
if (reader_) {
reader_->close();
Expand Down Expand Up @@ -234,14 +257,14 @@ void Player::play()
delay = play_options_.delay;
} else {
RCLCPP_WARN_STREAM(
this->get_logger(),
get_logger(),
"Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled.");
}

try {
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(this->get_logger(), "Sleep " << delay.nanoseconds() << " ns");
RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns");
std::chrono::nanoseconds duration(delay.nanoseconds());
std::this_thread::sleep_for(duration);
}
Expand All @@ -267,7 +290,7 @@ void Player::play()
reader_->close();
} while (rclcpp::ok() && play_options_.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(this->get_logger(), "Failed to play: %s", e.what());
RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what());
}
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand All @@ -277,16 +300,18 @@ void Player::play()
void Player::pause()
{
clock_->pause();
RCLCPP_INFO_STREAM(get_logger(), "Pausing play.");
}

void Player::resume()
{
clock_->resume();
RCLCPP_INFO_STREAM(get_logger(), "Resuming play.");
}

void Player::toggle_paused()
{
clock_->is_paused() ? clock_->resume() : clock_->pause();
is_paused() ? resume() : pause();
}

bool Player::is_paused() const
Expand All @@ -309,7 +334,7 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = message_queue_.peek();
if (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) {
RCLCPP_WARN(
this->get_logger(),
get_logger(),
"Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
while (message_ptr == nullptr && !is_storage_completely_loaded() && rclcpp::ok()) {
Expand All @@ -328,9 +353,12 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
bool Player::play_next()
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state.");
return false;
}

RCLCPP_INFO_STREAM(get_logger(), "Playing next message.");

// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
skip_message_in_main_play_loop_ = true;
Expand Down Expand Up @@ -573,9 +601,9 @@ void Player::prepare_publishers()
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / play_options_.clock_publish_frequency));
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
clock_publisher_ = this->create_publisher<rosgraph_msgs::msg::Clock>(
clock_publisher_ = create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = this->create_wall_timer(
clock_publish_timer_ = create_wall_timer(
publish_period, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
Expand All @@ -600,16 +628,16 @@ void Player::prepare_publishers()

auto topic_qos = publisher_qos_for_topic(
topic, topic_qos_profile_overrides_,
this->get_logger());
get_logger());
try {
publishers_.insert(
std::make_pair(
topic.name, this->create_generic_publisher(topic.name, topic.type, topic_qos)));
topic.name, create_generic_publisher(topic.name, topic.type, topic_qos)));
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
RCLCPP_WARN(
this->get_logger(),
get_logger(),
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}
Expand All @@ -626,4 +654,47 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess
return message_published;
}

void Player::add_key_callback(
KeyboardHandler::KeyCode key,
const std::function<void()> & cb,
const std::string & op_name)
{
std::string key_str = enum_key_code_to_str(key);
if (key == KeyboardHandler::KeyCode::UNKNOWN) {
RCLCPP_ERROR_STREAM(
get_logger(),
"Invalid key binding " << key_str << " for " << op_name);
throw std::invalid_argument("Invalid key binding.");
}
keyboard_callbacks_.push_back(
keyboard_handler_->add_key_press_callback(
[cb](KeyboardHandler::KeyCode /*key_code*/,
KeyboardHandler::KeyModifiers /*key_modifiers*/) {cb();},
key));
// show instructions
RCLCPP_INFO_STREAM(
get_logger(),
"Press " << key_str << " for " << op_name);
}

void Player::add_keyboard_callbacks()
{
// skip if disabled
if (play_options_.disable_keyboard_controls) {
return;
}
RCLCPP_INFO_STREAM(get_logger(), "Adding keyboard callbacks.");
// check keybindings
add_key_callback(
play_options_.pause_resume_toggle_key,
[this]() {toggle_paused();},
"Pause/Resume"
);
add_key_callback(
play_options_.play_next_key,
[this]() {play_next();},
"Play Next Message"
);
}

} // namespace rosbag2_transport
Loading

0 comments on commit 6173fe1

Please sign in to comment.