Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500) #4517

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,73 @@

namespace DB
{
inline size_t getThreadIdForLog(const String & line)
{
auto sub_line = line.substr(line.find("thread_id="));
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
if (regex_search(sub_line, m, rx))
return std::stoi(m[1]);
else
return 0;
}

// Usage example:
// The first argument is the key you want to search.
// For example, we want to search the key 'RSFilter exclude rate' in log file, and get the value following it.
// So we can use it as the first argument.
// But many kinds of threads can print this keyword,
// so we use the second argument to specify a keyword that is expected to be printed only by a specific kind of thread.
// Here we use 'Rough set filter' to specify that we only want to search the lines printed by the read thread.
// And the complete command is the following:
// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter')
// TODO: this is still too hacky a way to test, but we cannot think of a better way for now.
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1)
throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2)
throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS);

String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
// the candidate line must be printed by a thread which also print a line contains `thread_hint`
String thread_hint = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value);
auto log_path = context.getConfigRef().getString("logger.log");

std::ifstream file(log_path);
std::vector<String> line_candidates;
String line;
while (std::getline(file, line))
// get the lines containing `thread_hint` and `key`
std::vector<String> thread_hint_line_candidates;
std::vector<String> key_line_candidates;
{
if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
line_candidates.emplace_back(line);
String line;
while (std::getline(file, line))
{
if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos))
thread_hint_line_candidates.emplace_back(line);
else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
key_line_candidates.emplace_back(line);
}
}
if (line_candidates.empty())
// get target thread id
if (thread_hint_line_candidates.empty() || key_line_candidates.empty())
{
output("Invalid");
return;
}
auto & target_line = line_candidates.back();
size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back());
if (target_thread_id == 0)
{
output("Invalid");
return;
}
String target_line;
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
{
if (getThreadIdForLog(*iter) == target_thread_id)
{
target_line = *iter;
break;
}
}
// try parse the first number following the key
auto sub_line = target_line.substr(target_line.find(key));
std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
std::smatch m;
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,21 @@ void DeltaPackFile::calculateStat(const DMContext & context)
auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = context.hash_salt;

<<<<<<< HEAD:dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter());
=======
auto pack_filter = DMFilePackFilter::loadFrom(
file,
index_cache,
/*set_cache_if_miss*/ false,
{segment_range},
EMPTY_FILTER,
{},
context.db_context.getFileProvider(),
context.getReadLimiter(),
/*tracing_logger*/ nullptr);
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500)):dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down
59 changes: 34 additions & 25 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
}

assert(segment != nullptr);
if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr)
if (segment->hasAbandoned() || segment_snap == nullptr)
continue;

const auto segment_id = segment->segmentId();
Expand All @@ -1562,43 +1562,52 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
// meet empty segment, try merge it
if (segment_snap->getRows() == 0)
{
// release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status.
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
continue;
}

// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process,
// it's still worth to wait another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

try
{
// Check whether we should apply gc on this segment
const bool should_compact
= GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log)
|| GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log);
bool should_compact = false;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log))
{
should_compact = true;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
{
// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process,
// it's still worth to wait another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
}
bool finish_gc_on_segment = false;
if (should_compact)
{
if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment)
{
// Continue to check whether we need to apply more tasks on this segment
segment_snap = {};
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
gc_segments_num++;
finish_gc_on_segment = true;
Expand Down
79 changes: 79 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>

namespace DB::DM
{
/// Creates a builder pre-configured from `context`:
/// - the file provider and the read limiter are taken from `context` itself;
/// - the mark cache and the min-max index cache are taken from its global context;
/// - the remaining options are populated from the settings of `context`.
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
    : file_provider(context.getFileProvider())
    , read_limiter(context.getReadLimiter())
{
    // The caches are looked up on the global context rather than on `context`.
    const auto & root_context = context.getGlobalContext();
    setCaches(
        root_context.getMarkCache(),
        root_context.getMinMaxIndexCache());

    // Everything else comes from the settings attached to `context`.
    setFromSettings(context.getSettingsRef());
}

/// Builds an input stream that reads `read_columns` from `dmfile`.
///
/// `rowkey_ranges` is forwarded to the pack filter; it may be empty only when
/// clean read is disabled.
///
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR when `dmfile` is not in
/// READABLE status, or when `rowkey_ranges` is empty while clean read is enabled.
DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges)
{
    // Refuse to read a file that is not READABLE.
    if (dmfile->getStatus() != DMFile::Status::READABLE)
    {
        auto message = fmt::format(
            "DMFile [{}] is expected to be in READABLE status, but: {}",
            dmfile->fileId(),
            DMFile::statusString(dmfile->getStatus()));
        throw Exception(std::move(message), ErrorCodes::LOGICAL_ERROR);
    }

    // When `rowkey_ranges` is empty, all packs are read unconditionally.
    // `rowkey_ranges` and `is_common_handle` only matter in clean read mode,
    // so an empty range list is rejected only when clean read is enabled.
    if (unlikely(enable_clean_read && rowkey_ranges.empty()))
        throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR);

    // With no ranges there is nothing to inspect, so fall back to `false`.
    const bool is_common_handle = rowkey_ranges.empty() ? false : rowkey_ranges[0].is_common_handle;

    // Load the pack filter; on this path a missing index is put into the cache.
    auto packs_to_read = DMFilePackFilter::loadFrom(
        dmfile,
        index_cache,
        /*set_cache_if_miss*/ true,
        rowkey_ranges,
        rs_filter,
        read_packs,
        file_provider,
        read_limiter,
        tracing_logger);

    DMFileReader file_reader(
        dmfile,
        read_columns,
        is_common_handle,
        enable_clean_read,
        max_data_version,
        std::move(packs_to_read),
        mark_cache,
        enable_column_cache,
        column_cache,
        aio_threshold,
        max_read_buffer_size,
        file_provider,
        read_limiter,
        rows_threshold_per_read,
        read_one_pack_every_time,
        tracing_logger);

    return std::make_shared<DMFileBlockInputStream>(std::move(file_reader));
}
} // namespace DB::DM
48 changes: 41 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using IdSetPtr = std::shared_ptr<IdSet>;
class DMFilePackFilter
{
public:
<<<<<<< HEAD
static DMFilePackFilter loadFrom(const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
Expand All @@ -35,6 +36,20 @@ class DMFilePackFilter
const ReadLimiterPtr & read_limiter)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_ranges, filter, read_packs, file_provider, read_limiter);
=======
static DMFilePackFilter loadFrom(
const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
const RowKeyRanges & rowkey_ranges,
const RSOperatorPtr & filter,
const IdSetPtr & read_packs,
const FileProviderPtr & file_provider,
const ReadLimiterPtr & read_limiter,
const DB::LoggerPtr & tracing_logger)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger);
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
pack_filter.init();
return pack_filter;
}
Expand Down Expand Up @@ -86,15 +101,23 @@ class DMFilePackFilter
private:
DMFilePackFilter(const DMFilePtr & dmfile_,
const MinMaxIndexCachePtr & index_cache_,
<<<<<<< HEAD
UInt64 hash_salt_,
=======
bool set_cache_if_miss_,
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
const RowKeyRanges & rowkey_ranges_, // filter by handle range
const RSOperatorPtr & filter_, // filter by push down where clause
const IdSetPtr & read_packs_, // filter by pack index
const FileProviderPtr & file_provider_,
const ReadLimiterPtr & read_limiter_)
: dmfile(dmfile_)
, index_cache(index_cache_)
<<<<<<< HEAD
, hash_salt(hash_salt_)
=======
, set_cache_if_miss(set_cache_if_miss_)
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
, rowkey_ranges(rowkey_ranges_)
, filter(filter_)
, read_packs(read_packs_)
Expand Down Expand Up @@ -200,20 +223,24 @@ class DMFilePackFilter
const DMFilePtr & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter)
{
auto & type = dmfile->getColumnStat(col_id).type;
const auto file_name_base = DMFile::getFileNameBase(col_id);

auto load = [&]() {
auto index_file_size = dmfile->colIndexSize(file_name_base);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
if (!dmfile->configuration)
{
auto index_buf = ReadBufferFromFileProvider(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), dmfile->colIndexSize(file_name_base)),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
read_limiter);
index_buf.seek(dmfile->colIndexOffset(file_name_base));
return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base));
Expand All @@ -228,21 +255,24 @@ class DMFilePackFilter
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
index_buf->seek(dmfile->colIndexOffset(file_name_base));
auto file_size = dmfile->colIndexSize(file_name_base);
auto header_size = dmfile->configuration->getChecksumHeaderLength();
auto frame_total_size = dmfile->configuration->getChecksumFrameLength();
auto frame_count = file_size / frame_total_size + (file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, file_size - header_size * frame_count);
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
}
};
MinMaxIndexPtr minmax_index;
if (index_cache)
if (index_cache && set_cache_if_miss)
{
minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
}
else
{
minmax_index = load();
// try load from the cache first
if (index_cache)
minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base));
if (!minmax_index)
minmax_index = load();
}
indexes.emplace(col_id, RSIndex(type, minmax_index));
}
Expand All @@ -255,13 +285,17 @@ class DMFilePackFilter
if (!dmfile->isColIndexExist(col_id))
return;

loadIndex(param.indexes, dmfile, file_provider, index_cache, col_id, read_limiter);
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter);
}

private:
DMFilePtr dmfile;
MinMaxIndexCachePtr index_cache;
<<<<<<< HEAD
UInt64 hash_salt;
=======
bool set_cache_if_miss;
>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
RowKeyRanges rowkey_ranges;
RSOperatorPtr filter;
IdSetPtr read_packs;
Expand Down
Loading