Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fast scan code mode clean #6058

Merged
merged 15 commits into from
Oct 13, 2022
36 changes: 21 additions & 15 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const RSOperatorPtr & filter_,
UInt64 max_version_,
size_t expected_block_size_,
bool is_raw_,
bool do_delete_mark_filter_for_raw_,
ReadMode read_mode_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
Expand All @@ -60,8 +59,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, header(toEmptyBlock(columns_to_read))
, max_version(max_version_)
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_)
, read_mode(read_mode_)
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(Logger::get(NAME, req_id))
Expand Down Expand Up @@ -102,26 +100,35 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
}

cur_segment = task->segment;
if (is_raw)
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
switch (read_mode)
{
cur_stream = cur_segment->getInputStreamRaw(
case Normal:
cur_stream = cur_segment->getInputStream(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
do_delete_mark_filter_for_raw);
}
else
{
cur_stream = cur_segment->getInputStream(
max_version,
block_size);
break;
case Fast:
cur_stream = cur_segment->getInputStreamFast(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
max_version,
std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)));
block_size);
break;
case Raw:
cur_stream = cur_segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges);
break;
}
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
Expand Down Expand Up @@ -172,8 +179,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
Block header;
const UInt64 max_version;
const size_t expected_block_size;
const bool is_raw;
const bool do_delete_mark_filter_for_raw;
const ReadMode read_mode;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;

Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra
}
}

// Read data without mvcc filtering && delete mark != 0 filtering.
// Read data without mvcc filtering.
// just for debug
// readRaw is called under 'selraw xxxx'
BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
Expand Down Expand Up @@ -902,8 +902,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw = */ true,
/* do_delete_mark_filter_for_raw = */ false,
/* read_mode */ Raw,
std::move(tasks),
after_segment_read,
req_info);
Expand Down Expand Up @@ -931,8 +930,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw_ */ true,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
/* read_mode */ Raw,
extra_table_id_index,
physical_table_id,
req_info);
Expand Down Expand Up @@ -986,8 +984,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
/* read_mode = */ is_fast_scan ? Fast : Normal,
std::move(tasks),
after_segment_read,
log_tracing_id);
Expand Down Expand Up @@ -1015,8 +1012,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw_= */ is_fast_scan,
/* do_delete_mark_filter_for_raw_= */ is_fast_scan,
/* read_mode = */ is_fast_scan ? Fast : Normal,
extra_table_id_index,
physical_table_id,
log_tracing_id);
Expand Down
138 changes: 89 additions & 49 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#include <memory>
#include <numeric>

#include "Storages/DeltaMerge/Filter/RSOperator.h"
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved

namespace ProfileEvents
{
extern const Event DMWriteBlock;
Expand Down Expand Up @@ -507,64 +509,58 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
return data_stream;
}

/// we will call getInputStreamRaw in two condition:
/// 1. Using 'selraw xxxx' statement, which is always in test for debug. (when filter_delete_mark = false)
/// In this case, we will read all the data without mvcc filtering,
/// del_mark != 0 filtering and sorted merge.
/// We will just read all the data and return.
/// 2. We read in fast mode. (when filter_delete_mark = true)
/// In this case, we will read all the data without mvcc filtering and sorted merge,
/// but we will do del_mark != 0 filtering.
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
bool filter_delete_mark,
size_t expected_block_size)
/// We call getInputStreamFast when we read in fast mode.
/// In this case, we will read all the data in delta and stable, and then merge them without sorting.
/// Besides, we will do del_mark != 0 filtering to drop the deleted rows.
/// In conclusion, the output is unsorted, and does not do mvcc filtering.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very nice explanation!

BlockInputStreamPtr Segment::getInputStreamFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
/// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx`
/// But this way seems not to be robustness enough, maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column.
new_columns_to_read->reserve(columns_to_read.size() + 2);

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));
if (filter_delete_mark)
{
new_columns_to_read->push_back(getTagColumnDefine());
}

bool enable_handle_clean_read = filter_delete_mark;
bool enable_del_clean_read = filter_delete_mark;
new_columns_to_read->push_back(getTagColumnDefine());

/// When we read in fast mode, we can try to do the following optimization:
/// 1. Handle Column Optimization:
/// when the columns_to_read does not include HANDLE_COLUMN,
/// we can try to skip reading the handle column if the pack's handle range is fully within read range.
/// Thus, in this case, we set enable_handle_clean_read = true.
/// 2. Del Column Optimization:
/// when the columns_to_read does not include TAG_COLUMN,
/// we can try to skip reading the del column if the pack has no deleted rows.
/// Thus, in this case, we set enable_del_clean_read = true.
/// 3. Version Column Optimization:
breezewish marked this conversation as resolved.
Show resolved Hide resolved
/// if the columns_to_read does not include VERSION_COLUMN,
/// we don't need to read version column, thus we don't force push version column into new_columns_to_read.
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved

bool enable_handle_clean_read = true;
bool enable_del_clean_read = true;

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
if (c.id == EXTRA_HANDLE_COLUMN_ID)
{
if (filter_delete_mark && c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
new_columns_to_read->push_back(c);
}
enable_handle_clean_read = false;
}
else if (c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
enable_handle_clean_read = false;
new_columns_to_read->push_back(c);
}
}
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

/// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID,
/// we can try to use clean read to make optimization in stable part.
/// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column
/// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column
/// others --> we don't need read version_column
/// Thus, in fast mode, if we don't need read handle_column, we set enable_handle_clean_read as true.
/// If we don't need read del_column, we set enable_del_clean_read as true
BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
Expand All @@ -573,25 +569,69 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ enable_handle_clean_read,
/* is_fast_scan */ filter_delete_mark,
/* is_fast_scan */ true,
/* enable_del_clean_read */ enable_del_clean_read);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

if (filter_delete_mark)
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);

BlockInputStreams streams;

if (dm_context.read_delta_only)
{
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);
streams.push_back(delta_stream);
}
else if (dm_context.read_stable_only)
{
streams.push_back(stable_stream);
}
else
{
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);
streams.push_back(delta_stream);
streams.push_back(stable_stream);
}
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

/// We call getInputStreamRaw in 'selraw xxxx' statement, which is always in test for debug.
/// In this case, we will read all the data without mvcc filtering and sorted merge.
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size)
{
auto new_columns_to_read = std::make_shared<ColumnDefines>();

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
new_columns_to_read->push_back(c);
}

BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ false);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);

BlockInputStreams streams;

Expand All @@ -611,12 +651,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, bool filter_delete_mark)
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read)
{
auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw);
if (!segment_snap)
return {};
return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}, EMPTY_FILTER, filter_delete_mark);
return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range});
}

SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,24 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamRaw(
BlockInputStreamPtr getInputStreamFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
bool filter_delete_mark = true,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
bool filter_delete_mark = false);
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read);

/// For those split, merge and mergeDelta methods, we should use prepareXXX/applyXXX combo in real production.
/// split(), merge() and mergeDelta() are only used in test cases.
Expand Down
35 changes: 28 additions & 7 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,35 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
MemoryTrackerSetter setter(true, mem_tracker.get());
auto seg = t->segment;
BlockInputStreamPtr stream;
if (is_raw)
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code in dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp L103-L131 is similar to dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h L103-L131.

Maybe we can do some refactoring and make code reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I try to make a new function for L103-L131 to make code reusable, please take a look.

switch (read_mode)
{
stream = seg->getInputStreamRaw(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, do_range_filter_for_raw);
}
else
{
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
stream = seg->getInputStream(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size);
case Normal:
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
stream = seg->getInputStream(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges,
filter,
max_version,
block_size);
break;
case Fast:
stream = seg->getInputStreamFast(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges,
filter,
block_size);
break;
case Raw:
stream = seg->getInputStreamRaw(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges);
break;
}
LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId());
return stream;
Expand Down
Loading