Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fast scan code mode clean #6058

Merged
merged 15 commits into from
Oct 13, 2022
34 changes: 6 additions & 28 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const RSOperatorPtr & filter_,
UInt64 max_version_,
size_t expected_block_size_,
bool is_raw_,
bool do_delete_mark_filter_for_raw_,
ReadMode read_mode_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
Expand All @@ -60,8 +59,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, header(toEmptyBlock(columns_to_read))
, max_version(max_version_)
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_)
, read_mode(read_mode_)
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(Logger::get(NAME, req_id))
Expand Down Expand Up @@ -100,29 +98,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "Read done");
return {};
}

cur_segment = task->segment;
if (is_raw)
{
cur_stream = cur_segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
do_delete_mark_filter_for_raw);
}
else
{
cur_stream = cur_segment->getInputStream(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
max_version,
std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)));
}

auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
cur_stream = task->segment->getInputStream(read_mode, *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, max_version, block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);
Expand Down Expand Up @@ -172,8 +151,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
Block header;
const UInt64 max_version;
const size_t expected_block_size;
const bool is_raw;
const bool do_delete_mark_filter_for_raw;
const ReadMode read_mode;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;

Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra
}
}

// Read data without mvcc filtering && delete mark != 0 filtering.
// Read data without mvcc filtering.
// just for debug
// readRaw is called under 'selraw xxxx'
BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
Expand Down Expand Up @@ -908,8 +908,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw = */ true,
/* do_delete_mark_filter_for_raw = */ false,
/* read_mode */ ReadMode::Raw,
std::move(tasks),
after_segment_read,
req_info);
Expand Down Expand Up @@ -937,8 +936,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw_ */ true,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
/* read_mode */ ReadMode::Raw,
extra_table_id_index,
physical_table_id,
req_info);
Expand Down Expand Up @@ -992,8 +990,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
std::move(tasks),
after_segment_read,
log_tracing_id);
Expand Down Expand Up @@ -1021,8 +1018,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw_= */ is_fast_scan,
/* do_delete_mark_filter_for_raw_= */ is_fast_scan,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
extra_table_id_index,
physical_table_id,
log_tracing_id);
Expand Down
180 changes: 123 additions & 57 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,37 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo
return std::make_shared<SegmentSnapshot>(std::move(delta_snap), std::move(stable_snap));
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,

/// Dispatch a segment read to the concrete implementation selected by `read_mode`:
/// - ReadMode::Normal: sorted output with MVCC filtering (regular table scan).
/// - ReadMode::Fast:   unsorted merge of delta + stable, no MVCC filtering,
///                     but rows with del_mark != 0 are dropped; `max_version` is ignored.
/// - ReadMode::Raw:    'selraw' debug read — no MVCC filtering, no del_mark filtering;
///                     `filter` and `max_version` are ignored.
/// Returns the block input stream produced by the chosen mode.
BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode,
                                            const DMContext & dm_context,
                                            const ColumnDefines & columns_to_read,
                                            const SegmentSnapshotPtr & segment_snap,
                                            const RowKeyRanges & read_ranges,
                                            const RSOperatorPtr & filter,
                                            UInt64 max_version,
                                            size_t expected_block_size)
{
    // Every `case` returns directly; the old `break` statements after `return`
    // were unreachable dead code and have been removed.
    switch (read_mode)
    {
    case ReadMode::Normal:
        return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
    case ReadMode::Fast:
        return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, expected_block_size);
    case ReadMode::Raw:
        return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size);
    }
    // All ReadMode enumerators are handled above. This trailing return only
    // exists so control cannot fall off the end of a value-returning function
    // (which is undefined behavior) if ReadMode ever gains a new value
    // without this switch being updated.
    return {};
}

BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
LOG_TRACE(log, "Begin segment create input stream");

Expand Down Expand Up @@ -479,17 +503,17 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
return stream;
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
/// Convenience overload: take a fresh read snapshot of this segment and do a
/// normal-mode read over it. Returns an empty pointer if the snapshot could
/// not be created.
BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
                                                      const ColumnDefines & columns_to_read,
                                                      const RowKeyRanges & read_ranges,
                                                      const RSOperatorPtr & filter,
                                                      UInt64 max_version,
                                                      size_t expected_block_size)
{
    // C++17 if-with-initializer keeps the snapshot scoped to the branch.
    if (auto snapshot = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead); snapshot)
        return getInputStreamModeNormal(dm_context, columns_to_read, snapshot, read_ranges, filter, max_version, expected_block_size);
    return {};
}

BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_context,
Expand Down Expand Up @@ -527,64 +551,58 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
return data_stream;
}

/// we will call getInputStreamRaw in two condition:
/// 1. Using 'selraw xxxx' statement, which is always in test for debug. (when filter_delete_mark = false)
/// In this case, we will read all the data without mvcc filtering,
/// del_mark != 0 filtering and sorted merge.
/// We will just read all the data and return.
/// 2. We read in fast mode. (when filter_delete_mark = true)
/// In this case, we will read all the data without mvcc filtering and sorted merge,
/// but we will do del_mark != 0 filtering.
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
bool filter_delete_mark,
size_t expected_block_size)
/// We call getInputStreamModeFast when we read in fast mode.
/// In this case, we will read all the data in delta and stable, and then merge them without sorting.
/// Besides, we will do del_mark != 0 filtering to drop the deleted rows.
/// In conclusion, the output is unsorted, and does not do mvcc filtering.
BlockInputStreamPtr Segment::getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
/// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx`
/// But this way seems not to be robustness enough, maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column.
new_columns_to_read->reserve(columns_to_read.size() + 2);

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));
if (filter_delete_mark)
{
new_columns_to_read->push_back(getTagColumnDefine());
}

bool enable_handle_clean_read = filter_delete_mark;
bool enable_del_clean_read = filter_delete_mark;
new_columns_to_read->push_back(getTagColumnDefine());

/// When we read in fast mode, we can try to do the following optimization:
/// 1. Handle Column Optimization:
/// when the columns_to_read does not include HANDLE_COLUMN,
/// we can try to skip reading the handle column if the pack's handle range is fully within read range.
/// Thus, in this case, we set enable_handle_clean_read = true.
/// 2. Del Column Optimization:
/// when the columns_to_read does not include TAG_COLUMN,
/// we can try to skip reading the del column if the pack has no deleted rows.
/// Thus, in this case, we set enable_del_clean_read = true.
/// 3. Version Column Optimization:
breezewish marked this conversation as resolved.
Show resolved Hide resolved
/// if the columns_to_read does not include VERSION_COLUMN,
/// we don't need to read version column, thus we don't force push version column into new_columns_to_read.

bool enable_handle_clean_read = true;
bool enable_del_clean_read = true;

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
if (c.id == EXTRA_HANDLE_COLUMN_ID)
{
if (filter_delete_mark && c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
new_columns_to_read->push_back(c);
}
enable_handle_clean_read = false;
}
else if (c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
enable_handle_clean_read = false;
new_columns_to_read->push_back(c);
}
}
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

/// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID,
/// we can try to use clean read to make optimization in stable part.
/// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column
/// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column
/// others --> we don't need read version_column
/// Thus, in fast mode, if we don't need read handle_column, we set enable_handle_clean_read as true.
/// If we don't need read del_column, we set enable_del_clean_read as true
BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
Expand All @@ -593,25 +611,73 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ enable_handle_clean_read,
/* is_fast_scan */ filter_delete_mark,
/* is_fast_scan */ true,
/* enable_del_clean_read */ enable_del_clean_read);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

if (filter_delete_mark)
// Filter the unneeded column and filter out the rows whose del_mark is true.
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);

BlockInputStreams streams;

if (dm_context.read_delta_only)
{
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);
streams.push_back(delta_stream);
}
else if (dm_context.read_stable_only)
{
streams.push_back(stable_stream);
}
else
{
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);
streams.push_back(delta_stream);
streams.push_back(stable_stream);
}
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

/// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug.
/// In this case, we will read all the data without mvcc filtering and sorted merge.
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size)
{
auto new_columns_to_read = std::make_shared<ColumnDefines>();

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
new_columns_to_read->push_back(c);
}

BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ false);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

// Filter the unneeded columns.
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);

BlockInputStreams streams;

Expand All @@ -631,12 +697,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, bool filter_delete_mark)
/// Convenience overload: take a fresh raw-read snapshot covering the whole
/// segment range and do a raw-mode ('selraw') read over it. Returns an empty
/// pointer if the snapshot could not be created.
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read)
{
    auto snapshot = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw);
    if (snapshot == nullptr)
        return {};
    // Raw mode reads the full rowkey range of this segment.
    return getInputStreamModeRaw(dm_context, columns_to_read, snapshot, {rowkey_range});
}

SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const
Expand Down
Loading