Fix corruption with intra-L0 on ingested files #5958

Closed
wants to merge 19 commits into from
Changes from 17 commits
3 changes: 3 additions & 0 deletions HISTORY.md
@@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fix data corruption caused by the output of intra-L0 compaction on an ingested file not being placed in the correct order in L0.

### Public API Change
* Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the
file_creation_time of the oldest SST file in the DB.
11 changes: 10 additions & 1 deletion db/column_family.cc
@@ -391,6 +391,12 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
refs.store(1, std::memory_order_relaxed);
}

SequenceNumber SuperVersion::GetEarliestMemTableSequenceNumber() const {
SequenceNumber earliest_seqno = std::min(
mem->GetEarliestSequenceNumber(), imm->GetEarliestSequenceNumber(false));
return earliest_seqno;
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
// UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
@@ -929,8 +935,11 @@ bool ColumnFamilyData::NeedsCompaction() const {

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
SequenceNumber earliest_mem_seqno =
super_version_->GetEarliestMemTableSequenceNumber();
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
GetName(), mutable_options, current_->storage_info(), log_buffer,
earliest_mem_seqno);
if (result != nullptr) {
result->SetInputVersion(current_);
}
1 change: 1 addition & 0 deletions db/column_family.h
@@ -223,6 +223,7 @@ struct SuperVersion {
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
SequenceNumber GetEarliestMemTableSequenceNumber() const;

// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
39 changes: 28 additions & 11 deletions db/compaction/compaction_picker.cc
@@ -39,32 +39,49 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs) {
size_t compact_bytes = static_cast<size_t>(level_files[0]->fd.file_size);
uint64_t compensated_compact_bytes = level_files[0]->compensated_file_size;
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno) {

// Do not pick an ingested file when at least one unflushed memtable has a seqno range that overlaps with the SST.
Contributor

Do you think it would work to pick the span of files for intra-L0 starting at the smallest index i for which level_files[i]->fd.largest_seqno < oldest_mem_seqno?

Contributor

That's a nicer phrasing of a suggestion I had in a comment. I don't see anything wrong with this right now, but I'd like to think harder about it.

Contributor

Sorry I loaded the page before you reviewed and didn't notice the repetition. I haven't thought of any problems with it. I also plan to experiment with relaxing the restriction more generally on intra-L0 picking from the newest file as it might be related to our imports getting stuck.

Contributor Author

I would try it.

Contributor

This approach looks good to me.

Contributor

Wrap the line to follow the 80-char rule. If possible, please try to run "make format".

Contributor

@yiwu-arbug Oct 29, 2019

@ajkr It seems to me it could still order the output to be newer than L0_(i-1) if L0_i is an ingested file and L0_i->largest_seqno < oldest_mem_seqno but L0_i->largest_seqno > L0_(i-1)->largest_seqno.
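
The suggestion that opens this thread (start the intra-L0 span at the smallest index i whose largest_seqno is below the oldest memtable seqno) might be sketched as follows. This is a minimal standalone model, not RocksDB code: `FileModel` and `FindSpanStart` are hypothetical names, and the strict `<` follows the wording of the suggestion, whereas the patch under review uses `<=`.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the one FileMetaData field used here.
struct FileModel {
  uint64_t largest_seqno;
};

// Returns the smallest index i with files[i].largest_seqno < oldest_mem_seqno,
// or files.size() if no file qualifies.
std::size_t FindSpanStart(const std::vector<FileModel>& files,
                          uint64_t oldest_mem_seqno) {
  for (std::size_t i = 0; i < files.size(); ++i) {
    if (files[i].largest_seqno < oldest_mem_seqno) {
      return i;
    }
  }
  return files.size();
}
```

A return value equal to `files.size()` means there is no eligible starting file, which corresponds to the `return false` path in the patch.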

size_t start = 0;
for (; start < level_files.size(); start++) {
if (level_files[start]->being_compacted) {
return false;
}
// If there is no data in memtable, the earliest sequence number would the largest sequence number in last memtable.
if (level_files[start]->fd.largest_seqno <= earliest_mem_seqno) {
break;
}
}
if (start >= level_files.size()) {
return false;
}
size_t compact_bytes = static_cast<size_t>(level_files[start]->fd.file_size);
uint64_t compensated_compact_bytes = level_files[start]->compensated_file_size;
size_t compact_bytes_per_del_file = port::kMaxSizet;
// Compaction range will be [0, span_len).
Contributor

@riversand963 Oct 31, 2019

// Compaction range will be [0, span_len).

This comment seems to be outdated because we are now compacting [start, limit).

size_t span_len;
size_t limit;
// Pull in files until the amount of compaction work per deleted file begins
// increasing or maximum total compaction size is reached.
size_t new_compact_bytes_per_del_file = 0;
for (span_len = 1; span_len < level_files.size(); ++span_len) {
compact_bytes += static_cast<size_t>(level_files[span_len]->fd.file_size);
compensated_compact_bytes += level_files[span_len]->compensated_file_size;
new_compact_bytes_per_del_file = compact_bytes / span_len;
if (level_files[span_len]->being_compacted ||
for (limit = start + 1; limit < level_files.size(); ++limit) {
compact_bytes += static_cast<size_t>(level_files[limit]->fd.file_size);
compensated_compact_bytes += level_files[limit]->compensated_file_size;
new_compact_bytes_per_del_file = compact_bytes / (limit - start);
if (level_files[limit]->fd.largest_seqno >= earliest_mem_seqno ||
Contributor

I don't know how level_files[limit]->fd.largest_seqno >= earliest_mem_seqno could possibly be true here, but if this is true, and we picked up some files supposed to be newer than this file and compact them together, it feels hard to reason about whether this is correct.

I don't think level_files[limit]->fd.largest_seqno > earliest_mem_seqno is ever true here (we can even add an assertion), but in case it is, can we just return false?

Regarding the == case, this is a mystery. It feels inconsistent with the previous check of

    if (level_files[start]->fd.largest_seqno <= earliest_mem_seqno) {
      break;
    }

Can we treat the equality case consistently? Should we include the file whose largest_seqno equals earliest_mem_seqno or not? We may have already included level_files[start], which satisfies the equality condition.

Contributor Author

Sorry, I made a mistake. The largest seqnos of files in L0 are sorted in descending order, so I just need to check the position at which to start picking.
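
The reasoning in this reply can be checked with a small sketch. Assuming, as the author states, that L0 files are ordered with non-increasing largest seqno, a file at `start` that is within the bound implies every later file is too, so the per-file seqno check inside the loop is redundant. The helper names below are hypothetical and the seqnos are plain integers rather than RocksDB types.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: true if largest seqnos never increase with the index,
// i.e. the newest file comes first.
bool SortedNewestFirst(const std::vector<uint64_t>& largest_seqnos) {
  for (std::size_t i = 1; i < largest_seqnos.size(); ++i) {
    if (largest_seqnos[i] > largest_seqnos[i - 1]) return false;
  }
  return true;
}

// Hypothetical helper: true if every file from `start` onward has a largest
// seqno at or below earliest_mem_seqno.
bool TailWithinBound(const std::vector<uint64_t>& largest_seqnos,
                     std::size_t start, uint64_t earliest_mem_seqno) {
  for (std::size_t i = start; i < largest_seqnos.size(); ++i) {
    if (largest_seqnos[i] > earliest_mem_seqno) return false;
  }
  return true;
}
```

If the ordering assumption did not hold, checking only the starting file would not be enough, which is the case the reviewer's question above is probing.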

level_files[limit]->being_compacted ||
new_compact_bytes_per_del_file > compact_bytes_per_del_file ||
compensated_compact_bytes > max_compaction_bytes) {
break;
}
compact_bytes_per_del_file = new_compact_bytes_per_del_file;
}

if (span_len >= min_files_to_compact &&
if ((limit - start) >= min_files_to_compact &&
compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
assert(comp_inputs != nullptr);
comp_inputs->level = 0;
for (size_t i = 0; i < span_len; ++i) {
for (size_t i = start; i < limit; ++i) {
comp_inputs->files.push_back(level_files[i]);
}
return true;
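
For readers following the hunk above, here is a simplified standalone model of the [start, limit) selection. It is a sketch under assumptions, not the RocksDB implementation: `L0File` and `PickIntraL0Span` are hypothetical names, byte counters are all `uint64_t`, and the seqno check inside the second loop is left out, following the author's reply that only the starting position needs checking.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical stand-in for the FileMetaData fields the picker reads.
struct L0File {
  uint64_t file_size;
  uint64_t compensated_file_size;
  uint64_t largest_seqno;
  bool being_compacted;
};

// On success, writes the half-open range [*start_out, *limit_out) and
// returns true. Files are assumed to be ordered newest first.
bool PickIntraL0Span(const std::vector<L0File>& files,
                     std::size_t min_files_to_compact,
                     uint64_t max_compact_bytes_per_del_file,
                     uint64_t max_compaction_bytes,
                     uint64_t earliest_mem_seqno,
                     std::size_t* start_out, std::size_t* limit_out) {
  // Skip files whose seqno range may overlap an unflushed memtable.
  std::size_t start = 0;
  for (; start < files.size(); ++start) {
    if (files[start].being_compacted) return false;
    if (files[start].largest_seqno <= earliest_mem_seqno) break;
  }
  if (start >= files.size()) return false;

  uint64_t compact_bytes = files[start].file_size;
  uint64_t compensated_bytes = files[start].compensated_file_size;
  uint64_t bytes_per_del_file = std::numeric_limits<uint64_t>::max();
  std::size_t limit = start + 1;
  // Extend the span while the work per removed file does not increase and
  // the total stays within max_compaction_bytes.
  for (; limit < files.size(); ++limit) {
    compact_bytes += files[limit].file_size;
    compensated_bytes += files[limit].compensated_file_size;
    uint64_t new_bytes_per_del_file = compact_bytes / (limit - start);
    if (files[limit].being_compacted ||
        new_bytes_per_del_file > bytes_per_del_file ||
        compensated_bytes > max_compaction_bytes) {
      break;
    }
    bytes_per_del_file = new_bytes_per_del_file;
  }

  if (limit - start >= min_files_to_compact &&
      bytes_per_del_file < max_compact_bytes_per_del_file) {
    *start_out = start;
    *limit_out = limit;
    return true;
  }
  return false;
}
```

In this model, passing the maximum sequence number as `earliest_mem_seqno` makes the first loop stop at index 0, so the span starts from the newest file as the pre-change code did.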
27 changes: 14 additions & 13 deletions db/compaction/compaction_picker.h
@@ -54,10 +54,10 @@ class CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0;

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
@@ -247,10 +247,11 @@ class NullCompactionPicker : public CompactionPicker {
virtual ~NullCompactionPicker() {}

// Always return "nullptr"
Compaction* PickCompaction(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/,
LogBuffer* /*log_buffer*/) override {
Compaction* PickCompaction(
const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */,
SequenceNumber /* earliest_memtable_seqno */) override {
return nullptr;
}

@@ -292,11 +293,11 @@ class NullCompactionPicker : public CompactionPicker {
// files. Cannot be nullptr.
//
// @return true iff compaction was found.
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs);
bool FindIntraL0Compaction(
const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber);

CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_fifo.cc
@@ -202,7 +202,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(

Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber /*earliest_memtable_seqno*/) {
assert(vstorage->num_levels() == 1);

Compaction* c = nullptr;
8 changes: 4 additions & 4 deletions db/compaction/compaction_picker_fifo.h
@@ -19,10 +19,10 @@ class FIFOCompactionPicker : public CompactionPicker {
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}

virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;

virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
9 changes: 6 additions & 3 deletions db/compaction/compaction_picker_level.cc
@@ -45,12 +45,14 @@ class LevelCompactionBuilder {
public:
LevelCompactionBuilder(const std::string& cf_name,
VersionStorageInfo* vstorage,
SequenceNumber earliest_mem_seqno,
CompactionPicker* compaction_picker,
LogBuffer* log_buffer,
const MutableCFOptions& mutable_cf_options,
const ImmutableCFOptions& ioptions)
: cf_name_(cf_name),
vstorage_(vstorage),
earliest_mem_seqno_(earliest_mem_seqno),
compaction_picker_(compaction_picker),
log_buffer_(log_buffer),
mutable_cf_options_(mutable_cf_options),
@@ -97,6 +99,7 @@ class LevelCompactionBuilder {

const std::string& cf_name_;
VersionStorageInfo* vstorage_;
SequenceNumber earliest_mem_seqno_;
CompactionPicker* compaction_picker_;
LogBuffer* log_buffer_;
int start_level_ = -1;
@@ -539,14 +542,14 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() {
}
return FindIntraL0Compaction(
level_files, kMinFilesForIntraL0Compaction, port::kMaxUint64,
mutable_cf_options_.max_compaction_bytes, &start_level_inputs_);
mutable_cf_options_.max_compaction_bytes, &start_level_inputs_, earliest_mem_seqno_);
}
} // namespace

Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
VersionStorageInfo* vstorage, LogBuffer* log_buffer, SequenceNumber earliest_mem_seqno) {
LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this, log_buffer,
mutable_cf_options, ioptions_);
return builder.PickCompaction();
}
8 changes: 4 additions & 4 deletions db/compaction/compaction_picker_level.h
@@ -20,10 +20,10 @@ class LevelCompactionPicker : public CompactionPicker {
LevelCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;

virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
24 changes: 12 additions & 12 deletions db/compaction/compaction_picker_test.cc
@@ -1486,12 +1486,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) {
// All 5 L0 files will be picked for intra L0 compaction. The one L1 file
// spans entire L0 key range and is marked as being compacted to avoid
// L0->L1 compaction.
Add(0, 1U, "100", "150", 200000U);
Add(0, 2U, "151", "200", 200000U);
Add(0, 3U, "201", "250", 200000U);
Add(0, 4U, "251", "300", 200000U);
Add(0, 5U, "301", "350", 200000U);
Add(1, 6U, "100", "350", 200000U);
Add(0, 1U, "100", "150", 200000U, 0, 100, 101);
Add(0, 2U, "151", "200", 200000U, 0, 102, 103);
Add(0, 3U, "201", "250", 200000U, 0, 104, 105);
Add(0, 4U, "251", "300", 200000U, 0, 106, 107);
Add(0, 5U, "301", "350", 200000U, 0, 108, 109);
Add(1, 6U, "100", "350", 200000U, 0, 110, 111);
vstorage_->LevelFiles(1)[0]->being_compacted = true;
UpdateVersionStorageInfo();

@@ -1516,12 +1516,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) {
// max_compaction_bytes limit (the minimum number of files for triggering
// intra L0 compaction is 4). The one L1 file spans entire L0 key range and
// is marked as being compacted to avoid L0->L1 compaction.
Add(0, 1U, "100", "150", 200000U);
Add(0, 2U, "151", "200", 200000U);
Add(0, 3U, "201", "250", 200000U);
Add(0, 4U, "251", "300", 200000U);
Add(0, 5U, "301", "350", 200000U);
Add(1, 6U, "100", "350", 200000U);
Add(0, 1U, "100", "150", 200000U, 0, 100, 101);
Add(0, 2U, "151", "200", 200000U, 0, 102, 103);
Add(0, 3U, "201", "250", 200000U, 0, 104, 105);
Add(0, 4U, "251", "300", 200000U, 0, 106, 107);
Add(0, 5U, "301", "350", 200000U, 0, 108, 109);
Add(1, 6U, "100", "350", 200000U, 0, 109, 110);
Contributor

Can you add several tests to cover more scenarios in FindIntraL0Compaction(), for example when being_compacted shows up in L0? It would also be nice to directly cover the earliest_mem_seqno scenarios in tests here.

Contributor Author

Yes, I have added a test for FindIntraL0Compaction, which makes sure that it skips any SST whose largest lsn is larger than earliest_seqno.

vstorage_->LevelFiles(1)[0]->being_compacted = true;
UpdateVersionStorageInfo();

3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_universal.cc
@@ -261,7 +261,8 @@ bool UniversalCompactionPicker::NeedsCompaction(

Compaction* UniversalCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber /* earliest_memtable_seqno */) {
UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
mutable_cf_options, vstorage, this,
log_buffer);
9 changes: 4 additions & 5 deletions db/compaction/compaction_picker_universal.h
@@ -18,11 +18,10 @@ class UniversalCompactionPicker : public CompactionPicker {
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;

virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }

virtual bool NeedsCompaction(