Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose seqno from compaction filter and iterator #215

Merged
merged 3 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ TEST_F(CompactFilesTest, FilterContext) {
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
usleep(10000); // Wait for compaction start.
ASSERT_EQ(expected_context->start_key, Slice("1"));
ASSERT_EQ(expected_context->end_key, Slice("99"));
ASSERT_EQ(expected_context->is_end_key_inclusive, 1);
ASSERT_EQ(expected_context->file_numbers[0], 11);
ASSERT_EQ(*compaction_count.get(), 2);
Expand Down
7 changes: 4 additions & 3 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
SequenceNumber seqno = ikey_.sequence;
{
StopWatchNano timer(env_, report_detailed_time_);
filter = compaction_filter_->FilterV2(
compaction_->level(), filter_key, value_type, value_,
filter = compaction_filter_->FilterV3(
compaction_->level(), filter_key, seqno, value_type, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
Expand All @@ -185,7 +186,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
// Keep the key as per FilterV3 documentation.
filter = CompactionFilter::Decision::kKeep;
}

Expand Down
8 changes: 8 additions & 0 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ class DBIter final: public Iterator {
return iter_.value();
}
}
bool seqno(SequenceNumber* no) const override {
assert(valid_);
*no = ikey_.sequence;
hicqu marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
Status status() const override {
if (status_.ok()) {
return iter_.status();
Expand Down Expand Up @@ -1602,6 +1607,9 @@ inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline bool ArenaWrappedDBIter::seqno(SequenceNumber* no) const {
return db_iter_->seqno(no);
}
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
Expand Down
1 change: 1 addition & 0 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator {
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual bool seqno(SequenceNumber* no) const override;
virtual Status status() const override;
virtual Status Refresh() override;
bool IsBlob() const;
Expand Down
51 changes: 31 additions & 20 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,30 @@ class CompactionFilter {
return false;
}

// Almost same as FilterV3, except won't pass out sequence numbers.
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
switch (value_type) {
case ValueType::kValue: {
bool value_changed = false;
bool rv = Filter(level, key, existing_value, new_value, &value_changed);
if (rv) {
return Decision::kRemove;
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
case ValueType::kBlobIndex:
return Decision::kKeep;
}
assert(false);
return Decision::kKeep;
}

// An extended API. Called for both values and merge operands.
// Allows changing value and skipping ranges of keys.
// The default implementation uses Filter() and FilterMergeOperand().
Expand Down Expand Up @@ -169,27 +193,14 @@ class CompactionFilter {
// is a write conflict and may allow a Transaction to Commit that should have
// failed. Instead, it is better to implement any Merge filtering inside the
// MergeOperator.
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
//
// Note: for kTypeBlobIndex, `key` is internal key instead of user key.
virtual Decision FilterV3(int level, const Slice& key,
SequenceNumber /*seqno*/, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
switch (value_type) {
case ValueType::kValue: {
bool value_changed = false;
bool rv = Filter(level, key, existing_value, new_value, &value_changed);
if (rv) {
return Decision::kRemove;
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
case ValueType::kBlobIndex:
return Decision::kKeep;
}
assert(false);
return Decision::kKeep;
std::string* skip_until) const {
return FilterV2(level, key, value_type, existing_value, new_value,
skip_until);
}

// This function is deprecated. Snapshots will always be ignored for
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace rocksdb {

Expand Down Expand Up @@ -78,6 +79,11 @@ class Iterator : public Cleanable {
// REQUIRES: Valid()
virtual Slice value() const = 0;

// Return the sequence number for the current entry if it's available.
// Return false if it's not available.
// REQUIRES: Valid()
virtual bool seqno(SequenceNumber* /*seqno*/) const { return false; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to update titan to support this method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary because now we only use it for write CF, which its value is very small.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's still update Titan. It sounds like a 坑 in the future.


// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().
Expand Down
2 changes: 2 additions & 0 deletions utilities/blob_db/blob_db_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class BlobDBIterator : public Iterator {
return value_;
}

bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); }

// Iterator::Refresh() not supported.

private:
Expand Down
2 changes: 2 additions & 0 deletions utilities/ttl/db_ttl_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class TtlIterator : public Iterator {
return trimmed_value;
}

bool seqno(SequenceNumber* no) const override { return iter_->seqno(no); }

Status status() const override { return iter_->status(); }

private:
Expand Down