Skip to content

Commit

Permalink
fix empty segment cannot merge after gc (#4523)
Browse files Browse the repository at this point in the history
close #4511
  • Loading branch information
lidezhu authored Jun 2, 2022
1 parent b0bdec9 commit e87b54b
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 151 deletions.
63 changes: 54 additions & 9 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,73 @@

namespace DB
{
inline size_t getThreadIdForLog(const String & line)
{
auto sub_line = line.substr(line.find("thread_id="));
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
if (regex_search(sub_line, m, rx))
return std::stoi(m[1]);
else
return 0;
}

// Usage example:
// The first argument is the key you want to search.
// For example, we want to search the key 'RSFilter exclude rate' in log file, and get the value following it.
// So we can use it as the first argument.
// But many kind of thread can print this keyword,
// so we can use the second argument to specify a keyword that may just be printed by a specific kind of thread.
// Here we use 'Rough set filter' to specify we just want to search read thread.
// And the complete command is the following:
// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter')
// TODO: this is still a too hack way to do test, but cannot think a better way now.
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1)
throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2)
throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS);

String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
// the candidate line must be printed by a thread which also print a line contains `thread_hint`
String thread_hint = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value);
auto log_path = context.getConfigRef().getString("logger.log");

std::ifstream file(log_path);
std::vector<String> line_candidates;
String line;
while (std::getline(file, line))
// get the lines containing `thread_hint` and `key`
std::vector<String> thread_hint_line_candidates;
std::vector<String> key_line_candidates;
{
if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
line_candidates.emplace_back(line);
String line;
while (std::getline(file, line))
{
if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos))
thread_hint_line_candidates.emplace_back(line);
else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
key_line_candidates.emplace_back(line);
}
}
if (line_candidates.empty())
// get target thread id
if (thread_hint_line_candidates.empty() || key_line_candidates.empty())
{
output("Invalid");
return;
}
auto & target_line = line_candidates.back();
size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back());
if (target_thread_id == 0)
{
output("Invalid");
return;
}
String target_line;
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
{
if (getThreadIdForLog(*iter) == target_thread_id)
{
target_line = *iter;
break;
}
}
// try parse the first number following the key
auto sub_line = target_line.substr(target_line.find(key));
std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
std::smatch m;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void DeltaPackFile::calculateStat(const DMContext & context)
auto hash_salt = context.hash_salt;

auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, /*set_cache_if_miss*/ false, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down Expand Up @@ -56,7 +56,7 @@ DeltaPackPtr DeltaPackFile::deserializeMetadata(DMContext & context, //

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());

auto dp_file = new DeltaPackFile(dmfile, valid_rows, valid_bytes, segment_range);
auto * dp_file = new DeltaPackFile(dmfile, valid_rows, valid_bytes, segment_range);
return std::shared_ptr<DeltaPackFile>(dp_file);
}

Expand Down Expand Up @@ -132,7 +132,7 @@ size_t DPFileReader::readRowsOnce(MutableColumns & output_cols, //
const RowKeyRange * range)
{
auto read_next_block = [&, this]() -> bool {
rows_before_cur_block += ((bool)cur_block) ? cur_block.rows() : 0;
rows_before_cur_block += (static_cast<bool>(cur_block)) ? cur_block.rows() : 0;
cur_block_data.clear();

cur_block = file_stream->read();
Expand Down
Loading

0 comments on commit e87b54b

Please sign in to comment.