Commit cd00ddc

This is an automated cherry-pick of pingcap#4500

Signed-off-by: ti-chi-bot <[email protected]>
lidezhu authored and ti-chi-bot committed Mar 30, 2022
1 parent 8242cf2 commit cd00ddc

Showing 10 changed files with 636 additions and 37 deletions.
63 changes: 54 additions & 9 deletions dbms/src/Debug/dbgFuncMisc.cpp
@@ -8,28 +8,73 @@
 
 namespace DB
 {
+inline size_t getThreadIdForLog(const String & line)
+{
+    auto sub_line = line.substr(line.find("thread_id="));
+    std::regex rx(R"((0|[1-9][0-9]*))");
+    std::smatch m;
+    if (regex_search(sub_line, m, rx))
+        return std::stoi(m[1]);
+    else
+        return 0;
+}
+
 // Usage example:
+// The first argument is the key you want to search for.
+// For example, suppose we want to search for the key 'RSFilter exclude rate' in the log file and get the value following it.
+// So we can use it as the first argument.
+// But many kinds of threads can print this keyword,
+// so we can use the second argument to specify a keyword that is only printed by a specific kind of thread.
+// Here we use 'Rough set filter' to specify that we just want to search read threads.
+// The complete command is the following:
+// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter')
+// TODO: this is still too hacky a way to test, but we cannot think of a better way for now.
 void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
 {
-    if (args.size() < 1)
-        throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);
+    if (args.size() < 2)
+        throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS);
 
     String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
+    // the candidate line must be printed by a thread which also prints a line containing `thread_hint`
+    String thread_hint = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value);
     auto log_path = context.getConfigRef().getString("logger.log");
 
     std::ifstream file(log_path);
-    std::vector<String> line_candidates;
-    String line;
-    while (std::getline(file, line))
+    // get the lines containing `thread_hint` and `key`
+    std::vector<String> thread_hint_line_candidates;
+    std::vector<String> key_line_candidates;
     {
-        if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
-            line_candidates.emplace_back(line);
+        String line;
+        while (std::getline(file, line))
+        {
+            if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos))
+                thread_hint_line_candidates.emplace_back(line);
+            else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
+                key_line_candidates.emplace_back(line);
+        }
     }
-    if (line_candidates.empty())
+    // get the target thread id
+    if (thread_hint_line_candidates.empty() || key_line_candidates.empty())
     {
         output("Invalid");
         return;
     }
-    auto & target_line = line_candidates.back();
+    size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back());
+    if (target_thread_id == 0)
+    {
+        output("Invalid");
+        return;
+    }
+    String target_line;
+    for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
+    {
+        if (getThreadIdForLog(*iter) == target_thread_id)
+        {
+            target_line = *iter;
+            break;
+        }
+    }
     // try to parse the first number following the key
     auto sub_line = target_line.substr(target_line.find(key));
     std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
     std::smatch m;
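As a reading aid for the hunk above (not part of the commit): the helper extracts the thread id from a log line, and the two-pass search then ties the last key line to the thread that printed the thread_hint line. A minimal standalone sketch, with invented log lines that merely follow the thread_id= convention the code assumes:

// Standalone sketch of the thread-id matching used above. Like the committed
// helper, it assumes the line actually contains "thread_id="; substr would
// throw std::out_of_range otherwise. The sample log lines are invented.
#include <iostream>
#include <regex>
#include <string>

size_t getThreadIdForLog(const std::string & line)
{
    auto sub_line = line.substr(line.find("thread_id="));
    std::regex rx(R"((0|[1-9][0-9]*))");
    std::smatch m;
    if (std::regex_search(sub_line, m, rx))
        return std::stoi(m[1]);
    return 0;
}

int main()
{
    // The hint line identifies the read thread; the key line from the same
    // thread is the one whose value we want.
    std::string hint_line = "[DEBUG] [\"Rough set filter\"] [thread_id=42]";
    std::string key_line = "[DEBUG] [\"RSFilter exclude rate: 0.95\"] [thread_id=42]";
    std::cout << getThreadIdForLog(hint_line) << "\n"; // prints 42
    std::cout << (getThreadIdForLog(key_line) == getThreadIdForLog(hint_line)) << "\n"; // prints 1: same thread
    return 0;
}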
234 changes: 234 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
@@ -0,0 +1,234 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>
#include <Storages/PathPool.h>

namespace DB
{
namespace DM
{
ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_, const RowKeyRange & segment_range_)
    : file(file_)
    , segment_range(segment_range_)
{
    calculateStat(context);
}

void ColumnFileBig::calculateStat(const DMContext & context)
{
    auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();

    auto pack_filter = DMFilePackFilter::loadFrom(
        file,
        index_cache,
        /*set_cache_if_miss*/ false,
        {segment_range},
        EMPTY_FILTER,
        {},
        context.db_context.getFileProvider(),
        context.getReadLimiter(),
        /*tracing_logger*/ nullptr);

    std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}

ColumnFileReaderPtr
ColumnFileBig::getReader(const DMContext & context, const StorageSnapshotPtr & /*storage_snap*/, const ColumnDefinesPtr & col_defs) const
{
    return std::make_shared<ColumnFileBigReader>(context, *this, col_defs);
}

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
{
    writeIntBinary(file->refId(), buf);
    writeIntBinary(valid_rows, buf);
    writeIntBinary(valid_bytes, buf);
}

ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, //
                                                          const RowKeyRange & segment_range,
                                                          ReadBuffer & buf)
{
    UInt64 file_ref_id;
    size_t valid_rows, valid_bytes;

    readIntBinary(file_ref_id, buf);
    readIntBinary(valid_rows, buf);
    readIntBinary(valid_bytes, buf);

    auto file_id = context.storage_pool.dataReader().getNormalPageId(file_ref_id);
    auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

    auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());

    auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range);
    return std::shared_ptr<ColumnFileBig>(dp_file);
}

void ColumnFileBigReader::initStream()
{
    if (file_stream)
        return;

    DMFileBlockInputStreamBuilder builder(context.db_context);
    file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range});

    // If we only need to read the pk and version columns, cache the column data in memory.
    if (pk_ver_only)
    {
        Block block;
        size_t total_rows = 0;
        while ((block = file_stream->read()))
        {
            Columns columns;
            columns.push_back(block.getByPosition(0).column);
            if (col_defs->size() == 2)
                columns.push_back(block.getByPosition(1).column);
            cached_pk_ver_columns.push_back(std::move(columns));
            total_rows += block.rows();
            cached_block_rows_end.push_back(total_rows);
        }
    }
}

size_t ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
{
    if (unlikely(rows_offset + rows_limit > column_file.valid_rows))
        throw Exception("Try to read more rows", ErrorCodes::LOGICAL_ERROR);

    /// Read pk and version columns from the cache.

    auto [start_block_index, rows_start_in_start_block] = locatePosByAccumulation(cached_block_rows_end, rows_offset);
    auto [end_block_index, rows_end_in_end_block] = locatePosByAccumulation(cached_block_rows_end, //
                                                                            rows_offset + rows_limit);

    size_t actual_read = 0;
    for (size_t block_index = start_block_index; block_index < cached_pk_ver_columns.size() && block_index <= end_block_index;
         ++block_index)
    {
        size_t rows_start_in_block = block_index == start_block_index ? rows_start_in_start_block : 0;
        size_t rows_end_in_block
            = block_index == end_block_index ? rows_end_in_end_block : cached_pk_ver_columns[block_index].at(0)->size();
        size_t rows_in_block_limit = rows_end_in_block - rows_start_in_block;

        // Nothing to read.
        if (rows_start_in_block == rows_end_in_block)
            continue;

        const auto & columns = cached_pk_ver_columns.at(block_index);
        const auto & pk_column = columns[0];

        actual_read += copyColumnsData(columns, pk_column, output_cols, rows_start_in_block, rows_in_block_limit, range);
    }
    return actual_read;
}

size_t ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, //
                                         size_t rows_offset,
                                         size_t rows_limit,
                                         const RowKeyRange * range)
{
    auto read_next_block = [&, this]() -> bool {
        rows_before_cur_block += (static_cast<bool>(cur_block)) ? cur_block.rows() : 0;
        cur_block_data.clear();

        cur_block = file_stream->read();
        cur_block_offset = 0;

        if (!cur_block)
        {
            file_stream = {};
            return false;
        }
        else
        {
            for (size_t col_index = 0; col_index < output_cols.size(); ++col_index)
                cur_block_data.push_back(cur_block.getByPosition(col_index).column);
            return true;
        }
    };

    size_t rows_end = rows_offset + rows_limit;
    size_t actual_read = 0;
    size_t read_offset = rows_offset;
    while (read_offset < rows_end)
    {
        if (!cur_block || cur_block_offset == cur_block.rows())
        {
            if (unlikely(!read_next_block()))
                throw Exception("Not enough delta data to read [offset=" + DB::toString(rows_offset)
                                    + "] [limit=" + DB::toString(rows_limit) + "] [read_offset=" + DB::toString(read_offset) + "]",
                                ErrorCodes::LOGICAL_ERROR);
        }
        if (unlikely(read_offset < rows_before_cur_block + cur_block_offset))
            throw Exception("read_offset is too small [offset=" + DB::toString(rows_offset) + "] [limit=" + DB::toString(rows_limit)
                                + "] [read_offset=" + DB::toString(read_offset)
                                + "] [min_offset=" + DB::toString(rows_before_cur_block + cur_block_offset) + "]",
                            ErrorCodes::LOGICAL_ERROR);

        if (read_offset >= rows_before_cur_block + cur_block.rows())
        {
            cur_block_offset = cur_block.rows();
            continue;
        }
        auto read_end_for_cur_block = std::min(rows_end, rows_before_cur_block + cur_block.rows());

        auto read_start_in_block = read_offset - rows_before_cur_block;
        auto read_limit_in_block = read_end_for_cur_block - read_offset;

        actual_read += copyColumnsData(cur_block_data, cur_block_data[0], output_cols, read_start_in_block, read_limit_in_block, range);
        read_offset += read_limit_in_block;
        cur_block_offset += read_limit_in_block;
    }
    return actual_read;
}

size_t ColumnFileBigReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
{
    initStream();

    try
    {
        if (pk_ver_only)
            return readRowsRepeatedly(output_cols, rows_offset, rows_limit, range);
        else
            return readRowsOnce(output_cols, rows_offset, rows_limit, range);
    }
    catch (DB::Exception & e)
    {
        e.addMessage(" while reading DTFile " + column_file.getFile()->path());
        throw;
    }
}

Block ColumnFileBigReader::readNextBlock()
{
    initStream();

    return file_stream->read();
}

ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
    // Currently we don't reuse the cached data.
    return std::make_shared<ColumnFileBigReader>(context, column_file, new_col_defs);
}

} // namespace DM
} // namespace DB
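readRowsRepeatedly above relies on locatePosByAccumulation, which is defined elsewhere in the DeltaMerge sources and not shown in this commit. As a reading aid only, here is a plausible self-contained sketch of its contract, assuming it maps a global row offset to a (block index, offset within block) pair over the accumulated cached_block_rows_end vector; the body is our assumption, not the actual TiFlash implementation:

// Hypothetical sketch of locatePosByAccumulation -- not the actual TiFlash code.
// Given accumulated_ends[i] = total number of rows in blocks 0..i, map a global
// row offset to (block index, row offset inside that block).
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

std::pair<size_t, size_t> locatePosByAccumulation(const std::vector<size_t> & accumulated_ends, size_t offset)
{
    // The first block whose accumulated end is strictly greater than `offset`
    // is the block containing that row.
    auto it = std::upper_bound(accumulated_ends.begin(), accumulated_ends.end(), offset);
    size_t block_index = it - accumulated_ends.begin();
    size_t rows_before_block = (block_index == 0) ? 0 : accumulated_ends[block_index - 1];
    return {block_index, offset - rows_before_block};
}

int main()
{
    // Blocks of 100 and 50 rows give accumulated ends {100, 150}.
    std::vector<size_t> ends{100, 150};
    assert((locatePosByAccumulation(ends, 0) == std::pair<size_t, size_t>{0, 0}));
    assert((locatePosByAccumulation(ends, 120) == std::pair<size_t, size_t>{1, 20}));
    return 0;
}

Under this reading, an end offset that falls exactly on a block boundary resolves to (next block, 0), which the loop in readRowsRepeatedly tolerates through its empty-range continue.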
52 changes: 52 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1489,7 +1489,12 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
             next_gc_check_key = segment_it->first.toRowKeyValue();
         }
 
+<<<<<<< HEAD
         if (segment->hasAbandoned())
+=======
+        assert(segment != nullptr);
+        if (segment->hasAbandoned() || segment_snap == nullptr)
+>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
             continue;
 
         if (segment->getLastCheckGCSafePoint() >= gc_safe_point)
@@ -1499,6 +1504,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
         RowKeyRange segment_range = segment->getRowKeyRange();
         if (segment->getDelta()->isUpdating())
         {
+<<<<<<< HEAD
             LOG_DEBUG(log,
                       "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name
                                                 << "]");
@@ -1523,6 +1529,47 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
             // Check whether we should apply gc on this segment
             const bool should_compact
                 = GC::shouldCompact(segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log);
+=======
+            // release segment_snap before checkSegmentUpdate, otherwise this segment is still in updating status.
+            segment_snap = {};
+            checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
+            continue;
+        }
+
+        try
+        {
+            // Check whether we should apply gc on this segment
+            bool should_compact = false;
+            if (GC::shouldCompactDeltaWithStable(
+                    *dm_context,
+                    segment_snap,
+                    segment_range,
+                    global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
+                    log))
+            {
+                should_compact = true;
+            }
+            else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
+            {
+                // Avoid rechecking this segment when gc_safe_point doesn't change, regardless of whether we trigger this segment's DeltaMerge or not.
+                // Because after we calculate StableProperty and compare it with this gc_safe_point,
+                // there is no need to recheck it again using the same gc_safe_point.
+                // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interrupted by another process,
+                // it's still worth waiting for another gc_safe_point to check this segment again.
+                segment->setLastCheckGCSafePoint(gc_safe_point);
+                dm_context->min_version = gc_safe_point;
+
+                // calculate StableProperty if needed
+                if (!segment->getStable()->isStablePropertyCached())
+                    segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());
+
+                should_compact = GC::shouldCompactStable(
+                    segment,
+                    gc_safe_point,
+                    global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
+                    log);
+            }
+>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
         bool finish_gc_on_segment = false;
         if (should_compact)
         {
@@ -1531,7 +1578,12 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
             if (segment)
             {
                 // Continue to check whether we need to apply more tasks on this segment
+<<<<<<< HEAD
                 checkSegmentUpdate(dm_context, segment, type);
+=======
+                segment_snap = {};
+                checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
+>>>>>>> 5e0c2f8f2e (fix empty segment cannot merge after gc and avoid write index data for empty dmfile (#4500))
                 gc_segments_num++;
                 finish_gc_on_segment = true;
                 LOG_INFO(log,
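One detail of the incoming side worth calling out: on both the is-updating path and the post-compaction path it resets segment_snap = {} before calling checkSegmentUpdate, because (per the comment in the hunk) a live snapshot keeps the segment in updating status and follow-up maintenance would skip it. A self-contained toy illustration of that pattern, with all types invented for the example:

// Toy model -- every type here is invented -- of why the diff releases
// segment_snap before checkSegmentUpdate: while any snapshot is alive the
// segment counts as "updating", and maintenance skips updating segments.
#include <iostream>
#include <memory>

struct Segment
{
    std::weak_ptr<int> live_snapshot; // stands in for the delta's snapshot reference

    bool isUpdating() const { return !live_snapshot.expired(); }
};

void checkSegmentUpdate(const Segment & segment)
{
    if (segment.isUpdating())
        std::cout << "skipped: segment still updating\n";
    else
        std::cout << "scheduled follow-up delta-merge\n";
}

int main()
{
    Segment segment;
    auto snap = std::make_shared<int>(0); // take a snapshot of the segment
    segment.live_snapshot = snap;

    checkSegmentUpdate(segment); // skipped: the snapshot is still held

    snap = {}; // release the snapshot first, as the incoming side of the diff does
    checkSegmentUpdate(segment); // now follow-up work can be scheduled
    return 0;
}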