Skip to content

Commit

Permalink
fix sequential reader rollover-to-next-file strategy: (#839)
Browse files Browse the repository at this point in the history
- has_next() - calls recursively until next message OR no next file found

- read_next() - calls has_next() to ensure necessary rollovers are made

Signed-off-by: Sonia Jin <[email protected]>
  • Loading branch information
lihui815 authored Aug 5, 2021
1 parent 93960ae commit 7407cd7
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void SequentialCompressionReader::open(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionReader::read_next()
{
if (storage_ && decompressor_) {
// roll over if necessary
has_next();
auto message = storage_->read_next();
if (compression_mode_ == rosbag2_compression::CompressionMode::MESSAGE) {
decompressor_->decompress_serialized_bag_message(message.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ TEST_F(SequentialCompressionReaderTest, compression_called_when_loading_split_ba
EXPECT_CALL(*compression_factory, create_decompressor(_)).Times(1);
// open_read_only should be called twice when opening 2 split bags
EXPECT_CALL(*storage_factory_, open_read_only(_)).Times(2);
// storage::has_next() is called twice when reader::has_next() is called
EXPECT_CALL(*storage_, has_next()).Times(2)
EXPECT_CALL(*storage_, has_next()).Times(3)
.WillOnce(Return(false)) // Load the next file
.WillOnce(Return(true)); // We have a message from the new file
.WillOnce(Return(true))
.WillOnce(Return(true));

auto compression_reader = std::make_unique<rosbag2_compression::SequentialCompressionReader>(
std::move(compression_factory),
Expand All @@ -238,8 +238,8 @@ TEST_F(SequentialCompressionReaderTest, compression_called_when_loading_split_ba

compression_reader->open(storage_options_, converter_options_);
EXPECT_EQ(compression_reader->has_next_file(), true);
EXPECT_EQ(compression_reader->has_next(), true);
compression_reader->read_next();
EXPECT_EQ(compression_reader->has_next(), true); // false then true
compression_reader->read_next(); // calls has_next true
}

TEST_F(SequentialCompressionReaderTest, can_find_v4_names)
Expand Down
10 changes: 7 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,26 @@ bool SequentialReader::has_next()
if (storage_) {
// If there's no new message, check if there's at least another file to read and update storage
// to read from there. Otherwise, check if there's another message.
if (!storage_->has_next() && has_next_file()) {
bool current_storage_has_next = storage_->has_next();
if (!current_storage_has_next && has_next_file()) {
load_next_file();
storage_options_.uri = get_current_file();

storage_ = storage_factory_->open_read_only(storage_options_);
storage_->set_filter(topics_filter_);
// recursively call has_next again after rollover
return has_next();
}

return storage_->has_next();
return current_storage_has_next;
}
throw std::runtime_error("Bag is not open. Call open() before reading.");
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialReader::read_next()
{
if (storage_) {
// performs rollover if necessary
has_next();
auto message = storage_->read_next();
return converter_ ? converter_->convert(message) : message;
}
Expand Down
42 changes: 18 additions & 24 deletions rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,12 @@ TEST_F(MultifileReaderTest, has_next_reads_next_file)
{
init();

// storage::has_next() is called twice when reader::has_next() is called
EXPECT_CALL(*storage_, has_next()).Times(6)
.WillOnce(Return(true)).WillOnce(Return(true)) // We have a message
.WillOnce(Return(false)) // No message, load next file
.WillOnce(Return(true)) // True since we now have a message
.WillOnce(Return(false)) // No message, load next file
.WillOnce(Return(true)); // True since we now have a message
EXPECT_CALL(*storage_, has_next()).Times(5)
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(true));
EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_)).Times(0);
EXPECT_CALL(*converter_factory_, load_serializer(storage_serialization_format_)).Times(0);
reader_->open(default_storage_options_, {"", storage_serialization_format_});
Expand All @@ -143,26 +142,23 @@ TEST_F(MultifileReaderTest, has_next_reads_next_file)
auto resolved_absolute_path_1 =
rcpputils::fs::path(absolute_path_1_).string();
EXPECT_EQ(sr.get_current_file(), resolved_relative_path_1);
reader_->has_next();
reader_->read_next();
reader_->has_next();
reader_->read_next(); // calls has_next false then true
EXPECT_EQ(sr.get_current_file(), resolved_relative_path_2);
reader_->read_next();
reader_->has_next();
reader_->has_next(); // calls has_next false then true
reader_->read_next(); // calls has_next true
EXPECT_EQ(sr.get_current_file(), resolved_absolute_path_1);
}

TEST_F(MultifileReaderTestVersion3, has_next_reads_next_file_version3)
{
init();

// storage::has_next() is called twice when reader::has_next() is called
EXPECT_CALL(*storage_, has_next()).Times(6)
.WillOnce(Return(true)).WillOnce(Return(true)) // We have a message
.WillOnce(Return(false)) // No message, load next file
.WillOnce(Return(true)) // True since we now have a message
.WillOnce(Return(false)) // No message, load next file
.WillOnce(Return(true)); // True since we now have a message
EXPECT_CALL(*storage_, has_next()).Times(5)
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(true));
EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_)).Times(0);
EXPECT_CALL(*converter_factory_, load_serializer(storage_serialization_format_)).Times(0);
reader_->open(default_storage_options_, {"", storage_serialization_format_});
Expand All @@ -178,12 +174,10 @@ TEST_F(MultifileReaderTestVersion3, has_next_reads_next_file_version3)
auto resolved_absolute_path_1 =
rcpputils::fs::path(absolute_path_1_).string();
EXPECT_EQ(sr.get_current_file(), resolved_relative_path_1);
reader_->has_next();
reader_->read_next();
reader_->has_next();
reader_->read_next(); // calls has_next false then true
EXPECT_EQ(sr.get_current_file(), resolved_relative_path_2);
reader_->read_next();
reader_->has_next();
reader_->has_next(); // calls has_next false then true
reader_->read_next(); // calls has_next true
EXPECT_EQ(sr.get_current_file(), resolved_absolute_path_1);
}

Expand Down

0 comments on commit 7407cd7

Please sign in to comment.