diff --git a/google/cloud/bigtable/data_connection.cc b/google/cloud/bigtable/data_connection.cc index 1c3f0bc7b9b7f..6642cb136ce84 100644 --- a/google/cloud/bigtable/data_connection.cc +++ b/google/cloud/bigtable/data_connection.cc @@ -77,10 +77,15 @@ future> DataConnection::AsyncBulkApply( RowReader DataConnection::ReadRows(std::string const& table_name, RowSet row_set, std::int64_t rows_limit, Filter filter) { + auto const& options = google::cloud::internal::CurrentOptions(); return ReadRowsFull(ReadRowsParams{ std::move(table_name), - google::cloud::internal::CurrentOptions().get(), - std::move(row_set), rows_limit, std::move(filter)}); + options.get(), + std::move(row_set), + rows_limit, + std::move(filter), + options.get(), + }); } // NOLINTNEXTLINE(performance-unnecessary-value-param) diff --git a/google/cloud/bigtable/data_connection.h b/google/cloud/bigtable/data_connection.h index 17c87dc846a3e..cc5fd0b690b1d 100644 --- a/google/cloud/bigtable/data_connection.h +++ b/google/cloud/bigtable/data_connection.h @@ -49,6 +49,7 @@ struct ReadRowsParams { RowSet row_set; std::int64_t rows_limit; Filter filter = Filter::PassAllFilter(); + bool reverse = false; }; /** diff --git a/google/cloud/bigtable/examples/read_snippets.cc b/google/cloud/bigtable/examples/read_snippets.cc index 935d0f2ca2189..0cbaf149b7791 100644 --- a/google/cloud/bigtable/examples/read_snippets.cc +++ b/google/cloud/bigtable/examples/read_snippets.cc @@ -248,7 +248,7 @@ void ReadRows(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (auto& row : table.ReadRows( + for (StatusOr& row : table.ReadRows( cbt::RowSet("phone#4c410523#20190501", "phone#4c410523#20190502"), cbt::Filter::PassAllFilter())) { if (!row) throw std::move(row).status(); @@ -266,7 +266,7 @@ void ReadRowRange(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (auto& row : + for (StatusOr& row : table.ReadRows(cbt::RowRange::Range("phone#4c410523#20190501", "phone#4c410523#201906201"), cbt::Filter::PassAllFilter())) { @@ -285,7 +285,7 @@ void ReadRowRanges(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (auto& row : table.ReadRows( + for (StatusOr& row : table.ReadRows( cbt::RowSet({cbt::RowRange::Range("phone#4c410523#20190501", "phone#4c410523#20190601"), cbt::RowRange::Range("phone#5c10102#20190501", @@ -306,8 +306,8 @@ void ReadRowPrefix(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (auto& row : table.ReadRows(cbt::RowRange::Prefix("phone"), - cbt::Filter::PassAllFilter())) { + for (StatusOr& row : table.ReadRows( + cbt::RowRange::Prefix("phone"), cbt::Filter::PassAllFilter())) { if (!row) throw std::move(row).status(); PrintRow(*row); } @@ -323,8 +323,9 @@ void ReadFilter(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (auto& row : table.ReadRows(cbt::RowRange::InfiniteRange(), - cbt::Filter::ValueRegex("PQ2A.*"))) { + for (StatusOr& row : + table.ReadRows(cbt::RowRange::InfiniteRange(), + cbt::Filter::ValueRegex("PQ2A.*"))) { if (!row) throw std::move(row).status(); PrintRow(*row); } @@ -333,6 +334,26 @@ void ReadFilter(google::cloud::bigtable::Table table, (std::move(table)); } +void ReadRowsReverse(google::cloud::bigtable::Table table, + std::vector const&) { + //! [reverse scan] [START bigtable_reverse_scan] + namespace cbt = ::google::cloud::bigtable; + using ::google::cloud::Options; + using ::google::cloud::StatusOr; + [](cbt::Table table) { + // Read and print the rows. + for (StatusOr& row : table.ReadRows( + cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3, + cbt::Filter::PassAllFilter(), + Options{}.set(true))) { + if (!row) throw std::move(row).status(); + PrintRow(*row); + } + } + //! [reverse scan] [END bigtable_reverse_scan] + (std::move(table)); +} + void RunAll(std::vector const& argv) { namespace examples = ::google::cloud::bigtable::examples; namespace cbt = ::google::cloud::bigtable; @@ -390,6 +411,10 @@ void RunAll(std::vector const& argv) { ReadFilter(table, {}); std::cout << "Running ReadRowsWithLimit() example" << std::endl; ReadRowsWithLimit(table, {"5"}); + if (!google::cloud::bigtable::examples::UsingEmulator()) { + std::cout << "Running ReadRowsReverse() example" << std::endl; + ReadRowsReverse(table, {}); + } std::cout << "Running ReadKeySet() example" << std::endl; ReadKeysSet({table.project_id(), table.instance_id(), table.table_id(), @@ -414,6 +439,7 @@ int main(int argc, char* argv[]) try { MakeCommandEntry("read-row-ranges", {}, ReadRowRanges), MakeCommandEntry("read-row-prefix", {}, ReadRowPrefix), MakeCommandEntry("read-filter", {}, ReadFilter), + MakeCommandEntry("read-rows-reverse", {}, ReadRowsReverse), {"auto", RunAll}, }; diff --git a/google/cloud/bigtable/internal/async_row_reader.cc b/google/cloud/bigtable/internal/async_row_reader.cc index 9507f2e1e445e..5bd7ea2e828cd 100644 --- a/google/cloud/bigtable/internal/async_row_reader.cc +++ b/google/cloud/bigtable/internal/async_row_reader.cc @@ -30,6 +30,7 @@ void AsyncRowReader::MakeRequest() { request.set_app_profile_id(app_profile_id_); request.set_table_name(table_name_); + request.set_reversed(reverse_); auto row_set_proto = row_set_.as_proto(); request.mutable_rows()->Swap(&row_set_proto); @@ -39,7 +40,7 @@ void AsyncRowReader::MakeRequest() { if (rows_limit_ != NO_ROWS_LIMIT) { request.set_rows_limit(rows_limit_ - rows_count_); } - parser_ = bigtable::internal::ReadRowsParserFactory().Create(); + parser_ = bigtable::internal::ReadRowsParserFactory().Create(reverse_); internal::ScopedCallContext scope(call_context_); auto context = std::make_shared(); diff --git a/google/cloud/bigtable/internal/async_row_reader.h b/google/cloud/bigtable/internal/async_row_reader.h index 269511420254e..c1fa859df2a53 100644 --- a/google/cloud/bigtable/internal/async_row_reader.h +++ b/google/cloud/bigtable/internal/async_row_reader.h @@ -57,13 +57,13 @@ class AsyncRowReader : public std::enable_shared_from_this { std::string app_profile_id, std::string table_name, RowFunctor on_row, FinishFunctor on_finish, bigtable::RowSet row_set, std::int64_t rows_limit, - bigtable::Filter filter, + bigtable::Filter filter, bool reverse, std::unique_ptr retry_policy, std::unique_ptr backoff_policy) { auto reader = std::shared_ptr(new AsyncRowReader( std::move(cq), std::move(stub), std::move(app_profile_id), std::move(table_name), std::move(on_row), std::move(on_finish), - std::move(row_set), rows_limit, std::move(filter), + std::move(row_set), rows_limit, std::move(filter), reverse, std::move(retry_policy), std::move(backoff_policy))); reader->MakeRequest(); } @@ -73,7 +73,7 @@ class AsyncRowReader : public std::enable_shared_from_this { std::string app_profile_id, std::string table_name, RowFunctor on_row, FinishFunctor on_finish, bigtable::RowSet row_set, std::int64_t rows_limit, - bigtable::Filter filter, + bigtable::Filter filter, bool reverse, std::unique_ptr retry_policy, std::unique_ptr backoff_policy) : cq_(std::move(cq)), @@ -85,6 +85,7 @@ class AsyncRowReader : public std::enable_shared_from_this { row_set_(std::move(row_set)), rows_limit_(rows_limit), filter_(std::move(filter)), + reverse_(std::move(reverse)), retry_policy_(std::move(retry_policy)), backoff_policy_(std::move(backoff_policy)) {} @@ -129,6 +130,7 @@ class AsyncRowReader : public std::enable_shared_from_this { bigtable::RowSet row_set_; std::int64_t rows_limit_; bigtable::Filter filter_; + bool reverse_; std::unique_ptr retry_policy_; std::unique_ptr backoff_policy_; std::unique_ptr parser_; diff --git a/google/cloud/bigtable/internal/async_row_reader_test.cc b/google/cloud/bigtable/internal/async_row_reader_test.cc index aed55d2ffa3a5..479e654eea3ab 100644 --- a/google/cloud/bigtable/internal/async_row_reader_test.cc +++ b/google/cloud/bigtable/internal/async_row_reader_test.cc @@ -150,8 +150,8 @@ TEST(AsyncRowReaderTest, Success) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify that reading works when the futures are not immediately @@ -218,8 +218,8 @@ TEST(AsyncRowReaderTest, SuccessDelayedFuture) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); // Satisfy the futures p1.set_value(true); @@ -276,8 +276,8 @@ TEST(AsyncRowReaderTest, ResponseInMultipleChunks) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify that parser fails if the stream finishes prematurely. @@ -327,8 +327,8 @@ TEST(AsyncRowReaderTest, ParserEofFailsOnUnfinishedRow) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Check that we ignore HandleEndOfStream errors if enough rows were read @@ -381,10 +381,11 @@ TEST(AsyncRowReaderTest, ParserEofDoesntFailOnUnfinishedRowIfRowLimit) { internal::OptionsSpan span( Options{}.set(mock_setup.AsStdFunction())); - AsyncRowReader::Create( - cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), - on_finish.AsStdFunction(), bigtable::RowSet(), 1, - bigtable::Filter::PassAllFilter(), std::move(retry), std::move(mock_b)); + AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, + on_row.AsStdFunction(), on_finish.AsStdFunction(), + bigtable::RowSet(), 1, + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify that permanent errors are not retried and properly passed. @@ -427,8 +428,8 @@ TEST(AsyncRowReaderTest, PermanentFailure) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } TEST(AsyncRowReaderTest, RetryPolicyExhausted) { @@ -479,8 +480,8 @@ TEST(AsyncRowReaderTest, RetryPolicyExhausted) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify that retries do not ask for rows we have already read. @@ -563,7 +564,7 @@ TEST(AsyncRowReaderTest, RetrySkipsReadRows) { cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet("r1", "r2"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), std::move(mock_b)); + false, std::move(retry), std::move(mock_b)); } /// @test Verify that we do not retry at all if the rowset will be empty. @@ -619,7 +620,7 @@ TEST(AsyncRowReaderTest, NoRetryIfRowSetIsEmpty) { cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet("r1"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), std::move(mock_b)); + false, std::move(retry), std::move(mock_b)); } /// @test Verify that the last scanned row is respected. @@ -708,7 +709,7 @@ TEST(AsyncRowReaderTest, LastScannedRowKeyIsRespected) { cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet("r1", "r2", "r3"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), std::move(mock_b)); + false, std::move(retry), std::move(mock_b)); } /// @test Verify proper handling of bogus responses from the service. @@ -762,8 +763,8 @@ TEST(AsyncRowReaderTest, ParserFailsOnOutOfOrderRowKeys) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify canceling the stream by satisfying the futures with false @@ -856,8 +857,8 @@ TEST_P(AsyncRowReaderExceptionTest, CancelMidStream) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS @@ -923,8 +924,8 @@ TEST(AsyncRowReaderTest, CancelAfterStreamFinish) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } /// @test Verify that the recursion described in TryGiveRowToUser is bounded. @@ -997,8 +998,8 @@ TEST(AsyncRowReaderTest, DeepStack) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } TEST(AsyncRowReaderTest, TimerErrorEndsLoop) { @@ -1055,8 +1056,8 @@ TEST(AsyncRowReaderTest, TimerErrorEndsLoop) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); } TEST(AsyncRowReaderTest, CurrentOptionsContinuedOnRetries) { @@ -1110,8 +1111,8 @@ TEST(AsyncRowReaderTest, CurrentOptionsContinuedOnRetries) { AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, on_row.AsStdFunction(), on_finish.AsStdFunction(), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); // Simulate the timer being satisfied in a thread with different prevailing // options than the calling thread. @@ -1119,6 +1120,115 @@ TEST(AsyncRowReaderTest, CurrentOptionsContinuedOnRetries) { timer_promise.set_value(make_status_or(std::chrono::system_clock::now())); } +TEST(AsyncRowReaderTest, ReverseScan) { + CompletionQueue cq; + + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncReadRows) + .WillOnce([](Unused, Unused, v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + auto stream = std::make_unique(); + ::testing::InSequence s; + EXPECT_CALL(*stream, Start).WillOnce([] { + return make_ready_future(true); + }); + EXPECT_CALL(*stream, Read) + .WillOnce([] { + return make_ready_future( + MakeResponse({{"r2", true}, {"r1", true}})); + }) + .WillOnce([] { return make_ready_future(EndOfStream()); }); + EXPECT_CALL(*stream, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + return stream; + }); + + MockFunction(bigtable::Row const&)> on_row; + EXPECT_CALL(on_row, Call) + .WillOnce([](bigtable::Row const& row) { + EXPECT_EQ("r2", row.row_key()); + return make_ready_future(true); + }) + .WillOnce([](bigtable::Row const& row) { + EXPECT_EQ("r1", row.row_key()); + return make_ready_future(true); + }); + + MockFunction on_finish; + EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) { + EXPECT_STATUS_OK(status); + }); + + auto retry = DataLimitedErrorCountRetryPolicy(kNumRetries).clone(); + auto mock_b = std::make_unique(); + EXPECT_CALL(*mock_b, OnCompletion).Times(0); + + MockFunction mock_setup; + EXPECT_CALL(mock_setup, Call).Times(1); + internal::OptionsSpan span( + Options{}.set(mock_setup.AsStdFunction())); + + AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, + on_row.AsStdFunction(), on_finish.AsStdFunction(), + bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, + bigtable::Filter::PassAllFilter(), true, + std::move(retry), std::move(mock_b)); +} + +TEST(AsyncRowReaderTest, ReverseScanFailsOnIncreasingRowKeyOrder) { + CompletionQueue cq; + + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncReadRows) + .WillOnce([](Unused, Unused, v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + auto stream = std::make_unique(); + ::testing::InSequence s; + EXPECT_CALL(*stream, Start).WillOnce([] { + return make_ready_future(true); + }); + EXPECT_CALL(*stream, Read).WillOnce([] { + // The rows should be returned out of order for a reverse scan. + return make_ready_future(MakeResponse({{"r1", true}, {"r2", true}})); + }); + EXPECT_CALL(*stream, Cancel); + EXPECT_CALL(*stream, Read).WillOnce([] { + return make_ready_future(EndOfStream()); + }); + EXPECT_CALL(*stream, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + return stream; + }); + + MockFunction(bigtable::Row const&)> on_row; + EXPECT_CALL(on_row, Call).WillOnce([](bigtable::Row const& row) { + EXPECT_EQ("r1", row.row_key()); + return make_ready_future(true); + }); + + MockFunction on_finish; + EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) { + EXPECT_THAT(status, StatusIs(StatusCode::kInternal)); + }); + + auto retry = DataLimitedErrorCountRetryPolicy(kNumRetries).clone(); + auto mock_b = std::make_unique(); + EXPECT_CALL(*mock_b, OnCompletion).Times(0); + + MockFunction mock_setup; + EXPECT_CALL(mock_setup, Call).Times(1); + internal::OptionsSpan span( + Options{}.set(mock_setup.AsStdFunction())); + + AsyncRowReader::Create(cq, mock, kAppProfile, kTableName, + on_row.AsStdFunction(), on_finish.AsStdFunction(), + bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, + bigtable::Filter::PassAllFilter(), true, + std::move(retry), std::move(mock_b)); +} + #ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY using ::google::cloud::testing_util::EnableTracing; using ::google::cloud::testing_util::IsActive; @@ -1149,8 +1259,8 @@ TEST(AsyncRowReaderTest, TracedBackoff) { AsyncRowReader::Create(background.cq(), mock, kAppProfile, kTableName, std::move(on_row), std::move(on_finish), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); // Block until the async call has completed. p.get_future().get(); @@ -1185,8 +1295,8 @@ TEST(AsyncRowReaderTest, CallSpanActiveThroughout) { AsyncRowReader::Create(background.cq(), mock, kAppProfile, kTableName, std::move(on_row), std::move(on_finish), bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, - bigtable::Filter::PassAllFilter(), std::move(retry), - std::move(mock_b)); + bigtable::Filter::PassAllFilter(), false, + std::move(retry), std::move(mock_b)); // Block until the async call has completed. p.get_future().get(); diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.cc b/google/cloud/bigtable/internal/bigtable_stub_factory.cc index 0aa9533a1580c..67761c88e7ed9 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.cc @@ -24,9 +24,11 @@ #include "google/cloud/common_options.h" #include "google/cloud/grpc_options.h" #include "google/cloud/internal/algorithm.h" +#include "google/cloud/internal/base64_transforms.h" #include "google/cloud/internal/opentelemetry.h" #include "google/cloud/internal/unified_grpc_credentials.h" #include "google/cloud/log.h" +#include #include namespace google { @@ -43,6 +45,18 @@ std::shared_ptr CreateGrpcChannel( return auth.CreateChannel(options.get(), std::move(args)); } +std::string FeaturesMetadata() { + static auto const* const kFeatures = new auto([] { + google::bigtable::v2::FeatureFlags proto; + proto.set_reverse_scans(true); + auto bytes = proto.SerializeAsString(); + internal::Base64Encoder enc; + for (auto c : bytes) enc.PushBack(c); + return std::move(enc).FlushAndPad(); + }()); + return *kFeatures; +} + } // namespace std::shared_ptr CreateBigtableStubRoundRobin( @@ -80,7 +94,8 @@ std::shared_ptr CreateDecoratedStubs( stub = std::make_shared(std::move(auth), std::move(stub)); } stub = std::make_shared( - std::move(stub), std::multimap{}); + std::move(stub), std::multimap{ + {"bigtable-features", FeaturesMetadata()}}); if (google::cloud::internal::Contains(options.get(), "rpc")) { GCP_LOG(INFO) << "Enabled logging for gRPC calls"; diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc b/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc index 90bc55b33dd7b..64e03c9d0c3ec 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory_test.cc @@ -41,6 +41,7 @@ using ::google::cloud::testing_util::StatusIs; using ::google::cloud::testing_util::ValidateMetadataFixture; using ::testing::Contains; using ::testing::HasSubstr; +using ::testing::IsEmpty; using ::testing::NotNull; using ::testing::Return; @@ -254,6 +255,35 @@ TEST_F(BigtableStubFactory, AsyncMutateRow) { EXPECT_THAT(log.ExtractLines(), Contains(HasSubstr("AsyncMutateRow"))); } +TEST_F(BigtableStubFactory, FeaturesFlags) { + MockFactory factory; + EXPECT_CALL(factory, Call) + .WillOnce([](std::shared_ptr const&) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, MutateRow) + .WillOnce([](grpc::ClientContext& context, + google::bigtable::v2::MutateRowRequest const&) { + ValidateMetadataFixture fixture; + auto headers = fixture.GetMetadata(context); + EXPECT_THAT(headers, + Contains(Pair("bigtable-features", Not(IsEmpty())))); + return internal::AbortedError("fail"); + }); + return mock; + }); + + CompletionQueue cq; + auto stub = CreateDecoratedStubs( + std::move(cq), + Options{} + .set("localhost:1") + .set(1) + .set(MakeInsecureCredentials()), + factory.AsStdFunction()); + grpc::ClientContext context; + (void)stub->MutateRow(context, {}); +} + #ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY using ::google::cloud::testing_util::DisableTracing; using ::google::cloud::testing_util::EnableTracing; diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 09e1578d2182d..fd28cc2295f8c 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -178,7 +178,7 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull( auto impl = std::make_shared( stub_, std::move(params.app_profile_id), std::move(params.table_name), std::move(params.row_set), params.rows_limit, std::move(params.filter), - retry_policy(), backoff_policy()); + params.reverse, retry_policy(), backoff_policy()); return MakeRowReader(std::move(impl)); } @@ -374,10 +374,11 @@ void DataConnectionImpl::AsyncReadRows( std::function(bigtable::Row)> on_row, std::function on_finish, bigtable::RowSet row_set, std::int64_t rows_limit, bigtable::Filter filter) { + auto reverse = internal::CurrentOptions().get(); bigtable_internal::AsyncRowReader::Create( background_->cq(), stub_, app_profile_id(), table_name, std::move(on_row), std::move(on_finish), std::move(row_set), rows_limit, std::move(filter), - retry_policy(), backoff_policy()); + reverse, retry_policy(), backoff_policy()); } future>> diff --git a/google/cloud/bigtable/internal/default_row_reader.cc b/google/cloud/bigtable/internal/default_row_reader.cc index e62511d760291..06e665ef2fd40 100644 --- a/google/cloud/bigtable/internal/default_row_reader.cc +++ b/google/cloud/bigtable/internal/default_row_reader.cc @@ -25,7 +25,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN DefaultRowReader::DefaultRowReader( std::shared_ptr stub, std::string app_profile_id, std::string table_name, bigtable::RowSet row_set, std::int64_t rows_limit, - bigtable::Filter filter, + bigtable::Filter filter, bool reverse, std::unique_ptr retry_policy, std::unique_ptr backoff_policy) : stub_(std::move(stub)), @@ -34,6 +34,7 @@ DefaultRowReader::DefaultRowReader( row_set_(std::move(row_set)), rows_limit_(rows_limit), filter_(std::move(filter)), + reverse_(reverse), retry_policy_(std::move(retry_policy)), backoff_policy_(std::move(backoff_policy)) {} @@ -44,6 +45,7 @@ void DefaultRowReader::MakeRequest() { google::bigtable::v2::ReadRowsRequest request; request.set_table_name(table_name_); request.set_app_profile_id(app_profile_id_); + request.set_reversed(reverse_); auto row_set_proto = row_set_.as_proto(); request.mutable_rows()->Swap(&row_set_proto); @@ -61,7 +63,7 @@ void DefaultRowReader::MakeRequest() { stream_ = stub_->ReadRows(std::move(context), request); stream_is_open_ = true; - parser_ = bigtable::internal::ReadRowsParserFactory().Create(); + parser_ = bigtable::internal::ReadRowsParserFactory().Create(reverse_); } bool DefaultRowReader::NextChunk() { @@ -107,8 +109,13 @@ absl::variant DefaultRowReader::Advance() { if (!last_read_row_key_.empty()) { // We've returned some rows and need to make sure we don't // request them again. - row_set_ = - row_set_.Intersect(bigtable::RowRange::Open(last_read_row_key_, "")); + if (reverse_) { + row_set_ = row_set_.Intersect( + bigtable::RowRange::Open("", last_read_row_key_)); + } else { + row_set_ = row_set_.Intersect( + bigtable::RowRange::Open(last_read_row_key_, "")); + } } // If we receive an error, but the retryable set is empty, stop. diff --git a/google/cloud/bigtable/internal/default_row_reader.h b/google/cloud/bigtable/internal/default_row_reader.h index dfdb29f59cf79..b77b646da5340 100644 --- a/google/cloud/bigtable/internal/default_row_reader.h +++ b/google/cloud/bigtable/internal/default_row_reader.h @@ -45,7 +45,7 @@ class DefaultRowReader : public RowReaderImpl { DefaultRowReader(std::shared_ptr stub, std::string app_profile_id, std::string table_name, bigtable::RowSet row_set, std::int64_t rows_limit, - bigtable::Filter filter, + bigtable::Filter filter, bool reverse, std::unique_ptr retry_policy, std::unique_ptr backoff_policy); @@ -89,6 +89,7 @@ class DefaultRowReader : public RowReaderImpl { bigtable::RowSet row_set_; std::int64_t rows_limit_; bigtable::Filter filter_; + bool reverse_; std::unique_ptr retry_policy_; std::unique_ptr backoff_policy_; diff --git a/google/cloud/bigtable/internal/default_row_reader_test.cc b/google/cloud/bigtable/internal/default_row_reader_test.cc index 82f27a36561fe..91248dfa96f58 100644 --- a/google/cloud/bigtable/internal/default_row_reader_test.cc +++ b/google/cloud/bigtable/internal/default_row_reader_test.cc @@ -33,9 +33,12 @@ using ::google::cloud::bigtable::DataLimitedErrorCountRetryPolicy; using ::google::cloud::bigtable::testing::MockBigtableStub; using ::google::cloud::bigtable::testing::MockDataRetryPolicy; using ::google::cloud::bigtable::testing::MockReadRowsStream; +using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::MockBackoffPolicy; using ::google::cloud::testing_util::StatusIs; +using ::testing::ElementsAre; using ::testing::Eq; +using ::testing::HasSubstr; using ::testing::Matcher; using ::testing::MockFunction; using ::testing::Property; @@ -84,6 +87,19 @@ google::bigtable::v2::ReadRowsResponse MalformedResponse() { return resp; } +std::vector> StatusOrRowKeys( + bigtable::RowReader& reader) { + std::vector> rows; + for (auto& row : reader) { + if (!row) { + rows.emplace_back(std::move(row).status()); + continue; + } + rows.emplace_back(std::move(row->row_key())); + } + return rows; +} + class DefaultRowReaderTest : public ::testing::Test { protected: // Ensure that we set up the ClientContext once per stream @@ -115,7 +131,7 @@ TEST_F(DefaultRowReaderTest, EmptyReaderHasNoRows) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); EXPECT_EQ(reader.begin(), reader.end()); @@ -138,7 +154,7 @@ TEST_F(DefaultRowReaderTest, ReadOneRow) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -169,7 +185,7 @@ TEST_F(DefaultRowReaderTest, StreamIsDrained) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -205,7 +221,7 @@ TEST_F(DefaultRowReaderTest, RetryThenSuccess) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -231,7 +247,7 @@ TEST_F(DefaultRowReaderTest, NoRetryOnPermanentError) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -264,7 +280,7 @@ TEST_F(DefaultRowReaderTest, RetryPolicyExhausted) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), std::move(backoff)); + false, retry_.clone(), std::move(backoff)); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -300,7 +316,7 @@ TEST_F(DefaultRowReaderTest, RetrySkipsAlreadyReadRows) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -346,7 +362,7 @@ TEST_F(DefaultRowReaderTest, RetrySkipsAlreadyScannedRows) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -384,7 +400,7 @@ TEST_F(DefaultRowReaderTest, FailedParseIsRetried) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), backoff_.clone()); + false, std::move(retry), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -426,7 +442,7 @@ TEST_F(DefaultRowReaderTest, FailedParseSkipsAlreadyReadRows) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), backoff_.clone()); + false, std::move(retry), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -477,7 +493,7 @@ TEST_F(DefaultRowReaderTest, FailedParseSkipsAlreadyScannedRows) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - std::move(retry), backoff_.clone()); + false, std::move(retry), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -506,7 +522,7 @@ TEST_F(DefaultRowReaderTest, FailedParseWithPermanentError) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -534,7 +550,7 @@ TEST_F(DefaultRowReaderTest, NoRetryOnEmptyRowSet) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -559,7 +575,8 @@ TEST_F(DefaultRowReaderTest, RowLimitIsSent) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), 42, - bigtable::Filter::PassAllFilter(), retry_.clone(), backoff_.clone()); + bigtable::Filter::PassAllFilter(), false, retry_.clone(), + backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -590,7 +607,8 @@ TEST_F(DefaultRowReaderTest, RowLimitIsDecreasedOnRetry) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), 42, - bigtable::Filter::PassAllFilter(), retry_.clone(), backoff_.clone()); + bigtable::Filter::PassAllFilter(), false, retry_.clone(), + backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -619,7 +637,8 @@ TEST_F(DefaultRowReaderTest, NoRetryIfRowLimitReached) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), 1, - bigtable::Filter::PassAllFilter(), retry_.clone(), backoff_.clone()); + bigtable::Filter::PassAllFilter(), false, retry_.clone(), + backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -650,7 +669,7 @@ TEST_F(DefaultRowReaderTest, CancelDrainsStream) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -675,7 +694,7 @@ TEST_F(DefaultRowReaderTest, CancelBeforeBegin) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); // Manually cancel the call before a stream was created. @@ -700,7 +719,7 @@ TEST_F(DefaultRowReaderTest, RowReaderConstructorDoesNotCallRpc) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); } @@ -729,7 +748,7 @@ TEST_F(DefaultRowReaderTest, RetryUsesNewContext) { auto impl = std::make_shared( mock, kAppProfile, kTableName, bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), - retry_.clone(), backoff_.clone()); + false, retry_.clone(), backoff_.clone()); auto reader = bigtable_internal::MakeRowReader(std::move(impl)); auto it = reader.begin(); @@ -738,6 +757,107 @@ TEST_F(DefaultRowReaderTest, RetryUsesNewContext) { EXPECT_EQ(++it, reader.end()); } +TEST_F(DefaultRowReaderTest, ReverseScansSuccess) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ReadRows) + .WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + auto stream = std::make_unique(); + ::testing::InSequence s; + EXPECT_CALL(*stream, Read) + .WillOnce(Return(MakeRow("r3"))) + .WillOnce(Return(MakeRow("r2"))) + .WillOnce(Return(MakeRow("r1"))) + .WillOnce(Return(Status())); + return stream; + }); + + internal::OptionsSpan span(TestOptions(/*expected_streams=*/1)); + + auto impl = std::make_shared( + mock, kAppProfile, kTableName, bigtable::RowSet(), + bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), + true, retry_.clone(), backoff_.clone()); + auto reader = bigtable_internal::MakeRowReader(std::move(impl)); + + EXPECT_THAT( + StatusOrRowKeys(reader), + ElementsAre(IsOkAndHolds("r3"), IsOkAndHolds("r2"), IsOkAndHolds("r1"))); +} + +TEST_F(DefaultRowReaderTest, ReverseScanFailsOnIncreasingRowKeyOrder) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ReadRows) + .WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + auto stream = std::make_unique(); + ::testing::InSequence s; + EXPECT_CALL(*stream, Read) + .WillOnce(Return(MakeRow("r1"))) + .WillOnce(Return(MakeRow("r2"))); + EXPECT_CALL(*stream, Cancel); + EXPECT_CALL(*stream, Read).WillOnce(Return(Status())); + return stream; + }); + + internal::OptionsSpan span(TestOptions(/*expected_streams=*/1)); + + auto impl = std::make_shared( + mock, kAppProfile, kTableName, bigtable::RowSet(), + bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), + true, retry_.clone(), backoff_.clone()); + auto reader = bigtable_internal::MakeRowReader(std::move(impl)); + + EXPECT_THAT( + StatusOrRowKeys(reader), + ElementsAre( + IsOkAndHolds("r1"), + StatusIs(StatusCode::kInternal, + HasSubstr("keys are expected in decreasing order")))); +} + +TEST_F(DefaultRowReaderTest, ReverseScanResumption) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ReadRows) + .WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + // We start our call with 3 rows in the set: "r1", "r2", "r3". + EXPECT_THAT(request, RequestWithRowKeysCount(3)); + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Read) + .WillOnce(Return(MakeRow("r3"))) + // Simulate the server returning an empty chunk with + // `last_scanned_row_key` set to "r2". + .WillOnce([]() { + google::bigtable::v2::ReadRowsResponse resp; + resp.set_last_scanned_row_key("r2"); + return resp; + }) + .WillOnce(Return(Status(StatusCode::kUnavailable, "try again"))); + return stream; + }) + .WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + // We retry the remaining rows. We have "r3" returned, but the service + // has also told us that "r2" was scanned. This means there is only one + // row remaining to read: "r1". + EXPECT_THAT(request, RequestWithRowKeysCount(1)); + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Read).WillOnce(Return(Status())); + return stream; + }); + + internal::OptionsSpan span(TestOptions(/*expected_streams=*/2)); + + auto impl = std::make_shared( + mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"), + bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(), + true, retry_.clone(), backoff_.clone()); + auto reader = bigtable_internal::MakeRowReader(std::move(impl)); + + EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r3"))); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal diff --git a/google/cloud/bigtable/internal/legacy_async_row_reader.cc b/google/cloud/bigtable/internal/legacy_async_row_reader.cc index c03e4ff011662..80ce9973850d0 100644 --- a/google/cloud/bigtable/internal/legacy_async_row_reader.cc +++ b/google/cloud/bigtable/internal/legacy_async_row_reader.cc @@ -36,7 +36,7 @@ void LegacyAsyncRowReader::MakeRequest() { if (rows_limit_ != NO_ROWS_LIMIT) { request.set_rows_limit(rows_limit_ - rows_count_); } - parser_ = parser_factory_->Create(); + parser_ = parser_factory_->Create(false); auto context = std::make_unique(); rpc_retry_policy_->Setup(*context); diff --git a/google/cloud/bigtable/internal/legacy_row_reader.cc b/google/cloud/bigtable/internal/legacy_row_reader.cc index 3fa1cc5008316..62eeafff9ca3e 100644 --- a/google/cloud/bigtable/internal/legacy_row_reader.cc +++ b/google/cloud/bigtable/internal/legacy_row_reader.cc @@ -81,7 +81,7 @@ void LegacyRowReader::MakeRequest() { stream_ = client_->ReadRows(context_.get(), request); stream_is_open_ = true; - parser_ = parser_factory_->Create(); + parser_ = parser_factory_->Create(false); } bool LegacyRowReader::NextChunk() { diff --git a/google/cloud/bigtable/internal/legacy_row_reader_test.cc b/google/cloud/bigtable/internal/legacy_row_reader_test.cc index 2c17cfa01958a..f872ed81c1678 100644 --- a/google/cloud/bigtable/internal/legacy_row_reader_test.cc +++ b/google/cloud/bigtable/internal/legacy_row_reader_test.cc @@ -53,6 +53,8 @@ using ::testing::SetArgReferee; class ReadRowsParserMock : public bigtable::internal::ReadRowsParser { public: + explicit ReadRowsParserMock() : ReadRowsParser(false) {} + MOCK_METHOD(void, HandleChunkHook, (ReadRowsResponse_CellChunk chunk, grpc::Status& status)); void HandleChunk(ReadRowsResponse_CellChunk chunk, @@ -93,10 +95,10 @@ class ReadRowsParserMockFactory void AddParser(ParserPtr parser) { parsers_.emplace_back(std::move(parser)); } MOCK_METHOD(void, CreateHook, ()); - ParserPtr Create() override { + ParserPtr Create(bool reverse_scan) override { CreateHook(); if (parsers_.empty()) { - return std::make_unique(); + return std::make_unique(reverse_scan); } ParserPtr parser = std::move(parsers_.front()); parsers_.pop_front(); diff --git a/google/cloud/bigtable/internal/readrowsparser.cc b/google/cloud/bigtable/internal/readrowsparser.cc index 45affa983a35c..a07535e1e73d1 100644 --- a/google/cloud/bigtable/internal/readrowsparser.cc +++ b/google/cloud/bigtable/internal/readrowsparser.cc @@ -37,10 +37,19 @@ void ReadRowsParser::HandleChunk(ReadRowsResponse_CellChunk chunk, } if (!chunk.row_key().empty()) { - if (CompareRowKey(last_seen_row_key_, chunk.row_key()) >= 0) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - "Row keys are expected in increasing order"); - return; + if (!last_seen_row_key_.empty()) { + auto c = CompareRowKey(last_seen_row_key_, chunk.row_key()); + if (reverse_ && c <= 0) { + status = grpc::Status( + grpc::StatusCode::INTERNAL, + "Row keys are expected in decreasing order when reverse=true"); + return; + } + if (!reverse_ && c >= 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + "Row keys are expected in increasing order"); + return; + } } using std::swap; swap(*chunk.mutable_row_key(), cell_.row); diff --git a/google/cloud/bigtable/internal/readrowsparser.h b/google/cloud/bigtable/internal/readrowsparser.h index 68aa550350adc..1a173256358ae 100644 --- a/google/cloud/bigtable/internal/readrowsparser.h +++ b/google/cloud/bigtable/internal/readrowsparser.h @@ -27,6 +27,7 @@ namespace cloud { namespace bigtable { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { + /** * Transforms a stream of chunks as returned by the ReadRows streaming * RPC into a sequence of rows. @@ -51,7 +52,7 @@ namespace internal { */ class ReadRowsParser { public: - ReadRowsParser() = default; + explicit ReadRowsParser(bool reverse) : reverse_(reverse) {} virtual ~ReadRowsParser() = default; @@ -90,6 +91,9 @@ class ReadRowsParser { std::vector labels; }; + /// If true, we expect row keys in reverse order. + bool reverse_; + /** * Moves partial results into a Cell class. * @@ -127,10 +131,11 @@ class ReadRowsParserFactory { virtual ~ReadRowsParserFactory() = default; /// Returns a newly created parser instance. - virtual std::unique_ptr Create() { - return std::make_unique(); + virtual std::unique_ptr Create(bool reverse) { + return std::make_unique(reverse); } }; + } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable diff --git a/google/cloud/bigtable/internal/readrowsparser_test.cc b/google/cloud/bigtable/internal/readrowsparser_test.cc index 9f417d4427bf6..6d0ddee40774f 100644 --- a/google/cloud/bigtable/internal/readrowsparser_test.cc +++ b/google/cloud/bigtable/internal/readrowsparser_test.cc @@ -36,7 +36,7 @@ using ::testing::Not; TEST(ReadRowsParserTest, NoChunksNoRowsSucceeds) { grpc::Status status; - ReadRowsParser parser; + ReadRowsParser parser(false); EXPECT_FALSE(parser.HasNext()); parser.HandleEndOfStream(status); @@ -45,7 +45,7 @@ TEST(ReadRowsParserTest, NoChunksNoRowsSucceeds) { } TEST(ReadRowsParserTest, HandleEndOfStreamCalledTwiceThrows) { - ReadRowsParser parser; + ReadRowsParser parser(false); grpc::Status status; EXPECT_FALSE(parser.HasNext()); parser.HandleEndOfStream(status); @@ -55,7 +55,7 @@ TEST(ReadRowsParserTest, HandleEndOfStreamCalledTwiceThrows) { } TEST(ReadRowsParserTest, HandleChunkAfterEndOfStreamThrows) { - ReadRowsParser parser; + ReadRowsParser parser(false); ReadRowsResponse_CellChunk chunk; grpc::Status status; chunk.set_value_size(1); @@ -70,7 +70,7 @@ TEST(ReadRowsParserTest, HandleChunkAfterEndOfStreamThrows) { TEST(ReadRowsParserTest, SingleChunkSucceeds) { using ::google::protobuf::TextFormat; - ReadRowsParser parser; + ReadRowsParser parser(false); ReadRowsResponse_CellChunk chunk; std::string chunk1 = R"( row_key: "RK" @@ -105,7 +105,7 @@ TEST(ReadRowsParserTest, SingleChunkSucceeds) { TEST(ReadRowsParserTest, NextAfterEndOfStreamSucceeds) { using ::google::protobuf::TextFormat; - ReadRowsParser parser; + ReadRowsParser parser(false); ReadRowsResponse_CellChunk chunk; std::string chunk1 = R"( row_key: "RK" @@ -130,7 +130,7 @@ TEST(ReadRowsParserTest, NextAfterEndOfStreamSucceeds) { } TEST(ReadRowsParserTest, NextWithNoDataThrows) { - ReadRowsParser parser; + ReadRowsParser parser(false); grpc::Status status; EXPECT_FALSE(parser.HasNext()); parser.HandleEndOfStream(status); @@ -214,7 +214,7 @@ class AcceptanceTest : public ::testing::Test { } private: - ReadRowsParser parser_; + ReadRowsParser parser_{false}; std::vector rows_; }; diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index 4c5291c547dd2..be7ee916d0b77 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -76,6 +76,26 @@ struct AppProfileIdOption { using Type = std::string; }; +/** + * Read rows in reverse order. + * + * The rows will be streamed in reverse lexicographic order of the keys. This is + * particularly useful to get the last N records before a key. + * + * This option does not affect the contents of the rows, just the order that + * the rows are returned. + * + * @note When using this option, the order of row keys in a `bigtable::RowRange` + * does not change. They still must be supplied in lexicographic order. + * + * @snippet read_snippets.cc reverse scan + * + * @ingroup bigtable-options + */ +struct ReverseScanOption { + using Type = bool; +}; + /** * The endpoint for data operations. * diff --git a/google/cloud/bigtable/tests/data_integration_test.cc b/google/cloud/bigtable/tests/data_integration_test.cc index 64b4118072b5b..931fee6ec048d 100644 --- a/google/cloud/bigtable/tests/data_integration_test.cc +++ b/google/cloud/bigtable/tests/data_integration_test.cc @@ -198,6 +198,12 @@ TEST_P(DataIntegrationTest, TableReadRowsAllRows) { auto read4 = table.ReadRows(RowSet(), Filter::PassAllFilter()); CheckEqualUnordered(created, MoveCellsFromReader(read4)); + + if (GetParam() == "with-data-connection" && !UsingCloudBigtableEmulator()) { + auto read5 = table.ReadRows(RowSet(), Filter::PassAllFilter(), + Options{}.set(true)); + CheckEqualUnordered(created, MoveCellsFromReader(read5)); + } } TEST_P(DataIntegrationTest, TableReadRowsPartialRows) {