Skip to content

Commit

Permalink
Add a new API Transaction::GetAttributeGroupIterator (#13119)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #13119

The patch adds a new API `Transaction::GetAttributeGroupIterator` that can be used to create a multi-column-family attribute group iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions.

Reviewed By: jowlyzhang

Differential Revision: D65548324

fbshipit-source-id: 0fb8a22129494770fdba3d6024eef72b3e051136
  • Loading branch information
ltamasi authored and jowlyzhang committed Nov 7, 2024
1 parent 7d2fe50 commit 026659f
Show file tree
Hide file tree
Showing 9 changed files with 604 additions and 4 deletions.
3 changes: 1 addition & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4006,8 +4006,7 @@ std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
cfh_iter_pairs;
cfh_iter_pairs.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs.emplace_back(column_families[i],
std::unique_ptr<Iterator>(child_iterators[i]));
cfh_iter_pairs.emplace_back(column_families[i], child_iterators[i]);
}

return std::make_unique<ImplType>(_read_options,
Expand Down
15 changes: 15 additions & 0 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,21 @@ class Transaction {
virtual Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) = 0;

// Returns a multi-column-family attribute group iterator for the given column
// families that includes both keys in the DB and uncommitted keys in this
// transaction.
//
// Setting read_options.snapshot will affect what is read from the
// DB but will NOT change which keys are read from this transaction (the keys
// in this transaction do not yet belong to any snapshot and will be fetched
// regardless).
//
// The returned iterator is only valid until Commit(), Rollback(), or
// RollbackToSavePoint() is called.
//
// Failures are reported through the returned iterator's status() rather than
// through a separate return value. The TransactionBaseImpl implementation
// reports InvalidArgument when `column_families` is empty or when the column
// families do not all use the same comparator, and otherwise forwards any
// error from creating the underlying DB iterators.
// NOTE(review): per the accompanying release note, this API is currently
// supported for optimistic and write-committed pessimistic transactions only;
// behavior for other transaction types should be confirmed.
virtual std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;

// Put, PutEntity, Merge, Delete, and SingleDelete behave similarly to the
// corresponding functions in WriteBatch, but will also do conflict checking
// on the keys being written.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a new API `Transaction::GetAttributeGroupIterator` that can be used to create a multi-column-family attribute group iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions.
235 changes: 235 additions & 0 deletions utilities/transactions/optimistic_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,241 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
}
}

// Verifies that Transaction::GetAttributeGroupIterator merges, per key, the
// entries of two column families, and that for each column family the
// transaction's uncommitted write takes precedence over the value stored in
// the database. The check runs twice: once with eagerly loaded values and once
// with allow_unprepared_value, where PrepareValue() must be called first.
TEST_P(OptimisticTransactionTest, AttributeGroupIterator) {
  ColumnFamilyOptions blob_cf_opts;
  blob_cf_opts.enable_blob_files = true;

  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(blob_cf_opts, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(blob_cf_opts, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  // Key naming scheme:
  //   prefix "cf1"  -> key exists only in CF1
  //   prefix "cf2"  -> key exists only in CF2
  //   prefix "cf12" -> key exists in both CFs
  //   suffix "a"    -> written only to the database
  //   suffix "b"    -> written only through the transaction
  //   suffix "c"    -> written to both the database and the transaction
  // Each value encodes the column family and whether the entry originated from
  // the database ("db") or the transaction ("txn").
  struct KeyValueWrite {
    ColumnFamilyHandle* cfh;
    Slice key;
    Slice value;
  };

  const KeyValueWrite db_writes[] = {
      {cfh1, "cf1_a", "cf1_a_db_cf1"},   {cfh1, "cf1_c", "cf1_c_db_cf1"},
      {cfh2, "cf2_a", "cf2_a_db_cf2"},   {cfh2, "cf2_c", "cf2_c_db_cf2"},
      {cfh1, "cf12_a", "cf12_a_db_cf1"}, {cfh2, "cf12_a", "cf12_a_db_cf2"},
      {cfh1, "cf12_c", "cf12_c_db_cf1"}, {cfh2, "cf12_c", "cf12_c_db_cf2"},
  };
  for (const KeyValueWrite& write : db_writes) {
    ASSERT_OK(txn_db->Put(WriteOptions(), write.cfh, write.key, write.value));
  }

  ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1));
  ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2));

  std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));

  const KeyValueWrite txn_writes[] = {
      {cfh1, "cf1_b", "cf1_b_txn_cf1"},   {cfh1, "cf1_c", "cf1_c_txn_cf1"},
      {cfh2, "cf2_b", "cf2_b_txn_cf2"},   {cfh2, "cf2_c", "cf2_c_txn_cf2"},
      {cfh1, "cf12_b", "cf12_b_txn_cf1"}, {cfh2, "cf12_b", "cf12_b_txn_cf2"},
      {cfh1, "cf12_c", "cf12_c_txn_cf1"}, {cfh2, "cf12_c", "cf12_c_txn_cf2"},
  };
  for (const KeyValueWrite& write : txn_writes) {
    ASSERT_OK(txn->Put(write.cfh, write.key, write.value));
  }

  // One row per key the iterator is expected to visit, in iteration order. A
  // null value means the key must not appear in that column family's group.
  struct ExpectedEntry {
    Slice key;
    const char* cf1_value;
    const char* cf2_value;
  };

  const ExpectedEntry expected_entries[] = {
      {"cf12_a", "cf12_a_db_cf1", "cf12_a_db_cf2"},
      {"cf12_b", "cf12_b_txn_cf1", "cf12_b_txn_cf2"},
      {"cf12_c", "cf12_c_txn_cf1", "cf12_c_txn_cf2"},
      {"cf1_a", "cf1_a_db_cf1", nullptr},
      {"cf1_b", "cf1_b_txn_cf1", nullptr},
      {"cf1_c", "cf1_c_txn_cf1", nullptr},
      {"cf2_a", nullptr, "cf2_a_db_cf2"},
      {"cf2_b", nullptr, "cf2_b_txn_cf2"},
      {"cf2_c", nullptr, "cf2_c_txn_cf2"},
  };

  auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
    ReadOptions read_options;
    read_options.allow_unprepared_value = allow_unprepared_value;

    std::unique_ptr<AttributeGroupIterator> iter =
        txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2});

    bool positioned = false;
    for (const ExpectedEntry& entry : expected_entries) {
      // The first entry is reached via SeekToFirst(), all later ones via
      // Next().
      if (positioned) {
        iter->Next();
      } else {
        iter->SeekToFirst();
        positioned = true;
      }

      ASSERT_TRUE(iter->Valid());
      ASSERT_OK(iter->status());
      ASSERT_EQ(iter->key(), entry.key);

      prepare_if_needed(iter.get());

      // The column vectors must outlive `expected`, which stores pointers to
      // them.
      WideColumns cf1_columns;
      WideColumns cf2_columns;
      IteratorAttributeGroups expected;

      if (entry.cf1_value != nullptr) {
        cf1_columns.emplace_back(kDefaultWideColumnName, entry.cf1_value);
        expected.emplace_back(cfh1, &cf1_columns);
      }
      if (entry.cf2_value != nullptr) {
        cf2_columns.emplace_back(kDefaultWideColumnName, entry.cf2_value);
        expected.emplace_back(cfh2, &cf2_columns);
      }

      ASSERT_EQ(iter->attribute_groups(), expected);
    }

    // Stepping past the last expected key must cleanly exhaust the iterator.
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  };

  verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {});
  verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) {
    ASSERT_TRUE(iter->attribute_groups().empty());
    ASSERT_TRUE(iter->PrepareValue());
  });
}

// Verifies that Transaction::GetAttributeGroupIterator rejects invalid
// requests by returning an iterator whose status is InvalidArgument.
TEST_P(OptimisticTransactionTest, AttributeGroupIteratorSanityChecks) {
  // CF1 keeps the default column family options.
  ColumnFamilyOptions default_cf_opts;
  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(default_cf_opts, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  // CF2 uses a reverse comparator so that it is incompatible with CF1.
  ColumnFamilyOptions reverse_cf_opts;
  reverse_cf_opts.comparator = ReverseBytewiseComparator();
  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(reverse_cf_opts, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));

  // Case 1: no column families supplied.
  {
    const auto iter = txn->GetAttributeGroupIterator(ReadOptions(), {});
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }

  // Case 2: column families whose comparators differ.
  {
    const auto iter =
        txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2});
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }

  // Case 3: read options carrying a compaction io_activity.
  {
    ReadOptions compaction_read_options;
    compaction_read_options.io_activity = Env::IOActivity::kCompaction;

    const auto iter =
        txn->GetAttributeGroupIterator(compaction_read_options, {cfh1});
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }
}

INSTANTIATE_TEST_CASE_P(
InstanceOccGroup, OptimisticTransactionTest,
testing::Values(OccValidationPolicy::kValidateSerial,
Expand Down
59 changes: 59 additions & 0 deletions utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <cinttypes>

#include "db/attribute_group_iterator_impl.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
Expand Down Expand Up @@ -486,6 +487,64 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
&read_options);
}

// Builds a multi-column-family iterator of type ImplType (returned as
// IterType) over `column_families`. For each column family, an iterator
// obtained from the DB is combined with this transaction's write batch via
// NewIteratorWithBase.
//
// On failure, the result of `error_iterator_func(status)` is returned instead.
// This happens when:
//   * `column_families` is empty (InvalidArgument),
//   * the column families do not all share the first one's comparator, judged
//     by pointer identity or, failing that, by comparator id
//     (InvalidArgument),
//   * creating the DB iterators fails (that status is forwarded).
template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    ErrorIteratorFuncType error_iterator_func) {
  const size_t num_cfs = column_families.size();
  if (num_cfs == 0) {
    return error_iterator_func(
        Status::InvalidArgument("No Column Family was provided"));
  }

  // The first column family's comparator is the reference every other column
  // family has to match.
  const Comparator* const reference_cmp = column_families[0]->GetComparator();
  assert(reference_cmp);

  for (size_t idx = 1; idx < num_cfs; ++idx) {
    const Comparator* const candidate_cmp =
        column_families[idx]->GetComparator();
    assert(candidate_cmp);

    // Identical comparator object: nothing further to check.
    if (reference_cmp == candidate_cmp) {
      continue;
    }

    if (reference_cmp->GetId() != candidate_cmp->GetId()) {
      return error_iterator_func(Status::InvalidArgument(
          "Different comparators are being used across CFs"));
    }
  }

  std::vector<Iterator*> db_iters;
  const Status status =
      db_->NewIterators(read_options, column_families, &db_iters);
  if (!status.ok()) {
    return error_iterator_func(status);
  }

  assert(db_iters.size() == num_cfs);

  // Pair every column family handle with an iterator that merges the DB
  // contents with this transaction's pending writes. The unique_ptr takes
  // ownership of the iterator returned by NewIteratorWithBase.
  // NOTE(review): presumably NewIteratorWithBase in turn takes ownership of
  // the DB iterator handed to it -- confirm against its contract.
  std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
      merged_iters;
  merged_iters.reserve(num_cfs);
  for (size_t idx = 0; idx < num_cfs; ++idx) {
    ColumnFamilyHandle* const cfh = column_families[idx];
    merged_iters.emplace_back(
        cfh,
        write_batch_.NewIteratorWithBase(cfh, db_iters[idx], &read_options));
  }

  return std::make_unique<ImplType>(read_options, reference_cmp,
                                    std::move(merged_iters));
}

std::unique_ptr<AttributeGroupIterator>
TransactionBaseImpl::GetAttributeGroupIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
read_options, column_families,
[](const Status& s) { return NewAttributeGroupErrorIterator(s); });
}

Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns,
Expand Down
11 changes: 11 additions & 0 deletions utilities/transactions/transaction_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ class TransactionBaseImpl : public Transaction {
Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;

// Returns an attribute group iterator over `column_families`, implemented in
// terms of NewMultiCfIterator. Failures are surfaced via an error iterator
// carrying the failing status.
std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, const bool assume_tracked = false) override;
Status Put(const Slice& key, const Slice& value) override {
Expand Down Expand Up @@ -304,6 +308,13 @@ class TransactionBaseImpl : public Transaction {
LockTracker& GetTrackedLocks() { return *tracked_locks_; }

protected:
// Shared helper for building multi-column-family iterators over this
// transaction.
//   IterType              - interface type of the returned iterator
//   ImplType              - concrete iterator constructed on success
//   ErrorIteratorFuncType - callable taking a Status and producing the
//                           iterator returned on failure
// Fails (via error_iterator_func) with InvalidArgument if `column_families`
// is empty or the column families use different comparators, and forwards
// any error from creating the underlying DB iterators.
template <typename IterType, typename ImplType,
typename ErrorIteratorFuncType>
std::unique_ptr<IterType> NewMultiCfIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
ErrorIteratorFuncType error_iterator_func);

Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;

Expand Down
Loading

0 comments on commit 026659f

Please sign in to comment.