Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fast scan code mode clean #6058

Merged
merged 15 commits into from
Oct 13, 2022
35 changes: 2 additions & 33 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,39 +98,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
LOG_FMT_DEBUG(log, "Read done");
return {};
}

cur_segment = task->segment;
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
switch (read_mode)
{
case ReadMode::Normal:
cur_stream = cur_segment->getInputStream(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
max_version,
block_size);
break;
case ReadMode::Fast:
cur_stream = cur_segment->getInputStreamFast(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
block_size);
break;
case ReadMode::Raw:
cur_stream = cur_segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges);
break;
}
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
buildStreamBasedOnReadMode(cur_stream, read_mode, task, dm_context, columns_to_read, filter, max_version, expected_block_size);
LOG_TRACE(log, "Start to read segment, segment={}", task->segment->simpleInfo());
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,11 @@ BlockInputStreamPtr Segment::getInputStreamFast(

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

// Filter the unneeded columns and filter out the rows whose del_mark is true.
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);

Expand Down Expand Up @@ -625,9 +627,11 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

// Filter the unneeded columns.
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);

Expand Down
68 changes: 36 additions & 32 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,40 @@ extern const Metric DT_SegmentReadTasks;

namespace DB::DM
{
/// Builds the input stream for a single segment read task and stores it in `stream`.
///
/// The Segment entry point is chosen by `read_mode`:
///   - ReadMode::Normal -> Segment::getInputStream     (uses filter, max_version and the block size)
///   - ReadMode::Fast   -> Segment::getInputStreamFast (uses filter and the block size, no max_version)
///   - ReadMode::Raw    -> Segment::getInputStreamRaw  (uses only the snapshot and the ranges)
///
/// The block size passed down is the larger of `expected_block_size` and the
/// `dt_segment_stable_pack_rows` setting read from `dm_context`.
/// `stream` is left unmodified when `read_mode` matches none of the cases above.
void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size)
{
    // The settings lookup is done up front for every mode, including Raw which does not use it.
    const auto configured_pack_rows = static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows);
    const auto rows_per_block = std::max(expected_block_size, configured_pack_rows);

    // Short aliases for the pieces of the task that every mode consumes.
    auto & target_segment = task->segment;
    auto & snapshot = task->read_snapshot;
    auto & key_ranges = task->ranges;

    switch (read_mode)
    {
    case ReadMode::Normal:
    {
        stream = target_segment->getInputStream(*dm_context, columns_to_read, snapshot, key_ranges, filter, max_version, rows_per_block);
        break;
    }
    case ReadMode::Fast:
    {
        stream = target_segment->getInputStreamFast(*dm_context, columns_to_read, snapshot, key_ranges, filter, rows_per_block);
        break;
    }
    case ReadMode::Raw:
    {
        stream = target_segment->getInputStreamRaw(*dm_context, columns_to_read, snapshot, key_ranges);
        break;
    }
    }
}

SegmentReadTask::SegmentReadTask(const SegmentPtr & segment_, //
const SegmentSnapshotPtr & read_snapshot_,
const RowKeyRanges & ranges_)
Expand Down Expand Up @@ -98,39 +132,9 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas
BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t)
{
MemoryTrackerSetter setter(true, mem_tracker.get());
auto seg = t->segment;
BlockInputStreamPtr stream;
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
switch (read_mode)
{
case ReadMode::Normal:
stream = seg->getInputStream(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges,
filter,
max_version,
block_size);
break;
case ReadMode::Fast:
stream = seg->getInputStreamFast(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges,
filter,
block_size);
break;
case ReadMode::Raw:
stream = seg->getInputStreamRaw(
*dm_context,
columns_to_read,
t->read_snapshot,
t->ranges);
break;
}
LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId());
buildStreamBasedOnReadMode(stream, read_mode, t, dm_context, columns_to_read, filter, max_version, expected_block_size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this?

  • getInputStream(Mode, ...)
  • getInputStreamModeNormal(...)
  • getInputStreamModeFast(...)
  • getInputStreamModeRaw(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion! I think getInputStream(Mode, ...) in class Segment is a better way to make the code reusable and clean.

LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId());
return stream;
}

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <Storages/DeltaMerge/ReadThread/WorkQueue.h>
#include <Storages/DeltaMerge/RowKeyRangeUtils.h>

#include "Storages/DeltaMerge/Segment.h"

namespace DB
{
namespace DM
Expand Down Expand Up @@ -123,14 +125,15 @@ enum class ReadMode
* will be still filtered out.
*/
Fast,

/**
* Read in raw mode, for example, for statements like `SELRAW *`. In raw mode, data is not sort merged and all versions
* are just returned.
*/
Raw,
};

/// Builds the input stream for `task` according to `read_mode` and assigns it to `stream`.
/// Dispatches to the task segment's getInputStream (Normal), getInputStreamFast (Fast)
/// or getInputStreamRaw (Raw). `max_version` is only used in Normal mode; `filter` and
/// the block size are used in Normal and Fast modes. The block size used is the larger of
/// `expected_block_size` and the `dt_segment_stable_pack_rows` setting.
void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size);
class SegmentReadTaskPool : private boost::noncopyable
{
public:
Expand Down