Skip to content

Commit

Permalink
feat(bigtable): reverse scans
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc committed Jul 4, 2023
1 parent 8ba0bf2 commit fb6fba3
Show file tree
Hide file tree
Showing 20 changed files with 455 additions and 94 deletions.
9 changes: 7 additions & 2 deletions google/cloud/bigtable/data_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ future<std::vector<FailedMutation>> DataConnection::AsyncBulkApply(
RowReader DataConnection::ReadRows(std::string const& table_name,
RowSet row_set, std::int64_t rows_limit,
Filter filter) {
auto const& options = google::cloud::internal::CurrentOptions();
return ReadRowsFull(ReadRowsParams{
std::move(table_name),
google::cloud::internal::CurrentOptions().get<AppProfileIdOption>(),
std::move(row_set), rows_limit, std::move(filter)});
options.get<AppProfileIdOption>(),
std::move(row_set),
rows_limit,
std::move(filter),
options.get<ReverseScanOption>(),
});
}

// NOLINTNEXTLINE(performance-unnecessary-value-param)
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/data_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct ReadRowsParams {
RowSet row_set;
std::int64_t rows_limit;
Filter filter = Filter::PassAllFilter();
bool reverse = false;
};

/**
Expand Down
40 changes: 33 additions & 7 deletions google/cloud/bigtable/examples/read_snippets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ void ReadRows(google::cloud::bigtable::Table table,
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (auto& row : table.ReadRows(
for (StatusOr<cbt::Row>& row : table.ReadRows(
cbt::RowSet("phone#4c410523#20190501", "phone#4c410523#20190502"),
cbt::Filter::PassAllFilter())) {
if (!row) throw std::move(row).status();
Expand All @@ -266,7 +266,7 @@ void ReadRowRange(google::cloud::bigtable::Table table,
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (auto& row :
for (StatusOr<cbt::Row>& row :
table.ReadRows(cbt::RowRange::Range("phone#4c410523#20190501",
"phone#4c410523#201906201"),
cbt::Filter::PassAllFilter())) {
Expand All @@ -285,7 +285,7 @@ void ReadRowRanges(google::cloud::bigtable::Table table,
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (auto& row : table.ReadRows(
for (StatusOr<cbt::Row>& row : table.ReadRows(
cbt::RowSet({cbt::RowRange::Range("phone#4c410523#20190501",
"phone#4c410523#20190601"),
cbt::RowRange::Range("phone#5c10102#20190501",
Expand All @@ -306,8 +306,8 @@ void ReadRowPrefix(google::cloud::bigtable::Table table,
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (auto& row : table.ReadRows(cbt::RowRange::Prefix("phone"),
cbt::Filter::PassAllFilter())) {
for (StatusOr<cbt::Row>& row : table.ReadRows(
cbt::RowRange::Prefix("phone"), cbt::Filter::PassAllFilter())) {
if (!row) throw std::move(row).status();
PrintRow(*row);
}
Expand All @@ -323,8 +323,9 @@ void ReadFilter(google::cloud::bigtable::Table table,
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (auto& row : table.ReadRows(cbt::RowRange::InfiniteRange(),
cbt::Filter::ValueRegex("PQ2A.*"))) {
for (StatusOr<cbt::Row>& row :
table.ReadRows(cbt::RowRange::InfiniteRange(),
cbt::Filter::ValueRegex("PQ2A.*"))) {
if (!row) throw std::move(row).status();
PrintRow(*row);
}
Expand All @@ -333,6 +334,26 @@ void ReadFilter(google::cloud::bigtable::Table table,
(std::move(table));
}

void ReadRowsReverse(google::cloud::bigtable::Table table,
std::vector<std::string> const&) {
//! [reverse scan] [START bigtable_reverse_scan]
namespace cbt = ::google::cloud::bigtable;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
for (StatusOr<cbt::Row>& row : table.ReadRows(
cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3,
cbt::Filter::PassAllFilter(),
Options{}.set<cbt::ReverseScanOption>(true))) {
if (!row) throw std::move(row).status();
PrintRow(*row);
}
}
//! [reverse scan] [END bigtable_reverse_scan]
(std::move(table));
}

void RunAll(std::vector<std::string> const& argv) {
namespace examples = ::google::cloud::bigtable::examples;
namespace cbt = ::google::cloud::bigtable;
Expand Down Expand Up @@ -390,6 +411,10 @@ void RunAll(std::vector<std::string> const& argv) {
ReadFilter(table, {});
std::cout << "Running ReadRowsWithLimit() example" << std::endl;
ReadRowsWithLimit(table, {"5"});
if (!google::cloud::bigtable::examples::UsingEmulator()) {
std::cout << "Running ReadRowsReverse() example" << std::endl;
ReadRowsReverse(table, {});
}

std::cout << "Running ReadKeySet() example" << std::endl;
ReadKeysSet({table.project_id(), table.instance_id(), table.table_id(),
Expand All @@ -414,6 +439,7 @@ int main(int argc, char* argv[]) try {
MakeCommandEntry("read-row-ranges", {}, ReadRowRanges),
MakeCommandEntry("read-row-prefix", {}, ReadRowPrefix),
MakeCommandEntry("read-filter", {}, ReadFilter),
MakeCommandEntry("read-rows-reverse", {}, ReadRowsReverse),
{"auto", RunAll},
};

Expand Down
3 changes: 2 additions & 1 deletion google/cloud/bigtable/internal/async_row_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void AsyncRowReader::MakeRequest() {

request.set_app_profile_id(app_profile_id_);
request.set_table_name(table_name_);
request.set_reversed(reverse_);
auto row_set_proto = row_set_.as_proto();
request.mutable_rows()->Swap(&row_set_proto);

Expand All @@ -39,7 +40,7 @@ void AsyncRowReader::MakeRequest() {
if (rows_limit_ != NO_ROWS_LIMIT) {
request.set_rows_limit(rows_limit_ - rows_count_);
}
parser_ = bigtable::internal::ReadRowsParserFactory().Create();
parser_ = bigtable::internal::ReadRowsParserFactory().Create(reverse_);

internal::ScopedCallContext scope(call_context_);
auto context = std::make_shared<grpc::ClientContext>();
Expand Down
8 changes: 5 additions & 3 deletions google/cloud/bigtable/internal/async_row_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
std::string app_profile_id, std::string table_name,
RowFunctor on_row, FinishFunctor on_finish,
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter,
bigtable::Filter filter, bool reverse,
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy) {
auto reader = std::shared_ptr<AsyncRowReader>(new AsyncRowReader(
std::move(cq), std::move(stub), std::move(app_profile_id),
std::move(table_name), std::move(on_row), std::move(on_finish),
std::move(row_set), rows_limit, std::move(filter),
std::move(row_set), rows_limit, std::move(filter), reverse,
std::move(retry_policy), std::move(backoff_policy)));
reader->MakeRequest();
}
Expand All @@ -73,7 +73,7 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
std::string app_profile_id, std::string table_name,
RowFunctor on_row, FinishFunctor on_finish,
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter,
bigtable::Filter filter, bool reverse,
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: cq_(std::move(cq)),
Expand All @@ -85,6 +85,7 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
row_set_(std::move(row_set)),
rows_limit_(rows_limit),
filter_(std::move(filter)),
reverse_(std::move(reverse)),
retry_policy_(std::move(retry_policy)),
backoff_policy_(std::move(backoff_policy)) {}

Expand Down Expand Up @@ -129,6 +130,7 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
bigtable::RowSet row_set_;
std::int64_t rows_limit_;
bigtable::Filter filter_;
bool reverse_;
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
std::unique_ptr<BackoffPolicy> backoff_policy_;
std::unique_ptr<bigtable::internal::ReadRowsParser> parser_;
Expand Down
Loading

0 comments on commit fb6fba3

Please sign in to comment.