Skip to content

Commit

Permalink
Revert PR 10777 "Fix FIFO causing overlapping seqnos in L0 files due …
Browse files Browse the repository at this point in the history
…to overla…" (facebook#10999)

Summary:
**Context/Summary:**

This reverts commit fc74abb and related HISTORY record.

The issue with PR 10777 or general approach using earliest_mem_seqno like facebook#5958 (comment) is that the earliest seqno of memtable of each CFs does not get persisted and will always start with 0 upon Recover(). Later when creating a new memtable in certain CF, we use the last seqno of the whole DB (but not of that CF from previous DB session) for this CF.  This will lead to false positive overlapping seqno and PR 10777 will throw something like https://github.com/facebook/rocksdb/blob/main/db/compaction/compaction_picker.cc#L1002-L1004

Luckily a more elegant and complete solution to the overlapping seqno problem these PR aim to solve does not have above problem, see facebook#10922. It is already being pursued and in the process of review. So we can just revert this PR and focus on getting PR10922 to land.

Pull Request resolved: facebook#10999

Test Plan: make check

Reviewed By: anand1976

Differential Revision: D41572604

Pulled By: hx235

fbshipit-source-id: 9d9bdf594abd235e2137045cef513ca0b14e0a3a
  • Loading branch information
hx235 committed Nov 29, 2022
1 parent ab9741a commit 3ff86f3
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 377 deletions.
1 change: 0 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* Fixed an iterator performance regression for delete range users when scanning through a consecutive sequence of range tombstones (#10877).

### Bug Fixes
* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected.
* Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if there is IOError while reading the data leading to empty buffer and other buffer already in progress of async read goes again for reading.
* Fix failed memtable flush retry bug that could cause wrongly ordered updates, which would surface to writers as `Status::Corruption` in case of `force_consistency_checks=true` (default). It affects use cases that enable both parallel flush (`max_background_flushes > 1` or `max_background_jobs >= 8`) and non-default memtable count (`max_write_buffer_number > 2`).
* Fixed an issue where the `READ_NUM_MERGE_OPERANDS` ticker was not updated when the base key-value or tombstone was read from an SST file.
Expand Down
5 changes: 1 addition & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1212,14 +1212,11 @@ Compaction* ColumnFamilyData::CompactRange(
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
SequenceNumber earliest_mem_seqno =
std::min(mem_->GetEarliestSequenceNumber(),
imm_.current()->GetEarliestSequenceNumber(false));
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
max_file_num_to_ignore, trim_ts, earliest_mem_seqno);
max_file_num_to_ignore, trim_ts);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Expand Down
23 changes: 6 additions & 17 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
const SequenceNumber earliest_mem_seqno) {
SequenceNumber earliest_mem_seqno) {
// Do not pick ingested file when there is at least one memtable not flushed
// which of seqno is overlap with the sst.
TEST_SYNC_POINT("FindIntraL0Compaction");
Expand Down Expand Up @@ -613,8 +613,7 @@ Compaction* CompactionPicker::CompactRange(
int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber /*earliest_mem_seqno*/) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
// CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO);

Expand Down Expand Up @@ -919,8 +918,7 @@ bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,

Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const {
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
auto& levels = cf_meta.levels;
auto comparator = icmp_->user_comparator();

Expand Down Expand Up @@ -997,13 +995,6 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
current_files[f].name +
" is currently being compacted.");
}
if (output_level == 0 &&
current_files[f].largest_seqno > earliest_mem_seqno) {
return Status::Aborted(
"Necessary compaction input file " + current_files[f].name +
" has overlapping seqnos with earliest memtable seqnos.");
}

input_files->insert(TableFileNameToNumber(current_files[f].name));
}

Expand Down Expand Up @@ -1060,14 +1051,12 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
"A running compaction is writing to the same output level in an "
"overlapping key range");
}

return Status::OK();
}

Status CompactionPicker::SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const {
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
cf_meta.levels[cf_meta.levels.size() - 1].level);
if (output_level >= static_cast<int>(cf_meta.levels.size())) {
Expand All @@ -1093,8 +1082,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
"A compaction must contain at least one file.");
}

Status s = SanitizeCompactionInputFilesForAllLevels(
input_files, cf_meta, output_level, earliest_mem_seqno);
Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
output_level);

if (!s.ok()) {
return s;
Expand Down
70 changes: 25 additions & 45 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,15 @@ class CompactionPicker {
virtual ~CompactionPicker();

// Pick level and inputs for a new compaction.
//
// `earliest_mem_seqno` is the earliest seqno of unflushed memtables.
// It is needed to compare with compaction input SST files' largest seqnos
// in order to exclude those of seqnos potentially overlap with memtables'
// seqnos when doing compaction to L0. This will avoid creating a SST files in
// L0 newer than a unflushed memtable. Such SST file can exist in the first
// place when it's ingested or resulted from compaction involving files
// ingested.
//
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) = 0;
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0;

// `earliest_mem_seqno`: see PickCompaction() API
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
Expand All @@ -87,8 +78,7 @@ class CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber earliest_mem_seqno);
uint64_t max_file_num_to_ignore, const std::string& trim_ts);

// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
Expand All @@ -101,18 +91,10 @@ class CompactionPicker {
// files. If it's not possible to conver an invalid input_files
// into a valid one by adding more files, the function will return a
// non-ok status with specific reason.
//
// Cases of returning non-ok status include but not limited to:
// - When output_level == 0 and input_files contains sst files
// of largest seqno greater than `earliest_mem_seqno`. This will
// avoid creating a SST files in L0 newer than a unflushed memtable.
// Such SST file can exist in the first place when it's ingested or
// resulted from compaction involving files ingested.
#ifndef ROCKSDB_LITE
Status SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const;
Status SanitizeCompactionInputFiles(std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
#endif // ROCKSDB_LITE

// Free up the files that participated in a compaction
Expand Down Expand Up @@ -248,8 +230,7 @@ class CompactionPicker {
#ifndef ROCKSDB_LITE
virtual Status SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const;
const ColumnFamilyMetaData& cf_meta, const int output_level) const;
#endif // ROCKSDB_LITE

// Keeps track of all compactions that are running on Level0.
Expand Down Expand Up @@ -279,22 +260,23 @@ class NullCompactionPicker : public CompactionPicker {
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */,
const SequenceNumber /* earliest_mem_seqno */) override {
SequenceNumber /* earliest_memtable_seqno */) override {
return nullptr;
}

// Always return "nullptr"
Compaction* CompactRange(
const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/, int /*input_level*/,
int /*output_level*/,
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** /*compaction_end*/, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/,
const SequenceNumber /* earliest_mem_seqno */) override {
Compaction* CompactRange(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/,
int /*input_level*/, int /*output_level*/,
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/,
const InternalKey* /*end*/,
InternalKey** /*compaction_end*/,
bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/,
const std::string& /*trim_ts*/) override {
return nullptr;
}

Expand All @@ -321,14 +303,12 @@ class NullCompactionPicker : public CompactionPicker {
// initialized with corresponding input
// files. Cannot be nullptr.
//
// @param earliest_mem_seqno See PickCompaction() API
// @return true iff compaction was found.
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
const SequenceNumber earliest_mem_seqno);
bool FindIntraL0Compaction(
const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber);

CompressionType GetCompressionType(const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
Expand Down
27 changes: 10 additions & 17 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
Compaction* FIFOCompactionPicker::PickSizeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
LogBuffer* log_buffer) {
// compute the total size and identify the last non-empty level
int last_level = 0;
uint64_t total_size = 0;
Expand Down Expand Up @@ -176,8 +176,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
.level0_file_num_compaction_trigger /* min_files_to_compact */
,
max_compact_bytes_per_del_file,
mutable_cf_options.max_compaction_bytes, &comp_inputs,
earliest_mem_seqno)) {
mutable_cf_options.max_compaction_bytes, &comp_inputs)) {
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
Expand Down Expand Up @@ -276,8 +275,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* FIFOCompactionPicker::PickCompactionToWarm(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
TEST_SYNC_POINT("PickCompactionToWarm");
LogBuffer* log_buffer) {
if (mutable_cf_options.compaction_options_fifo.age_for_warm == 0) {
return nullptr;
}
Expand All @@ -301,8 +299,6 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
cf_name.c_str(), status.ToString().c_str());
return nullptr;
}
TEST_SYNC_POINT_CALLBACK("PickCompactionToWarm::BeforeGetCurrentTime",
&_current_time);
const uint64_t current_time = static_cast<uint64_t>(_current_time);

if (!level0_compactions_in_progress_.empty()) {
Expand Down Expand Up @@ -349,8 +345,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
// for warm tier.
break;
}
if (prev_file != nullptr &&
prev_file->fd.largest_seqno <= earliest_mem_seqno) {
if (prev_file != nullptr) {
compaction_size += prev_file->fd.GetFileSize();
if (compaction_size > mutable_cf_options.max_compaction_bytes) {
break;
Expand Down Expand Up @@ -394,19 +389,19 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) {
Compaction* c = nullptr;
if (mutable_cf_options.ttl > 0) {
c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
}
if (c == nullptr) {
c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer, earliest_mem_seqno);
vstorage, log_buffer);
}
if (c == nullptr) {
c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer, earliest_mem_seqno);
vstorage, log_buffer);
}
RegisterCompaction(c);
return c;
Expand All @@ -419,8 +414,7 @@ Compaction* FIFOCompactionPicker::CompactRange(
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/,
const SequenceNumber earliest_mem_seqno) {
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) {
#ifdef NDEBUG
(void)input_level;
(void)output_level;
Expand All @@ -429,9 +423,8 @@ Compaction* FIFOCompactionPicker::CompactRange(
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.logger);
Compaction* c =
PickCompaction(cf_name, mutable_cf_options, mutable_db_options, vstorage,
&log_buffer, earliest_mem_seqno);
Compaction* c = PickCompaction(cf_name, mutable_cf_options,
mutable_db_options, vstorage, &log_buffer);
log_buffer.FlushBufferToLog();
return c;
}
Expand Down
16 changes: 5 additions & 11 deletions db/compaction/compaction_picker_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@ class FIFOCompactionPicker : public CompactionPicker {
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* version,
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override;
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;

// `earliest_mem_seqno`: see PickCompaction() API for more. In FIFO's
// implementation of CompactRange(), different from others, we will not return
// `nullptr` right away when intput files of compaction to L0 has seqnos
// potentially overlapping with memtable's but exlucde those files.
virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
int input_level, int output_level,
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber earliest_mem_seqno) override;
uint64_t max_file_num_to_ignore, const std::string& trim_ts) override;

// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; }
Expand All @@ -55,15 +51,13 @@ class FIFOCompactionPicker : public CompactionPicker {
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
VersionStorageInfo* version,
LogBuffer* log_buffer,
SequenceNumber earliest_mem_seqno);
LogBuffer* log_buffer);

Compaction* PickCompactionToWarm(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
VersionStorageInfo* version,
LogBuffer* log_buffer,
const SequenceNumber earliest_mem_seqno);
LogBuffer* log_buffer);
};
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE
Loading

0 comments on commit 3ff86f3

Please sign in to comment.