-
Notifications
You must be signed in to change notification settings - Fork 259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add play-next-n functionality #963
Conversation
Signed-off-by: Geoffrey Biggs <[email protected]>
I'm not seeing any obvious issues, apart from the CI failing. Once that's sorted out I'll give it another once-over just to triple-check. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gbiggs Thank you for your contribution.
I have a few requests for changes and some concerns/questions overall for the design of this feature.
I am curios about in what cases this feature aka playing multiple messages in "burst" mode without respecting their timings will be useful?
I would (and I think many others) expect that if play_next(N)
would exist it should respect timings between following messages.
Also it looks like if we will add such feature it will be more appropriate to change API for play next N
operation to return number of played messages instead of justboolean
value.
In current implementation for instance if someone requested to play 5 messages and for some circumstance play_next(N)
will really play 3 of them play_next(N)
will still return true and it will not be possible to recognize whether those messages was lost on the transport layer or they are was just filtered out or missing inside bag.
* Reverts the implementation of play_next to not use optional. * Respects message timing when calling into play_next(). * Removes play_next_no_message_must_succeed test. * Removes player->wait_for_playback_to_start(). * Update rosbag2_interfaces/srv/PlayNext.srv Co-authored-by: Geoffrey Biggs <[email protected]> * Update rosbag2_transport/include/rosbag2_transport/player.hpp Co-authored-by: Geoffrey Biggs <[email protected]> Co-authored-by: Geoffrey Biggs <[email protected]>
@MichaelOrlov: @agalbachicar has addressed your comments. Can you please re-review? |
@gbiggs Following questions are still unclear for me.
Could you please clarify? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gbiggs @agalbachicar
I have a significant concerns about this feature and it's implementation. Therefore I can't approve this PR.
- You didn't provide any explanations what would be a real world scenario for usage of this new feature and API changes. i.e. why following case doesn't work for you?
for (size_t i = 0; i < num_msgs_in_bag_; i++) {
player->play_next();
}
We use to creating new issue for feature development and discussion or at least need to provide justification and detailed explanations for proposed API changes and reasoning for them.
2. Current implementation broke generic behavior for play_next()
which was before your changes. e.g play_next()
shall play 1 message without delays.
3. Tests start failing and you just quietly adjust timings between messages to let tests pass with new broken functionality. This is unfair and unacceptable!!!
In particular in play_next_playing_all_messages_without_delays
test.
Taking in to the account all above.
Please:
- Provide justification and explanation what would be a reasoning for these changes and usage of this new API
- Revert timings and irrelevant changes in tests. I've made suggestions in line.
- Fix logic in core
player.cpp
to preserve existing functionality and to be able to pass existing tests. We need to keep backward compatibility.
ROSBAG2_TRANSPORT_PUBLIC | ||
virtual bool play_next(const std::optional<uint64_t> num_messages = std::nullopt); | ||
virtual uint64_t play_next(const uint64_t num_messages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
virtual uint64_t play_next(const uint64_t num_messages); | |
virtual uint64_t play_next(const uint64_t num_messages = 1u); |
{ | ||
if (!clock_->is_paused()) { | ||
RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); | ||
return false; | ||
return 0u; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (num_messages == 0) { | |
RCLCPP_WARN_STREAM(get_logger(), "Called play next for zero number of messages"); | |
return 0u; | |
} |
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to | ||
// be called if not yet and queue to be filled with messages. | ||
{ | ||
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_); | ||
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO. It doesn't make sense moving wait for playback start inside loop.
@@ -46,7 +46,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) { | |||
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_); | |||
|
|||
ASSERT_FALSE(player->is_paused()); | |||
ASSERT_FALSE(player->play_next()); | |||
ASSERT_EQ(0u, player->play_next(1u)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ASSERT_EQ(0u, player->play_next(1u)); | |
ASSERT_EQ(player->play_next(), 0u); |
serialize_test_message("topic1", 210, primitive_message), | ||
serialize_test_message("topic1", 330, primitive_message), | ||
serialize_test_message("topic1", 460, primitive_message), | ||
serialize_test_message("topic1", 590, primitive_message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following timings was made thousands on purpose. It doesn't have impact on the test execution time.
Please revert changes. This is affect test logic.
serialize_test_message("topic1", 210, primitive_message), | |
serialize_test_message("topic1", 330, primitive_message), | |
serialize_test_message("topic1", 460, primitive_message), | |
serialize_test_message("topic1", 590, primitive_message) | |
serialize_test_message("topic1", 2100, primitive_message), | |
serialize_test_message("topic1", 3300, primitive_message), | |
serialize_test_message("topic1", 4600, primitive_message), | |
serialize_test_message("topic1", 5900, primitive_message) |
|
||
// Jump on third message (1200 ms) | ||
player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); | ||
EXPECT_TRUE(player->play_next()); | ||
EXPECT_EQ(1u, player->play_next(1u)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EXPECT_EQ(1u, player->play_next(1u)); | |
ASSERT_EQ(player->play_next(), 1u); |
EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); | ||
EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert changes. It does affect test execution time.
EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); | |
EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play | |
for (size_t i = 0; i < num_msgs_in_bag_; i++) { | |
ASSERT_EQ(player->play_next(), 1u); | |
} | |
ASSERT_EQ(player->play_next(), 0u); // Make sure there are no messages to play |
|
||
// Jump on third message (1200 ms) | ||
player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); | ||
EXPECT_TRUE(player->play_next()); | ||
EXPECT_EQ(1u, player->play_next(1u)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EXPECT_EQ(1u, player->play_next(1u)); | |
ASSERT_EQ(player->play_next(), 1u); |
EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); | ||
EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert changes. It does affect test execution time.
EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); | |
EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play | |
for (size_t i = 0; i < num_msgs_in_bag_; i++) { | |
ASSERT_EQ(player->play_next(), 1u); | |
} | |
ASSERT_EQ(player->play_next(), 0u); // Make sure there are no messages to play |
|
||
// Jump in timestamp equal to the timestamp in last message + 1 nanosecond | ||
player->seek((start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1); | ||
EXPECT_FALSE(player->play_next()); | ||
EXPECT_EQ(0u, player->play_next(1u)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EXPECT_EQ(0u, player->play_next(1u)); | |
ASSERT_EQ(player->play_next(), 0u); |
@MichaelOrlov Our use case is playing a subset of messages from a bag, but not in real-time, and doing it from the command line. A for-loop over I guess what we are looking for is a "burst mode" playback. Would this PR be acceptable if we changed it to provide a |
@MichaelOrlov , on top of @gbiggs comment , I just would like to add where all this started. Please take a look at the meeting notes of the working group on 2021/05/21. The scope of our use case extended from "play N seconds and pause" (see #960) to that and "play N messages". Anyway, @gbiggs proposal stands. The |
@gbiggs @agalbachicar Will |
We would prefer to burst the messages as fast as possible and in controllable batch sizes. |
@gbiggs You can set up even more aggressive rate 100x for example. |
Our use case is not for debugging, it's for processing of data from a bag. For example, we may want to filter the topics in a bag down to a single image topic, then pull out 10 messages at a time from that topic and process them through a machine learning algorithm. This is not time-based, it is number-of-messages-based, and if we are doing massively parallel computation (as an example) then we want to get all the computation pipelines full as fast as possible. |
@gbiggs Ok. Thanks for the explanation. |
@MichaelOrlov I have created a new PR that implements burst mode. I will close this PR as unmergeable. |
This PR extends the play-next-message service/function with the ability to play the next N messages instead of just one message.
This work was co-authored by @agalbachicar.