Skip to content

Commit

Permalink
Clean up temporary files after TiFlash restart
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Jun 2, 2021
1 parent 0eb8581 commit 62cc3ab
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 28 deletions.
14 changes: 10 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
{
LOG_INFO(log, "Restore DeltaMerge Store start [" << db_name << "." << table_name << "]");

// restore existing dm files and set capacity for path_pool.
// Restore existing dm files and set capacity for path_pool.
// Should be done before any background task setup.
restoreStableFiles();

original_table_columns.emplace_back(original_table_handle_define);
Expand Down Expand Up @@ -236,11 +237,13 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
auto dmfile_scanner = [=]() {
PageStorage::PathAndIdsVec path_and_ids_vec;
auto delegate = path_pool.getStableDiskDelegator();
DMFile::ListOptions options;
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
auto & path_and_ids = path_and_ids_vec.emplace_back();
path_and_ids.first = root_path;
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, /* can_gc= */ true);
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options);
for (auto id : file_ids_in_current_path)
path_and_ids.second.insert(id);
}
Expand Down Expand Up @@ -1829,10 +1832,13 @@ void DeltaMergeStore::restoreStableFiles()
{
LOG_DEBUG(log, "Loading dt files");

auto path_delegate = path_pool.getStableDiskDelegator();
auto path_delegate = path_pool.getStableDiskDelegator();
DMFile::ListOptions options;
options.only_list_can_gc = false;
options.clean_up = true;
for (const auto & root_path : path_delegate.listPaths())
{
for (auto & file_id : DMFile::listAllInPath(global_context.getFileProvider(), root_path, false))
for (auto & file_id : DMFile::listAllInPath(global_context.getFileProvider(), root_path, options))
{
auto dmfile = DMFile::restore(global_context.getFileProvider(), file_id, /* ref_id= */ 0, root_path, true);
path_delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), root_path);
Expand Down
48 changes: 32 additions & 16 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ void DMFile::finalizeForSingleFileMode(WriteBuffer & buffer)
old_ngc_file.remove();
}

std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc)
std::set<UInt64>
DMFile::listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, const DMFile::ListOptions & options)
{
Poco::File folder(parent_path);
if (!folder.exists())
Expand All @@ -406,24 +407,39 @@ std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, co

for (const auto & name : file_names)
{

// clear deleted (maybe broken) DMFiles
if (startsWith(name, details::FOLDER_PREFIX_DROPPED))
// Clean up temporary files and files should be deleted
// Note that you should not call it if some DTFiles are writing
if (options.clean_up)
{
auto res = try_parse_file_id(name);
if (!res)
if (startsWith(name, details::FOLDER_PREFIX_WRITABLE))
{
LOG_INFO(log, "Unrecognized dropped DM file, ignored: " + name);
// Clear temporary files
const auto full_path = parent_path + "/" + name;
if (Poco::File temp_file(full_path); temp_file.exists())
temp_file.remove(true);
LOG_WARNING(log, __PRETTY_FUNCTION__ << ": Existing temporary dmfile, removed: " << full_path);
continue;
}
else if (startsWith(name, details::FOLDER_PREFIX_DROPPED))
{
// Clear deleted (maybe broken) DTFiles
auto res = try_parse_file_id(name);
if (!res)
{
LOG_INFO(log, "Unrecognized dropped DM file, ignored: " + name);
continue;
}
UInt64 file_id = *res;
// The encryption info use readable path. We are not sure the encryption info is deleted or not.
// Try to delete and ignore if it is already deleted.
const String readable_path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
file_provider->deleteEncryptionInfo(EncryptionPath(readable_path, ""), /* throw_on_error= */ false);
const auto full_path = parent_path + "/" + name;
if (Poco::File del_file(full_path); del_file.exists())
del_file.remove(true);
LOG_WARNING(log, __PRETTY_FUNCTION__ << ": Existing dropped dmfile, removed: " << full_path);
continue;
}
UInt64 file_id = *res;
// The encryption info use readable path. We are not sure the encryption info is deleted or not.
// Try to delete and ignore if it is already deleted.
const String readable_path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
file_provider->deleteEncryptionInfo(EncryptionPath(readable_path, ""), /* throw_on_error= */ false);
if (Poco::File del_file(parent_path + "/" + name); del_file.exists())
del_file.remove(true);
continue;
}

if (!startsWith(name, details::FOLDER_PREFIX_READABLE))
Expand All @@ -439,7 +455,7 @@ std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, co
}
UInt64 file_id = *res;

if (can_gc)
if (options.only_list_can_gc)
{
// Only return the ID if the file is able to be GC-ed.
const auto file_path = parent_path + "/" + name;
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,14 @@ class DMFile : private boost::noncopyable
static DMFilePtr
restore(const FileProviderPtr & file_provider, UInt64 file_id, UInt64 ref_id, const String & parent_path, bool read_meta = true);

static std::set<UInt64> listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc);
struct ListOptions
{
// Only return the DTFiles id list that can be GC
bool only_list_can_gc = true;
// Try to clean up temporary / dropped files
bool clean_up = false;
};
static std::set<UInt64> listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, const ListOptions & options);

// static helper function for getting path
static String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Status status);
Expand Down
24 changes: 17 additions & 7 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ try
dm_file = DMFile::create(id, parent_path, single_file_mode);
// Right after created, the fil is not abled to GC and it is ignored by `listAllInPath`
EXPECT_FALSE(dm_file->canGC());
auto scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_TRUE(scanIds.empty());

{
Expand All @@ -270,20 +272,24 @@ try

// The file remains not able to GC
ASSERT_FALSE(dm_file->canGC());
options.only_list_can_gc = false;
// Now the file can be scaned
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/false);
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
options.only_list_can_gc = true;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(scanIds.empty());

// After enable GC, the file can be scaned with `can_gc=true`
dm_file->enableGC();
ASSERT_TRUE(dm_file->canGC());
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/false);
options.only_list_can_gc = false;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
options.only_list_can_gc = true;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
}
Expand Down Expand Up @@ -355,7 +361,9 @@ try
}

// The broken file is ignored
auto res = DMFile::listAllInPath(file_provider, parent_path, true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto res = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(res.empty());
}
CATCH
Expand Down Expand Up @@ -423,7 +431,9 @@ try
}

// The broken file is ignored
auto res = DMFile::listAllInPath(file_provider, parent_path, true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto res = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(res.empty());
}
CATCH
Expand Down

0 comments on commit 62cc3ab

Please sign in to comment.