Skip to content

Commit

Permalink
Fix deleting obsolete files #2
Browse files Browse the repository at this point in the history
Summary: For description of the bug, see comment in db_test. The fix is pretty straight forward.

Test Plan: added unit test. eventually we need better testing of FOF/POF process.

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D33081
  • Loading branch information
igorcanadi committed Feb 10, 2015
1 parent 1851f97 commit 863009b
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 13 deletions.
20 changes: 11 additions & 9 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
}

// don't delete files that might be currently written to from compaction
// threads
if (!pending_outputs_.empty()) {
job_context->min_pending_output = *pending_outputs_.begin();
} else {
// delete all of them
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
}

// get obsolete files
versions_->GetObsoleteFiles(&job_context->sst_delete_files);
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
job_context->min_pending_output);

// store the current filenum, lognum, etc
job_context->manifest_file_number = versions_->manifest_file_number();
Expand All @@ -474,14 +484,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->log_number = versions_->MinLogNumber();
job_context->prev_log_number = versions_->prev_log_number();

// don't delete live files
if (pending_outputs_.size()) {
job_context->min_pending_output = *pending_outputs_.begin();
} else {
// delete all of them
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
}

versions_->AddLiveFiles(&job_context->sst_live);
if (doing_the_full_scan) {
for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
Expand Down
90 changes: 90 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
Expand Down Expand Up @@ -10425,6 +10426,95 @@ TEST(DBTest, MutexWaitStats) {
ThreadStatus::STATE_MUTEX_WAIT, 0);
}

// This reproduces a bug where we don't delete a file because when it was
// supposed to be deleted, it was blocked by pending_outputs
// Consider:
// 1. current file_number is 13
// 2. compaction (1) starts, blocks deletion of all files starting with 13
// (pending outputs)
// 3. file 13 is created by compaction (2)
// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
// 13 has no references, it is put into VersionSet::obsolete_files_
// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
// is deleted from obsolete_files_ set.
// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
// pending outputs since compaction (1) is still running. It is not deleted and
// it is not present in obsolete_files_ anymore. Therefore, we never delete it.
TEST(DBTest, DeleteObsoleteFilesPendingOutputs) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 2 * 1024 * 1024; // 2 MB
options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
options.level0_file_num_compaction_trigger =
2; // trigger compaction when we have 2 files
options.max_background_flushes = 2;
options.max_background_compactions = 2;
Reopen(options);

Random rnd(301);
// Create two 1MB sst files
for (int i = 0; i < 2; ++i) {
// Create 1MB sst file
for (int j = 0; j < 100; ++j) {
ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
}
ASSERT_OK(Flush());
}
// this should execute both L0->L1 and L1->(move)->L2 compactions
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,0,1", FilesPerLevel(0));

SleepingBackgroundTask blocking_thread;
port::Mutex mutex_;
bool already_blocked(false);

// block the flush
std::function<void()> block_first_time = [&]() {
bool blocking = false;
{
MutexLock l(&mutex_);
if (!already_blocked) {
blocking = true;
already_blocked = true;
}
}
if (blocking) {
blocking_thread.DoSleep();
}
};
env_->table_write_callback_ = &block_first_time;
// Create 1MB sst file
for (int j = 0; j < 256; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
}
// this should trigger a flush, which is blocked with block_first_time
// pending_file is protecting all the files created after

ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));

ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 1U);
auto file_on_L2 = metadata[0].name;

ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr));
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));

// finish the flush!
blocking_thread.WakeUp();
blocking_thread.WaitUntilDone();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("1,0,0,0,1", FilesPerLevel(0));

metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 2U);

// This file should have been deleted
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2));
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
14 changes: 11 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2772,9 +2772,17 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
}
}

void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
files->insert(files->end(), obsolete_files_.begin(), obsolete_files_.end());
obsolete_files_.clear();
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
uint64_t min_pending_output) {
std::vector<FileMetaData*> pending_files;
for (auto f : obsolete_files_) {
if (f->fd.GetNumber() < min_pending_output) {
files->push_back(f);
} else {
pending_files.push_back(f);
}
}
obsolete_files_.swap(pending_files);
}

ColumnFamilyData* VersionSet::CreateColumnFamily(
Expand Down
3 changes: 2 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ class VersionSet {

void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);

void GetObsoleteFiles(std::vector<FileMetaData*>* files);
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
uint64_t min_pending_output);

ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
const EnvOptions& env_options() { return env_options_; }
Expand Down

0 comments on commit 863009b

Please sign in to comment.